Commit 0e852def authored by Brion Vibber's avatar Brion Vibber

XMPP queued output & initial retooling of DB queue manager to support non-Notice objects.

Queue handlers for XMPP individual & firehose output now send their XML stanzas
to another output queue instead of connecting directly to the chat server. This
lets us have as many general processing threads as we need, while all actual
XMPP input and output go through a single daemon with a single connection open.

This avoids problems with multiple connected resources:
* multiple windows shown in some chat clients (psi, gajim, kopete)
* extra load on server
* incoming message delivery forwarding issues

Database changes:
* queue_item drops 'notice_id' in favor of a 'frame' blob.
  This is based on Craig Andrews' work branch to generalize queues to take any
  object, but conservatively leaving out the serialization for now.
  Table updater (preserves any existing queued items) in db/rc3to09.sql

Code changes to watch out for:
* Queue handlers should now define a handle() method instead of handle_notice()
* QueueDaemon and XmppDaemon now share common i/o (IoMaster) and respawning
  thread management (RespawningDaemon) infrastructure.
* The polling XmppConfirmManager has been dropped, as the message is queued
  directly when saving IM settings.
* Enable $config['queue']['debug_memory'] to output current memory usage at
  each run through the event loop to watch for memory leaks

To do:
* Adapt XMPP i/o to component connection mode for multi-site support.
* XMPP input can also be broken out to a queue, which would allow the actual
  notice save etc to be handled by general queue threads.
* Make sure there are no problems with simply pushing serialized Notice objects
  to queues.
* Find a way to improve interactive performance of the database-backed queue
  handler; polling is pretty painful to XMPP.
* Possibly redo the way QueueHandlers are injected into a QueueManager. The
  grouping used to split out the XMPP output queue is a bit awkward.
