ApsaraDB for SelectDB: Import data by using Doris Kafka Connector

Last Updated: Dec 09, 2024

ApsaraDB for SelectDB can automatically subscribe to data in Kafka and synchronize data from Kafka by using Doris Kafka Connector. This topic describes how to use Doris Kafka Connector to synchronize data from a Kafka data source to an ApsaraDB for SelectDB instance.

Background information

Kafka Connect is a reliable tool for transferring data between Apache Kafka and other systems. You can define connectors to import data to or export data from a Kafka data source.

Doris Kafka Connector provided by Apache Doris runs in a Kafka Connect cluster. Doris Kafka Connector can read data from Kafka topics and write the data to ApsaraDB for SelectDB.

In business scenarios, you may push the updated data in databases to Kafka by using Debezium Connector or call API operations to write JSON-formatted data to Kafka in real time. Then, you can use Doris Kafka Connector to automatically subscribe to data in Kafka and synchronize the data to an ApsaraDB for SelectDB instance.

Operation modes of Kafka Connect

Kafka Connect provides standalone mode and distributed mode. You can choose an operation mode based on your business requirements.

Standalone mode

Warning

We recommend that you do not use the standalone mode in the production environment.

Configure the standalone mode

Configure the connect-standalone.properties file.

# Modify the IP address of the broker.
bootstrap.servers=127.0.0.1:9092

Create the connect-selectdb-sink.properties file in the Kafka config directory and configure the following items in the file:

name=test-selectdb-sink
connector.class=org.apache.doris.kafka.connector.DorisSinkConnector
topics=topic_test
doris.topic2table.map=topic_test:test_kafka_tbl
buffer.count.records=10000
buffer.flush.time=120
buffer.size.bytes=5000000
doris.urls=selectdb-cn-4xl3jv1****-public.selectdbfe.rds.aliyuncs.com
doris.http.port=8080
doris.query.port=9030
doris.user=admin
doris.password=****
doris.database=test_db
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.json.JsonConverter

Start the standalone mode

$KAFKA_HOME/bin/connect-standalone.sh -daemon $KAFKA_HOME/config/connect-standalone.properties $KAFKA_HOME/config/connect-selectdb-sink.properties
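
After the worker starts, you can check the worker log to confirm that the sink connector loaded without errors. A minimal check, assuming the default log location under $KAFKA_HOME:

# Check the Kafka Connect worker log for startup errors.
tail -f $KAFKA_HOME/logs/connect.log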

Distributed mode

Configure the distributed mode

Configure the connect-distributed.properties file.

# Modify the IP address of the broker.
bootstrap.servers=127.0.0.1:9092
 
# Modify the group ID. The group ID must be the same for all workers in the same cluster.
group.id=connect-cluster

Start the distributed mode

$KAFKA_HOME/bin/connect-distributed.sh -daemon $KAFKA_HOME/config/connect-distributed.properties
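
Before you configure a connector, you can verify that the distributed worker is running. A minimal check, assuming the default Kafka Connect REST port 8083:

# Verify that the worker responds on the REST interface.
curl -s http://127.0.0.1:8083/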

Configure a connector

curl -i http://127.0.0.1:8083/connectors -H "Content-Type: application/json" -X POST -d '{
  "name":"test-selectdb-sink-cluster",
  "config":{
    "connector.class":"org.apache.doris.kafka.connector.DorisSinkConnector",
    "topics":"topic_test",
    "doris.topic2table.map": "topic_test:test_kafka_tbl",
    "buffer.count.records":"10000",
    "buffer.flush.time":"120",
    "buffer.size.bytes":"5000000",
    "doris.urls":"selectdb-cn-4xl3jv1****-public.selectdbfe.rds.aliyuncs.com",
    "doris.user":"admin",
    "doris.password":"***",
    "doris.database":"test_db",
    "doris.http.port":"8080",
    "doris.query.port":"9030",
    "key.converter":"org.apache.kafka.connect.storage.StringConverter",
    "value.converter":"org.apache.kafka.connect.json.JsonConverter"
  }
}'
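
After the request succeeds, you can list the connectors that are registered on the worker to confirm that the connector was created:

# List all connectors on the worker.
curl -s http://127.0.0.1:8083/connectors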

Parameters

name

The name of the connector. The name must be a string that does not contain ISO control characters and must be unique within the Kafka Connect cluster.

connector.class

The class name or alias of the connector. Set the value to org.apache.doris.kafka.connector.DorisSinkConnector.

topics

The name of the topic that serves as the data source. Separate multiple topic names with commas (,).

doris.topic2table.map

The mapping between a topic and a table. Separate multiple mapping relationships with commas (,). Example: topic1:tb1,topic2:tb2. By default, this parameter is left empty, which indicates that the topic and table share the same name.

buffer.count.records

The number of data records that are buffered in the memory for each Kafka partition before data is flushed to the ApsaraDB for SelectDB instance. Default value: 10000.

buffer.flush.time

The interval at which the in-memory buffer is flushed. Unit: seconds. Default value: 120.

buffer.size.bytes

The cumulative size of the data records that are buffered in the memory for each Kafka partition. Unit: bytes. Default value: 5000000.

doris.urls

The endpoint of the ApsaraDB for SelectDB instance.

To obtain the endpoint of the ApsaraDB for SelectDB instance, perform the following operations: Log on to the ApsaraDB for SelectDB console. Go to the Instance Details page of the instance and view the endpoint in the Network Information section.

Example: selectdb-cn4xl3jv1****.selectdbfe.rds.aliyuncs.com

doris.http.port

The HTTP port number of the ApsaraDB for SelectDB instance. Default value: 8080.

doris.query.port

The MySQL port number of the ApsaraDB for SelectDB instance. Default value: 9030.

doris.user

The username that is used to connect to the ApsaraDB for SelectDB instance.

doris.password

The password of the username that is used to connect to the ApsaraDB for SelectDB instance.

doris.database

The name of the database to which data is written in the ApsaraDB for SelectDB instance.

key.converter

The converter class that is used for the keys of Kafka records.

value.converter

The converter class that is used for the values of Kafka records.

jmx

Specifies whether to use Java Management Extensions (JMX) to monitor Doris Kafka Connector. For more information, see Use JMX to monitor Doris Kafka Connector. Default value: true.

enable.delete

Specifies whether to synchronize delete operations to the destination table. Default value: false.

label.prefix

The label prefix that is used when you import data by using Stream Load. The default value is the name of the connector.

auto.redirect

Specifies whether to redirect Stream Load requests. If you enable the auto redirect feature, Stream Load requests are sent to the frontend (FE) and redirected to the backend (BE) node to which the data is written. The BE information is no longer exposed.

load.model

The mode in which the data is imported. Valid values:

  • stream_load: Data is directly imported to ApsaraDB for SelectDB.

  • copy_into: Data is imported to Object Storage Service (OSS) and then loaded to ApsaraDB for SelectDB.

Default value: stream_load.

sink.properties.*

The parameters for importing data by using Stream Load.

For example, the sink.properties.column_separator parameter specifies the column delimiter. A combined sketch is provided after this parameter list.

For more information, see Import data by using Stream Load.

delivery.guarantee

The method that is used to ensure data consistency when the consumed Kafka data is imported to ApsaraDB for SelectDB. Valid values: at_least_once and exactly_once. Default value: at_least_once.

You can set this parameter to exactly_once only if you set the load.model parameter to copy_into.

enable.2pc

Specifies whether to enable the two-phase commit mode. You can enable the two-phase commit mode to ensure the exactly-once semantics.

Note

For more information about other common configuration items of Kafka Connect Sink, see the Configuring Connectors section in Kafka 3.7 Documentation.
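
The optional parameters are added to the same configuration file or JSON body as the required parameters. The following lines are a minimal sketch that combines the documented examples; the specific values are illustrative assumptions, not recommendations:

# Pass Stream Load options through the sink.properties.* prefix.
sink.properties.column_separator=|
# exactly_once delivery requires the copy_into import mode.
load.model=copy_into
delivery.guarantee=exactly_once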

Examples

Prepare the environment

  1. Install an Apache Kafka cluster of version 2.4.0 or later, or use Confluent Cloud. In this example, a standalone Kafka cluster is used.

    # Download and decompress the package.
    wget https://archive.apache.org/dist/kafka/2.4.0/kafka_2.12-2.4.0.tgz
    tar -zxvf kafka_2.12-2.4.0.tgz
    cd kafka_2.12-2.4.0
    # Start ZooKeeper and the Kafka broker.
    bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
    bin/kafka-server-start.sh -daemon config/server.properties
  2. Download the doris-kafka-connector-1.0.0.jar package and store the JAR package in the $KAFKA_HOME/libs directory.

  3. Create an ApsaraDB for SelectDB instance. For more information, see Create an instance.

  4. Connect to the ApsaraDB for SelectDB instance over the MySQL protocol. For more information, see Connect to an instance.

  5. Create a test database and a test table.

    1. Execute the following statement to create a test database:

      CREATE DATABASE test_db;
    2. Execute the following statements to create a test table:

      USE test_db;
      CREATE TABLE employees (
          emp_no       int NOT NULL,
          birth_date   date,
          first_name   varchar(20),
          last_name    varchar(20),
          gender       char(2),
          hire_date    date
      )
      UNIQUE KEY(`emp_no`)
      DISTRIBUTED BY HASH(`emp_no`) BUCKETS 1;

Example 1: Synchronize JSON-formatted data

  1. Configure an ApsaraDB for SelectDB Sink.

    In this example, the standalone mode is used. Create the selectdb-sink.properties file in the Kafka config directory and configure the following items in the file:

    name=selectdb_sink
    connector.class=org.apache.doris.kafka.connector.DorisSinkConnector
    topics=test_topic
    doris.topic2table.map=test_topic:employees
    buffer.count.records=10000
    buffer.flush.time=120
    buffer.size.bytes=5000000
    doris.urls=selectdb-cn-4xl3jv1****-public.selectdbfe.rds.aliyuncs.com
    doris.http.port=8080
    doris.query.port=9030
    doris.user=admin
    doris.password=***
    doris.database=test_db
    key.converter=org.apache.kafka.connect.storage.StringConverter
    value.converter=org.apache.kafka.connect.json.JsonConverter
    
    # Optional. Configure a dead-letter queue.
    errors.tolerance=all
    errors.deadletterqueue.topic.name=test_error
    errors.deadletterqueue.context.headers.enable=true
    errors.deadletterqueue.topic.replication.factor=1
  2. Start Kafka Connect.

    bin/connect-standalone.sh -daemon config/connect-standalone.properties config/selectdb-sink.properties
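
After Kafka Connect starts, you can send a test record and verify the import. The following sketch assumes that the broker listens on localhost:9092, that the commands are run from the Kafka installation directory, and that you replace the endpoint and password with the values of your instance. If the worker's JSON converter expects a schema envelope, you may also need to add value.converter.schemas.enable=false to the sink configuration.

# Produce one JSON record whose fields match the employees table.
echo '{"emp_no": 10001, "birth_date": "1990-01-01", "first_name": "John", "last_name": "Doe", "gender": "M", "hire_date": "2020-01-01"}' | bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test_topic
# After the buffer is flushed, query the destination table over the MySQL protocol.
mysql -h selectdb-cn-4xl3jv1****-public.selectdbfe.rds.aliyuncs.com -P 9030 -u admin -p -e "SELECT * FROM test_db.employees;"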

Example 2: Use Debezium Connector to synchronize data from a MySQL database to an ApsaraDB for SelectDB instance

In some business scenarios, you need to synchronize data from business databases in real time. In this case, the change data capture (CDC) mechanism is required.

Debezium Connector is a CDC tool that is developed based on Kafka Connect. Debezium Connector can connect to various databases, such as MySQL, PostgreSQL, SQL Server, Oracle, and MongoDB, and continuously send data from the databases to Kafka topics in a unified format for downstream sinks to consume in real time. In this example, a MySQL data source is used.

  1. Download the Debezium Connector package.

    wget https://repo1.maven.org/maven2/io/debezium/debezium-connector-mysql/1.9.8.Final/debezium-connector-mysql-1.9.8.Final-plugin.tar.gz
  2. Decompress the downloaded package.

    tar -zxvf debezium-connector-mysql-1.9.8.Final-plugin.tar.gz
  3. Store all the extracted JAR packages in the $KAFKA_HOME/libs directory.

  4. Configure the MySQL data source.

    Create the mysql-source.properties file in the Kafka config directory and configure the following items in the file:

    name=mysql-source
    connector.class=io.debezium.connector.mysql.MySqlConnector
    database.hostname=rm-bp17372257wkz****.rwlb.rds.aliyuncs.com
    database.port=3306
    database.user=testuser
    database.password=****
    database.server.id=1
    # The logical name of the MySQL server. It is used as the prefix of the Kafka topic names.
    database.server.name=test123
    # The database and table from which data is synchronized. By default, data is synchronized from all databases and tables.
    database.include.list=test
    table.include.list=test.test_table
    database.history.kafka.bootstrap.servers=localhost:9092
    # The Kafka topic that is used to store the schema change events of the database and table.
    database.history.kafka.topic=dbhistory
    transforms=unwrap
    # For more information, visit https://debezium.io/documentation/reference/stable/transformations/event-flattening.html.
    transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState
    # Rewrite DELETE change events so that delete operations can be synchronized.
    transforms.unwrap.delete.handling.mode=rewrite

    After the configuration is complete, the Kafka topic is named in the SERVER_NAME.DATABASE_NAME.TABLE_NAME format by default.

    Note

    For more information about how to configure Debezium Connector, see Debezium connector for MySQL.

  5. Configure an ApsaraDB for SelectDB Sink.

    Create the selectdb-sink.properties file in the Kafka config directory and configure the following items in the file:

    name=selectdb-sink
    connector.class=org.apache.doris.kafka.connector.DorisSinkConnector
    topics=test123.test.test_table
    doris.topic2table.map=test123.test.test_table:test_table
    buffer.count.records=10000
    buffer.flush.time=120
    buffer.size.bytes=5000000
    doris.urls=selectdb-cn-4xl3jv1****-public.selectdbfe.rds.aliyuncs.com
    doris.http.port=8080
    doris.query.port=9030
    doris.user=admin
    doris.password=****
    doris.database=test
    key.converter=org.apache.kafka.connect.json.JsonConverter
    value.converter=org.apache.kafka.connect.json.JsonConverter
    
    # Optional. Configure a dead-letter queue.
    #errors.tolerance=all
    #errors.deadletterqueue.topic.name=test_error
    #errors.deadletterqueue.context.headers.enable = true
    #errors.deadletterqueue.topic.replication.factor=1
    Note

    Before you synchronize data to an ApsaraDB for SelectDB instance, you must create a database and a table in the instance.

  6. Start Kafka Connect.

    bin/connect-standalone.sh -daemon config/connect-standalone.properties config/mysql-source.properties config/selectdb-sink.properties
    Note

    After you start Kafka Connect, you can view the logs/connect.log file to check whether Kafka Connect is started.
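
You can also consume the Debezium topic to inspect the flattened change records before they are imported. A minimal check, assuming the broker listens on localhost:9092 and the commands are run from the Kafka installation directory:

# Consume the change records that Debezium writes for test.test_table.
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test123.test.test_table --from-beginning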

Advanced usage

Manage connectors

# Query the status of the connector.
curl -i http://127.0.0.1:8083/connectors/test-selectdb-sink-cluster/status -X GET
# Delete the current connector.
curl -i http://127.0.0.1:8083/connectors/test-selectdb-sink-cluster -X DELETE
# Pause the current connector.
curl -i http://127.0.0.1:8083/connectors/test-selectdb-sink-cluster/pause -X PUT
# Resume the current connector.
curl -i http://127.0.0.1:8083/connectors/test-selectdb-sink-cluster/resume -X PUT
# Restart task 0 of the connector.
curl -i http://127.0.0.1:8083/connectors/test-selectdb-sink-cluster/tasks/0/restart -X POST

For more information, see Kafka Connect REST Interface for Confluent Platform.
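
In addition, you can update the configuration of a running connector by sending a PUT request to the /connectors/{name}/config endpoint of the Kafka Connect REST interface. The following sketch changes only the buffer.flush.time value; the remaining values repeat the configuration shown earlier and must be included because the request replaces the full configuration:

curl -i http://127.0.0.1:8083/connectors/test-selectdb-sink-cluster/config -H "Content-Type: application/json" -X PUT -d '{
  "connector.class":"org.apache.doris.kafka.connector.DorisSinkConnector",
  "topics":"topic_test",
  "doris.topic2table.map": "topic_test:test_kafka_tbl",
  "buffer.count.records":"10000",
  "buffer.flush.time":"60",
  "buffer.size.bytes":"5000000",
  "doris.urls":"selectdb-cn-4xl3jv1****-public.selectdbfe.rds.aliyuncs.com",
  "doris.user":"admin",
  "doris.password":"***",
  "doris.database":"test_db",
  "doris.http.port":"8080",
  "doris.query.port":"9030",
  "key.converter":"org.apache.kafka.connect.storage.StringConverter",
  "value.converter":"org.apache.kafka.connect.json.JsonConverter"
}'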

Configure a dead-letter queue

By default, an error that occurs during record conversion or processing causes the connector task to fail. Alternatively, you can configure the connector to tolerate such errors and skip the problematic records, and write details about the errors, the failed operations, and the abnormal records to a dead-letter queue.

errors.tolerance=all
errors.deadletterqueue.topic.name=test_error_topic
errors.deadletterqueue.context.headers.enable=true
errors.deadletterqueue.topic.replication.factor=1

For more information, see Error Reporting in Connect.
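
When errors.deadletterqueue.context.headers.enable is set to true, the error context is stored in the record headers of the dead-letter queue. To inspect the records that were routed to the queue, you can consume the topic. A minimal check, assuming the broker listens on localhost:9092 and the command is run from the Kafka installation directory:

# Consume the dead-letter queue topic to inspect the failed records.
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test_error_topic --from-beginning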

Access a Kafka cluster with SSL authentication

To access a Kafka cluster with SSL authentication by using Kafka Connect, you must provide the truststore file client.truststore.jks that is used to verify the certificate of the Kafka broker. Add the following configurations to the connect-distributed.properties file:

# Connect worker
security.protocol=SSL
ssl.truststore.location=/var/ssl/private/client.truststore.jks
ssl.truststore.password=test1234
 
# Embedded consumer for sink connectors
consumer.security.protocol=SSL
consumer.ssl.truststore.location=/var/ssl/private/client.truststore.jks
consumer.ssl.truststore.password=test1234

For more information about the configurations for accessing a Kafka cluster with SSL authentication by using Kafka Connect, see Configure Kafka Connect.