Elasticsearch: Use DTS to synchronize data from a PolarDB for MySQL database to an Alibaba Cloud Elasticsearch cluster

Last Updated: Apr 16, 2024

If you encounter slow queries when you use a PolarDB for MySQL database, you can use Data Transmission Service (DTS) to synchronize production data from the database to an Alibaba Cloud Elasticsearch cluster in real time. Then, you can search for and analyze the synchronized data in the cluster. This topic describes how to synchronize data from a PolarDB for MySQL database to an Elasticsearch cluster.

Background information

The following cloud services are used:

  • DTS is a data transmission service that integrates data migration, data subscription, and real-time data synchronization. For more information, see DTS. DTS can synchronize data changes that are made by the following SQL statements: INSERT, DELETE, and UPDATE.

    Important

    When you synchronize data, you must select a data source and a version that are supported by DTS. For more information, see Overview of data synchronization scenarios.

  • PolarDB is a next-generation relational database service developed by Alibaba Cloud. It is compatible with MySQL, PostgreSQL, and Oracle database engines. A PolarDB cluster can provide a maximum of 100 TB of storage space and can be scaled to a maximum of 16 nodes. PolarDB provides superior performance in storage and computing to meet diverse requirements of enterprises. For more information, see PolarDB for MySQL overview.

  • Elasticsearch is a Lucene-based, distributed, real-time search and analytics engine. It allows you to store, query, and analyze large amounts of data in near real time. In most cases, it is used as a basic engine or technology to support complex queries and high application performance. For more information, see What is Alibaba Cloud Elasticsearch?

You can use this topic as a guide for synchronizing data from relational databases in real time.

Precautions

  • DTS uses read and write resources of the source and destination instances during initial full data synchronization. This may increase the loads on the instances. If instance performance is poor, the specifications are low, or the data volume is large, database services may become unavailable. For example, DTS occupies a large amount of read and write resources in the following cases: a large number of slow SQL queries are performed on the source instance, the tables have no primary keys, or a deadlock occurs in the destination instance. Before you synchronize data, evaluate the impact of data synchronization on the performance of the source and destination instances. We recommend that you synchronize data during off-peak hours, for example, when the CPU utilization of the source and destination instances is less than 30%.

  • If you want to add columns to the source table, modify the mappings of the index that corresponds to the table. Then, perform the related DDL operation on the source table, pause the data synchronization task, and start the task again.
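
As a rough illustration of the DDL step in the preceding precaution, the following statement adds a column to the source table. It assumes the estest.product table that is created in Step 1 of this topic. The column name brand and its data type are hypothetical. The change to the index mappings is made in the Elasticsearch cluster and is not shown here.

```sql
-- Sketch only: `brand` is a hypothetical column used for illustration.
-- Run this after you modify the mappings of the corresponding index.
-- Afterward, pause the data synchronization task and start it again.
ALTER TABLE `estest`.`product`
    ADD COLUMN `brand` varchar(32) NULL;

-- Confirm that the column now exists in the source table.
SHOW COLUMNS FROM `estest`.`product` LIKE 'brand';
```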

Limits

  • You cannot use DTS to synchronize data to an Alibaba Cloud Elasticsearch V7.16 or V8.X cluster.

  • DTS does not synchronize data changes generated by DDL operations. If a DDL operation is performed on a table in the source database during data synchronization, you must perform the following operations: Remove the table from the data synchronization task, remove the index for the table from the Elasticsearch cluster, and then add the table to the data synchronization task again. For more information, see Remove an object from a data synchronization task and Add an object to a data synchronization task.

Procedure

  1. Step 1: Make preparations

    Create an Elasticsearch cluster and a PolarDB for MySQL cluster and prepare test data.

  2. Step 2: Configure a data synchronization task

    Use DTS to create and start a real-time task to synchronize data from the PolarDB for MySQL database to the Elasticsearch cluster.

  3. Step 3: View the data synchronization result

    Log on to the Kibana console of the Elasticsearch cluster and query the synchronized data.

  4. Step 4: Verify incremental data synchronization

    Add data to the PolarDB for MySQL database and check whether the data is synchronized to the Elasticsearch cluster.

Step 1: Make preparations

  1. Enable the Auto Indexing feature for the Elasticsearch cluster.

    For more information, see Configure the YML file. In this example, an Elasticsearch V6.7 cluster is used.

    Note

    To ensure data security, Alibaba Cloud Elasticsearch disables Auto Indexing by default. When you use DTS to synchronize data to an Elasticsearch cluster, you must create indexes in the Elasticsearch cluster by submitting data instead of calling the create index API. Therefore, before you use DTS to synchronize data, you must enable Auto Indexing for the Elasticsearch cluster.

  2. Create a PolarDB for MySQL cluster and enable binary logging.

  3. Create a PolarDB for MySQL database and a table, and insert test data into the table.

    For more information, see Database Management.

    • Table creation statement

      CREATE TABLE `product` (
          `id` bigint(32) NOT NULL AUTO_INCREMENT,
          `name` varchar(32) NULL,
          `price` varchar(32) NULL,
          `code` varchar(32) NULL,
          `color` varchar(32) NULL,
          PRIMARY KEY (`id`)
      ) ENGINE=InnoDB
      DEFAULT CHARACTER SET=utf8;
    • Test data

      INSERT INTO `estest`.`product` (`id`,`name`,`price`,`code`,`color`) VALUES (1,'mobile phone A','2000','amp','golden');
      INSERT INTO `estest`.`product` (`id`,`name`,`price`,`code`,`color`) VALUES (2,'mobile phone B','2200','bmp','white');
      INSERT INTO `estest`.`product` (`id`,`name`,`price`,`code`,`color`) VALUES (3,'mobile phone C','2600','cmp','black');
      INSERT INTO `estest`.`product` (`id`,`name`,`price`,`code`,`color`) VALUES (4,'mobile phone D','2700','dmp','red');
      INSERT INTO `estest`.`product` (`id`,`name`,`price`,`code`,`color`) VALUES (5,'mobile phone E','2800','emp','silvery');
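
The statements above reference a database named estest but do not show how it is created or how to check the result. The following is a minimal sketch of the surrounding checks. It assumes that your account is allowed to create databases by using SQL. On PolarDB, you may instead need to create the database and enable binary logging in the console.

```sql
-- Create the database that the INSERT statements reference
-- (assumed name: estest), and switch to it before you create the table.
CREATE DATABASE IF NOT EXISTS `estest` DEFAULT CHARACTER SET utf8;
USE `estest`;

-- Check whether binary logging is enabled.
-- In MySQL, the value ON indicates that binary logging is enabled.
SHOW VARIABLES LIKE 'log_bin';

-- After you create the table and insert the test data, confirm the rows.
-- If only the five test rows were inserted, row_count is 5.
SELECT COUNT(*) AS row_count FROM `estest`.`product`;
```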

Step 2: Configure a data synchronization task

  1. Go to the Data Synchronization Tasks page.

    1. Log on to the Data Management (DMS) console.

    2. In the top navigation bar, click DTS.

    3. In the left-side navigation pane, choose DTS (DTS) > Data Synchronization.

  2. On the Data Synchronization tab, click Create Task. In the Create Data Synchronization Task wizard, create and configure a data synchronization task as prompted.

    You need to configure the source and destination, task objects, mapped fields, advanced settings, database fields, and table fields. The following configurations are used in this example. For more information, see Synchronize data from a PolarDB for MySQL cluster and Synchronize data from a PolarDB-X 2.0 instance to an Elasticsearch cluster.

    1. In the Configure Source and Destination Databases step, configure the source and destination.

      | Section | Parameter | Description |
      | --- | --- | --- |
      | None | Task Name | DTS automatically generates a task name. You do not need to use a unique task name. We recommend that you use an informative name for easy identification. |
      | Source Database | Database Type | Select PolarDB for MySQL. |
      | Source Database | Access Method | The value of this parameter is fixed as Alibaba Cloud Instance. |
      | Source Database | Instance Region | The region in which the source PolarDB for MySQL cluster resides. |
      | Source Database | Replicate Data Across Alibaba Cloud Accounts | In this example, No is selected because data is synchronized within the same Alibaba Cloud account. |
      | Source Database | PolarDB Cluster ID | The ID of the source PolarDB for MySQL cluster. |
      | Source Database | Database Account | The account of the source PolarDB for MySQL database. The account must have the read permissions on the database. |
      | Source Database | Database Password | The password for the account of the source PolarDB for MySQL database. |
      | Destination Database | Database Type | Select Elasticsearch. |
      | Destination Database | Access Method | The value of this parameter is fixed as Alibaba Cloud Instance. |
      | Destination Database | Instance Region | The region in which the destination Elasticsearch cluster resides. We recommend that you select the region in which the source PolarDB for MySQL cluster resides. |
      | Destination Database | Instance ID | The ID of the destination Elasticsearch cluster. |
      | Destination Database | Database Account | The username that is used to connect to the destination Elasticsearch cluster. The default username is elastic. |
      | Destination Database | Database Password | The password that is used to connect to the destination Elasticsearch cluster. Enter the password that corresponds to the username specified by Database Account. |

    2. In the Configure Task Objects step, configure objects for the task.

      Parameter or operation

      Description

      Synchronization Types

      The synchronization types. By default, Incremental Data Synchronization is selected. You must also select Schema Synchronization and Full Data Synchronization. After the precheck is complete, DTS synchronizes the historical data of the selected objects from the source database to the destination database. The historical data is the basis for subsequent incremental synchronization.

      Processing Mode of Conflicting Tables

      • Precheck and Report Errors: checks whether the destination database contains tables that have the same names as tables in the source database. If the source and destination databases do not contain tables that have identical table names, the precheck is passed. Otherwise, an error is returned during the precheck and the data synchronization task cannot be started.

        Note

        If the source and destination databases contain identical table names and the tables in the destination database cannot be deleted or renamed, you can use the object name mapping feature to rename the tables that are synchronized to the destination database. For more information, see Map object names.

      • Ignore Errors and Proceed: skips the precheck for identical table names in the source and destination databases.

        Warning

        If you select Ignore Errors and Proceed, data inconsistency may occur and your business may be exposed to the following potential risks:

        • If the source and destination databases have the same schema and a data record in the destination database has the same primary key value or unique key value as a data record in the source database:

          • During full data synchronization, DTS does not synchronize the data record to the destination database. The existing data record in the destination database is retained.

          • During incremental data synchronization, DTS synchronizes the data record to the destination database. The existing data record in the destination database is overwritten.

        • If the source and destination databases have different schemas, data may fail to be initialized. In this case, only some columns are synchronized, or the data synchronization task fails. Proceed with caution.

      Index Name

      • Table Name

        If you select Table Name, the index created on the Elasticsearch cluster uses the same name as the table in the PolarDB for MySQL database. If you select this option in this example, the index name is product.

      • Database Name_Table Name

        If you select Database Name_Table Name, the index created on the Elasticsearch cluster is named in the format of Database name_Table name. If you select this option in this example, the index name is estest_product.

      Select the objects to be synchronized

      Select one or more objects from the Source Objects section and click the right-arrow icon to move the objects to the Selected Objects section.

      Rename databases and tables

      • To rename an object that you want to synchronize to the destination instance, right-click the object in the Selected Objects section. For more information, see the Map the name of a single object section of the "Map object names" topic.

      • To rename multiple objects at a time, click Batch Edit in the upper-right corner of the Selected Objects section. For more information, see the Map multiple object names at a time section of the "Map object names" topic.

    3. In the Object Configurations step, configure mapped fields. You can choose whether to change the names of synchronized fields in the destination based on your business requirements.

      If you want to change the names of synchronized fields in the destination, right-click the name of the table to which the fields belong in the Selected Objects section. In the Column section of the Edit Table Name dialog box, change field names. You can also change the values of the Index Name and Type Name parameters. After the configuration is complete, click OK. For more information, see Map the name of a single object.

      Parameter

      Description

      Index Name

      The name of the destination index. You can specify a name based on your business requirements. For information about the concept of an index, see Terms.

      Important

      The name that you enter must be unique in the Elasticsearch cluster. Otherwise, the "index already exists" error message appears.

      Type Name

      The name of the destination index type. You can specify a name based on your business requirements. For information about the concept of an index type, see Terms.

      Filter Conditions

      You can specify SQL conditions to filter data. Only data that meets the specified conditions can be synchronized to the destination Elasticsearch cluster. For more information, see Use SQL conditions to filter data.

      Column

      Select the column parameter and parameter value. For more information, see Mapping parameters.

      Important

      If you set index to false for a field that you added, the field cannot be queried. For more information, see index.

    4. Configure advanced parameters.

      In this example, default values are retained for the advanced parameters. The following table describes the advanced parameters.

      Parameter

      Description

      Select the dedicated cluster used to schedule the task

      • By default, DTS schedules the data synchronization task by using a public cluster. You do not need to specify a node in a public cluster for task scheduling.

      • If you want to use a dedicated cluster, you must purchase one first, and make sure that the dedicated cluster can successfully run before you can select it.

      • You can purchase a dedicated cluster to run DTS tasks such as data migration tasks, data synchronization tasks, and subscription tasks. Dedicated clusters can isolate resources for your DTS tasks from the resources used by DTS instances of other users. Compared with DTS instances in public clusters, DTS instances in dedicated clusters have higher stability and better performance.

      Set Alerts

      Specifies whether to configure alerting for the data synchronization task. If the task fails or the synchronization latency exceeds the specified threshold, alert contacts will receive notifications. Valid values:

      • No: does not configure alerting.

      • Yes: configures alerting. In this case, you must also configure the alert threshold and alert contacts. For more information, see Configure monitoring and alerting.

      Retry Time for Failed Connections

      The retry time range for failed connections. If the source or destination database fails to be connected after the data synchronization task is started, DTS immediately retries a connection within the time range. Valid values: 10 to 1440. Unit: minutes. Default value: 720. We recommend that you set this parameter to a value greater than 30. If DTS reconnects to the source and destination databases within the specified time range, DTS resumes the data synchronization task. Otherwise, the data synchronization task fails.

      Note
      • If you specify different retry time ranges for multiple data synchronization tasks that have the same source or destination database, the shortest retry time range takes precedence.

      • When DTS retries a connection, you are charged for the DTS instance. We recommend that you specify the retry time range based on your business requirements. You can also release the DTS instance at your earliest opportunity after the source and destination instances are released.

      Shard Configuration

      The number of primary shards and the number of replica shards for each primary shard. You can specify the numbers based on the default shard configuration of the destination Elasticsearch cluster. By default, an index in an Elasticsearch cluster whose version is earlier than V7.X has five primary shards and one replica shard for each primary shard. By default, an index in an Elasticsearch cluster whose version is V7.X or later has one primary shard and one replica shard for the primary shard.

      Important

      The number of shards and the size of each shard are important factors that affect the stability and performance of an Elasticsearch cluster. You must appropriately configure shards for indexes in an Elasticsearch cluster. For information about how to plan shards for indexes, see Evaluate specifications and storage capacity.

      String Index

      The method used to index strings that are synchronized to the destination Elasticsearch cluster. Valid values:

      • analyzed: indicates that the strings are analyzed before indexing. You must select an analyzer. For information about the analyzer types, see Built-in analyzer reference.

      • not analyzed: indicates that the strings are indexed with the original values.

      • no: indicates that the strings are not indexed.

      Time Zone

      If you synchronize data of a date or time data type such as DATETIME or TIMESTAMP, you can select a time zone.

      Note

      If the date and time data types in the destination Elasticsearch cluster do not need a time zone, you must specify the document type for the date and time data types.

      DOCID

      By default, the primary key of the table is used as the document ID in the destination Elasticsearch cluster. If the table does not have a primary key, the document ID is the ID that is automatically generated by Elasticsearch.

      Configure ETL

      • The ETL configuration feature does not support changes on table schemas in a destination. If changes on table schemas are required, you must make the changes in the destination before the data synchronization task starts.

      • If you change the existing ETL-related configurations, the data synchronization task may be interrupted, or historical data may be changed. Make sure that your business is not affected if you change the existing ETL-related configurations.

      • If you change the existing ETL-related configurations and restart the data synchronization task, the new configurations take effect only for incremental data that is generated after the restart. The new configurations do not take effect for historical data.

      • For more information about how to make ETL-related configurations, see Configure ETL in a data migration or data synchronization task.

    5. In the Configure Database and Table Fields step, configure the _routing policy and _id value of the table that you want to synchronize to the destination Elasticsearch cluster.

      In this example, the database name is estest, the table name is product, Set _routing is set to No, and Value of _id is set to id. The following table describes the Set _routing and Value of _id parameters.

      Parameter

      Description

      Set _routing

      Specifies whether to store a document on a specific shard of the destination Elasticsearch cluster. For more information, see _routing. Valid values:

      • Yes: You can specify custom columns for routing.

      • No: The _id value is used for routing.

      Note

      If the version of the destination Elasticsearch cluster is 7.x, you must select No.

      Value of _id

      • Primary key column

        Composite primary keys are merged into one column.

      • Business key

        If you select a business key, you must specify the business key column.

  3. Click Next: Save Task Settings and Precheck.

    Then, the system performs a precheck and starts the data synchronization task. You can view the data synchronization progress of the task on the Data Synchronization tab. When the system synchronizes incremental data after full data is synchronized, you can view the synchronized data in the destination Elasticsearch cluster.
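
Two of the settings in this step can be prepared with SQL on the source database: the read permissions required for Database Account, and the condition entered for Filter Conditions. The following is a sketch under stated assumptions. The account name dts_reader and the password placeholder are hypothetical, and on PolarDB you may need to create accounts in the console instead. For the privileges that DTS requires and the syntax that the Filter Conditions field accepts, check the DTS documentation referenced in this step.

```sql
-- Assumption: dts_reader is a hypothetical account name. Replace the
-- password placeholder with your own value.
CREATE USER 'dts_reader'@'%' IDENTIFIED BY '<your-password>';

-- Grant read access to the database used in this example.
GRANT SELECT ON `estest`.* TO 'dts_reader'@'%';
SHOW GRANTS FOR 'dts_reader'@'%';

-- Preview which rows a filter condition such as color = 'white' would keep.
-- With only the five test rows from Step 1, this returns the row with id 2.
SELECT `id`, `name`, `color`
FROM `estest`.`product`
WHERE `color` = 'white';
```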

Step 3: View the data synchronization result

  1. Log on to the Kibana console of your Elasticsearch cluster and go to the homepage of the Kibana console as prompted.
    For more information about how to log on to the Kibana console, see Log on to the Kibana console.
    Note: In this example, an Elasticsearch V6.7.0 cluster is used. Operations on clusters of other versions may differ. The actual operations in the console prevail.
  2. In the left-side navigation pane of the page that appears, click Dev Tools.
  3. On the Console tab of the page that appears, run the following command to query the synchronized full data:

    GET /product/_doc/_search

    If the data synchronization is successful, the result shown in the following figure is returned.

    Figure: Query result

  4. Perform operations in the console to query the synchronized data.

    1. Create an index pattern for the destination index.

      1. In the left-side navigation pane, click Management.

      2. In the Kibana section, click Index Patterns.

      3. Click Create index pattern.

      4. In the Create index pattern section, enter an index pattern name that matches the name of the destination index in the Index pattern name field. For example, you can enter product* or product.

      5. Click Next step.

      6. Click Create index pattern.

    2. In the left-side navigation pane, click Discover.

    3. Select the index pattern that you created to view the synchronized data.

      Figure: Synchronized data

Step 4: Verify incremental data synchronization

  1. Log on to the PolarDB console.

  2. Execute the following statement to insert a data record into the PolarDB for MySQL database:

    INSERT INTO `estest`.`product` (`id`,`name`,`price`,`code`,`color`) VALUES (6,'mobile phone F','2750','fmp','white');
  3. Log on to the Kibana console of the destination Elasticsearch cluster. In the left-side navigation pane, click Discover. Then, select the index pattern that you created to view the synchronized incremental data.

    Figure: Synchronized incremental data

    Note

    After you delete or modify data in the source PolarDB for MySQL database, you can use the same method to verify data synchronization.
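
The modify and delete checks mentioned in the note can be sketched as follows. The statements reuse the record with id 6 that was inserted in this step. The new price value is an arbitrary example.

```sql
-- Modify the record inserted above, and then check the document in Kibana.
UPDATE `estest`.`product` SET `price` = '2900' WHERE `id` = 6;

-- Delete the record, and then check that it no longer appears in Kibana.
DELETE FROM `estest`.`product` WHERE `id` = 6;

-- Confirm the state of the source table after the changes.
SELECT `id`, `name`, `price` FROM `estest`.`product` ORDER BY `id`;
```

Run the UPDATE and DELETE statements one at a time and refresh the Discover page after each one, so that you can observe each change separately.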