Security update for Core, with self-updated composer
[yaffs-website] / web / core / lib / Drupal / Core / Queue / DatabaseQueue.php
1 <?php
2
3 namespace Drupal\Core\Queue;
4
5 use Drupal\Core\Database\Connection;
6 use Drupal\Core\Database\SchemaObjectExistsException;
7 use Drupal\Core\DependencyInjection\DependencySerializationTrait;
8
9 /**
10  * Default queue implementation.
11  *
12  * @ingroup queue
13  */
14 class DatabaseQueue implements ReliableQueueInterface, QueueGarbageCollectionInterface {
15
16   use DependencySerializationTrait;
17
18   /**
19    * The database table name.
20    */
21   const TABLE_NAME = 'queue';
22
23   /**
24    * The name of the queue this instance is working with.
25    *
26    * @var string
27    */
28   protected $name;
29
30   /**
31    * The database connection.
32    *
33    * @var \Drupal\Core\Database\Connection
34    */
35   protected $connection;
36
37   /**
38    * Constructs a \Drupal\Core\Queue\DatabaseQueue object.
39    *
40    * @param string $name
41    *   The name of the queue.
42    * @param \Drupal\Core\Database\Connection $connection
43    *   The Connection object containing the key-value tables.
44    */
45   public function __construct($name, Connection $connection) {
46     $this->name = $name;
47     $this->connection = $connection;
48   }
49
50   /**
51    * {@inheritdoc}
52    */
53   public function createItem($data) {
54     $try_again = FALSE;
55     try {
56       $id = $this->doCreateItem($data);
57     }
58     catch (\Exception $e) {
59       // If there was an exception, try to create the table.
60       if (!$try_again = $this->ensureTableExists()) {
61         // If the exception happened for other reason than the missing table,
62         // propagate the exception.
63         throw $e;
64       }
65     }
66     // Now that the table has been created, try again if necessary.
67     if ($try_again) {
68       $id = $this->doCreateItem($data);
69     }
70     return $id;
71   }
72
73   /**
74    * Adds a queue item and store it directly to the queue.
75    *
76    * @param $data
77    *   Arbitrary data to be associated with the new task in the queue.
78    *
79    * @return
80    *   A unique ID if the item was successfully created and was (best effort)
81    *   added to the queue, otherwise FALSE. We don't guarantee the item was
82    *   committed to disk etc, but as far as we know, the item is now in the
83    *   queue.
84    */
85   protected function doCreateItem($data) {
86     $query = $this->connection->insert(static::TABLE_NAME)
87       ->fields([
88         'name' => $this->name,
89         'data' => serialize($data),
90         // We cannot rely on REQUEST_TIME because many items might be created
91         // by a single request which takes longer than 1 second.
92         'created' => time(),
93       ]);
94     // Return the new serial ID, or FALSE on failure.
95     return $query->execute();
96   }
97
98   /**
99    * {@inheritdoc}
100    */
101   public function numberOfItems() {
102     try {
103       return $this->connection->query('SELECT COUNT(item_id) FROM {' . static::TABLE_NAME . '} WHERE name = :name', [':name' => $this->name])
104         ->fetchField();
105     }
106     catch (\Exception $e) {
107       $this->catchException($e);
108       // If there is no table there cannot be any items.
109       return 0;
110     }
111   }
112
113   /**
114    * {@inheritdoc}
115    */
116   public function claimItem($lease_time = 30) {
117     // Claim an item by updating its expire fields. If claim is not successful
118     // another thread may have claimed the item in the meantime. Therefore loop
119     // until an item is successfully claimed or we are reasonably sure there
120     // are no unclaimed items left.
121     while (TRUE) {
122       try {
123         $item = $this->connection->queryRange('SELECT data, created, item_id FROM {' . static::TABLE_NAME . '} q WHERE expire = 0 AND name = :name ORDER BY created, item_id ASC', 0, 1, [':name' => $this->name])->fetchObject();
124       }
125       catch (\Exception $e) {
126         $this->catchException($e);
127         // If the table does not exist there are no items currently available to
128         // claim.
129         return FALSE;
130       }
131       if ($item) {
132         // Try to update the item. Only one thread can succeed in UPDATEing the
133         // same row. We cannot rely on REQUEST_TIME because items might be
134         // claimed by a single consumer which runs longer than 1 second. If we
135         // continue to use REQUEST_TIME instead of the current time(), we steal
136         // time from the lease, and will tend to reset items before the lease
137         // should really expire.
138         $update = $this->connection->update(static::TABLE_NAME)
139           ->fields([
140             'expire' => time() + $lease_time,
141           ])
142           ->condition('item_id', $item->item_id)
143           ->condition('expire', 0);
144         // If there are affected rows, this update succeeded.
145         if ($update->execute()) {
146           $item->data = unserialize($item->data);
147           return $item;
148         }
149       }
150       else {
151         // No items currently available to claim.
152         return FALSE;
153       }
154     }
155   }
156
157   /**
158    * {@inheritdoc}
159    */
160   public function releaseItem($item) {
161     try {
162       $update = $this->connection->update(static::TABLE_NAME)
163         ->fields([
164           'expire' => 0,
165         ])
166         ->condition('item_id', $item->item_id);
167       return $update->execute();
168     }
169     catch (\Exception $e) {
170       $this->catchException($e);
171       // If the table doesn't exist we should consider the item released.
172       return TRUE;
173     }
174   }
175
176   /**
177    * {@inheritdoc}
178    */
179   public function deleteItem($item) {
180     try {
181       $this->connection->delete(static::TABLE_NAME)
182         ->condition('item_id', $item->item_id)
183         ->execute();
184     }
185     catch (\Exception $e) {
186       $this->catchException($e);
187     }
188   }
189
190   /**
191    * {@inheritdoc}
192    */
193   public function createQueue() {
194     // All tasks are stored in a single database table (which is created on
195     // demand) so there is nothing we need to do to create a new queue.
196   }
197
198   /**
199    * {@inheritdoc}
200    */
201   public function deleteQueue() {
202     try {
203       $this->connection->delete(static::TABLE_NAME)
204         ->condition('name', $this->name)
205         ->execute();
206     }
207     catch (\Exception $e) {
208       $this->catchException($e);
209     }
210   }
211
212   /**
213    * {@inheritdoc}
214    */
215   public function garbageCollection() {
216     try {
217       // Clean up the queue for failed batches.
218       $this->connection->delete(static::TABLE_NAME)
219         ->condition('created', REQUEST_TIME - 864000, '<')
220         ->condition('name', 'drupal_batch:%', 'LIKE')
221         ->execute();
222
223       // Reset expired items in the default queue implementation table. If that's
224       // not used, this will simply be a no-op.
225       $this->connection->update(static::TABLE_NAME)
226         ->fields([
227           'expire' => 0,
228         ])
229         ->condition('expire', 0, '<>')
230         ->condition('expire', REQUEST_TIME, '<')
231         ->execute();
232     }
233     catch (\Exception $e) {
234       $this->catchException($e);
235     }
236   }
237
238   /**
239    * Check if the table exists and create it if not.
240    */
241   protected function ensureTableExists() {
242     try {
243       $database_schema = $this->connection->schema();
244       if (!$database_schema->tableExists(static::TABLE_NAME)) {
245         $schema_definition = $this->schemaDefinition();
246         $database_schema->createTable(static::TABLE_NAME, $schema_definition);
247         return TRUE;
248       }
249     }
250     // If another process has already created the queue table, attempting to
251     // recreate it will throw an exception. In this case just catch the
252     // exception and do nothing.
253     catch (SchemaObjectExistsException $e) {
254       return TRUE;
255     }
256     return FALSE;
257   }
258
259   /**
260    * Act on an exception when queue might be stale.
261    *
262    * If the table does not yet exist, that's fine, but if the table exists and
263    * yet the query failed, then the queue is stale and the exception needs to
264    * propagate.
265    *
266    * @param $e
267    *   The exception.
268    *
269    * @throws \Exception
270    *   If the table exists the exception passed in is rethrown.
271    */
272   protected function catchException(\Exception $e) {
273     if ($this->connection->schema()->tableExists(static::TABLE_NAME)) {
274       throw $e;
275     }
276   }
277
278   /**
279    * Defines the schema for the queue table.
280    *
281    * @internal
282    */
283   public function schemaDefinition() {
284     return [
285       'description' => 'Stores items in queues.',
286       'fields' => [
287         'item_id' => [
288           'type' => 'serial',
289           'unsigned' => TRUE,
290           'not null' => TRUE,
291           'description' => 'Primary Key: Unique item ID.',
292         ],
293         'name' => [
294           'type' => 'varchar_ascii',
295           'length' => 255,
296           'not null' => TRUE,
297           'default' => '',
298           'description' => 'The queue name.',
299         ],
300         'data' => [
301           'type' => 'blob',
302           'not null' => FALSE,
303           'size' => 'big',
304           'serialize' => TRUE,
305           'description' => 'The arbitrary data for the item.',
306         ],
307         'expire' => [
308           'type' => 'int',
309           'not null' => TRUE,
310           'default' => 0,
311           'description' => 'Timestamp when the claim lease expires on the item.',
312         ],
313         'created' => [
314           'type' => 'int',
315           'not null' => TRUE,
316           'default' => 0,
317           'description' => 'Timestamp when the item was created.',
318         ],
319       ],
320       'primary key' => ['item_id'],
321       'indexes' => [
322         'name_created' => ['name', 'created'],
323         'expire' => ['expire'],
324       ],
325     ];
326   }
327
328 }