Background information
Canal parses the incremental logs (binlogs) of MySQL databases so that incremental data can be subscribed to and consumed. In the early days, Alibaba Group needed to synchronize data between its data centers in Hangzhou and the United States, and obtained incremental changes by using business triggers. Starting in 2010, Alibaba Group began to obtain incremental changes by parsing database logs instead. This shift greatly improved the subscription to and consumption of incremental data in databases. Canal supports MySQL 5.1.x, 5.5.x, 5.6.x, 5.7.x, and 8.0.x as source database engines.
Canal can write data to Kafka, and DataHub is compatible with the Kafka protocol. Therefore, you can use Canal to write incremental data from MySQL to DataHub. To ensure that Canal writes data to DataHub the same way it writes data to Kafka, the following changes have been made to the open source Canal framework:
A Kafka topic name corresponds to ProjectName.TopicName in DataHub (for example, test_project.test_topic maps to the topic test_topic in the project test_project). Therefore, the logic that replaces periods (.) in Kafka topic names with underscores (_) has been removed from the open source Canal framework. This change ensures that Kafka topic names are mapped to the correct DataHub topics.
DataHub uses the PLAIN Simple Authentication and Security Layer (SASL) mechanism for authentication. Therefore, the JVM option -Djava.security.auth.login.config=$kafka_jaas_conf is added to the startup script, as shown in the sketch below.
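For example, the relevant lines in bin/startup.sh may look similar to the following. This is only an illustrative sketch: the variable names are assumptions, and conf/kafka_client_producer_jaas.conf is the JAAS file described in step 3.3.
# Illustrative excerpt from bin/startup.sh: point the JVM at the JAAS file
kafka_jaas_conf="$base/conf/kafka_client_producer_jaas.conf"
JAVA_OPTS="$JAVA_OPTS -Djava.security.auth.login.config=$kafka_jaas_conf"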
Instructions
This topic provides a basic example of how to use Canal to write data to DataHub in the same way as writing to Kafka. For more information about the parameters and their descriptions, see the Canal documentation.
1. Download the canal.deployer package
Download the canal.deployer-1.1.5-SNAPSHOT.tar.gz package. A Canal package that has not been modified for DataHub may not be able to write data to DataHub.
2. Copy the canal.deployer package to a fixed directory and decompress the package
mkdir -p /usr/local/canal
tar -zxvf canal.deployer-1.1.5-SNAPSHOT.tar.gz -C /usr/local/canal
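After decompression, the directory should contain the standard Canal deployer layout, typically similar to the following (the logs directory may only appear after the first startup):
ls /usr/local/canal
# bin  conf  lib  logs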
3. Modify parameters
3.1 Modify the instance configuration file conf/example/instance.properties
# Modify the database information as needed.
#################################################
...
canal.instance.master.address=192.168.1.20:3306
# username/password: the username and password of the database.
...
canal.instance.dbUsername = canal
canal.instance.dbPassword = canal
...
# mq config
canal.mq.topic=test_project.test_topic
# Specify a dynamic topic based on the database name or table name.
#canal.mq.dynamicTopic=mytest,.*,mytest.user,mytest\\..*,.*\\..*
canal.mq.partition=0
# hash partition config
#canal.mq.partitionsNum=3
# Format: database name.table name:unique primary key. Separate multiple tables with commas (,).
#canal.mq.partitionHash=mytest.person:id,mytest.role:id
#################################################
The MySQL database at the specified address must be initialized and configured. For more information, see QuickStart. For more information about dynamic topic names based on database or table names and about primary key hash settings, see MQ-related parameters. A hash example follows.
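For example, to hash-partition changes to the orders table in the ggtt database (both used in the Examples section below) across three shards by the oid column, you could set the following. This is only a sketch; choose a hash key that fits your own schema.
canal.mq.partitionsNum=3
canal.mq.partitionHash=ggtt.orders:oid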
3.2 Modify the Canal configuration file conf/canal.properties
# ...
canal.serverMode = kafka
# ...
kafka.bootstrap.servers = dh-cn-hangzhou.aliyuncs.com:9092
kafka.acks = all
kafka.compression.type = none
kafka.batch.size = 16384
kafka.linger.ms = 1
kafka.max.request.size = 1048576
kafka.buffer.memory = 33554432
kafka.max.in.flight.requests.per.connection = 1
kafka.retries = 0
kafka.security.protocol = SASL_SSL
kafka.sasl.mechanism = PLAIN
You must configure the canal.serverMode, kafka.bootstrap.servers, kafka.security.protocol, and kafka.sasl.mechanism parameters. You can also modify other parameters as required. The kafka.bootstrap.servers parameter specifies the Kafka endpoint of the region where the destination topic resides. For more information about available Kafka endpoints, see Compatibility with Kafka.
3.3 Modify the JAAS configuration file conf/kafka_client_producer_jaas.conf
KafkaClient {
  org.apache.kafka.common.security.plain.PlainLoginModule required
  username="accessId"
  password="accessKey";
};
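In the JAAS file, accessId and accessKey are placeholders for your AccessKey ID and AccessKey secret. Before you start Canal, you can optionally verify that the endpoint and credentials work by listing topics over the Kafka-compatible interface. The following is a minimal Java sketch that uses the standard Kafka AdminClient; it is not part of Canal, and the endpoint and credential placeholders are the sample values from the configuration above.
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;

public class DataHubKafkaCheck {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // Kafka-compatible endpoint of DataHub in your region
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "dh-cn-hangzhou.aliyuncs.com:9092");
        props.put("security.protocol", "SASL_SSL");
        props.put("sasl.mechanism", "PLAIN");
        // Same credentials as in conf/kafka_client_producer_jaas.conf
        props.put("sasl.jaas.config",
            "org.apache.kafka.common.security.plain.PlainLoginModule required "
                + "username=\"accessId\" password=\"accessKey\";");
        try (AdminClient client = AdminClient.create(props)) {
            // A successful call confirms that the endpoint, protocol,
            // and credentials are correct.
            System.out.println(client.listTopics().names().get());
        }
    }
}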
4. Start and stop Canal
Before you start Canal, make sure that a DataHub topic has been created. For more information about the requirements for the topic, see Compatibility with Kafka.
4.1 Start Canal
cd /usr/local/canal/
sh bin/startup.sh
4.2 View logs
Run the vi logs/canal/canal.log command to view the canal.log file in the logs/canal/ directory.
2013-02-05 22:45:27.967 [main] INFO com.alibaba.otter.canal.deployer.CanalLauncher - ## start the canal server.
2013-02-05 22:45:28.113 [main] INFO com.alibaba.otter.canal.deployer.CanalController - ## start the canal server[10.1.29.120:11111]
2013-02-05 22:45:28.210 [main] INFO com.alibaba.otter.canal.deployer.CanalLauncher - ## the canal server is running now ......
Run the vi logs/example/example.log command to view the logs of an instance.
2013-02-05 22:50:45.636 [main] INFO c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [canal.properties]
2013-02-05 22:50:45.641 [main] INFO c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [example/instance.properties]
2013-02-05 22:50:45.803 [main] INFO c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start CannalInstance for 1-example
2013-02-05 22:50:45.810 [main] INFO c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start successful....
Run the vi logs/example/meta.log command to view the metadata logs. A record is generated in the meta.log file for each insert, delete, and update on the database. You can check the meta.log file to verify whether Canal has collected data.
tail -f logs/example/meta.log
2020-07-29 09:21:05.110 - clientId:1001 cursor:[log.000001,29723,1591190230000,1,] address[/127.0.0.1:3306]
2020-07-29 09:23:46.109 - clientId:1001 cursor:[log.000001,30047,1595985825000,1,] address[localhost/127.0.0.1:3306]
2020-07-29 09:24:50.547 - clientId:1001 cursor:[log.000001,30047,1595985825000,1,] address[/127.0.0.1:3306]
2020-07-29 09:26:45.547 - clientId:1001 cursor:[log.000001,30143,1595986005000,1,] address[localhost/127.0.0.1:3306]
2020-07-29 09:30:04.546 - clientId:1001 cursor:[log.000001,30467,1595986204000,1,] address[localhost/127.0.0.1:3306]
2020-07-29 09:30:16.546 - clientId:1001 cursor:[log.000001,30734,1595986215000,1,] address[localhost/127.0.0.1:3306]
2020-07-29 09:30:36.547 - clientId:1001 cursor:[log.000001,31001,1595986236000,1,] address[localhost/127.0.0.1:3306]
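To confirm that Canal is collecting data, make a change on the source database and watch a new cursor line appear in meta.log. For example (sample values; the orders table is described in the Examples section below):
mysql> insert into orders values(4,5,6);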
4.3 Stop Canal
cd /usr/local/canal/
sh bin/stop.sh
Examples
DataHub Topic
The destination DataHub topic is of the TUPLE type and has the following schema:
+-------+------+----------+-------------+
| Index | name | type | allow NULL |
+-------+------+----------+-------------+
| 0 | key | STRING | true |
| 1 | val | STRING | true |
+-------+------+----------+-------------+
MySQL
Schema of the source MySQL table
mysql> desc orders;
+-------+---------+------+-----+---------+-------+
| Field | Type | Null | Key | Default | Extra |
+-------+---------+------+-----+---------+-------+
| oid | int(11) | YES | | NULL | |
| pid | int(11) | YES | | NULL | |
| num | int(11) | YES | | NULL | |
+-------+---------+------+-----+---------+-------+
3 rows in set (0.00 sec)
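For reference, a table with this schema can be created as follows (the ggtt database name comes from the sample record below):
mysql> create database if not exists ggtt;
mysql> use ggtt;
mysql> create table orders (oid int(11), pid int(11), num int(11));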
Data
After the data is written to DataHub, the key field is null, and the value of the val field is a JSON string.
mysql> insert into orders values(1,2,3);
{
"data":[
{
"oid":"1",
"pid":"2",
"num":"3"
}
],
"database":"ggtt",
"es":1591092305000,
"id":2,
"isDdl":false,
"mysqlType":{
"oid":"int(11)",
"pid":"int(11)",
"num":"int(11)"
},
"old":null,
"pkNames":null,
"sql":"",
"sqlType":{
"oid":4,
"pid":4,
"num":4
},
"table":"orders",
"ts":1591092305813,
"type":"INSERT"
}
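To verify the result end to end, you can read the record back over the Kafka-compatible interface. The following is a minimal Java consumer sketch; it is not part of Canal, the endpoint, topic, and credential placeholders are the sample values used above, and the group ID is an arbitrary assumption (your DataHub Kafka-compatibility setup may require a specific consumption group).
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class DataHubKafkaRead {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "dh-cn-hangzhou.aliyuncs.com:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "canal-verify"); // placeholder group ID
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("security.protocol", "SASL_SSL");
        props.put("sasl.mechanism", "PLAIN");
        props.put("sasl.jaas.config",
            "org.apache.kafka.common.security.plain.PlainLoginModule required "
                + "username=\"accessId\" password=\"accessKey\";");
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // ProjectName.TopicName maps directly to the DataHub topic
            consumer.subscribe(Collections.singletonList("test_project.test_topic"));
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(10));
            for (ConsumerRecord<String, String> r : records) {
                // key is null; value is the JSON string shown above
                System.out.println(r.value());
            }
        }
    }
}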