Pull merge.
[yaffs-website] / web / core / modules / migrate / src / Plugin / migrate / source / SourcePluginBase.php
1 <?php
2
3 namespace Drupal\migrate\Plugin\migrate\source;
4
5 use Drupal\Core\Plugin\PluginBase;
6 use Drupal\migrate\Event\MigrateRollbackEvent;
7 use Drupal\migrate\Event\RollbackAwareInterface;
8 use Drupal\migrate\Plugin\MigrationInterface;
9 use Drupal\migrate\MigrateException;
10 use Drupal\migrate\MigrateSkipRowException;
11 use Drupal\migrate\Plugin\MigrateIdMapInterface;
12 use Drupal\migrate\Plugin\MigrateSourceInterface;
13 use Drupal\migrate\Row;
14
15 /**
16  * The base class for source plugins.
17  *
18  * Available configuration keys:
19  * - cache_counts: (optional) If set, cache the source count.
20  * - skip_count: (optional) If set, do not attempt to count the source.
21  * - track_changes: (optional) If set, track changes to incoming data.
22  * - high_water_property: (optional) It is an array of name & alias values
23  *   (optional table alias). This high_water_property is typically a timestamp
24  *   or serial id showing what was the last imported record. Only content with a
25  *   higher value will be imported.
26  *
27  * The high_water_property and track_changes are mutually exclusive.
28  *
29  * Example:
30  *
31  * @code
32  * source:
33  *   plugin: some_source_plugin_name
34  *   cache_counts: true
35  *   track_changes: true
36  * @endcode
37  *
38  * This example uses the plugin "some_source_plugin_name" and caches the count
39  * of available source records to save calculating it every time count() is
40  * called. Changes to incoming data are watched (because track_changes is true),
41  * which can affect the result of prepareRow().
42  *
43  * Example:
44  *
45  * @code
46  * source:
47  *   plugin: some_source_plugin_name
48  *   skip_count: true
49  *   high_water_property:
50  *     name: changed
51  *     alias: n
52  * @endcode
53  *
54  * In this example, skip_count is true which means count() will not attempt to
55  * count the available source records, but just always return -1 instead. The
56  * high_water_property defines which field marks the last imported row of the
57  * migration. This will get converted into a SQL condition that looks like
58  * 'n.changed' or 'changed' if no alias.
59  *
60  * @see \Drupal\migrate\Plugin\MigratePluginManager
61  * @see \Drupal\migrate\Annotation\MigrateSource
62  * @see \Drupal\migrate\Plugin\MigrateSourceInterface
63  * @see plugin_api
64  *
65  * @ingroup migration
66  */
67 abstract class SourcePluginBase extends PluginBase implements MigrateSourceInterface, RollbackAwareInterface {
68
69   /**
70    * The module handler service.
71    *
72    * @var \Drupal\Core\Extension\ModuleHandlerInterface
73    */
74   protected $moduleHandler;
75
76   /**
77    * The entity migration object.
78    *
79    * @var \Drupal\migrate\Plugin\MigrationInterface
80    */
81   protected $migration;
82
83   /**
84    * The current row from the query.
85    *
86    * @var \Drupal\Migrate\Row
87    */
88   protected $currentRow;
89
90   /**
91    * The primary key of the current row.
92    *
93    * @var array
94    */
95   protected $currentSourceIds;
96
97   /**
98    * Information on the property used as the high-water mark.
99    *
100    * Array of 'name' and (optional) db 'alias' properties used for high-water
101    * mark.
102    *
103    * @var array
104    */
105   protected $highWaterProperty = [];
106
107   /**
108    * The key-value storage for the high-water value.
109    *
110    * @var \Drupal\Core\KeyValueStore\KeyValueStoreInterface
111    */
112   protected $highWaterStorage;
113
114   /**
115    * The high water mark at the beginning of the import operation.
116    *
117    * If the source has a property for tracking changes (like Drupal has
118    * node.changed) then this is the highest value of those imported so far.
119    *
120    * @var int
121    */
122   protected $originalHighWater;
123
124   /**
125    * Whether this instance should cache the source count.
126    *
127    * @var bool
128    */
129   protected $cacheCounts = FALSE;
130
131   /**
132    * Key to use for caching counts.
133    *
134    * @var string
135    */
136   protected $cacheKey;
137
138   /**
139    * Whether this instance should not attempt to count the source.
140    *
141    * @var bool
142    */
143   protected $skipCount = FALSE;
144
145   /**
146    * Flags whether to track changes to incoming data.
147    *
148    * If TRUE, we will maintain hashed source rows to determine whether incoming
149    * data has changed.
150    *
151    * @var bool
152    */
153   protected $trackChanges = FALSE;
154
155   /**
156    * Flags whether source plugin will read the map row and add to data row.
157    *
158    * By default, next() will directly read the map row and add it to the data
159    * row. A source plugin implementation may do this itself (in particular, the
160    * SQL source can incorporate the map table into the query) - if so, it should
161    * set this TRUE so we don't duplicate the effort.
162    *
163    * @var bool
164    */
165   protected $mapRowAdded = FALSE;
166
167   /**
168    * The backend cache.
169    *
170    * @var \Drupal\Core\Cache\CacheBackendInterface
171    */
172   protected $cache;
173
174   /**
175    * The migration ID map.
176    *
177    * @var \Drupal\migrate\Plugin\MigrateIdMapInterface
178    */
179   protected $idMap;
180
181   /**
182    * The iterator to iterate over the source rows.
183    *
184    * @var \Iterator
185    */
186   protected $iterator;
187
188   /**
189    * {@inheritdoc}
190    */
191   public function __construct(array $configuration, $plugin_id, $plugin_definition, MigrationInterface $migration) {
192     parent::__construct($configuration, $plugin_id, $plugin_definition);
193     $this->migration = $migration;
194
195     // Set up some defaults based on the source configuration.
196     foreach (['cacheCounts' => 'cache_counts', 'skipCount' => 'skip_count', 'trackChanges' => 'track_changes'] as $property => $config_key) {
197       if (isset($configuration[$config_key])) {
198         $this->$property = (bool) $configuration[$config_key];
199       }
200     }
201     $this->cacheKey = !empty($configuration['cache_key']) ? $configuration['cache_key'] : NULL;
202     $this->idMap = $this->migration->getIdMap();
203     $this->highWaterProperty = !empty($configuration['high_water_property']) ? $configuration['high_water_property'] : FALSE;
204
205     // Pull out the current highwater mark if we have a highwater property.
206     if ($this->highWaterProperty) {
207       $this->originalHighWater = $this->getHighWater();
208     }
209
210     // Don't allow the use of both highwater and track changes together.
211     if ($this->highWaterProperty && $this->trackChanges) {
212       throw new MigrateException('You should either use a highwater mark or track changes not both. They are both designed to solve the same problem');
213     }
214   }
215
216   /**
217    * Initializes the iterator with the source data.
218    *
219    * @return \Iterator
220    *   Returns an iteratable object of data for this source.
221    */
222   abstract protected function initializeIterator();
223
224   /**
225    * Gets the module handler.
226    *
227    * @return \Drupal\Core\Extension\ModuleHandlerInterface
228    *   The module handler.
229    */
230   protected function getModuleHandler() {
231     if (!isset($this->moduleHandler)) {
232       $this->moduleHandler = \Drupal::moduleHandler();
233     }
234     return $this->moduleHandler;
235   }
236
237   /**
238    * {@inheritdoc}
239    */
240   public function prepareRow(Row $row) {
241     $result = TRUE;
242     try {
243       $result_hook = $this->getModuleHandler()->invokeAll('migrate_prepare_row', [$row, $this, $this->migration]);
244       $result_named_hook = $this->getModuleHandler()->invokeAll('migrate_' . $this->migration->id() . '_prepare_row', [$row, $this, $this->migration]);
245       // We will skip if any hook returned FALSE.
246       $skip = ($result_hook && in_array(FALSE, $result_hook)) || ($result_named_hook && in_array(FALSE, $result_named_hook));
247       $save_to_map = TRUE;
248     }
249     catch (MigrateSkipRowException $e) {
250       $skip = TRUE;
251       $save_to_map = $e->getSaveToMap();
252       if ($message = trim($e->getMessage())) {
253         $this->idMap->saveMessage($row->getSourceIdValues(), $message, MigrationInterface::MESSAGE_INFORMATIONAL);
254       }
255     }
256
257     // We're explicitly skipping this row - keep track in the map table.
258     if ($skip) {
259       // Make sure we replace any previous messages for this item with any
260       // new ones.
261       if ($save_to_map) {
262         $this->idMap->saveIdMapping($row, [], MigrateIdMapInterface::STATUS_IGNORED);
263         $this->currentRow = NULL;
264         $this->currentSourceIds = NULL;
265       }
266       $result = FALSE;
267     }
268     elseif ($this->trackChanges) {
269       // When tracking changed data, We want to quietly skip (rather than
270       // "ignore") rows with changes. The caller needs to make that decision,
271       // so we need to provide them with the necessary information (before and
272       // after hashes).
273       $row->rehash();
274     }
275     return $result;
276   }
277
278   /**
279    * Returns the iterator that will yield the row arrays to be processed.
280    *
281    * @return \Iterator
282    *   The iterator that will yield the row arrays to be processed.
283    */
284   protected function getIterator() {
285     if (!isset($this->iterator)) {
286       $this->iterator = $this->initializeIterator();
287     }
288     return $this->iterator;
289   }
290
291   /**
292    * {@inheritdoc}
293    */
294   public function current() {
295     return $this->currentRow;
296   }
297
298   /**
299    * Gets the iterator key.
300    *
301    * Implementation of \Iterator::key() - called when entering a loop iteration,
302    * returning the key of the current row. It must be a scalar - we will
303    * serialize to fulfill the requirement, but using getCurrentIds() is
304    * preferable.
305    */
306   public function key() {
307     return serialize($this->currentSourceIds);
308   }
309
310   /**
311    * Checks whether the iterator is currently valid.
312    *
313    * Implementation of \Iterator::valid() - called at the top of the loop,
314    * returning TRUE to process the loop and FALSE to terminate it.
315    */
316   public function valid() {
317     return isset($this->currentRow);
318   }
319
320   /**
321    * Rewinds the iterator.
322    *
323    * Implementation of \Iterator::rewind() - subclasses of SourcePluginBase
324    * should implement initializeIterator() to do any class-specific setup for
325    * iterating source records.
326    */
327   public function rewind() {
328     $this->getIterator()->rewind();
329     $this->next();
330   }
331
332   /**
333    * {@inheritdoc}
334    *
335    * The migration iterates over rows returned by the source plugin. This
336    * method determines the next row which will be processed and imported into
337    * the system.
338    *
339    * The method tracks the source and destination IDs using the ID map plugin.
340    *
341    * This also takes care about highwater support. Highwater allows to reimport
342    * rows from a previous migration run, which got changed in the meantime.
343    * This is done by specifying a highwater field, which is compared with the
344    * last time, the migration got executed (originalHighWater).
345    */
346   public function next() {
347     $this->currentSourceIds = NULL;
348     $this->currentRow = NULL;
349
350     // In order to find the next row we want to process, we ask the source
351     // plugin for the next possible row.
352     while (!isset($this->currentRow) && $this->getIterator()->valid()) {
353
354       $row_data = $this->getIterator()->current() + $this->configuration;
355       $this->fetchNextRow();
356       $row = new Row($row_data, $this->getIds());
357
358       // Populate the source key for this row.
359       $this->currentSourceIds = $row->getSourceIdValues();
360
361       // Pick up the existing map row, if any, unless fetchNextRow() did it.
362       if (!$this->mapRowAdded && ($id_map = $this->idMap->getRowBySource($this->currentSourceIds))) {
363         $row->setIdMap($id_map);
364       }
365
366       // Clear any previous messages for this row before potentially adding
367       // new ones.
368       if (!empty($this->currentSourceIds)) {
369         $this->idMap->delete($this->currentSourceIds, TRUE);
370       }
371
372       // Preparing the row gives source plugins the chance to skip.
373       if ($this->prepareRow($row) === FALSE) {
374         continue;
375       }
376
377       // Check whether the row needs processing.
378       // 1. This row has not been imported yet.
379       // 2. Explicitly set to update.
380       // 3. The row is newer than the current highwater mark.
381       // 4. If no such property exists then try by checking the hash of the row.
382       if (!$row->getIdMap() || $row->needsUpdate() || $this->aboveHighwater($row) || $this->rowChanged($row)) {
383         $this->currentRow = $row->freezeSource();
384       }
385
386       if ($this->getHighWaterProperty()) {
387         $this->saveHighWater($row->getSourceProperty($this->highWaterProperty['name']));
388       }
389     }
390   }
391
392   /**
393    * Position the iterator to the following row.
394    */
395   protected function fetchNextRow() {
396     $this->getIterator()->next();
397   }
398
399   /**
400    * Check if the incoming data is newer than what we've previously imported.
401    *
402    * @param \Drupal\migrate\Row $row
403    *   The row we're importing.
404    *
405    * @return bool
406    *   TRUE if the highwater value in the row is greater than our current value.
407    */
408   protected function aboveHighwater(Row $row) {
409     return $this->getHighWaterProperty() && $row->getSourceProperty($this->highWaterProperty['name']) > $this->originalHighWater;
410   }
411
412   /**
413    * Checks if the incoming row has changed since our last import.
414    *
415    * @param \Drupal\migrate\Row $row
416    *   The row we're importing.
417    *
418    * @return bool
419    *   TRUE if the row has changed otherwise FALSE.
420    */
421   protected function rowChanged(Row $row) {
422     return $this->trackChanges && $row->changed();
423   }
424
425   /**
426    * Gets the currentSourceIds data member.
427    */
428   public function getCurrentIds() {
429     return $this->currentSourceIds;
430   }
431
432   /**
433    * Gets the source count.
434    *
435    * Return a count of available source records, from the cache if appropriate.
436    * Returns -1 if the source is not countable.
437    *
438    * @param bool $refresh
439    *   (optional) Whether or not to refresh the count. Defaults to FALSE. Not
440    *   all implementations support the reset flag. In such instances this
441    *   parameter is ignored and the result of calling the method will always be
442    *   up to date.
443    *
444    * @return int
445    *   The count.
446    */
447   public function count($refresh = FALSE) {
448     if ($this->skipCount) {
449       return -1;
450     }
451
452     if (!isset($this->cacheKey)) {
453       $this->cacheKey = hash('sha256', $this->getPluginId());
454     }
455
456     // If a refresh is requested, or we're not caching counts, ask the derived
457     // class to get the count from the source.
458     if ($refresh || !$this->cacheCounts) {
459       $count = $this->doCount();
460       $this->getCache()->set($this->cacheKey, $count);
461     }
462     else {
463       // Caching is in play, first try to retrieve a cached count.
464       $cache_object = $this->getCache()->get($this->cacheKey, 'cache');
465       if (is_object($cache_object)) {
466         // Success.
467         $count = $cache_object->data;
468       }
469       else {
470         // No cached count, ask the derived class to count 'em up, and cache
471         // the result.
472         $count = $this->doCount();
473         $this->getCache()->set($this->cacheKey, $count);
474       }
475     }
476     return $count;
477   }
478
479   /**
480    * Gets the cache object.
481    *
482    * @return \Drupal\Core\Cache\CacheBackendInterface
483    *   The cache object.
484    */
485   protected function getCache() {
486     if (!isset($this->cache)) {
487       $this->cache = \Drupal::cache('migrate');
488     }
489     return $this->cache;
490   }
491
492   /**
493    * Gets the source count checking if the source is countable or using the
494    * iterator_count function.
495    *
496    * @return int
497    */
498   protected function doCount() {
499     $iterator = $this->getIterator();
500     return $iterator instanceof \Countable ? $iterator->count() : iterator_count($this->initializeIterator());
501   }
502
503   /**
504    * Get the high water storage object.
505    *
506    * @return \Drupal\Core\KeyValueStore\KeyValueStoreInterface
507    *   The storage object.
508    */
509   protected function getHighWaterStorage() {
510     if (!isset($this->highWaterStorage)) {
511       $this->highWaterStorage = \Drupal::keyValue('migrate:high_water');
512     }
513     return $this->highWaterStorage;
514   }
515
516   /**
517    * The current value of the high water mark.
518    *
519    * The high water mark defines a timestamp stating the time the import was last
520    * run. If the mark is set, only content with a higher timestamp will be
521    * imported.
522    *
523    * @return int|null
524    *   A Unix timestamp representing the high water mark, or NULL if no high
525    *   water mark has been stored.
526    */
527   protected function getHighWater() {
528     return $this->getHighWaterStorage()->get($this->migration->id());
529   }
530
531   /**
532    * Save the new high water mark.
533    *
534    * @param int $high_water
535    *   The high water timestamp.
536    */
537   protected function saveHighWater($high_water) {
538     $this->getHighWaterStorage()->set($this->migration->id(), $high_water);
539   }
540
541   /**
542    * Get information on the property used as the high watermark.
543    *
544    * Array of 'name' & (optional) db 'alias' properties used for high watermark.
545    *
546    * @see \Drupal\migrate\Plugin\migrate\source\SqlBase::initializeIterator()
547    *
548    * @return array
549    *   The property used as the high watermark.
550    */
551   protected function getHighWaterProperty() {
552     return $this->highWaterProperty;
553   }
554
555   /**
556    * Get the name of the field used as the high watermark.
557    *
558    * The name of the field qualified with an alias if available.
559    *
560    * @see \Drupal\migrate\Plugin\migrate\source\SqlBase::initializeIterator()
561    *
562    * @return string|null
563    *   The name of the field for the high water mark, or NULL if not set.
564    */
565   protected function getHighWaterField() {
566     if (!empty($this->highWaterProperty['name'])) {
567       return !empty($this->highWaterProperty['alias']) ?
568         $this->highWaterProperty['alias'] . '.' . $this->highWaterProperty['name'] :
569         $this->highWaterProperty['name'];
570     }
571     return NULL;
572   }
573
574   /**
575    * {@inheritdoc}
576    */
577   public function preRollback(MigrateRollbackEvent $event) {
578     // Nothing to do in this implementation.
579   }
580
581   /**
582    * {@inheritdoc}
583    */
584   public function postRollback(MigrateRollbackEvent $event) {
585     // Reset the high-water mark.
586     $this->saveHighWater(NULL);
587   }
588
589   /**
590    * {@inheritdoc}
591    */
592   public function getSourceModule() {
593     if (!empty($this->configuration['source_module'])) {
594       return $this->configuration['source_module'];
595     }
596     elseif (!empty($this->pluginDefinition['source_module'])) {
597       return $this->pluginDefinition['source_module'];
598     }
599     return NULL;
600   }
601
602 }