08653b2332b6d0bdce60d079d42a1e27b468c401
[yaffs-website] / web / core / modules / migrate / src / Plugin / migrate / source / SqlBase.php
1 <?php
2
3 namespace Drupal\migrate\Plugin\migrate\source;
4
5 use Drupal\Core\Database\ConnectionNotDefinedException;
6 use Drupal\Core\Database\Database;
7 use Drupal\Core\Plugin\ContainerFactoryPluginInterface;
8 use Drupal\Core\State\StateInterface;
9 use Drupal\migrate\Exception\RequirementsException;
10 use Drupal\migrate\MigrateException;
11 use Drupal\migrate\Plugin\MigrationInterface;
12 use Drupal\migrate\Plugin\migrate\id_map\Sql;
13 use Drupal\migrate\Plugin\MigrateIdMapInterface;
14 use Drupal\migrate\Plugin\RequirementsInterface;
15 use Symfony\Component\DependencyInjection\ContainerInterface;
16
17 /**
18  * Sources whose data may be fetched via a database connection.
19  *
20  * Available configuration keys:
21  * - database_state_key: (optional) Name of the state key which contains an
22  *   array with database connection information.
23  * - key: (optional) The database key name. Defaults to 'migrate'.
24  * - target: (optional) The database target name. Defaults to 'default'.
25  * - batch_size: (optional) Number of records to fetch from the database during
26  *   each batch. If omitted, all records are fetched in a single query.
27  * - ignore_map: (optional) Source data is joined to the map table by default.
28  *   If set to TRUE, the map table will not be joined.
29  *
30  * For other optional configuration keys inherited from the parent class, refer
31  * to \Drupal\migrate\Plugin\migrate\source\SourcePluginBase.
32  *
33  * About the source database determination:
34  * - If the source plugin configuration contains 'database_state_key', its value
35  *   is taken as the name of a state key which contains an array with the
36  *   database configuration.
37  * - Otherwise, if the source plugin configuration contains 'key', the database
38  *   configuration with that name is used.
39  * - If both 'database_state_key' and 'key' are omitted in the source plugin
40  *   configuration, the database connection named 'migrate' is used by default.
41  * - If all of the above steps fail, RequirementsException is thrown.
42  *
43  * Drupal Database API supports multiple database connections. The connection
44  * parameters are defined in $databases array in settings.php or
45  * settings.local.php. It is also possible to modify the $databases array in
46  * runtime. For example, Migrate Drupal, which provides the migrations from
47  * Drupal 6 / 7, asks for the source database connection parameters in the UI
48  * and then adds the $databases['migrate'] connection in runtime before the
49  * migrations are executed.
50  *
51  * As described above, the default source database is $databases['migrate']. If
52  * the source plugin needs another source connection, the database connection
53  * parameters should be added to the $databases array as, for instance,
54  * $databases['foo']. The source plugin can then use this connection by setting
55  * 'key' to 'foo' in its configuration.
56  *
57  * For a complete example on migrating data from an SQL source, refer to
58  * https://www.drupal.org/docs/8/api/migrate-api/migrating-data-from-sql-source
59  *
60  * @see https://www.drupal.org/docs/8/api/database-api
61  * @see \Drupal\migrate_drupal\Plugin\migrate\source\DrupalSqlBase
62  */
63 abstract class SqlBase extends SourcePluginBase implements ContainerFactoryPluginInterface, RequirementsInterface {
64
65   /**
66    * The query string.
67    *
68    * @var \Drupal\Core\Database\Query\SelectInterface
69    */
70   protected $query;
71
72   /**
73    * The database object.
74    *
75    * @var \Drupal\Core\Database\Connection
76    */
77   protected $database;
78
79   /**
80    * State service for retrieving database info.
81    *
82    * @var \Drupal\Core\State\StateInterface
83    */
84   protected $state;
85
86   /**
87    * The count of the number of batches run.
88    *
89    * @var int
90    */
91   protected $batch = 0;
92
93   /**
94    * Number of records to fetch from the database during each batch.
95    *
96    * A value of zero indicates no batching is to be done.
97    *
98    * @var int
99    */
100   protected $batchSize = 0;
101
102   /**
103    * {@inheritdoc}
104    */
105   public function __construct(array $configuration, $plugin_id, $plugin_definition, MigrationInterface $migration, StateInterface $state) {
106     parent::__construct($configuration, $plugin_id, $plugin_definition, $migration);
107     $this->state = $state;
108   }
109
110   /**
111    * {@inheritdoc}
112    */
113   public static function create(ContainerInterface $container, array $configuration, $plugin_id, $plugin_definition, MigrationInterface $migration = NULL) {
114     return new static(
115       $configuration,
116       $plugin_id,
117       $plugin_definition,
118       $migration,
119       $container->get('state')
120     );
121   }
122
123   /**
124    * Prints the query string when the object is used as a string.
125    *
126    * @return string
127    *   The query string.
128    */
129   public function __toString() {
130     return (string) $this->query();
131   }
132
133   /**
134    * Gets the database connection object.
135    *
136    * @return \Drupal\Core\Database\Connection
137    *   The database connection.
138    */
139   public function getDatabase() {
140     if (!isset($this->database)) {
141       // Look first for an explicit state key containing the configuration.
142       if (isset($this->configuration['database_state_key'])) {
143         $this->database = $this->setUpDatabase($this->state->get($this->configuration['database_state_key']));
144       }
145       // Next, use explicit configuration in the source plugin.
146       elseif (isset($this->configuration['key'])) {
147         $this->database = $this->setUpDatabase($this->configuration);
148       }
149       // Next, try falling back to the global state key.
150       elseif (($fallback_state_key = $this->state->get('migrate.fallback_state_key'))) {
151         $this->database = $this->setUpDatabase($this->state->get($fallback_state_key));
152       }
153       // If all else fails, let setUpDatabase() fallback to the 'migrate' key.
154       else {
155         $this->database = $this->setUpDatabase([]);
156       }
157     }
158     return $this->database;
159   }
160
161   /**
162    * Gets a connection to the referenced database.
163    *
164    * This method will add the database connection if necessary.
165    *
166    * @param array $database_info
167    *   Configuration for the source database connection. The keys are:
168    *    'key' - The database connection key.
169    *    'target' - The database connection target.
170    *    'database' - Database configuration array as accepted by
171    *      Database::addConnectionInfo.
172    *
173    * @return \Drupal\Core\Database\Connection
174    *   The connection to use for this plugin's queries.
175    *
176    * @throws \Drupal\migrate\Exception\RequirementsException
177    *   Thrown if no source database connection is configured.
178    */
179   protected function setUpDatabase(array $database_info) {
180     if (isset($database_info['key'])) {
181       $key = $database_info['key'];
182     }
183     else {
184       // If there is no explicit database configuration at all, fall back to a
185       // connection named 'migrate'.
186       $key = 'migrate';
187     }
188     if (isset($database_info['target'])) {
189       $target = $database_info['target'];
190     }
191     else {
192       $target = 'default';
193     }
194     if (isset($database_info['database'])) {
195       Database::addConnectionInfo($key, $target, $database_info['database']);
196     }
197     try {
198       $connection = Database::getConnection($target, $key);
199     }
200     catch (ConnectionNotDefinedException $e) {
201       // If we fell back to the magic 'migrate' connection and it doesn't exist,
202       // treat the lack of the connection as a RequirementsException.
203       if ($key == 'migrate') {
204         throw new RequirementsException("No database connection configured for source plugin " . $this->pluginId, [], 0, $e);
205       }
206       else {
207         throw $e;
208       }
209     }
210     return $connection;
211   }
212
213   /**
214    * {@inheritdoc}
215    */
216   public function checkRequirements() {
217     if ($this->pluginDefinition['requirements_met'] === TRUE) {
218       $this->getDatabase();
219     }
220   }
221
222   /**
223    * Wrapper for database select.
224    */
225   protected function select($table, $alias = NULL, array $options = []) {
226     $options['fetch'] = \PDO::FETCH_ASSOC;
227     return $this->getDatabase()->select($table, $alias, $options);
228   }
229
230   /**
231    * Adds tags and metadata to the query.
232    *
233    * @return \Drupal\Core\Database\Query\SelectInterface
234    *   The query with additional tags and metadata.
235    */
236   protected function prepareQuery() {
237     $this->query = clone $this->query();
238     $this->query->addTag('migrate');
239     $this->query->addTag('migrate_' . $this->migration->id());
240     $this->query->addMetaData('migration', $this->migration);
241
242     return $this->query;
243   }
244
245   /**
246    * {@inheritdoc}
247    */
248   protected function initializeIterator() {
249     // Initialize the batch size.
250     if ($this->batchSize == 0 && isset($this->configuration['batch_size'])) {
251       // Valid batch sizes are integers >= 0.
252       if (is_int($this->configuration['batch_size']) && ($this->configuration['batch_size']) >= 0) {
253         $this->batchSize = $this->configuration['batch_size'];
254       }
255       else {
256         throw new MigrateException("batch_size must be greater than or equal to zero");
257       }
258     }
259
260     // If a batch has run the query is already setup.
261     if ($this->batch == 0) {
262       $this->prepareQuery();
263
264       // Get the key values, for potential use in joining to the map table.
265       $keys = [];
266
267       // The rules for determining what conditions to add to the query are as
268       // follows (applying first applicable rule):
269       // 1. If the map is joinable, join it. We will want to accept all rows
270       //    which are either not in the map, or marked in the map as NEEDS_UPDATE.
271       //    Note that if high water fields are in play, we want to accept all rows
272       //    above the high water mark in addition to those selected by the map
273       //    conditions, so we need to OR them together (but AND with any existing
274       //    conditions in the query). So, ultimately the SQL condition will look
275       //    like (original conditions) AND (map IS NULL OR map needs update
276       //      OR above high water).
277       $conditions = $this->query->orConditionGroup();
278       $condition_added = FALSE;
279       $added_fields = [];
280       if (empty($this->configuration['ignore_map']) && $this->mapJoinable()) {
281         // Build the join to the map table. Because the source key could have
282         // multiple fields, we need to build things up.
283         $count = 1;
284         $map_join = '';
285         $delimiter = '';
286         foreach ($this->getIds() as $field_name => $field_schema) {
287           if (isset($field_schema['alias'])) {
288             $field_name = $field_schema['alias'] . '.' . $this->query->escapeField($field_name);
289           }
290           $map_join .= "$delimiter$field_name = map.sourceid" . $count++;
291           $delimiter = ' AND ';
292         }
293
294         $alias = $this->query->leftJoin($this->migration->getIdMap()
295           ->getQualifiedMapTableName(), 'map', $map_join);
296         $conditions->isNull($alias . '.sourceid1');
297         $conditions->condition($alias . '.source_row_status', MigrateIdMapInterface::STATUS_NEEDS_UPDATE);
298         $condition_added = TRUE;
299
300         // And as long as we have the map table, add its data to the row.
301         $n = count($this->getIds());
302         for ($count = 1; $count <= $n; $count++) {
303           $map_key = 'sourceid' . $count;
304           $this->query->addField($alias, $map_key, "migrate_map_$map_key");
305           $added_fields[] = "$alias.$map_key";
306         }
307         if ($n = count($this->migration->getDestinationIds())) {
308           for ($count = 1; $count <= $n; $count++) {
309             $map_key = 'destid' . $count++;
310             $this->query->addField($alias, $map_key, "migrate_map_$map_key");
311             $added_fields[] = "$alias.$map_key";
312           }
313         }
314         $this->query->addField($alias, 'source_row_status', 'migrate_map_source_row_status');
315         $added_fields[] = "$alias.source_row_status";
316       }
317       // 2. If we are using high water marks, also include rows above the mark.
318       //    But, include all rows if the high water mark is not set.
319       if ($this->getHighWaterProperty()) {
320         $high_water_field = $this->getHighWaterField();
321         $high_water = $this->getHighWater();
322         if ($high_water) {
323           $conditions->condition($high_water_field, $high_water, '>');
324           $condition_added = TRUE;
325         }
326         // Always sort by the high water field, to ensure that the first run
327         // (before we have a high water value) also has the results in a
328         // consistent order.
329         $this->query->orderBy($high_water_field);
330       }
331       if ($condition_added) {
332         $this->query->condition($conditions);
333       }
334       // If the query has a group by, our added fields need it too, to keep the
335       // query valid.
336       // @see https://dev.mysql.com/doc/refman/5.7/en/group-by-handling.html
337       $group_by = $this->query->getGroupBy();
338       if ($group_by && $added_fields) {
339         foreach ($added_fields as $added_field) {
340           $this->query->groupBy($added_field);
341         }
342       }
343     }
344
345     // Download data in batches for performance.
346     if (($this->batchSize > 0)) {
347       $this->query->range($this->batch * $this->batchSize, $this->batchSize);
348     }
349     return new \IteratorIterator($this->query->execute());
350   }
351
352   /**
353    * Position the iterator to the following row.
354    */
355   protected function fetchNextRow() {
356     $this->getIterator()->next();
357     // We might be out of data entirely, or just out of data in the current
358     // batch. Attempt to fetch the next batch and see.
359     if ($this->batchSize > 0 && !$this->getIterator()->valid()) {
360       $this->fetchNextBatch();
361     }
362   }
363
364   /**
365    * Prepares query for the next set of data from the source database.
366    */
367   protected function fetchNextBatch() {
368     $this->batch++;
369     unset($this->iterator);
370     $this->getIterator()->rewind();
371   }
372
373   /**
374    * @return \Drupal\Core\Database\Query\SelectInterface
375    */
376   abstract public function query();
377
378   /**
379    * {@inheritdoc}
380    */
381   public function count($refresh = FALSE) {
382     return $this->query()->countQuery()->execute()->fetchField();
383   }
384
385   /**
386    * Checks if we can join against the map table.
387    *
388    * This function specifically catches issues when we're migrating with
389    * unique sets of credentials for the source and destination database.
390    *
391    * @return bool
392    *   TRUE if we can join against the map table otherwise FALSE.
393    */
394   protected function mapJoinable() {
395     if (!$this->getIds()) {
396       return FALSE;
397     }
398     // With batching, we want a later batch to return the same rows that would
399     // have been returned at the same point within a monolithic query. If we
400     // join to the map table, the first batch is writing to the map table and
401     // thus affecting the results of subsequent batches. To be safe, we avoid
402     // joining to the map table when batching.
403     if ($this->batchSize > 0) {
404       return FALSE;
405     }
406     $id_map = $this->migration->getIdMap();
407     if (!$id_map instanceof Sql) {
408       return FALSE;
409     }
410     $id_map_database_options = $id_map->getDatabase()->getConnectionOptions();
411     $source_database_options = $this->getDatabase()->getConnectionOptions();
412
413     // Special handling for sqlite which deals with files.
414     if ($id_map_database_options['driver'] === 'sqlite' &&
415       $source_database_options['driver'] === 'sqlite' &&
416       $id_map_database_options['database'] != $source_database_options['database']
417     ) {
418       return FALSE;
419     }
420
421     // FALSE if driver is PostgreSQL and database doesn't match.
422     if ($id_map_database_options['driver'] === 'pgsql' &&
423       $source_database_options['driver'] === 'pgsql' &&
424       $id_map_database_options['database'] != $source_database_options['database']
425       ) {
426       return FALSE;
427     }
428
429     foreach (['username', 'password', 'host', 'port', 'namespace', 'driver'] as $key) {
430       if (isset($source_database_options[$key])) {
431         if ($id_map_database_options[$key] != $source_database_options[$key]) {
432           return FALSE;
433         }
434       }
435     }
436     return TRUE;
437   }
438
439 }