Pull merge.
[yaffs-website] / web / core / modules / migrate / src / Plugin / migrate / source / SqlBase.php
1 <?php
2
3 namespace Drupal\migrate\Plugin\migrate\source;
4
5 use Drupal\Core\Database\ConnectionNotDefinedException;
6 use Drupal\Core\Database\Database;
7 use Drupal\Core\Plugin\ContainerFactoryPluginInterface;
8 use Drupal\Core\State\StateInterface;
9 use Drupal\migrate\Exception\RequirementsException;
10 use Drupal\migrate\MigrateException;
11 use Drupal\migrate\Plugin\MigrationInterface;
12 use Drupal\migrate\Plugin\migrate\id_map\Sql;
13 use Drupal\migrate\Plugin\MigrateIdMapInterface;
14 use Drupal\migrate\Plugin\RequirementsInterface;
15 use Symfony\Component\DependencyInjection\ContainerInterface;
16
17 /**
18  * Sources whose data may be fetched via a database connection.
19  *
20  * Available configuration keys:
21  * - database_state_key: (optional) Name of the state key which contains an
22  *   array with database connection information.
23  * - key: (optional) The database key name. Defaults to 'migrate'.
24  * - target: (optional) The database target name. Defaults to 'default'.
25  * - batch_size: (optional) Number of records to fetch from the database during
26  *   each batch. If omitted, all records are fetched in a single query.
27  * - ignore_map: (optional) Source data is joined to the map table by default to
28  *   improve migration performance. If set to TRUE, the map table will not be
29  *   joined. Using expressions in the query may result in column aliases in the
30  *   JOIN clause which would be invalid SQL. If you run into this, set
31  *   ignore_map to TRUE.
32  *
33  * For other optional configuration keys inherited from the parent class, refer
34  * to \Drupal\migrate\Plugin\migrate\source\SourcePluginBase.
35  *
36  * About the source database determination:
37  * - If the source plugin configuration contains 'database_state_key', its value
38  *   is taken as the name of a state key which contains an array with the
39  *   database configuration.
40  * - Otherwise, if the source plugin configuration contains 'key', the database
41  *   configuration with that name is used.
42  * - If both 'database_state_key' and 'key' are omitted in the source plugin
43  *   configuration, the database connection named 'migrate' is used by default.
44  * - If all of the above steps fail, RequirementsException is thrown.
45  *
46  * Drupal Database API supports multiple database connections. The connection
47  * parameters are defined in $databases array in settings.php or
48  * settings.local.php. It is also possible to modify the $databases array in
49  * runtime. For example, Migrate Drupal, which provides the migrations from
50  * Drupal 6 / 7, asks for the source database connection parameters in the UI
51  * and then adds the $databases['migrate'] connection in runtime before the
52  * migrations are executed.
53  *
54  * As described above, the default source database is $databases['migrate']. If
55  * the source plugin needs another source connection, the database connection
56  * parameters should be added to the $databases array as, for instance,
57  * $databases['foo']. The source plugin can then use this connection by setting
58  * 'key' to 'foo' in its configuration.
59  *
60  * For a complete example on migrating data from an SQL source, refer to
61  * https://www.drupal.org/docs/8/api/migrate-api/migrating-data-from-sql-source
62  *
63  * @see https://www.drupal.org/docs/8/api/database-api
64  * @see \Drupal\migrate_drupal\Plugin\migrate\source\DrupalSqlBase
65  */
66 abstract class SqlBase extends SourcePluginBase implements ContainerFactoryPluginInterface, RequirementsInterface {
67
68   /**
69    * The query string.
70    *
71    * @var \Drupal\Core\Database\Query\SelectInterface
72    */
73   protected $query;
74
75   /**
76    * The database object.
77    *
78    * @var \Drupal\Core\Database\Connection
79    */
80   protected $database;
81
82   /**
83    * State service for retrieving database info.
84    *
85    * @var \Drupal\Core\State\StateInterface
86    */
87   protected $state;
88
89   /**
90    * The count of the number of batches run.
91    *
92    * @var int
93    */
94   protected $batch = 0;
95
96   /**
97    * Number of records to fetch from the database during each batch.
98    *
99    * A value of zero indicates no batching is to be done.
100    *
101    * @var int
102    */
103   protected $batchSize = 0;
104
105   /**
106    * {@inheritdoc}
107    */
108   public function __construct(array $configuration, $plugin_id, $plugin_definition, MigrationInterface $migration, StateInterface $state) {
109     parent::__construct($configuration, $plugin_id, $plugin_definition, $migration);
110     $this->state = $state;
111     // If we are using high water, but haven't yet set a high water mark, skip
112     // joining the map table, as we want to get all available records.
113     if ($this->getHighWaterProperty() && $this->getHighWater() === NULL) {
114       $this->configuration['ignore_map'] = TRUE;
115     }
116   }
117
118   /**
119    * {@inheritdoc}
120    */
121   public static function create(ContainerInterface $container, array $configuration, $plugin_id, $plugin_definition, MigrationInterface $migration = NULL) {
122     return new static(
123       $configuration,
124       $plugin_id,
125       $plugin_definition,
126       $migration,
127       $container->get('state')
128     );
129   }
130
131   /**
132    * Prints the query string when the object is used as a string.
133    *
134    * @return string
135    *   The query string.
136    */
137   public function __toString() {
138     return (string) $this->query();
139   }
140
141   /**
142    * Gets the database connection object.
143    *
144    * @return \Drupal\Core\Database\Connection
145    *   The database connection.
146    */
147   public function getDatabase() {
148     if (!isset($this->database)) {
149       // Look first for an explicit state key containing the configuration.
150       if (isset($this->configuration['database_state_key'])) {
151         $this->database = $this->setUpDatabase($this->state->get($this->configuration['database_state_key']));
152       }
153       // Next, use explicit configuration in the source plugin.
154       elseif (isset($this->configuration['key'])) {
155         $this->database = $this->setUpDatabase($this->configuration);
156       }
157       // Next, try falling back to the global state key.
158       elseif (($fallback_state_key = $this->state->get('migrate.fallback_state_key'))) {
159         $this->database = $this->setUpDatabase($this->state->get($fallback_state_key));
160       }
161       // If all else fails, let setUpDatabase() fallback to the 'migrate' key.
162       else {
163         $this->database = $this->setUpDatabase([]);
164       }
165     }
166     return $this->database;
167   }
168
169   /**
170    * Gets a connection to the referenced database.
171    *
172    * This method will add the database connection if necessary.
173    *
174    * @param array $database_info
175    *   Configuration for the source database connection. The keys are:
176    *    'key' - The database connection key.
177    *    'target' - The database connection target.
178    *    'database' - Database configuration array as accepted by
179    *      Database::addConnectionInfo.
180    *
181    * @return \Drupal\Core\Database\Connection
182    *   The connection to use for this plugin's queries.
183    *
184    * @throws \Drupal\migrate\Exception\RequirementsException
185    *   Thrown if no source database connection is configured.
186    */
187   protected function setUpDatabase(array $database_info) {
188     if (isset($database_info['key'])) {
189       $key = $database_info['key'];
190     }
191     else {
192       // If there is no explicit database configuration at all, fall back to a
193       // connection named 'migrate'.
194       $key = 'migrate';
195     }
196     if (isset($database_info['target'])) {
197       $target = $database_info['target'];
198     }
199     else {
200       $target = 'default';
201     }
202     if (isset($database_info['database'])) {
203       Database::addConnectionInfo($key, $target, $database_info['database']);
204     }
205     try {
206       $connection = Database::getConnection($target, $key);
207     }
208     catch (ConnectionNotDefinedException $e) {
209       // If we fell back to the magic 'migrate' connection and it doesn't exist,
210       // treat the lack of the connection as a RequirementsException.
211       if ($key == 'migrate') {
212         throw new RequirementsException("No database connection configured for source plugin " . $this->pluginId, [], 0, $e);
213       }
214       else {
215         throw $e;
216       }
217     }
218     return $connection;
219   }
220
221   /**
222    * {@inheritdoc}
223    */
224   public function checkRequirements() {
225     if ($this->pluginDefinition['requirements_met'] === TRUE) {
226       $this->getDatabase();
227     }
228   }
229
230   /**
231    * Wrapper for database select.
232    */
233   protected function select($table, $alias = NULL, array $options = []) {
234     $options['fetch'] = \PDO::FETCH_ASSOC;
235     return $this->getDatabase()->select($table, $alias, $options);
236   }
237
238   /**
239    * Adds tags and metadata to the query.
240    *
241    * @return \Drupal\Core\Database\Query\SelectInterface
242    *   The query with additional tags and metadata.
243    */
244   protected function prepareQuery() {
245     $this->query = clone $this->query();
246     $this->query->addTag('migrate');
247     $this->query->addTag('migrate_' . $this->migration->id());
248     $this->query->addMetaData('migration', $this->migration);
249
250     return $this->query;
251   }
252
253   /**
254    * {@inheritdoc}
255    */
256   protected function initializeIterator() {
257     // Initialize the batch size.
258     if ($this->batchSize == 0 && isset($this->configuration['batch_size'])) {
259       // Valid batch sizes are integers >= 0.
260       if (is_int($this->configuration['batch_size']) && ($this->configuration['batch_size']) >= 0) {
261         $this->batchSize = $this->configuration['batch_size'];
262       }
263       else {
264         throw new MigrateException("batch_size must be greater than or equal to zero");
265       }
266     }
267
268     // If a batch has run the query is already setup.
269     if ($this->batch == 0) {
270       $this->prepareQuery();
271
272       // Get the key values, for potential use in joining to the map table.
273       $keys = [];
274
275       // The rules for determining what conditions to add to the query are as
276       // follows (applying first applicable rule):
277       // 1. If the map is joinable, join it. We will want to accept all rows
278       //    which are either not in the map, or marked in the map as NEEDS_UPDATE.
279       //    Note that if high water fields are in play, we want to accept all rows
280       //    above the high water mark in addition to those selected by the map
281       //    conditions, so we need to OR them together (but AND with any existing
282       //    conditions in the query). So, ultimately the SQL condition will look
283       //    like (original conditions) AND (map IS NULL OR map needs update
284       //      OR above high water).
285       $conditions = $this->query->orConditionGroup();
286       $condition_added = FALSE;
287       $added_fields = [];
288       if (empty($this->configuration['ignore_map']) && $this->mapJoinable()) {
289         // Build the join to the map table. Because the source key could have
290         // multiple fields, we need to build things up.
291         $count = 1;
292         $map_join = '';
293         $delimiter = '';
294         foreach ($this->getIds() as $field_name => $field_schema) {
295           if (isset($field_schema['alias'])) {
296             $field_name = $field_schema['alias'] . '.' . $this->query->escapeField($field_name);
297           }
298           $map_join .= "$delimiter$field_name = map.sourceid" . $count++;
299           $delimiter = ' AND ';
300         }
301
302         $alias = $this->query->leftJoin($this->migration->getIdMap()
303           ->getQualifiedMapTableName(), 'map', $map_join);
304         $conditions->isNull($alias . '.sourceid1');
305         $conditions->condition($alias . '.source_row_status', MigrateIdMapInterface::STATUS_NEEDS_UPDATE);
306         $condition_added = TRUE;
307
308         // And as long as we have the map table, add its data to the row.
309         $n = count($this->getIds());
310         for ($count = 1; $count <= $n; $count++) {
311           $map_key = 'sourceid' . $count;
312           $this->query->addField($alias, $map_key, "migrate_map_$map_key");
313           $added_fields[] = "$alias.$map_key";
314         }
315         if ($n = count($this->migration->getDestinationIds())) {
316           for ($count = 1; $count <= $n; $count++) {
317             $map_key = 'destid' . $count++;
318             $this->query->addField($alias, $map_key, "migrate_map_$map_key");
319             $added_fields[] = "$alias.$map_key";
320           }
321         }
322         $this->query->addField($alias, 'source_row_status', 'migrate_map_source_row_status');
323         $added_fields[] = "$alias.source_row_status";
324       }
325       // 2. If we are using high water marks, also include rows above the mark.
326       //    But, include all rows if the high water mark is not set.
327       if ($this->getHighWaterProperty()) {
328         $high_water_field = $this->getHighWaterField();
329         $high_water = $this->getHighWater();
330         // We check against NULL because 0 is an acceptable value for the high
331         // water mark.
332         if ($high_water !== NULL) {
333           $conditions->condition($high_water_field, $high_water, '>');
334           $condition_added = TRUE;
335         }
336         // Always sort by the high water field, to ensure that the first run
337         // (before we have a high water value) also has the results in a
338         // consistent order.
339         $this->query->orderBy($high_water_field);
340       }
341       if ($condition_added) {
342         $this->query->condition($conditions);
343       }
344       // If the query has a group by, our added fields need it too, to keep the
345       // query valid.
346       // @see https://dev.mysql.com/doc/refman/5.7/en/group-by-handling.html
347       $group_by = $this->query->getGroupBy();
348       if ($group_by && $added_fields) {
349         foreach ($added_fields as $added_field) {
350           $this->query->groupBy($added_field);
351         }
352       }
353     }
354
355     // Download data in batches for performance.
356     if (($this->batchSize > 0)) {
357       $this->query->range($this->batch * $this->batchSize, $this->batchSize);
358     }
359     $statement = $this->query->execute();
360     $statement->setFetchMode(\PDO::FETCH_ASSOC);
361     return new \IteratorIterator($statement);
362   }
363
364   /**
365    * Position the iterator to the following row.
366    */
367   protected function fetchNextRow() {
368     $this->getIterator()->next();
369     // We might be out of data entirely, or just out of data in the current
370     // batch. Attempt to fetch the next batch and see.
371     if ($this->batchSize > 0 && !$this->getIterator()->valid()) {
372       $this->fetchNextBatch();
373     }
374   }
375
376   /**
377    * Prepares query for the next set of data from the source database.
378    */
379   protected function fetchNextBatch() {
380     $this->batch++;
381     unset($this->iterator);
382     $this->getIterator()->rewind();
383   }
384
385   /**
386    * @return \Drupal\Core\Database\Query\SelectInterface
387    */
388   abstract public function query();
389
390   /**
391    * {@inheritdoc}
392    */
393   public function count($refresh = FALSE) {
394     return (int) $this->query()->countQuery()->execute()->fetchField();
395   }
396
397   /**
398    * Checks if we can join against the map table.
399    *
400    * This function specifically catches issues when we're migrating with
401    * unique sets of credentials for the source and destination database.
402    *
403    * @return bool
404    *   TRUE if we can join against the map table otherwise FALSE.
405    */
406   protected function mapJoinable() {
407     if (!$this->getIds()) {
408       return FALSE;
409     }
410     // With batching, we want a later batch to return the same rows that would
411     // have been returned at the same point within a monolithic query. If we
412     // join to the map table, the first batch is writing to the map table and
413     // thus affecting the results of subsequent batches. To be safe, we avoid
414     // joining to the map table when batching.
415     if ($this->batchSize > 0) {
416       return FALSE;
417     }
418     $id_map = $this->migration->getIdMap();
419     if (!$id_map instanceof Sql) {
420       return FALSE;
421     }
422     $id_map_database_options = $id_map->getDatabase()->getConnectionOptions();
423     $source_database_options = $this->getDatabase()->getConnectionOptions();
424
425     // Special handling for sqlite which deals with files.
426     if ($id_map_database_options['driver'] === 'sqlite' &&
427       $source_database_options['driver'] === 'sqlite' &&
428       $id_map_database_options['database'] != $source_database_options['database']
429     ) {
430       return FALSE;
431     }
432
433     // FALSE if driver is PostgreSQL and database doesn't match.
434     if ($id_map_database_options['driver'] === 'pgsql' &&
435       $source_database_options['driver'] === 'pgsql' &&
436       $id_map_database_options['database'] != $source_database_options['database']
437       ) {
438       return FALSE;
439     }
440
441     foreach (['username', 'password', 'host', 'port', 'namespace', 'driver'] as $key) {
442       if (isset($source_database_options[$key])) {
443         if ($id_map_database_options[$key] != $source_database_options[$key]) {
444           return FALSE;
445         }
446       }
447     }
448     return TRUE;
449   }
450
451 }