5 use Drush\Log\LogLevel;
8 class Queue7 extends QueueBase {
13 public function getQueues() {
14 if (!isset(static::$queues)) {
15 static::$queues = module_invoke_all('cron_queue_info');
16 drupal_alter('cron_queue_info', static::$queues);
17 // Merge in queues from modules that implement hook_queue_info.
18 // Currently only defined by the queue_ui module.
19 $info_queues = module_invoke_all('queue_info');
20 foreach ($info_queues as $name => $queue) {
21 static::$queues[$name]['worker callback'] = $queue['cron']['callback'];
22 if (isset($queue['cron']['time'])) {
23 static::$queues[$name]['time'] = $queue['cron']['time'];
27 return static::$queues;
33 * @return \DrupalQueueInterface
35 public function getQueue($name) {
36 return DrupalQueue::get($name);
42 public function run($name, $time_limit = 0) {
43 $info = $this->getInfo($name);
44 $function = $info['worker callback'];
45 $end = time() + $time_limit;
46 $queue = $this->getQueue($name);
49 while ((!$time_limit || time() < $end) && ($item = $queue->claimItem())) {
51 drush_log(dt('Processing item @id from @name queue.', array('@name' => $name, 'id' => $item->item_id)), LogLevel::INFO);
52 $function($item->data);
53 $queue->deleteItem($item);
56 catch (\Exception $e) {
57 // In case of exception log it and leave the item in the queue
58 // to be processed again later.
59 drush_set_error('DRUSH_QUEUE_EXCEPTION', $e->getMessage());