Commit 292ac40c authored by Zach Copley's avatar Zach Copley

Merge branch 'testing' of git@gitorious.org:statusnet/mainline into testing

parents 48a1a5a2 d13d73c5
......@@ -147,6 +147,7 @@ class Memcached_DataObject extends DB_DataObject
{
$result = parent::insert();
if ($result) {
$this->fixupTimestamps();
$this->encache(); // in case of cached negative lookups
}
return $result;
......@@ -159,6 +160,7 @@ class Memcached_DataObject extends DB_DataObject
}
$result = parent::update($orig);
if ($result) {
$this->fixupTimestamps();
$this->encache();
}
return $result;
......@@ -366,7 +368,7 @@ class Memcached_DataObject extends DB_DataObject
}
/**
* sends query to database - this is the private one that must work
* sends query to database - this is the private one that must work
* - internal functions use this rather than $this->query()
*
* Overridden to do logging.
......@@ -428,7 +430,7 @@ class Memcached_DataObject extends DB_DataObject
//
// WARNING WARNING if we end up actually using multiple DBs at a time
// we'll need some fancier logic here.
if (!$exists && !empty($_DB_DATAOBJECT['CONNECTIONS'])) {
if (!$exists && !empty($_DB_DATAOBJECT['CONNECTIONS']) && php_sapi_name() == 'cli') {
foreach ($_DB_DATAOBJECT['CONNECTIONS'] as $index => $conn) {
if (!empty($conn)) {
$conn->disconnect();
......@@ -529,4 +531,25 @@ class Memcached_DataObject extends DB_DataObject
return $c->delete($cacheKey);
}
function fixupTimestamps()
{
// Fake up timestamp columns
$columns = $this->table();
foreach ($columns as $name => $type) {
if ($type & DB_DATAOBJECT_MYSQLTIMESTAMP) {
$this->$name = common_sql_now();
}
}
}
function debugDump()
{
common_debug("debugDump: " . common_log_objstring($this));
}
function raiseError($message, $type = null, $behaviour = null)
{
throw new ServerException("DB_DataObject error [$type]: $message");
}
}
......@@ -326,13 +326,7 @@ class Notice extends Memcached_DataObject
# XXX: someone clever could prepend instead of clearing the cache
$notice->blowOnInsert();
if (common_config('queue', 'inboxes')) {
$qm = QueueManager::get();
$qm->enqueue($notice, 'distrib');
} else {
$handler = new DistribQueueHandler();
$handler->handle($notice);
}
$notice->distribute();
return $notice;
}
......@@ -1447,4 +1441,31 @@ class Notice extends Memcached_DataObject
$gi->free();
}
function distribute()
{
if (common_config('queue', 'inboxes')) {
// If there's a failure, we want to _force_
// distribution at this point.
try {
$qm = QueueManager::get();
$qm->enqueue($this, 'distrib');
} catch (Exception $e) {
// If the exception isn't transient, this
// may throw more exceptions as DQH does
// its own enqueueing. So, we ignore them!
try {
$handler = new DistribQueueHandler();
$handler->handle($this);
} catch (Exception $e) {
common_log(LOG_ERR, "emergency redistribution resulted in " . $e->getMessage());
}
// Re-throw so somebody smarter can handle it.
throw $e;
}
} else {
$handler = new DistribQueueHandler();
$handler->handle($this);
}
}
}
......@@ -64,8 +64,12 @@ class Session extends Memcached_DataObject
$session = Session::staticGet('id', $id);
if (empty($session)) {
self::logdeb("Couldn't find '$id'");
return '';
} else {
self::logdeb("Found '$id', returning " .
strlen($session->session_data) .
" chars of data");
return (string)$session->session_data;
}
}
......@@ -77,14 +81,24 @@ class Session extends Memcached_DataObject
$session = Session::staticGet('id', $id);
if (empty($session)) {
self::logdeb("'$id' doesn't yet exist; inserting.");
$session = new Session();
$session->id = $id;
$session->session_data = $session_data;
$session->created = common_sql_now();
return $session->insert();
$result = $session->insert();
if (!$result) {
common_log_db_error($session, 'INSERT', __FILE__);
self::logdeb("Failed to insert '$id'.");
} else {
self::logdeb("Successfully inserted '$id' (result = $result).");
}
return $result;
} else {
self::logdeb("'$id' already exists; updating.");
if (strcmp($session->session_data, $session_data) == 0) {
self::logdeb("Not writing session '$id'; unchanged");
return true;
......@@ -95,7 +109,16 @@ class Session extends Memcached_DataObject
$session->session_data = $session_data;
return $session->update($orig);
$result = $session->update($orig);
if (!$result) {
common_log_db_error($session, 'UPDATE', __FILE__);
self::logdeb("Failed to update '$id'.");
} else {
self::logdeb("Successfully updated '$id' (result = $result).");
}
return $result;
}
}
}
......@@ -106,8 +129,17 @@ class Session extends Memcached_DataObject
$session = Session::staticGet('id', $id);
if (!empty($session)) {
return $session->delete();
if (empty($session)) {
self::logdeb("Can't find '$id' to delete.");
} else {
$result = $session->delete();
if (!$result) {
common_log_db_error($session, 'DELETE', __FILE__);
self::logdeb("Failed to delete '$id'.");
} else {
self::logdeb("Successfully deleted '$id' (result = $result).");
}
return $result;
}
}
......@@ -132,7 +164,10 @@ class Session extends Memcached_DataObject
$session->free();
self::logdeb("Found " . count($ids) . " ids to delete.");
foreach ($ids as $id) {
self::logdeb("Destroying session '$id'.");
self::destroy($id);
}
}
......
......@@ -152,6 +152,16 @@ function checkMirror($action_obj, $args)
static $alwaysRW = array('session', 'remember_me');
// We ensure that these tables always are used
// on the master DB
$config['db']['database_rw'] = $config['db']['database'];
$config['db']['ini_rw'] = INSTALLDIR.'/classes/statusnet.ini';
foreach ($alwaysRW as $table) {
$config['db']['table_'.$table] = 'rw';
}
if (common_config('db', 'mirror') && $action_obj->isReadOnly($args)) {
if (is_array(common_config('db', 'mirror'))) {
// "load balancing", ha ha
......@@ -162,16 +172,6 @@ function checkMirror($action_obj, $args)
$mirror = common_config('db', 'mirror');
}
// We ensure that these tables always are used
// on the master DB
$config['db']['database_rw'] = $config['db']['database'];
$config['db']['ini_rw'] = INSTALLDIR.'/classes/statusnet.ini';
foreach ($alwaysRW as $table) {
$config['db']['table_'.$table] = 'rw';
}
// everyone else uses the mirror
$config['db']['database'] = $mirror;
......
......@@ -84,6 +84,8 @@ $default =
'control_channel' => '/topic/statusnet-control', // broadcasts to all queue daemons
'stomp_username' => null,
'stomp_password' => null,
'stomp_persistent' => true, // keep items across queue server restart, if persistence is enabled
'stomp_manual_failover' => true, // if multiple servers are listed, treat them as separate (enqueue on one randomly, listen on all)
'monitor' => null, // URL to monitor ping endpoint (work in progress)
'softlimit' => '90%', // total size or % of memory_limit at which to restart queue threads gracefully
'debug_memory' => false, // true to spit memory usage to log
......
......@@ -33,6 +33,22 @@ class LiberalStomp extends Stomp
return $this->_socket;
}
/**
* Return the host we're currently connected to.
*
* @return string
*/
function getServer()
{
$idx = $this->_currentHost;
if ($idx >= 0) {
$host = $this->_hosts[$idx];
return "$host[0]:$host[1]";
} else {
return '[unconnected]';
}
}
/**
* Make socket connection to the server
* We also set the stream to non-blocking mode, since we'll be
......@@ -71,10 +87,12 @@ class LiberalStomp extends Stomp
// @fixme this sometimes hangs in blocking mode...
// shouldn't we have been idle until we found there's more data?
$read = fread($this->_socket, $rb);
if ($read === false) {
$this->_reconnect();
if ($read === false || ($read === '' && feof($this->_socket))) {
// @fixme possibly attempt an auto reconnect as old code?
throw new StompException("Error reading");
//$this->_reconnect();
// @fixme this will lose prior items
return $this->readFrames();
//return $this->readFrames();
}
$data .= $read;
if (strpos($data, "\x00") !== false) {
......
This diff is collapsed.
......@@ -178,7 +178,6 @@ function common_ensure_session()
}
if (isset($id)) {
session_id($id);
setcookie(session_name(), $id);
}
@session_start();
if (!isset($_SESSION['started'])) {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment