Commit c4d67892 authored by Evan Prodromou's avatar Evan Prodromou

split public stream to its own queue handler

Add another queue handler for the public stream. Should further
parallelize the work of sending out messages.

darcs-hash:20080829181702-84dde-594505aa73d2380b13bd98917b70b02bac597d12.gz
parent f6524188
......@@ -24,32 +24,32 @@ require_once('XMPPHP/XMPP.php');
# XXX: something of a hack to work around problems with the XMPPHP lib
class Laconica_XMPP extends XMPPHP_XMPP {
function messageplus($to, $body, $type = 'chat', $subject = null, $payload = null) {
$to = htmlspecialchars($to);
$body = htmlspecialchars($body);
$subject = htmlspecialchars($subject);
$jid = jabber_daemon_address();
$out = "<message from='$jid' to='$to' type='$type'>";
if($subject) $out .= "<subject>$subject</subject>";
$out .= "<body>$body</body>";
if($payload) $out .= $payload;
$out .= "</message>";
$cnt = strlen($out);
common_log(LOG_DEBUG, "Sending $cnt chars to $to");
$this->send($out);
common_log(LOG_DEBUG, 'Done.');
}
public function presence($status = null, $show = 'available', $to = null, $type='available', $priority=NULL) {
if($type == 'available') $type = '';
$to = htmlspecialchars($to);
$status = htmlspecialchars($status);
if($show == 'unavailable') $type = 'unavailable';
$out = "<presence";
if($to) $out .= " to='$to'";
if($type) $out .= " type='$type'";
......@@ -62,7 +62,7 @@ class Laconica_XMPP extends XMPPHP_XMPP {
if(!is_null($priority)) $out .= "<priority>$priority</priority>";
$out .= "</presence>";
}
$this->send($out);
}
}
......@@ -105,7 +105,7 @@ function jabber_connect($resource=NULL, $status=NULL, $priority=NULL) {
);
$conn->autoSubscribe();
$conn->useEncryption(common_config('xmpp', 'encryption'));
if (!$conn) {
return false;
}
......@@ -141,7 +141,7 @@ function jabber_send_notice($to, $notice) {
# Extra stuff defined by Twitter, needed by twitter clients
function jabber_format_entry($profile, $notice) {
$noticeurl = common_local_url('shownotice',
array('notice' => $notice->id));
$msg = jabber_format_notice($profile, $notice);
......@@ -167,7 +167,7 @@ function jabber_format_entry($profile, $notice) {
$html .= ($notice->rendered) ? $notice->rendered : common_render_content($notice->content, $notice);
$html .= "\n</body>\n";
$html .= "\n</html>\n";
$event = "<event xmlns='http://jabber.org/protocol/pubsub#event'>\n";
$event .= "<items xmlns='http://jabber.org/protocol/pubsub' ";
$event .= "node='" . common_local_url('public') . "'>\n";
......@@ -229,6 +229,7 @@ function jabber_special_presence($type, $to=NULL, $show=NULL, $status=NULL) {
}
function jabber_broadcast_notice($notice) {
if (!common_config('xmpp', 'enabled')) {
return true;
}
......@@ -268,7 +269,7 @@ function jabber_broadcast_notice($notice) {
# XXX: use a join here rather than looping through results
$sub = new Subscription();
$sub->subscribed = $notice->profile_id;
if ($sub->find()) {
while ($sub->fetch()) {
$user = User::staticGet($sub->subscriber);
......@@ -289,14 +290,20 @@ function jabber_broadcast_notice($notice) {
}
}
}
return true;
}
function jabber_public_notice($notice) {
# Now, users who want everything
$public = common_config('xmpp', 'public');
# FIXME PRIV don't send out private messages here
# XXX: should we send out non-local messages if public,localonly = false? I think not
# XXX: should we send out non-local messages if public,localonly
# = false? I think not
if ($public && $notice->is_local) {
foreach ($public as $address) {
common_log(LOG_INFO,
......@@ -305,7 +312,7 @@ function jabber_broadcast_notice($notice) {
jabber_send_notice($address, $notice);
}
}
return true;
}
......
......@@ -1070,7 +1070,7 @@ function common_broadcast_notice($notice, $remote=false) {
# Stick the notice on the queue
function common_enqueue_notice($notice) {
foreach (array('jabber', 'omb', 'sms') as $transport) {
foreach (array('jabber', 'omb', 'sms', 'public') as $transport) {
$qi = new Queue_item();
$qi->notice_id = $notice->id;
$qi->transport = $transport;
......@@ -1126,6 +1126,12 @@ function common_real_broadcast($notice, $remote=false) {
common_log(LOG_ERR, 'Error in sms broadcast for notice ' . $notice->id);
}
}
if ($success) {
$success = jabber_public_notice($notice);
if (!$success) {
common_log(LOG_ERR, 'Error in public broadcast for notice ' . $notice->id);
}
}
// XXX: broadcast notices to other IM
return $success;
}
......
#!/usr/bin/env php
<?php
/*
* Laconica - a distributed open-source microblogging tool
* Copyright (C) 2008, Controlez-Vous, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
# Abort if called from a web server
if (isset($_SERVER) && array_key_exists('REQUEST_METHOD', $_SERVER)) {
print "This script must be run from the command line\n";
exit();
}
define('INSTALLDIR', realpath(dirname(__FILE__) . '/..'));
define('LACONICA', true);
require_once(INSTALLDIR . '/lib/common.php');
require_once(INSTALLDIR . '/lib/jabber.php');
require_once(INSTALLDIR . '/lib/queuehandler.php');
set_error_handler('common_error_handler');
class PublicQueueHandler extends QueueHandler {
function transport() {
return 'public';
}
function start() {
# Low priority; we don't want to receive messages
$this->conn = jabber_connect($this->_id, NULL, -1);
return !is_null($this->conn);
}
function handle_notice($notice) {
return jabber_public_notice($notice);
}
function finish() {
}
}
mb_internal_encoding('UTF-8');
$resource = ($argc > 1) ? $argv[1] : (common_config('xmpp','resource') . '-public');
$handler = new XmppQueueHandler($resource);
if ($handler->start()) {
$handler->handle_queue();
}
$handler->finish();
......@@ -22,6 +22,7 @@ export INSTALLDIR=$1
/sbin/start-stop-daemon -S --exec $INSTALLDIR/scripts/xmppdaemon.php -b -m --pidfile=/var/run/xmppdaemon.pid
/sbin/start-stop-daemon -S --exec $INSTALLDIR/scripts/xmppqueuehandler.php -b -m --pidfile=/var/run/xmppqueuehandler.pid
/sbin/start-stop-daemon -S --exec $INSTALLDIR/scripts/publicqueuehandler.php -b -m --pidfile=/var/run/publicqueuehandler.pid
/sbin/start-stop-daemon -S --exec $INSTALLDIR/scripts/xmppconfirmhandler.php -b -m --pidfile=/var/run/xmppconfirmhandler.pid
/sbin/start-stop-daemon -S --exec $INSTALLDIR/scripts/smsqueuehandler.php -b -m --pidfile=/var/run/smsqueuehandler.pid
/sbin/start-stop-daemon -S --exec $INSTALLDIR/scripts/ombqueuehandler.php -b -m --pidfile=/var/run/ombqueuehandler.pid
......@@ -22,6 +22,7 @@ export INSTALLDIR=$1
/sbin/start-stop-daemon -K -m --pidfile=/var/run/xmppdaemon.pid
/sbin/start-stop-daemon -K -m --pidfile=/var/run/xmppqueuehandler.pid
/sbin/start-stop-daemon -K -m --pidfile=/var/run/publicqueuehandler.pid
/sbin/start-stop-daemon -K -m --pidfile=/var/run/xmppconfirmhandler.pid
/sbin/start-stop-daemon -K -m --pidfile=/var/run/smsqueuehandler.pid
/sbin/start-stop-daemon -K -m --pidfile=/var/run/ombqueuehandler.pid
......@@ -259,7 +259,7 @@ class XMPPDaemon {
mb_internal_encoding('UTF-8');
$resource = ($argc > 1) ? $argv[1] : NULL;
$resource = ($argc > 1) ? $argv[1] : (common_config('xmpp','resource') . '-listen');
$daemon = new XMPPDaemon($resource);
......
......@@ -41,7 +41,7 @@ class XmppQueueHandler extends QueueHandler {
function start() {
# Low priority; we don't want to receive messages
$this->conn = jabber_connect($this->resource, NULL, -100);
$this->conn = jabber_connect($this->_id, NULL, -1);
return !is_null($this->conn);
}
......@@ -55,7 +55,7 @@ class XmppQueueHandler extends QueueHandler {
mb_internal_encoding('UTF-8');
$resource = ($argc > 1) ? $argv[1] : NULL;
$resource = ($argc > 1) ? $argv[1] : (common_config('xmpp','resource') . '-queuehandler');
$handler = new XmppQueueHandler($resource);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment