e35585034e585e453d788709558a28f251f364b9
[yaffs-website] / web / core / modules / migrate / src / Plugin / migrate / source / SourcePluginBase.php
1 <?php
2
3 namespace Drupal\migrate\Plugin\migrate\source;
4
5 use Drupal\Core\Plugin\PluginBase;
6 use Drupal\migrate\Event\MigrateRollbackEvent;
7 use Drupal\migrate\Event\RollbackAwareInterface;
8 use Drupal\migrate\Plugin\MigrationInterface;
9 use Drupal\migrate\MigrateException;
10 use Drupal\migrate\MigrateSkipRowException;
11 use Drupal\migrate\Plugin\MigrateIdMapInterface;
12 use Drupal\migrate\Plugin\MigrateSourceInterface;
13 use Drupal\migrate\Row;
14
15 /**
16  * The base class for all source plugins.
17  *
18  * @see \Drupal\migrate\Plugin\MigratePluginManager
19  * @see \Drupal\migrate\Annotation\MigrateSource
20  * @see \Drupal\migrate\Plugin\MigrateSourceInterface
21  * @see plugin_api
22  *
23  * @ingroup migration
24  */
25 abstract class SourcePluginBase extends PluginBase implements MigrateSourceInterface, RollbackAwareInterface {
26
27   /**
28    * The module handler service.
29    *
30    * @var \Drupal\Core\Extension\ModuleHandlerInterface
31    */
32   protected $moduleHandler;
33
34   /**
35    * The entity migration object.
36    *
37    * @var \Drupal\migrate\Plugin\MigrationInterface
38    */
39   protected $migration;
40
41   /**
42    * The current row from the query.
43    *
44    * @var \Drupal\Migrate\Row
45    */
46   protected $currentRow;
47
48   /**
49    * The primary key of the current row.
50    *
51    * @var array
52    */
53   protected $currentSourceIds;
54
55   /**
56    * Information on the property used as the high-water mark.
57    *
58    * Array of 'name' and (optional) db 'alias' properties used for high-water
59    * mark.
60    *
61    * @var array
62    */
63   protected $highWaterProperty = [];
64
65   /**
66    * The key-value storage for the high-water value.
67    *
68    * @var \Drupal\Core\KeyValueStore\KeyValueStoreInterface
69    */
70   protected $highWaterStorage;
71
72   /**
73    * The high water mark at the beginning of the import operation.
74    *
75    * If the source has a property for tracking changes (like Drupal has
76    * node.changed) then this is the highest value of those imported so far.
77    *
78    * @var int
79    */
80   protected $originalHighWater;
81
82   /**
83    * Whether this instance should cache the source count.
84    *
85    * @var bool
86    */
87   protected $cacheCounts = FALSE;
88
89   /**
90    * Key to use for caching counts.
91    *
92    * @var string
93    */
94   protected $cacheKey;
95
96   /**
97    * Whether this instance should not attempt to count the source.
98    *
99    * @var bool
100    */
101   protected $skipCount = FALSE;
102
103   /**
104    * Flags whether to track changes to incoming data.
105    *
106    * If TRUE, we will maintain hashed source rows to determine whether incoming
107    * data has changed.
108    *
109    * @var bool
110    */
111   protected $trackChanges = FALSE;
112
113   /**
114    * Flags whether source plugin will read the map row and add to data row.
115    *
116    * By default, next() will directly read the map row and add it to the data
117    * row. A source plugin implementation may do this itself (in particular, the
118    * SQL source can incorporate the map table into the query) - if so, it should
119    * set this TRUE so we don't duplicate the effort.
120    *
121    * @var bool
122    */
123   protected $mapRowAdded = FALSE;
124
125   /**
126    * The backend cache.
127    *
128    * @var \Drupal\Core\Cache\CacheBackendInterface
129    */
130   protected $cache;
131
132   /**
133    * The migration ID map.
134    *
135    * @var \Drupal\migrate\Plugin\MigrateIdMapInterface
136    */
137   protected $idMap;
138
139   /**
140    * The iterator to iterate over the source rows.
141    *
142    * @var \Iterator
143    */
144   protected $iterator;
145
146   /**
147    * {@inheritdoc}
148    */
149   public function __construct(array $configuration, $plugin_id, $plugin_definition, MigrationInterface $migration) {
150     parent::__construct($configuration, $plugin_id, $plugin_definition);
151     $this->migration = $migration;
152
153     // Set up some defaults based on the source configuration.
154     foreach (['cacheCounts' => 'cache_counts', 'skipCount' => 'skip_count', 'trackChanges' => 'track_changes'] as $property => $config_key) {
155       if (isset($configuration[$config_key])) {
156         $this->$property = (bool) $configuration[$config_key];
157       }
158     }
159     $this->cacheKey = !empty($configuration['cache_key']) ? $configuration['cache_key'] : NULL;
160     $this->idMap = $this->migration->getIdMap();
161     $this->highWaterProperty = !empty($configuration['high_water_property']) ? $configuration['high_water_property'] : FALSE;
162
163     // Pull out the current highwater mark if we have a highwater property.
164     if ($this->highWaterProperty) {
165       $this->originalHighWater = $this->getHighWater();
166     }
167
168     // Don't allow the use of both highwater and track changes together.
169     if ($this->highWaterProperty && $this->trackChanges) {
170       throw new MigrateException('You should either use a highwater mark or track changes not both. They are both designed to solve the same problem');
171     }
172   }
173
174   /**
175    * Initializes the iterator with the source data.
176    *
177    * @return array
178    *   An array of the data for this source.
179    */
180   protected abstract function initializeIterator();
181
182   /**
183    * Gets the module handler.
184    *
185    * @return \Drupal\Core\Extension\ModuleHandlerInterface
186    *   The module handler.
187    */
188   protected function getModuleHandler() {
189     if (!isset($this->moduleHandler)) {
190       $this->moduleHandler = \Drupal::moduleHandler();
191     }
192     return $this->moduleHandler;
193   }
194
195   /**
196    * {@inheritdoc}
197    */
198   public function prepareRow(Row $row) {
199     $result = TRUE;
200     try {
201       $result_hook = $this->getModuleHandler()->invokeAll('migrate_prepare_row', [$row, $this, $this->migration]);
202       $result_named_hook = $this->getModuleHandler()->invokeAll('migrate_' . $this->migration->id() . '_prepare_row', [$row, $this, $this->migration]);
203       // We will skip if any hook returned FALSE.
204       $skip = ($result_hook && in_array(FALSE, $result_hook)) || ($result_named_hook && in_array(FALSE, $result_named_hook));
205       $save_to_map = TRUE;
206     }
207     catch (MigrateSkipRowException $e) {
208       $skip = TRUE;
209       $save_to_map = $e->getSaveToMap();
210       if ($message = trim($e->getMessage())) {
211         $this->idMap->saveMessage($row->getSourceIdValues(), $message, MigrationInterface::MESSAGE_INFORMATIONAL);
212       }
213     }
214
215     // We're explicitly skipping this row - keep track in the map table.
216     if ($skip) {
217       // Make sure we replace any previous messages for this item with any
218       // new ones.
219       if ($save_to_map) {
220         $this->idMap->saveIdMapping($row, [], MigrateIdMapInterface::STATUS_IGNORED);
221         $this->currentRow = NULL;
222         $this->currentSourceIds = NULL;
223       }
224       $result = FALSE;
225     }
226     elseif ($this->trackChanges) {
227       // When tracking changed data, We want to quietly skip (rather than
228       // "ignore") rows with changes. The caller needs to make that decision,
229       // so we need to provide them with the necessary information (before and
230       // after hashes).
231       $row->rehash();
232     }
233     return $result;
234   }
235
236   /**
237    * Returns the iterator that will yield the row arrays to be processed.
238    *
239    * @return \Iterator
240    *   The iterator that will yield the row arrays to be processed.
241    */
242   protected function getIterator() {
243     if (!isset($this->iterator)) {
244       $this->iterator = $this->initializeIterator();
245     }
246     return $this->iterator;
247   }
248
249   /**
250    * {@inheritdoc}
251    */
252   public function current() {
253     return $this->currentRow;
254   }
255
256   /**
257    * Gets the iterator key.
258    *
259    * Implementation of \Iterator::key() - called when entering a loop iteration,
260    * returning the key of the current row. It must be a scalar - we will
261    * serialize to fulfill the requirement, but using getCurrentIds() is
262    * preferable.
263    */
264   public function key() {
265     return serialize($this->currentSourceIds);
266   }
267
268   /**
269    * Checks whether the iterator is currently valid.
270    *
271    * Implementation of \Iterator::valid() - called at the top of the loop,
272    * returning TRUE to process the loop and FALSE to terminate it.
273    */
274   public function valid() {
275     return isset($this->currentRow);
276   }
277
278   /**
279    * Rewinds the iterator.
280    *
281    * Implementation of \Iterator::rewind() - subclasses of SourcePluginBase
282    * should implement initializeIterator() to do any class-specific setup for
283    * iterating source records.
284    */
285   public function rewind() {
286     $this->getIterator()->rewind();
287     $this->next();
288   }
289
290   /**
291    * {@inheritdoc}
292    *
293    * The migration iterates over rows returned by the source plugin. This
294    * method determines the next row which will be processed and imported into
295    * the system.
296    *
297    * The method tracks the source and destination IDs using the ID map plugin.
298    *
299    * This also takes care about highwater support. Highwater allows to reimport
300    * rows from a previous migration run, which got changed in the meantime.
301    * This is done by specifying a highwater field, which is compared with the
302    * last time, the migration got executed (originalHighWater).
303    */
304   public function next() {
305     $this->currentSourceIds = NULL;
306     $this->currentRow = NULL;
307
308     // In order to find the next row we want to process, we ask the source
309     // plugin for the next possible row.
310     while (!isset($this->currentRow) && $this->getIterator()->valid()) {
311
312       $row_data = $this->getIterator()->current() + $this->configuration;
313       $this->fetchNextRow();
314       $row = new Row($row_data, $this->migration->getSourcePlugin()->getIds(), $this->migration->getDestinationIds());
315
316       // Populate the source key for this row.
317       $this->currentSourceIds = $row->getSourceIdValues();
318
319       // Pick up the existing map row, if any, unless fetchNextRow() did it.
320       if (!$this->mapRowAdded && ($id_map = $this->idMap->getRowBySource($this->currentSourceIds))) {
321         $row->setIdMap($id_map);
322       }
323
324       // Clear any previous messages for this row before potentially adding
325       // new ones.
326       if (!empty($this->currentSourceIds)) {
327         $this->idMap->delete($this->currentSourceIds, TRUE);
328       }
329
330       // Preparing the row gives source plugins the chance to skip.
331       if ($this->prepareRow($row) === FALSE) {
332         continue;
333       }
334
335       // Check whether the row needs processing.
336       // 1. This row has not been imported yet.
337       // 2. Explicitly set to update.
338       // 3. The row is newer than the current highwater mark.
339       // 4. If no such property exists then try by checking the hash of the row.
340       if (!$row->getIdMap() || $row->needsUpdate() || $this->aboveHighwater($row) || $this->rowChanged($row)) {
341         $this->currentRow = $row->freezeSource();
342       }
343
344       if ($this->getHighWaterProperty()) {
345         $this->saveHighWater($row->getSourceProperty($this->highWaterProperty['name']));
346       }
347     }
348   }
349
350   /**
351    * Position the iterator to the following row.
352    */
353   protected function fetchNextRow() {
354     $this->getIterator()->next();
355   }
356
357   /**
358    * Check if the incoming data is newer than what we've previously imported.
359    *
360    * @param \Drupal\migrate\Row $row
361    *   The row we're importing.
362    *
363    * @return bool
364    *   TRUE if the highwater value in the row is greater than our current value.
365    */
366   protected function aboveHighwater(Row $row) {
367     return $this->getHighWaterProperty() && $row->getSourceProperty($this->highWaterProperty['name']) > $this->originalHighWater;
368   }
369
370   /**
371    * Checks if the incoming row has changed since our last import.
372    *
373    * @param \Drupal\migrate\Row $row
374    *   The row we're importing.
375    *
376    * @return bool
377    *   TRUE if the row has changed otherwise FALSE.
378    */
379   protected function rowChanged(Row $row) {
380     return $this->trackChanges && $row->changed();
381   }
382
383   /**
384    * Gets the currentSourceIds data member.
385    */
386   public function getCurrentIds() {
387     return $this->currentSourceIds;
388   }
389
390   /**
391    * Gets the source count.
392    *
393    * Return a count of available source records, from the cache if appropriate.
394    * Returns -1 if the source is not countable.
395    *
396    * @param bool $refresh
397    *   (optional) Whether or not to refresh the count. Defaults to FALSE.
398    *
399    * @return int
400    *   The count.
401    */
402   public function count($refresh = FALSE) {
403     if ($this->skipCount) {
404       return -1;
405     }
406
407     if (!isset($this->cacheKey)) {
408       $this->cacheKey = hash('sha256', $this->getPluginId());
409     }
410
411     // If a refresh is requested, or we're not caching counts, ask the derived
412     // class to get the count from the source.
413     if ($refresh || !$this->cacheCounts) {
414       $count = $this->doCount();
415       $this->getCache()->set($this->cacheKey, $count);
416     }
417     else {
418       // Caching is in play, first try to retrieve a cached count.
419       $cache_object = $this->getCache()->get($this->cacheKey, 'cache');
420       if (is_object($cache_object)) {
421         // Success.
422         $count = $cache_object->data;
423       }
424       else {
425         // No cached count, ask the derived class to count 'em up, and cache
426         // the result.
427         $count = $this->doCount();
428         $this->getCache()->set($this->cacheKey, $count);
429       }
430     }
431     return $count;
432   }
433
434   /**
435    * Gets the cache object.
436    *
437    * @return \Drupal\Core\Cache\CacheBackendInterface
438    *   The cache object.
439    */
440   protected function getCache() {
441     if (!isset($this->cache)) {
442       $this->cache = \Drupal::cache('migrate');
443     }
444     return $this->cache;
445   }
446
447   /**
448    * Gets the source count checking if the source is countable or using the
449    * iterator_count function.
450    *
451    * @return int
452    */
453   protected function doCount() {
454     $iterator = $this->getIterator();
455     return $iterator instanceof \Countable ? $iterator->count() : iterator_count($this->initializeIterator());
456   }
457
458   /**
459    * Get the high water storage object.
460    *
461    * @return \Drupal\Core\KeyValueStore\KeyValueStoreInterface
462    *   The storage object.
463    */
464   protected function getHighWaterStorage() {
465     if (!isset($this->highWaterStorage)) {
466       $this->highWaterStorage = \Drupal::keyValue('migrate:high_water');
467     }
468     return $this->highWaterStorage;
469   }
470
471   /**
472    * The current value of the high water mark.
473    *
474    * The high water mark defines a timestamp stating the time the import was last
475    * run. If the mark is set, only content with a higher timestamp will be
476    * imported.
477    *
478    * @return int|null
479    *   A Unix timestamp representing the high water mark, or NULL if no high
480    *   water mark has been stored.
481    */
482   protected function getHighWater() {
483     return $this->getHighWaterStorage()->get($this->migration->id());
484   }
485
486   /**
487    * Save the new high water mark.
488    *
489    * @param int $high_water
490    *   The high water timestamp.
491    */
492   protected function saveHighWater($high_water) {
493     $this->getHighWaterStorage()->set($this->migration->id(), $high_water);
494   }
495
496   /**
497    * Get information on the property used as the high watermark.
498    *
499    * Array of 'name' & (optional) db 'alias' properties used for high watermark.
500    *
501    * @see \Drupal\migrate\Plugin\migrate\source\SqlBase::initializeIterator()
502    *
503    * @return array
504    *   The property used as the high watermark.
505    */
506   protected function getHighWaterProperty() {
507     return $this->highWaterProperty;
508   }
509
510   /**
511    * Get the name of the field used as the high watermark.
512    *
513    * The name of the field qualified with an alias if available.
514    *
515    * @see \Drupal\migrate\Plugin\migrate\source\SqlBase::initializeIterator()
516    *
517    * @return string|null
518    *   The name of the field for the high water mark, or NULL if not set.
519    */
520   protected function getHighWaterField() {
521     if (!empty($this->highWaterProperty['name'])) {
522       return !empty($this->highWaterProperty['alias']) ?
523         $this->highWaterProperty['alias'] . '.' . $this->highWaterProperty['name'] :
524         $this->highWaterProperty['name'];
525     }
526     return NULL;
527   }
528
529   /**
530    * {@inheritdoc}
531    */
532   public function preRollback(MigrateRollbackEvent $event) {
533     // Nothing to do in this implementation.
534   }
535
536   /**
537    * {@inheritdoc}
538    */
539   public function postRollback(MigrateRollbackEvent $event) {
540     // Reset the high-water mark.
541     $this->saveHighWater(NULL);
542   }
543
544 }