If you want to search and analyze the production data in your ApsaraDB RDS for MySQL database by using Alibaba Cloud Elasticsearch, you can use Data Transmission Service (DTS) to synchronize the data to an Elasticsearch cluster in real time. The synchronization is implemented based on a real-time synchronization task. This topic describes how to create a real-time synchronization task to synchronize data from an ApsaraDB RDS for MySQL database to an Alibaba Cloud Elasticsearch cluster in real time. This topic also describes how to verify the synchronization results of full data and incremental data.
Background information
DTS is a data transmission service that integrates data migration, data subscription, and real-time data synchronization. For more information, see DTS. DTS supports synchronization of data changes generated by insert, delete, and update operations. For information about the versions of data sources from which DTS can synchronize data, see Overview of data synchronization scenarios.
You can use DTS to synchronize full or incremental data from MySQL to Elasticsearch. This solution is suitable for scenarios in which you have high requirements for the performance of real-time synchronization from a relational database or you need to synchronize full or incremental data from a relational database to an Alibaba Cloud Elasticsearch cluster.
Prerequisites
An ApsaraDB RDS for MySQL instance and an Alibaba Cloud Elasticsearch cluster are created. We recommend that you create the ApsaraDB RDS for MySQL instance and Elasticsearch cluster in the same virtual private cloud (VPC).
For information about how to create an ApsaraDB RDS for MySQL instance, see Create an ApsaraDB RDS for MySQL instance. In this example, an ApsaraDB RDS for MySQL instance that runs MySQL V5.7 is created.
For information about how to create an Alibaba Cloud Elasticsearch cluster, see Create an Alibaba Cloud Elasticsearch cluster. In this example, an Elasticsearch V6.7 cluster of the Standard Edition is created.
Limits
You cannot use DTS to synchronize data to an Alibaba Cloud Elasticsearch V7.16 or V8.X cluster.
DTS does not synchronize data changes generated by DDL operations. If a DDL operation is performed on a table in the source database during data synchronization, you must perform the following operations: Remove the table from the data synchronization task, remove the index for the table from the Elasticsearch cluster, and then add the table to the data synchronization task again. For more information, see Remove an object from a data synchronization task and Add an object to a data synchronization task.
Precautions
DTS uses read and write resources of the source and destination RDS instances during initial full data synchronization. This may increase the loads of the RDS instances. If the instance performance is unfavorable, the specification is low, or the data volume is large, database services may become unavailable. For example, DTS occupies a large amount of read and write resources in the following cases: a large number of slow SQL queries are performed on the source RDS instance, the tables have no primary keys, or a deadlock occurs in the destination RDS instance. Before data synchronization, evaluate the impact of data synchronization on the performance of the source and destination RDS instances. We recommend that you synchronize data during off-peak hours. For example, you can synchronize data when the CPU utilization of the source and destination RDS instances is less than 30%.
If you want to add columns to the source table, modify the mappings of the index that corresponds to the table. Then, perform the related DDL operation on the source table, pause the data synchronization task, and start the task again.
Procedure
Step 1: Make preparations
Enable the Auto Indexing feature for the Elasticsearch cluster.
For more information, see Configure the YML file. In this example, an Elasticsearch V6.7 cluster is used.
NoteTo ensure data security, Alibaba Cloud Elasticsearch disables Auto Indexing by default. When you use DTS to synchronize data to an Elasticsearch cluster, you must create indexes in the Elasticsearch cluster by submitting data instead of calling the create index API. Therefore, before you use DTS to synchronize data, you must enable Auto Indexing for the Elasticsearch cluster.
Create a database and a table. Then, insert data into the table.
You can use an ApsaraDB RDS database or a self-managed database that is created on your on-premises machine. In this example, an ApsaraDB RDS for MySQL database is used. For information about how to create an ApsaraDB RDS for MySQL database and create a table in the database, see General workflow to use ApsaraDB RDS for MySQL.
ImportantWe recommend that you use an ApsaraDB RDS for MySQL instance in the same region as the Elasticsearch cluster. If the instance and cluster reside in different regions, network connectivity between them cannot be ensured.
In this example, the following ApsaraDB RDS for MySQL database is created, and the following SQL statements are executed to create a table in the database and insert data into the table.
Create an ApsaraDB RDS for MySQL database
test_logstash
Create a table in the database and insert data into the table
-- create table CREATE TABLE `es_test` ( `id` bigint(32) NOT NULL, `name` varchar(32) NULL, `age` bigint(32) NULL, `hobby` varchar(32) NULL, PRIMARY KEY (`id`) ) ENGINE=InnoDB DEFAULT CHARACTER SET=utf8; -- insert data INSERT INTO `es_test` (`id`,`name`,`age`,`hobby`) VALUES (1,'user1',22,'music'); INSERT INTO `es_test` (`id`,`name`,`age`,`hobby`) VALUES (2,'user2',23,'sport'); INSERT INTO `es_test` (`id`,`name`,`age`,`hobby`) VALUES (3,'user3',43,'game'); INSERT INTO `es_test` (`id`,`name`,`age`,`hobby`) VALUES (4,'user4',24,'run'); INSERT INTO `es_test` (`id`,`name`,`age`,`hobby`) VALUES (5,'user5',42,'basketball');
Step 2: Configure a data synchronization task
Go to the Data Synchronization Tasks page.
Log on to the Data Management (DMS) console.
In the top navigation bar, click DTS.
In the left-side navigation pane, choose .
NoteOperations may vary based on the mode and layout of the DMS console. For more information, see Simple mode and Customize the layout and style of the DMS console.
You can also go to the Data Synchronization Tasks page of the new DTS console.
On the Data Synchronization page, click Create Task. Then, create and configure a data synchronization task as prompted.
You must configure the source database and destination database, the objects from which you want to synchronize data, the field mappings, the advanced settings, and the names of fields that are synchronized. For more information, see the topics in Synchronize data from a MySQL database and the Synchronize data from a PolarDB-X 2.0 instance to an Elasticsearch cluster topic. The following table describes the parameters that are configured in this example.
Configure the source database and destination database.
Category
Parameter
Description
None
Task Name
DTS automatically generates a task name. You do not need to use a unique task name.
We recommend that you use an informative name for easy identification.
Source Database
Database Type
The source database type. Select MySQL.
Access Method
The access method of the source database. Select Alibaba Cloud Instance.
Instance Region
The region in which the source MySQL database resides.
Replicate Data Across Alibaba Cloud Accounts
In this example, No is selected because data is synchronized within the same Alibaba Cloud account.
RDS Instance ID
The ID of the source ApsaraDB RDS for MySQL instance.
Database Account
The account that is used to connect to the ApsaraDB RDS for MySQL database from which you want to synchronize data. The account must be granted read permissions on the database.
Database Password
The password that is used to connect to the ApsaraDB RDS for MySQL database. Enter the password that corresponds to the username specified by Database Account.
Encryption
Specifies whether to encrypt the connection to the source ApsaraDB RDS for MySQL database. Valid values: Non-encrypted and SSL-encrypted. If you select SSL-encrypted, you must enable SSL encryption for the ApsaraDB RDS for MySQL instance before you configure the data synchronization task. For more information, see Configure the SSL encryption feature.
Destination Database
Database Type
The destination database type. Select Elasticsearch.
Access Method
The access method of the destination Elasticsearch cluster. The value of this parameter is fixed as Alibaba Cloud Instance.
Instance Region
The region in which the destination Elasticsearch cluster resides. We recommend that you select the region in which the source ApsaraDB RDS for MySQL database resides.
Instance ID
The ID of the destination Elasticsearch cluster.
Database Account
The username that is used to connect to the destination Elasticsearch cluster. The default username is elastic.
Database Password
The password that is used to connect to the destination Elasticsearch cluster. Enter the password that corresponds to the username specified by Database Account.
Configure the objects from which you want to synchronize data.
Parameter
Description
Synchronization Types
The synchronization types. By default, Incremental Data Synchronization is selected. You must also select Schema Synchronization and Full Data Synchronization. After the precheck is complete, DTS synchronizes the historical data of the selected objects from the source database to the destination database. The historical data is the basis for subsequent incremental synchronization.
Processing Mode of Conflicting Tables
Precheck and Report Errors: checks whether the destination database contains tables that have the same names as tables in the source database. If the source and destination databases do not contain tables that have identical table names, the precheck is passed. Otherwise, an error is returned during the precheck and the data synchronization task cannot be started.
NoteIf the source and destination databases contain identical table names and the tables in the destination database cannot be deleted or renamed, you can use the object name mapping feature to rename the tables that are synchronized to the destination database. For more information, see Map object names.
Ignore Errors and Proceed: skips the precheck for identical table names in the source and destination databases.
WarningIf you select Ignore Errors and Proceed, data inconsistency may occur and your business may be exposed to the following potential risks:
If the source and destination databases have the same schema and a data record in the destination database has the same primary key value or unique key value as a data record in the source database:
During full data synchronization, DTS does not synchronize the data record to the destination database. The existing data record in the destination database is retained.
During incremental data synchronization, DTS synchronizes the data record to the destination database. The existing data record in the destination database is overwritten.
If the source and destination databases have different schemas, data may fail to be initialized. In this case, only some columns are synchronized, or the data synchronization task fails. Proceed with caution.
Index Name
Table Name
If you select Table Name, the system creates an index that has the same name as the table in the Elasticsearch cluster. In this example, the index name is es_test.
Database Name_Table Name
If you select Database Name_Table Name, the system creates an index whose name is in the format of Database name_Table name. In this example, the index name is test_logstash_es_test.
Source Objects
Select one or more objects from the Source Objects section and click the icon to move the objects to the Selected Objects section.
Selected Objects
To rename an object that you want to synchronize to the destination instance, right-click the object in the Selected Objects section. For more information, see the Map the name of a single object section of the "Map object names" topic.
To rename multiple objects at a time, click Batch Edit in the upper-right corner of the Selected Objects section. For more information, see the Map multiple object names at a time section of the "Map object names" topic.
Configure field mappings and change the names of fields that are synchronized.
If you want to change the names of fields that are synchronized, you can click the name of the table to which the fields belong in the Selected Objects list, and specify the name of the index and the name of the index type in the Elasticsearch cluster for the table. After the configuration is complete, click OK. The following table describes the parameters that are configured in this example. You can retain the default values for other parameters that are not described in the following table. For more information, see Map the name of a single object.
Parameter
Description
Index Name
The name of the destination index. You can specify a name based on your business requirements. For information about the concept of an index, see Terms.
ImportantThe name that you enter must be unique in the Elasticsearch cluster. Otherwise, the
index already exists
error message appears.Type Name
The name of the destination index type. You can specify a name based on your business requirements. For information about the concept of an index type, see Terms.
Filter Conditions
You can specify SQL conditions to filter data. Only data that meets the specified conditions can be synchronized to the destination Elasticsearch cluster. For more information, see Set filter conditions.
Parameter
Select the column parameter and parameter value. For more information, see Mapping parameters.
ImportantIf you set index to false for a field that you added, the field cannot be queried. For more information, see index.
Configure advanced parameters.
In this example, default values are retained for the advanced parameters. The following table describes the advanced parameters.
Parameter
Description
Select the dedicated cluster used to schedule the task
By default, DTS schedules the data synchronization task by using a public cluster. You do not need to specify a node in a public cluster for task scheduling.
If you want to use a dedicated cluster, you must purchase one first, and make sure that the dedicated cluster can successfully run before you can select it.
You can purchase a dedicated cluster to run DTS tasks such as data migration tasks, data synchronization tasks, and subscription tasks. Dedicated clusters can isolate resources for your DTS tasks from the resources used by DTS instances of other users. Compared with DTS instances in public clusters, DTS instances in dedicated clusters have higher stability and better performance.
Set Alerts
Specifies whether to configure alerting for the data synchronization task. If the task fails or the synchronization latency exceeds the specified threshold, alert contacts will receive notifications. Valid values:
No: does not configure alerting.
Yes: configures alerting. In this case, you must also configure the alert threshold and alert contacts. For more information, see Configure monitoring and alerting.
Retry Time for Failed Connections
The retry time range for failed connections. If the source or destination database fails to be connected after the data synchronization task is started, DTS immediately retries a connection within the time range. Valid values: 10 to 1440. Unit: minutes. Default value: 720. We recommend that you set this parameter to a value greater than 30. If DTS reconnects to the source and destination databases within the specified time range, DTS resumes the data synchronization task. Otherwise, the data synchronization task fails.
NoteIf you specify different retry time ranges for multiple data synchronization tasks that have the same source or destination database, the shortest retry time range takes precedence.
When DTS retries a connection, you are charged for the DTS instance. We recommend that you specify the retry time range based on your business requirements. You can also release the DTS instance at your earliest opportunity after the source and destination instances are released.
Shard Configuration
The number of primary shards and the number of replica shards for each primary shard. You can specify the numbers based on the default shard configuration of the destination Elasticsearch cluster. By default, an index in an Elasticsearch cluster whose version is earlier than V7.X has five primary shards and one replica shard for each primary shard. By default, an index in an Elasticsearch cluster whose version is V7.X or later has one primary shard and one replica shard for the primary shard.
ImportantThe number of shards and the size of each shard are important factors that affect the stability and performance of an Elasticsearch cluster. You must appropriately configure shards for indexes in an Elasticsearch cluster. For information about how to plan shards for indexes, see Evaluate specifications and storage capacity.
String Index
The method used to index strings that are synchronized to the destination Elasticsearch cluster. Valid values:
analyzed: indicates that the strings are analyzed before indexing. You must select an analyzer. For information about the analyzer types, reference Built-in analyzer reference.
not analyzed: indicates that the strings are indexed with the original values.
no: indicates that the strings are not indexed.
Time Zone
If you synchronize data of a date or time data type such as DATETIME or TIMESTAMP, you can select a time zone.
NoteIf the date and time data type in the destination Elasticsearch cluster do not need a time zone, you must specify the document type for the date and time data types.
DOCID
The default value of this parameter is the primary key of the table in the destination Elasticsearch cluster. If the table does not have a primary key, the value of this parameter is the ID column that is automatically generated by Elasticsearch.
Configure ETL
The ETL configuration feature does not support changes on table schemas in a destination. If changes on table schemas are required, you must make the changes in the destination before the data synchronization task starts.
If you change the existing ETL-related configurations, the data synchronization task may be interrupted, or historical data may be changed. Make sure that your business is not affected if you change the existing ETL-related configurations.
If you change the existing ETL-related configurations and restart the data synchronization task, the new configurations take effect only for incremental data that is generated after the restart. The new configurations do not take effect for historical data.
For more information about how to make ETL-related configurations, see Configure ETL in a data migration or data synchronization task.
Configure the routing policy and the value of the _id field in the Elasticsearch cluster for the table from which you want to synchronize data.
The following table describes the parameters that are configured in this example.
Parameter
Description
Set _routing
Specifies whether to store a document on a specific shard of the destination Elasticsearch cluster. For more information, see _routing. Valid values:
Yes: You can specify custom columns for routing.
No: The _id value is used for routing.
NoteIf the version of the destination Elasticsearch cluster is 7.x, you must select No.
Value of _id
Primary key column
Composite primary keys are merged into one column.
Business key
If you select a business key, you must specify the business key column.
After the configuration is complete, save the data synchronization task, perform a pre-check on the task, and purchase a DTS instance to start the data synchronization task.
After the DTS instance is purchased, the data synchronization task starts to run. You can view the data synchronization progress of the task on the Data Synchronization Tasks page. After the full data in the ApsaraDB RDS for MySQL database is synchronized, the synchronization of the incremental data in the database starts. During the synchronization of the incremental data, you can view the full data that is synchronized to the Elasticsearch cluster.
ImportantThe data types supported by the ApsaraDB RDS for MySQL instance and those supported by the Elasticsearch cluster are different. Therefore, the data types in the ApsaraDB RDS for MySQL instance and those in the Elasticsearch cluster do not have one-to-one mapping relationships. When DTS synchronizes data from a source to a destination, DTS establishes mappings between source fields and destination fields based on the data types supported by the destination. For more information, see Data type mappings for schema synchronization.
Step 3: Verify the data synchronization results
- Log on to the Kibana console of your Elasticsearch cluster and go to the homepage of the Kibana console as prompted. For more information about how to log on to the Kibana console, see Log on to the Kibana console.Note In this example, an Elasticsearch V6.7.0 cluster is used. Operations on clusters of other versions may differ. The actual operations in the console prevail.
- In the left-side navigation pane of the page that appears, click Dev Tools.
On the Console tab, run the following command to query the full data that is synchronized to the destination Elasticsearch cluster:
GET /es_test_index/es_test_type/_search
If the data synchronization is successful, the result shown in the following figure is returned.
Insert a data record into the source table and view the synchronization result of the data record in the destination Elasticsearch cluster.
In this example, the following SQL statement is executed to insert a data record:
INSERT INTO `test_logstash`.`es_test` (`id`,`name`,`age`,`hobby`) VALUES (6,'user6',30,'dance');
If the synchronization of the data record is successful, the result shown in the following figure is returned.