5 use Drush\Log\LogLevel;
6 use Drupal\Core\Queue\QueueWorkerManager;
7 use Drupal\Core\Queue\RequeueException;
8 use Drupal\Core\Queue\SuspendQueueException;
10 class Queue8 extends QueueBase {
13 * @var \Drupal\Core\Queue\QueueWorkerManager
15 protected $workerManager;
/**
 * Sets the queue worker manager used by this queue handler.
 *
 * @param \Drupal\Core\Queue\QueueWorkerManager|null $manager
 *   (optional) The queue worker manager to use. When no manager is passed,
 *   the 'plugin.manager.queue_worker' service is looked up through \Drupal.
 */
public function __construct(QueueWorkerManager $manager = NULL) {
  // Fall back to the service only when the caller did not inject a manager.
  if (!$manager) {
    $manager = \Drupal::service('plugin.manager.queue_worker');
  }
  $this->workerManager = $manager;
}
/**
 * Returns the known queues, keyed by queue worker name.
 *
 * The list is built once from the worker manager's definitions and then
 * kept in static::$queues, so later calls return the stored list.
 *
 * @return array
 *   The queue worker definitions, keyed by name.
 */
public function getQueues() {
  // Already populated: hand back the stored list.
  if (isset(static::$queues)) {
    return static::$queues;
  }

  // Initialise the list before asking the manager for its definitions.
  static::$queues = array();
  $definitions = $this->workerManager->getDefinitions();
  foreach ($definitions as $queue_name => $definition) {
    static::$queues[$queue_name] = $definition;
  }

  return static::$queues;
}
/**
 * Fetches the queue object for the given queue name.
 *
 * @param string $name
 *   The name of the queue, passed straight through to \Drupal::queue().
 *
 * @return \Drupal\Core\Queue\QueueInterface
 *   The queue returned by \Drupal::queue() for that name.
 */
public function getQueue($name) {
  $queue = \Drupal::queue($name);
  return $queue;
}
49 public function run($name, $time_limit = 0) {
50 $worker = $this->workerManager->createInstance($name);
51 $end = time() + $time_limit;
52 $queue = $this->getQueue($name);
55 while ((!$time_limit || time() < $end) && ($item = $queue->claimItem())) {
57 drush_log(dt('Processing item @id from @name queue.', array('@name' => $name, '@id' => $item->item_id)), LogLevel::INFO);
58 $worker->processItem($item->data);
59 $queue->deleteItem($item);
62 catch (RequeueException $e) {
63 // The worker requested the task to be immediately requeued.
64 $queue->releaseItem($item);
66 catch (SuspendQueueException $e) {
67 // If the worker indicates there is a problem with the whole queue,
68 // release the item and skip to the next queue.
69 $queue->releaseItem($item);
70 drush_set_error('DRUSH_SUSPEND_QUEUE_EXCEPTION', $e->getMessage());
72 catch (\Exception $e) {
73 // In case of any other kind of exception, log it and leave the item
74 // in the queue to be processed again later.
75 drush_set_error('DRUSH_QUEUE_EXCEPTION', $e->getMessage());