3 namespace Drupal\migrate\Plugin\migrate\source;
5 use Drupal\Core\Plugin\PluginBase;
6 use Drupal\migrate\Event\MigrateRollbackEvent;
7 use Drupal\migrate\Event\RollbackAwareInterface;
8 use Drupal\migrate\Plugin\MigrationInterface;
9 use Drupal\migrate\MigrateException;
10 use Drupal\migrate\MigrateSkipRowException;
11 use Drupal\migrate\Plugin\MigrateIdMapInterface;
12 use Drupal\migrate\Plugin\MigrateSourceInterface;
13 use Drupal\migrate\Row;
/**
 * The base class for all source plugins.
 *
 * @see \Drupal\migrate\Plugin\MigratePluginManager
 * @see \Drupal\migrate\Annotation\MigrateSource
 * @see \Drupal\migrate\Plugin\MigrateSourceInterface
 */
abstract class SourcePluginBase extends PluginBase implements MigrateSourceInterface, RollbackAwareInterface {

  /**
   * The module handler service.
   *
   * @var \Drupal\Core\Extension\ModuleHandlerInterface
   */
  protected $moduleHandler;

  /**
   * The migration this source plugin belongs to.
   *
   * @var \Drupal\migrate\Plugin\MigrationInterface
   */
  protected $migration;

  /**
   * The current row, or NULL when no row is selected.
   *
   * @var \Drupal\migrate\Row|null
   */
  protected $currentRow;

  /**
   * The source ID values (primary key) of the current row, or NULL.
   *
   * @var array|null
   */
  protected $currentSourceIds;

  /**
   * Information on the property used as the high-water mark.
   *
   * Array of 'name' and (optional) db 'alias' properties used for the
   * high-water mark. The constructor sets this to FALSE when no
   * 'high_water_property' is configured.
   *
   * @var array|false
   */
  protected $highWaterProperty = [];

  /**
   * The key-value storage for the high-water value.
   *
   * @var \Drupal\Core\KeyValueStore\KeyValueStoreInterface
   */
  protected $highWaterStorage;

  /**
   * The high water mark at the beginning of the import operation.
   *
   * If the source has a property for tracking changes (like Drupal has
   * node.changed) then this is the high-water value that was stored by
   * previous import runs, read once when the plugin is constructed.
   *
   * @var mixed|null
   */
  protected $originalHighWater;

  /**
   * Whether this instance should cache the source count.
   *
   * @var bool
   */
  protected $cacheCounts = FALSE;

  /**
   * Key to use for caching counts.
   *
   * @var string|null
   */
  protected $cacheKey;

  /**
   * Whether this instance should not attempt to count the source.
   *
   * @var bool
   */
  protected $skipCount = FALSE;

  /**
   * Flags whether to track changes to incoming data.
   *
   * If TRUE, we will maintain hashed source rows to determine whether incoming
   * data has changed since it was last imported.
   *
   * @var bool
   */
  protected $trackChanges = FALSE;

  /**
   * Flags whether source plugin will read the map row and add to data row.
   *
   * By default, next() will directly read the map row and add it to the data
   * row. A source plugin implementation may do this itself (in particular, the
   * SQL source can incorporate the map table into the query) - if so, it should
   * set this TRUE so we don't duplicate the effort.
   *
   * @var bool
   */
  protected $mapRowAdded = FALSE;

  /**
   * The cache backend (the 'migrate' bin), resolved lazily by getCache().
   *
   * @var \Drupal\Core\Cache\CacheBackendInterface
   */
  protected $cache;

  /**
   * The migration ID map.
   *
   * @var \Drupal\migrate\Plugin\MigrateIdMapInterface
   */
  protected $idMap;

