This topic describes how to use Canal to synchronize data from a MySQL database to ApsaraMQ for Kafka.
Background information
Canal parses the incremental data of a MySQL database and allows you to subscribe to and consume that data. Canal assumes the role of a secondary MySQL database and sends a dump request to the primary MySQL database. When the primary database receives the dump request, it pushes its binary logs to Canal. Canal parses the binary logs to obtain the incremental data and then synchronizes that data. Canal can connect to ApsaraMQ for Kafka and write the incremental data of a MySQL database to ApsaraMQ for Kafka for analysis. For more information about how Canal works, see the Canal documentation on GitHub.
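For example, the following standard MySQL statements let you inspect the binary log position and events that Canal consumes. This is only an illustration; the log file name mysql-bin.000001 is a placeholder that depends on your deployment:
mysql> SHOW MASTER STATUS;                                  -- the binary log file and offset that Canal starts dumping from
mysql> SHOW BINLOG EVENTS IN 'mysql-bin.000001' LIMIT 5;    -- a sample of the events that Canal parses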
Prerequisites
Before you use Canal to synchronize data, make sure that the following conditions are met:
- MySQL is installed and initialized, and binary logging is configured. For more information, see Canal Quick Start. A minimal example of the required MySQL settings follows this list.
- An instance is created in the ApsaraMQ for Kafka console, and a topic is created in the instance. For more information, see Step 3: Create resources.
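For reference, the following is a minimal sketch of the MySQL configuration that Canal Quick Start describes: ROW-format binary logging and a user that Canal can use to request binary logs. The credentials and server_id shown here are example values only:
# my.cnf: enable ROW-format binary logging (example values)
[mysqld]
log-bin=mysql-bin
binlog-format=ROW
server_id=1

-- MySQL: create a user for Canal and grant replication permissions (example credentials)
CREATE USER canal IDENTIFIED BY 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;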
Procedure
- Download the Canal package. This topic uses V1.1.5 as an example.
- Run the following command to create a directory. In this example, the /home/doc/tools/canal.deployer-1.1.5 directory is created.
mkdir -p /home/doc/tools/canal.deployer-1.1.5
- Copy the Canal package to the /home/doc/tools/canal.deployer-1.1.5 directory and decompress the package.
tar -zxvf canal.deployer-1.1.5-SNAPSHOT.tar.gz -C /home/doc/tools/canal.deployer-1.1.5
- In the /home/doc/tools/canal.deployer-1.1.5 directory, run the following command to modify the instance.properties file:
vi conf/example/instance.properties
Set the parameters based on the description in the Parameters in the instance.properties file table.
# Set the following parameters based on the information about your MySQL database.
#################################################
...
# The URL of the database.
canal.instance.master.address=192.168.XX.XX:3306
# The username and password that are used to connect to the database.
...
canal.instance.dbUsername=****
canal.instance.dbPassword=****
...
# mq config
# The topic that you created in the ApsaraMQ for Kafka console.
canal.mq.topic=mysql_test
# Specify a dynamic topic based on the database name or table name.
#canal.mq.dynamicTopic=mytest,.*,mytest.user,mytest\\..*,.*\\..*
# The partition in the ApsaraMQ for Kafka topic to which data is synchronized.
canal.mq.partition=0
# The following two parameters cannot be set together with the canal.mq.partition parameter.
# If you set the following two parameters, data is synchronized to different partitions in the ApsaraMQ for Kafka topic.
#canal.mq.partitionsNum=3
# Format: Database name.Table name:Unique primary key. Separate multiple table settings with commas (,).
#canal.mq.partitionHash=mytest.person:id,mytest.role:id
#################################################
Table 1. Parameters in the instance.properties file

| Parameter | Required | Description |
| --- | --- | --- |
| canal.instance.master.address | Yes | The URL of the MySQL database. |
| canal.instance.dbUsername | Yes | The username that is used to connect to the MySQL database. |
| canal.instance.dbPassword | Yes | The password that is used to connect to the MySQL database. |
| canal.mq.topic | Yes | The topic in the ApsaraMQ for Kafka instance. You can create a topic on the Topics page in the ApsaraMQ for Kafka console. For more information, see Step 3: Create resources. |
| canal.mq.dynamicTopic | No | The regular expression that is used to match dynamic topics. After you specify the regular expression, Canal evaluates it and synchronizes the data of different tables in the MySQL database to different topics in the ApsaraMQ for Kafka instance. For more information, see Parameter description. |
| canal.mq.partition | No | The partition in the ApsaraMQ for Kafka topic to which the database data is synchronized. |
| canal.mq.partitionsNum | No | The number of partitions in the topic. This parameter is used together with the canal.mq.partitionHash parameter to synchronize data to different partitions in the ApsaraMQ for Kafka topic. |
| canal.mq.partitionHash | No | The regular expression that is used to match partitions. For more information, see Parameter description. |
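To illustrate the two regular-expression parameters, the following hypothetical settings (adapted from the commented examples in the file above; the database and table names are examples only) route every table in a database named mytest to its own topic and hash records across three partitions by primary key:
# Route data from every table in the mytest database to a per-table topic;
# records that match no expression fall back to the topic set in canal.mq.topic.
canal.mq.dynamicTopic=mytest\\..*
# Hash records from the listed tables across three partitions by the id column.
canal.mq.partitionsNum=3
canal.mq.partitionHash=mytest.person:id,mytest.role:id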
- Run the following command to open the canal.properties file:
vi conf/canal.properties
Set the parameters based on the description in the Parameters in the canal.properties file table.
- If you want to connect a client to ApsaraMQ for Kafka over the Internet, the SASL_SSL protocol is used for authentication and encryption, and the Secure Sockets Layer (SSL) endpoint is required. For more information about endpoints, see Comparison among endpoints.
# ...
# Set the value to kafka.
canal.serverMode = kafka
# ...
# Configure the settings of ApsaraMQ for Kafka.
# The SSL endpoint of the ApsaraMQ for Kafka instance. You can obtain the endpoint on the Instance Details page in the ApsaraMQ for Kafka console.
kafka.bootstrap.servers = alikafka-pre-cn-zv**********-1.alikafka.aliyuncs.com:9093,alikafka-pre-cn-zv**********-2.alikafka.aliyuncs.com:9093,alikafka-pre-cn-zv**********-3.alikafka.aliyuncs.com:9093
# Set the parameters as required or retain the following default settings:
kafka.acks = all
kafka.compression.type = none
kafka.batch.size = 16384
kafka.linger.ms = 1
kafka.max.request.size = 1048576
kafka.buffer.memory = 33554432
kafka.max.in.flight.requests.per.connection = 1
kafka.retries = 0
# If a client connects to ApsaraMQ for Kafka over the Internet, the SASL_SSL protocol is used for authentication and encryption. You must specify the network protocol and the identity authentication mechanism.
kafka.ssl.truststore.location= ../conf/kafka_client_truststore_jks
kafka.ssl.truststore.password= KafkaOnsClient
kafka.security.protocol= SASL_SSL
kafka.sasl.mechanism = PLAIN
kafka.ssl.endpoint.identification.algorithm =
Table 2. Parameters in the canal.properties file

| Parameter | Required | Description |
| --- | --- | --- |
| canal.serverMode | Yes | The server type of Canal. Set this parameter to kafka. |
| kafka.bootstrap.servers | Yes | The endpoint of the ApsaraMQ for Kafka instance. You can obtain the endpoint in the Endpoint Information section of the Instance Details page in the ApsaraMQ for Kafka console. |
| kafka.ssl.truststore.location | Yes | The storage path of the SSL root certificate kafka.client.truststore.jks. Note: If a client connects to ApsaraMQ for Kafka over the Internet, authentication and encryption are required to ensure secure message transmission. In this case, you must use the SSL endpoint to connect to ApsaraMQ for Kafka and use the SASL_SSL protocol for authentication and encryption. For more information, see Comparison among endpoints. |
| kafka.acks | Yes | The acknowledgment (ACK) level that the client requires from an ApsaraMQ for Kafka broker after the broker receives a message. Valid values: 0: The client never waits for an ACK from the broker. 1: The client receives an ACK after the leader writes the message to its log; the leader responds without waiting for all followers. all: The client receives an ACK only after the leader receives acknowledgment from the full set of in-sync replicas. |
| kafka.compression.type | Yes | The algorithm that is used to compress data. By default, data is not compressed. Valid values: none, gzip, and snappy. |
| kafka.batch.size | Yes | The maximum number of bytes of messages that the client accumulates in a batch before it sends a request to a broker. Unit: byte. Batching distributes data across fewer requests, which reduces the number of requests to be sent. A small batch size may lower throughput, whereas a large batch size may waste memory, because a buffer of the specified size is allocated for message processing. Note: The kafka.batch.size and kafka.linger.ms parameters together control batching: a batch is ready to be sent as soon as either its size or the waiting duration reaches the specified threshold. For example, with the default settings of kafka.batch.size = 16384 and kafka.linger.ms = 1, a batch is sent when it reaches 16 KB or when 1 millisecond has elapsed, whichever occurs first. |
| kafka.linger.ms | Yes | The maximum duration that the client waits for messages to accumulate in a batch. Unit: milliseconds. The client sends a request to a broker when the waiting duration reaches the specified value. This facilitates batch processing and reduces the number of requests to be sent. |
| kafka.max.request.size | Yes | The maximum number of bytes of a request that the client sends. |
| kafka.buffer.memory | Yes | The total size of memory that the client can use to buffer messages that are waiting to be sent to a broker. |
| kafka.max.in.flight.requests.per.connection | Yes | The maximum number of unacknowledged requests that the client can send on a single connection. If this parameter is set to 1, the client does not send a new request to a broker until the broker acknowledges the previous request. |
| kafka.retries | Yes | The number of times the client resends a message that fails to be sent. If you set this parameter to 0, the client does not retry failed messages. If you set this parameter to a value greater than 0, the client resends failed messages. |
| kafka.ssl.truststore.password | Yes | The password of the truststore of the SSL root certificate. Set this parameter to KafkaOnsClient. |
| kafka.security.protocol | Yes | The network protocol. Set this parameter to SASL_SSL if the SASL_SSL protocol is used for authentication and encryption. |
| kafka.sasl.mechanism | Yes | The Simple Authentication and Security Layer (SASL) mechanism that is used for identity authentication. Set this parameter to PLAIN if the SSL endpoint is used to connect to ApsaraMQ for Kafka. |

If a client connects to ApsaraMQ for Kafka over the Internet, SASL is used for identity authentication. You must configure environment variables in the bin/startup.sh file and specify the username and password of the SASL user for the ApsaraMQ for Kafka instance in the kafka_client_producer_jaas.conf file.
- Run the following command to open the startup.sh file:
vi bin/startup.sh
Configure environment variables in the startup.sh file:
JAVA_OPTS=" $JAVA_OPTS -Djava.awt.headless=true -Djava.net.preferIPv4Stack=true -Dfile.encoding=UTF-8 -Djava.security.auth.login.config=/home/doc/tools/canal.deployer-1.1.5/conf/kafka_client_producer_jaas.conf"
- Run the following command to open the kafka_client_producer_jaas.conf file:
vi conf/kafka_client_producer_jaas.conf
Specify the username and password of the SASL user for the ApsaraMQ for Kafka instance in the kafka_client_producer_jaas.conf file.
Note:
- If the access control list (ACL) feature is disabled for the ApsaraMQ for Kafka instance, you can obtain the default username and password of the SASL user on the Instance Details page in the ApsaraMQ for Kafka console.
- If the ACL feature is enabled for the ApsaraMQ for Kafka instance, make sure that the SASL user is of the PLAIN type and is authorized to send and consume messages. For more information, see Grant permissions to SASL users.
KafkaClient {
  org.apache.kafka.common.security.plain.PlainLoginModule required
  username="The username of the SASL user"
  password="The password of the SASL user";
};
- If a client connects to ApsaraMQ for Kafka in a virtual private cloud (VPC), authentication and encryption are not required. Messages are transmitted over the PLAINTEXT protocol, and the default endpoint is used to connect to ApsaraMQ for Kafka. In this case, you only need to set the canal.serverMode and kafka.bootstrap.servers parameters. For more information about endpoints, see Comparison among endpoints.
# ...
# Set the value to kafka.
canal.serverMode = kafka
# ...
# Configure the settings of ApsaraMQ for Kafka.
# The default endpoint of the ApsaraMQ for Kafka instance. You can obtain the endpoint on the Instance Details page in the ApsaraMQ for Kafka console.
kafka.bootstrap.servers = alikafka-pre-cn-zv**********-1-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-zv**********-2-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-zv**********-3-vpc.alikafka.aliyuncs.com:9092
# Set the parameters as required or retain the following default settings:
kafka.acks = all
kafka.compression.type = none
kafka.batch.size = 16384
kafka.linger.ms = 1
kafka.max.request.size = 1048576
kafka.buffer.memory = 33554432
kafka.max.in.flight.requests.per.connection = 1
kafka.retries = 0
- In the /home/doc/tools/canal.deployer-1.1.5 directory, run the following command to start Canal:
sh bin/startup.sh
- Check the log file /home/doc/tools/canal.deployer-1.1.5/logs/canal/canal.log to confirm that Canal is connected to ApsaraMQ for Kafka and is running.
2013-02-05 22:45:27.967 [main] INFO  com.alibaba.otter.canal.deployer.CanalLauncher - ## start the canal server.
2013-02-05 22:45:28.113 [main] INFO  com.alibaba.otter.canal.deployer.CanalController - ## start the canal server[10.1.XX.XX:11111]
2013-02-05 22:45:28.210 [main] INFO  com.alibaba.otter.canal.deployer.CanalLauncher - ## the canal server is running now ......
- Check the log file /home/doc/tools/canal.deployer-1.1.5/logs/example/example.log to confirm that a CanalInstance object is started.
2013-02-05 22:50:45.636 [main] INFO  c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [canal.properties]
2013-02-05 22:50:45.641 [main] INFO  c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [example/instance.properties]
2013-02-05 22:50:45.803 [main] INFO  c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start CannalInstance for 1-example
2013-02-05 22:50:45.810 [main] INFO  c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start successful....
Test the configurations
After Canal is started, perform a data synchronization test.
- In a MySQL database named mysql, create a table named T_Student. The following sample output shows how to query the data in the table; statements that create and populate such a table are sketched after the output:
mysql> select * from T_Student;
+--------+---------+------+------+
| stuNum | stuName | age  | sex  |
+--------+---------+------+------+
|      1 | Wang    |   18 | girl |
|      2 | Zhang   |   17 | boy  |
+--------+---------+------+------+
2 rows in set (0.00 sec)
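The topic does not include the statements that create and populate the table. The following is a minimal sketch that produces the rows shown above; the column types are assumptions:
-- Create and populate the test table (column types are assumed for illustration).
CREATE TABLE T_Student (
  stuNum  INT PRIMARY KEY,
  stuName VARCHAR(32),
  age     INT,
  sex     VARCHAR(8)
);
INSERT INTO T_Student VALUES (1, 'Wang', 18, 'girl');
INSERT INTO T_Student VALUES (2, 'Zhang', 17, 'boy');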
- Check the log file /home/doc/tools/canal.deployer-1.1.5/logs/example/meta.log. Each time an add, delete, or update operation is performed in the database, a record is generated in the meta.log file. Check this log to confirm that Canal is collecting data:
tail -f example/meta.log
2020-07-29 09:21:05.110 - clientId:1001 cursor:[log.000001,29723,1591190230000,1,] address[/192.168.XX.XX:3306]
2020-07-29 09:23:46.109 - clientId:1001 cursor:[log.000001,30047,1595985825000,1,] address[localhost/192.168.XX.XX:3306]
2020-07-29 09:24:50.547 - clientId:1001 cursor:[log.000001,30047,1595985825000,1,] address[/192.168.XX.XX:3306]
2020-07-29 09:26:45.547 - clientId:1001 cursor:[log.000001,30143,1595986005000,1,] address[localhost/192.168.XX.XX:3306]
2020-07-29 09:30:04.546 - clientId:1001 cursor:[log.000001,30467,1595986204000,1,] address[localhost/192.168.XX.XX:3306]
2020-07-29 09:30:16.546 - clientId:1001 cursor:[log.000001,30734,1595986215000,1,] address[localhost/192.168.XX.XX:3306]
2020-07-29 09:30:36.547 - clientId:1001 cursor:[log.000001,31001,1595986236000,1,] address[localhost/192.168.XX.XX:3306]
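To verify incremental capture end to end, you can run any DML statement in the database and confirm that a new cursor record appears in meta.log. The following statement is a hypothetical example that extends the T_Student table above:
mysql> INSERT INTO T_Student VALUES (3, 'Li', 16, 'girl');   -- each committed change advances the cursor recorded in meta.log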
- Log on to the ApsaraMQ for Kafka console and query messages to check whether the incremental data of the MySQL database is synchronized to the ApsaraMQ for Kafka instance. For more information about how to query messages in the console, see Query messages.
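If you prefer the command line, you can also verify the synchronization by consuming the topic with the Kafka CLI. The following sketch assumes a client in the same VPC and the default endpoint from the earlier example; for Internet access, you must additionally pass a consumer configuration file that specifies the SASL_SSL settings:
bin/kafka-console-consumer.sh \
  --bootstrap-server alikafka-pre-cn-zv**********-1-vpc.alikafka.aliyuncs.com:9092 \
  --topic mysql_test \
  --from-beginning
In Canal's default flat-message format, each synchronized change arrives as a JSON message that contains fields such as database, table, type, and data.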
- After data is synchronized, run the following command to stop Canal:
sh bin/stop.sh