ApsaraDB for SelectDB支援通過Doris Kafka連接器(Doris Kafka Connector)自動訂閱和同步Kafka中的資料。本文介紹Doris Kafka Connector同步資料到ApsaraDB for SelectDB的方法。
背景資訊
Kafka Connect是一款在Apache Kafka和其他系統之間進行可靠資料轉送的工具,可通過定義Connectors來將大量資料遷入或者遷出Kafka。
Doris社區提供的Kafka連接器,運行在Kafka Connect叢集中,支援從Kafka Topic中讀取資料,並將資料寫入ApsaraDB for SelectDB中。
在業務情境中,使用者通常會通過Debezium Connector將資料庫的變更資料推送至Kafka,或者調用API將JSON格式資料即時寫入Kafka。Doris Kafka Connector會自動訂閱Kafka中的資料,並將這些資料同步到ApsaraDB for SelectDB中。
Kafka Connect的運行模式
Kafka Connect有兩種種運行模式,您可以根據具體需求選擇合適的模式。
Standalone模式
不建議在生產環境中使用Standalone模式。
配置Standalone
配置connect-standalone.properties。
# 修改 broker 地址
bootstrap.servers=127.0.0.1:9092
在Kafka config目錄下建立connect-selectdb-sink.properties,並配置如下內容:
name=test-selectdb-sink
connector.class=org.apache.doris.kafka.connector.DorisSinkConnector
topics=topic_test
doris.topic2table.map=topic_test:test_kafka_tbl
buffer.count.records=10000
buffer.flush.time=120
buffer.size.bytes=5000000
doris.urls=selectdb-cn-4xl3jv1****-public.selectdbfe.rds.aliyuncs.com
doris.http.port=8030
doris.query.port=9030
doris.user=admin
doris.password=****
doris.database=test_db
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
啟動Standalone
$KAFKA_HOME/bin/connect-standalone.sh -daemon $KAFKA_HOME/config/connect-standalone.properties $KAFKA_HOME/config/connect-selectdb-sink.properties
Distributed模式
配置Distributed
配置connect-distributed.properties。
# 修改 broker 地址
bootstrap.servers=127.0.0.1:9092
# 修改 group.id,同一叢集的需要一致
group.id=connect-cluster
啟動Distributed
$KAFKA_HOME/bin/connect-distributed.sh -daemon $KAFKA_HOME/config/connect-distributed.properties
增加Connector
curl -i http://127.0.0.1:8083/connectors -H "Content-Type: application/json" -X POST -d '{
"name":"test-selectdb-sink-cluster",
"config":{
"connector.class":"org.apache.doris.kafka.connector.DorisSinkConnector",
"topics":"topic_test",
"doris.topic2table.map": "topic_test:test_kafka_tbl",
"buffer.count.records":"10000",
"buffer.flush.time":"120",
"buffer.size.bytes":"5000000",
"doris.urls":"selectdb-cn-4xl3jv1****-public.selectdbfe.rds.aliyuncs.com",
"doris.user":"admin",
"doris.password":"***",
"doris.database":"test_db",
"doris.http.port":"8080",
"doris.query.port":"9030",
"key.converter":"org.apache.kafka.connect.storage.StringConverter",
"value.converter":"org.apache.kafka.connect.json.JsonConverter"
}
}'
配置項
參數 | 說明 |
name | Connector的名稱。一般命名為不包含ISO控制符的字串,必須在Kafka Connect環境中唯一。 |
connector.class | Connector類的名稱或者別名。固定為 |
topics | 資料來源Topic。不同Topic之間以英文逗號(,)進行分隔。 |
doris.topic2table.map | Topic和Table表的對應關係,多個對應關係之間以英文逗號(,)進行分隔例: |
buffer.count.records | 在flush到ApsaraDB for SelectDB之前,每個Kafka分區在記憶體中緩衝的記錄數。預設10000條記錄。 |
buffer.flush.time | 記憶體中緩衝的重新整理間隔,單位為秒。預設為120秒。 |
buffer.size.bytes | 每個Kafka分區在記憶體中緩衝的記錄的累積大小,單位為位元組。預設為5000000。 |
doris.urls | ApsaraDB for SelectDB串連地址。 您可以從ApsaraDB for SelectDB控制台的執行個體詳情 > 網路資訊中擷取相關參數。 樣本:selectdb-cn4xl3jv1****.selectdbfe.rds.aliyuncs.com:9030 |
doris.http.port | ApsaraDB for SelectDB的 HTTP協議連接埠,預設為8080。 |
doris.query.port | ApsaraDB for SelectDBMySQL協議連接埠,預設為9030。 |
doris.user | ApsaraDB for SelectDB使用者名稱。 |
doris.password | ApsaraDB for SelectDB密碼。 |
doris.database | 寫入資料的ApsaraDB for SelectDB的資料庫。 |
key.converter | 指定Key的JSON轉換器類。 |
value.converter | 指定Value的JSON轉換器類。 |
jmx | 通過 JMX 擷取 Connector 內部監控指標。具體操作,請參見Doris-Connector-JMX。預設為TRUE。 |
enable.delete | 是否同步刪除記錄, 預設 false。 |
label.prefix | 通過Stream Load匯入資料時的 label 首碼。預設為Connector應用程式名稱。 |
auto.redirect | 是否重新導向Stream Load請求。開啟後StreamLoad將通過FE重新導向到需要寫入資料的 BE,並且不再顯示擷取 BE 資訊。 |
load.model | 匯入資料的方式。當前支援以下兩種方式:
預設為 |
sink.properties.* | Stream Load 的匯入參數。 例如: 定義資料行分隔符號 更多資訊,請參見Stream Load。 |
delivery.guarantee | 消費 Kafka 資料匯入到ApsaraDB for SelectDB時,資料一致性的保障方式。 支援 當前 ApsaraDB for SelectDB 只能保障使用 copy into 匯入的資料 |
其他 Kafka Connect Sink 通用配置項,詳情請參見connect_configuring。
使用樣本
環境準備
安裝Apache Kafka叢集或Confluent Cloud,版本不低於2.4.0,本樣本以Kafka單機環境為基本環境。
#下載並解壓 wget https://archive.apache.org/dist/kafka/2.4.0/kafka_2.12-2.4.0.tgz tar -zxvf kafka_2.12-2.4.0.tgz bin/zookeeper-server-start.sh -daemon config/zookeeper.properties bin/kafka-server-start.sh -daemon config/server.properties
下載doris-kafka-connector-1.0.0.jar,並將JAR包放到KAKFA_HOME/libs目錄下。
建立ApsaraDB for SelectDB執行個體,詳情請參見建立執行個體。
通過MySQL協議串連ApsaraDB for SelectDB執行個體,詳情請參見串連執行個體。
建立測試資料庫和測試表。
建立測試資料庫。
CREATE DATABASE test_db;
建立測試表。
USE test_db; CREATE TABLE employees ( emp_no int NOT NULL, birth_date date, first_name varchar(20), last_name varchar(20), gender char(2), hire_date date ) UNIQUE KEY(`emp_no`) DISTRIBUTED BY HASH(`emp_no`) BUCKETS 1;
樣本一:快速同步JSON資料
配置SelectDB Sink
以Standalone模式為例,在Kafka config目錄下建立selectdb-sink.properties,並配置如下內容:
name=selectdb_sink connector.class=org.apache.doris.kafka.connector.DorisSinkConnector topics=test_topic doris.topic2table.map=test_topic:example_tbl buffer.count.records=10000 buffer.flush.time=120 buffer.size.bytes=5000000 doris.urls=selectdb-cn-4xl3jv1****-public.selectdbfe.rds.aliyuncs.com doris.http.port=8080 doris.query.port=9030 doris.user=admin doris.password=*** doris.database=test_db key.converter=org.apache.kafka.connect.storage.StringConverter value.converter=org.apache.kafka.connect.json.JsonConverter #配置無效信件佇列,可選 errors.tolerance=all errors.deadletterqueue.topic.name=test_error errors.deadletterqueue.context.headers.enable = true errors.deadletterqueue.topic.replication.factor=1
啟動Kafka Connect
bin/connect-standalone.sh -daemon config/connect-standalone.properties config/selectdb-sink.properties
樣本二:使用Debezium同步MySQL資料到ApsaraDB for SelectDB
很多業務情境經常需要從業務資料庫中即時同步資料,這個時候就需要使用資料庫的變更資料擷取CDC(Change Data Capture)機制。
Debezium是基於Kafka Connect的CDC工具,可以對接MySQL、PostgreSQL、SQL Server、Oracle、MongoDB等多種資料庫,把資料庫的資料持續以統一的格式發送到Kafka的Topic中,以供下遊Sink端進行即時消費,本文以MySQL為例。
下載Debezium。
wget https://repo1.maven.org/maven2/io/debezium/debezium-connector-mysql/1.9.8.Final/debezium-connector-mysql-1.9.8.Final-plugin.tar.gz
解壓下載檔案。
tar -zxvf debezium-connector-mysql-1.9.8.Final-plugin.tar.gz
將解壓後的所有JAR包放置到KAKFA_HOME/libs目錄下。
配置MySQL Source。
在Kafka config目錄下建立mysql-source.properties,並配置如下內容:
name=mysql-source connector.class=io.debezium.connector.mysql.MySqlConnector database.hostname=rm-bp17372257wkz****.rwlb.rds.aliyuncs.com database.port=3306 database.user=testuser database.password=**** database.server.id=1 # kafka中的該client的唯一標識 database.server.name=test123 # 需要同步的資料庫和表,預設是同步所有資料庫和表 database.include.list=test table.include.list=test.test_table database.history.kafka.bootstrap.servers=localhost:9092 # 用於儲存資料庫表結構變化的 Kafka topic database.history.kafka.topic=dbhistory transforms=unwrap # 參考 https://debezium.io/documentation/reference/stable/transformations/event-flattening.html transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState # 記錄刪除事件 transforms.unwrap.delete.handling.mode=rewrite
配置好之後,Kafka中預設的Topic名稱格式是
SERVER_NAME.DATABASE_NAME.TABLE_NAME
。說明Debezium配置請參見Debezium connector for MySQL。
配置雲資料庫 SelectDB 版 Sink。
在Kafka config目錄下建立selectdb-sink.properties,配置以下內容:
name=selectdb-sink connector.class=org.apache.doris.kafka.connector.DorisSinkConnector topics=test123.test.test_table doris.topic2table.map=test123.test.test_table:test_table buffer.count.records=10000 buffer.flush.time=120 buffer.size.bytes=5000000 doris.urls=selectdb-cn-4xl3jv1****-public.selectdbfe.rds.aliyuncs.com doris.http.port=8080 doris.query.port=9030 doris.user=admin doris.password=**** doris.database=test key.converter=org.apache.kafka.connect.json.JsonConverter value.converter=org.apache.kafka.connect.json.JsonConverter #配置無效信件佇列,可選 #errors.tolerance=all #errors.deadletterqueue.topic.name=test_error #errors.deadletterqueue.context.headers.enable = true #errors.deadletterqueue.topic.replication.factor=1
說明同步到ApsaraDB for SelectDB時,需要提前建立好資料庫和表。
啟動Kafka Connect。
bin/connect-standalone.sh -daemon config/connect-standalone.properties config/mysql-source.properties config/selectdb-sink.properties
說明啟動後,可以觀察日誌logs/connect.log是否啟動成功。
使用進階
Connector操作
# 查看connector狀態
curl -i http://127.0.0.1:8083/connectors/test-selectdb-sink-cluster/status -X GET
# 刪除當前connector
curl -i http://127.0.0.1:8083/connectors/test-selectdb-sink-cluster -X DELETE
# 暫停當前connector
curl -i http://127.0.0.1:8083/connectors/test-selectdb-sink-cluster/pause -X PUT
# 重啟當前connector
curl -i http://127.0.0.1:8083/connectors/test-selectdb-sink-cluster/resume -X PUT
# 重啟connector內的tasks
curl -i http://127.0.0.1:8083/connectors/test-selectdb-sink-cluster/tasks/0/restart -X POST
詳細資料請參見:Connect REST Interface。
無效信件佇列
預設情況下,轉換過程中或轉換過程中遇到的任何錯誤都會導致連接器失敗。每個連接器配置還可以通過跳過它們來容忍此類錯誤,可選擇將每個錯誤和失敗操作的詳細資料以及有問題的記錄(具有不同層級的詳細資料)寫入無效信件佇列以便記錄。
errors.tolerance=all
errors.deadletterqueue.topic.name=test_error_topic
errors.deadletterqueue.context.headers.enable=true
errors.deadletterqueue.topic.replication.factor=1
詳細資料請參見:connect_errorreporting。
訪問SSL認證的Kafka叢集
通過kafka-connect訪問SSL認證的Kafka叢集需要您提供用於認證Kafka Broker公開金鑰的認證檔案(client.truststore.jks)。您可以在connect-distributed.properties
檔案中增加以下配置:
# Connect worker
security.protocol=SSL
ssl.truststore.location=/var/ssl/private/client.truststore.jks
ssl.truststore.password=test1234
# Embedded consumer for sink connectors
consumer.security.protocol=SSL
consumer.ssl.truststore.location=/var/ssl/private/client.truststore.jks
consumer.ssl.truststore.password=test1234
關於通過kafka-connect串連SSL認證的Kafka叢集配置說明可以參考:Configure Kafka Connect。