3 namespace Drupal\migrate\Plugin\migrate\source;
5 use Drupal\Core\Database\ConnectionNotDefinedException;
6 use Drupal\Core\Database\Database;
7 use Drupal\Core\Plugin\ContainerFactoryPluginInterface;
8 use Drupal\Core\State\StateInterface;
9 use Drupal\migrate\Exception\RequirementsException;
10 use Drupal\migrate\MigrateException;
11 use Drupal\migrate\Plugin\MigrationInterface;
12 use Drupal\migrate\Plugin\migrate\id_map\Sql;
13 use Drupal\migrate\Plugin\MigrateIdMapInterface;
14 use Drupal\migrate\Plugin\RequirementsInterface;
15 use Symfony\Component\DependencyInjection\ContainerInterface;
18 * Sources whose data may be fetched via a database connection.
20 * Available configuration keys:
21 * - database_state_key: (optional) Name of the state key which contains an
22 * array with database connection information.
23 * - key: (optional) The database key name. Defaults to 'migrate'.
24 * - target: (optional) The database target name. Defaults to 'default'.
25 * - batch_size: (optional) Number of records to fetch from the database during
26 * each batch. If omitted, all records are fetched in a single query.
27 * - ignore_map: (optional) Source data is joined to the map table by default to
28 * improve migration performance. If set to TRUE, the map table will not be
29 * joined. Using expressions in the query may result in column aliases in the
 * JOIN clause which would be invalid SQL. If you run into this, set
 * 'ignore_map' to TRUE.
33 * For other optional configuration keys inherited from the parent class, refer
34 * to \Drupal\migrate\Plugin\migrate\source\SourcePluginBase.
36 * About the source database determination:
37 * - If the source plugin configuration contains 'database_state_key', its value
38 * is taken as the name of a state key which contains an array with the
39 * database configuration.
40 * - Otherwise, if the source plugin configuration contains 'key', the database
41 * configuration with that name is used.
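 * - If both 'database_state_key' and 'key' are omitted in the source plugin
 * configuration, the state entry 'migrate.fallback_state_key' is consulted;
 * if it is set, its value names another state key holding an array with the
 * database configuration.
 * - Otherwise, the database connection named 'migrate' is used by default.
 * - If that default 'migrate' connection is not defined, RequirementsException
 * is thrown.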
46 * Drupal Database API supports multiple database connections. The connection
47 * parameters are defined in $databases array in settings.php or
48 * settings.local.php. It is also possible to modify the $databases array in
49 * runtime. For example, Migrate Drupal, which provides the migrations from
50 * Drupal 6 / 7, asks for the source database connection parameters in the UI
51 * and then adds the $databases['migrate'] connection in runtime before the
52 * migrations are executed.
54 * As described above, the default source database is $databases['migrate']. If
55 * the source plugin needs another source connection, the database connection
56 * parameters should be added to the $databases array as, for instance,
57 * $databases['foo']. The source plugin can then use this connection by setting
58 * 'key' to 'foo' in its configuration.
60 * For a complete example on migrating data from an SQL source, refer to
61 * https://www.drupal.org/docs/8/api/migrate-api/migrating-data-from-sql-source
63 * @see https://www.drupal.org/docs/8/api/database-api
64 * @see \Drupal\migrate_drupal\Plugin\migrate\source\DrupalSqlBase
66 abstract class SqlBase extends SourcePluginBase implements ContainerFactoryPluginInterface, RequirementsInterface {
/**
 * The query used to fetch source rows.
 *
 * Populated by prepareQuery() with a tagged clone of query().
 *
 * @var \Drupal\Core\Database\Query\SelectInterface
 */
protected $query;

/**
 * The database object.
 *
 * Lazily resolved and cached by getDatabase().
 *
 * @var \Drupal\Core\Database\Connection
 */
protected $database;

/**
 * State service for retrieving database info.
 *
 * @var \Drupal\Core\State\StateInterface
 */
protected $state;

/**
 * The count of the number of batches run.
 *
 * Zero means the query has not been prepared yet.
 *
 * @var int
 */
protected $batch = 0;

/**
 * Number of records to fetch from the database during each batch.
 *
 * A value of zero indicates no batching is to be done.
 *
 * @var int
 */
protected $batchSize = 0;
/**
 * Constructs an SQL-backed source plugin.
 *
 * @param array $configuration
 *   The plugin configuration.
 * @param string $plugin_id
 *   The plugin ID.
 * @param mixed $plugin_definition
 *   The plugin definition.
 * @param \Drupal\migrate\Plugin\MigrationInterface $migration
 *   The migration this source belongs to.
 * @param \Drupal\Core\State\StateInterface $state
 *   The state service, used to look up database connection info.
 */
public function __construct(array $configuration, $plugin_id, $plugin_definition, MigrationInterface $migration, StateInterface $state) {
  parent::__construct($configuration, $plugin_id, $plugin_definition, $migration);
  $this->state = $state;
}
/**
 * {@inheritdoc}
 *
 * Injects the 'state' service as the constructor's final argument.
 */
public static function create(ContainerInterface $container, array $configuration, $plugin_id, $plugin_definition, MigrationInterface $migration = NULL) {
  return new static(
    $configuration,
    $plugin_id,
    $plugin_definition,
    $migration,
    $container->get('state')
  );
}
/**
 * Prints the query string when the object is used as a string.
 *
 * @return string
 *   The string form of the query returned by query().
 */
public function __toString() {
  return (string) $this->query();
}
/**
 * Gets the database connection object.
 *
 * The connection is resolved on first use and cached in $this->database.
 * Resolution order:
 * - the state entry named by the 'database_state_key' configuration value;
 * - the plugin configuration itself, when it contains 'key';
 * - the state entry named by the 'migrate.fallback_state_key' state value;
 * - otherwise the 'migrate' connection (via setUpDatabase() defaults).
 *
 * @return \Drupal\Core\Database\Connection
 *   The database connection.
 *
 * @throws \Drupal\migrate\Exception\RequirementsException
 *   Thrown if resolution falls back to the 'migrate' connection and that
 *   connection is not defined.
 */
public function getDatabase() {
  if (!isset($this->database)) {
    // Look first for an explicit state key containing the configuration.
    if (isset($this->configuration['database_state_key'])) {
      $this->database = $this->setUpDatabase($this->state->get($this->configuration['database_state_key']));
    }
    // Next, use explicit configuration in the source plugin.
    elseif (isset($this->configuration['key'])) {
      $this->database = $this->setUpDatabase($this->configuration);
    }
    // Next, try falling back to the global state key.
    elseif (($fallback_state_key = $this->state->get('migrate.fallback_state_key'))) {
      $this->database = $this->setUpDatabase($this->state->get($fallback_state_key));
    }
    // If all else fails, let setUpDatabase() fall back to the 'migrate' key.
    else {
      $this->database = $this->setUpDatabase([]);
    }
  }
  return $this->database;
}
/**
 * Gets a connection to the referenced database.
 *
 * This method will add the database connection if necessary.
 *
 * @param array $database_info
 *   Configuration for the source database connection. The keys are:
 *   'key' - The database connection key. Defaults to 'migrate'.
 *   'target' - The database connection target. Defaults to 'default'.
 *   'database' - Database configuration array as accepted by
 *   Database::addConnectionInfo. When present it is registered under the
 *   key/target above before connecting.
 *
 * @return \Drupal\Core\Database\Connection
 *   The connection to use for this plugin's queries.
 *
 * @throws \Drupal\migrate\Exception\RequirementsException
 *   Thrown if no source database connection is configured and the fallback
 *   'migrate' connection does not exist.
 * @throws \Drupal\Core\Database\ConnectionNotDefinedException
 *   Thrown if an explicitly named connection key is not defined.
 */
protected function setUpDatabase(array $database_info) {
  // If there is no explicit database configuration at all, fall back to a
  // connection named 'migrate'.
  $key = isset($database_info['key']) ? $database_info['key'] : 'migrate';
  $target = isset($database_info['target']) ? $database_info['target'] : 'default';

  if (isset($database_info['database'])) {
    Database::addConnectionInfo($key, $target, $database_info['database']);
  }

  try {
    $connection = Database::getConnection($target, $key);
  }
  catch (ConnectionNotDefinedException $e) {
    // If we fell back to the magic 'migrate' connection and it doesn't exist,
    // treat the lack of the connection as a RequirementsException.
    if ($key === 'migrate') {
      throw new RequirementsException("No database connection configured for source plugin " . $this->pluginId, [], 0, $e);
    }
    // An explicitly requested connection that is missing is a configuration
    // error; let the original exception propagate.
    throw $e;
  }
  return $connection;
}
/**
 * {@inheritdoc}
 *
 * Resolving the connection is the check itself: getDatabase() throws
 * RequirementsException when it falls back to a 'migrate' connection that
 * is not defined.
 */
public function checkRequirements() {
  // Only probe the database when the plugin definition marks requirements
  // as met.
  if ($this->pluginDefinition['requirements_met'] === TRUE) {
    $this->getDatabase();
  }
}
/**
 * Wrapper for database select.
 *
 * Always forces associative-array fetching, overriding any 'fetch' entry in
 * $options.
 *
 * @param string $table
 *   The table to select from.
 * @param string|null $alias
 *   (optional) The alias for the table.
 * @param array $options
 *   (optional) Query options passed to the connection's select().
 *
 * @return \Drupal\Core\Database\Query\SelectInterface
 *   The new select query on the source connection.
 */
protected function select($table, $alias = NULL, array $options = []) {
  $options['fetch'] = \PDO::FETCH_ASSOC;
  return $this->getDatabase()->select($table, $alias, $options);
}
/**
 * Adds tags and metadata to the query.
 *
 * Stores a clone of query() in $this->query, tagged with 'migrate' and
 * 'migrate_<migration id>', and carrying the migration as 'migration'
 * metadata.
 *
 * @return \Drupal\Core\Database\Query\SelectInterface
 *   The query with additional tags and metadata.
 */
protected function prepareQuery() {
  // Clone so later changes do not touch the object returned by query().
  $this->query = clone $this->query();
  $this->query->addTag('migrate');
  $this->query->addTag('migrate_' . $this->migration->id());
  $this->query->addMetaData('migration', $this->migration);

  return $this->query;
}
/**
 * {@inheritdoc}
 *
 * On the first batch the query is prepared. Unless 'ignore_map' is set or
 * the map is not joinable, the ID map table is joined so only unmigrated or
 * needs-update rows are returned, and its columns are added to each row.
 * High water conditions and ordering are applied, and any GROUP BY is
 * extended to cover the added map columns. When batching, the query is
 * limited to the current batch window.
 *
 * @throws \Drupal\migrate\MigrateException
 *   Thrown if 'batch_size' is configured but is not an integer >= 0.
 */
protected function initializeIterator() {
  // Initialize the batch size.
  if ($this->batchSize == 0 && isset($this->configuration['batch_size'])) {
    // Valid batch sizes are integers >= 0.
    if (is_int($this->configuration['batch_size']) && $this->configuration['batch_size'] >= 0) {
      $this->batchSize = $this->configuration['batch_size'];
    }
    else {
      throw new MigrateException("batch_size must be greater than or equal to zero");
    }
  }

  // If a batch has run the query is already setup.
  if ($this->batch == 0) {
    $this->prepareQuery();

    // The rules for determining what conditions to add to the query are as
    // follows (applying first applicable rule):
    // 1. If the map is joinable, join it. We will want to accept all rows
    //    which are either not in the map, or marked in the map as
    //    NEEDS_UPDATE. Note that if high water fields are in play, we want to
    //    accept all rows above the high water mark in addition to those
    //    selected by the map conditions, so we need to OR them together (but
    //    AND with any existing conditions in the query). So, ultimately the
    //    SQL condition will look like (original conditions) AND (map IS NULL
    //    OR map needs update OR above high water).
    $conditions = $this->query->orConditionGroup();
    $condition_added = FALSE;
    // Map columns added to the select; they must also join any GROUP BY.
    $added_fields = [];
    if (empty($this->configuration['ignore_map']) && $this->mapJoinable()) {
      // Build the join to the map table. Because the source key could have
      // multiple fields, we need to build things up.
      $count = 1;
      $map_join = '';
      $delimiter = '';
      foreach ($this->getIds() as $field_name => $field_schema) {
        if (isset($field_schema['alias'])) {
          $field_name = $field_schema['alias'] . '.' . $this->query->escapeField($field_name);
        }
        $map_join .= "$delimiter$field_name = map.sourceid" . $count++;
        $delimiter = ' AND ';
      }

      $alias = $this->query->leftJoin($this->migration->getIdMap()
        ->getQualifiedMapTableName(), 'map', $map_join);
      $conditions->isNull($alias . '.sourceid1');
      $conditions->condition($alias . '.source_row_status', MigrateIdMapInterface::STATUS_NEEDS_UPDATE);
      $condition_added = TRUE;

      // And as long as we have the map table, add its data to the row.
      $n = count($this->getIds());
      for ($count = 1; $count <= $n; $count++) {
        $map_key = 'sourceid' . $count;
        $this->query->addField($alias, $map_key, "migrate_map_$map_key");
        $added_fields[] = "$alias.$map_key";
      }
      $n = count($this->migration->getDestinationIds());
      for ($count = 1; $count <= $n; $count++) {
        // The loop header already advances $count; incrementing it here as
        // well would skip every other destination ID column.
        $map_key = 'destid' . $count;
        $this->query->addField($alias, $map_key, "migrate_map_$map_key");
        $added_fields[] = "$alias.$map_key";
      }
      $this->query->addField($alias, 'source_row_status', 'migrate_map_source_row_status');
      $added_fields[] = "$alias.source_row_status";
    }
    // 2. If we are using high water marks, also include rows above the mark.
    //    But, include all rows if the high water mark is not set.
    if ($this->getHighWaterProperty()) {
      $high_water_field = $this->getHighWaterField();
      $high_water = $this->getHighWater();
      if ($high_water) {
        $conditions->condition($high_water_field, $high_water, '>');
        $condition_added = TRUE;
      }
      // Always sort by the high water field, to ensure that the first run
      // (before we have a high water value) also has the results in a
      // consistent order.
      $this->query->orderBy($high_water_field);
    }
    if ($condition_added) {
      $this->query->condition($conditions);
    }
    // If the query has a group by, our added fields need it too, to keep the
    // query valid.
    // @see https://dev.mysql.com/doc/refman/5.7/en/group-by-handling.html
    $group_by = $this->query->getGroupBy();
    if ($group_by && $added_fields) {
      foreach ($added_fields as $added_field) {
        $this->query->groupBy($added_field);
      }
    }
  }

  // Download data in batches for performance.
  if ($this->batchSize > 0) {
    $this->query->range($this->batch * $this->batchSize, $this->batchSize);
  }
  $statement = $this->query->execute();
  $statement->setFetchMode(\PDO::FETCH_ASSOC);
  return new \IteratorIterator($statement);
}
/**
 * Positions the iterator to the following row.
 *
 * When batching is enabled and the current batch is exhausted, the next
 * batch is fetched.
 */
protected function fetchNextRow() {
  $this->getIterator()->next();
  // We might be out of data entirely, or just out of data in the current
  // batch. Attempt to fetch the next batch and see.
  if ($this->batchSize > 0 && !$this->getIterator()->valid()) {
    $this->fetchNextBatch();
  }
}
/**
 * Prepares query for the next set of data from the source database.
 *
 * Advances the batch counter so initializeIterator() applies the next range
 * offset, then drops the exhausted iterator and rewinds a fresh one.
 * NOTE(review): assumes getIterator() rebuilds the iterator via
 * initializeIterator() once $this->iterator is unset; confirm in
 * SourcePluginBase.
 */
protected function fetchNextBatch() {
  // Without this increment every batch re-reads the same range forever.
  $this->batch++;
  unset($this->iterator);
  $this->getIterator()->rewind();
}
/**
 * Builds the query that retrieves the source rows.
 *
 * Implementations return a select query. prepareQuery() clones it and adds
 * migrate tags and metadata, and count() wraps it in a count query.
 *
 * @return \Drupal\Core\Database\Query\SelectInterface
 *   The select query for the source data.
 */
abstract public function query();
/**
 * {@inheritdoc}
 *
 * Always runs a COUNT query built from query(); $refresh is accepted for
 * signature compatibility but is not consulted here.
 */
public function count($refresh = FALSE) {
  return (int) $this->query()->countQuery()->execute()->fetchField();
}
391 * Checks if we can join against the map table.
393 * This function specifically catches issues when we're migrating with
394 * unique sets of credentials for the source and destination database.
397 * TRUE if we can join against the map table otherwise FALSE.
399 protected function mapJoinable() {
400 if (!$this->getIds()) {
403 // With batching, we want a later batch to return the same rows that would
404 // have been returned at the same point within a monolithic query. If we
405 // join to the map table, the first batch is writing to the map table and
406 // thus affecting the results of subsequent batches. To be safe, we avoid
407 // joining to the map table when batching.
408 if ($this->batchSize > 0) {
411 $id_map = $this->migration->getIdMap();
412 if (!$id_map instanceof Sql) {
415 $id_map_database_options = $id_map->getDatabase()->getConnectionOptions();
416 $source_database_options = $this->getDatabase()->getConnectionOptions();
418 // Special handling for sqlite which deals with files.
419 if ($id_map_database_options['driver'] === 'sqlite' &&
420 $source_database_options['driver'] === 'sqlite' &&
421 $id_map_database_options['database'] != $source_database_options['database']
426 // FALSE if driver is PostgreSQL and database doesn't match.
427 if ($id_map_database_options['driver'] === 'pgsql' &&
428 $source_database_options['driver'] === 'pgsql' &&
429 $id_map_database_options['database'] != $source_database_options['database']
434 foreach (['username', 'password', 'host', 'port', 'namespace', 'driver'] as $key) {
435 if (isset($source_database_options[$key])) {
436 if ($id_map_database_options[$key] != $source_database_options[$key]) {