3 namespace Drupal\migrate\Plugin\migrate\source;
5 use Drupal\Core\Database\ConnectionNotDefinedException;
6 use Drupal\Core\Database\Database;
7 use Drupal\Core\Plugin\ContainerFactoryPluginInterface;
8 use Drupal\Core\State\StateInterface;
9 use Drupal\migrate\Exception\RequirementsException;
10 use Drupal\migrate\MigrateException;
11 use Drupal\migrate\Plugin\MigrationInterface;
12 use Drupal\migrate\Plugin\migrate\id_map\Sql;
13 use Drupal\migrate\Plugin\MigrateIdMapInterface;
14 use Drupal\migrate\Plugin\RequirementsInterface;
15 use Symfony\Component\DependencyInjection\ContainerInterface;
18 * Sources whose data may be fetched via a database connection.
20 * Available configuration keys:
21 * - database_state_key: (optional) Name of the state key which contains an
22 * array with database connection information.
23 * - key: (optional) The database key name. Defaults to 'migrate'.
24 * - target: (optional) The database target name. Defaults to 'default'.
25 * - batch_size: (optional) Number of records to fetch from the database during
26 * each batch. If omitted, all records are fetched in a single query.
27 * - ignore_map: (optional) Source data is joined to the map table by default.
28 * If set to TRUE, the map table will not be joined.
30 * For other optional configuration keys inherited from the parent class, refer
31 * to \Drupal\migrate\Plugin\migrate\source\SourcePluginBase.
33 * About the source database determination:
34 * - If the source plugin configuration contains 'database_state_key', its value
35 * is taken as the name of a state key which contains an array with the
36 * database configuration.
37 * - Otherwise, if the source plugin configuration contains 'key', the database
38 * configuration with that name is used.
39 * - If both 'database_state_key' and 'key' are omitted in the source plugin
40 * configuration, the database connection named 'migrate' is used by default.
41 * - If all of the above steps fail, RequirementsException is thrown.
43 * Drupal Database API supports multiple database connections. The connection
44 * parameters are defined in $databases array in settings.php or
45 * settings.local.php. It is also possible to modify the $databases array in
46 * runtime. For example, Migrate Drupal, which provides the migrations from
47 * Drupal 6 / 7, asks for the source database connection parameters in the UI
48 * and then adds the $databases['migrate'] connection in runtime before the
49 * migrations are executed.
51 * As described above, the default source database is $databases['migrate']. If
52 * the source plugin needs another source connection, the database connection
53 * parameters should be added to the $databases array as, for instance,
54 * $databases['foo']. The source plugin can then use this connection by setting
55 * 'key' to 'foo' in its configuration.
57 * For a complete example on migrating data from an SQL source, refer to
58 * https://www.drupal.org/docs/8/api/migrate-api/migrating-data-from-sql-source
60 * @see https://www.drupal.org/docs/8/api/database-api
61 * @see \Drupal\migrate_drupal\Plugin\migrate\source\DrupalSqlBase
63 abstract class SqlBase extends SourcePluginBase implements ContainerFactoryPluginInterface, RequirementsInterface {
68 * @var \Drupal\Core\Database\Query\SelectInterface
73 * The database object.
75 * @var \Drupal\Core\Database\Connection
80 * State service for retrieving database info.
82 * @var \Drupal\Core\State\StateInterface
87 * The count of the number of batches run.
94 * Number of records to fetch from the database during each batch.
96 * A value of zero indicates no batching is to be done.
100 protected $batchSize = 0;
105 public function __construct(array $configuration, $plugin_id, $plugin_definition, MigrationInterface $migration, StateInterface $state) {
106 parent::__construct($configuration, $plugin_id, $plugin_definition, $migration);
107 $this->state = $state;
113 public static function create(ContainerInterface $container, array $configuration, $plugin_id, $plugin_definition, MigrationInterface $migration = NULL) {
119 $container->get('state')
124 * Prints the query string when the object is used as a string.
129 public function __toString() {
130 return (string) $this->query();
134 * Gets the database connection object.
136 * @return \Drupal\Core\Database\Connection
137 * The database connection.
139 public function getDatabase() {
140 if (!isset($this->database)) {
141 // Look first for an explicit state key containing the configuration.
142 if (isset($this->configuration['database_state_key'])) {
143 $this->database = $this->setUpDatabase($this->state->get($this->configuration['database_state_key']));
145 // Next, use explicit configuration in the source plugin.
146 elseif (isset($this->configuration['key'])) {
147 $this->database = $this->setUpDatabase($this->configuration);
149 // Next, try falling back to the global state key.
150 elseif (($fallback_state_key = $this->state->get('migrate.fallback_state_key'))) {
151 $this->database = $this->setUpDatabase($this->state->get($fallback_state_key));
153 // If all else fails, let setUpDatabase() fallback to the 'migrate' key.
155 $this->database = $this->setUpDatabase([]);
158 return $this->database;
162 * Gets a connection to the referenced database.
164 * This method will add the database connection if necessary.
166 * @param array $database_info
167 * Configuration for the source database connection. The keys are:
168 * 'key' - The database connection key.
169 * 'target' - The database connection target.
170 * 'database' - Database configuration array as accepted by
171 * Database::addConnectionInfo.
173 * @return \Drupal\Core\Database\Connection
174 * The connection to use for this plugin's queries.
176 * @throws \Drupal\migrate\Exception\RequirementsException
177 * Thrown if no source database connection is configured.
179 protected function setUpDatabase(array $database_info) {
180 if (isset($database_info['key'])) {
181 $key = $database_info['key'];
184 // If there is no explicit database configuration at all, fall back to a
185 // connection named 'migrate'.
188 if (isset($database_info['target'])) {
189 $target = $database_info['target'];
194 if (isset($database_info['database'])) {
195 Database::addConnectionInfo($key, $target, $database_info['database']);
198 $connection = Database::getConnection($target, $key);
200 catch (ConnectionNotDefinedException $e) {
201 // If we fell back to the magic 'migrate' connection and it doesn't exist,
202 // treat the lack of the connection as a RequirementsException.
203 if ($key == 'migrate') {
204 throw new RequirementsException("No database connection configured for source plugin " . $this->pluginId, [], 0, $e);
216 public function checkRequirements() {
217 if ($this->pluginDefinition['requirements_met'] === TRUE) {
218 $this->getDatabase();
223 * Wrapper for database select.
225 protected function select($table, $alias = NULL, array $options = []) {
226 $options['fetch'] = \PDO::FETCH_ASSOC;
227 return $this->getDatabase()->select($table, $alias, $options);
231 * Adds tags and metadata to the query.
233 * @return \Drupal\Core\Database\Query\SelectInterface
234 * The query with additional tags and metadata.
236 protected function prepareQuery() {
237 $this->query = clone $this->query();
238 $this->query->addTag('migrate');
239 $this->query->addTag('migrate_' . $this->migration->id());
240 $this->query->addMetaData('migration', $this->migration);
248 protected function initializeIterator() {
249 // Initialize the batch size.
250 if ($this->batchSize == 0 && isset($this->configuration['batch_size'])) {
251 // Valid batch sizes are integers >= 0.
252 if (is_int($this->configuration['batch_size']) && ($this->configuration['batch_size']) >= 0) {
253 $this->batchSize = $this->configuration['batch_size'];
256 throw new MigrateException("batch_size must be greater than or equal to zero");
260 // If a batch has run the query is already setup.
261 if ($this->batch == 0) {
262 $this->prepareQuery();
264 // Get the key values, for potential use in joining to the map table.
267 // The rules for determining what conditions to add to the query are as
268 // follows (applying first applicable rule):
269 // 1. If the map is joinable, join it. We will want to accept all rows
270 // which are either not in the map, or marked in the map as NEEDS_UPDATE.
271 // Note that if high water fields are in play, we want to accept all rows
272 // above the high water mark in addition to those selected by the map
273 // conditions, so we need to OR them together (but AND with any existing
274 // conditions in the query). So, ultimately the SQL condition will look
275 // like (original conditions) AND (map IS NULL OR map needs update
276 // OR above high water).
277 $conditions = $this->query->orConditionGroup();
278 $condition_added = FALSE;
280 if (empty($this->configuration['ignore_map']) && $this->mapJoinable()) {
281 // Build the join to the map table. Because the source key could have
282 // multiple fields, we need to build things up.
286 foreach ($this->getIds() as $field_name => $field_schema) {
287 if (isset($field_schema['alias'])) {
288 $field_name = $field_schema['alias'] . '.' . $this->query->escapeField($field_name);
290 $map_join .= "$delimiter$field_name = map.sourceid" . $count++;
291 $delimiter = ' AND ';
294 $alias = $this->query->leftJoin($this->migration->getIdMap()
295 ->getQualifiedMapTableName(), 'map', $map_join);
296 $conditions->isNull($alias . '.sourceid1');
297 $conditions->condition($alias . '.source_row_status', MigrateIdMapInterface::STATUS_NEEDS_UPDATE);
298 $condition_added = TRUE;
300 // And as long as we have the map table, add its data to the row.
301 $n = count($this->getIds());
302 for ($count = 1; $count <= $n; $count++) {
303 $map_key = 'sourceid' . $count;
304 $this->query->addField($alias, $map_key, "migrate_map_$map_key");
305 $added_fields[] = "$alias.$map_key";
307 if ($n = count($this->migration->getDestinationIds())) {
308 for ($count = 1; $count <= $n; $count++) {
309 $map_key = 'destid' . $count++;
310 $this->query->addField($alias, $map_key, "migrate_map_$map_key");
311 $added_fields[] = "$alias.$map_key";
314 $this->query->addField($alias, 'source_row_status', 'migrate_map_source_row_status');
315 $added_fields[] = "$alias.source_row_status";
317 // 2. If we are using high water marks, also include rows above the mark.
318 // But, include all rows if the high water mark is not set.
319 if ($this->getHighWaterProperty()) {
320 $high_water_field = $this->getHighWaterField();
321 $high_water = $this->getHighWater();
323 $conditions->condition($high_water_field, $high_water, '>');
324 $condition_added = TRUE;
326 // Always sort by the high water field, to ensure that the first run
327 // (before we have a high water value) also has the results in a
329 $this->query->orderBy($high_water_field);
331 if ($condition_added) {
332 $this->query->condition($conditions);
334 // If the query has a group by, our added fields need it too, to keep the
336 // @see https://dev.mysql.com/doc/refman/5.7/en/group-by-handling.html
337 $group_by = $this->query->getGroupBy();
338 if ($group_by && $added_fields) {
339 foreach ($added_fields as $added_field) {
340 $this->query->groupBy($added_field);
345 // Download data in batches for performance.
346 if (($this->batchSize > 0)) {
347 $this->query->range($this->batch * $this->batchSize, $this->batchSize);
349 return new \IteratorIterator($this->query->execute());
353 * Position the iterator to the following row.
355 protected function fetchNextRow() {
356 $this->getIterator()->next();
357 // We might be out of data entirely, or just out of data in the current
358 // batch. Attempt to fetch the next batch and see.
359 if ($this->batchSize > 0 && !$this->getIterator()->valid()) {
360 $this->fetchNextBatch();
365 * Prepares query for the next set of data from the source database.
367 protected function fetchNextBatch() {
369 unset($this->iterator);
370 $this->getIterator()->rewind();
374 * @return \Drupal\Core\Database\Query\SelectInterface
376 abstract public function query();
381 public function count($refresh = FALSE) {
382 return $this->query()->countQuery()->execute()->fetchField();
386 * Checks if we can join against the map table.
388 * This function specifically catches issues when we're migrating with
389 * unique sets of credentials for the source and destination database.
392 * TRUE if we can join against the map table otherwise FALSE.
394 protected function mapJoinable() {
395 if (!$this->getIds()) {
398 // With batching, we want a later batch to return the same rows that would
399 // have been returned at the same point within a monolithic query. If we
400 // join to the map table, the first batch is writing to the map table and
401 // thus affecting the results of subsequent batches. To be safe, we avoid
402 // joining to the map table when batching.
403 if ($this->batchSize > 0) {
406 $id_map = $this->migration->getIdMap();
407 if (!$id_map instanceof Sql) {
410 $id_map_database_options = $id_map->getDatabase()->getConnectionOptions();
411 $source_database_options = $this->getDatabase()->getConnectionOptions();
413 // Special handling for sqlite which deals with files.
414 if ($id_map_database_options['driver'] === 'sqlite' &&
415 $source_database_options['driver'] === 'sqlite' &&
416 $id_map_database_options['database'] != $source_database_options['database']
421 // FALSE if driver is PostgreSQL and database doesn't match.
422 if ($id_map_database_options['driver'] === 'pgsql' &&
423 $source_database_options['driver'] === 'pgsql' &&
424 $id_map_database_options['database'] != $source_database_options['database']
429 foreach (['username', 'password', 'host', 'port', 'namespace', 'driver'] as $key) {
430 if (isset($source_database_options[$key])) {
431 if ($id_map_database_options[$key] != $source_database_options[$key]) {