This topic describes how to use Data Transmission Service (DTS) to synchronize data from a PolarDB for MySQL cluster to a Message Queue for Apache Kafka instance. Synchronizing the data to Kafka extends your message processing capabilities.

Prerequisites

Limits

Category | Description
Limits on the source database
  • The tables to be synchronized must have PRIMARY KEY or UNIQUE constraints, and all fields must be unique. Otherwise, the destination may contain duplicate data records.
  • If you select tables as the objects to be synchronized and need to edit them in the destination database (for example, rename tables or columns), a single data synchronization task can synchronize up to 1,000 tables. If a task includes more than 1,000 tables, a request error occurs. In this case, we recommend that you configure multiple tasks to synchronize the tables or configure a task to synchronize the entire database.
  • If you need to synchronize incremental data, the binary logging feature must be enabled and the loose_polar_log_bin parameter must be set to on. Otherwise, error messages are returned during precheck and the data synchronization task cannot be started. For more information about how to enable the binary logging feature and set the loose_polar_log_bin parameter, see Enable binary logging and Modify parameters.
    Note
    • If you enable the binary logging feature for a PolarDB for MySQL cluster, you are charged for the storage space that is occupied by binary logs.
    • For an incremental data synchronization task, the binary logs of the source database must be retained for at least 24 hours. For a full and incremental data synchronization task, the binary logs must be retained for at least seven days. Otherwise, DTS may fail to obtain the binary logs and the task may fail. In exceptional circumstances, data inconsistency or loss may occur. After full data synchronization is complete, you can set the retention period to more than 24 hours. If you do not set the retention period of binary logs based on these requirements, the SLA of DTS does not guarantee service reliability or performance.

Other limits
  • Read-only nodes of the source PolarDB for MySQL cluster cannot be synchronized.
  • Before you synchronize data, evaluate the impact of data synchronization on the performance of the source and destination databases. We recommend that you synchronize data during off-peak hours. During initial full data synchronization, DTS uses the read and write resources of the source and destination databases. This may increase the loads on the database servers.
  • During initial full data synchronization, concurrent INSERT operations cause fragmentation in the tables of the destination database. After full data synchronization is complete, the tablespace of the destination database is larger than that of the source database.
  • We recommend that you do not use tools such as pt-online-schema-change to perform DDL operations on source tables during data synchronization. Otherwise, data synchronization may fail.
  • If you use only DTS to write data to the destination database, you can use Data Management (DMS) to perform online DDL operations on source tables during data synchronization. For more information, see Perform lock-free operations.
  • During data synchronization, we recommend that you use only DTS to write data to the destination database. This prevents data inconsistency between the source and destination databases. If you use tools other than DTS to write data to the destination database, data loss may occur in the destination database when you use DMS to perform online DDL operations.
Usage notes
  • DTS executes the CREATE DATABASE IF NOT EXISTS `test` statement in the source database as scheduled to move forward the binary log file position.
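
The numeric limits on the source database can be checked before a task is created. The following Python sketch is illustrative only: the function names are hypothetical, and it encodes nothing beyond the 1,000-table limit and the binary log retention periods stated in the limits on the source database.

```python
from typing import List, Sequence

# Limit stated above for tasks that select tables and edit them in the destination.
MAX_TABLES_PER_TASK = 1000


def split_into_tasks(tables: Sequence[str], limit: int = MAX_TABLES_PER_TASK) -> List[List[str]]:
    """Group table names into batches of at most `limit` tables, one batch per task."""
    if limit <= 0:
        raise ValueError("limit must be positive")
    return [list(tables[i:i + limit]) for i in range(0, len(tables), limit)]


def min_binlog_retention_hours(includes_full_sync: bool) -> int:
    """Minimum binary log retention: seven days with full synchronization, else 24 hours."""
    return 7 * 24 if includes_full_sync else 24
```

For example, 2,500 tables would be split into three tasks, and a full and incremental task would need at least 168 hours of binary log retention.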

Size limit of a single record

The maximum size of a single record that can be written to Kafka is 10 MB. If a row of source data exceeds 10 MB, DTS cannot write the record to Kafka and the DTS task is interrupted. To avoid this, we recommend that you do not synchronize entire tables that contain large fields. Instead, synchronize only some fields of these tables by setting filter conditions that exclude the large fields when you configure the DTS task. If tables that contain large fields are already included in the objects of the task, you must remove the tables, add the tables to the objects again, and then set filter conditions to exclude the large fields.
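
To find which fields would need to be excluded, a row can be sized up before the task is configured. The sketch below is a rough estimate under stated assumptions: it reads 10 MB as 10 × 1024 × 1024 bytes and approximates a value's size by its UTF-8 JSON encoding, which is not necessarily how DTS measures a record. The function names are hypothetical.

```python
import json
from typing import Any, Dict, List

# Assumption: 10 MB is interpreted as 10 * 1024 * 1024 bytes.
MAX_RECORD_BYTES = 10 * 1024 * 1024


def approx_size_bytes(value: Any) -> int:
    """Approximate a value's size: raw length for bytes, JSON-encoded length otherwise."""
    if isinstance(value, (bytes, bytearray)):
        return len(value)
    return len(json.dumps(value, default=str).encode("utf-8"))


def columns_to_exclude(row: Dict[str, Any], limit: int = MAX_RECORD_BYTES) -> List[str]:
    """Return the largest columns to drop, biggest first, until the row fits the limit."""
    sizes = {name: approx_size_bytes(value) for name, value in row.items()}
    total = sum(sizes.values())
    excluded: List[str] = []
    for name in sorted(sizes, key=lambda n: sizes[n], reverse=True):
        if total <= limit:
            break
        excluded.append(name)
        total -= sizes[name]
    return excluded
```

The returned column names are candidates for the filter conditions described above.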

Supported synchronization topologies

  • One-way one-to-one synchronization
  • One-way one-to-many synchronization
  • One-way many-to-one synchronization
  • One-way cascade synchronization
For more information about the synchronization topologies that are supported by DTS, see Synchronization topologies.

SQL operations that can be synchronized

Operation type | SQL statement
DML | INSERT, UPDATE, and DELETE
DDL
  • CREATE TABLE, ALTER TABLE, DROP TABLE, RENAME TABLE, and TRUNCATE TABLE
  • CREATE VIEW, ALTER VIEW, and DROP VIEW
  • CREATE PROCEDURE, ALTER PROCEDURE, and DROP PROCEDURE
  • CREATE FUNCTION, DROP FUNCTION, CREATE TRIGGER, and DROP TRIGGER
  • CREATE INDEX and DROP INDEX
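
The list above can be turned into a quick check that flags statements outside the supported set. The following is a naive keyword-based sketch, not a SQL parser. The function name and the extra object keywords (DATABASE, SCHEMA, EVENT, USER) are assumptions added only so that unlisted statements are not misclassified.

```python
import re

SYNCABLE_DML = {"INSERT", "UPDATE", "DELETE"}
SYNCABLE_DDL = {
    ("CREATE", "TABLE"), ("ALTER", "TABLE"), ("DROP", "TABLE"), ("RENAME", "TABLE"),
    ("CREATE", "VIEW"), ("ALTER", "VIEW"), ("DROP", "VIEW"),
    ("CREATE", "PROCEDURE"), ("ALTER", "PROCEDURE"), ("DROP", "PROCEDURE"),
    ("CREATE", "FUNCTION"), ("DROP", "FUNCTION"),
    ("CREATE", "TRIGGER"), ("DROP", "TRIGGER"),
    ("CREATE", "INDEX"), ("DROP", "INDEX"),
}
# Object keywords the scan stops at. The last four are not in the list above.
OBJECT_KEYWORDS = {"TABLE", "VIEW", "PROCEDURE", "FUNCTION", "TRIGGER", "INDEX",
                   "DATABASE", "SCHEMA", "EVENT", "USER"}


def is_listed_operation(statement: str) -> bool:
    """Return True if the statement's leading keywords match the list above."""
    words = re.findall(r"[A-Za-z_]+", statement.upper())
    if not words:
        return False
    verb = words[0]
    if verb in SYNCABLE_DML:
        return True
    if verb == "TRUNCATE":  # MySQL allows TRUNCATE with or without the TABLE keyword
        return True
    # Skip modifiers such as UNIQUE or DEFINER and take the first object keyword.
    obj = next((w for w in words[1:] if w in OBJECT_KEYWORDS), None)
    return (verb, obj) in SYNCABLE_DDL
```

Because the check only looks at leading keywords, treat a True result as "probably listed" rather than a guarantee.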

Permissions required for database accounts

Database | Required permission
Source PolarDB for MySQL cluster | Read permissions on the objects to be synchronized

Procedure

  1. Go to the Data Synchronization Tasks page.
    1. Log on to the Data Management (DMS) console.
    2. In the top navigation bar, click DTS.
    3. In the left-side navigation pane, choose DTS (DTS) > Data Synchronization.
  2. From the drop-down list to the right of Data Synchronization Tasks, select the region in which the data synchronization instance resides.
    Note If you use the new DTS console, you must select the region in which the data synchronization instance resides in the top navigation bar.
  3. Click Create Task. On the page that appears, configure the source and destination databases.
    Warning After you select the source and destination instances, we recommend that you read the limits displayed in the upper part of the page. This helps you create and run the data synchronization task.
    Section | Parameter | Description
    N/A | Task Name

    DTS automatically generates a task name. We recommend that you specify an informative name to identify the task. You do not need to use a unique task name.

    Source Database | Select Instance
    Select whether to use an existing instance.
    • If you select an existing instance, DTS automatically applies the parameter settings of the instance. You do not need to configure the corresponding parameters again.
    • If you do not use an existing instance, you must configure parameters for the source database.
    Database Type | The type of the source database. Select PolarDB for MySQL.
    Access Method | The access method of the source database. Select Alibaba Cloud Instance.
    Instance Region | The region in which the source PolarDB for MySQL cluster resides.
    Replicate Data Across Alibaba Cloud Accounts | Specifies whether data is synchronized across Alibaba Cloud accounts. In this example, No is selected.
    PolarDB Cluster ID | The ID of the source PolarDB for MySQL cluster.
    Database Account | The database account of the source PolarDB for MySQL cluster. For more information about the permissions that are required for the account, see Permissions required for database accounts.
    Database Password

    The password of the database account.

    Encryption

    Specifies whether to encrypt the connection to the database. You can select Non-encrypted or SSL-encrypted based on your business requirements. If you want to select SSL-encrypted, you must enable SSL encryption for the source PolarDB for MySQL cluster before you configure the data synchronization task. For more information, see Configure SSL encryption for an ApsaraDB RDS for MySQL instance.

    Destination Database | Select Instance
    Select whether to use an existing instance.
    • If you select an existing instance, DTS automatically applies the parameter settings of the instance. You do not need to configure the corresponding parameters again.
    • If you do not use an existing instance, you must configure parameters for the destination database.
    Database Type | The type of the destination database. Select Kafka.
    Access Method | The access method of the destination database. Select Express Connect, VPN Gateway, or Smart Access Gateway.
    Note You cannot select Message Queue for Apache Kafka as the instance type. You can use Message Queue for Apache Kafka as a self-managed Kafka instance to configure data synchronization.
    Instance Region | The region in which the destination Message Queue for Apache Kafka instance resides.
    Connected VPC | The ID of the virtual private cloud (VPC) to which the destination Message Queue for Apache Kafka instance belongs. To obtain the VPC ID, perform the following operations: Log on to the Message Queue for Apache Kafka console and go to the Instance Details page of the Message Queue for Apache Kafka instance. In the Configuration Information section, view the VPC ID.
    IP Address | Enter an IP address that is included in the Default Endpoint parameter of the Message Queue for Apache Kafka instance.
    Note To obtain an IP address, perform the following operations: Log on to the Message Queue for Apache Kafka console and go to the Instance Details page of the Message Queue for Apache Kafka instance. In the Endpoint Information section, obtain an IP address from the Default Endpoint parameter.
    Port Number | The service port number of the destination Message Queue for Apache Kafka instance. The default port number is 9092.
    Database Account | The database account of the destination Message Queue for Apache Kafka instance.
    Note If the instance type of the Message Queue for Apache Kafka instance is VPC Instance, you do not need to specify the database account or database password.
    Database Password

    The password of the database account.

    Kafka Version | The version of the destination Message Queue for Apache Kafka instance.
    Encryption | Specify whether to encrypt the connection. Select Non-encrypted or SCRAM-SHA-256 based on your business and security requirements.
    Topic | The topic used to receive the synchronized data. Select a topic from the drop-down list.
    Topic That Stores DDL Information | The topic used to store the DDL information. Select a topic from the drop-down list. If you do not specify this parameter, the DDL information is stored in the topic that is specified by the Topic parameter.
    Use Kafka Schema Registry | Specifies whether to use Kafka Schema Registry, which provides a serving layer for your metadata. It provides a RESTful API for storing and retrieving your Avro schemas. Valid values:
    • No: does not use Kafka Schema Registry.
    • Yes: uses Kafka Schema Registry. In this case, you must enter the URL or IP address that is registered in Kafka Schema Registry for your Avro schemas.
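
The IP Address and Port Number values are taken from the Default Endpoint parameter of the Kafka instance. The following sketch assumes that the endpoint is shown as a comma-separated list of host:port pairs, which is an assumption about the console's display format. The function name is hypothetical.

```python
from typing import List, Tuple

DEFAULT_KAFKA_PORT = 9092  # default port number stated in the Port Number row


def parse_endpoints(default_endpoint: str) -> List[Tuple[str, int]]:
    """Split 'host:port,host:port' into (host, port) pairs.

    Entries without a port fall back to the default port number.
    """
    pairs: List[Tuple[str, int]] = []
    for item in default_endpoint.split(","):
        item = item.strip()
        if not item:
            continue
        host, sep, port = item.rpartition(":")
        if not sep:  # no ':' found, so the whole entry is the host
            host, port = item, str(DEFAULT_KAFKA_PORT)
        pairs.append((host, int(port)))
    return pairs
```

Any one of the returned hosts can be entered as the IP address, with the matching port as the port number.
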
  4. If a whitelist is configured for your self-managed database, add the CIDR blocks of DTS servers to the whitelist. Then, click Test Connectivity and Proceed.
    Note For more information about the CIDR blocks of DTS servers, see Add the CIDR blocks of DTS servers to the security settings of on-premises databases.
  5. Select objects for the task and configure advanced settings.
    Parameter | Description
    Task Stages

    By default, Incremental Data Synchronization is selected. You must also select Schema Synchronization and Full Data Synchronization. After the precheck is complete, DTS synchronizes the historical data of the selected objects from the source database to the destination database. The historical data is the basis for subsequent incremental synchronization.

    Processing Mode of Conflicting Tables
    • Precheck and Report Errors: checks whether the destination database contains tables that have the same names as tables in the source database. If the source and destination databases do not contain tables that have identical table names, the precheck is passed. Otherwise, an error is returned during the precheck, and the data synchronization task cannot be started.

      Note You can use the object name mapping feature to rename the tables that are synchronized to the destination database. You can use this feature if the source and destination databases contain identical table names and the tables in the destination database cannot be deleted or renamed. For more information, see Map object names.
    • Ignore Errors and Proceed: skips the precheck for identical table names in the source and destination databases.
      Warning If you select Ignore Errors and Proceed, data inconsistency may occur, and your business may be exposed to potential risks.
      • If the source and destination databases have the same schemas, and a data record has the same primary key value as an existing data record in the destination database:
        • During full data synchronization, DTS does not synchronize the data record to the destination database. The existing data record in the destination database is retained.
        • During incremental data synchronization, DTS synchronizes the data record to the destination database. The existing data record in the destination database is overwritten.
      • If the source and destination databases have different schemas, data may fail to be initialized. In this case, only some columns are synchronized or the data synchronization task fails.
    Data Format in Kafka | The format in which data is stored in the Message Queue for Apache Kafka instance.
    • If you select DTS Avro, data is parsed based on the schema definition of DTS Avro. For more information, visit GitHub.
    • If you select Canal Json, data is stored in the Canal JSON format. For more information about the related parameters and examples, see the "Canal JSON" section of the Data formats of a Kafka cluster topic.
    Policy for Shipping Data to Kafka Partitions | Select a synchronization policy for data synchronized to Kafka partitions based on your business requirements. For more information, see Specify the policy for migrating data to Kafka partitions.
    Capitalization of Object Names in Destination Instance

    The capitalization of database names, table names, and column names in the destination instance. By default, DTS default policy is selected. You can select other options to make sure that the capitalization of object names is consistent with that in the source or destination database. For more information, see Specify the capitalization of object names in the destination instance.

    Source Objects

    Select one or more objects from the Source Objects section and click the Rightwards arrow icon to add the objects to the Selected Objects section.

    Note You can select only tables as the objects to be synchronized.
    Selected Objects
    • To rename an object that you want to synchronize to the destination instance, right-click the object in the Selected Objects section. For more information, see Map the name of a single object.
    • To rename multiple objects at a time, click Batch Edit in the upper-right corner of the Selected Objects section. For more information, see Map multiple object names at a time.
    Note
    • To select the SQL operations performed on a specific database or table, perform the following steps: In the Selected Objects section, right-click an object. In the dialog box that appears, select the SQL operations that you want to synchronize. For more information about the SQL operations that can be synchronized, see SQL operations that can be synchronized.
    • To specify WHERE conditions to filter data, right-click an object in the Selected Objects section. In the dialog box that appears, specify the conditions. For more information about how to specify the conditions, see Use SQL conditions to filter data.
  6. Click Next: Advanced Settings to configure advanced settings.
    Parameter | Description
    Set Alerts
    Specifies whether to configure alerting for the data synchronization task. If the task fails or the synchronization latency exceeds the specified threshold, alert contacts will receive notifications. Valid values:
    Retry Time for Failed Connection
    The retry time range for failed connections. If the source or destination database fails to be connected after the data synchronization task is started, DTS immediately retries a connection within the time range. Valid values: 10 to 1440. Unit: minutes. Default value: 720. We recommend that you set the parameter to a value greater than 30. If DTS reconnects to the source and destination databases within the specified time range, DTS resumes the data synchronization task. Otherwise, the data synchronization task fails.
    Note
    • If you set different retry time ranges for multiple DTS tasks that have the same source or destination database, the shortest retry time range that is set takes precedence.
    • When DTS retries a connection, you are charged for the DTS instance. We recommend that you specify the retry time range based on your business requirements. You can also release the DTS instance at the earliest opportunity after the source and destination instances are released.
    Configure ETL
    Specifies whether to configure the extract, transform, and load (ETL) feature. For more information, see What is ETL?. Valid values:
    Whether to delete SQL operations on heartbeat tables of forward and reverse tasks
    Specifies whether to write SQL operations on heartbeat tables to the source database while the DTS instance is running.
    • Yes: does not write SQL operations on heartbeat tables. In this case, a latency of the DTS instance may be displayed.
    • No: writes SQL operations on heartbeat tables. In this case, specific features such as physical backup and cloning of the source database may be affected.
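
The rules for Retry Time for Failed Connection can be expressed as a small check. This sketch only restates the documented range (10 to 1440 minutes, default 720) and the rule that the shortest range takes precedence among tasks that share a source or destination database. The function names are hypothetical.

```python
from typing import Iterable

RETRY_MIN, RETRY_MAX, RETRY_DEFAULT = 10, 1440, 720  # minutes, as stated above


def validate_retry_minutes(minutes: int = RETRY_DEFAULT) -> int:
    """Return the value unchanged if it is within the valid range, else raise."""
    if not RETRY_MIN <= minutes <= RETRY_MAX:
        raise ValueError(f"retry time must be {RETRY_MIN} to {RETRY_MAX} minutes")
    return minutes


def effective_retry_minutes(per_task: Iterable[int]) -> int:
    """Shortest configured range among tasks that share a source or destination."""
    return min(validate_retry_minutes(m) for m in per_task)
```

For example, if three tasks on the same source database use 720, 60, and 1440 minutes, all of them are effectively limited to 60 minutes.
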
  7. In the lower part of the page, click Next: Save Task Settings and Precheck.
    Note
    • Before you can start the data synchronization task, DTS performs a precheck. You can start the data synchronization task only after the task passes the precheck.
    • If the task fails to pass the precheck, click View Details next to each failed item. After you analyze the causes based on the check results, troubleshoot the issues. Then, run a precheck again.
    • If an alert is generated for an item during the precheck, perform the following operations based on the scenario:
      • In scenarios where you cannot ignore the alert item, click View Details next to the failed item. After you analyze the causes based on the check results, troubleshoot the issues. Then, run a precheck again.
      • In scenarios where you can ignore the alert item, click Confirm Alert Details next to the failed item. In the View Details dialog box, click Ignore. In the message that appears, click OK. Then, click Precheck Again to run a precheck again. If you ignore the alert item, data inconsistency may occur, and your business may be exposed to potential risks.
  8. Wait until the success rate becomes 100%. Then, click Next: Purchase Instance.
  9. On the Purchase Instance page, configure the billing method and instance class parameters for the data synchronization instance. The following table describes the parameters.
    Section | Parameter | Description
    New Instance Class | Instance Class | DTS provides several instance classes that differ in synchronization speed. You can select an instance class based on your business scenario. For more information, see Specifications of data synchronization instances.
  10. Read the Data Transmission Service (Pay-as-you-go) Service Terms and select the check box.
  11. Click Buy and Start to start the data synchronization task. You can view the progress of the task in the task list.
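
After the task starts, downstream consumers read the synchronized records from the topic. If Canal Json was selected as the data format, a message can be summarized as in the following sketch. The sample payload is hypothetical, and the field names (database, table, type, isDdl, data) are assumed from the commonly documented Canal JSON layout. Verify them against the "Canal JSON" section referenced in the Data Format in Kafka parameter before relying on them.

```python
import json
from typing import Any, Dict


def summarize_canal_message(raw: bytes) -> Dict[str, Any]:
    """Extract the table, operation type, DDL flag, and row data from one message."""
    msg = json.loads(raw)
    return {
        "table": f"{msg.get('database')}.{msg.get('table')}",
        "type": msg.get("type"),
        "is_ddl": bool(msg.get("isDdl")),
        "rows": msg.get("data") or [],
    }


# Hypothetical message, for illustration only.
sample = json.dumps({
    "database": "shop",
    "table": "orders",
    "type": "INSERT",
    "isDdl": False,
    "pkNames": ["id"],
    "data": [{"id": "1", "status": "paid"}],
    "old": None,
}).encode("utf-8")

summary = summarize_canal_message(sample)
```

In a real consumer, `raw` would be the value of a record read from the topic that is specified by the Topic parameter.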