This topic describes how to create a MySQL source connector and use DataWorks to synchronize data from ApsaraDB RDS for MySQL to topics in your ApsaraMQ for Kafka instance.
Prerequisites
- The connector feature is enabled for your ApsaraMQ for Kafka instance. For more information, see Enable the connector feature. Important: Make sure that your ApsaraMQ for Kafka instance is deployed in the China (Shenzhen), China (Chengdu), China (Beijing), China (Zhangjiakou), China (Hangzhou), China (Shanghai), or Singapore region.
- An ApsaraDB RDS for MySQL instance is created. For information about how to create an ApsaraDB RDS for MySQL instance, see Create an ApsaraDB RDS for MySQL instance.
- A database and a database account are created in the ApsaraDB RDS for MySQL instance. For information about how to create a database and a database account, see Create databases and accounts for an ApsaraDB RDS for MySQL instance.
- A table is created in the database. For information about the common SQL statements that are used in ApsaraDB RDS for MySQL, see Commonly used SQL statements for MySQL.
- DataWorks is authorized to access your elastic network interfaces (ENIs) regardless of whether you use an Alibaba Cloud account or a Resource Access Management (RAM) user. To grant permissions to an account, go to the Cloud Resource Access Authorization page. Important: If you use a RAM user, make sure that the RAM user is granted the following permissions:
- AliyunDataWorksFullAccess: the permissions to manage all DataWorks resources within the Alibaba Cloud account.
- AliyunBSSOrderAccess: the permissions to purchase Alibaba Cloud services.
For information about how to attach policies to RAM users, see Step 2: Grant permissions to RAM users.
- Both the data source and the data destination are created by using your account. The data source is an ApsaraDB RDS for MySQL instance. The data destination is your ApsaraMQ for Kafka instance.
- The CIDR block of the virtual private cloud (VPC) where the ApsaraDB RDS for MySQL instance resides and the CIDR block of the VPC where the ApsaraMQ for Kafka instance resides do not overlap. If the CIDR blocks overlap, you cannot create a MySQL source connector in ApsaraMQ for Kafka.
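You can verify this prerequisite before you start. The following sketch uses the Python standard library to check whether two CIDR blocks overlap; the CIDR values are placeholders, not values from your environment.

```python
# Sketch: check whether two VPC CIDR blocks overlap before creating
# a MySQL source connector. The CIDR values below are placeholders.
from ipaddress import ip_network

def cidrs_overlap(cidr_a: str, cidr_b: str) -> bool:
    """Return True if the two CIDR blocks share any addresses."""
    return ip_network(cidr_a).overlaps(ip_network(cidr_b))

print(cidrs_overlap("172.16.0.0/12", "172.20.0.0/16"))  # True: connector creation would fail
print(cidrs_overlap("10.0.0.0/16", "192.168.0.0/24"))   # False: CIDR blocks are disjoint
```

If the function returns True for your two VPCs, choose non-overlapping CIDR blocks before you create the connector.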
Background information
You can create a MySQL source connector in the ApsaraMQ for Kafka console to synchronize data from tables in your ApsaraDB RDS for MySQL instance to topics in your ApsaraMQ for Kafka instance. The connector is created and run by using DataWorks, as shown in the following figure. After a MySQL source connector is created in the ApsaraMQ for Kafka console, DataWorks Basic Edition is automatically activated free of charge. DataWorks Basic Edition creates DataWorks workspaces and exclusive resource groups for data integration. The DataWorks workspaces are free of charge, but exclusive resource groups are paid services. The specifications of an exclusive resource group for data integration are 4 vCPUs and 8 GB of memory. Exclusive resource groups use the monthly subscription billing method and are automatically renewed upon expiration by default. For more information about the billing of DataWorks, see Overview.
In addition, DataWorks automatically generates destination topics in ApsaraMQ for Kafka based on the configurations of your MySQL source connector. Source tables and destination topics are related based on one-to-one mappings. By default, DataWorks generates a topic that contains six partitions for each table that has a primary key and a topic that contains one partition for each table that does not have a primary key. Make sure that the available numbers of topics and partitions in your ApsaraMQ for Kafka instance are sufficient. Otherwise, the MySQL source connector fails to be created because DataWorks fails to create topics.
The name of each topic is in the <Specified prefix>_<Name of the source table> format. The underscore (_) is automatically added by the system. The following figure provides an example.
In this example, the specified prefix is mysql. The source tables to be synchronized are table_1, table_2, ..., and table_n. DataWorks automatically generates topics for you to receive the data that is synchronized from the source tables. The topics are named mysql_table_1, mysql_table_2, ..., and mysql_table_n.
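The naming and partitioning rules above can be sketched as follows. This is an illustration of the described behavior, not a DataWorks API; the prefix, table names, and primary-key flags are hypothetical.

```python
# Sketch of the topic-naming and partition rules described above.
# Tables with a primary key get 6 partitions; tables without get 1.
def destination_topics(prefix: str, tables: dict) -> dict:
    """Map each source table name to (topic name, partition count).

    `tables` maps a table name to True if the table has a primary key.
    """
    return {
        table: (f"{prefix}_{table}", 6 if has_pk else 1)
        for table, has_pk in tables.items()
    }

topics = destination_topics("mysql", {"table_1": True, "table_2": False})
print(topics["table_1"])  # ('mysql_table_1', 6)
print(topics["table_2"])  # ('mysql_table_2', 1)
```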
Precautions
- Regions
- If your data source and your destination ApsaraMQ for Kafka instance are not deployed in the same region, make sure that you have a Cloud Enterprise Network (CEN) instance within the Alibaba Cloud account and that the VPCs of your data source and ApsaraMQ for Kafka instance are attached to the CEN instance. In addition, make sure that sufficient bandwidth is configured for cross-region connections to ensure end-to-end connectivity between the data source and the data destination.
Otherwise, a CEN instance may be automatically created, and the VPCs of your destination ApsaraMQ for Kafka instance and the Elastic Compute Service (ECS) instance where your exclusive resource group resides are attached to the CEN instance to ensure end-to-end connectivity. Because the bandwidth of an automatically created CEN instance is not manually configured, the bandwidth is extremely low. This may cause a connectivity error when you create a MySQL source connector or when the connector is running.
- If your data source and your destination ApsaraMQ for Kafka instance are deployed in the same region, an ENI is automatically created in the VPC of the data source or the ApsaraMQ for Kafka instance when you create a MySQL source connector. The ENI is automatically bound to the ECS instance where your exclusive resource group resides. This ensures end-to-end connectivity between the data source and the data destination.
- Exclusive resource groups in DataWorks
- Each exclusive resource group in DataWorks can run up to three MySQL source connectors. If an existing resource group runs fewer than three MySQL source connectors when you create a MySQL source connector, DataWorks uses that resource group to run the new connector.
- Each exclusive resource group in DataWorks can be associated with the ENIs of up to two VPCs. DataWorks may fail to create a MySQL source connector by using a resource group if the ENI to be associated with the resource group and the ENIs that are already associated with the resource group have overlapping CIDR blocks, or due to other technical limits. In this case, even if fewer than three connectors are running on the resource group, DataWorks creates a new resource group to ensure that the MySQL source connector can be created.
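The resource group selection behavior described above can be sketched as follows. This is a simplified illustration, not a DataWorks API; the ResourceGroup type and the compatibility check are hypothetical stand-ins.

```python
# Illustrative sketch of the resource-group selection described above.
# ResourceGroup and the `compatible` check are hypothetical stand-ins.
from dataclasses import dataclass, field

MAX_CONNECTORS_PER_GROUP = 3  # up to 3 MySQL source connectors per group
MAX_VPCS_PER_GROUP = 2        # ENIs of up to 2 VPCs per group

@dataclass
class ResourceGroup:
    connectors: int = 0
    attached_vpcs: list = field(default_factory=list)

def pick_group(groups, required_vpc, compatible):
    """Reuse a group with spare capacity whose ENIs can serve the new
    VPC; otherwise create a new group and append it to `groups`."""
    for grp in groups:
        if grp.connectors >= MAX_CONNECTORS_PER_GROUP:
            continue
        if required_vpc in grp.attached_vpcs:
            return grp
        if len(grp.attached_vpcs) < MAX_VPCS_PER_GROUP and compatible(grp, required_vpc):
            grp.attached_vpcs.append(required_vpc)
            return grp
    new_group = ResourceGroup(attached_vpcs=[required_vpc])
    groups.append(new_group)
    return new_group
```

For example, a group that already runs three connectors is skipped and a new group is created, even if its attached VPCs would otherwise match.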
Create and deploy a MySQL source connector
- Log on to the ApsaraMQ for Kafka console.
- In the Resource Distribution section of the Overview page, select the region where your instance is deployed.
- In the left-side navigation pane, click Connectors.
- On the Connectors page, select the instance in which the data source topic resides from the Select Instance drop-down list and click Create Connector.
- In the Create Connector wizard, perform the following operations:
- Go to the Connectors page, find the connector that you created, and then click Deploy in the Actions column. On the Connectors page, if the value of the Status parameter is Running for the connector, the connector is created. Note: If the connector fails to be created, check whether all the prerequisites that are described in this topic are met.
If you need to modify the configurations of the MySQL source connector, click Task Configurations in the Actions column. In the DataWorks console to which you are redirected, modify the configurations of the connector.
Verify the result
- Insert data into a source table in the ApsaraDB RDS for MySQL database. The following sample statement provides an example. For more information about SQL statements, see Commonly used SQL statements for MySQL.

```sql
INSERT INTO mysql_tbl (mysql_title, mysql_author, submission_date) VALUES ("mysql2kafka", "tester", NOW());
```
- Use the message query feature of ApsaraMQ for Kafka to check whether the data of the source table in the ApsaraDB RDS for MySQL database is synchronized to the corresponding topic in your ApsaraMQ for Kafka instance. For more information, see Query messages. The following sample code provides an example of the data that is synchronized from a source table in ApsaraDB RDS for MySQL to a topic in ApsaraMQ for Kafka. For more information about message structures and fields, see Appendix: Message formats.
```json
{
  "schema": {
    "dataColumn": [
      { "name": "mysql_id", "type": "LONG" },
      { "name": "mysql_title", "type": "STRING" },
      { "name": "mysql_author", "type": "STRING" },
      { "name": "submission_date", "type": "DATE" }
    ],
    "primaryKey": [ "mysql_id" ],
    "source": {
      "dbType": "MySQL",
      "dbName": "mysql_to_kafka",
      "tableName": "mysql_tbl"
    }
  },
  "payload": {
    "before": null,
    "after": {
      "dataColumn": {
        "mysql_title": "mysql2kafka",
        "mysql_author": "tester",
        "submission_date": 1614700800000
      }
    },
    "sequenceId": "1614748790461000000",
    "timestamp": {
      "eventTime": 1614748870000,
      "systemTime": 1614748870925,
      "checkpointTime": 1614748870000
    },
    "op": "INSERT",
    "ddl": null
  },
  "version": "0.0.1"
}
```
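A consumer can decode these messages with standard JSON tooling. The following Python sketch parses a message in the format shown above and extracts the source table, the operation type, and the after-image of the row; the embedded sample string mirrors the example message, trimmed for brevity.

```python
# Sketch: parse a synchronized message (format shown above) with the
# standard library and pull out the operation and the after-image.
import json

# Trimmed copy of the sample message shown above.
sample = '''{
  "schema": {"primaryKey": ["mysql_id"],
             "source": {"dbType": "MySQL", "dbName": "mysql_to_kafka",
                        "tableName": "mysql_tbl"}},
  "payload": {"before": null,
              "after": {"dataColumn": {"mysql_title": "mysql2kafka",
                                       "mysql_author": "tester",
                                       "submission_date": 1614700800000}},
              "op": "INSERT", "ddl": null},
  "version": "0.0.1"
}'''

msg = json.loads(sample)
table = msg["schema"]["source"]["tableName"]
op = msg["payload"]["op"]                        # INSERT, UPDATE, or DELETE
after = msg["payload"]["after"]["dataColumn"]    # row image after the change
print(table, op, after["mysql_title"])  # mysql_tbl INSERT mysql2kafka
```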