全部產品
Search
文件中心

ApsaraDB for SelectDB:通過Kafka匯入資料

更新時間:Jul 06, 2024

ApsaraDB for SelectDB支援通過Doris Kafka連接器(Doris Kafka Connector)自動訂閱和同步Kafka中的資料。本文介紹Doris Kafka Connector同步資料到ApsaraDB for SelectDB的方法。

背景資訊

Kafka Connect是一款在Apache Kafka和其他系統之間進行可靠資料轉送的工具,可通過定義Connectors來將大量資料遷入或者遷出Kafka。

Doris社區提供的Kafka連接器,運行在Kafka Connect叢集中,支援從Kafka Topic中讀取資料,並將資料寫入ApsaraDB for SelectDB中。

在業務情境中,使用者通常會通過Debezium Connector將資料庫的變更資料推送至Kafka,或者調用API將JSON格式資料即時寫入Kafka。Doris Kafka Connector會自動訂閱Kafka中的資料,並將這些資料同步到ApsaraDB for SelectDB中。

Kafka Connect的運行模式

Kafka Connect有兩種種運行模式,您可以根據具體需求選擇合適的模式。

Standalone模式

警告

不建議在生產環境中使用Standalone模式。

配置Standalone

配置connect-standalone.properties。

# 修改 broker 地址
bootstrap.servers=127.0.0.1:9092

在Kafka config目錄下建立connect-selectdb-sink.properties,並配置如下內容:

name=test-selectdb-sink
connector.class=org.apache.doris.kafka.connector.DorisSinkConnector
topics=topic_test
doris.topic2table.map=topic_test:test_kafka_tbl
buffer.count.records=10000
buffer.flush.time=120
buffer.size.bytes=5000000
doris.urls=selectdb-cn-4xl3jv1****-public.selectdbfe.rds.aliyuncs.com
doris.http.port=8030
doris.query.port=9030
doris.user=admin
doris.password=****
doris.database=test_db
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.json.JsonConverter

啟動Standalone

$KAFKA_HOME/bin/connect-standalone.sh -daemon $KAFKA_HOME/config/connect-standalone.properties $KAFKA_HOME/config/connect-selectdb-sink.properties

Distributed模式

配置Distributed

配置connect-distributed.properties。

# 修改 broker 地址
bootstrap.servers=127.0.0.1:9092
 
# 修改 group.id,同一叢集的需要一致
group.id=connect-cluster

啟動Distributed

$KAFKA_HOME/bin/connect-distributed.sh -daemon $KAFKA_HOME/config/connect-distributed.properties

增加Connector

curl -i http://127.0.0.1:8083/connectors -H "Content-Type: application/json" -X POST -d '{
  "name":"test-selectdb-sink-cluster",
  "config":{
    "connector.class":"org.apache.doris.kafka.connector.DorisSinkConnector",
    "topics":"topic_test",
    "doris.topic2table.map": "topic_test:test_kafka_tbl",
    "buffer.count.records":"10000",
    "buffer.flush.time":"120",
    "buffer.size.bytes":"5000000",
    "doris.urls":"selectdb-cn-4xl3jv1****-public.selectdbfe.rds.aliyuncs.com",
    "doris.user":"admin",
    "doris.password":"***",
    "doris.database":"test_db",
    "doris.http.port":"8080",
    "doris.query.port":"9030",
    "key.converter":"org.apache.kafka.connect.storage.StringConverter",
    "value.converter":"org.apache.kafka.connect.json.JsonConverter"
  }
}'

配置項

參數

說明

name

Connector的名稱。一般命名為不包含ISO控制符的字串,必須在Kafka Connect環境中唯一。

connector.class

Connector類的名稱或者別名。固定為org.apache.doris.kafka.connector.DorisSinkConnector

topics

資料來源Topic。不同Topic之間以英文逗號(,)進行分隔。

doris.topic2table.map

Topic和Table表的對應關係,多個對應關係之間以英文逗號(,)進行分隔例:topic1:tb1,topic2:tb2。預設為空白,表示Topic和Table名稱相同。

buffer.count.records

在flush到ApsaraDB for SelectDB之前,每個Kafka分區在記憶體中緩衝的記錄數。預設10000條記錄。

buffer.flush.time

記憶體中緩衝的重新整理間隔,單位為秒。預設為120秒。

buffer.size.bytes

每個Kafka分區在記憶體中緩衝的記錄的累積大小,單位為位元組。預設為5000000。

doris.urls

ApsaraDB for SelectDB串連地址。

您可以從ApsaraDB for SelectDB控制台的執行個體詳情 > 網路資訊中擷取相關參數。

樣本:selectdb-cn4xl3jv1****.selectdbfe.rds.aliyuncs.com:9030

doris.http.port

ApsaraDB for SelectDB的 HTTP協議連接埠,預設為8080。

doris.query.port

ApsaraDB for SelectDBMySQL協議連接埠,預設為9030。

