--- /dev/null
+<?php
+
+namespace Drupal\migrate_tools;
+
+use Drupal\migrate\Event\MigratePreRowSaveEvent;
+use Drupal\migrate\Event\MigrateRollbackEvent;
+use Drupal\migrate\Event\MigrateRowDeleteEvent;
+use Drupal\migrate\MigrateExecutable as MigrateExecutableBase;
+use Drupal\migrate\MigrateMessageInterface;
+use Drupal\migrate\Plugin\MigrationInterface;
+use Drupal\migrate\MigrateSkipRowException;
+use Drupal\migrate\Plugin\MigrateIdMapInterface;
+use Drupal\migrate\Event\MigrateEvents;
+use Drupal\migrate_plus\Event\MigrateEvents as MigratePlusEvents;
+use Drupal\migrate\Event\MigrateMapSaveEvent;
+use Drupal\migrate\Event\MigrateMapDeleteEvent;
+use Drupal\migrate\Event\MigrateImportEvent;
+use Drupal\migrate_plus\Event\MigratePrepareRowEvent;
+
+class MigrateExecutable extends MigrateExecutableBase {
+
+ /**
+ * Counters of map statuses.
+ *
+ * @var array
+ * Set of counters, keyed by MigrateIdMapInterface::STATUS_* constant.
+ */
+ protected $saveCounters = array(
+ MigrateIdMapInterface::STATUS_FAILED => 0,
+ MigrateIdMapInterface::STATUS_IGNORED => 0,
+ MigrateIdMapInterface::STATUS_IMPORTED => 0,
+ MigrateIdMapInterface::STATUS_NEEDS_UPDATE => 0,
+ );
+
+ /**
+ * Counter of map deletions.
+ *
+ * @var int
+ */
+ protected $deleteCounter = 0;
+
+ /**
+ * Maximum number of items to process in this migration. 0 indicates no limit
+ * is to be applied.
+ *
+ * @var int
+ */
+ protected $itemLimit = 0;
+
+ /**
+ * Frequency (in items) at which progress messages should be emitted.
+ *
+ * @var int
+ */
+ protected $feedback = 0;
+
+ /**
+ * List of specific source IDs to import.
+ *
+ * @var array
+ */
+ protected $idlist = [];
+
+ /**
+ * Count of number of items processed so far in this migration.
+ * @var int
+ */
+ protected $counter = 0;
+
+ /**
+ * Whether the destination item exists before saving.
+ *
+ * @var bool
+ */
+ protected $preExistingItem = FALSE;
+
+ /**
+ * List of event listeners we have registered.
+ *
+ * @var array
+ */
+ protected $listeners = [];
+
+ /**
+ * {@inheritdoc}
+ */
+ public function __construct(MigrationInterface $migration, MigrateMessageInterface $message, array $options = []) {
+ parent::__construct($migration, $message);
+ if (isset($options['limit'])) {
+ $this->itemLimit = $options['limit'];
+ }
+ if (isset($options['feedback'])) {
+ $this->feedback = $options['feedback'];
+ }
+ if (isset($options['idlist'])) {
+ $this->idlist = explode(',', $options['idlist']);
+ array_walk($this->idlist , function(&$value, $key) {
+ $value = explode(':', $value);
+ });
+ }
+
+ $this->listeners[MigrateEvents::MAP_SAVE] = [$this, 'onMapSave'];
+ $this->listeners[MigrateEvents::MAP_DELETE] = [$this, 'onMapDelete'];
+ $this->listeners[MigrateEvents::POST_IMPORT] = [$this, 'onPostImport'];
+ $this->listeners[MigrateEvents::POST_ROLLBACK] = [$this, 'onPostRollback'];
+ $this->listeners[MigrateEvents::PRE_ROW_SAVE] = [$this, 'onPreRowSave'];
+ $this->listeners[MigrateEvents::POST_ROW_DELETE] = [$this, 'onPostRowDelete'];
+ $this->listeners[MigratePlusEvents::PREPARE_ROW] = [$this, 'onPrepareRow'];
+ foreach ($this->listeners as $event => $listener) {
+ \Drupal::service('event_dispatcher')->addListener($event, $listener);
+ }
+ }
+
+ /**
+ * Count up any map save events.
+ *
+ * @param \Drupal\migrate\Event\MigrateMapSaveEvent $event
+ * The map event.
+ */
+ public function onMapSave(MigrateMapSaveEvent $event) {
+ // Only count saves for this migration.
+ if ($event->getMap()->getQualifiedMapTableName() == $this->migration->getIdMap()->getQualifiedMapTableName()) {
+ $fields = $event->getFields();
+ // Distinguish between creation and update.
+ if ($fields['source_row_status'] == MigrateIdMapInterface::STATUS_IMPORTED &&
+ $this->preExistingItem
+ ) {
+ $this->saveCounters[MigrateIdMapInterface::STATUS_NEEDS_UPDATE]++;
+ }
+ else {
+ $this->saveCounters[$fields['source_row_status']]++;
+ }
+ }
+ }
+
+ /**
+ * Count up any rollback events.
+ *
+ * @param \Drupal\migrate\Event\MigrateMapDeleteEvent $event
+ * The map event.
+ */
+ public function onMapDelete(MigrateMapDeleteEvent $event) {
+ $this->deleteCounter++;
+ }
+
+ /**
+ * Return the number of items created.
+ *
+ * @return int
+ */
+ public function getCreatedCount() {
+ return $this->saveCounters[MigrateIdMapInterface::STATUS_IMPORTED];
+ }
+
+ /**
+ * Return the number of items updated.
+ *
+ * @return int
+ */
+ public function getUpdatedCount() {
+ return $this->saveCounters[MigrateIdMapInterface::STATUS_NEEDS_UPDATE];
+ }
+
+ /**
+ * Return the number of items ignored.
+ *
+ * @return int
+ */
+ public function getIgnoredCount() {
+ return $this->saveCounters[MigrateIdMapInterface::STATUS_IGNORED];
+ }
+
+ /**
+ * Return the number of items that failed.
+ *
+ * @return int
+ */
+ public function getFailedCount() {
+ return $this->saveCounters[MigrateIdMapInterface::STATUS_FAILED];
+ }
+
+ /**
+ * Return the total number of items processed. Note that STATUS_NEEDS_UPDATE
+ * is not counted, since this is typically set on stubs created as side
+ * effects, not on the primary item being imported.
+ *
+ * @return int
+ */
+ public function getProcessedCount() {
+ return $this->saveCounters[MigrateIdMapInterface::STATUS_IMPORTED] +
+ $this->saveCounters[MigrateIdMapInterface::STATUS_NEEDS_UPDATE] +
+ $this->saveCounters[MigrateIdMapInterface::STATUS_IGNORED] +
+ $this->saveCounters[MigrateIdMapInterface::STATUS_FAILED];
+ }
+
+ /**
+ * Return the number of items rolled back.
+ *
+ * @return int
+ */
+ public function getRollbackCount() {
+ return $this->deleteCounter;
+ }
+
+ /**
+ * Reset all the per-status counters to 0.
+ */
+ protected function resetCounters() {
+ foreach ($this->saveCounters as $status => $count) {
+ $this->saveCounters[$status] = 0;
+ }
+ $this->deleteCounter = 0;
+ }
+
+ /**
+ * React to migration completion.
+ *
+ * @param \Drupal\migrate\Event\MigrateImportEvent $event
+ * The map event.
+ */
+ public function onPostImport(MigrateImportEvent $event) {
+ $migrate_last_imported_store = \Drupal::keyValue('migrate_last_imported');
+ $migrate_last_imported_store->set($event->getMigration()->id(), round(microtime(TRUE) * 1000));
+ $this->progressMessage();
+ $this->removeListeners();
+ }
+
+ /**
+ * Clean up all our event listeners.
+ */
+ protected function removeListeners() {
+ foreach ($this->listeners as $event => $listener) {
+ \Drupal::service('event_dispatcher')->removeListener($event, $listener);
+ }
+ }
+
+ /**
+ * Emit information on what we've done since the last feedback (or the
+ * beginning of this migration).
+ *
+ * @param bool $done
+ */
+ protected function progressMessage($done = TRUE) {
+ $processed = $this->getProcessedCount();
+ if ($done) {
+ $singular_message = "Processed 1 item (@created created, @updated updated, @failures failed, @ignored ignored) - done with '@name'";
+ $plural_message = "Processed @numitems items (@created created, @updated updated, @failures failed, @ignored ignored) - done with '@name'";
+ }
+ else {
+ $singular_message = "Processed 1 item (@created created, @updated updated, @failures failed, @ignored ignored) - continuing with '@name'";
+ $plural_message = "Processed @numitems items (@created created, @updated updated, @failures failed, @ignored ignored) - continuing with '@name'";
+ }
+ $this->message->display(\Drupal::translation()->formatPlural($processed,
+ $singular_message, $plural_message,
+ array('@numitems' => $processed,
+ '@created' => $this->getCreatedCount(),
+ '@updated' => $this->getUpdatedCount(),
+ '@failures' => $this->getFailedCount(),
+ '@ignored' => $this->getIgnoredCount(),
+ '@name' => $this->migration->id())));
+ }
+
+ /**
+ * React to rollback completion.
+ *
+ * @param \Drupal\migrate\Event\MigrateRollbackEvent $event
+ * The map event.
+ */
+ public function onPostRollback(MigrateRollbackEvent $event) {
+ $this->rollbackMessage();
+ $this->removeListeners();
+ }
+
+ /**
+ * Emit information on what we've done since the last feedback (or the
+ * beginning of this migration).
+ *
+ * @param bool $done
+ */
+ protected function rollbackMessage($done = TRUE) {
+ $rolled_back = $this->getRollbackCount();
+ if ($done) {
+ $singular_message = "Rolled back 1 item - done with '@name'";
+ $plural_message = "Rolled back @numitems items - done with '@name'";
+ }
+ else {
+ $singular_message = "Rolled back 1 item - continuing with '@name'";
+ $plural_message = "Rolled back @numitems items - continuing with '@name'";
+ }
+ $this->message->display(\Drupal::translation()->formatPlural($rolled_back,
+ $singular_message, $plural_message,
+ array('@numitems' => $rolled_back,
+ '@name' => $this->migration->id())));
+ }
+
+ /**
+ * React to an item about to be imported.
+ *
+ * @param \Drupal\migrate\Event\MigratePreRowSaveEvent $event
+ * The pre-save event.
+ */
+ public function onPreRowSave(MigratePreRowSaveEvent $event) {
+ $id_map = $event->getRow()->getIdMap();
+ if (!empty($id_map['destid1'])) {
+ $this->preExistingItem = TRUE;
+ }
+ else {
+ $this->preExistingItem = FALSE;
+ }
+ }
+
+ /**
+ * React to item rollback.
+ *
+ * @param \Drupal\migrate\Event\MigrateRowDeleteEvent $event
+ * The post-save event.
+ */
+ public function onPostRowDelete(MigrateRowDeleteEvent $event) {
+ if ($this->feedback && ($this->deleteCounter) && $this->deleteCounter % $this->feedback == 0) {
+ $this->rollbackMessage(FALSE);
+ $this->resetCounters();
+ }
+ }
+
+ /**
+ * React to a new row.
+ *
+ * @param \Drupal\migrate_plus\Event\MigratePrepareRowEvent $event
+ * The prepare-row event.
+ *
+ * @throws \Drupal\migrate\MigrateSkipRowException
+ *
+ */
+ public function onPrepareRow(MigratePrepareRowEvent $event) {
+ if (!empty($this->idlist)) {
+ $row = $event->getRow();
+ /**
+ * @TODO replace for $source_id = $row->getSourceIdValues(); when https://www.drupal.org/node/2698023 is fixed
+ */
+ $migration = $event->getMigration();
+ $source_id = array_merge(array_flip(array_keys($migration->getSourcePlugin()
+ ->getIds())), $row->getSourceIdValues());
+ $skip = TRUE;
+ foreach ($this->idlist as $item) {
+ if (array_values($source_id) === $item) {
+ $skip = FALSE;
+ break;
+ }
+ }
+ if ($skip) {
+ throw new MigrateSkipRowException(NULL, FALSE);
+ }
+ }
+ if ($this->feedback && ($this->counter) && $this->counter % $this->feedback == 0) {
+ $this->progressMessage(FALSE);
+ $this->resetCounters();
+ }
+ $this->counter++;
+ if ($this->itemLimit && ($this->getProcessedCount() + 1) >= $this->itemLimit) {
+ $event->getMigration()->interruptMigration(MigrationInterface::RESULT_COMPLETED);
+ }
+
+ }
+
+}