$queue) {
          static::$queues[$name]['worker callback'] = $queue['cron']['callback'];
          if (isset($queue['cron']['time'])) {
            static::$queues[$name]['time'] = $queue['cron']['time'];
          }
        }
      }
    return static::$queues;
  }

  /**
   * {@inheritdoc}
   *
   * @return \DrupalQueueInterface
   */
  public function getQueue($name) {
    return DrupalQueue::get($name);
  }

  /**
   * {@inheritdoc}
   */
  public function run($name, $time_limit = 0) {
    $info = $this->getInfo($name);
    $function = $info['worker callback'];
    // Only meaningful when $time_limit is non-zero; a limit of 0 disables the
    // deadline check in the loop condition below.
    $end = time() + $time_limit;
    $queue = $this->getQueue($name);
    // Number of items that were processed and deleted without an exception.
    $count = 0;

    // Keep claiming items until the queue yields nothing more or the optional
    // time limit has elapsed.
    while ((!$time_limit || time() < $end) && ($item = $queue->claimItem())) {
      try {
        // The replacement keys must match the placeholders in the message
        // exactly (including the leading "@"), otherwise the placeholder is
        // printed verbatim instead of being substituted.
        drush_log(dt('Processing item @id from @name queue.', array('@name' => $name, '@id' => $item->item_id)), LogLevel::INFO);
        $function($item->data);
        // Only remove the item once the worker callback returned normally.
        $queue->deleteItem($item);
        $count++;
      }
      catch (\Exception $e) {
        // In case of exception log it and leave the item in the queue
        // to be processed again later.
        drush_set_error('DRUSH_QUEUE_EXCEPTION', $e->getMessage());
      }
    }

    return $count;
  }

}