  /**
   * The iterator to iterate over the source rows.
   *
   * @var \Iterator
   */
  protected $iterator;
/**
 * Constructs a source plugin.
 *
 * @param array $configuration
 *   The plugin configuration. Keys read here: 'cache_counts', 'skip_count'
 *   and 'track_changes' (cast to booleans), 'cache_key' and
 *   'high_water_property'.
 * @param string $plugin_id
 *   The plugin ID.
 * @param mixed $plugin_definition
 *   The plugin definition.
 * @param \Drupal\migrate\Plugin\MigrationInterface $migration
 *   The migration this source plugin belongs to.
 *
 * @throws \Drupal\migrate\MigrateException
 *   When both a high-water property and change tracking are configured.
 */
public function __construct(array $configuration, $plugin_id, $plugin_definition, MigrationInterface $migration) {
  parent::__construct($configuration, $plugin_id, $plugin_definition);
  $this->migration = $migration;

  // Set up some defaults based on the source configuration.
  foreach (['cacheCounts' => 'cache_counts', 'skipCount' => 'skip_count', 'trackChanges' => 'track_changes'] as $property => $config_key) {
    if (isset($configuration[$config_key])) {
      $this->$property = (bool) $configuration[$config_key];
    }
  }
  $this->cacheKey = !empty($configuration['cache_key']) ? $configuration['cache_key'] : NULL;
  $this->idMap = $this->migration->getIdMap();
  $this->highWaterProperty = !empty($configuration['high_water_property']) ? $configuration['high_water_property'] : FALSE;

  // Don't allow the use of both highwater and track changes together. This is
  // checked before the high-water storage is touched so a misconfigured
  // migration fails fast without any key/value access.
  if ($this->highWaterProperty && $this->trackChanges) {
    throw new MigrateException('You should either use a highwater mark or track changes not both. They are both designed to solve the same problem');
  }

  // Pull out the current highwater mark if we have a highwater property.
  if ($this->highWaterProperty) {
    $this->originalHighWater = $this->getHighWater();
  }
}
/**
 * Initializes the iterator with the source data.
 *
 * @return \Iterator
 *   An iterator over the data for this source, yielding one row array per
 *   source record. The base class calls rewind(), valid(), current() and
 *   next() on the returned object, and doCount() may count a freshly
 *   initialized one with iterator_count(), so implementations must return
 *   an \Iterator rather than a plain array.
 */
abstract protected function initializeIterator();
/**
 * Returns the module handler, looking it up from the container on first use.
 *
 * @return \Drupal\Core\Extension\ModuleHandlerInterface
 *   The module handler.
 */
protected function getModuleHandler() {
  if (isset($this->moduleHandler)) {
    return $this->moduleHandler;
  }
  $this->moduleHandler = \Drupal::moduleHandler();
  return $this->moduleHandler;
}
/**
 * {@inheritdoc}
 *
 * Gives hook implementations a chance to inspect the row or ask for it to be
 * skipped. Both the generic 'migrate_prepare_row' hook and the per-migration
 * 'migrate_MIGRATION_ID_prepare_row' hook are invoked with the row, this
 * source plugin and the migration. next() treats a FALSE return from this
 * method as "skip this row".
 */
198 public function prepareRow(Row $row) {
201 $result_hook = $this->getModuleHandler()->invokeAll('migrate_prepare_row', [$row, $this, $this->migration]);
202 $result_named_hook = $this->getModuleHandler()->invokeAll('migrate_' . $this->migration->id() . '_prepare_row', [$row, $this, $this->migration]);
203 // We will skip if any hook returned FALSE.
// NOTE(review): in_array() is called without strict mode, so a hook result
// of 0 or '' also compares equal to FALSE and triggers a skip; confirm
// whether strict comparison was intended.
204 $skip = ($result_hook && in_array(FALSE, $result_hook)) || ($result_named_hook && in_array(FALSE, $result_named_hook));
// A hook may instead throw MigrateSkipRowException. Its getSaveToMap() value
// is kept in $save_to_map (presumably gating the ID-map write below), and a
// non-empty message is stored as an informational message for this row.
207 catch (MigrateSkipRowException $e) {
209 $save_to_map = $e->getSaveToMap();
210 if ($message = trim($e->getMessage())) {
211 $this->idMap->saveMessage($row->getSourceIdValues(), $message, MigrationInterface::MESSAGE_INFORMATIONAL);
215 // We're explicitly skipping this row - keep track in the map table.
217 // Make sure we replace any previous messages for this item with any
// Record the row as ignored in the ID map (with no destination IDs) and
// clear the current row and its source IDs.
220 $this->idMap->saveIdMapping($row, [], MigrateIdMapInterface::STATUS_IGNORED);
221 $this->currentRow = NULL;
222 $this->currentSourceIds = NULL;
226 elseif ($this->trackChanges) {
227 // When tracking changed data, We want to quietly skip (rather than
228 // "ignore") rows with changes. The caller needs to make that decision,
229 // so we need to provide them with the necessary information (before and
/**
 * Returns the iterator over the source rows, creating it on first access.
 *
 * @return \Iterator
 *   The iterator that will yield the row arrays to be processed. It is built
 *   by initializeIterator() once and reused on later calls.
 */
protected function getIterator() {
  if (isset($this->iterator)) {
    return $this->iterator;
  }
  $this->iterator = $this->initializeIterator();
  return $this->iterator;
}
/**
 * Implementation of \Iterator::current().
 *
 * @return \Drupal\migrate\Row|null
 *   The row most recently selected by next(), or NULL when there is none.
 */
public function current() {
  $row = $this->currentRow;
  return $row;
}
/**
 * Gets the iterator key.
 *
 * Implementation of \Iterator::key(), called when entering a loop iteration
 * to obtain the key of the current row. The source ID values are serialized
 * so that the key is a scalar string; code that needs the IDs themselves
 * should call getCurrentIds() instead.
 *
 * @return string
 *   The serialized source ID values of the current row.
 */
public function key() {
  $source_ids = $this->currentSourceIds;
  return serialize($source_ids);
}
/**
 * Checks whether the iterator is currently valid.
 *
 * Implementation of \Iterator::valid(), called at the top of each loop
 * iteration: TRUE continues the loop and FALSE terminates it. The iterator
 * is valid exactly when next() has selected a current row.
 *
 * @return bool
 *   TRUE if there is a current row, FALSE otherwise.
 */
public function valid() {
  return $this->currentRow !== NULL;
}
/**
 * Rewinds the iterator.
 *
 * Implementation of \Iterator::rewind() - subclasses of SourcePluginBase
 * should implement initializeIterator() to do any class-specific setup for
 * iterating source records.
 *
 * After the underlying iterator is rewound, next() is called to load the
 * first row that needs processing into $this->currentRow, because valid()
 * and current() only look at that property.
 */
public function rewind() {
  $this->getIterator()->rewind();
  // Select the first row; without this, valid() would report FALSE and a
  // loop over the source would never enter its body.
  $this->next();
}
/**
 * Advances to the next row that needs processing.
 *
 * Implementation of \Iterator::next(). The migration iterates over rows
 * returned by the source plugin. This method determines the next row which
 * will be processed and imported into the system and stores it in
 * $this->currentRow, which stays NULL once the source is exhausted.
 *
 * The method tracks the source and destination IDs using the ID map plugin.
 *
 * This also handles high-water support. A high-water property allows rows
 * that changed since a previous migration run to be re-imported: each row's
 * value for that property is compared with the mark that was loaded when
 * this plugin was constructed ($this->originalHighWater).
 */
public function next() {
  $this->currentSourceIds = NULL;
  $this->currentRow = NULL;

  // In order to find the next row we want to process, we ask the source
  // plugin for the next possible row.
  while (!isset($this->currentRow) && $this->getIterator()->valid()) {
    // Configuration values are merged into the row data; on key collisions
    // the source row's value wins (array union keeps left-hand keys).
    $row_data = $this->getIterator()->current() + $this->configuration;
    // The data has been read, so the underlying iterator can move on now.
    $this->fetchNextRow();
    $row = new Row($row_data, $this->migration->getSourcePlugin()->getIds(), $this->migration->getDestinationIds());

    // Populate the source key for this row.
    $this->currentSourceIds = $row->getSourceIdValues();

    // Pick up the existing map row, if any, unless fetchNextRow() did it.
    if (!$this->mapRowAdded && ($id_map = $this->idMap->getRowBySource($this->currentSourceIds))) {
      $row->setIdMap($id_map);
    }

    // Clear any previous messages for this row before potentially adding
    // new ones.
    if (!empty($this->currentSourceIds)) {
      $this->idMap->delete($this->currentSourceIds, TRUE);
    }

    // Preparing the row gives source plugins the chance to skip. A skipped
    // row is neither selected nor allowed to move the high-water mark.
    if ($this->prepareRow($row) === FALSE) {
      continue;
    }

    // Check whether the row needs processing.
    // 1. This row has not been imported yet.
    // 2. Explicitly set to update.
    // 3. The row is newer than the current highwater mark.
    // 4. If no such property exists then try by checking the hash of the row.
    if (!$row->getIdMap() || $row->needsUpdate() || $this->aboveHighwater($row) || $this->rowChanged($row)) {
      $this->currentRow = $row->freezeSource();
    }

    // NOTE(review): the stored mark is overwritten with the value of every
    // non-skipped row examined here, so it only moves forward if the source
    // yields rows ordered by the high-water property.
    if ($this->getHighWaterProperty()) {
      $this->saveHighWater($row->getSourceProperty($this->highWaterProperty['name']));
    }
  }
}
/**
 * Moves the underlying iterator on to the following row.
 *
 * next() calls this after reading the current row's data and before building
 * the Row object. An override that also attaches the ID map row should set
 * $this->mapRowAdded to TRUE so next() does not look it up again.
 */
protected function fetchNextRow() {
  $iterator = $this->getIterator();
  $iterator->next();
}
/**
 * Checks if the incoming data is newer than what we've previously imported.
 *
 * @param \Drupal\migrate\Row $row
 *   The row we're importing.
 *
 * @return bool
 *   FALSE when no high-water property is configured; otherwise TRUE if the
 *   row's high-water property is greater than the mark loaded at
 *   construction time.
 */
protected function aboveHighwater(Row $row) {
  if (!$this->getHighWaterProperty()) {
    return FALSE;
  }
  $row_value = $row->getSourceProperty($this->highWaterProperty['name']);
  return $row_value > $this->originalHighWater;
}
/**
 * Checks if the incoming row has changed since our last import.
 *
 * @param \Drupal\migrate\Row $row
 *   The row we're importing.
 *
 * @return bool
 *   TRUE only when change tracking is enabled and the row reports a change;
 *   FALSE otherwise.
 */
protected function rowChanged(Row $row) {
  if (!$this->trackChanges) {
    return FALSE;
  }
  return (bool) $row->changed();
}
/**
 * Gets the source ID values of the current row.
 *
 * @return array|null
 *   The source IDs recorded by next(), or NULL when none are set.
 */
public function getCurrentIds() {
  $source_ids = $this->currentSourceIds;
  return $source_ids;
}
/**
 * Gets the source count.
 *
 * Return a count of available source records, from the cache if appropriate.
 * Returns -1 if the source is not countable.
 *
 * @param bool $refresh
 *   (optional) Whether or not to refresh the count. Defaults to FALSE.
 *
 * @return int
 *   The count of source records, or -1 when counting is disabled through the
 *   'skip_count' configuration.
 */
public function count($refresh = FALSE) {
  if ($this->skipCount) {
    return -1;
  }

  if (!isset($this->cacheKey)) {
    // Include the migration ID as well as the plugin ID, so that migrations
    // sharing a source plugin (with different configuration) do not share
    // one cached count.
    $this->cacheKey = hash('sha256', $this->getPluginId() . ':' . $this->migration->id());
  }

  // Caching is in play and no refresh was requested: try the cache first.
  // The second parameter of get() is the boolean $allow_invalid, so it is
  // left at its default to avoid returning an invalidated count.
  if ($this->cacheCounts && !$refresh) {
    $cache_object = $this->getCache()->get($this->cacheKey);
    if (is_object($cache_object)) {
      return $cache_object->data;
    }
  }

  // No usable cached count: ask the derived class to count 'em up.
  $count = $this->doCount();

  // Only write to the cache when count caching is enabled for this source.
  if ($this->cacheCounts) {
    $this->getCache()->set($this->cacheKey, $count);
  }

  return $count;
}
/**
 * Gets the cache object.
 *
 * @return \Drupal\Core\Cache\CacheBackendInterface
 *   The 'migrate' cache bin, looked up from the container on first use.
 */
protected function getCache() {
  if (!isset($this->cache)) {
    $this->cache = \Drupal::cache('migrate');
  }
  // count() calls get()/set() on the result, so the backend must be returned.
  return $this->cache;
}
/**
 * Counts the source rows without consulting the cache.
 *
 * When the main iterator (initialized here if needed) implements \Countable,
 * its own count() is used. Otherwise a separate, freshly initialized iterator
 * is passed to iterator_count(), which does not guarantee to keep the
 * iterator's position, so the main iterator is not disturbed.
 *
 * @return int
 *   The number of source rows.
 */
protected function doCount() {
  $iterator = $this->getIterator();
  if ($iterator instanceof \Countable) {
    return $iterator->count();
  }
  return iterator_count($this->initializeIterator());
}
/**
 * Returns the key/value collection that stores high-water marks.
 *
 * The 'migrate:high_water' collection is looked up on first use and reused
 * afterwards.
 *
 * @return \Drupal\Core\KeyValueStore\KeyValueStoreInterface
 *   The storage object.
 */
protected function getHighWaterStorage() {
  if (isset($this->highWaterStorage)) {
    return $this->highWaterStorage;
  }
  $this->highWaterStorage = \Drupal::keyValue('migrate:high_water');
  return $this->highWaterStorage;
}
/**
 * Gets the stored high-water mark for this migration.
 *
 * The mark is the high-water property value that next() last saved through
 * saveHighWater(), keyed by migration ID; postRollback() resets it to NULL.
 * Previously imported rows are selected again only when their high-water
 * value exceeds this mark or they are flagged for update.
 *
 * @return mixed|null
 *   The stored high-water value (presumably a timestamp such as
 *   node.changed), or NULL if no high water mark has been stored.
 */
protected function getHighWater() {
  return $this->getHighWaterStorage()->get($this->migration->id());
}
/**
 * Stores a new high-water mark for this migration.
 *
 * @param mixed $high_water
 *   The value to store, normally a source row's high-water property value.
 *   NULL resets the mark.
 */
protected function saveHighWater($high_water) {
  $storage = $this->getHighWaterStorage();
  $storage->set($this->migration->id(), $high_water);
}
/**
 * Gets information on the property used as the high-water mark.
 *
 * @see \Drupal\migrate\Plugin\migrate\source\SqlBase::initializeIterator()
 *
 * @return array|false
 *   An array with a 'name' key and an optional db 'alias' key, or FALSE when
 *   no high-water property is configured.
 */
protected function getHighWaterProperty() {
  $property = $this->highWaterProperty;
  return $property;
}
/**
 * Gets the name of the field used as the high-water mark.
 *
 * The name is qualified with the db alias ("alias.name") when one is set.
 *
 * @see \Drupal\migrate\Plugin\migrate\source\SqlBase::initializeIterator()
 *
 * @return string|null
 *   The name of the field for the high water mark, or NULL if not set.
 */
protected function getHighWaterField() {
  if (empty($this->highWaterProperty['name'])) {
    return NULL;
  }
  $name = $this->highWaterProperty['name'];
  if (empty($this->highWaterProperty['alias'])) {
    return $name;
  }
  return $this->highWaterProperty['alias'] . '.' . $name;
}
/**
 * {@inheritdoc}
 */
public function preRollback(MigrateRollbackEvent $event) {
  // The base source plugin has nothing to prepare before a rollback.
}
/**
 * {@inheritdoc}
 *
 * Stores NULL as this migration's high-water mark, discarding the value
 * saved by earlier imports.
 */
539 public function postRollback(MigrateRollbackEvent $event) {
540 // Reset the high-water mark.
541 $this->saveHighWater(NULL);