ApsaraDB for SelectDB can automatically subscribe to data in Kafka and synchronize data from Kafka by using Doris Kafka Connector. This topic describes how to use Doris Kafka Connector to synchronize data from a Kafka data source to an ApsaraDB for SelectDB instance.
Background information
Kafka Connect is a reliable tool for transferring data between Apache Kafka and other systems. You can define connectors to import data to or export data from a Kafka data source.
Doris Kafka Connector provided by Apache Doris runs in a Kafka Connect cluster. Doris Kafka Connector can read data from Kafka topics and write the data to ApsaraDB for SelectDB.
In business scenarios, you may push the updated data in databases to Kafka by using Debezium Connector or call API operations to write JSON-formatted data to Kafka in real time. Then, you can use Doris Kafka Connector to automatically subscribe to data in Kafka and synchronize the data to an ApsaraDB for SelectDB instance.
Operation modes of Kafka Connect
Kafka Connect provides standalone mode and distributed mode. You can use an operation mode based on your business requirements.
Standalone mode
We recommend that you do not use the standalone mode in the production environment.
Configure the standalone mode
Configure the connect-standalone.properties file.
# Modify the IP address of the broker.
bootstrap.servers=127.0.0.1:9092
Create the connect-selectdb-sink.properties file in the Kafka config directory and configure the following items in the file:
name=test-selectdb-sink
connector.class=org.apache.doris.kafka.connector.DorisSinkConnector
topics=topic_test
doris.topic2table.map=topic_test:test_kafka_tbl
buffer.count.records=10000
buffer.flush.time=120
buffer.size.bytes=5000000
doris.urls=selectdb-cn-4xl3jv1****-public.selectdbfe.rds.aliyuncs.com
doris.http.port=8080
doris.query.port=9030
doris.user=admin
doris.password=****
doris.database=test_db
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
Enable the standalone mode
$KAFKA_HOME/bin/connect-standalone.sh -daemon $KAFKA_HOME/config/connect-standalone.properties $KAFKA_HOME/config/connect-selectdb-sink.properties
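After the standalone worker starts, you can query its REST API to check whether the sink is running. The following commands are a minimal sketch and assume that the worker exposes the default REST port 8083 and that the connector is named test-selectdb-sink as configured above:
# List the connectors that are registered on the worker.
curl -i http://127.0.0.1:8083/connectors -X GET
# Query the status of the connector.
curl -i http://127.0.0.1:8083/connectors/test-selectdb-sink/status -X GET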
Distributed mode
Configure the distributed mode
Configure the connect-distributed.properties file.
# Modify the IP address of the broker.
bootstrap.servers=127.0.0.1:9092
# Modify the group ID. The group ID must be the same for all workers in the same Kafka Connect cluster.
group.id=connect-cluster
Enable the distributed mode
$KAFKA_HOME/bin/connect-distributed.sh -daemon $KAFKA_HOME/config/connect-distributed.properties
Configure a connector
curl -i http://127.0.0.1:8083/connectors -H "Content-Type: application/json" -X POST -d '{
  "name":"test-selectdb-sink-cluster",
  "config":{
    "connector.class":"org.apache.doris.kafka.connector.DorisSinkConnector",
    "topics":"topic_test",
    "doris.topic2table.map":"topic_test:test_kafka_tbl",
    "buffer.count.records":"10000",
    "buffer.flush.time":"120",
    "buffer.size.bytes":"5000000",
    "doris.urls":"selectdb-cn-4xl3jv1****-public.selectdbfe.rds.aliyuncs.com",
    "doris.user":"admin",
    "doris.password":"***",
    "doris.database":"test_db",
    "doris.http.port":"8080",
    "doris.query.port":"9030",
    "key.converter":"org.apache.kafka.connect.storage.StringConverter",
    "value.converter":"org.apache.kafka.connect.json.JsonConverter"
  }
}'
Parameters
Parameter | Description |
name | The name of the connector. The name must be unique within the Kafka Connect cluster and must not contain ISO control characters. |
connector.class | The class name or alias of the connector. Set the value to org.apache.doris.kafka.connector.DorisSinkConnector. |
topics | The name of the topic that serves as the data source. Separate multiple topic names with commas (,). |
doris.topic2table.map | The mapping between a topic and a table. Separate multiple mapping relationships with commas (,). Example: topic_test:test_kafka_tbl. |
buffer.count.records | The number of data records that are buffered in the memory for each Kafka partition before data is flushed to the ApsaraDB for SelectDB instance. Default value: 10000. |
buffer.flush.time | The interval at which the data buffered in the memory is flushed. Unit: seconds. Default value: 120. |
buffer.size.bytes | The cumulative size of the data records that are buffered in the memory for each Kafka partition. Unit: bytes. Default value: 5000000. |
doris.urls | The endpoint of the ApsaraDB for SelectDB instance, without a port number. To obtain the endpoint, log on to the ApsaraDB for SelectDB console, go to the Instance Details page of the instance, and view the endpoint in the Network Information section. Example: selectdb-cn-4xl3jv1****.selectdbfe.rds.aliyuncs.com |
doris.http.port | The HTTP port number of the ApsaraDB for SelectDB instance. Default value: 8080. |
doris.query.port | The MySQL port number of the ApsaraDB for SelectDB instance. Default value: 9030. |
doris.user | The username that is used to connect to the ApsaraDB for SelectDB instance. |
doris.password | The password of the username that is used to connect to the ApsaraDB for SelectDB instance. |
doris.database | The name of the database to which data is written in the ApsaraDB for SelectDB instance. |
key.converter | The converter class that is used to convert the keys of Kafka records. |
value.converter | The converter class that is used to convert the values of Kafka records. |
jmx | Specifies whether to use Java Management Extensions (JMX) to monitor Doris Kafka Connector. For more information, see Use JMX to monitor Doris Kafka Connector. Default value: TRUE. |
enable.delete | Specifies whether to synchronize the deletion of records. Default value: false. |
label.prefix | The label prefix that is used when you import data by using Stream Load. The default value is the name of the connector. |
auto.redirect | Specifies whether to redirect Stream Load requests. If you enable the auto redirect feature, Stream Load requests are sent to the frontend (FE) and redirected to the backend (BE) to which data is written, and the BE information is no longer exposed. |
load.model | The mode in which the data is imported. Valid values: stream_load and copy_into. Default value: stream_load. |
sink.properties.* | The parameters for importing data by using Stream Load. For more information, see Import data by using Stream Load. |
delivery.guarantee | The method that is used to ensure data consistency when the consumed Kafka data is imported to ApsaraDB for SelectDB. Valid values: at_least_once and exactly_once. Default value: at_least_once. |
enable.2pc | Specifies whether to enable the two-phase commit mode. You can enable the two-phase commit mode to ensure exactly-once semantics. |
For more information about other common configuration items of Kafka Connect Sink, see the Configuring Connectors section in Kafka 3.7 Documentation.
Examples
Prepare the environment
Install an Apache Kafka cluster of version 2.4.0 or later, or use Confluent Cloud. In this example, a standalone Kafka cluster is used.
# Download and decompress the package.
wget https://archive.apache.org/dist/kafka/2.4.0/kafka_2.12-2.4.0.tgz
tar -zxvf kafka_2.12-2.4.0.tgz
cd kafka_2.12-2.4.0
# Start ZooKeeper and the Kafka broker.
bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
bin/kafka-server-start.sh -daemon config/server.properties
Download the doris-kafka-connector-1.0.0.jar package and store the JAR package in the KAFKA_HOME/libs directory.
Create an ApsaraDB for SelectDB instance. For more information, see Create an instance.
Connect to the ApsaraDB for SelectDB instance over the MySQL protocol. For more information, see Connect to an instance.
Create a test database and a test table.
Execute the following statement to create a test database:
CREATE DATABASE test_db;
Execute the following statements to create a test table:
USE test_db;
CREATE TABLE employees (
    emp_no int NOT NULL,
    birth_date date,
    first_name varchar(20),
    last_name varchar(20),
    gender char(2),
    hire_date date
)
UNIQUE KEY(`emp_no`)
DISTRIBUTED BY HASH(`emp_no`) BUCKETS 1;
Example 1: Synchronize JSON-formatted data
Configure an ApsaraDB for SelectDB Sink.
In this example, the standalone mode is used. Create the selectdb-sink.properties file in the Kafka config directory and configure the following items in the file:
name=selectdb_sink
connector.class=org.apache.doris.kafka.connector.DorisSinkConnector
topics=test_topic
doris.topic2table.map=test_topic:example_tbl
buffer.count.records=10000
buffer.flush.time=120
buffer.size.bytes=5000000
doris.urls=selectdb-cn-4xl3jv1****-public.selectdbfe.rds.aliyuncs.com
doris.http.port=8080
doris.query.port=9030
doris.user=admin
doris.password=***
doris.database=test_db
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
# Optional. Configure a dead-letter queue.
errors.tolerance=all
errors.deadletterqueue.topic.name=test_error
errors.deadletterqueue.context.headers.enable=true
errors.deadletterqueue.topic.replication.factor=1
Start Kafka Connect.
bin/connect-standalone.sh -daemon config/connect-standalone.properties config/selectdb-sink.properties
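To verify that the sink works, you can write a JSON-formatted record to the topic and then query the target table over the MySQL protocol. The following commands are a minimal sketch: the JSON fields follow the employees table that is created in the environment preparation and are only for illustration, and they must match the columns of the table to which test_topic is mapped (example_tbl in the configuration above). The endpoint and credentials are the placeholders used in this topic.
# Write a JSON-formatted test record to the topic.
echo '{"emp_no":10001,"birth_date":"1990-01-01","first_name":"John","last_name":"Doe","gender":"M","hire_date":"2020-01-01"}' | bin/kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic test_topic
# Query the target table in the ApsaraDB for SelectDB instance after the buffered data is flushed.
mysql -h selectdb-cn-4xl3jv1****-public.selectdbfe.rds.aliyuncs.com -P 9030 -u admin -p -e "SELECT * FROM test_db.example_tbl;"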
Example 2: Use Debezium Connector to synchronize data from a MySQL database to an ApsaraDB for SelectDB instance
In some business scenarios, you need to synchronize data from business databases in real time. In this case, the change data capture (CDC) mechanism is required.
Debezium Connector is a CDC tool that is developed based on Kafka Connect. Debezium Connector can connect to various databases, such as MySQL, PostgreSQL, SQL Server, Oracle, and MongoDB, and continuously send data from the databases to Kafka topics in a unified format for downstream sinks to consume in real time. In this example, a MySQL data source is used.
Download the Debezium Connector package.
wget https://repo1.maven.org/maven2/io/debezium/debezium-connector-mysql/1.9.8.Final/debezium-connector-mysql-1.9.8.Final-plugin.tar.gz
Decompress the downloaded package.
tar -zxvf debezium-connector-mysql-1.9.8.Final-plugin.tar.gz
Store all the extracted JAR packages in the KAFKA_HOME/libs directory.
Configure the MySQL data source.
Create the mysql-source.properties file in the Kafka config directory and configure the following items in the file:
name=mysql-source
connector.class=io.debezium.connector.mysql.MySqlConnector
database.hostname=rm-bp17372257wkz****.rwlb.rds.aliyuncs.com
database.port=3306
database.user=testuser
database.password=****
database.server.id=1
# The unique identifier of the client in Kafka.
database.server.name=test123
# The database and table from which data is synchronized. By default, data is synchronized from all databases and tables.
database.include.list=test
table.include.list=test.test_table
database.history.kafka.bootstrap.servers=localhost:9092
# The Kafka topic that is used to store the schema change events of the database and table.
database.history.kafka.topic=dbhistory
transforms=unwrap
# For more information, visit https://debezium.io/documentation/reference/stable/transformations/event-flattening.html.
transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState
# Record change events for DELETE operations.
transforms.unwrap.delete.handling.mode=rewrite
After the configuration is complete, the Kafka topic is named in the SERVER_NAME.DATABASE_NAME.TABLE_NAME format by default.
Note: For more information about how to configure Debezium Connector, see Debezium connector for MySQL.
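For example, based on the configuration above, the change events of the test.test_table table are written to a topic named test123.test.test_table. You can list the topics to confirm the topic name. The following command is a minimal sketch that assumes the Kafka broker is available at 127.0.0.1:9092:
# List the topics that are created by Debezium Connector.
bin/kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --list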
Configure an ApsaraDB for SelectDB Sink.
Create the selectdb-sink.properties file in the Kafka config directory and configure the following items in the file:
name=selectdb-sink
connector.class=org.apache.doris.kafka.connector.DorisSinkConnector
topics=test123.test.test_table
doris.topic2table.map=test123.test.test_table:test_table
buffer.count.records=10000
buffer.flush.time=120
buffer.size.bytes=5000000
doris.urls=selectdb-cn-4xl3jv1****-public.selectdbfe.rds.aliyuncs.com
doris.http.port=8080
doris.query.port=9030
doris.user=admin
doris.password=****
doris.database=test
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
# Optional. Configure a dead-letter queue.
#errors.tolerance=all
#errors.deadletterqueue.topic.name=test_error
#errors.deadletterqueue.context.headers.enable=true
#errors.deadletterqueue.topic.replication.factor=1
Note: Before you synchronize data to an ApsaraDB for SelectDB instance, you must create a database and a table in the instance.
Start Kafka Connect.
bin/connect-standalone.sh -daemon config/connect-standalone.properties config/mysql-source.properties config/selectdb-sink.properties
Note: After you start Kafka Connect, you can view the logs/connect.log file to check whether Kafka Connect is started.
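To verify the end-to-end synchronization, you can write a row to the source MySQL table and then query the ApsaraDB for SelectDB instance. The following commands are a minimal sketch: the column values are assumptions for illustration and must match the schema of the test.test_table table, and the endpoints and credentials are the placeholders used in this topic.
# Write a test row to the source MySQL database.
mysql -h rm-bp17372257wkz****.rwlb.rds.aliyuncs.com -P 3306 -u testuser -p -e "INSERT INTO test.test_table VALUES (1, 'test');"
# Query the target table in the ApsaraDB for SelectDB instance after the buffered data is flushed.
mysql -h selectdb-cn-4xl3jv1****-public.selectdbfe.rds.aliyuncs.com -P 9030 -u admin -p -e "SELECT * FROM test.test_table;"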
Advanced usage
Manage connectors
# Query the status of the connector.
curl -i http://127.0.0.1:8083/connectors/test-selectdb-sink-cluster/status -X GET
# Delete the current connector.
curl -i http://127.0.0.1:8083/connectors/test-selectdb-sink-cluster -X DELETE
# Pause the current connector.
curl -i http://127.0.0.1:8083/connectors/test-selectdb-sink-cluster/pause -X PUT
# Resume the current connector.
curl -i http://127.0.0.1:8083/connectors/test-selectdb-sink-cluster/resume -X PUT
# Restart the tasks in the connector.
curl -i http://127.0.0.1:8083/connectors/test-selectdb-sink-cluster/tasks/0/restart -X POST
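You can also update the configuration of an existing connector by sending a PUT request to the /config endpoint of the connector. The following command is a minimal sketch that changes the buffer.flush.time value of the connector in the preceding example; the request body must contain the complete configuration of the connector, not only the changed items:
# Update the configuration of the current connector.
curl -i http://127.0.0.1:8083/connectors/test-selectdb-sink-cluster/config -H "Content-Type: application/json" -X PUT -d '{
  "connector.class":"org.apache.doris.kafka.connector.DorisSinkConnector",
  "topics":"topic_test",
  "doris.topic2table.map":"topic_test:test_kafka_tbl",
  "buffer.count.records":"10000",
  "buffer.flush.time":"60",
  "buffer.size.bytes":"5000000",
  "doris.urls":"selectdb-cn-4xl3jv1****-public.selectdbfe.rds.aliyuncs.com",
  "doris.user":"admin",
  "doris.password":"***",
  "doris.database":"test_db",
  "doris.http.port":"8080",
  "doris.query.port":"9030",
  "key.converter":"org.apache.kafka.connect.storage.StringConverter",
  "value.converter":"org.apache.kafka.connect.json.JsonConverter"
}'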
For more information, see Kafka Connect REST Interface for Confluent Platform.
Configure a dead-letter queue
By default, errors that occur during message conversion or transformation cause the connector to fail. You can configure the connector to tolerate such errors and skip the abnormal records. You can also write the details of the errors, the failed operations, and the abnormal records to a dead-letter queue with various levels of detail.
errors.tolerance=all
errors.deadletterqueue.topic.name=test_error_topic
errors.deadletterqueue.context.headers.enable=true
errors.deadletterqueue.topic.replication.factor=1
For more information, see Error Reporting in Connect.
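To inspect the abnormal records that are written to the dead-letter queue, you can consume the dead-letter queue topic. The following command is a minimal sketch that assumes the topic is named test_error_topic as configured above and that the Kafka broker is available at 127.0.0.1:9092:
# Consume the records in the dead-letter queue from the beginning.
bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic test_error_topic --from-beginning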
Access a Kafka cluster with SSL authentication
To access a Kafka cluster with SSL authentication by using Kafka Connect, you must provide the certificate file client.truststore.jks that is used to authenticate the public key of the Kafka broker. You can add the following configurations to the connect-distributed.properties file:
# Connect worker
security.protocol=SSL
ssl.truststore.location=/var/ssl/private/client.truststore.jks
ssl.truststore.password=test1234
# Embedded consumer for sink connectors
consumer.security.protocol=SSL
consumer.ssl.truststore.location=/var/ssl/private/client.truststore.jks
consumer.ssl.truststore.password=test1234
For more information about the configurations for accessing a Kafka cluster with SSL authentication by using Kafka Connect, see Configure Kafka Connect.
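If you have the certificate authority (CA) certificate of the Kafka broker, you can generate the client.truststore.jks file by using the keytool command. The following command is a minimal sketch; the ca-cert file path and the test1234 password are assumptions and must be replaced with your own values:
# Import the CA certificate of the Kafka broker into the truststore.
keytool -keystore /var/ssl/private/client.truststore.jks -alias CARoot -importcert -file ca-cert -storepass test1234 -noprompt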