Further Drupal 8.6.4 changes. Some core files were not committed before a commit...
[yaffs-website] / web / core / modules / migrate / src / MigrateExecutable.php
1 <?php
2
3 namespace Drupal\migrate;
4
5 use Drupal\Component\Utility\Bytes;
6 use Drupal\Core\Utility\Error;
7 use Drupal\Core\StringTranslation\StringTranslationTrait;
8 use Drupal\migrate\Event\MigrateEvents;
9 use Drupal\migrate\Event\MigrateImportEvent;
10 use Drupal\migrate\Event\MigratePostRowSaveEvent;
11 use Drupal\migrate\Event\MigratePreRowSaveEvent;
12 use Drupal\migrate\Event\MigrateRollbackEvent;
13 use Drupal\migrate\Event\MigrateRowDeleteEvent;
14 use Drupal\migrate\Exception\RequirementsException;
15 use Drupal\migrate\Plugin\MigrateIdMapInterface;
16 use Drupal\migrate\Plugin\MigrationInterface;
17 use Symfony\Component\EventDispatcher\EventDispatcherInterface;
18
19 /**
20  * Defines a migrate executable class.
21  */
22 class MigrateExecutable implements MigrateExecutableInterface {
23   use StringTranslationTrait;
24
25   /**
26    * The configuration of the migration to do.
27    *
28    * @var \Drupal\migrate\Plugin\MigrationInterface
29    */
30   protected $migration;
31
32   /**
33    * Status of one row.
34    *
35    * The value is a MigrateIdMapInterface::STATUS_* constant, for example:
36    * STATUS_IMPORTED.
37    *
38    * @var int
39    */
40   protected $sourceRowStatus;
41
42   /**
43    * The ratio of the memory limit at which an operation will be interrupted.
44    *
45    * @var float
46    */
47   protected $memoryThreshold = 0.85;
48
49   /**
50    * The PHP memory_limit expressed in bytes.
51    *
52    * @var int
53    */
54   protected $memoryLimit;
55
56   /**
57    * The configuration values of the source.
58    *
59    * @var array
60    */
61   protected $sourceIdValues;
62
63   /**
64    * An array of counts. Initially used for cache hit/miss tracking.
65    *
66    * @var array
67    */
68   protected $counts = [];
69
70   /**
71    * The source.
72    *
73    * @var \Drupal\migrate\Plugin\MigrateSourceInterface
74    */
75   protected $source;
76
77   /**
78    * The event dispatcher.
79    *
80    * @var \Symfony\Component\EventDispatcher\EventDispatcherInterface
81    */
82   protected $eventDispatcher;
83
84   /**
85    * Migration message service.
86    *
87    * @todo https://www.drupal.org/node/2822663 Make this protected.
88    *
89    * @var \Drupal\migrate\MigrateMessageInterface
90    */
91   public $message;
92
93   /**
94    * Constructs a MigrateExecutable and verifies and sets the memory limit.
95    *
96    * @param \Drupal\migrate\Plugin\MigrationInterface $migration
97    *   The migration to run.
98    * @param \Drupal\migrate\MigrateMessageInterface $message
99    *   (optional) The migrate message service.
100    * @param \Symfony\Component\EventDispatcher\EventDispatcherInterface $event_dispatcher
101    *   (optional) The event dispatcher.
102    *
103    * @throws \Drupal\migrate\MigrateException
104    */
105   public function __construct(MigrationInterface $migration, MigrateMessageInterface $message = NULL, EventDispatcherInterface $event_dispatcher = NULL) {
106     $this->migration = $migration;
107     $this->message = $message ?: new MigrateMessage();
108     $this->migration->getIdMap()->setMessage($this->message);
109     $this->eventDispatcher = $event_dispatcher;
110     // Record the memory limit in bytes
111     $limit = trim(ini_get('memory_limit'));
112     if ($limit == '-1') {
113       $this->memoryLimit = PHP_INT_MAX;
114     }
115     else {
116       $this->memoryLimit = Bytes::toInt($limit);
117     }
118   }
119
120   /**
121    * Returns the source.
122    *
123    * Makes sure source is initialized based on migration settings.
124    *
125    * @return \Drupal\migrate\Plugin\MigrateSourceInterface
126    *   The source.
127    */
128   protected function getSource() {
129     if (!isset($this->source)) {
130       $this->source = $this->migration->getSourcePlugin();
131     }
132     return $this->source;
133   }
134
135   /**
136    * Gets the event dispatcher.
137    *
138    * @return \Symfony\Component\EventDispatcher\EventDispatcherInterface
139    */
140   protected function getEventDispatcher() {
141     if (!$this->eventDispatcher) {
142       $this->eventDispatcher = \Drupal::service('event_dispatcher');
143     }
144     return $this->eventDispatcher;
145   }
146
147   /**
148    * {@inheritdoc}
149    */
150   public function import() {
151     // Only begin the import operation if the migration is currently idle.
152     if ($this->migration->getStatus() !== MigrationInterface::STATUS_IDLE) {
153       $this->message->display($this->t('Migration @id is busy with another operation: @status',
154         [
155           '@id' => $this->migration->id(),
156           '@status' => $this->t($this->migration->getStatusLabel()),
157         ]), 'error');
158       return MigrationInterface::RESULT_FAILED;
159     }
160     $this->getEventDispatcher()->dispatch(MigrateEvents::PRE_IMPORT, new MigrateImportEvent($this->migration, $this->message));
161
162     // Knock off migration if the requirements haven't been met.
163     try {
164       $this->migration->checkRequirements();
165     }
166     catch (RequirementsException $e) {
167       $this->message->display(
168         $this->t(
169           'Migration @id did not meet the requirements. @message @requirements',
170           [
171             '@id' => $this->migration->id(),
172             '@message' => $e->getMessage(),
173             '@requirements' => $e->getRequirementsString(),
174           ]
175         ),
176         'error'
177       );
178
179       return MigrationInterface::RESULT_FAILED;
180     }
181
182     $this->migration->setStatus(MigrationInterface::STATUS_IMPORTING);
183     $return = MigrationInterface::RESULT_COMPLETED;
184     $source = $this->getSource();
185     $id_map = $this->migration->getIdMap();
186
187     try {
188       $source->rewind();
189     }
190     catch (\Exception $e) {
191       $this->message->display(
192         $this->t('Migration failed with source plugin exception: @e', ['@e' => $e->getMessage()]), 'error');
193       $this->migration->setStatus(MigrationInterface::STATUS_IDLE);
194       return MigrationInterface::RESULT_FAILED;
195     }
196
197     $destination = $this->migration->getDestinationPlugin();
198     while ($source->valid()) {
199       $row = $source->current();
200       $this->sourceIdValues = $row->getSourceIdValues();
201
202       try {
203         $this->processRow($row);
204         $save = TRUE;
205       }
206       catch (MigrateException $e) {
207         $this->migration->getIdMap()->saveIdMapping($row, [], $e->getStatus());
208         $this->saveMessage($e->getMessage(), $e->getLevel());
209         $save = FALSE;
210       }
211       catch (MigrateSkipRowException $e) {
212         if ($e->getSaveToMap()) {
213           $id_map->saveIdMapping($row, [], MigrateIdMapInterface::STATUS_IGNORED);
214         }
215         if ($message = trim($e->getMessage())) {
216           $this->saveMessage($message, MigrationInterface::MESSAGE_INFORMATIONAL);
217         }
218         $save = FALSE;
219       }
220
221       if ($save) {
222         try {
223           $this->getEventDispatcher()->dispatch(MigrateEvents::PRE_ROW_SAVE, new MigratePreRowSaveEvent($this->migration, $this->message, $row));
224           $destination_ids = $id_map->lookupDestinationIds($this->sourceIdValues);
225           $destination_id_values = $destination_ids ? reset($destination_ids) : [];
226           $destination_id_values = $destination->import($row, $destination_id_values);
227           $this->getEventDispatcher()->dispatch(MigrateEvents::POST_ROW_SAVE, new MigratePostRowSaveEvent($this->migration, $this->message, $row, $destination_id_values));
228           if ($destination_id_values) {
229             // We do not save an idMap entry for config.
230             if ($destination_id_values !== TRUE) {
231               $id_map->saveIdMapping($row, $destination_id_values, $this->sourceRowStatus, $destination->rollbackAction());
232             }
233           }
234           else {
235             $id_map->saveIdMapping($row, [], MigrateIdMapInterface::STATUS_FAILED);
236             if (!$id_map->messageCount()) {
237               $message = $this->t('New object was not saved, no error provided');
238               $this->saveMessage($message);
239               $this->message->display($message);
240             }
241           }
242         }
243         catch (MigrateException $e) {
244           $this->migration->getIdMap()->saveIdMapping($row, [], $e->getStatus());
245           $this->saveMessage($e->getMessage(), $e->getLevel());
246         }
247         catch (\Exception $e) {
248           $this->migration->getIdMap()->saveIdMapping($row, [], MigrateIdMapInterface::STATUS_FAILED);
249           $this->handleException($e);
250         }
251       }
252
253       $this->sourceRowStatus = MigrateIdMapInterface::STATUS_IMPORTED;
254
255       // Check for memory exhaustion.
256       if (($return = $this->checkStatus()) != MigrationInterface::RESULT_COMPLETED) {
257         break;
258       }
259
260       // If anyone has requested we stop, return the requested result.
261       if ($this->migration->getStatus() == MigrationInterface::STATUS_STOPPING) {
262         $return = $this->migration->getInterruptionResult();
263         $this->migration->clearInterruptionResult();
264         break;
265       }
266
267       try {
268         $source->next();
269       }
270       catch (\Exception $e) {
271         $this->message->display(
272           $this->t('Migration failed with source plugin exception: @e',
273             ['@e' => $e->getMessage()]), 'error');
274         $this->migration->setStatus(MigrationInterface::STATUS_IDLE);
275         return MigrationInterface::RESULT_FAILED;
276       }
277     }
278
279     $this->getEventDispatcher()->dispatch(MigrateEvents::POST_IMPORT, new MigrateImportEvent($this->migration, $this->message));
280     $this->migration->setStatus(MigrationInterface::STATUS_IDLE);
281     return $return;
282   }
283
284   /**
285    * {@inheritdoc}
286    */
287   public function rollback() {
288     // Only begin the rollback operation if the migration is currently idle.
289     if ($this->migration->getStatus() !== MigrationInterface::STATUS_IDLE) {
290       $this->message->display($this->t('Migration @id is busy with another operation: @status', ['@id' => $this->migration->id(), '@status' => $this->t($this->migration->getStatusLabel())]), 'error');
291       return MigrationInterface::RESULT_FAILED;
292     }
293
294     // Announce that rollback is about to happen.
295     $this->getEventDispatcher()->dispatch(MigrateEvents::PRE_ROLLBACK, new MigrateRollbackEvent($this->migration));
296
297     // Optimistically assume things are going to work out; if not, $return will be
298     // updated to some other status.
299     $return = MigrationInterface::RESULT_COMPLETED;
300
301     $this->migration->setStatus(MigrationInterface::STATUS_ROLLING_BACK);
302     $id_map = $this->migration->getIdMap();
303     $destination = $this->migration->getDestinationPlugin();
304
305     // Loop through each row in the map, and try to roll it back.
306     foreach ($id_map as $map_row) {
307       $destination_key = $id_map->currentDestination();
308       if ($destination_key) {
309         $map_row = $id_map->getRowByDestination($destination_key);
310         if ($map_row['rollback_action'] == MigrateIdMapInterface::ROLLBACK_DELETE) {
311           $this->getEventDispatcher()
312             ->dispatch(MigrateEvents::PRE_ROW_DELETE, new MigrateRowDeleteEvent($this->migration, $destination_key));
313           $destination->rollback($destination_key);
314           $this->getEventDispatcher()
315             ->dispatch(MigrateEvents::POST_ROW_DELETE, new MigrateRowDeleteEvent($this->migration, $destination_key));
316         }
317         // We're now done with this row, so remove it from the map.
318         $id_map->deleteDestination($destination_key);
319       }
320       else {
321         // If there is no destination key the import probably failed and we can
322         // remove the row without further action.
323         $source_key = $id_map->currentSource();
324         $id_map->delete($source_key);
325       }
326
327       // Check for memory exhaustion.
328       if (($return = $this->checkStatus()) != MigrationInterface::RESULT_COMPLETED) {
329         break;
330       }
331
332       // If anyone has requested we stop, return the requested result.
333       if ($this->migration->getStatus() == MigrationInterface::STATUS_STOPPING) {
334         $return = $this->migration->getInterruptionResult();
335         $this->migration->clearInterruptionResult();
336         break;
337       }
338     }
339
340     // Notify modules that rollback attempt was complete.
341     $this->getEventDispatcher()->dispatch(MigrateEvents::POST_ROLLBACK, new MigrateRollbackEvent($this->migration));
342     $this->migration->setStatus(MigrationInterface::STATUS_IDLE);
343
344     return $return;
345   }
346
347   /**
348    * {@inheritdoc}
349    */
350   public function processRow(Row $row, array $process = NULL, $value = NULL) {
351     foreach ($this->migration->getProcessPlugins($process) as $destination => $plugins) {
352       $multiple = FALSE;
353       /** @var $plugin \Drupal\migrate\Plugin\MigrateProcessInterface */
354       foreach ($plugins as $plugin) {
355         $definition = $plugin->getPluginDefinition();
356         // Many plugins expect a scalar value but the current value of the
357         // pipeline might be multiple scalars (this is set by the previous
358         // plugin) and in this case the current value needs to be iterated
359         // and each scalar separately transformed.
360         if ($multiple && !$definition['handle_multiples']) {
361           $new_value = [];
362           if (!is_array($value)) {
363             throw new MigrateException(sprintf('Pipeline failed at %s plugin for destination %s: %s received instead of an array,', $plugin->getPluginId(), $destination, $value));
364           }
365           $break = FALSE;
366           foreach ($value as $scalar_value) {
367             try {
368               $new_value[] = $plugin->transform($scalar_value, $this, $row, $destination);
369             }
370             catch (MigrateSkipProcessException $e) {
371               $new_value[] = NULL;
372               $break = TRUE;
373             }
374           }
375           $value = $new_value;
376           if ($break) {
377             break;
378           }
379         }
380         else {
381           try {
382             $value = $plugin->transform($value, $this, $row, $destination);
383           }
384           catch (MigrateSkipProcessException $e) {
385             $value = NULL;
386             break;
387           }
388           $multiple = $plugin->multiple();
389         }
390       }
391       // Ensure all values, including nulls, are migrated.
392       if ($plugins) {
393         if (isset($value)) {
394           $row->setDestinationProperty($destination, $value);
395         }
396         else {
397           $row->setEmptyDestinationProperty($destination);
398         }
399       }
400       // Reset the value.
401       $value = NULL;
402     }
403   }
404
405   /**
406    * Fetches the key array for the current source record.
407    *
408    * @return array
409    *   The current source IDs.
410    */
411   protected function currentSourceIds() {
412     return $this->getSource()->getCurrentIds();
413   }
414
415   /**
416    * {@inheritdoc}
417    */
418   public function saveMessage($message, $level = MigrationInterface::MESSAGE_ERROR) {
419     $this->migration->getIdMap()->saveMessage($this->sourceIdValues, $message, $level);
420   }
421
422   /**
423    * Takes an Exception object and both saves and displays it.
424    *
425    * Pulls in additional information on the location triggering the exception.
426    *
427    * @param \Exception $exception
428    *   Object representing the exception.
429    * @param bool $save
430    *   (optional) Whether to save the message in the migration's mapping table.
431    *   Set to FALSE in contexts where this doesn't make sense.
432    */
433   protected function handleException(\Exception $exception, $save = TRUE) {
434     $result = Error::decodeException($exception);
435     $message = $result['@message'] . ' (' . $result['%file'] . ':' . $result['%line'] . ')';
436     if ($save) {
437       $this->saveMessage($message);
438     }
439     $this->message->display($message, 'error');
440   }
441
442   /**
443    * Checks for exceptional conditions, and display feedback.
444    */
445   protected function checkStatus() {
446     if ($this->memoryExceeded()) {
447       return MigrationInterface::RESULT_INCOMPLETE;
448     }
449     return MigrationInterface::RESULT_COMPLETED;
450   }
451
452   /**
453    * Tests whether we've exceeded the desired memory threshold.
454    *
455    * If so, output a message.
456    *
457    * @return bool
458    *   TRUE if the threshold is exceeded, otherwise FALSE.
459    */
460   protected function memoryExceeded() {
461     $usage = $this->getMemoryUsage();
462     $pct_memory = $usage / $this->memoryLimit;
463     if (!$threshold = $this->memoryThreshold) {
464       return FALSE;
465     }
466     if ($pct_memory > $threshold) {
467       $this->message->display(
468         $this->t(
469           'Memory usage is @usage (@pct% of limit @limit), reclaiming memory.',
470           [
471             '@pct' => round($pct_memory * 100),
472             '@usage' => $this->formatSize($usage),
473             '@limit' => $this->formatSize($this->memoryLimit),
474           ]
475         ),
476         'warning'
477       );
478       $usage = $this->attemptMemoryReclaim();
479       $pct_memory = $usage / $this->memoryLimit;
480       // Use a lower threshold - we don't want to be in a situation where we keep
481       // coming back here and trimming a tiny amount
482       if ($pct_memory > (0.90 * $threshold)) {
483         $this->message->display(
484           $this->t(
485             'Memory usage is now @usage (@pct% of limit @limit), not enough reclaimed, starting new batch',
486             [
487               '@pct' => round($pct_memory * 100),
488               '@usage' => $this->formatSize($usage),
489               '@limit' => $this->formatSize($this->memoryLimit),
490             ]
491           ),
492           'warning'
493         );
494         return TRUE;
495       }
496       else {
497         $this->message->display(
498           $this->t(
499             'Memory usage is now @usage (@pct% of limit @limit), reclaimed enough, continuing',
500             [
501               '@pct' => round($pct_memory * 100),
502               '@usage' => $this->formatSize($usage),
503               '@limit' => $this->formatSize($this->memoryLimit),
504             ]
505           ),
506           'warning');
507         return FALSE;
508       }
509     }
510     else {
511       return FALSE;
512     }
513   }
514
515   /**
516    * Returns the memory usage so far.
517    *
518    * @return int
519    *   The memory usage.
520    */
521   protected function getMemoryUsage() {
522     return memory_get_usage();
523   }
524
525   /**
526    * Tries to reclaim memory.
527    *
528    * @return int
529    *   The memory usage after reclaim.
530    */
531   protected function attemptMemoryReclaim() {
532     // First, try resetting Drupal's static storage - this frequently releases
533     // plenty of memory to continue.
534     drupal_static_reset();
535
536     // Entity storage can blow up with caches so clear them out.
537     $manager = \Drupal::entityManager();
538     foreach ($manager->getDefinitions() as $id => $definition) {
539       $manager->getStorage($id)->resetCache();
540     }
541
542     // @TODO: explore resetting the container.
543
544     // Run garbage collector to further reduce memory.
545     gc_collect_cycles();
546
547     return memory_get_usage();
548   }
549
550   /**
551    * Generates a string representation for the given byte count.
552    *
553    * @param int $size
554    *   A size in bytes.
555    *
556    * @return string
557    *   A translated string representation of the size.
558    */
559   protected function formatSize($size) {
560     return format_size($size);
561   }
562
563 }