doris.user

ApsaraDB for SelectDB使用者名稱。

doris.password

ApsaraDB for SelectDB密碼。

doris.database

寫入資料的ApsaraDB for SelectDB的資料庫。

key.converter

指定Key的JSON轉換器類。

value.converter

指定Value的JSON轉換器類。

jmx

通過 JMX 擷取 Connector 內部監控指標。具體操作,請參見Doris-Connector-JMX。預設為TRUE。

enable.delete

是否同步刪除記錄, 預設 false。

label.prefix

通過Stream Load匯入資料時的 label 首碼。預設為Connector應用程式名稱。

auto.redirect

是否重新導向Stream Load請求。開啟後StreamLoad將通過FE重新導向到需要寫入資料的 BE,並且不再顯示擷取 BE 資訊。

load.model

匯入資料的方式。當前支援以下兩種方式:

  • stream_load方式直接將資料匯入到 SelectDB 中;

  • copy_into方式匯入資料至Object Storage Service,然後將資料載入至 SelectDB中。

預設為stream_load

sink.properties.*

Stream Load 的匯入參數。

例如: 定義資料行分隔符號'sink.properties.column_separator':','

更多資訊,請參見Stream Load

delivery.guarantee

消費 Kafka 資料匯入到ApsaraDB for SelectDB時,資料一致性的保障方式。 支援at_least_onceexactly_once,預設為at_least_once

當前 ApsaraDB for SelectDB 只能保障使用 copy into 匯入的資料exactly_once

說明

其他 Kafka Connect Sink 通用配置項,詳情請參見connect_configuring

使用樣本

環境準備

  1. 安裝Apache Kafka叢集或Confluent Cloud,版本不低於2.4.0,本樣本以Kafka單機環境為基本環境。

    #下載並解壓 
    wget https://archive.apache.org/dist/kafka/2.4.0/kafka_2.12-2.4.0.tgz
    tar -zxvf kafka_2.12-2.4.0.tgz
    bin/zookeeper-server-start.sh -daemon config/zookeeper.properties 
    bin/kafka-server-start.sh -daemon config/server.properties
  2. 下載doris-kafka-connector-1.0.0.jar,並將JAR包放到KAKFA_HOME/libs目錄下。

  3. 建立ApsaraDB for SelectDB執行個體,詳情請參見建立執行個體

  4. 通過MySQL協議串連ApsaraDB for SelectDB執行個體,詳情請參見串連執行個體

  5. 建立測試資料庫和測試表。

    1. 建立測試資料庫。

      CREATE DATABASE test_db;
    2. 建立測試表。

      USE test_db;
      CREATE TABLE employees (
          emp_no       int NOT NULL,
          birth_date   date,
          first_name   varchar(20),
          last_name    varchar(20),
          gender       char(2),
          hire_date    date
      )
      UNIQUE KEY(`emp_no`)
      DISTRIBUTED BY HASH(`emp_no`) BUCKETS 1;

樣本一:快速同步JSON資料

  1. 配置SelectDB Sink

    以Standalone模式為例,在Kafka config目錄下建立selectdb-sink.properties,並配置如下內容:

    name=selectdb_sink
    connector.class=org.apache.doris.kafka.connector.DorisSinkConnector
    topics=test_topic
    doris.topic2table.map=test_topic:example_tbl
    buffer.count.records=10000
    buffer.flush.time=120
    buffer.size.bytes=5000000
    doris.urls=selectdb-cn-4xl3jv1****-public.selectdbfe.rds.aliyuncs.com
    doris.http.port=8080
    doris.query.port=9030
    doris.user=admin
    doris.password=***
    doris.database=test_db
    key.converter=org.apache.kafka.connect.storage.StringConverter
    value.converter=org.apache.kafka.connect.json.JsonConverter
    
    #配置無效信件佇列,可選
    errors.tolerance=all
    errors.deadletterqueue.topic.name=test_error
    errors.deadletterqueue.context.headers.enable = true
    errors.deadletterqueue.topic.replication.factor=1
  2. 啟動Kafka Connect

    bin/connect-standalone.sh -daemon config/connect-standalone.properties config/selectdb-sink.properties

樣本二:使用Debezium同步MySQL資料到ApsaraDB for SelectDB

很多業務情境經常需要從業務資料庫中即時同步資料,這個時候就需要使用資料庫的變更資料擷取CDC(Change Data Capture)機制。

