Version 1
[yaffs-website] / vendor / drush / drush / lib / Drush / Queue / Queue8.php
1 <?php
2
3 namespace Drush\Queue;
4
5 use Drush\Log\LogLevel;
6 use Drupal\Core\Queue\QueueWorkerManager;
7 use Drupal\Core\Queue\RequeueException;
8 use Drupal\Core\Queue\SuspendQueueException;
9
10 class Queue8 extends QueueBase {
11
12   /**
13    * @var \Drupal\Core\Queue\QueueWorkerManager
14    */
15   protected $workerManager;
16
17   /**
18    * Set the queue worker manager.
19    */
20   public function __construct(QueueWorkerManager $manager = NULL) {
21     $this->workerManager = $manager ?: \Drupal::service('plugin.manager.queue_worker');
22   }
23
24   /**
25    * {@inheritdoc}
26    */
27   public function getQueues() {
28     if (!isset(static::$queues)) {
29       static::$queues = array();
30       foreach ($this->workerManager->getDefinitions() as $name => $info) {
31         static::$queues[$name] = $info;
32       }
33     }
34     return static::$queues;
35   }
36
37   /**
38    * {@inheritdoc}
39    *
40    * @return \Drupal\Core\Queue\QueueInterface
41    */
42   public function getQueue($name) {
43     return \Drupal::queue($name);
44   }
45
46   /**
47    * {@inheritdoc}
48    */
49   public function run($name, $time_limit = 0) {
50     $worker = $this->workerManager->createInstance($name);
51     $end = time() + $time_limit;
52     $queue = $this->getQueue($name);
53     $count = 0;
54
55     while ((!$time_limit || time() < $end) && ($item = $queue->claimItem())) {
56       try {
57         drush_log(dt('Processing item @id from @name queue.', array('@name' => $name, '@id' => $item->item_id)), LogLevel::INFO);
58         $worker->processItem($item->data);
59         $queue->deleteItem($item);
60         $count++;
61       }
62       catch (RequeueException $e) {
63         // The worker requested the task to be immediately requeued.
64         $queue->releaseItem($item);
65       }
66       catch (SuspendQueueException $e) {
67         // If the worker indicates there is a problem with the whole queue,
68         // release the item and skip to the next queue.
69         $queue->releaseItem($item);
70         drush_set_error('DRUSH_SUSPEND_QUEUE_EXCEPTION', $e->getMessage());
71       }
72       catch (\Exception $e) {
73         // In case of any other kind of exception, log it and leave the item
74         // in the queue to be processed again later.
75         drush_set_error('DRUSH_QUEUE_EXCEPTION', $e->getMessage());
76       }
77     }
78
79     return $count;
80   }
81
82 }