本文以MySQL資料庫為例介紹如何使用Canal接入雲訊息佇列 RocketMQ 版,實現MySQL資料庫Binlog資料的變更處理。
背景資訊
CDC(Change Data Capture)是一種監測並捕獲資料庫變更的典型技術方案,常應用於異構資料來源之間的資料同步。Canal作為一款輕量級的CDC工具,可基於資料庫增量日誌解析,提供增量變更資料的訂閱和消費能力。Canal可以將變更記錄可靠地投遞到雲訊息佇列 RocketMQ 版中,藉助雲訊息佇列 RocketMQ 版豐富的訊息處理策略實現多樣化的商務邏輯。
Canal是一個開源專案,倉庫地址請參見Canal。
應用情境
基於Binlog日誌實現增量訂閱和消費的典型業務情境如下:
資料庫鏡像
資料庫即時備份
索引構建和即時維護(拆分異構索引、倒排索引等)
業務Cache重新整理
帶商務邏輯的增量資料處理
方案介紹
基於Canal和雲訊息佇列 RocketMQ 版的CDC方案如下:

如上圖所示,Canal將自己偽裝成庫,監聽並接收資料庫的Binlog,並同步到雲訊息佇列 RocketMQ 版等儲存或其他中介軟體系統。
具體操作步驟如下:
配置MySQL:開啟MySQL的Binlog功能,建立測試需要的資料庫和表。
部署Canal:部署一個canal-deployer(server),監聽並接收MySQL資料庫的Binlog。
測實驗證:驗證資料變動後訊息發送的情況。
環境要求
資源要求
處於運行中狀態的雲訊息佇列 RocketMQ 版執行個體。執行個體建立,請參見建立訊息佇列RocketMQ版執行個體。
處於運行中的MySQL執行個體。本文以阿里雲RDS MySQL為例,執行個體建立,請參見建立RDS MySQL執行個體。
用於部署運行Canal相關組件的機器。本文以ECS為例,執行個體建立和使用,請參見通過控制台使用ECS執行個體(快捷版)。
網路要求
部署canal-deployer(server)的節點可以串連資料庫和雲訊息佇列 RocketMQ 版執行個體,一般位於VPC內的ECS和容器都可以串連。
版本要求
服務 | 版本 | 說明 |
Canal | 1.1.6 | 其他版本請參見Canal Release。 |
MySQL | 8.0 | 支援源端MySQL版本包括5.1.x、5.5.x、5.6.x、5.7.x、8.0.x。 |
雲訊息佇列 RocketMQ 版 | 5.x |
|
1.配置MySQL
1.1 開啟Binlog功能
阿里雲RDS MySQL
阿里雲RDS MySQL預設已開啟Binlog功能,並且帳號預設具有Binlog dump許可權,可以直接跳過這一步。
自建MySQL
開啟 Binlog 寫入功能,配置 binlog-format 為 ROW 模式,my.cnf 中配置如下:
[mysqld] log-bin=mysql-bin # 開啟binlog binlog-format=ROW # 選擇ROW模式 server_id=1 # 配置MySQL replaction需要定義,不要和canal的slaveId重複建立使用者canal並授權MySQL slave 的許可權。
CREATE USER canal IDENTIFIED BY 'canal'; GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%'; FLUSH PRIVILEGES;
1.2 建立資料庫
執行下面的SQL,建立一個名為canal的資料庫。
CREATE DATABASE canal DEFAULT CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci;1.3 建立表
執行下面的SQL,在canal資料庫建立一個名為students的表。
CREATE TABLE students (
id INT AUTO_INCREMENT,
name VARCHAR(100) NOT NULL,
age INT,
gender VARCHAR(10),
PRIMARY KEY (id)
);2.部署Canal
2.1 安裝JDK
執行下面的命令,安裝JDK。
sudo yum install java-1.8.0-openjdk2.2 下載canal-deployer
執行下面的命令,下載canal-deployer安裝包。
sudo wget https://github.com/alibaba/canal/releases/download/canal-1.1.6/canal.deployer-1.1.6.tar.gz2.3 解壓縮安裝包
執行下面的命令,建立目錄canal-server,並將下載的安裝包解壓縮到canal-server目錄中。
# 建立目錄canal-server
sudo mkdir -p /usr/local/canal-server
# 將下載的安裝包解壓縮到canal-server目錄中
sudo tar -zxvf canal.deployer-1.1.6.tar.gz -C /usr/local/canal-server2.4 修改配置
配置Canal啟動的參數,詳細配置請參見參數說明。
執行下面的命令,配置canal.properties。
sudo vi /usr/local/canal-server/conf/canal.properties# 服務端模式
canal.serverMode = rocketMQ
# AccessKey ID,阿里雲身分識別驗證標識。
canal.aliyun.accessKey = 6W0xz2uPf******
# AccessKey Secret,阿里雲身分識別驗證密鑰
canal.aliyun.secretKey = sK56k1DrGx******
# 訊息佇列接入的方式
canal.mq.accessChannel = cloud
# 訊息發送格式。雲訊息佇列 RocketMQ 版不支援批量發送,canal.mq.flatMessage需要設定成false;消費端擷取到的訊息body後需還原序列化body內容;Java語言可使用com.alibaba.otter.canal.client.CanalMessageDeserializer#deserializer(byte[]) 方法轉化,其他語言需參照該方法自行實現。
canal.mq.flatMessage = false
# 雲訊息佇列 RocketMQ 版執行個體中Group名
rocketmq.producer.group = canal_test
# 是否開啟訊息軌跡
rocketmq.enable.message.trace = false
# message trace的topic
rocketmq.customized.trace.topic =
# 雲訊息佇列 RocketMQ 版執行個體的命名空間。雲訊息佇列 RocketMQ 版5.x執行個體無需填寫該參數。
rocketmq.namespace =
# 雲訊息佇列 RocketMQ 版執行個體的存取點。在雲訊息佇列 RocketMQ 版控制台執行個體詳情頁面擷取。
rocketmq.namesrv.addr = rmq-cn-xxx.{$RegionId}.rmq.aliyuncs.com:8080
# 重試次數
rocketmq.retry.times.when.send.failed = 0
# 是否啟用VIP Netty通道發送訊息
rocketmq.vip.channel.enabled = false
# 訊息的tag配置
rocketmq.tag = 擷取阿里雲存取金鑰AccessKey ID和AccessKey Secret。更多資訊,請參見建立AccessKey。
執行下面的命令,配置instance.properties。
sudo vi /usr/local/canal-server/conf/example/instance.properties# 阿里雲RDS MySQL資料庫的串連地址
canal.instance.master.address=rm-uf62****.rwlb.rds.aliyuncs.com:3306
# 阿里雲RDS MySQL資料庫的帳號
canal.instance.dbUsername=xxx
# 阿里雲RDS MySQL資料庫的密碼
canal.instance.dbPassword=xxx
# mysql 資料解析關注的表,PerlRegex,canal\\..*表示canal schema下所有表
canal.instance.filter.regex=canal\\..*
# 雲訊息佇列 RocketMQ 版執行個體的topic名稱
canal.mq.topic=canal_topic2.5 啟動canal-deployer
執行下面的命令啟動canal-deployer。
/usr/local/canal-server/bin/startup.sh2.6 驗證啟動
執行下面的命令查看canal.log記錄檔,確認Canal成功啟動。
sudo vi /usr/local/canal-server/logs/canal/canal.log 2024-07-15 17:24:12.154 [main] INFO com.alibaba.otter.canal.deployer.CanalLauncher - ## set default uncaught exception handler
2024-07-15 17:24:12.202 [main] INFO com.alibaba.otter.canal.deployer.CanalLauncher - ## load canal configurations
2024-07-15 17:24:12.497 [main] INFO c.a.o.c.c.rocketmq.producer.CanalRocketMQProducer - ##Start RocketMQ producer##
2024-07-15 17:24:12.799 [main] INFO com.alibaba.otter.canal.deployer.CanalStarter - ## start the canal server.
2024-07-15 17:24:12.984 [main] INFO com.alibaba.otter.canal.deployer.CanalController - ## start the canal server[172.17.XX.XX:11111]
2024-07-15 17:24:16.208 [main] INFO com.alibaba.otter.canal.deployer.CanalStarter - ## the canal server is running now ......執行下面的命令查看example.log記錄檔,確認Canal Instance成功啟動。
sudo vi /usr/local/canal-server/logs/example/example.log 2024-07-15 18:22:15.667 [main] INFO c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start CannalInstance for 1-example
2024-07-15 18:22:15.699 [main] WARN c.a.o.canal.parse.inbound.mysql.dbsync.LogEventConvert - --> init table filter : ^canal\..*$
2024-07-15 18:22:15.699 [main] WARN c.a.o.canal.parse.inbound.mysql.dbsync.LogEventConvert - --> init table black filter : ^mysql\.slave_.*$
2024-07-15 18:22:16.030 [main] INFO c.a.otter.canal.instance.core.AbstractCanalIn3.測實驗證
3.1 向MYSQL資料庫中添加資料
執行下面的SQL,向步驟1.3 建立表所建立的表students添加一條資料。
INSERT INTO`students` (`name`, `age`, `gender`)VALUES('Tome', 18, 'male');3.2 查看Canal發送的訊息
登入雲訊息佇列 RocketMQ 版控制台,找到部署時配置的執行個體,在訊息查詢頁面查看訊息如下:
