0de83485ffa9a1c9f1bc38f1bb732ace7d4a637e
[yaffs-website] / web / core / modules / migrate / src / Plugin / migrate / id_map / Sql.php
1 <?php
2
3 namespace Drupal\migrate\Plugin\migrate\id_map;
4
5 use Drupal\Core\Database\DatabaseException;
6 use Drupal\Core\Field\BaseFieldDefinition;
7 use Drupal\Core\Plugin\ContainerFactoryPluginInterface;
8 use Drupal\Core\Plugin\PluginBase;
9 use Drupal\migrate\MigrateMessage;
10 use Drupal\migrate\Audit\HighestIdInterface;
11 use Drupal\migrate\Plugin\MigrationInterface;
12 use Drupal\migrate\Event\MigrateIdMapMessageEvent;
13 use Drupal\migrate\MigrateException;
14 use Drupal\migrate\MigrateMessageInterface;
15 use Drupal\migrate\Plugin\MigrateIdMapInterface;
16 use Drupal\migrate\Row;
17 use Drupal\migrate\Event\MigrateEvents;
18 use Drupal\migrate\Event\MigrateMapSaveEvent;
19 use Drupal\migrate\Event\MigrateMapDeleteEvent;
20 use Symfony\Component\DependencyInjection\ContainerInterface;
21 use Symfony\Component\EventDispatcher\EventDispatcherInterface;
22
23 /**
24  * Defines the sql based ID map implementation.
25  *
26  * It creates one map and one message table per migration entity to store the
27  * relevant information.
28  *
29  * @PluginID("sql")
30  */
31 class Sql extends PluginBase implements MigrateIdMapInterface, ContainerFactoryPluginInterface, HighestIdInterface {
32
33   /**
34    * Column name of hashed source id values.
35    */
36   const SOURCE_IDS_HASH = 'source_ids_hash';
37
38   /**
39    * An event dispatcher instance to use for map events.
40    *
41    * @var \Symfony\Component\EventDispatcher\EventDispatcherInterface
42    */
43   protected $eventDispatcher;
44
45   /**
46    * The migration map table name.
47    *
48    * @var string
49    */
50   protected $mapTableName;
51
52   /**
53    * The message table name.
54    *
55    * @var string
56    */
57   protected $messageTableName;
58
59   /**
60    * The migrate message service.
61    *
62    * @var \Drupal\migrate\MigrateMessageInterface
63    */
64   protected $message;
65
66   /**
67    * The database connection for the map/message tables on the destination.
68    *
69    * @var \Drupal\Core\Database\Connection
70    */
71   protected $database;
72
73   /**
74    * The select query.
75    *
76    * @var \Drupal\Core\Database\Query\SelectInterface
77    */
78   protected $query;
79
80   /**
81    * The migration being done.
82    *
83    * @var \Drupal\migrate\Plugin\MigrationInterface
84    */
85   protected $migration;
86
87   /**
88    * The source ID fields.
89    *
90    * @var array
91    */
92   protected $sourceIdFields;
93
94   /**
95    * The destination ID fields.
96    *
97    * @var array
98    */
99   protected $destinationIdFields;
100
101   /**
102    * Whether the plugin is already initialized.
103    *
104    * @var bool
105    */
106   protected $initialized;
107
108   /**
109    * The result.
110    *
111    * @var null
112    */
113   protected $result = NULL;
114
115   /**
116    * The source identifiers.
117    *
118    * @var array
119    */
120   protected $sourceIds = [];
121
122   /**
123    * The destination identifiers.
124    *
125    * @var array
126    */
127   protected $destinationIds = [];
128
129   /**
130    * The current row.
131    *
132    * @var null
133    */
134   protected $currentRow = NULL;
135
136   /**
137    * The current key.
138    *
139    * @var array
140    */
141   protected $currentKey = [];
142
143   /**
144    * Constructs an SQL object.
145    *
146    * Sets up the tables and builds the maps,
147    *
148    * @param array $configuration
149    *   The configuration.
150    * @param string $plugin_id
151    *   The plugin ID for the migration process to do.
152    * @param mixed $plugin_definition
153    *   The configuration for the plugin.
154    * @param \Drupal\migrate\Plugin\MigrationInterface $migration
155    *   The migration to do.
156    * @param \Symfony\Component\EventDispatcher\EventDispatcherInterface $event_dispatcher
157    *   The event dispatcher.
158    */
159   public function __construct(array $configuration, $plugin_id, $plugin_definition, MigrationInterface $migration, EventDispatcherInterface $event_dispatcher) {
160     parent::__construct($configuration, $plugin_id, $plugin_definition);
161     $this->migration = $migration;
162     $this->eventDispatcher = $event_dispatcher;
163     $this->message = new MigrateMessage();
164
165     if (!isset($this->database)) {
166       $this->database = \Drupal::database();
167     }
168
169     // Default generated table names, limited to 63 characters.
170     $machine_name = str_replace(':', '__', $this->migration->id());
171     $prefix_length = strlen($this->database->tablePrefix());
172     $this->mapTableName = 'migrate_map_' . mb_strtolower($machine_name);
173     $this->mapTableName = mb_substr($this->mapTableName, 0, 63 - $prefix_length);
174     $this->messageTableName = 'migrate_message_' . mb_strtolower($machine_name);
175     $this->messageTableName = mb_substr($this->messageTableName, 0, 63 - $prefix_length);
176   }
177
178   /**
179    * {@inheritdoc}
180    */
181   public static function create(ContainerInterface $container, array $configuration, $plugin_id, $plugin_definition, MigrationInterface $migration = NULL) {
182     return new static(
183       $configuration,
184       $plugin_id,
185       $plugin_definition,
186       $migration,
187       $container->get('event_dispatcher')
188     );
189   }
190
191   /**
192    * Retrieves the hash of the source identifier values.
193    *
194    * @internal
195    *
196    * @param array $source_id_values
197    *   The source identifiers
198    *
199    * @return string
200    *   An hash containing the hashed values of the source identifiers.
201    */
202   public function getSourceIdsHash(array $source_id_values) {
203     // When looking up the destination ID we require an array with both the
204     // source key and value, e.g. ['nid' => 41]. In this case, $source_id_values
205     // need to be ordered the same order as $this->sourceIdFields().
206     // However, the Migration process plugin doesn't currently have a way to get
207     // the source key so we presume the values have been passed through in the
208     // correct order.
209     if (!isset($source_id_values[0])) {
210       $source_id_values_keyed = [];
211       foreach ($this->sourceIdFields() as $field_name => $source_id) {
212         $source_id_values_keyed[] = $source_id_values[$field_name];
213       }
214       $source_id_values = $source_id_values_keyed;
215     }
216     return hash('sha256', serialize(array_map('strval', $source_id_values)));
217   }
218
219   /**
220    * The source ID fields.
221    *
222    * @return array
223    *   The source ID fields.
224    */
225   protected function sourceIdFields() {
226     if (!isset($this->sourceIdFields)) {
227       // Build the source and destination identifier maps.
228       $this->sourceIdFields = [];
229       $count = 1;
230       foreach ($this->migration->getSourcePlugin()->getIds() as $field => $schema) {
231         $this->sourceIdFields[$field] = 'sourceid' . $count++;
232       }
233     }
234     return $this->sourceIdFields;
235   }
236
237   /**
238    * The destination ID fields.
239    *
240    * @return array
241    *   The destination ID fields.
242    */
243   protected function destinationIdFields() {
244     if (!isset($this->destinationIdFields)) {
245       $this->destinationIdFields = [];
246       $count = 1;
247       foreach ($this->migration->getDestinationPlugin()->getIds() as $field => $schema) {
248         $this->destinationIdFields[$field] = 'destid' . $count++;
249       }
250     }
251     return $this->destinationIdFields;
252   }
253
254   /**
255    * The name of the database map table.
256    *
257    * @return string
258    *   The map table name.
259    */
260   public function mapTableName() {
261     return $this->mapTableName;
262   }
263
264   /**
265    * The name of the database message table.
266    *
267    * @return string
268    *   The message table name.
269    */
270   public function messageTableName() {
271     return $this->messageTableName;
272   }
273
274   /**
275    * Get the fully qualified map table name.
276    *
277    * @return string
278    *   The fully qualified map table name.
279    */
280   public function getQualifiedMapTableName() {
281     return $this->getDatabase()->getFullQualifiedTableName($this->mapTableName);
282   }
283
284   /**
285    * Gets the database connection.
286    *
287    * @return \Drupal\Core\Database\Connection
288    *   The database connection object.
289    */
290   public function getDatabase() {
291     $this->init();
292     return $this->database;
293   }
294
295   /**
296    * Initialize the plugin.
297    */
298   protected function init() {
299     if (!$this->initialized) {
300       $this->initialized = TRUE;
301       $this->ensureTables();
302     }
303   }
304
305   /**
306    * {@inheritdoc}
307    */
308   public function setMessage(MigrateMessageInterface $message) {
309     $this->message = $message;
310   }
311
312   /**
313    * Create the map and message tables if they don't already exist.
314    */
315   protected function ensureTables() {
316     if (!$this->getDatabase()->schema()->tableExists($this->mapTableName)) {
317       // Generate appropriate schema info for the map and message tables,
318       // and map from the source field names to the map/msg field names.
319       $count = 1;
320       $source_id_schema = [];
321       $indexes = [];
322       foreach ($this->migration->getSourcePlugin()->getIds() as $id_definition) {
323         $mapkey = 'sourceid' . $count++;
324         $indexes['source'][] = $mapkey;
325         $source_id_schema[$mapkey] = $this->getFieldSchema($id_definition);
326         $source_id_schema[$mapkey]['not null'] = TRUE;
327       }
328
329       $source_ids_hash[static::SOURCE_IDS_HASH] = [
330         'type' => 'varchar',
331         'length' => '64',
332         'not null' => TRUE,
333         'description' => 'Hash of source ids. Used as primary key',
334       ];
335       $fields = $source_ids_hash + $source_id_schema;
336
337       // Add destination identifiers to map table.
338       // @todo How do we discover the destination schema?
339       $count = 1;
340       foreach ($this->migration->getDestinationPlugin()->getIds() as $id_definition) {
341         // Allow dest identifier fields to be NULL (for IGNORED/FAILED cases).
342         $mapkey = 'destid' . $count++;
343         $fields[$mapkey] = $this->getFieldSchema($id_definition);
344         $fields[$mapkey]['not null'] = FALSE;
345       }
346       $fields['source_row_status'] = [
347         'type' => 'int',
348         'size' => 'tiny',
349         'unsigned' => TRUE,
350         'not null' => TRUE,
351         'default' => MigrateIdMapInterface::STATUS_IMPORTED,
352         'description' => 'Indicates current status of the source row',
353       ];
354       $fields['rollback_action'] = [
355         'type' => 'int',
356         'size' => 'tiny',
357         'unsigned' => TRUE,
358         'not null' => TRUE,
359         'default' => MigrateIdMapInterface::ROLLBACK_DELETE,
360         'description' => 'Flag indicating what to do for this item on rollback',
361       ];
362       $fields['last_imported'] = [
363         'type' => 'int',
364         'unsigned' => TRUE,
365         'not null' => TRUE,
366         'default' => 0,
367         'description' => 'UNIX timestamp of the last time this row was imported',
368       ];
369       $fields['hash'] = [
370         'type' => 'varchar',
371         'length' => '64',
372         'not null' => FALSE,
373         'description' => 'Hash of source row data, for detecting changes',
374       ];
375       $schema = [
376         'description' => 'Mappings from source identifier value(s) to destination identifier value(s).',
377         'fields' => $fields,
378         'primary key' => [static::SOURCE_IDS_HASH],
379         'indexes' => $indexes,
380       ];
381       $this->getDatabase()->schema()->createTable($this->mapTableName, $schema);
382
383       // Now do the message table.
384       if (!$this->getDatabase()->schema()->tableExists($this->messageTableName())) {
385         $fields = [];
386         $fields['msgid'] = [
387           'type' => 'serial',
388           'unsigned' => TRUE,
389           'not null' => TRUE,
390         ];
391         $fields += $source_ids_hash;
392
393         $fields['level'] = [
394           'type' => 'int',
395           'unsigned' => TRUE,
396           'not null' => TRUE,
397           'default' => 1,
398         ];
399         $fields['message'] = [
400           'type' => 'text',
401           'size' => 'medium',
402           'not null' => TRUE,
403         ];
404         $schema = [
405           'description' => 'Messages generated during a migration process',
406           'fields' => $fields,
407           'primary key' => ['msgid'],
408         ];
409         $this->getDatabase()->schema()->createTable($this->messageTableName(), $schema);
410       }
411     }
412     else {
413       // Add any missing columns to the map table.
414       if (!$this->getDatabase()->schema()->fieldExists($this->mapTableName,
415                                                     'rollback_action')) {
416         $this->getDatabase()->schema()->addField($this->mapTableName, 'rollback_action',
417           [
418             'type' => 'int',
419             'size' => 'tiny',
420             'unsigned' => TRUE,
421             'not null' => TRUE,
422             'default' => 0,
423             'description' => 'Flag indicating what to do for this item on rollback',
424           ]
425         );
426       }
427       if (!$this->getDatabase()->schema()->fieldExists($this->mapTableName, 'hash')) {
428         $this->getDatabase()->schema()->addField($this->mapTableName, 'hash',
429           [
430             'type' => 'varchar',
431             'length' => '64',
432             'not null' => FALSE,
433             'description' => 'Hash of source row data, for detecting changes',
434           ]
435         );
436       }
437       if (!$this->getDatabase()->schema()->fieldExists($this->mapTableName, static::SOURCE_IDS_HASH)) {
438         $this->getDatabase()->schema()->addField($this->mapTableName, static::SOURCE_IDS_HASH, [
439           'type' => 'varchar',
440           'length' => '64',
441           'not null' => TRUE,
442           'description' => 'Hash of source ids. Used as primary key',
443         ]);
444       }
445     }
446   }
447
448   /**
449    * Creates schema from an ID definition.
450    *
451    * @param array $id_definition
452    *   The definition of the field having the structure as the items returned by
453    *   MigrateSourceInterface or MigrateDestinationInterface::getIds().
454    *
455    * @return array
456    *   The database schema definition.
457    *
458    * @see \Drupal\migrate\Plugin\MigrateSourceInterface::getIds()
459    * @see \Drupal\migrate\Plugin\MigrateDestinationInterface::getIds()
460    */
461   protected function getFieldSchema(array $id_definition) {
462     $type_parts = explode('.', $id_definition['type']);
463     if (count($type_parts) == 1) {
464       $type_parts[] = 'value';
465     }
466     unset($id_definition['type']);
467
468     // Get the field storage definition.
469     $definition = BaseFieldDefinition::create($type_parts[0]);
470
471     // Get a list of setting keys belonging strictly to the field definition.
472     $default_field_settings = $definition->getSettings();
473     // Separate field definition settings from custom settings. Custom settings
474     // are settings passed in $id_definition that are not part of field storage
475     // definition settings.
476     $field_settings = array_intersect_key($id_definition, $default_field_settings);
477     $custom_settings = array_diff_key($id_definition, $default_field_settings);
478
479     // Resolve schema from field storage definition settings.
480     $schema = $definition
481       ->setSettings($field_settings)
482       ->getColumns()[$type_parts[1]];
483
484     // Merge back custom settings.
485     return $schema + $custom_settings;
486   }
487
488   /**
489    * {@inheritdoc}
490    */
491   public function getRowBySource(array $source_id_values) {
492     $query = $this->getDatabase()->select($this->mapTableName(), 'map')
493       ->fields('map');
494     $query->condition(static::SOURCE_IDS_HASH, $this->getSourceIdsHash($source_id_values));
495     $result = $query->execute();
496     return $result->fetchAssoc();
497   }
498
499   /**
500    * {@inheritdoc}
501    */
502   public function getRowByDestination(array $destination_id_values) {
503     $query = $this->getDatabase()->select($this->mapTableName(), 'map')
504       ->fields('map');
505     foreach ($this->destinationIdFields() as $field_name => $destination_id) {
506       $query->condition("map.$destination_id", $destination_id_values[$field_name], '=');
507     }
508     $result = $query->execute();
509     return $result->fetchAssoc();
510   }
511
512   /**
513    * {@inheritdoc}
514    */
515   public function getRowsNeedingUpdate($count) {
516     $rows = [];
517     $result = $this->getDatabase()->select($this->mapTableName(), 'map')
518       ->fields('map')
519       ->condition('source_row_status', MigrateIdMapInterface::STATUS_NEEDS_UPDATE)
520       ->range(0, $count)
521       ->execute();
522     foreach ($result as $row) {
523       $rows[] = $row;
524     }
525     return $rows;
526   }
527
528   /**
529    * {@inheritdoc}
530    */
531   public function lookupSourceId(array $destination_id_values) {
532     $source_id_fields = $this->sourceIdFields();
533     $query = $this->getDatabase()->select($this->mapTableName(), 'map');
534     foreach ($source_id_fields as $source_field_name => $idmap_field_name) {
535       $query->addField('map', $idmap_field_name, $source_field_name);
536     }
537     foreach ($this->destinationIdFields() as $field_name => $destination_id) {
538       $query->condition("map.$destination_id", $destination_id_values[$field_name], '=');
539     }
540     $result = $query->execute();
541     return $result->fetchAssoc() ?: [];
542   }
543
544   /**
545    * {@inheritdoc}
546    */
547   public function lookupDestinationId(array $source_id_values) {
548     $results = $this->lookupDestinationIds($source_id_values);
549     return $results ? reset($results) : [];
550   }
551
552   /**
553    * {@inheritdoc}
554    */
555   public function lookupDestinationIds(array $source_id_values) {
556     if (empty($source_id_values)) {
557       return [];
558     }
559
560     // Canonicalize the keys into a hash of DB-field => value.
561     $is_associative = !isset($source_id_values[0]);
562     $conditions = [];
563     foreach ($this->sourceIdFields() as $field_name => $db_field) {
564       if ($is_associative) {
565         // Ensure to handle array elements with a NULL value.
566         if (array_key_exists($field_name, $source_id_values)) {
567           // Associative $source_id_values can have fields out of order.
568           if (isset($source_id_values[$field_name])) {
569             // Only add a condition if the value is not NULL.
570             $conditions[$db_field] = $source_id_values[$field_name];
571           }
572           unset($source_id_values[$field_name]);
573         }
574       }
575       else {
576         // For non-associative $source_id_values, we assume they're the first
577         // few fields.
578         if (empty($source_id_values)) {
579           break;
580         }
581         $conditions[$db_field] = array_shift($source_id_values);
582       }
583     }
584
585     if (!empty($source_id_values)) {
586       $var_dump = var_export($source_id_values, TRUE);
587       throw new MigrateException(sprintf("Extra unknown items in source IDs: %s", $var_dump));
588     }
589
590     $query = $this->getDatabase()->select($this->mapTableName(), 'map')
591       ->fields('map', $this->destinationIdFields());
592     if (count($this->sourceIdFields()) === count($conditions)) {
593       // Optimization: Use the primary key.
594       $query->condition(self::SOURCE_IDS_HASH, $this->getSourceIdsHash(array_values($conditions)));
595     }
596     else {
597       foreach ($conditions as $db_field => $value) {
598         $query->condition($db_field, $value);
599       }
600     }
601
602     return $query->execute()->fetchAll(\PDO::FETCH_NUM);
603   }
604
605   /**
606    * {@inheritdoc}
607    */
608   public function saveIdMapping(Row $row, array $destination_id_values, $source_row_status = MigrateIdMapInterface::STATUS_IMPORTED, $rollback_action = MigrateIdMapInterface::ROLLBACK_DELETE) {
609     // Construct the source key.
610     $source_id_values = $row->getSourceIdValues();
611     // Construct the source key and initialize to empty variable keys.
612     $fields = [];
613     foreach ($this->sourceIdFields() as $field_name => $key_name) {
614       // A NULL key value is usually an indication of a problem.
615       if (!isset($source_id_values[$field_name])) {
616         $this->message->display($this->t(
617           'Did not save to map table due to NULL value for key field @field',
618           ['@field' => $field_name]), 'error');
619         return;
620       }
621       $fields[$key_name] = $source_id_values[$field_name];
622     }
623
624     if (!$fields) {
625       return;
626     }
627
628     $fields += [
629       'source_row_status' => (int) $source_row_status,
630       'rollback_action' => (int) $rollback_action,
631       'hash' => $row->getHash(),
632     ];
633     $count = 0;
634     foreach ($destination_id_values as $dest_id) {
635       $fields['destid' . ++$count] = $dest_id;
636     }
637     if ($count && $count != count($this->destinationIdFields())) {
638       $this->message->display(t('Could not save to map table due to missing destination id values'), 'error');
639       return;
640     }
641     if ($this->migration->getTrackLastImported()) {
642       $fields['last_imported'] = time();
643     }
644     $keys = [static::SOURCE_IDS_HASH => $this->getSourceIdsHash($source_id_values)];
645     // Notify anyone listening of the map row we're about to save.
646     $this->eventDispatcher->dispatch(MigrateEvents::MAP_SAVE, new MigrateMapSaveEvent($this, $fields));
647     $this->getDatabase()->merge($this->mapTableName())
648       ->key($keys)
649       ->fields($fields)
650       ->execute();
651   }
652
653   /**
654    * {@inheritdoc}
655    */
656   public function saveMessage(array $source_id_values, $message, $level = MigrationInterface::MESSAGE_ERROR) {
657     foreach ($this->sourceIdFields() as $field_name => $source_id) {
658       // If any key value is not set, we can't save.
659       if (!isset($source_id_values[$field_name])) {
660         return;
661       }
662     }
663     $fields[static::SOURCE_IDS_HASH] = $this->getSourceIdsHash($source_id_values);
664     $fields['level'] = $level;
665     $fields['message'] = $message;
666     $this->getDatabase()->insert($this->messageTableName())
667       ->fields($fields)
668       ->execute();
669
670     // Notify anyone listening of the message we've saved.
671     $this->eventDispatcher->dispatch(MigrateEvents::IDMAP_MESSAGE,
672       new MigrateIdMapMessageEvent($this->migration, $source_id_values, $message, $level));
673   }
674
675   /**
676    * {@inheritdoc}
677    */
678   public function getMessageIterator(array $source_id_values = [], $level = NULL) {
679     $query = $this->getDatabase()->select($this->messageTableName(), 'msg')
680       ->fields('msg');
681     if ($source_id_values) {
682       $query->condition(static::SOURCE_IDS_HASH, $this->getSourceIdsHash($source_id_values));
683     }
684
685     if ($level) {
686       $query->condition('level', $level);
687     }
688     return $query->execute();
689   }
690
691   /**
692    * {@inheritdoc}
693    */
694   public function prepareUpdate() {
695     $this->getDatabase()->update($this->mapTableName())
696       ->fields(['source_row_status' => MigrateIdMapInterface::STATUS_NEEDS_UPDATE])
697       ->execute();
698   }
699
700   /**
701    * {@inheritdoc}
702    */
703   public function processedCount() {
704     return $this->countHelper(NULL, $this->mapTableName());
705   }
706
707   /**
708    * {@inheritdoc}
709    */
710   public function importedCount() {
711     return $this->countHelper([
712       MigrateIdMapInterface::STATUS_IMPORTED,
713       MigrateIdMapInterface::STATUS_NEEDS_UPDATE,
714     ]);
715   }
716
717   /**
718    * {@inheritdoc}
719    */
720   public function updateCount() {
721     return $this->countHelper(MigrateIdMapInterface::STATUS_NEEDS_UPDATE);
722   }
723
724   /**
725    * {@inheritdoc}
726    */
727   public function errorCount() {
728     return $this->countHelper(MigrateIdMapInterface::STATUS_FAILED);
729   }
730
731   /**
732    * {@inheritdoc}
733    */
734   public function messageCount() {
735     return $this->countHelper(NULL, $this->messageTableName());
736   }
737
738   /**
739    * Counts records in a table.
740    *
741    * @param int|array $status
742    *   (optional) Status code(s) to filter the source_row_status column.
743    * @param string $table
744    *   (optional) The table to work. Defaults to NULL.
745    *
746    * @return int
747    *   The number of records.
748    */
749   protected function countHelper($status = NULL, $table = NULL) {
750     // Use database directly to avoid creating tables.
751     $query = $this->database->select($table ?: $this->mapTableName());
752     if (isset($status)) {
753       $query->condition('source_row_status', $status, is_array($status) ? 'IN' : '=');
754     }
755     try {
756       $count = (int) $query->countQuery()->execute()->fetchField();
757     }
758     catch (DatabaseException $e) {
759       // The table does not exist, therefore there are no records.
760       $count = 0;
761     }
762     return $count;
763   }
764
765   /**
766    * {@inheritdoc}
767    */
768   public function delete(array $source_id_values, $messages_only = FALSE) {
769     if (empty($source_id_values)) {
770       throw new MigrateException('Without source identifier values it is impossible to find the row to delete.');
771     }
772
773     if (!$messages_only) {
774       $map_query = $this->getDatabase()->delete($this->mapTableName());
775       $map_query->condition(static::SOURCE_IDS_HASH, $this->getSourceIdsHash($source_id_values));
776       // Notify anyone listening of the map row we're about to delete.
777       $this->eventDispatcher->dispatch(MigrateEvents::MAP_DELETE, new MigrateMapDeleteEvent($this, $source_id_values));
778       $map_query->execute();
779     }
780     $message_query = $this->getDatabase()->delete($this->messageTableName());
781     $message_query->condition(static::SOURCE_IDS_HASH, $this->getSourceIdsHash($source_id_values));
782     $message_query->execute();
783   }
784
785   /**
786    * {@inheritdoc}
787    */
788   public function deleteDestination(array $destination_id_values) {
789     $map_query = $this->getDatabase()->delete($this->mapTableName());
790     $message_query = $this->getDatabase()->delete($this->messageTableName());
791     $source_id_values = $this->lookupSourceId($destination_id_values);
792     if (!empty($source_id_values)) {
793       foreach ($this->destinationIdFields() as $field_name => $destination_id) {
794         $map_query->condition($destination_id, $destination_id_values[$field_name]);
795       }
796       // Notify anyone listening of the map row we're about to delete.
797       $this->eventDispatcher->dispatch(MigrateEvents::MAP_DELETE, new MigrateMapDeleteEvent($this, $source_id_values));
798       $map_query->execute();
799
800       $message_query->condition(static::SOURCE_IDS_HASH, $this->getSourceIdsHash($source_id_values));
801       $message_query->execute();
802     }
803   }
804
805   /**
806    * {@inheritdoc}
807    */
808   public function setUpdate(array $source_id_values) {
809     if (empty($source_id_values)) {
810       throw new MigrateException('No source identifiers provided to update.');
811     }
812     $query = $this->getDatabase()
813       ->update($this->mapTableName())
814       ->fields(['source_row_status' => MigrateIdMapInterface::STATUS_NEEDS_UPDATE]);
815
816     foreach ($this->sourceIdFields() as $field_name => $source_id) {
817       $query->condition($source_id, $source_id_values[$field_name]);
818     }
819     $query->execute();
820   }
821
822   /**
823    * {@inheritdoc}
824    */
825   public function clearMessages() {
826     $this->getDatabase()->truncate($this->messageTableName())->execute();
827   }
828
829   /**
830    * {@inheritdoc}
831    */
832   public function destroy() {
833     $this->getDatabase()->schema()->dropTable($this->mapTableName());
834     $this->getDatabase()->schema()->dropTable($this->messageTableName());
835   }
836
837   /**
838    * Implementation of \Iterator::rewind().
839    *
840    * This is called before beginning a foreach loop.
841    */
842   public function rewind() {
843     $this->currentRow = NULL;
844     $fields = [];
845     foreach ($this->sourceIdFields() as $field) {
846       $fields[] = $field;
847     }
848     foreach ($this->destinationIdFields() as $field) {
849       $fields[] = $field;
850     }
851     $this->result = $this->getDatabase()->select($this->mapTableName(), 'map')
852       ->fields('map', $fields)
853       ->orderBy('destid1')
854       ->execute();
855     $this->next();
856   }
857
858   /**
859    * Implementation of \Iterator::current().
860    *
861    * This is called when entering a loop iteration, returning the current row.
862    */
863   public function current() {
864     return $this->currentRow;
865   }
866
867   /**
868    * Implementation of \Iterator::key().
869    *
870    * This is called when entering a loop iteration, returning the key of the
871    * current row. It must be a scalar - we will serialize to fulfill the
872    * requirement, but using getCurrentKey() is preferable.
873    */
874   public function key() {
875     return serialize($this->currentKey);
876   }
877
878   /**
879    * {@inheritdoc}
880    */
881   public function currentDestination() {
882     if ($this->valid()) {
883       $result = [];
884       foreach ($this->destinationIdFields() as $destination_field_name => $idmap_field_name) {
885         if (!is_null($this->currentRow[$idmap_field_name])) {
886           $result[$destination_field_name] = $this->currentRow[$idmap_field_name];
887         }
888       }
889       return $result;
890     }
891     else {
892       return NULL;
893     }
894   }
895
896   /**
897    * @inheritdoc
898    */
899   public function currentSource() {
900     if ($this->valid()) {
901       $result = [];
902       foreach ($this->sourceIdFields() as $field_name => $source_id) {
903         $result[$field_name] = $this->currentKey[$source_id];
904       }
905       return $result;
906     }
907     else {
908       return NULL;
909     }
910   }
911
912   /**
913    * Implementation of \Iterator::next().
914    *
915    * This is called at the bottom of the loop implicitly, as well as explicitly
916    * from rewind().
917    */
918   public function next() {
919     $this->currentRow = $this->result->fetchAssoc();
920     $this->currentKey = [];
921     if ($this->currentRow) {
922       foreach ($this->sourceIdFields() as $map_field) {
923         $this->currentKey[$map_field] = $this->currentRow[$map_field];
924         // Leave only destination fields.
925         unset($this->currentRow[$map_field]);
926       }
927     }
928   }
929
930   /**
931    * Implementation of \Iterator::valid().
932    *
933    * This is called at the top of the loop, returning TRUE to process the loop
934    * and FALSE to terminate it.
935    */
936   public function valid() {
937     return $this->currentRow !== FALSE;
938   }
939
940   /**
941    * Returns the migration plugin manager.
942    *
943    * @todo Inject as a dependency in https://www.drupal.org/node/2919158.
944    *
945    * @return \Drupal\migrate\Plugin\MigrationPluginManagerInterface
946    *   The migration plugin manager.
947    */
948   protected function getMigrationPluginManager() {
949     return \Drupal::service('plugin.manager.migration');
950   }
951
952   /**
953    * {@inheritdoc}
954    */
955   public function getHighestId() {
956     array_filter(
957       $this->migration->getDestinationPlugin()->getIds(),
958       function (array $id) {
959         if ($id['type'] !== 'integer') {
960           throw new \LogicException('Cannot determine the highest migrated ID without an integer ID column');
961         }
962       }
963     );
964
965     // List of mapping tables to look in for the highest ID.
966     $map_tables = [
967       $this->migration->id() => $this->mapTableName(),
968     ];
969
970     // If there's a bundle, it means we have a derived migration and we need to
971     // find all the mapping tables from the related derived migrations.
972     if ($base_id = substr($this->migration->id(), 0, strpos($this->migration->id(), static::DERIVATIVE_SEPARATOR))) {
973       $migration_manager = $this->getMigrationPluginManager();
974       $migrations = $migration_manager->getDefinitions();
975       foreach ($migrations as $migration_id => $migration) {
976         if ($migration['id'] === $base_id) {
977           // Get this derived migration's mapping table and add it to the list
978           // of mapping tables to look in for the highest ID.
979           $stub = $migration_manager->createInstance($migration_id);
980           $map_tables[$migration_id] = $stub->getIdMap()->mapTableName();
981         }
982       }
983     }
984
985     // Get the highest id from the list of map tables.
986     $ids = [0];
987     foreach ($map_tables as $map_table) {
988       // If the map_table does not exist then continue on to the next map_table.
989       if (!$this->getDatabase()->schema()->tableExists($map_table)) {
990         continue;
991       }
992
993       $query = $this->getDatabase()->select($map_table, 'map')
994         ->fields('map', $this->destinationIdFields())
995         ->range(0, 1);
996       foreach (array_values($this->destinationIdFields()) as $order_field) {
997         $query->orderBy($order_field, 'DESC');
998       }
999       $ids[] = $query->execute()->fetchField();
1000     }
1001
1002     // Return the highest of all the mapped IDs.
1003     return (int) max($ids);
1004   }
1005
1006 }