Version 1
[yaffs-website] / web / core / modules / migrate / src / Plugin / migrate / source / SourcePluginBase.php
diff --git a/web/core/modules/migrate/src/Plugin/migrate/source/SourcePluginBase.php b/web/core/modules/migrate/src/Plugin/migrate/source/SourcePluginBase.php
new file mode 100644 (file)
index 0000000..e355850
--- /dev/null
@@ -0,0 +1,544 @@
+<?php
+
+namespace Drupal\migrate\Plugin\migrate\source;
+
+use Drupal\Core\Plugin\PluginBase;
+use Drupal\migrate\Event\MigrateRollbackEvent;
+use Drupal\migrate\Event\RollbackAwareInterface;
+use Drupal\migrate\Plugin\MigrationInterface;
+use Drupal\migrate\MigrateException;
+use Drupal\migrate\MigrateSkipRowException;
+use Drupal\migrate\Plugin\MigrateIdMapInterface;
+use Drupal\migrate\Plugin\MigrateSourceInterface;
+use Drupal\migrate\Row;
+
+/**
+ * The base class for all source plugins.
+ *
+ * @see \Drupal\migrate\Plugin\MigratePluginManager
+ * @see \Drupal\migrate\Annotation\MigrateSource
+ * @see \Drupal\migrate\Plugin\MigrateSourceInterface
+ * @see plugin_api
+ *
+ * @ingroup migration
+ */
+abstract class SourcePluginBase extends PluginBase implements MigrateSourceInterface, RollbackAwareInterface {
+
+  /**
+   * The module handler service.
+   *
+   * @var \Drupal\Core\Extension\ModuleHandlerInterface
+   */
+  protected $moduleHandler;
+
+  /**
+   * The entity migration object.
+   *
+   * @var \Drupal\migrate\Plugin\MigrationInterface
+   */
+  protected $migration;
+
+  /**
+   * The current row from the query.
+   *
+   * @var \Drupal\Migrate\Row
+   */
+  protected $currentRow;
+
+  /**
+   * The primary key of the current row.
+   *
+   * @var array
+   */
+  protected $currentSourceIds;
+
+  /**
+   * Information on the property used as the high-water mark.
+   *
+   * Array of 'name' and (optional) db 'alias' properties used for high-water
+   * mark.
+   *
+   * @var array
+   */
+  protected $highWaterProperty = [];
+
+  /**
+   * The key-value storage for the high-water value.
+   *
+   * @var \Drupal\Core\KeyValueStore\KeyValueStoreInterface
+   */
+  protected $highWaterStorage;
+
+  /**
+   * The high water mark at the beginning of the import operation.
+   *
+   * If the source has a property for tracking changes (like Drupal has
+   * node.changed) then this is the highest value of those imported so far.
+   *
+   * @var int
+   */
+  protected $originalHighWater;
+
+  /**
+   * Whether this instance should cache the source count.
+   *
+   * @var bool
+   */
+  protected $cacheCounts = FALSE;
+
+  /**
+   * Key to use for caching counts.
+   *
+   * @var string
+   */
+  protected $cacheKey;
+
+  /**
+   * Whether this instance should not attempt to count the source.
+   *
+   * @var bool
+   */
+  protected $skipCount = FALSE;
+
+  /**
+   * Flags whether to track changes to incoming data.
+   *
+   * If TRUE, we will maintain hashed source rows to determine whether incoming
+   * data has changed.
+   *
+   * @var bool
+   */
+  protected $trackChanges = FALSE;
+
+  /**
+   * Flags whether source plugin will read the map row and add to data row.
+   *
+   * By default, next() will directly read the map row and add it to the data
+   * row. A source plugin implementation may do this itself (in particular, the
+   * SQL source can incorporate the map table into the query) - if so, it should
+   * set this TRUE so we don't duplicate the effort.
+   *
+   * @var bool
+   */
+  protected $mapRowAdded = FALSE;
+
+  /**
+   * The backend cache.
+   *
+   * @var \Drupal\Core\Cache\CacheBackendInterface
+   */
+  protected $cache;
+
+  /**
+   * The migration ID map.
+   *
+   * @var \Drupal\migrate\Plugin\MigrateIdMapInterface
+   */
+  protected $idMap;
+
+  /**
+   * The iterator to iterate over the source rows.
+   *
+   * @var \Iterator
+   */
+  protected $iterator;
+
+  /**
+   * {@inheritdoc}
+   */
+  public function __construct(array $configuration, $plugin_id, $plugin_definition, MigrationInterface $migration) {
+    parent::__construct($configuration, $plugin_id, $plugin_definition);
+    $this->migration = $migration;
+
+    // Set up some defaults based on the source configuration.
+    foreach (['cacheCounts' => 'cache_counts', 'skipCount' => 'skip_count', 'trackChanges' => 'track_changes'] as $property => $config_key) {
+      if (isset($configuration[$config_key])) {
+        $this->$property = (bool) $configuration[$config_key];
+      }
+    }
+    $this->cacheKey = !empty($configuration['cache_key']) ? $configuration['cache_key'] : NULL;
+    $this->idMap = $this->migration->getIdMap();
+    $this->highWaterProperty = !empty($configuration['high_water_property']) ? $configuration['high_water_property'] : FALSE;
+
+    // Pull out the current highwater mark if we have a highwater property.
+    if ($this->highWaterProperty) {
+      $this->originalHighWater = $this->getHighWater();
+    }
+
+    // Don't allow the use of both highwater and track changes together.
+    if ($this->highWaterProperty && $this->trackChanges) {
+      throw new MigrateException('You should either use a highwater mark or track changes not both. They are both designed to solve the same problem');
+    }
+  }
+
+  /**
+   * Initializes the iterator with the source data.
+   *
+   * @return array
+   *   An array of the data for this source.
+   */
+  protected abstract function initializeIterator();
+
+  /**
+   * Gets the module handler.
+   *
+   * @return \Drupal\Core\Extension\ModuleHandlerInterface
+   *   The module handler.
+   */
+  protected function getModuleHandler() {
+    if (!isset($this->moduleHandler)) {
+      $this->moduleHandler = \Drupal::moduleHandler();
+    }
+    return $this->moduleHandler;
+  }
+
+  /**
+   * {@inheritdoc}
+   */
+  public function prepareRow(Row $row) {
+    $result = TRUE;
+    try {
+      $result_hook = $this->getModuleHandler()->invokeAll('migrate_prepare_row', [$row, $this, $this->migration]);
+      $result_named_hook = $this->getModuleHandler()->invokeAll('migrate_' . $this->migration->id() . '_prepare_row', [$row, $this, $this->migration]);
+      // We will skip if any hook returned FALSE.
+      $skip = ($result_hook && in_array(FALSE, $result_hook)) || ($result_named_hook && in_array(FALSE, $result_named_hook));
+      $save_to_map = TRUE;
+    }
+    catch (MigrateSkipRowException $e) {
+      $skip = TRUE;
+      $save_to_map = $e->getSaveToMap();
+      if ($message = trim($e->getMessage())) {
+        $this->idMap->saveMessage($row->getSourceIdValues(), $message, MigrationInterface::MESSAGE_INFORMATIONAL);
+      }
+    }
+
+    // We're explicitly skipping this row - keep track in the map table.
+    if ($skip) {
+      // Make sure we replace any previous messages for this item with any
+      // new ones.
+      if ($save_to_map) {
+        $this->idMap->saveIdMapping($row, [], MigrateIdMapInterface::STATUS_IGNORED);
+        $this->currentRow = NULL;
+        $this->currentSourceIds = NULL;
+      }
+      $result = FALSE;
+    }
+    elseif ($this->trackChanges) {
+      // When tracking changed data, We want to quietly skip (rather than
+      // "ignore") rows with changes. The caller needs to make that decision,
+      // so we need to provide them with the necessary information (before and
+      // after hashes).
+      $row->rehash();
+    }
+    return $result;
+  }
+
+  /**
+   * Returns the iterator that will yield the row arrays to be processed.
+   *
+   * @return \Iterator
+   *   The iterator that will yield the row arrays to be processed.
+   */
+  protected function getIterator() {
+    if (!isset($this->iterator)) {
+      $this->iterator = $this->initializeIterator();
+    }
+    return $this->iterator;
+  }
+
+  /**
+   * {@inheritdoc}
+   */
+  public function current() {
+    return $this->currentRow;
+  }
+
+  /**
+   * Gets the iterator key.
+   *
+   * Implementation of \Iterator::key() - called when entering a loop iteration,
+   * returning the key of the current row. It must be a scalar - we will
+   * serialize to fulfill the requirement, but using getCurrentIds() is
+   * preferable.
+   */
+  public function key() {
+    return serialize($this->currentSourceIds);
+  }
+
+  /**
+   * Checks whether the iterator is currently valid.
+   *
+   * Implementation of \Iterator::valid() - called at the top of the loop,
+   * returning TRUE to process the loop and FALSE to terminate it.
+   */
+  public function valid() {
+    return isset($this->currentRow);
+  }
+
+  /**
+   * Rewinds the iterator.
+   *
+   * Implementation of \Iterator::rewind() - subclasses of SourcePluginBase
+   * should implement initializeIterator() to do any class-specific setup for
+   * iterating source records.
+   */
+  public function rewind() {
+    $this->getIterator()->rewind();
+    $this->next();
+  }
+
+  /**
+   * {@inheritdoc}
+   *
+   * The migration iterates over rows returned by the source plugin. This
+   * method determines the next row which will be processed and imported into
+   * the system.
+   *
+   * The method tracks the source and destination IDs using the ID map plugin.
+   *
+   * This also takes care about highwater support. Highwater allows to reimport
+   * rows from a previous migration run, which got changed in the meantime.
+   * This is done by specifying a highwater field, which is compared with the
+   * last time, the migration got executed (originalHighWater).
+   */
+  public function next() {
+    $this->currentSourceIds = NULL;
+    $this->currentRow = NULL;
+
+    // In order to find the next row we want to process, we ask the source
+    // plugin for the next possible row.
+    while (!isset($this->currentRow) && $this->getIterator()->valid()) {
+
+      $row_data = $this->getIterator()->current() + $this->configuration;
+      $this->fetchNextRow();
+      $row = new Row($row_data, $this->migration->getSourcePlugin()->getIds(), $this->migration->getDestinationIds());
+
+      // Populate the source key for this row.
+      $this->currentSourceIds = $row->getSourceIdValues();
+
+      // Pick up the existing map row, if any, unless fetchNextRow() did it.
+      if (!$this->mapRowAdded && ($id_map = $this->idMap->getRowBySource($this->currentSourceIds))) {
+        $row->setIdMap($id_map);
+      }
+
+      // Clear any previous messages for this row before potentially adding
+      // new ones.
+      if (!empty($this->currentSourceIds)) {
+        $this->idMap->delete($this->currentSourceIds, TRUE);
+      }
+
+      // Preparing the row gives source plugins the chance to skip.
+      if ($this->prepareRow($row) === FALSE) {
+        continue;
+      }
+
+      // Check whether the row needs processing.
+      // 1. This row has not been imported yet.
+      // 2. Explicitly set to update.
+      // 3. The row is newer than the current highwater mark.
+      // 4. If no such property exists then try by checking the hash of the row.
+      if (!$row->getIdMap() || $row->needsUpdate() || $this->aboveHighwater($row) || $this->rowChanged($row)) {
+        $this->currentRow = $row->freezeSource();
+      }
+
+      if ($this->getHighWaterProperty()) {
+        $this->saveHighWater($row->getSourceProperty($this->highWaterProperty['name']));
+      }
+    }
+  }
+
+  /**
+   * Position the iterator to the following row.
+   */
+  protected function fetchNextRow() {
+    $this->getIterator()->next();
+  }
+
+  /**
+   * Check if the incoming data is newer than what we've previously imported.
+   *
+   * @param \Drupal\migrate\Row $row
+   *   The row we're importing.
+   *
+   * @return bool
+   *   TRUE if the highwater value in the row is greater than our current value.
+   */
+  protected function aboveHighwater(Row $row) {
+    return $this->getHighWaterProperty() && $row->getSourceProperty($this->highWaterProperty['name']) > $this->originalHighWater;
+  }
+
+  /**
+   * Checks if the incoming row has changed since our last import.
+   *
+   * @param \Drupal\migrate\Row $row
+   *   The row we're importing.
+   *
+   * @return bool
+   *   TRUE if the row has changed otherwise FALSE.
+   */
+  protected function rowChanged(Row $row) {
+    return $this->trackChanges && $row->changed();
+  }
+
+  /**
+   * Gets the currentSourceIds data member.
+   */
+  public function getCurrentIds() {
+    return $this->currentSourceIds;
+  }
+
+  /**
+   * Gets the source count.
+   *
+   * Return a count of available source records, from the cache if appropriate.
+   * Returns -1 if the source is not countable.
+   *
+   * @param bool $refresh
+   *   (optional) Whether or not to refresh the count. Defaults to FALSE.
+   *
+   * @return int
+   *   The count.
+   */
+  public function count($refresh = FALSE) {
+    if ($this->skipCount) {
+      return -1;
+    }
+
+    if (!isset($this->cacheKey)) {
+      $this->cacheKey = hash('sha256', $this->getPluginId());
+    }
+
+    // If a refresh is requested, or we're not caching counts, ask the derived
+    // class to get the count from the source.
+    if ($refresh || !$this->cacheCounts) {
+      $count = $this->doCount();
+      $this->getCache()->set($this->cacheKey, $count);
+    }
+    else {
+      // Caching is in play, first try to retrieve a cached count.
+      $cache_object = $this->getCache()->get($this->cacheKey, 'cache');
+      if (is_object($cache_object)) {
+        // Success.
+        $count = $cache_object->data;
+      }
+      else {
+        // No cached count, ask the derived class to count 'em up, and cache
+        // the result.
+        $count = $this->doCount();
+        $this->getCache()->set($this->cacheKey, $count);
+      }
+    }
+    return $count;
+  }
+
+  /**
+   * Gets the cache object.
+   *
+   * @return \Drupal\Core\Cache\CacheBackendInterface
+   *   The cache object.
+   */
+  protected function getCache() {
+    if (!isset($this->cache)) {
+      $this->cache = \Drupal::cache('migrate');
+    }
+    return $this->cache;
+  }
+
+  /**
+   * Gets the source count checking if the source is countable or using the
+   * iterator_count function.
+   *
+   * @return int
+   */
+  protected function doCount() {
+    $iterator = $this->getIterator();
+    return $iterator instanceof \Countable ? $iterator->count() : iterator_count($this->initializeIterator());
+  }
+
+  /**
+   * Get the high water storage object.
+   *
+   * @return \Drupal\Core\KeyValueStore\KeyValueStoreInterface
+   *   The storage object.
+   */
+  protected function getHighWaterStorage() {
+    if (!isset($this->highWaterStorage)) {
+      $this->highWaterStorage = \Drupal::keyValue('migrate:high_water');
+    }
+    return $this->highWaterStorage;
+  }
+
+  /**
+   * The current value of the high water mark.
+   *
+   * The high water mark defines a timestamp stating the time the import was last
+   * run. If the mark is set, only content with a higher timestamp will be
+   * imported.
+   *
+   * @return int|null
+   *   A Unix timestamp representing the high water mark, or NULL if no high
+   *   water mark has been stored.
+   */
+  protected function getHighWater() {
+    return $this->getHighWaterStorage()->get($this->migration->id());
+  }
+
+  /**
+   * Save the new high water mark.
+   *
+   * @param int $high_water
+   *   The high water timestamp.
+   */
+  protected function saveHighWater($high_water) {
+    $this->getHighWaterStorage()->set($this->migration->id(), $high_water);
+  }
+
+  /**
+   * Get information on the property used as the high watermark.
+   *
+   * Array of 'name' & (optional) db 'alias' properties used for high watermark.
+   *
+   * @see \Drupal\migrate\Plugin\migrate\source\SqlBase::initializeIterator()
+   *
+   * @return array
+   *   The property used as the high watermark.
+   */
+  protected function getHighWaterProperty() {
+    return $this->highWaterProperty;
+  }
+
+  /**
+   * Get the name of the field used as the high watermark.
+   *
+   * The name of the field qualified with an alias if available.
+   *
+   * @see \Drupal\migrate\Plugin\migrate\source\SqlBase::initializeIterator()
+   *
+   * @return string|null
+   *   The name of the field for the high water mark, or NULL if not set.
+   */
+  protected function getHighWaterField() {
+    if (!empty($this->highWaterProperty['name'])) {
+      return !empty($this->highWaterProperty['alias']) ?
+        $this->highWaterProperty['alias'] . '.' . $this->highWaterProperty['name'] :
+        $this->highWaterProperty['name'];
+    }
+    return NULL;
+  }
+
+  /**
+   * {@inheritdoc}
+   */
+  public function preRollback(MigrateRollbackEvent $event) {
+    // Nothing to do in this implementation.
+  }
+
+  /**
+   * {@inheritdoc}
+   */
+  public function postRollback(MigrateRollbackEvent $event) {
+    // Reset the high-water mark.
+    $this->saveHighWater(NULL);
+  }
+
+}