3 namespace Drupal\migrate;
5 use Drupal\Component\Utility\Bytes;
6 use Drupal\Core\Utility\Error;
7 use Drupal\Core\StringTranslation\StringTranslationTrait;
8 use Drupal\migrate\Event\MigrateEvents;
9 use Drupal\migrate\Event\MigrateImportEvent;
10 use Drupal\migrate\Event\MigratePostRowSaveEvent;
11 use Drupal\migrate\Event\MigratePreRowSaveEvent;
12 use Drupal\migrate\Event\MigrateRollbackEvent;
13 use Drupal\migrate\Event\MigrateRowDeleteEvent;
14 use Drupal\migrate\Exception\RequirementsException;
15 use Drupal\migrate\Plugin\MigrateIdMapInterface;
16 use Drupal\migrate\Plugin\MigrationInterface;
17 use Symfony\Component\EventDispatcher\EventDispatcherInterface;
20 * Defines a migrate executable class.
22 class MigrateExecutable implements MigrateExecutableInterface {
23 use StringTranslationTrait;
26 * The configuration of the migration to do.
28 * @var \Drupal\migrate\Plugin\MigrationInterface
35 * The value is a MigrateIdMapInterface::STATUS_* constant, for example:
40 protected $sourceRowStatus;
43 * The ratio of the memory limit at which an operation will be interrupted.
47 protected $memoryThreshold = 0.85;
50 * The PHP memory_limit expressed in bytes.
54 protected $memoryLimit;
57 * The configuration values of the source.
61 protected $sourceIdValues;
64 * An array of counts. Initially used for cache hit/miss tracking.
68 protected $counts = [];
73 * @var \Drupal\migrate\Plugin\MigrateSourceInterface
78 * The event dispatcher.
80 * @var \Symfony\Component\EventDispatcher\EventDispatcherInterface
82 protected $eventDispatcher;
85 * Migration message service.
87 * @todo https://www.drupal.org/node/2822663 Make this protected.
89 * @var \Drupal\migrate\MigrateMessageInterface
94 * Constructs a MigrateExecutable and verifies and sets the memory limit.
96 * @param \Drupal\migrate\Plugin\MigrationInterface $migration
97 * The migration to run.
98 * @param \Drupal\migrate\MigrateMessageInterface $message
99 * (optional) The migrate message service.
100 * @param \Symfony\Component\EventDispatcher\EventDispatcherInterface $event_dispatcher
101 * (optional) The event dispatcher.
103 * @throws \Drupal\migrate\MigrateException
105 public function __construct(MigrationInterface $migration, MigrateMessageInterface $message = NULL, EventDispatcherInterface $event_dispatcher = NULL) {
106 $this->migration = $migration;
107 $this->message = $message ?: new MigrateMessage();
108 $this->migration->getIdMap()->setMessage($this->message);
109 $this->eventDispatcher = $event_dispatcher;
110 // Record the memory limit in bytes
111 $limit = trim(ini_get('memory_limit'));
112 if ($limit == '-1') {
113 $this->memoryLimit = PHP_INT_MAX;
116 $this->memoryLimit = Bytes::toInt($limit);
121 * Returns the source.
123 * Makes sure source is initialized based on migration settings.
125 * @return \Drupal\migrate\Plugin\MigrateSourceInterface
128 protected function getSource() {
129 if (!isset($this->source)) {
130 $this->source = $this->migration->getSourcePlugin();
132 return $this->source;
136 * Gets the event dispatcher.
138 * @return \Symfony\Component\EventDispatcher\EventDispatcherInterface
140 protected function getEventDispatcher() {
141 if (!$this->eventDispatcher) {
142 $this->eventDispatcher = \Drupal::service('event_dispatcher');
144 return $this->eventDispatcher;
150 public function import() {
151 // Only begin the import operation if the migration is currently idle.
152 if ($this->migration->getStatus() !== MigrationInterface::STATUS_IDLE) {
153 $this->message->display($this->t('Migration @id is busy with another operation: @status',
155 '@id' => $this->migration->id(),
156 '@status' => $this->t($this->migration->getStatusLabel()),
158 return MigrationInterface::RESULT_FAILED;
160 $this->getEventDispatcher()->dispatch(MigrateEvents::PRE_IMPORT, new MigrateImportEvent($this->migration, $this->message));
162 // Knock off migration if the requirements haven't been met.
164 $this->migration->checkRequirements();
166 catch (RequirementsException $e) {
167 $this->message->display(
169 'Migration @id did not meet the requirements. @message @requirements',
171 '@id' => $this->migration->id(),
172 '@message' => $e->getMessage(),
173 '@requirements' => $e->getRequirementsString(),
179 return MigrationInterface::RESULT_FAILED;
182 $this->migration->setStatus(MigrationInterface::STATUS_IMPORTING);
183 $return = MigrationInterface::RESULT_COMPLETED;
184 $source = $this->getSource();
185 $id_map = $this->migration->getIdMap();
190 catch (\Exception $e) {
191 $this->message->display(
192 $this->t('Migration failed with source plugin exception: @e', ['@e' => $e->getMessage()]), 'error');
193 $this->migration->setStatus(MigrationInterface::STATUS_IDLE);
194 return MigrationInterface::RESULT_FAILED;
197 $destination = $this->migration->getDestinationPlugin();
198 while ($source->valid()) {
199 $row = $source->current();
200 $this->sourceIdValues = $row->getSourceIdValues();
203 $this->processRow($row);
206 catch (MigrateException $e) {
207 $this->migration->getIdMap()->saveIdMapping($row, [], $e->getStatus());
208 $this->saveMessage($e->getMessage(), $e->getLevel());
211 catch (MigrateSkipRowException $e) {
212 if ($e->getSaveToMap()) {
213 $id_map->saveIdMapping($row, [], MigrateIdMapInterface::STATUS_IGNORED);
215 if ($message = trim($e->getMessage())) {
216 $this->saveMessage($message, MigrationInterface::MESSAGE_INFORMATIONAL);
223 $this->getEventDispatcher()->dispatch(MigrateEvents::PRE_ROW_SAVE, new MigratePreRowSaveEvent($this->migration, $this->message, $row));
224 $destination_ids = $id_map->lookupDestinationIds($this->sourceIdValues);
225 $destination_id_values = $destination_ids ? reset($destination_ids) : [];
226 $destination_id_values = $destination->import($row, $destination_id_values);
227 $this->getEventDispatcher()->dispatch(MigrateEvents::POST_ROW_SAVE, new MigratePostRowSaveEvent($this->migration, $this->message, $row, $destination_id_values));
228 if ($destination_id_values) {
229 // We do not save an idMap entry for config.
230 if ($destination_id_values !== TRUE) {
231 $id_map->saveIdMapping($row, $destination_id_values, $this->sourceRowStatus, $destination->rollbackAction());
235 $id_map->saveIdMapping($row, [], MigrateIdMapInterface::STATUS_FAILED);
236 if (!$id_map->messageCount()) {
237 $message = $this->t('New object was not saved, no error provided');
238 $this->saveMessage($message);
239 $this->message->display($message);
243 catch (MigrateException $e) {
244 $this->migration->getIdMap()->saveIdMapping($row, [], $e->getStatus());
245 $this->saveMessage($e->getMessage(), $e->getLevel());
247 catch (\Exception $e) {
248 $this->migration->getIdMap()->saveIdMapping($row, [], MigrateIdMapInterface::STATUS_FAILED);
249 $this->handleException($e);
253 $this->sourceRowStatus = MigrateIdMapInterface::STATUS_IMPORTED;
255 // Check for memory exhaustion.
256 if (($return = $this->checkStatus()) != MigrationInterface::RESULT_COMPLETED) {
260 // If anyone has requested we stop, return the requested result.
261 if ($this->migration->getStatus() == MigrationInterface::STATUS_STOPPING) {
262 $return = $this->migration->getInterruptionResult();
263 $this->migration->clearInterruptionResult();
270 catch (\Exception $e) {
271 $this->message->display(
272 $this->t('Migration failed with source plugin exception: @e',
273 ['@e' => $e->getMessage()]), 'error');
274 $this->migration->setStatus(MigrationInterface::STATUS_IDLE);
275 return MigrationInterface::RESULT_FAILED;
279 $this->getEventDispatcher()->dispatch(MigrateEvents::POST_IMPORT, new MigrateImportEvent($this->migration, $this->message));
280 $this->migration->setStatus(MigrationInterface::STATUS_IDLE);
287 public function rollback() {
288 // Only begin the rollback operation if the migration is currently idle.
289 if ($this->migration->getStatus() !== MigrationInterface::STATUS_IDLE) {
290 $this->message->display($this->t('Migration @id is busy with another operation: @status', ['@id' => $this->migration->id(), '@status' => $this->t($this->migration->getStatusLabel())]), 'error');
291 return MigrationInterface::RESULT_FAILED;
294 // Announce that rollback is about to happen.
295 $this->getEventDispatcher()->dispatch(MigrateEvents::PRE_ROLLBACK, new MigrateRollbackEvent($this->migration));
297 // Optimistically assume things are going to work out; if not, $return will be
298 // updated to some other status.
299 $return = MigrationInterface::RESULT_COMPLETED;
301 $this->migration->setStatus(MigrationInterface::STATUS_ROLLING_BACK);
302 $id_map = $this->migration->getIdMap();
303 $destination = $this->migration->getDestinationPlugin();
305 // Loop through each row in the map, and try to roll it back.
306 foreach ($id_map as $map_row) {
307 $destination_key = $id_map->currentDestination();
308 if ($destination_key) {
309 $map_row = $id_map->getRowByDestination($destination_key);
310 if ($map_row['rollback_action'] == MigrateIdMapInterface::ROLLBACK_DELETE) {
311 $this->getEventDispatcher()
312 ->dispatch(MigrateEvents::PRE_ROW_DELETE, new MigrateRowDeleteEvent($this->migration, $destination_key));
313 $destination->rollback($destination_key);
314 $this->getEventDispatcher()
315 ->dispatch(MigrateEvents::POST_ROW_DELETE, new MigrateRowDeleteEvent($this->migration, $destination_key));
317 // We're now done with this row, so remove it from the map.
318 $id_map->deleteDestination($destination_key);
321 // If there is no destination key the import probably failed and we can
322 // remove the row without further action.
323 $source_key = $id_map->currentSource();
324 $id_map->delete($source_key);
327 // Check for memory exhaustion.
328 if (($return = $this->checkStatus()) != MigrationInterface::RESULT_COMPLETED) {
332 // If anyone has requested we stop, return the requested result.
333 if ($this->migration->getStatus() == MigrationInterface::STATUS_STOPPING) {
334 $return = $this->migration->getInterruptionResult();
335 $this->migration->clearInterruptionResult();
340 // Notify modules that rollback attempt was complete.
341 $this->getEventDispatcher()->dispatch(MigrateEvents::POST_ROLLBACK, new MigrateRollbackEvent($this->migration));
342 $this->migration->setStatus(MigrationInterface::STATUS_IDLE);
350 public function processRow(Row $row, array $process = NULL, $value = NULL) {
351 foreach ($this->migration->getProcessPlugins($process) as $destination => $plugins) {
353 /** @var $plugin \Drupal\migrate\Plugin\MigrateProcessInterface */
354 foreach ($plugins as $plugin) {
355 $definition = $plugin->getPluginDefinition();
356 // Many plugins expect a scalar value but the current value of the
357 // pipeline might be multiple scalars (this is set by the previous
358 // plugin) and in this case the current value needs to be iterated
359 // and each scalar separately transformed.
360 if ($multiple && !$definition['handle_multiples']) {
362 if (!is_array($value)) {
363 throw new MigrateException(sprintf('Pipeline failed at %s plugin for destination %s: %s received instead of an array,', $plugin->getPluginId(), $destination, $value));
366 foreach ($value as $scalar_value) {
368 $new_value[] = $plugin->transform($scalar_value, $this, $row, $destination);
370 catch (MigrateSkipProcessException $e) {
382 $value = $plugin->transform($value, $this, $row, $destination);
384 catch (MigrateSkipProcessException $e) {
388 $multiple = $plugin->multiple();
391 // Ensure all values, including nulls, are migrated.
394 $row->setDestinationProperty($destination, $value);
397 $row->setEmptyDestinationProperty($destination);
406 * Fetches the key array for the current source record.
409 * The current source IDs.
411 protected function currentSourceIds() {
412 return $this->getSource()->getCurrentIds();
418 public function saveMessage($message, $level = MigrationInterface::MESSAGE_ERROR) {
419 $this->migration->getIdMap()->saveMessage($this->sourceIdValues, $message, $level);
423 * Takes an Exception object and both saves and displays it.
425 * Pulls in additional information on the location triggering the exception.
427 * @param \Exception $exception
428 * Object representing the exception.
430 * (optional) Whether to save the message in the migration's mapping table.
431 * Set to FALSE in contexts where this doesn't make sense.
433 protected function handleException(\Exception $exception, $save = TRUE) {
434 $result = Error::decodeException($exception);
435 $message = $result['@message'] . ' (' . $result['%file'] . ':' . $result['%line'] . ')';
437 $this->saveMessage($message);
439 $this->message->display($message, 'error');
443 * Checks for exceptional conditions, and display feedback.
445 protected function checkStatus() {
446 if ($this->memoryExceeded()) {
447 return MigrationInterface::RESULT_INCOMPLETE;
449 return MigrationInterface::RESULT_COMPLETED;
453 * Tests whether we've exceeded the desired memory threshold.
455 * If so, output a message.
458 * TRUE if the threshold is exceeded, otherwise FALSE.
460 protected function memoryExceeded() {
461 $usage = $this->getMemoryUsage();
462 $pct_memory = $usage / $this->memoryLimit;
463 if (!$threshold = $this->memoryThreshold) {
466 if ($pct_memory > $threshold) {
467 $this->message->display(
469 'Memory usage is @usage (@pct% of limit @limit), reclaiming memory.',
471 '@pct' => round($pct_memory * 100),
472 '@usage' => $this->formatSize($usage),
473 '@limit' => $this->formatSize($this->memoryLimit),
478 $usage = $this->attemptMemoryReclaim();
479 $pct_memory = $usage / $this->memoryLimit;
480 // Use a lower threshold - we don't want to be in a situation where we keep
481 // coming back here and trimming a tiny amount
482 if ($pct_memory > (0.90 * $threshold)) {
483 $this->message->display(
485 'Memory usage is now @usage (@pct% of limit @limit), not enough reclaimed, starting new batch',
487 '@pct' => round($pct_memory * 100),
488 '@usage' => $this->formatSize($usage),
489 '@limit' => $this->formatSize($this->memoryLimit),
497 $this->message->display(
499 'Memory usage is now @usage (@pct% of limit @limit), reclaimed enough, continuing',
501 '@pct' => round($pct_memory * 100),
502 '@usage' => $this->formatSize($usage),
503 '@limit' => $this->formatSize($this->memoryLimit),
516 * Returns the memory usage so far.
521 protected function getMemoryUsage() {
522 return memory_get_usage();
526 * Tries to reclaim memory.
529 * The memory usage after reclaim.
531 protected function attemptMemoryReclaim() {
532 // First, try resetting Drupal's static storage - this frequently releases
533 // plenty of memory to continue.
534 drupal_static_reset();
536 // Entity storage can blow up with caches so clear them out.
537 $manager = \Drupal::entityManager();
538 foreach ($manager->getDefinitions() as $id => $definition) {
539 $manager->getStorage($id)->resetCache();
542 // @TODO: explore resetting the container.
544 // Run garbage collector to further reduce memory.
547 return memory_get_usage();
551 * Generates a string representation for the given byte count.
557 * A translated string representation of the size.
559 protected function formatSize($size) {
560 return format_size($size);