Upgraded drupal core with security updates
[yaffs-website] / web / core / modules / migrate / src / MigrateExecutable.php
1 <?php
2
3 namespace Drupal\migrate;
4
5 use Drupal\Component\Utility\Bytes;
6 use Drupal\Core\Utility\Error;
7 use Drupal\Core\StringTranslation\StringTranslationTrait;
8 use Drupal\migrate\Event\MigrateEvents;
9 use Drupal\migrate\Event\MigrateImportEvent;
10 use Drupal\migrate\Event\MigratePostRowSaveEvent;
11 use Drupal\migrate\Event\MigratePreRowSaveEvent;
12 use Drupal\migrate\Event\MigrateRollbackEvent;
13 use Drupal\migrate\Event\MigrateRowDeleteEvent;
14 use Drupal\migrate\Exception\RequirementsException;
15 use Drupal\migrate\Plugin\MigrateIdMapInterface;
16 use Drupal\migrate\Plugin\MigrationInterface;
17 use Symfony\Component\EventDispatcher\EventDispatcherInterface;
18
19 /**
20  * Defines a migrate executable class.
21  */
22 class MigrateExecutable implements MigrateExecutableInterface {
23   use StringTranslationTrait;
24
25   /**
26    * The configuration of the migration to do.
27    *
28    * @var \Drupal\migrate\Plugin\MigrationInterface
29    */
30   protected $migration;
31
32   /**
33    * Status of one row.
34    *
35    * The value is a MigrateIdMapInterface::STATUS_* constant, for example:
36    * STATUS_IMPORTED.
37    *
38    * @var int
39    */
40   protected $sourceRowStatus;
41
42   /**
43    * The ratio of the memory limit at which an operation will be interrupted.
44    *
45    * @var float
46    */
47   protected $memoryThreshold = 0.85;
48
49   /**
50    * The PHP memory_limit expressed in bytes.
51    *
52    * @var int
53    */
54   protected $memoryLimit;
55
56   /**
57    * The configuration values of the source.
58    *
59    * @var array
60    */
61   protected $sourceIdValues;
62
63   /**
64    * An array of counts. Initially used for cache hit/miss tracking.
65    *
66    * @var array
67    */
68   protected $counts = [];
69
70   /**
71    * The source.
72    *
73    * @var \Drupal\migrate\Plugin\MigrateSourceInterface
74    */
75   protected $source;
76
77   /**
78    * The event dispatcher.
79    *
80    * @var \Symfony\Component\EventDispatcher\EventDispatcherInterface
81    */
82   protected $eventDispatcher;
83
84   /**
85    * Migration message service.
86    *
87    * @todo https://www.drupal.org/node/2822663 Make this protected.
88    *
89    * @var \Drupal\migrate\MigrateMessageInterface
90    */
91   public $message;
92
93   /**
94    * Constructs a MigrateExecutable and verifies and sets the memory limit.
95    *
96    * @param \Drupal\migrate\Plugin\MigrationInterface $migration
97    *   The migration to run.
98    * @param \Drupal\migrate\MigrateMessageInterface $message
99    *   The message to record.
100    * @param \Symfony\Component\EventDispatcher\EventDispatcherInterface $event_dispatcher
101    *   The event dispatcher.
102    *
103    * @throws \Drupal\migrate\MigrateException
104    */
105   public function __construct(MigrationInterface $migration, MigrateMessageInterface $message, EventDispatcherInterface $event_dispatcher = NULL) {
106     $this->migration = $migration;
107     $this->message = $message;
108     $this->migration->getIdMap()->setMessage($message);
109     $this->eventDispatcher = $event_dispatcher;
110     // Record the memory limit in bytes
111     $limit = trim(ini_get('memory_limit'));
112     if ($limit == '-1') {
113       $this->memoryLimit = PHP_INT_MAX;
114     }
115     else {
116       $this->memoryLimit = Bytes::toInt($limit);
117     }
118   }
119
120   /**
121    * Returns the source.
122    *
123    * Makes sure source is initialized based on migration settings.
124    *
125    * @return \Drupal\migrate\Plugin\MigrateSourceInterface
126    *   The source.
127    */
128   protected function getSource() {
129     if (!isset($this->source)) {
130       $this->source = $this->migration->getSourcePlugin();
131     }
132     return $this->source;
133   }
134
135   /**
136    * Gets the event dispatcher.
137    *
138    * @return \Symfony\Component\EventDispatcher\EventDispatcherInterface
139    */
140   protected function getEventDispatcher() {
141     if (!$this->eventDispatcher) {
142       $this->eventDispatcher = \Drupal::service('event_dispatcher');
143     }
144     return $this->eventDispatcher;
145   }
146
147   /**
148    * {@inheritdoc}
149    */
150   public function import() {
151     // Only begin the import operation if the migration is currently idle.
152     if ($this->migration->getStatus() !== MigrationInterface::STATUS_IDLE) {
153       $this->message->display($this->t('Migration @id is busy with another operation: @status',
154         [
155           '@id' => $this->migration->id(),
156           '@status' => $this->t($this->migration->getStatusLabel()),
157         ]), 'error');
158       return MigrationInterface::RESULT_FAILED;
159     }
160     $this->getEventDispatcher()->dispatch(MigrateEvents::PRE_IMPORT, new MigrateImportEvent($this->migration, $this->message));
161
162     // Knock off migration if the requirements haven't been met.
163     try {
164       $this->migration->checkRequirements();
165     }
166     catch (RequirementsException $e) {
167       $this->message->display(
168         $this->t(
169           'Migration @id did not meet the requirements. @message @requirements',
170           [
171             '@id' => $this->migration->id(),
172             '@message' => $e->getMessage(),
173             '@requirements' => $e->getRequirementsString(),
174           ]
175         ),
176         'error'
177       );
178
179       return MigrationInterface::RESULT_FAILED;
180     }
181
182     $this->migration->setStatus(MigrationInterface::STATUS_IMPORTING);
183     $return = MigrationInterface::RESULT_COMPLETED;
184     $source = $this->getSource();
185     $id_map = $this->migration->getIdMap();
186
187     try {
188       $source->rewind();
189     }
190     catch (\Exception $e) {
191       $this->message->display(
192         $this->t('Migration failed with source plugin exception: @e', ['@e' => $e->getMessage()]), 'error');
193       $this->migration->setStatus(MigrationInterface::STATUS_IDLE);
194       return MigrationInterface::RESULT_FAILED;
195     }
196
197     $destination = $this->migration->getDestinationPlugin();
198     while ($source->valid()) {
199       $row = $source->current();
200       $this->sourceIdValues = $row->getSourceIdValues();
201
202       try {
203         $this->processRow($row);
204         $save = TRUE;
205       }
206       catch (MigrateException $e) {
207         $this->migration->getIdMap()->saveIdMapping($row, [], $e->getStatus());
208         $this->saveMessage($e->getMessage(), $e->getLevel());
209         $save = FALSE;
210       }
211       catch (MigrateSkipRowException $e) {
212         if ($e->getSaveToMap()) {
213           $id_map->saveIdMapping($row, [], MigrateIdMapInterface::STATUS_IGNORED);
214         }
215         if ($message = trim($e->getMessage())) {
216           $this->saveMessage($message, MigrationInterface::MESSAGE_INFORMATIONAL);
217         }
218         $save = FALSE;
219       }
220
221       if ($save) {
222         try {
223           $this->getEventDispatcher()->dispatch(MigrateEvents::PRE_ROW_SAVE, new MigratePreRowSaveEvent($this->migration, $this->message, $row));
224           $destination_id_values = $destination->import($row, $id_map->lookupDestinationId($this->sourceIdValues));
225           $this->getEventDispatcher()->dispatch(MigrateEvents::POST_ROW_SAVE, new MigratePostRowSaveEvent($this->migration, $this->message, $row, $destination_id_values));
226           if ($destination_id_values) {
227             // We do not save an idMap entry for config.
228             if ($destination_id_values !== TRUE) {
229               $id_map->saveIdMapping($row, $destination_id_values, $this->sourceRowStatus, $destination->rollbackAction());
230             }
231           }
232           else {
233             $id_map->saveIdMapping($row, [], MigrateIdMapInterface::STATUS_FAILED);
234             if (!$id_map->messageCount()) {
235               $message = $this->t('New object was not saved, no error provided');
236               $this->saveMessage($message);
237               $this->message->display($message);
238             }
239           }
240         }
241         catch (MigrateException $e) {
242           $this->migration->getIdMap()->saveIdMapping($row, [], $e->getStatus());
243           $this->saveMessage($e->getMessage(), $e->getLevel());
244         }
245         catch (\Exception $e) {
246           $this->migration->getIdMap()->saveIdMapping($row, [], MigrateIdMapInterface::STATUS_FAILED);
247           $this->handleException($e);
248         }
249       }
250
251       $this->sourceRowStatus = MigrateIdMapInterface::STATUS_IMPORTED;
252
253       // Check for memory exhaustion.
254       if (($return = $this->checkStatus()) != MigrationInterface::RESULT_COMPLETED) {
255         break;
256       }
257
258       // If anyone has requested we stop, return the requested result.
259       if ($this->migration->getStatus() == MigrationInterface::STATUS_STOPPING) {
260         $return = $this->migration->getInterruptionResult();
261         $this->migration->clearInterruptionResult();
262         break;
263       }
264
265       try {
266         $source->next();
267       }
268       catch (\Exception $e) {
269         $this->message->display(
270           $this->t('Migration failed with source plugin exception: @e',
271             ['@e' => $e->getMessage()]), 'error');
272         $this->migration->setStatus(MigrationInterface::STATUS_IDLE);
273         return MigrationInterface::RESULT_FAILED;
274       }
275     }
276
277     $this->getEventDispatcher()->dispatch(MigrateEvents::POST_IMPORT, new MigrateImportEvent($this->migration, $this->message));
278     $this->migration->setStatus(MigrationInterface::STATUS_IDLE);
279     return $return;
280   }
281
282   /**
283    * {@inheritdoc}
284    */
285   public function rollback() {
286     // Only begin the rollback operation if the migration is currently idle.
287     if ($this->migration->getStatus() !== MigrationInterface::STATUS_IDLE) {
288       $this->message->display($this->t('Migration @id is busy with another operation: @status', ['@id' => $this->migration->id(), '@status' => $this->t($this->migration->getStatusLabel())]), 'error');
289       return MigrationInterface::RESULT_FAILED;
290     }
291
292     // Announce that rollback is about to happen.
293     $this->getEventDispatcher()->dispatch(MigrateEvents::PRE_ROLLBACK, new MigrateRollbackEvent($this->migration));
294
295     // Optimistically assume things are going to work out; if not, $return will be
296     // updated to some other status.
297     $return = MigrationInterface::RESULT_COMPLETED;
298
299     $this->migration->setStatus(MigrationInterface::STATUS_ROLLING_BACK);
300     $id_map = $this->migration->getIdMap();
301     $destination = $this->migration->getDestinationPlugin();
302
303     // Loop through each row in the map, and try to roll it back.
304     foreach ($id_map as $map_row) {
305       $destination_key = $id_map->currentDestination();
306       if ($destination_key) {
307         $map_row = $id_map->getRowByDestination($destination_key);
308         if ($map_row['rollback_action'] == MigrateIdMapInterface::ROLLBACK_DELETE) {
309           $this->getEventDispatcher()
310             ->dispatch(MigrateEvents::PRE_ROW_DELETE, new MigrateRowDeleteEvent($this->migration, $destination_key));
311           $destination->rollback($destination_key);
312           $this->getEventDispatcher()
313             ->dispatch(MigrateEvents::POST_ROW_DELETE, new MigrateRowDeleteEvent($this->migration, $destination_key));
314         }
315         // We're now done with this row, so remove it from the map.
316         $id_map->deleteDestination($destination_key);
317       }
318       else {
319         // If there is no destination key the import probably failed and we can
320         // remove the row without further action.
321         $source_key = $id_map->currentSource();
322         $id_map->delete($source_key);
323       }
324
325       // Check for memory exhaustion.
326       if (($return = $this->checkStatus()) != MigrationInterface::RESULT_COMPLETED) {
327         break;
328       }
329
330       // If anyone has requested we stop, return the requested result.
331       if ($this->migration->getStatus() == MigrationInterface::STATUS_STOPPING) {
332         $return = $this->migration->getInterruptionResult();
333         $this->migration->clearInterruptionResult();
334         break;
335       }
336     }
337
338     // Notify modules that rollback attempt was complete.
339     $this->getEventDispatcher()->dispatch(MigrateEvents::POST_ROLLBACK, new MigrateRollbackEvent($this->migration));
340     $this->migration->setStatus(MigrationInterface::STATUS_IDLE);
341
342     return $return;
343   }
344
345   /**
346    * {@inheritdoc}
347    */
348   public function processRow(Row $row, array $process = NULL, $value = NULL) {
349     foreach ($this->migration->getProcessPlugins($process) as $destination => $plugins) {
350       $multiple = FALSE;
351       /** @var $plugin \Drupal\migrate\Plugin\MigrateProcessInterface */
352       foreach ($plugins as $plugin) {
353         $definition = $plugin->getPluginDefinition();
354         // Many plugins expect a scalar value but the current value of the
355         // pipeline might be multiple scalars (this is set by the previous
356         // plugin) and in this case the current value needs to be iterated
357         // and each scalar separately transformed.
358         if ($multiple && !$definition['handle_multiples']) {
359           $new_value = [];
360           if (!is_array($value)) {
361             throw new MigrateException(sprintf('Pipeline failed at %s plugin for destination %s: %s received instead of an array,', $plugin->getPluginId(), $destination, $value));
362           }
363           $break = FALSE;
364           foreach ($value as $scalar_value) {
365             try {
366               $new_value[] = $plugin->transform($scalar_value, $this, $row, $destination);
367             }
368             catch (MigrateSkipProcessException $e) {
369               $new_value[] = NULL;
370               $break = TRUE;
371             }
372           }
373           $value = $new_value;
374           if ($break) {
375             break;
376           }
377         }
378         else {
379           try {
380             $value = $plugin->transform($value, $this, $row, $destination);
381           }
382           catch (MigrateSkipProcessException $e) {
383             $value = NULL;
384             break;
385           }
386           $multiple = $plugin->multiple();
387         }
388       }
389       // No plugins or no value means do not set.
390       if ($plugins && !is_null($value)) {
391         $row->setDestinationProperty($destination, $value);
392       }
393       // Reset the value.
394       $value = NULL;
395     }
396   }
397
398   /**
399    * Fetches the key array for the current source record.
400    *
401    * @return array
402    *   The current source IDs.
403    */
404   protected function currentSourceIds() {
405     return $this->getSource()->getCurrentIds();
406   }
407
408   /**
409    * {@inheritdoc}
410    */
411   public function saveMessage($message, $level = MigrationInterface::MESSAGE_ERROR) {
412     $this->migration->getIdMap()->saveMessage($this->sourceIdValues, $message, $level);
413   }
414
415   /**
416    * Takes an Exception object and both saves and displays it.
417    *
418    * Pulls in additional information on the location triggering the exception.
419    *
420    * @param \Exception $exception
421    *   Object representing the exception.
422    * @param bool $save
423    *   (optional) Whether to save the message in the migration's mapping table.
424    *   Set to FALSE in contexts where this doesn't make sense.
425    */
426   protected function handleException(\Exception $exception, $save = TRUE) {
427     $result = Error::decodeException($exception);
428     $message = $result['@message'] . ' (' . $result['%file'] . ':' . $result['%line'] . ')';
429     if ($save) {
430       $this->saveMessage($message);
431     }
432     $this->message->display($message, 'error');
433   }
434
435   /**
436    * Checks for exceptional conditions, and display feedback.
437    */
438   protected function checkStatus() {
439     if ($this->memoryExceeded()) {
440       return MigrationInterface::RESULT_INCOMPLETE;
441     }
442     return MigrationInterface::RESULT_COMPLETED;
443   }
444
445   /**
446    * Tests whether we've exceeded the desired memory threshold.
447    *
448    * If so, output a message.
449    *
450    * @return bool
451    *   TRUE if the threshold is exceeded, otherwise FALSE.
452    */
453   protected function memoryExceeded() {
454     $usage = $this->getMemoryUsage();
455     $pct_memory = $usage / $this->memoryLimit;
456     if (!$threshold = $this->memoryThreshold) {
457       return FALSE;
458     }
459     if ($pct_memory > $threshold) {
460       $this->message->display(
461         $this->t(
462           'Memory usage is @usage (@pct% of limit @limit), reclaiming memory.',
463           [
464             '@pct' => round($pct_memory * 100),
465             '@usage' => $this->formatSize($usage),
466             '@limit' => $this->formatSize($this->memoryLimit),
467           ]
468         ),
469         'warning'
470       );
471       $usage = $this->attemptMemoryReclaim();
472       $pct_memory = $usage / $this->memoryLimit;
473       // Use a lower threshold - we don't want to be in a situation where we keep
474       // coming back here and trimming a tiny amount
475       if ($pct_memory > (0.90 * $threshold)) {
476         $this->message->display(
477           $this->t(
478             'Memory usage is now @usage (@pct% of limit @limit), not enough reclaimed, starting new batch',
479             [
480               '@pct' => round($pct_memory * 100),
481               '@usage' => $this->formatSize($usage),
482               '@limit' => $this->formatSize($this->memoryLimit),
483             ]
484           ),
485           'warning'
486         );
487         return TRUE;
488       }
489       else {
490         $this->message->display(
491           $this->t(
492             'Memory usage is now @usage (@pct% of limit @limit), reclaimed enough, continuing',
493             [
494               '@pct' => round($pct_memory * 100),
495               '@usage' => $this->formatSize($usage),
496               '@limit' => $this->formatSize($this->memoryLimit),
497             ]
498           ),
499           'warning');
500         return FALSE;
501       }
502     }
503     else {
504       return FALSE;
505     }
506   }
507
508   /**
509    * Returns the memory usage so far.
510    *
511    * @return int
512    *   The memory usage.
513    */
514   protected function getMemoryUsage() {
515     return memory_get_usage();
516   }
517
518   /**
519    * Tries to reclaim memory.
520    *
521    * @return int
522    *   The memory usage after reclaim.
523    */
524   protected function attemptMemoryReclaim() {
525     // First, try resetting Drupal's static storage - this frequently releases
526     // plenty of memory to continue.
527     drupal_static_reset();
528
529     // Entity storage can blow up with caches so clear them out.
530     $manager = \Drupal::entityManager();
531     foreach ($manager->getDefinitions() as $id => $definition) {
532       $manager->getStorage($id)->resetCache();
533     }
534
535     // @TODO: explore resetting the container.
536
537     return memory_get_usage();
538   }
539
540   /**
541    * Generates a string representation for the given byte count.
542    *
543    * @param int $size
544    *   A size in bytes.
545    *
546    * @return string
547    *   A translated string representation of the size.
548    */
549   protected function formatSize($size) {
550     return format_size($size);
551   }
552
553 }