Debezium是基於Kafka Connect的CDC工具,可以對接MySQL、PostgreSQL、SQL Server、Oracle、MongoDB等多種資料庫,把資料庫的資料持續以統一的格式發送到Kafka的Topic中,以供下遊Sink端進行即時消費,本文以MySQL為例。

  1. 下載Debezium。

    wget https://repo1.maven.org/maven2/io/debezium/debezium-connector-mysql/1.9.8.Final/debezium-connector-mysql-1.9.8.Final-plugin.tar.gz
  2. 解壓下載檔案。

    tar -zxvf debezium-connector-mysql-1.9.8.Final-plugin.tar.gz
  3. 將解壓後的所有JAR包放置到KAKFA_HOME/libs目錄下。

  4. 配置MySQL Source。

    在Kafka config目錄下建立mysql-source.properties,並配置如下內容:

    name=mysql-source
    connector.class=io.debezium.connector.mysql.MySqlConnector
    database.hostname=rm-bp17372257wkz****.rwlb.rds.aliyuncs.com
    database.port=3306
    database.user=testuser
    database.password=****
    database.server.id=1
    # kafka中的該client的唯一標識
    database.server.name=test123
    # 需要同步的資料庫和表,預設是同步所有資料庫和表
    database.include.list=test
    table.include.list=test.test_table
    database.history.kafka.bootstrap.servers=localhost:9092
    # 用於儲存資料庫表結構變化的 Kafka topic
    database.history.kafka.topic=dbhistory
    transforms=unwrap
    # 參考 https://debezium.io/documentation/reference/stable/transformations/event-flattening.html
    transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState
    # 記錄刪除事件
    transforms.unwrap.delete.handling.mode=rewrite

    配置好之後,Kafka中預設的Topic名稱格式是SERVER_NAME.DATABASE_NAME.TABLE_NAME

    說明

    Debezium配置請參見Debezium connector for MySQL

  5. 配置雲資料庫 SelectDB 版 Sink。

    在Kafka config目錄下建立selectdb-sink.properties,配置以下內容:

    name=selectdb-sink
    connector.class=org.apache.doris.kafka.connector.DorisSinkConnector
    topics=test123.test.test_table
    doris.topic2table.map=test123.test.test_table:test_table
    buffer.count.records=10000
    buffer.flush.time=120
    buffer.size.bytes=5000000
    doris.urls=selectdb-cn-4xl3jv1****-public.selectdbfe.rds.aliyuncs.com
    doris.http.port=8080
    doris.query.port=9030
    doris.user=admin
    doris.password=****
    doris.database=test
    key.converter=org.apache.kafka.connect.json.JsonConverter
    value.converter=org.apache.kafka.connect.json.JsonConverter
    
    #配置無效信件佇列,可選
    #errors.tolerance=all
    #errors.deadletterqueue.topic.name=test_error
    #errors.deadletterqueue.context.headers.enable = true
    #errors.deadletterqueue.topic.replication.factor=1
    說明

    同步到ApsaraDB for SelectDB時,需要提前建立好資料庫和表。

  6. 啟動Kafka Connect。

    bin/connect-standalone.sh -daemon config/connect-standalone.properties config/mysql-source.properties config/selectdb-sink.properties
    說明

    啟動後,可以觀察日誌logs/connect.log是否啟動成功。

使用進階

Connector操作

# 查看connector狀態
curl -i http://127.0.0.1:8083/connectors/test-selectdb-sink-cluster/status -X GET
# 刪除當前connector
curl -i http://127.0.0.1:8083/connectors/test-selectdb-sink-cluster -X DELETE
# 暫停當前connector
curl -i http://127.0.0.1:8083/connectors/test-selectdb-sink-cluster/pause -X PUT
# 重啟當前connector
curl -i http://127.0.0.1:8083/connectors/test-selectdb-sink-cluster/resume -X PUT
# 重啟connector內的tasks
curl -i http://127.0.0.1:8083/connectors/test-selectdb-sink-cluster/tasks/0/restart -X POST

詳細資料請參見:Connect REST Interface

無效信件佇列

預設情況下,轉換過程中或轉換過程中遇到的任何錯誤都會導致連接器失敗。每個連接器配置還可以通過跳過它們來容忍此類錯誤,可選擇將每個錯誤和失敗操作的詳細資料以及有問題的記錄(具有不同層級的詳細資料)寫入無效信件佇列以便記錄。

errors.tolerance=all
errors.deadletterqueue.topic.name=test_error_topic
errors.deadletterqueue.context.headers.enable=true
errors.deadletterqueue.topic.replication.factor=1

詳細資料請參見:connect_errorreporting

訪問SSL認證的Kafka叢集

通過kafka-connect訪問SSL認證的Kafka叢集需要您提供用於認證Kafka Broker公開金鑰的認證檔案(client.truststore.jks)。您可以在connect-distributed.properties檔案中增加以下配置:

# Connect worker
security.protocol=SSL
ssl.truststore.location=/var/ssl/private/client.truststore.jks
ssl.truststore.password=test1234
 
# Embedded consumer for sink connectors
consumer.security.protocol=SSL
consumer.ssl.truststore.location=/var/ssl/private/client.truststore.jks
consumer.ssl.truststore.password=test1234

關於通過kafka-connect串連SSL認證的Kafka叢集配置說明可以參考:Configure Kafka Connect