parent 0bb23e6f
......@@ -309,6 +309,8 @@ class ImsettingsAction extends ConnectSettingsAction
$confirm->address_type = 'jabber';
$confirm->user_id = $user->id;
$confirm->code = common_confirmation_code(64);
$confirm->sent = common_sql_now();
$confirm->claimed = common_sql_now();
$result = $confirm->insert();
......@@ -318,11 +320,9 @@ class ImsettingsAction extends ConnectSettingsAction
return;
}
if (!common_config('queue', 'enabled')) {
jabber_confirm_address($confirm->code,
$user->nickname,
$jabber);
}
jabber_confirm_address($confirm->code,
$user->nickname,
$jabber);
$msg = sprintf(_('A confirmation code was sent '.
'to the IM address you added. '.
......
......@@ -10,8 +10,8 @@ class Queue_item extends Memcached_DataObject
/* the code below is auto generated do not remove the above tag */
public $__table = 'queue_item'; // table name
public $notice_id; // int(4) primary_key not_null
public $transport; // varchar(8) primary_key not_null
public $id; // int(4) primary_key not_null
public $frame; // blob not_null
public $created; // datetime() not_null
public $claimed; // datetime()
......@@ -22,14 +22,21 @@ class Queue_item extends Memcached_DataObject
/* the code above is auto generated do not remove the tag below */
###END_AUTOCODE
function sequenceKey()
{ return array(false, false); }
static function top($transport=null) {
/**
* @param mixed $transports name of a single queue or array of queues to pull from
* If not specified, checks all queues in the system.
*/
static function top($transports=null) {
$qi = new Queue_item();
if ($transport) {
$qi->transport = $transport;
if ($transports) {
if (is_array($transports)) {
// @fixme use safer escaping
$list = implode("','", array_map('addslashes', $transports));
$qi->whereAdd("transport in ('$list')");
} else {
$qi->transport = $transports;
}
}
$qi->orderBy('created');
$qi->whereAdd('claimed is null');
......@@ -42,7 +49,7 @@ class Queue_item extends Memcached_DataObject
# XXX: potential race condition
# can we force it to only update if claimed is still null
# (or old)?
common_log(LOG_INFO, 'claiming queue item = ' . $qi->notice_id .
common_log(LOG_INFO, 'claiming queue item id = ' . $qi->id .
' for transport ' . $qi->transport);
$orig = clone($qi);
$qi->claimed = common_sql_now();
......@@ -57,9 +64,4 @@ class Queue_item extends Memcached_DataObject
$qi = null;
return null;
}
function pkeyGet($kv)
{
return Memcached_DataObject::pkeyGet('Queue_item', $kv);
}
}
......@@ -396,14 +396,14 @@ tagged = K
tag = K
[queue_item]
notice_id = 129
id = 129
frame = 66
transport = 130
created = 142
claimed = 14
[queue_item__keys]
notice_id = K
transport = K
id = K
[related_group]
group_id = 129
......
......@@ -94,3 +94,19 @@ create table user_location_prefs (
constraint primary key (user_id)
) ENGINE=InnoDB CHARACTER SET utf8 COLLATE utf8_bin;
create table queue_item_new (
id integer auto_increment primary key comment 'unique identifier',
frame blob not null comment 'data: object reference or opaque string',
transport varchar(8) not null comment 'queue for what? "email", "jabber", "sms", "irc", ...',
created datetime not null comment 'date this record was created',
claimed datetime comment 'date this item was claimed',
index queue_item_created_idx (created)
) ENGINE=InnoDB CHARACTER SET utf8 COLLATE utf8_bin;
insert into queue_item_new (frame,transport,created,claimed)
select notice_id,transport,created,claimed from queue_item;
alter table queue_item rename to queue_item_old;
alter table queue_item_new rename to queue_item;
create table queue_item_new (
id integer auto_increment primary key comment 'unique identifier',
frame blob not null comment 'data: object reference or opaque string',
transport varchar(8) not null comment 'queue for what? "email", "jabber", "sms", "irc", ...',
created datetime not null comment 'date this record was created',
claimed datetime comment 'date this item was claimed',
index queue_item_created_idx (created)
) ENGINE=InnoDB CHARACTER SET utf8 COLLATE utf8_bin;
insert into queue_item_new (frame,transport,created,claimed)
select notice_id,transport,created,claimed from queue_item;
alter table queue_item rename to queue_item_old;
alter table queue_item_new rename to queue_item;
......@@ -244,13 +244,12 @@ create table remember_me (
) ENGINE=InnoDB CHARACTER SET utf8 COLLATE utf8_bin;
create table queue_item (
notice_id integer not null comment 'notice queued' references notice (id),
id integer auto_increment primary key comment 'unique identifier',
frame blob not null comment 'data: object reference or opaque string',
transport varchar(8) not null comment 'queue for what? "email", "jabber", "sms", "irc", ...',
created datetime not null comment 'date this record was created',
claimed datetime comment 'date this item was claimed',
constraint primary key (notice_id, transport),
index queue_item_created_idx (created)
) ENGINE=InnoDB CHARACTER SET utf8 COLLATE utf8_bin;
......
......@@ -31,19 +31,17 @@
class DBQueueManager extends QueueManager
{
/**
* Saves a notice object reference into the queue item table.
* Saves an object reference into the queue item table.
* @return boolean true on success
* @throws ServerException on failure
*/
public function enqueue($object, $queue)
{
$notice = $object;
$qi = new Queue_item();
$qi->notice_id = $notice->id;
$qi->frame = $this->encode($object);
$qi->transport = $queue;
$qi->created = $notice->created;
$qi->created = common_sql_now();
$result = $qi->insert();
if (!$result) {
......@@ -57,146 +55,92 @@ class DBQueueManager extends QueueManager
}
/**
* Poll every minute for new events during idle periods.
* Poll every 10 seconds for new events during idle periods.
* We'll look in more often when there's data available.
*
* @return int seconds
*/
public function pollInterval()
{
return 60;
return 10;
}
/**
* Run a polling cycle during idle processing in the input loop.
* @return boolean true if we had a hit
* @return boolean true if we should poll again for more data immediately
*/
public function poll()
{
$this->_log(LOG_DEBUG, 'Checking for notices...');
$item = $this->_nextItem();
if ($item === false) {
$qi = Queue_item::top($this->getQueues());
if (empty($qi)) {
$this->_log(LOG_DEBUG, 'No notices waiting; idling.');
return false;
}
if ($item === true) {
// We dequeued an entry for a deleted or invalid notice.
// Consider it a hit for poll rate purposes.
return true;
}
list($queue, $notice) = $item;
$this->_log(LOG_INFO, 'Got notice '. $notice->id . ' for transport ' . $queue);
// Yay! Got one!
$handler = $this->getHandler($queue);
if ($handler) {
if ($handler->handle_notice($notice)) {
$this->_log(LOG_INFO, "[$queue:notice $notice->id] Successfully handled notice");
$this->_done($notice, $queue);
$queue = $qi->transport;
$item = $this->decode($qi->frame);
if ($item) {
$rep = $this->logrep($item);
$this->_log(LOG_INFO, "Got $rep for transport $queue");
$handler = $this->getHandler($queue);
if ($handler) {
if ($handler->handle($item)) {
$this->_log(LOG_INFO, "[$queue:$rep] Successfully handled item");
$this->_done($qi);
} else {
$this->_log(LOG_INFO, "[$queue:$rep] Failed to handle item");
$this->_fail($qi);
}
} else {
$this->_log(LOG_INFO, "[$queue:notice $notice->id] Failed to handle notice");
$this->_fail($notice, $queue);
$this->_log(LOG_INFO, "[$queue:$rep] No handler for queue $queue; discarding.");
$this->_done($qi);
}
} else {
$this->_log(LOG_INFO, "[$queue:notice $notice->id] No handler for queue $queue; discarding.");
$this->_done($notice, $queue);
$this->_log(LOG_INFO, "[$queue] Got empty/deleted item, discarding");
$this->_fail($qi);
}
return true;
}
/**
* Pop the oldest unclaimed item off the queue set and claim it.
*
* @return mixed false if no items; true if bogus hit; otherwise array(string, Notice)
* giving the queue transport name.
*/
protected function _nextItem()
{
$start = time();
$result = null;
$qi = Queue_item::top();
if (empty($qi)) {
return false;
}
$queue = $qi->transport;
$notice = Notice::staticGet('id', $qi->notice_id);
if (empty($notice)) {
$this->_log(LOG_INFO, "[$queue:notice $notice->id] dequeued non-existent notice");
$qi->delete();
return true;
}
$result = $notice;
return array($queue, $notice);
}
/**
* Delete our claimed item from the queue after successful processing.
*
* @param Notice $object
* @param string $queue
* @param QueueItem $qi
*/
protected function _done($object, $queue)
protected function _done($qi)
{
// XXX: right now, we only handle notices
$notice = $object;
$qi = Queue_item::pkeyGet(array('notice_id' => $notice->id,
'transport' => $queue));
$queue = $qi->transport;
if (empty($qi)) {
$this->_log(LOG_INFO, "[$queue:notice $notice->id] Cannot find queue item");
} else {
if (empty($qi->claimed)) {
$this->_log(LOG_WARNING, "[$queue:notice $notice->id] Reluctantly releasing unclaimed queue item");
}
$qi->delete();
$qi->free();
if (empty($qi->claimed)) {
$this->_log(LOG_WARNING, "Reluctantly releasing unclaimed queue item $qi->id from $qi->queue");
}
$qi->delete();
$this->_log(LOG_INFO, "[$queue:notice $notice->id] done with item");
$this->stats('handled', $queue);
$notice->free();
}
/**
* Free our claimed queue item for later reprocessing in case of
* temporary failure.
*
* @param Notice $object
* @param string $queue
* @param QueueItem $qi
*/
protected function _fail($object, $queue)
protected function _fail($qi)
{
// XXX: right now, we only handle notices
$notice = $object;
$qi = Queue_item::pkeyGet(array('notice_id' => $notice->id,
'transport' => $queue));
$queue = $qi->transport;
if (empty($qi)) {
$this->_log(LOG_INFO, "[$queue:notice $notice->id] Cannot find queue item");
if (empty($qi->claimed)) {
$this->_log(LOG_WARNING, "[$queue:item $qi->id] Ignoring failure for unclaimed queue item");
} else {
if (empty($qi->claimed)) {
$this->_log(LOG_WARNING, "[$queue:notice $notice->id] Ignoring failure for unclaimed queue item");
} else {
$orig = clone($qi);
$qi->claimed = null;
$qi->update($orig);
$qi = null;
}
$orig = clone($qi);
$qi->claimed = null;
$qi->update($orig);
}
$this->_log(LOG_INFO, "[$queue:notice $notice->id] done with queue item");
$this->stats('error', $queue);
$notice->free();
}
protected function _log($level, $msg)
......
......@@ -83,6 +83,7 @@ $default =
'stomp_password' => null,
'monitor' => null, // URL to monitor ping endpoint (work in progress)
'softlimit' => '90%', // total size or % of memory_limit at which to restart queue threads gracefully
'debug_memory' => false, // true to spit memory usage to log
),
'license' =>
array('type' => 'cc', # can be 'cc', 'allrightsreserved', 'private'
......
......@@ -27,7 +27,7 @@
* @link http://status.net/
*/
class IoMaster
abstract class IoMaster
{
public $id;
......@@ -66,23 +66,18 @@ class IoMaster
if ($site != common_config('site', 'server')) {
StatusNet::init($site);
}
$classes = array();
if (Event::handle('StartIoManagerClasses', array(&$classes))) {
$classes[] = 'QueueManager';
if (common_config('xmpp', 'enabled') && !defined('XMPP_EMERGENCY_FLAG')) {
$classes[] = 'XmppManager'; // handles pings/reconnects
$classes[] = 'XmppConfirmManager'; // polls for outgoing confirmations
}
}
Event::handle('EndIoManagerClasses', array(&$classes));
foreach ($classes as $class) {
$this->instantiate($class);
}
$this->initManagers();
}
}
/**
* Initialize IoManagers for the currently configured site
* which are appropriate to this instance.
*
* Pass class names into $this->instantiate()
*/
abstract function initManagers();
/**
* Pull all local sites from status_network table.
* @return array of hostnames
......@@ -170,7 +165,7 @@ class IoMaster
$write = array();
$except = array();
$this->logState('listening');
common_log(LOG_INFO, "Waiting up to $timeout seconds for socket data...");
common_log(LOG_DEBUG, "Waiting up to $timeout seconds for socket data...");
$ready = stream_select($read, $write, $except, $timeout, 0);
if ($ready === false) {
......@@ -190,7 +185,7 @@ class IoMaster
if ($timeout > 0 && empty($sockets)) {
// If we had no listeners, sleep until the pollers' next requested wakeup.
common_log(LOG_INFO, "Sleeping $timeout seconds until next poll cycle...");
common_log(LOG_DEBUG, "Sleeping $timeout seconds until next poll cycle...");
$this->logState('sleep');
sleep($timeout);
}
......@@ -207,6 +202,8 @@ class IoMaster
if ($usage > $memoryLimit) {
common_log(LOG_INFO, "Queue thread hit soft memory limit ($usage > $memoryLimit); gracefully restarting.");
break;
} else if (common_config('queue', 'debug_memory')) {
common_log(LOG_DEBUG, "Memory usage $usage");
}
}
}
......@@ -223,8 +220,7 @@ class IoMaster
{
$softLimit = trim(common_config('queue', 'softlimit'));
if (substr($softLimit, -1) == '%') {
$limit = trim(ini_get('memory_limit'));
$limit = $this->parseMemoryLimit($limit);
$limit = $this->parseMemoryLimit(ini_get('memory_limit'));
if ($limit > 0) {
return intval(substr($softLimit, 0, -1) * $limit / 100);
} else {
......@@ -242,9 +238,10 @@ class IoMaster
* @param string $mem
* @return int
*/
protected function parseMemoryLimit($mem)
public function parseMemoryLimit($mem)
{
// http://www.php.net/manual/en/faq.using.php#faq.using.shorthandbytes
$mem = strtolower(trim($mem));
$size = array('k' => 1024,
'm' => 1024*1024,
'g' => 1024*1024*1024);
......@@ -253,7 +250,7 @@ class IoMaster
} else if (is_numeric($mem)) {
return intval($mem);
} else {
$mult = strtolower(substr($mem, -1));
$mult = substr($mem, -1);
if (isset($size[$mult])) {
return substr($mem, 0, -1) * $size[$mult];
} else {
......
......@@ -85,6 +85,27 @@ class Sharing_XMPP extends XMPPHP_XMPP
}
}
/**
* Build an XMPP proxy connection that'll save outgoing messages
* to the 'xmppout' queue to be picked up by xmppdaemon later.
*/
function jabber_proxy()
{
$proxy = new Queued_XMPP(common_config('xmpp', 'host') ?
common_config('xmpp', 'host') :
common_config('xmpp', 'server'),
common_config('xmpp', 'port'),
common_config('xmpp', 'user'),
common_config('xmpp', 'password'),
common_config('xmpp', 'resource') . 'daemon',
common_config('xmpp', 'server'),
common_config('xmpp', 'debug') ?
true : false,
common_config('xmpp', 'debug') ?
XMPPHP_Log::LEVEL_VERBOSE : null);
return $proxy;
}
/**
* Lazy-connect the configured Jabber account to the configured server;
* if already opened, the same connection will be returned.
......@@ -143,7 +164,7 @@ function jabber_connect($resource=null)
}
/**
* send a single notice to a given Jabber address
* Queue send for a single notice to a given Jabber address
*
* @param string $to JID to send the notice to
* @param Notice $notice notice to send
......@@ -153,10 +174,7 @@ function jabber_connect($resource=null)
function jabber_send_notice($to, $notice)
{
$conn = jabber_connect();
if (!$conn) {
return false;
}
$conn = jabber_proxy();
$profile = Profile::staticGet($notice->profile_id);
if (!$profile) {
common_log(LOG_WARNING, 'Refusing to send notice with ' .
......@@ -221,10 +239,7 @@ function jabber_format_entry($profile, $notice)
function jabber_send_message($to, $body, $type='chat', $subject=null)
{
$conn = jabber_connect();
if (!$conn) {
return false;
}
$conn = jabber_proxy();
$conn->message($to, $body, $type, $subject);
return true;
}
......@@ -319,7 +334,7 @@ function jabber_special_presence($type, $to=null, $show=null, $status=null)
}
/**
* broadcast a notice to all subscribers and reply recipients
* Queue broadcast of a notice to all subscribers and reply recipients
*
* This function will send a notice to all subscribers on the local server
* who have Jabber addresses, and have Jabber notification enabled, and
......@@ -354,7 +369,7 @@ function jabber_broadcast_notice($notice)
$sent_to = array();
$conn = jabber_connect();
$conn = jabber_proxy();
$ni = $notice->whoGets();
......@@ -389,14 +404,13 @@ function jabber_broadcast_notice($notice)
'Sending notice ' . $notice->id . ' to ' . $user->jabber,
__FILE__);
$conn->message($user->jabber, $msg, 'chat', null, $entry);
$conn->processTime(0);
}
return true;
}
/**
* send a notice to all public listeners
* Queue send of a notice to all public listeners
*
* For notices that are generated on the local system (by users), we can optionally
* forward them to remote listeners by XMPP.
......@@ -429,7 +443,7 @@ function jabber_public_notice($notice)
$msg = jabber_format_notice($profile, $notice);
$entry = jabber_format_entry($profile, $notice);
$conn = jabber_connect();
$conn = jabber_proxy();
foreach ($public as $address) {
common_log(LOG_INFO,
......@@ -437,7 +451,6 @@ function jabber_public_notice($notice)
' to public listener ' . $address,
__FILE__);
$conn->message($address, $msg, 'chat', null, $entry);
$conn->processTime(0);
}
$profile->free();
}
......
......@@ -34,14 +34,14 @@ class JabberQueueHandler extends QueueHandler
return 'jabber';
}
function handle_notice($notice)
function handle($notice)
{
require_once(INSTALLDIR.'/lib/jabber.php');
try {
return jabber_broadcast_notice($notice);
} catch (XMPPHP_Exception $e) {
$this->log(LOG_ERR, "Got an XMPPHP_Exception: " . $e->getMessage());
exit(1);
return false;
}
}
}
......@@ -36,7 +36,7 @@ class OmbQueueHandler extends QueueHandler
* @fixme doesn't currently report failure back to the queue manager
* because omb_broadcast_notice() doesn't report it to us
*/
function handle_notice($notice)
function handle($notice)
{
if ($this->is_remote($notice)) {
$this->log(LOG_DEBUG, 'Ignoring remote notice ' . $notice->id);
......
......@@ -30,7 +30,7 @@ class PingQueueHandler extends QueueHandler {
return 'ping';
}
function handle_notice($notice) {
function handle($notice) {
require_once INSTALLDIR . '/lib/ping.php';
return ping_broadcast_notice($notice);
}
......
......@@ -42,7 +42,7 @@ class PluginQueueHandler extends QueueHandler
return 'plugin';
}
function handle_notice($notice)
function handle($notice)
{
Event::handle('HandleQueuedNotice', array(&$notice));
return true;
......
......@@ -23,7 +23,6 @@ if (!defined('STATUSNET') && !defined('LACONICA')) {
/**
* Queue handler for pushing new notices to public XMPP subscribers.
* @fixme correct this exception handling
*/
class PublicQueueHandler extends QueueHandler
{
......@@ -33,15 +32,14 @@ class PublicQueueHandler extends QueueHandler
return 'public';
}
function handle_notice($notice)
function handle($notice)
{
require_once(INSTALLDIR.'/lib/jabber.php');
try {
return jabber_public_notice($notice);
} catch (XMPPHP_Exception $e) {
$this->log(LOG_ERR, "Got an XMPPHP_Exception: " . $e->getMessage());
die($e->getMessage());
return false;
}
return true;
}
}
<?php
/**
* StatusNet, the distributed open-source microblogging tool
*
* Queue-mediated proxy class for outgoing XMPP messages.
*
* PHP version 5
*
* LICENCE: This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* @category Network
* @package StatusNet
* @author Brion Vibber <brion@status.net>
* @copyright 2010 StatusNet, Inc.
* @license http://www.fsf.org/licensing/licenses/agpl-3.0.html GNU Affero General Public License version 3.0
* @link http://status.net/
*/
if (!defined('STATUSNET') && !defined('LACONICA')) {
exit(1);
}
require_once INSTALLDIR . '/lib/jabber.php';
class Queued_XMPP extends XMPPHP_XMPP
{
/**
* Constructor
*
* @param string $host
* @param integer $port
* @param string $user
* @param string $password
* @param string $resource
* @param string $server
* @param boolean $printlog
* @param string $loglevel
*/
public function __construct($host, $port, $user, $password, $resource, $server = null, $printlog = false, $loglevel = null)
{
parent::__construct($host, $port, $user, $password, $resource, $server, $printlog, $loglevel);
// Normally the fulljid isn't filled out until resource binding time;
// we need to save it here since we're not talking to a real server.
$this->fulljid = "{$this->basejid}/{$this->resource}";
}
/**
* Send a formatted message to the outgoing queue for later forwarding
* to a real XMPP connection.
*
* @param string $msg
*/
public function send($msg, $timeout=NULL)
{
$qm = QueueManager::get();
$qm->enqueue(strval($msg), 'xmppout');
}
/**
* Since we'll be getting input through a queue system's run loop,
* we'll process one standalone message at a time rather than our
* own XMPP message pump.
*
* @param string $message
*/
public function processMessage($message) {
$frame = array_shift($this->frames);
xml_parse($this->parser, $frame->body, false);
}
//@{
/**