ApsaraMQ for RocketMQ: Synchronize MySQL change data to ApsaraMQ for RocketMQ by using Canal

Last Updated: Mar 10, 2026

When your application needs to react to database changes in real time -- syncing a replica, refreshing a cache, or updating a search index -- you need a reliable way to capture those changes and deliver them downstream. Canal is a lightweight, open-source change data capture (CDC) tool that parses MySQL binary logs and delivers each row-level change to ApsaraMQ for RocketMQ as a message. Consumers then process these messages to drive business logic such as data synchronization, cache invalidation, or event-driven workflows.

This topic walks you through deploying Canal on an Elastic Compute Service (ECS) instance, connecting it to MySQL and ApsaraMQ for RocketMQ, and verifying end-to-end message delivery.

How it works

Canal connects to MySQL as a replication client, reads binary logs, and forwards each change record to ApsaraMQ for RocketMQ as a message. Downstream consumers then process these messages in real time.

[Figure: CDC solution]

The setup involves three steps:

  1. Configure MySQL -- Enable binary logging and create a test database and table.

  2. Deploy Canal -- Install Canal Deployer on an ECS instance and connect it to both MySQL and ApsaraMQ for RocketMQ.

  3. Verify the pipeline -- Insert a row into MySQL and confirm the corresponding message appears in the ApsaraMQ for RocketMQ console.

Use cases

CDC pipelines built with Canal and ApsaraMQ for RocketMQ are commonly used for:

  • Database mirroring -- Keep a replica database in sync with the source.

  • Real-time backup -- Stream incremental changes to a backup store without full dumps.

  • Index building and real-time maintenance -- Build and maintain indexes in real time, such as heterogeneous indexes for sharded tables and inverted indexes.

  • Cache invalidation -- Refresh or invalidate business caches when underlying data is modified.

  • Processing of incremental data -- Trigger downstream workflows based on specific data changes.

Prerequisites

Important

Canal does not support serverless ApsaraMQ for RocketMQ 5.x instances. Use a standard (non-serverless) instance.

Resources

Before you begin, make sure that you have:

Resource | Requirement | Reference
ApsaraMQ for RocketMQ instance | A running 5.x instance with a topic and group created | Create an instance
MySQL instance | A running MySQL instance (ApsaraDB RDS for MySQL or self-managed) | Create an ApsaraDB RDS for MySQL instance
ECS instance | A machine to deploy Canal Deployer, with network access to both the MySQL and ApsaraMQ for RocketMQ instances | Create and manage an ECS instance

Network

The ECS instance or container running Canal Deployer must reach both the MySQL instance and the ApsaraMQ for RocketMQ instance over the network. Deploy all resources in the same virtual private cloud (VPC) for the simplest setup.
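
Reachability can be checked from the ECS instance before anything is installed. The following is a minimal sketch, assuming bash (built with /dev/tcp support) and the coreutils timeout command are available. can_reach is a hypothetical helper name, and the ports in the usage comments (3306 for MySQL, 8080 for the ApsaraMQ for RocketMQ endpoint) are taken from the examples later in this topic and may differ in your environment.

```shell
# Hypothetical helper: succeeds if a TCP connection to host:port opens within 3 seconds.
# Assumes bash (built with /dev/tcp support) and coreutils `timeout` are installed.
can_reach() {
    timeout 3 bash -c 'exec 3<>"/dev/tcp/$1/$2"' _ "$1" "$2" 2>/dev/null
}

# Usage (placeholders, not real endpoints):
#   can_reach <your-mysql-endpoint> 3306 && echo "MySQL reachable"
#   can_reach <your-rocketmq-endpoint-host> 8080 && echo "RocketMQ reachable"
```

A successful TCP connection only shows that the port is open. It does not confirm that the credentials or the instance configuration are correct.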

Supported versions

Service | Version | Notes
Canal | 1.1.6 | For other versions, see Canal releases.
MySQL | 8.0 | Canal also supports MySQL 5.1.x, 5.5.x, 5.6.x, and 5.7.x.
ApsaraMQ for RocketMQ | 5.x | Canal supports both 4.x and 5.x instances. 5.x is recommended.

Step 1: Configure MySQL

1.1 Enable binary logging

ApsaraDB RDS for MySQL

Binary logging is enabled by default on ApsaraDB RDS for MySQL instances, and the required replication permissions are granted automatically. Skip to 1.2 Create a database.

Self-managed MySQL

  1. Edit the my.cnf file to enable binary logging in ROW format:

        [mysqld]
        log-bin=mysql-bin           # Enable binary logging
        binlog-format=ROW           # Required format for Canal
        server_id=1                 # Must differ from the Canal server ID
  2. Create a dedicated MySQL user with replication permissions:

        -- Example credentials only: replace the password 'canal' with a strong password.
        CREATE USER canal IDENTIFIED BY 'canal';
        GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
        FLUSH PRIVILEGES;
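
Before restarting MySQL, the edited option file can be checked for the three settings from step 1. The following is a rough sketch, not an official tool. check_binlog_config is a hypothetical helper, and it assumes each option is written as name=value on its own line, as in the sample above.

```shell
# Hypothetical helper: reports which of the binlog settings from step 1 are
# missing from a my.cnf-style file. Returns 0 only if all three are present.
check_binlog_config() {
    cfg_file="$1"
    cfg_missing=0
    # MySQL accepts both '-' and '_' in option names, so match either form.
    grep -Eq '^[[:space:]]*log[-_]bin[[:space:]]*=' "$cfg_file" \
        || { echo "missing: log-bin"; cfg_missing=1; }
    # The format value is case-insensitive (ROW or row).
    grep -Eiq '^[[:space:]]*binlog[-_]format[[:space:]]*=[[:space:]]*ROW' "$cfg_file" \
        || { echo "missing: binlog-format=ROW"; cfg_missing=1; }
    grep -Eq '^[[:space:]]*server[-_]id[[:space:]]*=[[:space:]]*[0-9]+' "$cfg_file" \
        || { echo "missing: server_id"; cfg_missing=1; }
    return "$cfg_missing"
}

# Usage (the path is an assumption; option file locations vary by distribution):
#   check_binlog_config /etc/my.cnf && echo "binlog settings look complete"
```

This only inspects the file. The settings take effect after MySQL is restarted.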

1.2 Create a database

Create a database named canal:

CREATE DATABASE canal DEFAULT CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci;

1.3 Create a table

Create a table named students in the canal database:

CREATE TABLE canal.students (
    id INT AUTO_INCREMENT,
    name VARCHAR(100) NOT NULL,
    age INT,
    gender VARCHAR(10),
    PRIMARY KEY (id)
);

Step 2: Deploy Canal

Run the following commands on the ECS instance where you plan to host Canal Deployer.

2.1 Install JDK

sudo yum install java-1.8.0-openjdk

2.2 Download and extract Canal Deployer

# Download Canal Deployer 1.1.6
sudo wget https://github.com/alibaba/canal/releases/download/canal-1.1.6/canal.deployer-1.1.6.tar.gz

# Create an installation directory and extract the archive
sudo mkdir -p /usr/local/canal-server
sudo tar -zxvf canal.deployer-1.1.6.tar.gz -C /usr/local/canal-server

2.3 Configure canal.properties

Open the main configuration file:

sudo vi /usr/local/canal-server/conf/canal.properties

Set the following parameters. Replace the placeholder values with your actual ApsaraMQ for RocketMQ instance details.

# Server mode -- deliver changes to ApsaraMQ for RocketMQ
canal.serverMode = rocketMQ

# ApsaraMQ for RocketMQ authentication credentials
# Find these on the Intelligent Authentication tab of the Access Control page
# in the ApsaraMQ for RocketMQ console.
canal.aliyun.accessKey = <your-access-key>
canal.aliyun.secretKey = <your-secret-key>

# Access channel -- set to "cloud" for Alibaba Cloud managed instances
canal.mq.accessChannel = cloud

# Message format -- must be false for ApsaraMQ for RocketMQ.
# ApsaraMQ for RocketMQ does not allow you to encapsulate multiple change records
# into a message. When set to false, consumers must deserialize the message body.
# For Java, use: com.alibaba.otter.canal.client.CanalMessageDeserializer#deserializer(byte[])
# For applications written in other programming languages, refer to the preceding
# Java method to implement the deserialization.
canal.mq.flatMessage = false

# Producer group name
rocketmq.producer.group = canal_test

# Message trace (disable unless you need tracing)
rocketmq.enable.message.trace = false
rocketmq.customized.trace.topic =

# Namespace -- leave blank for ApsaraMQ for RocketMQ 5.x instances
rocketmq.namespace =

# ApsaraMQ for RocketMQ endpoint
# Find this on the Instance Details page in the ApsaraMQ for RocketMQ console.
rocketmq.namesrv.addr = <your-rocketmq-endpoint>

# Retry and channel settings
rocketmq.retry.times.when.send.failed = 0
rocketmq.vip.channel.enabled = false
rocketmq.tag =
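
After editing the file, a quick check can confirm that every required parameter has a value and that no <your-...> placeholder is left behind. The following is a minimal sketch. check_required_props is a hypothetical helper, and the key list is the set of parameters marked as required in the configuration reference at the end of this topic.

```shell
# Hypothetical helper: verifies that the required ApsaraMQ for RocketMQ keys in a
# canal.properties-style file are set and no longer contain a <your-...> placeholder.
check_required_props() {
    props_file="$1"
    props_status=0
    for key in canal.serverMode canal.aliyun.accessKey canal.aliyun.secretKey \
               canal.mq.accessChannel canal.mq.flatMessage \
               rocketmq.namesrv.addr rocketmq.producer.group; do
        # Read the value of the last "key = value" line; comment lines never match.
        value=$(awk -v k="$key" '
            index($0, "=") > 0 {
                name = substr($0, 1, index($0, "=") - 1)
                gsub(/^[ \t]+|[ \t]+$/, "", name)
                if (name == k) {
                    found = substr($0, index($0, "=") + 1)
                    gsub(/^[ \t]+|[ \t]+$/, "", found)
                }
            }
            END { print found }' "$props_file")
        case "$value" in
            "")          echo "unset: $key"; props_status=1 ;;
            *"<your-"*)  echo "placeholder: $key"; props_status=1 ;;
        esac
    done
    return "$props_status"
}

# Usage:
#   check_required_props /usr/local/canal-server/conf/canal.properties
```

The check is purely syntactic. It cannot tell whether the credentials or the endpoint are valid for your instance.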

Replace the following placeholders with your actual values:

Placeholder | Description | Where to find it
<your-access-key> | Username for the ApsaraMQ for RocketMQ instance | Intelligent Authentication tab on the Access Control page
<your-secret-key> | Password for the ApsaraMQ for RocketMQ instance | Intelligent Authentication tab on the Access Control page
<your-rocketmq-endpoint> | Endpoint of the ApsaraMQ for RocketMQ instance (for example, rmq-cn-xxx.cn-hangzhou.rmq.aliyuncs.com:8080) | Instance Details page

2.4 Configure instance.properties

Open the instance configuration file:

sudo vi /usr/local/canal-server/conf/example/instance.properties

Set the following parameters to point Canal at your MySQL instance and target topic:

# MySQL connection
canal.instance.master.address = <your-mysql-endpoint>:3306
canal.instance.dbUsername = <your-mysql-username>
canal.instance.dbPassword = <your-mysql-password>

# Table filter -- listen to all tables in the canal schema
canal.instance.filter.regex = canal\\..*

# Target topic on the ApsaraMQ for RocketMQ instance
canal.mq.topic = canal_topic
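
The doubled backslash in canal.instance.filter.regex is a properties-file escape. Once loaded, the pattern is canal\..* and is applied to names of the form schema.table (the instance log in step 2.6 shows it as ^canal\..*$). The sketch below approximates that matching with grep -E to show which names such a filter would capture. Canal itself uses its own regex engine, so treat this as an illustration for simple patterns only. matches_filter and the sample table names other than canal.students are hypothetical.

```shell
# Approximates the loaded table filter (^canal\..*$) with grep -E.
# Hypothetical helper: succeeds if the given schema.table name matches the filter.
matches_filter() {
    printf '%s\n' "$1" | grep -Eq '^canal\..*$'
}

# Sample names: only tables in the schema named exactly "canal" should match.
for name in canal.students canal.orders canal_backup.students mysql.user; do
    if matches_filter "$name"; then
        echo "$name: captured"
    else
        echo "$name: ignored"
    fi
done
```

Because the dot after canal is escaped, a schema such as canal_backup is not matched even though its name starts with canal.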

Replace the following placeholders with your actual values:

Placeholder | Description | Example
<your-mysql-endpoint> | Endpoint of the MySQL instance | rm-uf62****.rwlb.rds.aliyuncs.com
<your-mysql-username> | MySQL username | canal
<your-mysql-password> | MySQL password | --

2.5 Start Canal Deployer

/usr/local/canal-server/bin/startup.sh

2.6 Verify that Canal started successfully

  1. Check the Canal server log. A successful startup produces output similar to the following:

        sudo cat /usr/local/canal-server/logs/canal/canal.log
        2024-07-15 17:24:12.154 [main] INFO  com.alibaba.otter.canal.deployer.CanalLauncher - ## set default uncaught exception handler
        2024-07-15 17:24:12.202 [main] INFO  com.alibaba.otter.canal.deployer.CanalLauncher - ## load canal configurations
        2024-07-15 17:24:12.497 [main] INFO  c.a.o.c.c.rocketmq.producer.CanalRocketMQProducer - ##Start RocketMQ producer##
        2024-07-15 17:24:12.799 [main] INFO  com.alibaba.otter.canal.deployer.CanalStarter - ## start the canal server.
        2024-07-15 17:24:12.984 [main] INFO  com.alibaba.otter.canal.deployer.CanalController - ## start the canal server[172.17.XX.XX:11111]
        2024-07-15 17:24:16.208 [main] INFO  com.alibaba.otter.canal.deployer.CanalStarter - ## the canal server is running now ......
  2. Check the Canal instance log and look for the "start successful" message:

        sudo cat /usr/local/canal-server/logs/example/example.log
        2024-07-15 18:22:15.667 [main] INFO  c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start CannalInstance for 1-example
        2024-07-15 18:22:15.699 [main] WARN  c.a.o.canal.parse.inbound.mysql.dbsync.LogEventConvert - --> init table filter : ^canal\..*$
        2024-07-15 18:22:15.699 [main] WARN  c.a.o.canal.parse.inbound.mysql.dbsync.LogEventConvert - --> init table black filter : ^mysql\.slave_.*$
        2024-07-15 18:22:16.030 [main] INFO  c.a.otter.canal.instance.core.AbstractCanalInstance - start successful....
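
The instance log may not exist yet immediately after startup.sh returns, so a scripted deployment may prefer to poll for the marker rather than read the log once. The following is a minimal sketch. wait_for_marker is a hypothetical helper, and the 30-second default timeout is an arbitrary assumption.

```shell
# Hypothetical helper: polls a log file once per second until it contains the
# given fixed string, or until the timeout (in seconds, default 30) expires.
wait_for_marker() {
    log_file="$1"
    marker="$2"
    timeout_s="${3:-30}"
    elapsed=0
    while [ "$elapsed" -lt "$timeout_s" ]; do
        # The file may not exist yet shortly after startup.
        if [ -f "$log_file" ] && grep -Fq -- "$marker" "$log_file"; then
            return 0
        fi
        sleep 1
        elapsed=$((elapsed + 1))
    done
    return 1
}

# Usage:
#   wait_for_marker /usr/local/canal-server/logs/example/example.log 'start successful' 60 \
#       && echo "Canal instance is up"
```

If the log is not rotated between restarts, a marker from an earlier run also satisfies the check, so compare timestamps when in doubt.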

Step 3: Verify the pipeline

3.1 Insert a row into MySQL

Run the following SQL statement to insert a row into the students table created in Step 1.3:

INSERT INTO canal.students (name, age, gender) VALUES ('Tome', 18, 'male');

3.2 Check the message in the ApsaraMQ for RocketMQ console

  1. Log on to the ApsaraMQ for RocketMQ console.

  2. In the left-side navigation pane, click Instances.

  3. Click the name of the instance you configured in Step 2.3.

  4. In the left-side navigation pane, click Message Query.

  5. Locate the message sent by Canal and verify that it contains the change data from the INSERT statement.

[Figure: Message query result]

If the message appears, the CDC pipeline is working. Canal continues to capture any INSERT, UPDATE, or DELETE operations on tables matching the canal\\..* filter and delivers them to the configured topic.

Configuration reference

The following table lists all ApsaraMQ for RocketMQ-related parameters in canal.properties.

Parameter | Description | Required
canal.serverMode | Delivery target. Set to rocketMQ. | Yes
canal.aliyun.accessKey | ApsaraMQ for RocketMQ instance username. | Yes
canal.aliyun.secretKey | ApsaraMQ for RocketMQ instance password. | Yes
canal.mq.accessChannel | Access method. Set to cloud for Alibaba Cloud instances. | Yes
canal.mq.flatMessage | Message serialization format. Must be false for ApsaraMQ for RocketMQ. | Yes
rocketmq.namesrv.addr | ApsaraMQ for RocketMQ endpoint. | Yes
rocketmq.producer.group | Producer group name on the ApsaraMQ for RocketMQ instance. | Yes
rocketmq.namespace | Instance namespace. Leave blank for 5.x instances. | No
rocketmq.enable.message.trace | Enable message tracing. | No
rocketmq.customized.trace.topic | Topic for storing message traces. | No
rocketmq.retry.times.when.send.failed | Number of send retries on failure. | No
rocketmq.vip.channel.enabled | Use the VIP Netty channel. | No
rocketmq.tag | Message tag. | No

What to do next

  • To monitor CDC message flow, enable message tracing by setting rocketmq.enable.message.trace to true.

  • To capture changes from additional databases or tables, update the canal.instance.filter.regex parameter in instance.properties with the desired regular expression pattern.

  • To learn more about Canal configuration and features, see the Canal documentation.