git.gnu.io has moved to IP address 209.51.188.249 -- please double check where you are logging in.

Commit 44bcc942 authored by Evan Prodromou's avatar Evan Prodromou

Break up stream code to use separate notice stream classes

Rearchitect (again!) notice stream code to delegate different functionality up and down the stack.

Now, different classes implement NoticeStream.
parent e0cccfc4
......@@ -79,82 +79,18 @@ class Fave extends Memcached_DataObject
function stream($user_id, $offset=0, $limit=NOTICES_PER_PAGE, $own=false, $since_id=0, $max_id=0)
{
$stream = new NoticeStream(array('Fave', '_streamDirect'),
array($user_id, $own),
($own) ? 'fave:ids_by_user_own:'.$user_id :
'fave:ids_by_user:'.$user_id);
$stream = new FaveNoticeStream($user_id, $own);
return $stream->getNotices($offset, $limit, $since_id, $max_id);
}
function idStream($user_id, $offset=0, $limit=NOTICES_PER_PAGE, $own=false, $since_id=0, $max_id=0)
{
$stream = new NoticeStream(array('Fave', '_streamDirect'),
array($user_id, $own),
($own) ? 'fave:ids_by_user_own:'.$user_id :
'fave:ids_by_user:'.$user_id);
$stream = new FaveNoticeStream($user_id, $own);
return $stream->getNoticeIds($offset, $limit, $since_id, $max_id);
}
/**
* Note that the sorting for this is by order of *fave* not order of *notice*.
*
* @fixme add since_id, max_id support?
*
* @param <type> $user_id
* @param <type> $own
* @param <type> $offset
* @param <type> $limit
* @param <type> $since_id
* @param <type> $max_id
* @return <type>
*/
function _streamDirect($user_id, $own, $offset, $limit, $since_id, $max_id)
{
$fav = new Fave();
$qry = null;
if ($own) {
$qry = 'SELECT fave.* FROM fave ';
$qry .= 'WHERE fave.user_id = ' . $user_id . ' ';
} else {
$qry = 'SELECT fave.* FROM fave ';
$qry .= 'INNER JOIN notice ON fave.notice_id = notice.id ';
$qry .= 'WHERE fave.user_id = ' . $user_id . ' ';
$qry .= 'AND notice.is_local != ' . Notice::GATEWAY . ' ';
}
if ($since_id != 0) {
$qry .= 'AND notice_id > ' . $since_id . ' ';
}
if ($max_id != 0) {
$qry .= 'AND notice_id <= ' . $max_id . ' ';
}
// NOTE: we sort by fave time, not by notice time!
$qry .= 'ORDER BY modified DESC ';
if (!is_null($offset)) {
$qry .= "LIMIT $limit OFFSET $offset";
}
$fav->query($qry);
$ids = array();
while ($fav->fetch()) {
$ids[] = $fav->notice_id;
}
$fav->free();
unset($fav);
return $ids;
}
function asActivity()
{
$notice = Notice::staticGet('id', $this->notice_id);
......
......@@ -449,53 +449,10 @@ class File extends Memcached_DataObject
function stream($offset=0, $limit=NOTICES_PER_PAGE, $since_id=0, $max_id=0)
{
$stream = new NoticeStream(array($this, '_streamDirect'),
array(),
'file:notice-ids:'.$this->url);
$stream = new FileNoticeStream($this);
return $stream->getNotices($offset, $limit, $since_id, $max_id);
}
/**
* Stream of notices linking to this URL
*
* @param integer $offset Offset to show; default is 0
* @param integer $limit Limit of notices to show
* @param integer $since_id Since this notice
* @param integer $max_id Before this notice
*
* @return array ids of notices that link to this file
*/
function _streamDirect($offset, $limit, $since_id, $max_id)
{
$f2p = new File_to_post();
$f2p->selectAdd();
$f2p->selectAdd('post_id');
$f2p->file_id = $this->id;
Notice::addWhereSinceId($f2p, $since_id, 'post_id', 'modified');
Notice::addWhereMaxId($f2p, $max_id, 'post_id', 'modified');
$f2p->orderBy('modified DESC, post_id DESC');
if (!is_null($offset)) {
$f2p->limit($offset, $limit);
}
$ids = array();
if ($f2p->find()) {
while ($f2p->fetch()) {
$ids[] = $f2p->post_id;
}
}
return $ids;
}
function noticeCount()
{
$cacheKey = sprintf('file:notice-count:%d', $this->id);
......
......@@ -45,7 +45,7 @@ require_once INSTALLDIR.'/classes/Memcached_DataObject.php';
/* We keep 200 notices, the max number of notices available per API request,
* in the memcached cache. */
define('NOTICE_CACHE_WINDOW', NoticeStream::CACHE_WINDOW);
define('NOTICE_CACHE_WINDOW', CachingNoticeStream::CACHE_WINDOW);
define('MAX_BOXCARS', 128);
......@@ -548,7 +548,7 @@ class Notice extends Memcached_DataObject
if (empty($profile)) {
return false;
}
$notice = $profile->getNotices(0, NoticeStream::CACHE_WINDOW);
$notice = $profile->getNotices(0, CachingNoticeStream::CACHE_WINDOW);
if (!empty($notice)) {
$last = 0;
while ($notice->fetch()) {
......@@ -632,92 +632,18 @@ class Notice extends Memcached_DataObject
function publicStream($offset=0, $limit=20, $since_id=0, $max_id=0)
{
$stream = new NoticeStream(array('Notice', '_publicStreamDirect'),
array(),
'public');
$stream = new PublicNoticeStream();
return $stream->getNotices($offset, $limit, $since_id, $max_id);
}
function _publicStreamDirect($offset=0, $limit=20, $since_id=0, $max_id=0)
{
$notice = new Notice();
$notice->selectAdd(); // clears it
$notice->selectAdd('id');
$notice->orderBy('created DESC, id DESC');
if (!is_null($offset)) {
$notice->limit($offset, $limit);
}
if (common_config('public', 'localonly')) {
$notice->whereAdd('is_local = ' . Notice::LOCAL_PUBLIC);
} else {
// -1 == blacklisted, -2 == gateway (i.e. Twitter)
$notice->whereAdd('is_local !='. Notice::LOCAL_NONPUBLIC);
$notice->whereAdd('is_local !='. Notice::GATEWAY);
}
Notice::addWhereSinceId($notice, $since_id);
Notice::addWhereMaxId($notice, $max_id);
$ids = array();
if ($notice->find()) {
while ($notice->fetch()) {
$ids[] = $notice->id;
}
}
$notice->free();
$notice = NULL;
return $ids;
}
function conversationStream($id, $offset=0, $limit=20, $since_id=0, $max_id=0)
{
$stream = new NoticeStream(array('Notice', '_conversationStreamDirect'),
array($id),
'notice:conversation_ids:'.$id);
$stream = new ConversationNoticeStream($id);
return $stream->getNotices($offset, $limit, $since_id, $max_id);
}
function _conversationStreamDirect($id, $offset=0, $limit=20, $since_id=0, $max_id=0)
{
$notice = new Notice();
$notice->selectAdd(); // clears it
$notice->selectAdd('id');
$notice->conversation = $id;
$notice->orderBy('created DESC, id DESC');
if (!is_null($offset)) {
$notice->limit($offset, $limit);
}
Notice::addWhereSinceId($notice, $since_id);
Notice::addWhereMaxId($notice, $max_id);
$ids = array();
if ($notice->find()) {
while ($notice->fetch()) {
$ids[] = $notice->id;
}
}
$notice->free();
$notice = NULL;
return $ids;
}
/**
* Is this notice part of an active conversation?
*
......
......@@ -38,42 +38,11 @@ class Notice_tag extends Memcached_DataObject
static function getStream($tag, $offset=0, $limit=20, $sinceId=0, $maxId=0)
{
$stream = new NoticeStream(array('Notice_tag', '_streamDirect'),
array($tag),
'notice_tag:notice_ids:' . Cache::keyize($tag));
$stream = new TagNoticeStream($tag);
return $stream->getNotices($offset, $limit, $sinceId, $maxId);
}
function _streamDirect($tag, $offset, $limit, $since_id, $max_id)
{
$nt = new Notice_tag();
$nt->tag = $tag;
$nt->selectAdd();
$nt->selectAdd('notice_id');
Notice::addWhereSinceId($nt, $since_id, 'notice_id');
Notice::addWhereMaxId($nt, $max_id, 'notice_id');
$nt->orderBy('created DESC, notice_id DESC');
if (!is_null($offset)) {
$nt->limit($offset, $limit);
}
$ids = array();
if ($nt->find()) {
while ($nt->fetch()) {
$ids[] = $nt->notice_id;
}
}
return $ids;
}
function blowCache($blowLast=false)
{
self::blow('notice_tag:notice_ids:%s', Cache::keyize($this->tag));
......
......@@ -198,90 +198,18 @@ class Profile extends Memcached_DataObject
function getTaggedNotices($tag, $offset=0, $limit=NOTICES_PER_PAGE, $since_id=0, $max_id=0)
{
$stream = new NoticeStream(array($this, '_streamTaggedDirect'),
array($tag),
'profile:notice_ids_tagged:'.$this->id.':'.$tag);
$stream = new TaggedProfileNoticeStream($this, $tag);
return $stream->getNotices($offset, $limit, $since_id, $max_id);
}
function getNotices($offset=0, $limit=NOTICES_PER_PAGE, $since_id=0, $max_id=0)
{
$stream = new NoticeStream(array($this, '_streamDirect'),
array(),
'profile:notice_ids:' . $this->id);
$stream = new ProfileNoticeStream($this);
return $stream->getNotices($offset, $limit, $since_id, $max_id);
}
function _streamTaggedDirect($tag, $offset, $limit, $since_id, $max_id)
{
// XXX It would be nice to do this without a join
// (necessary to do it efficiently on accounts with long history)
$notice = new Notice();
$query =
"select id from notice join notice_tag on id=notice_id where tag='".
$notice->escape($tag) .
"' and profile_id=" . intval($this->id);
$since = Notice::whereSinceId($since_id, 'id', 'notice.created');
if ($since) {
$query .= " and ($since)";
}
$max = Notice::whereMaxId($max_id, 'id', 'notice.created');
if ($max) {
$query .= " and ($max)";
}
$query .= ' order by notice.created DESC, id DESC';
if (!is_null($offset)) {
$query .= " LIMIT " . intval($limit) . " OFFSET " . intval($offset);
}
$notice->query($query);
$ids = array();
while ($notice->fetch()) {
$ids[] = $notice->id;
}
return $ids;
}
function _streamDirect($offset, $limit, $since_id, $max_id)
{
$notice = new Notice();
$notice->profile_id = $this->id;
$notice->selectAdd();
$notice->selectAdd('id');
Notice::addWhereSinceId($notice, $since_id);
Notice::addWhereMaxId($notice, $max_id);
$notice->orderBy('created DESC, id DESC');
if (!is_null($offset)) {
$notice->limit($offset, $limit);
}
$notice->find();
$ids = array();
while ($notice->fetch()) {
$ids[] = $notice->id;
}
return $ids;
}
function isMember($group)
{
$mem = new Group_member();
......@@ -551,7 +479,7 @@ class Profile extends Memcached_DataObject
// This is the stream of favorite notices, in rev chron
// order. This forces it into cache.
$ids = Fave::idStream($this->id, 0, NoticeStream::CACHE_WINDOW);
$ids = Fave::idStream($this->id, 0, CachingNoticeStream::CACHE_WINDOW);
// If it's in the list, then it's a fave
......@@ -563,7 +491,7 @@ class Profile extends Memcached_DataObject
// then the cache has all available faves, so this one
// is not a fave.
if (count($ids) < NoticeStream::CACHE_WINDOW) {
if (count($ids) < CachingNoticeStream::CACHE_WINDOW) {
return false;
}
......
......@@ -38,35 +38,8 @@ class Reply extends Memcached_DataObject
function stream($user_id, $offset=0, $limit=NOTICES_PER_PAGE, $since_id=0, $max_id=0)
{
$stream = new NoticeStream(array('Reply', '_streamDirect'),
array($user_id),
'reply:stream:' . $user_id);
$stream = new ReplyNoticeStream($user_id);
return $stream->getNotices($offset, $limit, $since_id, $max_id);
}
function _streamDirect($user_id, $offset=0, $limit=NOTICES_PER_PAGE, $since_id=0, $max_id=0)
{
$reply = new Reply();
$reply->profile_id = $user_id;
Notice::addWhereSinceId($reply, $since_id, 'notice_id', 'modified');
Notice::addWhereMaxId($reply, $max_id, 'notice_id', 'modified');
$reply->orderBy('modified DESC, notice_id DESC');
if (!is_null($offset)) {
$reply->limit($offset, $limit);
}
$ids = array();
if ($reply->find()) {
while ($reply->fetch()) {
$ids[] = $reply->notice_id;
}
}
return $ids;
}
}
......@@ -767,93 +767,18 @@ class User extends Memcached_DataObject
function repeatedByMe($offset=0, $limit=20, $since_id=null, $max_id=null)
{
$stream = new NoticeStream(array($this, '_repeatedByMeDirect'),
array(),
'user:repeated_by_me:'.$this->id);
$stream = new RepeatedByMeNoticeStream($this);
return $stream->getNotices($offset, $limit, $since_id, $max_id);
}
function _repeatedByMeDirect($offset, $limit, $since_id, $max_id)
{
$notice = new Notice();
$notice->selectAdd(); // clears it
$notice->selectAdd('id');
$notice->profile_id = $this->id;
$notice->whereAdd('repeat_of IS NOT NULL');
$notice->orderBy('created DESC, id DESC');
if (!is_null($offset)) {
$notice->limit($offset, $limit);
}
Notice::addWhereSinceId($notice, $since_id);
Notice::addWhereMaxId($notice, $max_id);
$ids = array();
if ($notice->find()) {
while ($notice->fetch()) {
$ids[] = $notice->id;
}
}
$notice->free();
$notice = NULL;
return $ids;
}
function repeatsOfMe($offset=0, $limit=20, $since_id=null, $max_id=null)
{
$stream = new NoticeStream(array($this, '_repeatsOfMeDirect'),
array(),
'user:repeats_of_me:'.$this->id);
$stream = new RepeatsOfMeNoticeStream($this);
return $stream->getNotices($offset, $limit, $since_id, $max_id);
}
function _repeatsOfMeDirect($offset, $limit, $since_id, $max_id)
{
$qry =
'SELECT DISTINCT original.id AS id ' .
'FROM notice original JOIN notice rept ON original.id = rept.repeat_of ' .
'WHERE original.profile_id = ' . $this->id . ' ';
$since = Notice::whereSinceId($since_id, 'original.id', 'original.created');
if ($since) {
$qry .= "AND ($since) ";
}
$max = Notice::whereMaxId($max_id, 'original.id', 'original.created');
if ($max) {
$qry .= "AND ($max) ";
}
$qry .= 'ORDER BY original.created, original.id DESC ';
if (!is_null($offset)) {
$qry .= "LIMIT $limit OFFSET $offset";
}
$ids = array();
$notice = new Notice();
$notice->query($qry);
while ($notice->fetch()) {
$ids[] = $notice->id;
}
$notice->free();
$notice = NULL;
return $ids;
}
function repeatedToMe($offset=0, $limit=20, $since_id=null, $max_id=null)
{
......
......@@ -87,41 +87,11 @@ class User_group extends Memcached_DataObject
function getNotices($offset, $limit, $since_id=null, $max_id=null)
{
$stream = new NoticeStream(array($this, '_streamDirect'),
array(),
'user_group:notice_ids:' . $this->id);
$stream = new GroupNoticeStream($this);
return $stream->getNotices($offset, $limit, $since_id, $max_id);
}
function _streamDirect($offset, $limit, $since_id, $max_id)
{
$inbox = new Group_inbox();
$inbox->group_id = $this->id;
$inbox->selectAdd();
$inbox->selectAdd('notice_id');
Notice::addWhereSinceId($inbox, $since_id, 'notice_id');
Notice::addWhereMaxId($inbox, $max_id, 'notice_id');
$inbox->orderBy('created DESC, notice_id DESC');
if (!is_null($offset)) {
$inbox->limit($offset, $limit);
}
$ids = array();
if ($inbox->find()) {
while ($inbox->fetch()) {
$ids[] = $inbox->notice_id;
}
}
return $ids;
}
function allowedNickname($nickname)
{
......
<?php
/**
* StatusNet - the distributed open-source microblogging tool
* Copyright (C) 2011, StatusNet, Inc.
*
* A stream of notices
*
* PHP version 5
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* @category Stream
* @package StatusNet
* @author Evan Prodromou <evan@status.net>
* @copyright 2011 StatusNet, Inc.
* @license http://www.fsf.org/licensing/licenses/agpl-3.0.html AGPL 3.0
* @link http://status.net/
*/
if (!defined('STATUSNET')) {
// This check helps protect against security problems;
// your code file can't be executed directly from the web.
exit(1);
}
/**
* Class for notice streams
*
* @category Stream
* @package StatusNet
* @author Evan Prodromou <evan@status.net>
* @copyright 2011 StatusNet, Inc.
* @license http://www.fsf.org/licensing/licenses/agpl-3.0.html AGPL 3.0
* @link http://status.net/
*/
class CachingNoticeStream extends NoticeStream
{
const CACHE_WINDOW = 200;
public $stream = null;
public $cachekey = null;
function __construct($stream, $cachekey)
{
$this->stream = $stream;
$this->cachekey = $cachekey;
}
function getNoticeIds($offset=0, $limit=20, $sinceId=0, $maxId=0)
{
$cache = Cache::instance();
// We cache self::CACHE_WINDOW elements at the tip of the stream.
// If the cache won't be hit, just generate directly.
if (empty($cache) ||
$sinceId != 0 || $maxId != 0 ||
is_null($limit) ||
($offset + $limit) > self::CACHE_WINDOW) {
return $this->stream->getNoticeIds($offset, $limit, $sinceId, $maxId);
}
// Check the cache to see if we have the stream.
$idkey = Cache::key($this->cachekey);
$idstr = $cache->get($idkey);
if ($idstr !== false) {
// Cache hit! Woohoo!
$window = explode(',', $idstr);
$ids = array_slice($window, $offset, $limit);
return $ids;
}
// Check the cache to see if we have a "last-known-good" version.
// The actual cache gets blown away when new notices are added, but
// the "last" value holds a lot of info. We might need to only generate
// a few at the "tip", which can bound our queries and save lots
// of time.
$laststr = $cache->get($idkey.';last');
if ($laststr !== false) {
$window = explode(',', $laststr);
$last_id = $window[0];
$new_ids = $this->stream->getNoticeIds(0, self::CACHE_WINDOW, $last_id, 0);
$new_window = array_merge($new_ids, $window);
$new_windowstr = implode(',', $new_window);
$result = $cache->set($idkey, $new_windowstr);
$result = $cache->set($idkey . ';last', $new_windowstr);
$ids = array_slice($new_window, $offset, $limit);
return $ids;
}
// No cache hits :( Generate directly and stick the results
// into the cache. Note we generate the full cache window.
$window = $this->stream->getNoticeIds(0, self::CACHE_WINDOW, 0, 0);
$windowstr = implode(',', $window);
$result = $cache->set($idkey, $windowstr);
$result = $cache->set($idkey . ';last', $windowstr);
// Return just the slice that was requested
$ids = array_slice($window, $offset, $limit);
return $ids;
}
}
<?php
class ConversationNoticeStream extends CachingNoticeStream
{
function __construct($id)
{
parent::__construct(new RawConversationNoticeStream($id),
'notice:conversation_ids:'.$id);
}
}
class RawConversationNoticeStream extends NoticeStream
{
protected $id;
function __construct($id)
{
$this->id = $id;
}