stompqueuemanager.php 24.1 KB
Newer Older
1 2
<?php
/**
3
 * StatusNet, the distributed open-source microblogging tool
4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
 *
 * Abstract class for queue managers
 *
 * PHP version 5
 *
 * LICENCE: This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Affero General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU Affero General Public License for more details.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
 *
 * @category  QueueManager
23
 * @package   StatusNet
24 25
 * @author    Evan Prodromou <evan@status.net>
 * @author    Sarven Capadisli <csarven@status.net>
26
 * @copyright 2009 StatusNet, Inc.
27
 * @license   http://www.fsf.org/licensing/licenses/agpl-3.0.html GNU Affero General Public License version 3.0
28
 * @link      http://status.net/
29 30 31
 */

require_once 'Stomp.php';
32
require_once 'Stomp/Exception.php';
33

34
class StompQueueManager extends QueueManager
35
{
36 37 38 39
    protected $servers;
    protected $username;
    protected $password;
    protected $base;
40
    protected $control;
41

42 43
    protected $useTransactions;
    protected $useAcks;
44

45
    protected $sites = array();
46
    protected $subscriptions = array();
47

48 49 50 51 52
    protected $cons = array(); // all open connections
    protected $disconnect = array();
    protected $transaction = array();
    protected $transactionCount = array();
    protected $defaultIdx = 0;
53

54 55
    function __construct()
    {
56
        parent::__construct();
57 58 59 60 61 62
        $server = common_config('queue', 'stomp_server');
        if (is_array($server)) {
            $this->servers = $server;
        } else {
            $this->servers = array($server);
        }
63 64 65 66 67 68 69
        $this->username        = common_config('queue', 'stomp_username');
        $this->password        = common_config('queue', 'stomp_password');
        $this->base            = common_config('queue', 'queue_basename');
        $this->control         = common_config('queue', 'control_channel');
        $this->breakout        = common_config('queue', 'breakout');
        $this->useTransactions = common_config('queue', 'stomp_transactions');
        $this->useAcks         = common_config('queue', 'stomp_acks');
70 71
    }

72 73 74 75 76
    /**
     * Tell the i/o master we only need a single instance to cover
     * all sites running in this process.
     */
    public static function multiSite()
77
    {
78 79
        return IoManager::INSTANCE_PER_PROCESS;
    }
80

81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99
    /**
     * Optional; ping any running queue handler daemons with a notification
     * such as announcing a new site to handle or requesting clean shutdown.
     * This avoids having to restart all the daemons manually to update configs
     * and such.
     *
     * Currently only relevant for multi-site queue managers such as Stomp.
     *
     * @param string $event event key
     * @param string $param optional parameter to append to key
     * @return boolean success
     */
    public function sendControlSignal($event, $param='')
    {
        $message = $event;
        if ($param != '') {
            $message .= ':' . $param;
        }
        $this->_connect();
Brion Vibber's avatar
Brion Vibber committed
100 101 102 103
        $con = $this->cons[$this->defaultIdx];
        $result = $con->send($this->control,
                             $message,
                             array ('created' => common_sql_now()));
104 105 106 107 108 109 110 111
        if ($result) {
            $this->_log(LOG_INFO, "Sent control ping to queue daemons: $message");
            return true;
        } else {
            $this->_log(LOG_ERR, "Failed sending control ping to queue daemons: $message");
            return false;
        }
    }
112 113

    /**
114
     * Saves an object into the queue item table.
115
     *
116
     * @param mixed $object
117
     * @param string $queue
118
     * @param string $siteNickname optional override to drop into another site's queue
119
     *
120
     * @return boolean true on success
121
     * @throws StompException on connection or send error
122
     */
123
    public function enqueue($object, $queue, $siteNickname=null)
124 125
    {
        $this->_connect();
126 127 128 129 130 131 132 133 134 135 136 137
        if (common_config('queue', 'stomp_enqueue_on')) {
            // We're trying to force all writes to a single server.
            // WARNING: this might do odd things if that server connection dies.
            $idx = array_search(common_config('queue', 'stomp_enqueue_on'),
                                $this->servers);
            if ($idx === false) {
                common_log(LOG_ERR, 'queue stomp_enqueue_on setting does not match our server list.');
                $idx = $this->defaultIdx;
            }
        } else {
            $idx = $this->defaultIdx;
        }
138
        return $this->_doEnqueue($object, $queue, $idx, $siteNickname);
139
    }
140

141 142 143 144 145 146 147
    /**
     * Saves a notice object reference into the queue item table
     * on the given connection.
     *
     * @return boolean true on success
     * @throws StompException on connection or send error
     */
148
    protected function _doEnqueue($object, $queue, $idx, $siteNickname=null)
149
    {
150
        $rep = $this->logrep($object);
151
        $envelope = array('site' => $siteNickname ? $siteNickname : common_config('site', 'nickname'),
152 153 154
                          'handler' => $queue,
                          'payload' => $this->encode($object));
        $msg = serialize($envelope);
155

156
        $props = array('created' => common_sql_now());
157
        if ($this->isPersistent($queue)) {
158 159
            $props['persistent'] = 'true';
        }
160 161 162

        $con = $this->cons[$idx];
        $host = $con->getServer();
163 164
        $target = $this->queueName($queue);
        $result = $con->send($target, $msg, $props);
165 166

        if (!$result) {
167
            $this->_log(LOG_ERR, "Error sending $rep to $queue queue on $host $target");
168 169 170
            return false;
        }

171
        $this->_log(LOG_DEBUG, "complete remote queueing $rep for $queue on $host $target");
172
        $this->stats('enqueued', $queue);
173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189
        return true;
    }

    /**
     * Determine whether messages to this queue should be marked as persistent.
     * Actual persistent storage depends on the queue server's configuration.
     * @param string $queue
     * @return bool
     */
    protected function isPersistent($queue)
    {
        $mode = common_config('queue', 'stomp_persistent');
        if (is_array($mode)) {
            return in_array($queue, $mode);
        } else {
            return (bool)$mode;
        }
190 191
    }

192 193 194 195 196 197 198
    /**
     * Send any sockets we're listening on to the IO manager
     * to wait for input.
     *
     * @return array of resources
     */
    public function getSockets()
199
    {
200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222
        $sockets = array();
        foreach ($this->cons as $con) {
            if ($con) {
                $sockets[] = $con->getSocket();
            }
        }
        return $sockets;
    }

    /**
     * Get the Stomp connection object associated with the given socket.
     * @param resource $socket
     * @return int index into connections list
     * @throws Exception
     */
    protected function connectionFromSocket($socket)
    {
        foreach ($this->cons as $i => $con) {
            if ($con && $con->getSocket() === $socket) {
                return $i;
            }
        }
        throw new Exception(__CLASS__ . " asked to read from unrecognized socket");
223
    }
224

225 226 227 228 229 230 231 232 233
    /**
     * We've got input to handle on our socket!
     * Read any waiting Stomp frame(s) and process them.
     *
     * @param resource $socket
     * @return boolean ok on success
     */
    public function handleInput($socket)
    {
234 235 236
        $idx = $this->connectionFromSocket($socket);
        $con = $this->cons[$idx];
        $host = $con->getServer();
237
        $this->defaultIdx = $idx;
238

239
        $ok = true;
240 241 242
        try {
            $frames = $con->readFrames();
        } catch (StompException $e) {
243 244
            $this->_log(LOG_ERR, "Lost connection to $host: " . $e->getMessage());
            fclose($socket); // ???
245 246 247 248 249
            $this->cons[$idx] = null;
            $this->transaction[$idx] = null;
            $this->disconnect[$idx] = time();
            return false;
        }
250
        foreach ($frames as $frame) {
251 252
            $dest = $frame->headers['destination'];
            if ($dest == $this->control) {
253
                if (!$this->handleControlSignal($frame)) {
254 255 256 257 258
                    // We got a control event that requests a shutdown;
                    // close out and stop handling anything else!
                    break;
                }
            } else {
259
                $ok = $this->handleItem($frame) && $ok;
260
            }
261 262 263
            $this->ack($idx, $frame);
            $this->commit($idx);
            $this->begin($idx);
264 265 266
        }
        return $ok;
    }
267

268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284
    /**
     * Attempt to reconnect in background if we lost a connection.
     */
    function idle()
    {
        $now = time();
        foreach ($this->cons as $idx => $con) {
            if (empty($con)) {
                $age = $now - $this->disconnect[$idx];
                if ($age >= 60) {
                    $this->_reconnect($idx);
                }
            }
        }
        return true;
    }

285 286
    /**
     * Initialize our connection and subscribe to all the queues
287 288
     * we're going to need to handle... If multiple queue servers
     * are configured for failover, we'll listen to all of them.
289 290 291 292 293 294 295 296 297
     *
     * Side effects: in multi-site mode, may reset site configuration.
     *
     * @param IoMaster $master process/event controller
     * @return bool return false on failure
     */
    public function start($master)
    {
        parent::start($master);
298
        $this->_connectAll();
299

300 301
        foreach ($this->cons as $i => $con) {
            if ($con) {
302
                $this->doSubscribe($con);
303 304 305
                $this->begin($i);
            }
        }
306 307
        return true;
    }
308

309
    /**
310
     * Close out any active connections.
311 312 313 314 315
     *
     * @return bool return false on failure
     */
    public function finish()
    {
316 317
        // If there are any outstanding delivered messages we haven't processed,
        // free them for another thread to take.
318 319 320
        foreach ($this->cons as $i => $con) {
            if ($con) {
                $this->rollback($i);
Brion Vibber's avatar
Brion Vibber committed
321 322
                $con->disconnect();
                $this->cons[$i] = null;
323 324
            }
        }
325 326
        return true;
    }
327

328
    /**
329 330 331
     * Lazy open a single connection to Stomp queue server.
     * If multiple servers are configured, we let the Stomp client library
     * worry about finding a working connection among them.
332 333 334
     */
    protected function _connect()
    {
335 336 337 338 339
        if (empty($this->cons)) {
            $list = $this->servers;
            if (count($list) > 1) {
                shuffle($list); // Randomize to spread load
                $url = 'failover://(' . implode(',', $list) . ')';
340
            } else {
341
                $url = $list[0];
342
            }
343 344 345 346 347
            $con = $this->_doConnect($url);
            $this->cons = array($con);
            $this->transactionCount = array(0);
            $this->transaction = array(null);
            $this->disconnect = array(null);
348 349
        }
    }
350

351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383
    /**
     * Lazy open connections to all Stomp servers, if in manual failover
     * mode. This means the queue servers don't speak to each other, so
     * we have to listen to all of them to make sure we get all events.
     */
    protected function _connectAll()
    {
        if (!common_config('queue', 'stomp_manual_failover')) {
            return $this->_connect();
        }
        if (empty($this->cons)) {
            $this->cons = array();
            $this->transactionCount = array();
            $this->transaction = array();
            foreach ($this->servers as $idx => $server) {
                try {
                    $this->cons[] = $this->_doConnect($server);
                    $this->disconnect[] = null;
                } catch (Exception $e) {
                    // s'okay, we'll live
                    $this->cons[] = null;
                    $this->disconnect[] = time();
                }
                $this->transactionCount[] = 0;
                $this->transaction[] = null;
            }
            if (empty($this->cons)) {
                throw new ServerException("No queue servers reachable...");
                return false;
            }
        }
    }

384 385 386 387
    /**
     * Attempt to manually reconnect to the Stomp server for the given
     * slot. If successful, set up our subscriptions on it.
     */
388 389 390 391 392 393 394 395 396 397 398
    protected function _reconnect($idx)
    {
        try {
            $con = $this->_doConnect($this->servers[$idx]);
        } catch (Exception $e) {
            $this->_log(LOG_ERR, $e->getMessage());
            $con = null;
        }
        if ($con) {
            $this->cons[$idx] = $con;
            $this->disconnect[$idx] = null;
399

400
            $this->doSubscribe($con);
401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422
            $this->begin($idx);
        } else {
            // Try again later...
            $this->disconnect[$idx] = time();
        }
    }

    protected function _doConnect($server)
    {
        $this->_log(LOG_INFO, "Connecting to '$server' as '$this->username'...");
        $con = new LiberalStomp($server);

        if ($con->connect($this->username, $this->password)) {
            $this->_log(LOG_INFO, "Connected.");
        } else {
            $this->_log(LOG_ERR, 'Failed to connect to queue server');
            throw new ServerException('Failed to connect to queue server');
        }

        return $con;
    }

423
    /**
424 425
     * Set up all our raw queue subscriptions on the given connection
     * @param LiberalStomp $con
426
     */
427
    protected function doSubscribe(LiberalStomp $con)
428
    {
429
        $host = $con->getServer();
430 431 432
        foreach ($this->subscriptions() as $sub) {
            $this->_log(LOG_INFO, "Subscribing to $sub on $host");
            $con->subscribe($sub);
433 434
        }
    }
435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465
    
    /**
     * Grab a full list of stomp-side queue subscriptions.
     * Will include:
     *  - control broadcast channel
     *  - shared group queues for active groups
     *  - per-handler and per-site breakouts from $config['queue']['breakout']
     *    that are rooted in the active groups.
     *
     * @return array of strings
     */
    protected function subscriptions()
    {
        $subs = array();
        $subs[] = $this->control;

        foreach ($this->activeGroups as $group) {
            $subs[] = $this->base . $group;
        }

        foreach ($this->breakout as $spec) {
            $parts = explode('/', $spec);
            if (count($parts) < 2 || count($parts) > 3) {
                common_log(LOG_ERR, "Bad queue breakout specifier $spec");
            }
            if (in_array($parts[0], $this->activeGroups)) {
                $subs[] = $this->base . $spec;
            }
        }
        return array_unique($subs);
    }
466

467
    /**
468
     * Handle and acknowledge an event that's come in through a queue.
469 470 471 472 473 474 475 476
     *
     * If the queue handler reports failure, the message is requeued for later.
     * Missing notices or handler classes will drop the message.
     *
     * Side effects: in multi-site mode, may reset site configuration to
     * match the site that queued the event.
     *
     * @param StompFrame $frame
477
     * @return bool success
478
     */
479
    protected function handleItem($frame)
480
    {
481 482
        $host = $this->cons[$this->defaultIdx]->getServer();
        $message = unserialize($frame->body);
483 484 485

        if ($message === false) {
            $this->_log(LOG_ERR, "Can't unserialize frame: {$frame->body}");
486
            $this->_log(LOG_ERR, "Unserializable frame length: " . strlen($frame->body));
487 488 489
            return false;
        }

490 491
        $site = $message['site'];
        $queue = $message['handler'];
492

493 494 495
        if ($this->isDeadletter($frame, $message)) {
            $this->stats('deadletter', $queue);
	        return false;
496
        }
497

498 499 500
        // @fixme detect failing site switches
        $this->switchSite($site);

501 502 503
        try {
            $item = $this->decode($message['payload']);
        } catch (Exception $e) {
504
            $this->_log(LOG_ERR, "Skipping empty or deleted item in queue $queue from $host");
505 506
            $this->stats('baditem', $queue);
            return false;
507
        }
508 509 510
        $info = $this->logrep($item) . " posted at " .
                $frame->headers['created'] . " in queue $queue from $host";
        $this->_log(LOG_DEBUG, "Dequeued $info");
511

512 513 514 515
        try {
            $handler = $this->getHandler($queue);
            $ok = $handler->handle($item);
        } catch (NoQueueHandlerException $e) {
516
            $this->_log(LOG_ERR, "Missing handler class; skipping $info");
517 518
            $this->stats('badhandler', $queue);
            return false;
519 520 521 522
        } catch (Exception $e) {
            $this->_log(LOG_ERR, "Exception on queue $queue: " . $e->getMessage());
            $ok = false;
        }
523

524 525 526 527
        if ($ok) {
            $this->_log(LOG_INFO, "Successfully handled $info");
            $this->stats('handled', $queue);
        } else {
528
            $this->_log(LOG_WARNING, "Failed handling $info");
529 530
            // Requeing moves the item to the end of the line for its next try.
            // @fixme add a manual retry count
531
            $this->enqueue($item, $queue);
532
            $this->stats('requeued', $queue);
533 534
        }

535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586
        return $ok;
    }

    /**
     * Check if a redelivered message has been run through enough
     * that we're going to give up on it.
     *
     * @param StompFrame $frame
     * @param array $message unserialized message body
     * @return boolean true if we should discard
     */
    protected function isDeadLetter($frame, $message)
    {
        if (isset($frame->headers['redelivered']) && $frame->headers['redelivered'] == 'true') {
	        // Message was redelivered, possibly indicating a previous failure.
            $msgId = $frame->headers['message-id'];
            $site = $message['site'];
            $queue = $message['handler'];
	        $msgInfo = "message $msgId for $site in queue $queue";

	        $deliveries = $this->incDeliveryCount($msgId);
	        if ($deliveries > common_config('queue', 'max_retries')) {
		        $info = "DEAD-LETTER FILE: Gave up after retry $deliveries on $msgInfo";

		        $outdir = common_config('queue', 'dead_letter_dir');
		        if ($outdir) {
    		        $filename = $outdir . "/$site-$queue-" . rawurlencode($msgId);
    		        $info .= ": dumping to $filename";
    		        file_put_contents($filename, $message['payload']);
		        }

		        common_log(LOG_ERR, $info);
		        return true;
	        } else {
	            common_log(LOG_INFO, "retry $deliveries on $msgInfo");
	        }
        }
        return false;
    }

    /**
     * Update count of times we've re-encountered this message recently,
     * triggered when we get a message marked as 'redelivered'.
     *
     * Requires a CLI-friendly cache configuration.
     *
     * @param string $msgId message-id header from message
     * @return int number of retries recorded
     */
    function incDeliveryCount($msgId)
    {
	    $count = 0;
587
	    $cache = Cache::instance();
588 589 590 591 592 593 594 595 596 597
	    if ($cache) {
		    $key = 'statusnet:stomp:message-retries:' . $msgId;
		    $count = $cache->increment($key);
		    if (!$count) {
			    $count = 1;
			    $cache->set($key, $count, null, 3600);
			    $got = $cache->get($key);
		    }
	    }
	    return $count;
598
    }
Evan Prodromou's avatar
Evan Prodromou committed
599

600 601 602
    /**
     * Process a control signal broadcast.
     *
603
     * @param int $idx connection index
604 605 606
     * @param array $frame Stomp frame
     * @return bool true to continue; false to stop further processing.
     */
607
    protected function handleControlSignal($idx, $frame)
608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 630 631
    {
        $message = trim($frame->body);
        if (strpos($message, ':') !== false) {
            list($event, $param) = explode(':', $message, 2);
        } else {
            $event = $message;
            $param = '';
        }

        $shutdown = false;

        if ($event == 'shutdown') {
            $this->master->requestShutdown();
            $shutdown = true;
        } else if ($event == 'restart') {
            $this->master->requestRestart();
            $shutdown = true;
        } else if ($event == 'update') {
            $this->updateSiteConfig($param);
        } else {
            $this->_log(LOG_ERR, "Ignoring unrecognized control message: $message");
        }
        return $shutdown;
    }
632

633 634 635 636 637 638
    /**
     * Switch site, if necessary, and reset current handler assignments
     * @param string $site
     */
    function switchSite($site)
    {
mmn's avatar
mmn committed
639
        if ($site != GNUsocial::currentSite()) {
640
            $this->stats('switch');
mmn's avatar
mmn committed
641
            GNUsocial::switchSite($site);
642 643 644 645
            $this->initialize();
        }
    }

646
    /**
647
     * (Re)load runtime configuration for a given site by nickname,
648 649
     * triggered by a broadcast to the 'statusnet-control' topic.
     *
650 651 652
     * Configuration changes in database should update, but config
     * files might not.
     *
653 654 655 656 657
     * @param array $frame Stomp frame
     * @return bool true to continue; false to stop further processing.
     */
    protected function updateSiteConfig($nickname)
    {
658
        $sn = Status_network::getKV('nickname', $nickname);
659 660 661 662
        if ($sn) {
            $this->switchSite($nickname);
            if (!in_array($nickname, $this->sites)) {
                $this->addSite();
663
            }
664
            $this->stats('siteupdate');
665
        } else {
666
            $this->_log(LOG_ERR, "Ignoring ping for unrecognized new site $nickname");
667 668 669
        }
    }

670 671
    /**
     * Combines the queue_basename from configuration with the
672
     * group name for this queue to give eg:
673
     *
674
     * /queue/statusnet/main
675 676
     * /queue/statusnet/main/distrib
     * /queue/statusnet/xmpp/xmppout/site01
677 678 679 680 681
     *
     * @param string $queue
     * @return string
     */
    protected function queueName($queue)
682
    {
683
        $group = $this->queueGroup($queue);
mmn's avatar
mmn committed
684
        $site = GNUsocial::currentSite();
685 686 687 688 689 690 691 692 693

        $specs = array("$group/$queue/$site",
                       "$group/$queue");
        foreach ($specs as $spec) {
            if (in_array($spec, $this->breakout)) {
                return $this->base . $spec;
            }
        }
        return $this->base . $group;
694
    }
695

696
    /**
697
     * Get the breakout mode for the given queue on the current site.
698
     *
699 700
     * @param string $queue
     * @return string one of 'shared', 'handler', 'site'
701
     */
702
    protected function breakoutMode($queue)
Evan Prodromou's avatar
Evan Prodromou committed
703
    {
704 705 706 707 708
        $breakout = common_config('queue', 'breakout');
        if (isset($breakout[$queue])) {
            return $breakout[$queue];
        } else if (isset($breakout['*'])) {
            return $breakout['*'];
709
        } else {
710
            return 'shared';
711
        }
Evan Prodromou's avatar
Evan Prodromou committed
712 713
    }

714
    protected function begin($idx)
715 716
    {
        if ($this->useTransactions) {
717
            if (!empty($this->transaction[$idx])) {
718 719
                throw new Exception("Tried to start transaction in the middle of a transaction");
            }
720 721 722
            $this->transactionCount[$idx]++;
            $this->transaction[$idx] = $this->master->id . '-' . $this->transactionCount[$idx] . '-' . time();
            $this->cons[$idx]->begin($this->transaction[$idx]);
723 724 725
        }
    }

726
    protected function ack($idx, $frame)
727
    {
728 729 730 731 732 733 734 735
        if ($this->useAcks) {
            if ($this->useTransactions) {
                if (empty($this->transaction[$idx])) {
                    throw new Exception("Tried to ack but not in a transaction");
                }
                $this->cons[$idx]->ack($frame, $this->transaction[$idx]);
            } else {
                $this->cons[$idx]->ack($frame);
736 737 738 739
            }
        }
    }

740
    protected function commit($idx)
741 742
    {
        if ($this->useTransactions) {
743
            if (empty($this->transaction[$idx])) {
744 745
                throw new Exception("Tried to commit but not in a transaction");
            }
746 747
            $this->cons[$idx]->commit($this->transaction[$idx]);
            $this->transaction[$idx] = null;
748 749 750
        }
    }

751
    protected function rollback($idx)
752 753
    {
        if ($this->useTransactions) {
754
            if (empty($this->transaction[$idx])) {
755 756
                throw new Exception("Tried to rollback but not in a transaction");
            }
757 758
            $this->cons[$idx]->commit($this->transaction[$idx]);
            $this->transaction[$idx] = null;
759 760
        }
    }
761
}
762