Commit ec145b73 authored by Brion Vibber

Major refactoring of queue handlers to support running multiple sites in one daemon.

Key changes:
* Initialization code moved from common.php to StatusNet class;
  can now switch configurations during runtime.
* As a consequence, configuration files must now be idempotent...
  Be careful with constant, function or class definitions.
* Control structure for daemons/QueueManager/QueueHandler has been refactored;
  the run loop is now managed by IoMaster, run via scripts/queuedaemon.php.
  IoManager subclasses are woken to handle socket input or polling, and may
  cover multiple sites.
* Plugins can implement notice queue handlers more easily by registering a
  QueueHandler class; no more need to add a daemon.
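
Since the same configuration may now be loaded more than once in a single process, a definition that is harmless on first load can fail on the second. A minimal sketch of the guards this implies follows. The function load_site_config() only simulates a config file being included twice, and MY_SITE_FLAG and my_site_helper() are hypothetical names, not part of StatusNet:

```php
<?php
// Hypothetical config fragment; every name here is illustrative only.

function load_site_config(array &$config)
{
    // Redefining an existing constant fails, so guard the define().
    if (!defined('MY_SITE_FLAG')) {
        define('MY_SITE_FLAG', true);
    }

    // Redeclaring a function is a fatal error, so guard that too.
    if (!function_exists('my_site_helper')) {
        function my_site_helper($name)
        {
            return strtolower(trim($name));
        }
    }

    // Plain assignments are naturally idempotent.
    $config['site']['name'] = 'Example';
    $config['site']['server'] = my_site_helper(' Example.ORG ');
}

$config = array();
load_site_config($config); // first load
load_site_config($config); // a second load must not fail
```

The same reasoning applies to class definitions, which can be guarded with class_exists().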

The new QueueDaemon runs from scripts/queuedaemon.php:

* This replaces most of the old *handler.php scripts; they've been refactored
  to the bare handler classes.
* Spawns multiple child processes to spread load; defaults to CPU count on
  Linux and Mac OS X systems, or override with --threads=N
* When multithreaded, child processes are automatically respawned on failure.
* Threads gracefully shut down and restart when passing a soft memory limit
  (defaults to 90% of memory_limit), limiting damage from memory leaks.
* Support for UDP-based monitoring: http://www.gitorious.org/snqmon
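
The soft memory limit can be read as either an absolute size or a percentage of memory_limit. A rough sketch of how such a value might be resolved to bytes is below. The helper names parse_ini_size() and soft_limit_bytes() are assumptions made for illustration and are not the daemon's actual functions:

```php
<?php
// Sketch only: helper names are hypothetical, not StatusNet's.

/** Convert a php.ini shorthand size such as "128M" to bytes; -1 means unlimited. */
function parse_ini_size($value)
{
    $value = trim($value);
    if ($value === '' || $value === '-1') {
        return -1;
    }
    $number = (int)$value; // leading digits only, e.g. "128M" gives 128
    $unit = strtoupper(substr($value, -1));
    if ($unit === 'G') {
        return $number * 1024 * 1024 * 1024;
    }
    if ($unit === 'M') {
        return $number * 1024 * 1024;
    }
    if ($unit === 'K') {
        return $number * 1024;
    }
    return $number;
}

/** Resolve a soft limit given as a size or as a percentage of $hardLimit. */
function soft_limit_bytes($soft, $hardLimit)
{
    if (substr($soft, -1) === '%') {
        if ($hardLimit < 0) {
            return -1; // no hard limit, so a percentage has no meaning
        }
        return (int)($hardLimit * (int)$soft / 100);
    }
    return parse_ini_size($soft);
}

$hard = parse_ini_size('128M');
$soft = soft_limit_bytes('90%', $hard);

// A worker would check this between jobs and shut down cleanly when true.
$overLimit = $soft > 0 && memory_get_usage() > $soft;
```

Checking between jobs rather than mid-job is what makes the restart graceful: the process finishes its current item before exiting.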

Rough control flow diagram:
QueueDaemon -> IoMaster -> IoManager
                           QueueManager [listen or poll] -> QueueHandler
                           XmppManager [ping & keepalive]
                           XmppConfirmManager [poll updates]
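
The polling half of this flow can be modelled in a few lines. This is a toy sketch, not the real IoMaster/IoManager code: the Toy* classes are hypothetical stand-ins, socket listening is left out, and the loop returns the wait time instead of sleeping. It follows the rule described for pollInterval(): a hit drops the wait to 0, a miss backs off to the manager's interval.

```php
<?php
// Toy model of the diagram above; class names are hypothetical.

abstract class ToyManager
{
    /** @return int max seconds between polls, or 0 to disable polling */
    public function pollInterval() { return 0; }

    /** @return bool true if work was found */
    public function poll() { return false; }
}

class ToyQueueManager extends ToyManager
{
    private $items;
    public $handled = array();

    public function __construct(array $items) { $this->items = $items; }

    public function pollInterval() { return 60; }

    public function poll()
    {
        if (empty($this->items)) {
            return false;
        }
        // Stands in for dispatching the item to a QueueHandler.
        $this->handled[] = array_shift($this->items);
        return true;
    }
}

class ToyMaster
{
    private $managers = array();

    public function add(ToyManager $m) { $this->managers[] = $m; }

    /** Run one pass; returns the seconds the loop may wait before the next. */
    public function runOnce()
    {
        $timeout = null;
        foreach ($this->managers as $m) {
            $interval = $m->pollInterval();
            if ($interval <= 0) {
                continue; // polling disabled for this manager
            }
            // A hit suggests more input is waiting, so come straight back.
            $wait = $m->poll() ? 0 : $interval;
            $timeout = ($timeout === null) ? $wait : min($timeout, $wait);
        }
        return ($timeout === null) ? 0 : $timeout;
    }
}

$queue = new ToyQueueManager(array('notice 1', 'notice 2'));
$master = new ToyMaster();
$master->add($queue);

$waits = array();
for ($i = 0; $i < 3; $i++) {
    $waits[] = $master->runOnce();
}
```

With two queued items, the first two passes report no wait and the third backs off to the full interval.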

Todo:

* Respawning features are not currently available when running single-threaded.
* When running single-site, configuration changes aren't picked up.
* New sites or config changes affecting queue subscriptions are not yet
  handled without a daemon restart.
* SNMP monitoring output to integrate with general tools (nagios, ganglia)
* Convert XMPP confirmation message sends to use stomp queue instead of polling
* Convert xmppdaemon.php to IoManager?
* Convert Twitter status, friends import polling daemons to IoManager
* Clean up some error reporting and failure modes
* May need to adjust queue priorities for best perf in backlog/flood cases

Detailed code history available in my daemon-work branch:
http://www.gitorious.org/~brion/statusnet/brion-fixes/commits/daemon-work
parent 2b10e359
......@@ -331,6 +331,29 @@ class Memcached_DataObject extends DB_DataObject
$exists = false;
}
// @fixme horrible evil hack!
//
// In multisite configuration we don't want to keep around a separate
// connection for every database; we could end up with thousands of
// connections open per thread. In an ideal world we might keep
// a connection per server and select different databases, but that'd
// be reliant on having the same db username/pass as well.
//
// MySQL connections are cheap enough we're going to try just
// closing out the old connection and reopening when we encounter
// a new DSN.
//
// WARNING WARNING if we end up actually using multiple DBs at a time
// we'll need some fancier logic here.
if (!$exists && !empty($_DB_DATAOBJECT['CONNECTIONS'])) {
foreach ($_DB_DATAOBJECT['CONNECTIONS'] as $index => $conn) {
if (!empty($conn)) {
$conn->disconnect();
}
unset($_DB_DATAOBJECT['CONNECTIONS'][$index]);
}
}
$result = parent::_connect();
if ($result && !$exists) {
......
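
The connection-dropping hack above can be illustrated in isolation. This is a toy sketch under stated assumptions: FakeConnection and connection_for() are hypothetical, the DSNs are made up, and the real code operates on DB_DataObject's $_DB_DATAOBJECT['CONNECTIONS'] global instead of a local array.

```php
<?php
// Toy model of "close the old connection when a new DSN shows up".
// All names are illustrative; nothing here touches a real database.

class FakeConnection
{
    public $dsn;
    public $open = true;

    public function __construct($dsn) { $this->dsn = $dsn; }

    public function disconnect() { $this->open = false; }
}

function connection_for($dsn, array &$connections)
{
    $key = md5($dsn);
    if (isset($connections[$key])) {
        return $connections[$key]; // same site as before: reuse
    }
    // New DSN: drop every connection we were holding first.
    foreach ($connections as $index => $conn) {
        if (!empty($conn)) {
            $conn->disconnect();
        }
        unset($connections[$index]);
    }
    $connections[$key] = new FakeConnection($dsn);
    return $connections[$key];
}

$pool = array();
$a  = connection_for('mysqli://user:pass@localhost/site_a', $pool);
$a2 = connection_for('mysqli://user:pass@localhost/site_a', $pool);
$b  = connection_for('mysqli://user:pass@localhost/site_b', $pool);
```

The pool never holds more than one connection, which is the trade-off the comment warns about: code that needs two databases at once would keep closing and reopening them.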
......@@ -25,10 +25,12 @@ class Queue_item extends Memcached_DataObject
function sequenceKey()
{ return array(false, false); }
static function top($transport) {
static function top($transport=null) {
$qi = new Queue_item();
$qi->transport = $transport;
if ($transport) {
$qi->transport = $transport;
}
$qi->orderBy('created');
$qi->whereAdd('claimed is null');
......@@ -40,7 +42,8 @@ class Queue_item extends Memcached_DataObject
# XXX: potential race condition
# can we force it to only update if claimed is still null
# (or old)?
common_log(LOG_INFO, 'claiming queue item = ' . $qi->notice_id . ' for transport ' . $transport);
common_log(LOG_INFO, 'claiming queue item = ' . $qi->notice_id .
' for transport ' . $qi->transport);
$orig = clone($qi);
$qi->claimed = common_sql_now();
$result = $qi->update($orig);
......
......@@ -49,6 +49,13 @@ class Status_network extends DB_DataObject
static $cache = null;
static $base = null;
/**
* @param string $dbhost
* @param string $dbuser
* @param string $dbpass
* @param string $dbname
* @param array $servers memcached servers to use for caching config info
*/
static function setupDB($dbhost, $dbuser, $dbpass, $dbname, $servers)
{
global $config;
......@@ -60,12 +67,17 @@ class Status_network extends DB_DataObject
if (class_exists('Memcache')) {
self::$cache = new Memcache();
// Can't close persistent connections, making forking painful.
//
// @fixme only do this in *parent* CLI processes.
// single-process and child-processes *should* use persistent.
$persist = php_sapi_name() != 'cli';
if (is_array($servers)) {
foreach($servers as $server) {
self::$cache->addServer($server);
self::$cache->addServer($server, 11211, $persist);
}
} else {
self::$cache->addServer($servers);
self::$cache->addServer($servers, 11211, $persist);
}
}
......@@ -89,7 +101,7 @@ class Status_network extends DB_DataObject
if (empty($sn)) {
$sn = self::staticGet($k, $v);
if (!empty($sn)) {
self::$cache->set($ck, $sn);
self::$cache->set($ck, clone($sn));
}
}
......@@ -121,6 +133,11 @@ class Status_network extends DB_DataObject
return parent::delete();
}
/**
* @param string $servername hostname
* @param string $pathname URL base path
* @param string $wildcard hostname suffix to match wildcard config
*/
static function setupSite($servername, $pathname, $wildcard)
{
global $config;
......
......@@ -179,4 +179,23 @@ class Cache
return $success;
}
/**
* Close or reconnect any remote connections, such as to give
* daemon processes a chance to reconnect on a fresh socket.
*
* @return boolean success flag
*/
function reconnect()
{
$success = false;
if (Event::handle('StartCacheReconnect', array(&$success))) {
$success = true;
Event::handle('EndCacheReconnect', array());
}
return $success;
}
}
......@@ -76,159 +76,14 @@ require_once(INSTALLDIR.'/lib/language.php');
require_once(INSTALLDIR.'/lib/event.php');
require_once(INSTALLDIR.'/lib/plugin.php');
function _sn_to_path($sn)
{
$past_root = substr($sn, 1);
$last_slash = strrpos($past_root, '/');
if ($last_slash > 0) {
$p = substr($past_root, 0, $last_slash);
} else {
$p = '';
}
return $p;
}
// Save our sanity when code gets loaded through subroutines such as PHPUnit tests
global $default, $config, $_server, $_path;
// try to figure out where we are. $server and $path
// can be set by including module, else we guess based
// on HTTP info.
if (isset($server)) {
$_server = $server;
} else {
$_server = array_key_exists('SERVER_NAME', $_SERVER) ?
strtolower($_SERVER['SERVER_NAME']) :
null;
}
if (isset($path)) {
$_path = $path;
} else {
$_path = (array_key_exists('SERVER_NAME', $_SERVER) && array_key_exists('SCRIPT_NAME', $_SERVER)) ?
_sn_to_path($_SERVER['SCRIPT_NAME']) :
null;
}
require_once(INSTALLDIR.'/lib/default.php');
// Set config values initially to default values
$config = $default;
// default configuration, overwritten in config.php
$config['db'] = &PEAR::getStaticProperty('DB_DataObject','options');
$config['db'] = $default['db'];
// Backward compatibility
$config['site']['design'] =& $config['design'];
if (function_exists('date_default_timezone_set')) {
/* Work internally in UTC */
date_default_timezone_set('UTC');
}
function addPlugin($name, $attrs = null)
{
$name = ucfirst($name);
$pluginclass = "{$name}Plugin";
if (!class_exists($pluginclass)) {
$files = array("local/plugins/{$pluginclass}.php",
"local/plugins/{$name}/{$pluginclass}.php",
"local/{$pluginclass}.php",
"local/{$name}/{$pluginclass}.php",
"plugins/{$pluginclass}.php",
"plugins/{$name}/{$pluginclass}.php");
foreach ($files as $file) {
$fullpath = INSTALLDIR.'/'.$file;
if (@file_exists($fullpath)) {
include_once($fullpath);
break;
}
}
}
$inst = new $pluginclass();
if (!empty($attrs)) {
foreach ($attrs as $aname => $avalue) {
$inst->$aname = $avalue;
}
}
return $inst;
}
// From most general to most specific:
// server-wide, then vhost-wide, then for a path,
// finally for a dir (usually only need one of the last two).
if (isset($conffile)) {
$_config_files = array($conffile);
} else {
$_config_files = array('/etc/statusnet/statusnet.php',
'/etc/statusnet/laconica.php',
'/etc/laconica/laconica.php',
'/etc/statusnet/'.$_server.'.php',
'/etc/laconica/'.$_server.'.php');
if (strlen($_path) > 0) {
$_config_files[] = '/etc/statusnet/'.$_server.'_'.$_path.'.php';
$_config_files[] = '/etc/laconica/'.$_server.'_'.$_path.'.php';
}
$_config_files[] = INSTALLDIR.'/config.php';
}
global $_have_a_config;
$_have_a_config = false;
foreach ($_config_files as $_config_file) {
if (@file_exists($_config_file)) {
include_once($_config_file);
$_have_a_config = true;
}
return StatusNet::addPlugin($name, $attrs);
}
function _have_config()
{
global $_have_a_config;
return $_have_a_config;
}
// XXX: Throw a conniption if database not installed
// XXX: Find a way to use htmlwriter for this instead of handcoded markup
if (!_have_config()) {
echo '<p>'. _('No configuration file found. ') .'</p>';
echo '<p>'. _('I looked for configuration files in the following places: ') .'<br /> '. implode($_config_files, '<br />');
echo '<p>'. _('You may wish to run the installer to fix this.') .'</p>';
echo '<a href="install.php">'. _('Go to the installer.') .'</a>';
exit;
}
// Fixup for statusnet.ini
$_db_name = substr($config['db']['database'], strrpos($config['db']['database'], '/') + 1);
if ($_db_name != 'statusnet' && !array_key_exists('ini_'.$_db_name, $config['db'])) {
$config['db']['ini_'.$_db_name] = INSTALLDIR.'/classes/statusnet.ini';
}
// Backwards compatibility
if (array_key_exists('memcached', $config)) {
if ($config['memcached']['enabled']) {
addPlugin('Memcache', array('servers' => $config['memcached']['server']));
}
if (!empty($config['memcached']['base'])) {
$config['cache']['base'] = $config['memcached']['base'];
}
return StatusNet::haveConfig();
}
function __autoload($cls)
......@@ -247,27 +102,6 @@ function __autoload($cls)
}
}
// Load default plugins
foreach ($config['plugins']['default'] as $name => $params) {
if (is_null($params)) {
addPlugin($name);
} else if (is_array($params)) {
if (count($params) == 0) {
addPlugin($name);
} else {
$keys = array_keys($params);
if (is_string($keys[0])) {
addPlugin($name, $params);
} else {
foreach ($params as $paramset) {
addPlugin($name, $paramset);
}
}
}
}
}
// XXX: how many of these could be auto-loaded on use?
// XXX: note that these files should not use config options
// at compile time since DB config options are not yet loaded.
......@@ -283,20 +117,20 @@ require_once INSTALLDIR.'/lib/subs.php';
require_once INSTALLDIR.'/lib/clientexception.php';
require_once INSTALLDIR.'/lib/serverexception.php';
// Load settings from database; note we need autoload for this
Config::loadSettings();
// XXX: if plugins should check the schema at runtime, do that here.
if ($config['db']['schemacheck'] == 'runtime') {
Event::handle('CheckSchema');
try {
StatusNet::init(@$server, @$path, @$conffile);
} catch (NoConfigException $e) {
// XXX: Throw a conniption if database not installed
// XXX: Find a way to use htmlwriter for this instead of handcoded markup
echo '<p>'. _('No configuration file found. ') .'</p>';
echo '<p>'. _('I looked for configuration files in the following places: ') .'<br/> ';
echo implode($e->configFiles, '<br/>');
echo '<p>'. _('You may wish to run the installer to fix this.') .'</p>';
echo '<a href="install.php">'. _('Go to the installer.') .'</a>';
exit;
}
// XXX: other formats here
define('NICKNAME_FMT', VALIDATE_NUM.VALIDATE_ALPHA_LOWER);
// Give plugins a chance to initialize in a fully-prepared environment
Event::handle('InitializePlugin');
......@@ -22,16 +22,20 @@
* @category QueueManager
* @package StatusNet
* @author Evan Prodromou <evan@status.net>
* @copyright 2009 StatusNet, Inc.
* @author Brion Vibber <brion@status.net>
* @copyright 2009-2010 StatusNet, Inc.
* @license http://www.fsf.org/licensing/licenses/agpl-3.0.html GNU Affero General Public License version 3.0
* @link http://status.net/
*/
class DBQueueManager extends QueueManager
{
var $qis = array();
function enqueue($object, $queue)
/**
* Saves a notice object reference into the queue item table.
* @return boolean true on success
* @throws ServerException on failure
*/
public function enqueue($object, $queue)
{
$notice = $object;
......@@ -47,70 +51,95 @@ class DBQueueManager extends QueueManager
throw new ServerException('DB error inserting queue item');
}
$this->stats('enqueued', $queue);
return true;
}
function service($queue, $handler)
/**
* Poll every minute for new events during idle periods.
* We'll look in more often when there's data available.
*
* @return int seconds
*/
public function pollInterval()
{
return 60;
}
/**
* Run a polling cycle during idle processing in the input loop.
* @return boolean true if we had a hit
*/
public function poll()
{
while (true) {
$this->_log(LOG_DEBUG, 'Checking for notices...');
$timeout = $handler->timeout();
$notice = $this->_nextItem($queue, $timeout);
if (empty($notice)) {
$this->_log(LOG_DEBUG, 'No notices waiting; idling.');
// Nothing in the queue. Do you
// have other tasks, like servicing your
// XMPP connection, to do?
$handler->idle(QUEUE_HANDLER_MISS_IDLE);
$this->_log(LOG_DEBUG, 'Checking for notices...');
$item = $this->_nextItem();
if ($item === false) {
$this->_log(LOG_DEBUG, 'No notices waiting; idling.');
return false;
}
if ($item === true) {
// We dequeued an entry for a deleted or invalid notice.
// Consider it a hit for poll rate purposes.
return true;
}
list($queue, $notice) = $item;
$this->_log(LOG_INFO, 'Got notice '. $notice->id . ' for transport ' . $queue);
// Yay! Got one!
$handler = $this->getHandler($queue);
if ($handler) {
if ($handler->handle_notice($notice)) {
$this->_log(LOG_INFO, "[$queue:notice $notice->id] Successfully handled notice");
$this->_done($notice, $queue);
} else {
$this->_log(LOG_INFO, 'Got notice '. $notice->id);
// Yay! Got one!
if ($handler->handle_notice($notice)) {
$this->_log(LOG_INFO, 'Successfully handled notice '. $notice->id);
$this->_done($notice, $queue);
} else {
$this->_log(LOG_INFO, 'Failed to handle notice '. $notice->id);
$this->_fail($notice, $queue);
}
// Chance to e.g. service your XMPP connection
$this->_log(LOG_DEBUG, 'Idling after success.');
$handler->idle(QUEUE_HANDLER_HIT_IDLE);
$this->_log(LOG_INFO, "[$queue:notice $notice->id] Failed to handle notice");
$this->_fail($notice, $queue);
}
// XXX: when do we give up?
} else {
$this->_log(LOG_INFO, "[$queue:notice $notice->id] No handler for queue $queue");
$this->_fail($notice, $queue);
}
return true;
}
function _nextItem($queue, $timeout=null)
/**
* Pop the oldest unclaimed item off the queue set and claim it.
*
* @return mixed false if no items; true if bogus hit; otherwise array(string, Notice)
* giving the queue transport name.
*/
protected function _nextItem()
{
$start = time();
$result = null;
$sleeptime = 1;
$qi = Queue_item::top();
if (empty($qi)) {
return false;
}
do {
$qi = Queue_item::top($queue);
if (empty($qi)) {
$this->_log(LOG_DEBUG, "No new queue items, sleeping $sleeptime seconds.");
sleep($sleeptime);
$sleeptime *= 2;
} else {
$notice = Notice::staticGet('id', $qi->notice_id);
if (!empty($notice)) {
$result = $notice;
} else {
$this->_log(LOG_INFO, 'dequeued non-existent notice ' . $notice->id);
$qi->delete();
$qi->free();
$qi = null;
}
$sleeptime = 1;
}
} while (empty($result) && (is_null($timeout) || (time() - $start) < $timeout));
$queue = $qi->transport;
$notice = Notice::staticGet('id', $qi->notice_id);
if (empty($notice)) {
$this->_log(LOG_INFO, "[$queue:notice $qi->notice_id] dequeued non-existent notice");
$qi->delete();
return true;
}
return $result;
$result = $notice;
return array($queue, $notice);
}
function _done($object, $queue)
/**
* Delete our claimed item from the queue after successful processing.
*
* @param Notice $object
* @param string $queue
*/
protected function _done($object, $queue)
{
// XXX: right now, we only handle notices
......@@ -120,24 +149,29 @@ class DBQueueManager extends QueueManager
'transport' => $queue));
if (empty($qi)) {
$this->_log(LOG_INFO, 'Cannot find queue item for notice '.$notice->id.', queue '.$queue);
$this->_log(LOG_INFO, "[$queue:notice $notice->id] Cannot find queue item");
} else {
if (empty($qi->claimed)) {
$this->_log(LOG_WARNING, 'Reluctantly releasing unclaimed queue item '.
'for '.$notice->id.', queue '.$queue);
$this->_log(LOG_WARNING, "[$queue:notice $notice->id] Reluctantly releasing unclaimed queue item");
}
$qi->delete();
$qi->free();
$qi = null;
}
$this->_log(LOG_INFO, 'done with notice ID = ' . $notice->id);
$this->_log(LOG_INFO, "[$queue:notice $notice->id] done with item");
$this->stats('handled', $queue);
$notice->free();
$notice = null;
}
function _fail($object, $queue)
/**
* Free our claimed queue item for later reprocessing in case of
* temporary failure.
*
* @param Notice $object
* @param string $queue
*/
protected function _fail($object, $queue)
{
// XXX: right now, we only handle notices
......@@ -147,11 +181,10 @@ class DBQueueManager extends QueueManager
'transport' => $queue));
if (empty($qi)) {
$this->_log(LOG_INFO, 'Cannot find queue item for notice '.$notice->id.', queue '.$queue);
$this->_log(LOG_INFO, "[$queue:notice $notice->id] Cannot find queue item");
} else {
if (empty($qi->claimed)) {
$this->_log(LOG_WARNING, 'Ignoring failure for unclaimed queue item '.
'for '.$notice->id.', queue '.$queue);
$this->_log(LOG_WARNING, "[$queue:notice $notice->id] Ignoring failure for unclaimed queue item");
} else {
$orig = clone($qi);
$qi->claimed = null;
......@@ -160,13 +193,13 @@ class DBQueueManager extends QueueManager
}
}
$this->_log(LOG_INFO, 'done with notice ID = ' . $notice->id);
$this->_log(LOG_INFO, "[$queue:notice $notice->id] done with queue item");
$this->stats('error', $queue);
$notice->free();
$notice = null;
}
function _log($level, $msg)
protected function _log($level, $msg)
{
common_log($level, 'DBQueueManager: '.$msg);
}
......
......@@ -79,6 +79,8 @@ $default =
'queue_basename' => '/queue/statusnet/',
'stomp_username' => null,
'stomp_password' => null,
'monitor' => null, // URL to monitor ping endpoint (work in progress)
'softlimit' => '90%', // total size or % of memory_limit at which to restart queue threads gracefully
),
'license' =>
array('url' => 'http://creativecommons.org/licenses/by/3.0/',
......
......@@ -138,4 +138,12 @@ class Event {
}
return false;
}
/**
* Disables any and all handlers that have been set up so far;
* use only if you know it's safe to reinitialize all plugins.
*/
public static function clearHandlers() {
Event::$_handlers = array();
}
}
<?php
/**
* StatusNet, the distributed open-source microblogging tool
*
* Abstract class for i/o managers
*
* PHP version 5
*
* LICENCE: This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* @category QueueManager
* @package StatusNet
* @author Evan Prodromou <evan@status.net>
* @author Sarven Capadisli <csarven@status.net>
* @author Brion Vibber <brion@status.net>
* @copyright 2009-2010 StatusNet, Inc.
* @license http://www.fsf.org/licensing/licenses/agpl-3.0.html GNU Affero General Public License version 3.0
* @link http://status.net/
*/
abstract class IoManager
{
const SINGLE_ONLY = 0;
const INSTANCE_PER_SITE = 1;
const INSTANCE_PER_PROCESS = 2;
/**
* Factory function to get an appropriate subclass.
*/
public abstract static function get();
/**
* Tell the i/o queue master if and how we can handle multi-site
* processes.
*
* Return one of:
* IoManager::SINGLE_ONLY
* IoManager::INSTANCE_PER_SITE
* IoManager::INSTANCE_PER_PROCESS
*/
public static function multiSite()
{
return IoManager::SINGLE_ONLY;
}
/**
* If in a multisite configuration, the i/o master will tell
* your manager about each site you'll have to handle so you
* can do any necessary per-site setup.
*
* @param string $site target site server name
*/
public function addSite($site)
{
/* no-op */
}
/**
* This method is called when data is available on one of your
* i/o manager's sockets. The socket with data is passed in,
* in case you have multiple sockets.
*
* If your i/o manager is based on polling during idle processing,
* you don't need to implement this.
*
* @param resource $socket
* @return boolean true on success, false on failure
*/
public function handleInput($socket)
{
return true;
}
/**
* Return any open sockets that the run loop should listen
* for input on. If input comes in on a listed socket,
* the matching manager's handleInput method will be called.
*
* @return array of resources
*/
function getSockets()
{
return array();
}
/**
* Maximum planned time between poll() calls when input isn't waiting.
* Actual time may vary!
*
* When we get a polling hit, the timeout will be cut down to 0 while
* input is coming in, then will back off to this amount if no further
* input shows up.
*
* By default polling is disabled; you must override this to enable
* polling for this manager.
*
* @return int max poll interval in seconds, or 0 to disable polling
*/
function pollInterval()
{
return 0;
}
/**
* Request a maximum timeout for listeners before the next idle period.
* Actual wait may be shorter, so don't go crazy in your idle()!
* Wait could be longer if other handlers performed some slow activity.
*
* Return 0 to request that listene