Synchronize change data in a database to ApsaraMQ for RocketMQ by using Canal

Updated at: 2025-03-26 07:16

This topic describes how to synchronize change data in a database to ApsaraMQ for RocketMQ by using Canal.

Background information

Change data capture (CDC) is a solution that is used to identify and capture changes to data in a database. CDC is commonly used to synchronize data between heterogeneous data sources. Canal is a lightweight CDC tool that parses the incremental logs of a database so that you can subscribe to and consume the incremental data. Canal can deliver data change records to ApsaraMQ for RocketMQ, where you can implement diverse business logic by using the message processing policies that ApsaraMQ for RocketMQ provides.

Canal is an open source project. For more information, see canal.

Scenarios

Common scenarios in which incremental data is subscribed to and consumed based on binary logs include:

  • Database mirroring

  • Real-time database backup

  • Index building and real-time index maintenance, such as sharding heterogeneous indexes and inverted indexes

  • Business cache updates

  • Incremental data processing based on business logic

Solution introduction

The following figure shows the CDC solution that is used to synchronize change data in a database to ApsaraMQ for RocketMQ by using Canal.

Figure: CDC solution

In the preceding figure, Canal disguises itself as a MySQL secondary database (replica) to listen to and receive binary logs from the MySQL database. Canal then synchronizes the change data to storage systems or middleware such as ApsaraMQ for RocketMQ.

The following steps are performed:

  1. Configure MySQL: Enable the binary logging feature for MySQL and create the database and tables that are required by the test.

  2. Deploy Canal: Deploy Canal Deployer, which serves as the Canal server, and use it to receive the binary logs of the MySQL database.

  3. Verify the result: Change data in the database and verify that Canal sends the corresponding message to ApsaraMQ for RocketMQ.

Environment requirements

  • Resources

  • Networks

    • The nodes on which Canal Deployer is deployed must be able to connect to both the database and the ApsaraMQ for RocketMQ instance. In most cases, Elastic Compute Service (ECS) instances or containers in virtual private clouds (VPCs) are used as nodes.

  • Versions

    Service               | Version | Description
    --------------------- | ------- | -----------
    Canal                 | 1.1.6   | For information about other versions, see Canal.
    MySQL                 | 8.0     | Canal supports MySQL 5.1.x, 5.5.x, 5.6.x, 5.7.x, and 8.0.x as source databases.
    ApsaraMQ for RocketMQ | 5.x     | Canal supports ApsaraMQ for RocketMQ 4.x and 5.x instances. We recommend that you use ApsaraMQ for RocketMQ 5.x instances.

    Important

    Canal does not support serverless ApsaraMQ for RocketMQ 5.x instances.

1. Configure MySQL

1.1 Enable the binary logging feature

ApsaraDB RDS for MySQL instance

The binary logging feature is automatically enabled for an ApsaraDB RDS for MySQL instance, and the Alibaba Cloud account is automatically granted the permissions to dump binary logs. You can skip this step.

Self-managed MySQL database

Perform the following operations:

  • Enable the binary logging feature and set the binary log format to ROW. The following example shows the configuration in the my.cnf file:

    [mysqld]
    log-bin=mysql-bin  # Enable the binary logging feature.
    binlog-format=ROW  # Specify ROW as the format of binary logs.
    server_id=1  # The server ID of the MySQL database. It must differ from the slave ID that Canal uses when it connects as a replica.
  • Create a user named canal and grant it the SELECT, REPLICATION SLAVE, and REPLICATION CLIENT permissions that Canal requires to act as a MySQL replica:

    CREATE USER canal IDENTIFIED BY 'canal';  
    GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
    FLUSH PRIVILEGES;
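Before you start Canal, you can check a self-managed MySQL configuration against the two requirements above: ROW-format binary logs and a server_id that differs from the slave ID that Canal uses. The following Java sketch is a hypothetical helper (MyCnfCheck is not part of Canal or MySQL). It assumes a simple option file: it reads only key=value lines in the [mysqld] group, strips # comments, and treats dashes and underscores in option names as equivalent, as MySQL does. Canal's slave ID is an input that you supply; 1234 below is a made-up value. The sketch is stricter than MySQL itself, because MySQL 8.0 enables binary logging and uses the ROW format by default even when my.cnf omits these options. To see the values that a running server actually uses, query SHOW VARIABLES LIKE 'binlog_format'; instead.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;

/** Hypothetical check that a my.cnf text contains the [mysqld] settings from the sample above. */
final class MyCnfCheck {

    /** Collects key=value options from [mysqld]; dashes in option names are normalized to underscores. */
    static Map<String, String> parseMysqld(String cnf) {
        Map<String, String> opts = new HashMap<>();
        boolean inMysqld = false;
        for (String raw : cnf.split("\\R")) {
            int hash = raw.indexOf('#');
            String line = (hash >= 0 ? raw.substring(0, hash) : raw).trim(); // drop # comments
            if (line.isEmpty() || line.startsWith(";")) {
                continue; // blank line or ; comment line
            }
            if (line.startsWith("[") && line.endsWith("]")) {
                inMysqld = line.equalsIgnoreCase("[mysqld]"); // other option groups are ignored
                continue;
            }
            if (inMysqld) {
                int eq = line.indexOf('=');
                String key = (eq >= 0 ? line.substring(0, eq) : line).trim();
                String value = eq >= 0 ? line.substring(eq + 1).trim() : "";
                opts.put(key.replace('-', '_').toLowerCase(Locale.ROOT), value);
            }
        }
        return opts;
    }

    /** Returns the problems found; an empty list means the sample settings are present. */
    static List<String> check(String cnf, long canalSlaveId) {
        Map<String, String> opts = parseMysqld(cnf);
        List<String> problems = new ArrayList<>();
        if (!opts.containsKey("log_bin")) {
            problems.add("log-bin is not set in [mysqld]");
        }
        if (!"ROW".equalsIgnoreCase(opts.getOrDefault("binlog_format", ""))) {
            problems.add("binlog-format is not ROW");
        }
        String serverId = opts.get("server_id");
        if (serverId == null || !serverId.matches("\\d{1,18}")) {
            problems.add("server_id is missing or not a number");
        } else if (Long.parseLong(serverId) == canalSlaveId) {
            problems.add("server_id must differ from the slave ID that Canal uses");
        }
        return problems;
    }

    public static void main(String[] args) {
        String cnf = String.join("\n",
                "[mysqld]",
                "log-bin=mysql-bin  # Enable the binary logging feature.",
                "binlog-format=ROW  # Specify ROW as the format of binary logs.",
                "server_id=1");
        // 1234 is a hypothetical slave ID for Canal.
        System.out.println(check(cnf, 1234)); // prints []
    }
}
```

This check does not cover the GRANT statements; it only inspects the option file text.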

1.2 Create a database

Execute the following SQL statement to create a database named canal:

CREATE DATABASE canal DEFAULT CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci;

1.3 Create a table

Execute the following SQL statement to create a table named students in the canal database:

CREATE TABLE canal.students (
    id INT AUTO_INCREMENT,
    name VARCHAR(100) NOT NULL,
    age INT,
    gender VARCHAR(10),
    PRIMARY KEY (id)
);

2. Deploy Canal

2.1 Install JDK

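Run the following command to install JDK 8 (OpenJDK 1.8). The command applies to operating systems that use yum, such as Alibaba Cloud Linux and CentOS: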

sudo yum install java-1.8.0-openjdk

2.2 Download Canal Deployer

Run the following command to download the installation package of Canal Deployer:

sudo wget https://github.com/alibaba/canal/releases/download/canal-1.1.6/canal.deployer-1.1.6.tar.gz

2.3 Decompress the installation package

Run the following commands to create a directory named canal-server and decompress the downloaded installation package to the directory:

# Create a directory named canal-server.
sudo mkdir -p /usr/local/canal-server

# Decompress the downloaded installation package to the canal-server directory.
sudo tar -zxvf canal.deployer-1.1.6.tar.gz -C /usr/local/canal-server

2.4 Modify configurations

  1. Run the following command to configure the canal.properties file:

sudo vi /usr/local/canal-server/conf/canal.properties

Set the following parameters in the file:

# The server mode.
canal.serverMode = rocketMQ
# The username of the ApsaraMQ for RocketMQ instance. To obtain it, go to the Access Control page of the instance in the ApsaraMQ for RocketMQ console and click the Intelligent Authentication tab.
canal.aliyun.accessKey = 6W0xz2uPf******
# The password of the ApsaraMQ for RocketMQ instance. You can obtain it on the same Intelligent Authentication tab.
canal.aliyun.secretKey = sK56k1DrGx******
# The access channel. Set this parameter to cloud to connect to ApsaraMQ for RocketMQ.
canal.mq.accessChannel = cloud
# The message format. ApsaraMQ for RocketMQ does not allow multiple change records to be encapsulated into one message, so set canal.mq.flatMessage to false. A consumer must deserialize each message body that it receives: Java applications can call the com.alibaba.otter.canal.client.CanalMessageDeserializer#deserializer(byte[]) method, and applications written in other programming languages can implement equivalent logic based on that method.
canal.mq.flatMessage = false
# The name of the group on the ApsaraMQ for RocketMQ instance.
rocketmq.producer.group = canal_test
# Specify whether to enable the message trace feature.
rocketmq.enable.message.trace = false
# The topic that is used to store message traces.
rocketmq.customized.trace.topic = 
# The namespace of the ApsaraMQ for RocketMQ instance. If the instance is an ApsaraMQ for RocketMQ 5.x instance, you do not need to configure this parameter. 
rocketmq.namespace =
# The endpoint of the ApsaraMQ for RocketMQ instance. You can obtain the endpoint on the Instance Details page in the ApsaraMQ for RocketMQ console. 
rocketmq.namesrv.addr = rmq-cn-xxx.{$RegionId}.rmq.aliyuncs.com:8080
# The number of retries when a message fails to be sent.
rocketmq.retry.times.when.send.failed = 0
# Specify whether to enable the VIP Netty channel to send messages.
rocketmq.vip.channel.enabled = false
# The message tag.
rocketmq.tag = 
  2. Run the following command to configure the instance.properties file:

sudo vi /usr/local/canal-server/conf/example/instance.properties

Set the following parameters in the file:

# The endpoint of the MySQL database in the host:port format, such as the endpoint of an ApsaraDB RDS for MySQL instance.
canal.instance.master.address=rm-uf62****.rwlb.rds.aliyuncs.com:3306
# The username of the database account that Canal uses to read binary logs, such as the canal user created in Step 1.1 for a self-managed MySQL database.
canal.instance.dbUsername=xxx
# The password of the database account.
canal.instance.dbPassword=xxx
# The tables that Canal listens to, expressed as a regular expression that is matched against schema.table names. Escape each backslash with a second backslash (\\). The value canal\\..* specifies all tables in the canal database.
canal.instance.filter.regex=canal\\..*
# The name of the topic on the ApsaraMQ for RocketMQ instance.
canal.mq.topic=canal_topic
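A wrong value in these two files, such as an incorrect canal.serverMode or an unreplaced {$RegionId} placeholder in rocketmq.namesrv.addr, is easy to miss until Canal starts. The following Java sketch is a hypothetical pre-flight helper (CanalConfigCheck is not part of Canal). It loads both files with java.util.Properties, checks the canal.properties values used in this topic, and approximates how canal.instance.filter.regex selects tables. The filter approximation is an assumption: it splits the value on commas and matches each pattern against the full schema.table name with java.util.regex, which mirrors the ^canal\..*$ table filter in the example.log output of Step 2.6 but is not Canal's own filter implementation.

```java
import java.io.IOException;
import java.io.Reader;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.regex.Pattern;

/** Hypothetical pre-flight check for canal.properties and instance.properties (not part of Canal). */
final class CanalConfigCheck {

    /** Loads a properties file; escape processing turns canal\\..* in the file into canal\..* in memory. */
    static Properties load(Reader reader) {
        Properties p = new Properties();
        try {
            p.load(reader);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return p;
    }

    /** Returns the trimmed value of a key, or an empty string if the key is absent. */
    static String get(Properties p, String key) {
        return p.getProperty(key, "").trim();
    }

    /** Returns the problems found in canal.properties; an empty list means the checked values look plausible. */
    static List<String> checkCanal(Properties p) {
        List<String> problems = new ArrayList<>();
        if (!get(p, "canal.serverMode").equalsIgnoreCase("rocketMQ")) {
            problems.add("canal.serverMode should be rocketMQ");
        }
        if (!get(p, "canal.mq.accessChannel").equalsIgnoreCase("cloud")) {
            problems.add("canal.mq.accessChannel should be cloud");
        }
        if (!get(p, "canal.mq.flatMessage").equalsIgnoreCase("false")) {
            problems.add("canal.mq.flatMessage should be false");
        }
        for (String key : new String[] {"canal.aliyun.accessKey", "canal.aliyun.secretKey", "rocketmq.producer.group"}) {
            if (get(p, key).isEmpty()) {
                problems.add(key + " is empty");
            }
        }
        String addr = get(p, "rocketmq.namesrv.addr");
        if (!addr.matches("[^:\\s]+:\\d+")) {
            problems.add("rocketmq.namesrv.addr should be host:port");
        } else if (addr.contains("{")) {
            problems.add("rocketmq.namesrv.addr still contains a placeholder such as {$RegionId}");
        }
        return problems;
    }

    /** Approximates the table filter: comma-separated regexes, each matched against the whole schema.table name. */
    static boolean filterMatches(String filter, String schemaDotTable) {
        for (String part : filter.split(",")) {
            String regex = part.trim();
            if (!regex.isEmpty() && Pattern.matches(regex, schemaDotTable)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) throws IOException {
        // Defaults follow the installation directory used in this topic.
        String conf = args.length > 0 ? args[0] : "/usr/local/canal-server/conf";
        try (Reader canal = Files.newBufferedReader(Paths.get(conf, "canal.properties"), StandardCharsets.UTF_8);
             Reader instance = Files.newBufferedReader(Paths.get(conf, "example", "instance.properties"), StandardCharsets.UTF_8)) {
            System.out.println("canal.properties problems: " + checkCanal(load(canal)));
            String filter = get(load(instance), "canal.instance.filter.regex");
            System.out.println("canal.students is selected: " + filterMatches(filter, "canal.students"));
        }
    }
}
```

Run the sketch on the Canal node, optionally passing another conf directory as the first argument. Because it reads the files through java.util.Properties, it sees values after escape processing, which is why the file must contain canal\\..* to express the regular expression canal\..*.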

2.5 Start Canal Deployer

Run the following command to start Canal Deployer:

/usr/local/canal-server/bin/startup.sh

2.6 Verify the startup

  1. Run the following command to view the canal.log file and check whether Canal is started:

sudo vi /usr/local/canal-server/logs/canal/canal.log

If output similar to the following appears, Canal is started:

2024-07-15 17:24:12.154 [main] INFO  com.alibaba.otter.canal.deployer.CanalLauncher - ## set default uncaught exception handler
2024-07-15 17:24:12.202 [main] INFO  com.alibaba.otter.canal.deployer.CanalLauncher - ## load canal configurations
2024-07-15 17:24:12.497 [main] INFO  c.a.o.c.c.rocketmq.producer.CanalRocketMQProducer - ##Start RocketMQ producer##
2024-07-15 17:24:12.799 [main] INFO  com.alibaba.otter.canal.deployer.CanalStarter - ## start the canal server.
2024-07-15 17:24:12.984 [main] INFO  com.alibaba.otter.canal.deployer.CanalController - ## start the canal server[172.17.XX.XX:11111]
2024-07-15 17:24:16.208 [main] INFO  com.alibaba.otter.canal.deployer.CanalStarter - ## the canal server is running now ......
  2. Run the following command to view the example.log file and check whether the Canal instance is started:

sudo vi /usr/local/canal-server/logs/example/example.log

If output similar to the following appears, the Canal instance is started:

2024-07-15 18:22:15.667 [main] INFO  c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start CannalInstance for 1-example 
2024-07-15 18:22:15.699 [main] WARN  c.a.o.canal.parse.inbound.mysql.dbsync.LogEventConvert - --> init table filter : ^canal\..*$
2024-07-15 18:22:15.699 [main] WARN  c.a.o.canal.parse.inbound.mysql.dbsync.LogEventConvert - --> init table black filter : ^mysql\.slave_.*$
2024-07-15 18:22:16.030 [main] INFO  c.a.otter.canal.instance.core.AbstractCanalInstance - start successful....
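Instead of opening both logs in vi, you can scan them for the marker lines shown in the sample output. The following Java sketch is a hypothetical helper (CanalStartupCheck is not part of Canal) that searches canal.log for "the canal server is running now" and example.log for "start successful". The default log directory follows the installation path used in this topic; pass another directory as the first argument if needed.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.stream.Stream;

/** Hypothetical helper that looks for the startup lines shown in the sample canal.log and example.log output. */
final class CanalStartupCheck {

    // Marker text taken from the sample log output in this topic.
    static final String SERVER_RUNNING = "the canal server is running now";
    static final String INSTANCE_STARTED = "start successful";

    /** Returns true if the log file exists and any of its lines contains the marker. */
    static boolean logContains(Path log, String marker) {
        if (!Files.isRegularFile(log)) {
            return false;
        }
        // ISO-8859-1 maps every byte to a char, so unexpected bytes in the log cannot abort the scan.
        try (Stream<String> lines = Files.lines(log, StandardCharsets.ISO_8859_1)) {
            return lines.anyMatch(line -> line.contains(marker));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        Path logs = Paths.get(args.length > 0 ? args[0] : "/usr/local/canal-server/logs");
        System.out.println("Canal server running: "
                + logContains(logs.resolve(Paths.get("canal", "canal.log")), SERVER_RUNNING));
        System.out.println("example instance started: "
                + logContains(logs.resolve(Paths.get("example", "example.log")), INSTANCE_STARTED));
    }
}
```

Canal appends to these logs, so a match shows only that the marker was written at some point. After a restart, compare the timestamp of the matching line with the time at which you ran startup.sh.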

3. Verify the result

3.1 Add a data entry to the MySQL database

Execute the following SQL statement to add a data entry to the students table created in 1.3 Create a table:

INSERT INTO `canal`.`students` (`name`, `age`, `gender`) VALUES ('Tome', 18, 'male');

3.2 View the message sent by Canal

Log on to the ApsaraMQ for RocketMQ console. In the left-side navigation pane, click Instances. On the Instances page, click the name of the instance that you configured. In the left-side navigation pane of the Instance Details page, click Message Query to view the message sent by Canal.

