全部產品
Search
文件中心

DataWorks:Kafka資料來源

更新時間:Jun 19, 2024

Kafka資料來源為您提供讀取和寫入Kafka的雙向通道,本文為您介紹DataWorks的Kafka資料同步的能力支援情況。

支援的版本

支援阿里雲Kafka,以及=0.10.2且<=2.2.x的自建Kafka版本。

說明

對於<0.10.2版本Kafka,由於Kafka不支援檢索分區資料offset,並且Kafka資料結構可能不支援時間戳記,進而無法支援資料同步。

使用限制

Kafka資料來源目前僅支援使用獨享Data Integration資源群組

單表離線讀

同時配置parameter.groupId和parameter.kafkaConfig.group.id時,parameter.groupId優先順序高於kafkaConfig配置資訊中的group.id。

單表即時寫

寫入資料不支援去重,即如果任務重設位點或者Failover後再啟動,會導致有重複資料寫入。

整庫即時寫

  • 即時資料同步任務僅支援使用獨享Data Integration資源群組

  • 對於源端同步表有主鍵的情境,同步時會使用主索引值作為kafka記錄的key,確保同主鍵的變更有序寫入kafka的同一分區。

  • 對於源端同步表無主鍵的情境,如果選擇了支援無主鍵表同步選項,則同步時kafka記錄的key為空白。如果要確保表的變更有序寫入kafka,則選擇寫入的kafka topic必須是單分區。如果選擇了自訂同步主鍵,則同步時使用其他非主鍵的一個或幾個欄位的聯合,代替主鍵作為kafka記錄的key。

  • 如果在kafka叢集發生響應異常的情況下,仍要確保有主鍵表同主鍵的變更有序寫入kafka的同一分區,則需要在配置kafka資料來源時,在擴充參數表單中加入如下配置。

    {"max.in.flight.requests.per.connection":1,"buffer.memory": 100554432}

    重要

    添加配置後同步效能會大幅下降,需要在效能和嚴格保序可靠性之間做好權衡。

  • 即時同步寫入kafka的訊息總體格式、同步任務心跳訊息格式及源端更改資料對應的訊息格式,詳情請參見:附錄:訊息格式

支援的欄位類型

Kafka的資料存放區為非結構化的儲存,通常Kafka記錄的資料模組有key、value、offset、timestamp、headers、partition。DataWorks在對Kafka資料進行讀寫時,會按照以下的策略進行資料處理。

離線讀資料

DataWorks讀取Kafka資料時,支援對Kafka資料進行JSON格式的解析,各資料模組的處理方式如下。

Kafka記錄資料模組

處理後的資料類型

key

取決於資料同步任務配置的keyType配置項,keyType參數介紹請參見下文的全量參數說明章節。

value

取決於資料同步任務配置的valueType配置項,valueType參數介紹請參見下文的全量參數說明章節。

offset

Long

timestamp

Long

headers

String

partition

Long

離線寫資料

DataWorks將資料寫入Kafka時,支援寫入JSON格式或text格式的資料,不同的資料同步方案往Kafka資料來源中寫入資料時,對資料的處理策略不一致,詳情如下。

重要
  • 寫入text格式的資料時,不會寫入欄位名資料,使用分隔字元來分隔欄位取值。

  • 即時同步寫入資料到Kafka時,寫入的格式為內建的JSON格式,寫入資料為包含資料庫變更訊息的資料、業務時間和DDL資訊的所有資料,資料格式詳情請參見附錄:訊息格式

同步任務類型

寫入Kafka value的格式

源端欄位類型

寫入時的處理方式

離線同步

DataStudio的離線同步節點

json

字串

UTF8編碼字串

布爾

轉換為UTF8編碼字串"true"或者"false"

時間/日期

yyyy-MM-dd HH:mm:ss格式UTF8編碼字串

數值

UTF8編碼數值字串

位元組流

位元組流會被視為UTF8編碼的字串,被轉換成字串

text

字串

UTF8編碼字串

布爾

轉換為UTF8編碼字串"true"或者"false"

時間/日期

yyyy-MM-dd HH:mm:ss格式UTF8編碼字串

數值

UTF8編碼數值字串

位元組流

位元組流會被視為UTF8編碼的字串,被轉換成字串

即時同步:ETL即時同步至Kafka

DataStudio的即時同步節點

json

字串

UTF8編碼字串

布爾

json布爾類型

時間/日期

  • 對於精確到毫秒以下精度的時間:轉換成表示毫秒時間戳記的13位JSON整數。

  • 對於精確到微秒或者納秒精度的時間:轉換成帶有表示毫秒時間戳記的13位整數,和表示納秒時間戳記的6位小數的JSON浮點數。

數值

json數實值型別

位元組流

位元組流會進行Base64編碼後轉換成UTF8編碼的字串

text

字串

UTF8編碼字串

布爾

轉換為UTF8編碼字串"true"或者"false"

時間/日期

yyyy-MM-dd HH:mm:ss格式UTF8編碼字串

數值

UTF8編碼數值字串

位元組流

位元組流會進行Base64編碼後轉換成UTF8編碼字串

即時同步:整庫即時同步至Kafka

純即時同步增量資料

內建JSON格式

字串

UTF8編碼字串

布爾

json布爾類型

時間/日期

13位毫秒時間戳記

數值

json數值

位元組流

位元組流會進行Base64編碼後轉換成UTF8編碼字串

同步解決方案:一鍵即時同步至Kafka

離線全量+即時增量

內建JSON格式

字串

UTF8編碼字串

布爾

json布爾類型

時間/日期

13位毫秒時間戳記

數值

json數值

位元組流

位元組流會進行Base64編碼後轉換成UTF8編碼字串

資料同步任務開發

Kafka資料同步任務的配置入口和通用配置流程指導可參見下文的配置指導,詳細的配置參數解釋可在配置介面查看對應參數的文案提示。

建立資料來源

在進行資料同步任務開發時,您需要在DataWorks上建立一個對應的資料來源,操作流程請參見建立並管理資料來源

單表離線同步任務配置指導

單表、整庫即時同步任務配置指導

操作流程請參見DataStudio側即時同步任務配置

單表、整庫全增量即時同步任務配置指導

操作流程請參見Data Integration側同步任務配置

啟用認證配置說明

SSL

配置Kafka資料來源時,特殊認證方式選擇SSL或者SASL_SSL時,表明Kafka叢集開啟了SSL認證,您需要上傳用戶端truststore認證檔案並填寫truststore認證密碼。

  • 如果Kafka叢集為alikafka執行個體,可以參考SSL認證演算法升級說明下載正確的truststore認證檔案,truststore認證密碼為KafkaOnsClient

  • 如果Kafka叢集為EMR執行個體,可以參考使用SSL加密Kafka連結下載正確的truststore認證檔案並擷取truststore認證密碼。

  • 如果是自建叢集,請自行上傳正確的truststore認證,填寫正確的truststore認證密碼。

keystore認證檔案、keystore認證密碼和SSL密鑰密碼只有在Kafka叢集開啟雙向SSL認證時才需要進行配置,用於Kafka叢集服務端認證用戶端身份,Kafka叢集server.propertiesssl.client.auth=required時開啟雙向SSL認證,詳情請參見使用SSL加密Kafka連結

GSSAPI

配置Kafka資料來源時,當Sasl機制選擇GSSAPI時,需要上傳三個認證檔案,分別是JAAS設定檔Kerberos設定檔以及Keytab檔案,並在獨享資源群組進行DNS/HOST設定,下面分別介紹三種檔案以及獨享資源群組DNS、HOST的配置方式。

  • JAAS設定檔

    JAAS檔案必須以KafkaClient開頭,之後使用一個大括弧包含所有配置項:

    • 大括弧內第一行定義使用的登入組件類,對於各類Sasl認證機制,登入組件類是固定的,後續的每個配置項以key=value格式書寫。

    • 除最後一個配置項,其他配置項結尾不能有分號。

    • 最後一個配置項結尾必須有分號,在大括弧"}"之後也必須加上一個分號。

    不符合以上格式要求將導致JAAS設定檔解析出錯,典型的JAAS設定檔格式如下(根據實際情況替換以下內容中的xxx):

    KafkaClient {
       com.sun.security.auth.module.Krb5LoginModule required
       useKeyTab=true
       keyTab="xxx"
       storeKey=true
       serviceName="kafka-server"
       principal="kafka-client@EXAMPLE.COM";
    };

    配置項

    說明

    登入模組

    必須配置com.sun.security.auth.module.Krb5LoginModule。

    useKeyTab

    必須指定為true。

    keyTab

    可以指定任意路徑,在同步任務運行時會自動下載資料來源配置時上傳的keyTab檔案到本地,並使用本地檔案路徑填充keyTab配置項。

    storeKey

    決定用戶端是否儲存密鑰,配置true或者false均可,不影響資料同步。

    serviceName

    對應Kafka服務端server.properties設定檔中的sasl.kerberos.service.name配置項,請根據實際情況配置該項。

    principal

    Kafka用戶端使用的kerberos principal,請根據實際情況配置該項,並確保上傳的keyTab檔案包含該principal的密鑰。

  • Kerberos設定檔

    Kerberos設定檔必須包含兩個模組[libdefaults]和[realms]

    • [libdefaults]模組指定kerberos認證參數,模組中每個配置項以key=value格式書寫。

    • [realms]模組指定kdc地址,可以包含多個realm子模組,每個realm子模組以realm名稱=開頭。

    後面緊跟一組用大括弧包含配置項,每個配置項也以key=value格式書寫,典型的Kerberos設定檔格式如下(根據實際情況替換以下內容中的xxx):

    [libdefaults]
      default_realm = xxx
    
    [realms]
      xxx = {
        kdc = xxx
      }

    配置項

    說明

    [libdefaults].default_realm

    訪問Kafka叢集節點時預設使用的realm,一般情況下與JAAS設定檔中指定用戶端principal所在realm一致。

    [libdefaults]其他參數

    [libdefaults]模組可以指定其他一些kerberos認證參數,例如ticket_lifetime等,請根據實際需要配置。

    [realms].realm名稱

    需要與JAAS設定檔中指定用戶端principal所在realm,以及[libdefaults].default_realm一致,如果JAAS設定檔中指定用戶端principal所在realm和[libdefaults].default_realm不一致,則需要包含兩組realms子模組分別對應JAAS設定檔中指定用戶端principal所在realm和[libdefaults].default_realm。

    [realms].realm名稱.kdc

    以ip:port格式指定kdc地址和連接埠,例如kdc=10.0.0.1:88,連接埠如果省略預設使用88連接埠,例如kdc=10.0.0.1。

  • Keytab檔案

    Keytab檔案需要包含JAAS設定檔指定principal的密鑰,並且能夠通過kdc的驗證。例如本地當前工作目錄有名為client.keytab的檔案,可以通過以下命令驗證Keytab檔案是否包含指定principal的密鑰。

    klist -ket ./client.keytab
    
    Keytab name: FILE:client.keytab
    KVNO Timestamp           Principal
    ---- ------------------- ------------------------------------------------------
       7 2018-07-30T10:19:16 test@demo.com (des-cbc-md5)
  • 獨享資源群組DNS、HOST配置

    開啟Kerberos認證的Kafka叢集,會使用Kafka叢集中節點的hostname作為節點在kdc(Kerberos的服務端程式,即密鑰分發中心)中註冊的principal的一部分,而用戶端訪問Kafka叢集節點時,會根據本地的DNS、HOST設定,推導Kafka叢集節點的principal,進而從kdc擷取節點的訪問憑證。使用獨享資源群組訪問開啟Kerberos認證的Kafka叢集時,需要正確配置DNS、HOST,以確保順利從kdc擷取Kafka叢集節點的訪問憑證:

    • DNS設定

      當獨享資源群組綁定的VPC中,使用PrivateZone執行個體進行了Kafka叢集節點的網域名稱解析設定,則可以在DataWorks管控台,獨享資源群組對應的VPC繫結項目增加100.100.2.136和100.100.2.138兩個IP的自訂路由,即可使PrivateZone針對Kafka叢集節點的網域名稱解析設定對獨享資源群組生效。

    • HOST設定

      當獨享資源群組綁定的VPC中,未使用PrivateZone執行個體進行了Kafka叢集節點的網域名稱解析設定,則需要在DataWorks管控台,獨享資源群組網路設定中逐個將Kafka叢集節點的IP地址與網域名稱映射添加到Host配置中。

PLAIN

配置Kafka資料來源時,當Sasl機制選擇PLAIN時,JAAS檔案必須以KafkaClient開頭,之後使用一個大括弧包含所有配置項。

  • 大括弧內第一行定義使用的登入組件類,對於各類Sasl認證機制,登入組件類是固定的,後續的每個配置項以key=value格式書寫。

  • 除最後一個配置項,其他配置項結尾不能有分號。

  • 最後一個配置項結尾必須有分號,在大括弧"}"之後也必須加上一個分號。

不符合以上格式要求將導致JAAS設定檔解析出錯,典型的JAAS設定檔格式如下(根據實際情況替換以下內容中的xxx):

KafkaClient {
  org.apache.kafka.common.security.plain.PlainLoginModule required
  username="xxx"
  password="xxx";
};

配置項

說明

登入模組

必須配置org.apache.kafka.common.security.plain.PlainLoginModul

username

使用者名稱,請根據實際情況配置該項。

password

密碼,請根據實際情況配置該項。

常見問題

附錄:指令碼Demo與參數說明

附錄:離線任務指令碼配置方式

如果您配置離線任務時使用指令碼模式的方式進行配置,您需要在任務指令碼中按照指令碼的統一格式要求編寫指令碼中的reader參數和writer參數,指令碼模式的統一要求請參見通過指令碼模式配置離線同步任務,以下為您介紹指令碼模式下的資料來源的Reader參數和Writer參數的指導詳情。

Reader指令碼Demo

從Kafka讀取資料的JSON配置,如下所示。

{
    "type": "job",
    "steps": [
        {
            "stepType": "kafka",
            "parameter": {
                "server": "host:9093",
                "column": [
                    "__key__",
                    "__value__",
                    "__partition__",
                    "__offset__",
                    "__timestamp__",
                    "'123'",
                    "event_id",
                    "tag.desc"
                ],
                "kafkaConfig": {
                    "group.id": "demo_test"
                },
                "topic": "topicName",
                "keyType": "ByteArray",
                "valueType": "ByteArray",
                "beginDateTime": "20190416000000",
                "endDateTime": "20190416000006",
                "skipExceedRecord": "true"
            },
            "name": "Reader",
            "category": "reader"
        },
        {
            "stepType": "stream",
            "parameter": {
                "print": false,
                "fieldDelimiter": ","
            },
            "name": "Writer",
            "category": "writer"
        }
    ],
    "version": "2.0",
    "order": {
        "hops": [
            {
                "from": "Reader",
                "to": "Writer"
            }
        ]
    },
    "setting": {
        "errorLimit": {
            "record": "0"
        },
        "speed": {
            "throttle": true,//當throttle值為false時,mbps參數不生效,表示不限流;當throttle值為true時,表示限流。
            "concurrent": 1,//並發數
            "mbps":"12"//限流,此處1mbps = 1MB/s。
        }
    }
}

Reader指令碼參數

參數

描述

是否必選

datasource

資料來源名稱,指令碼模式支援添加資料來源,此配置項填寫的內容必須要與添加的資料來源名稱保持一致。

server

Kafka的broker server地址,格式為ip:port

您可以只配置一個server,但請務必保證Kafka叢集中所有broker的IP地址都可以連通DataWorks。

topic

Kafka的Topic,是Kafka處理資源的訊息源(feeds of messages)的彙總。

column

需要讀取的Kafka資料,支援常量列、資料列和屬性列:

  • 常量列:使用單引號包裹的列為常量列,例如["'abc'", "'123'"]

  • 資料列

    • 如果您的資料是一個JSON,支援擷取JSON的屬性,例如["event_id"]

    • 如果您的資料是一個JSON,支援擷取JSON的嵌套子屬性,例如["tag.desc"]

  • 屬性列

    • __key__表示訊息的key。

    • __value__表示訊息的完整內容 。

    • __partition__表示當前訊息所在分區。

    • __headers__表示當前訊息headers資訊。

    • __offset__表示當前訊息的位移量。

    • __timestamp__表示當前訊息的時間戳記。

    完整樣本如下。

    "column": [
        "__key__",
        "__value__",
        "__partition__",
        "__offset__",
        "__timestamp__",
        "'123'",
        "event_id",
        "tag.desc"
        ]

keyType

Kafka的Key的類型,包括BYTEARRAY、DOUBLE、FLOAT、INTEGER、LONG和SHORT。

valueType

Kafka的Value的類型,包括BYTEARRAY、DOUBLE、FLOAT、INTEGER、LONG和SHORT。

beginDateTime

資料消費的開始時間位點,為時間範圍(左閉右開)的左邊界。yyyymmddhhmmss格式的時間字串,可以配合調度參數使用。詳情請參見調度參數支援的格式

說明

Kafka 0.10.2及以上的版本支援該功能。

需要和beginOffset二選一。

說明

beginDateTimeendDateTime配合使用。

endDateTime

資料消費的結束時間位點,為時間範圍(左閉右開)的右邊界。yyyymmddhhmmss格式的時間字串,可以配合調度參數使用。詳情請參見調度參數支援的格式

說明

Kafka 0.10.2及以上的版本支援該功能。

需要和endOffset二選一。

說明

endDateTimebeginDateTime配合使用。

beginOffset

資料消費的開始時間位點,您可以配置以下形式:

  • 數字形式(例如15553274),表示開始消費的點位。

  • seekToBeginning:表示從開始點位消費資料。

  • seekToLast:表示從kafkaConfig配置中指定的group.id對應群組ID儲存的位點開始讀取資料,注意群組位點在用戶端會定時自動認可到Kafka服務端,所以任務失敗後,如果重跑任務時可能會有資料重複或者丟失,skipExceedRecord參數配置為true時,任務可能丟棄最後讀取的一些記錄,而這些丟棄資料的群組位點已經提交到服務端,在下一個周期任務運行時將無法讀到這些丟棄的資料。

  • seekToEnd:表示從最後點位消費資料,會讀取到空資料。

需要和beginDateTime二選一。

endOffset

資料消費的結束位點,用於控制結束資料消費任務退出的時間。

需要和endDateTime二選一。

skipExceedRecord

Kafka使用public ConsumerRecords<K, V> poll(final Duration timeout)消費資料,一次poll調用擷取的資料可能在endOffset或者endDateTime之外。skipExceedRecord用於控制是否寫出多餘的資料至目的端。由於消費資料使用了自動點位提交,建議您:

  • Kafka 0.10.2之前版本:建議配置skipExceedRecord為false。

  • Kafka 0.10.2及以上版本:建議配置skipExceedRecord為true。

否,預設值為false

partition

Kafka的一個Topic有多個分區(partition),正常情況下資料同步任務是讀取Topic(多個分區)一個點位區間的資料。您也可以指定partition,僅讀取一個分區點位區間的資料。

否,無預設值。

kafkaConfig

建立Kafka資料消費用戶端KafkaConsumer可以指定擴充參數,例如bootstrap.serversauto.commit.interval.mssession.timeout.ms等,您可以基於kafkaConfig控制KafkaConsumer消費資料的行為。

encoding

當keyType或valueType配置為STRING時,將使用該配置項指定的編碼解析字串。

否,預設為UTF-8。

waitTIme

消費者對象從Kafka拉取一次資料的最大等待時間,單位為秒。

否,預設為60。

stopWhenPollEmpty

該配置項可選值為true/false。當配置為true時,如果消費者從Kafka拉取資料返回為空白(一般是已經讀完主題中的全部資料,也可能是網路或者Kafka叢集可用性問題),則立即停止任務,否則持續重試直到再次讀到資料。

否,預設為true。

stopWhenReachEndOffset

該配置項只在stopWhenPollEmpty為true時生效,可選值為true/false。

  • 當配置為true時,如果消費者從Kafka拉取資料返回為空白時,會檢查當前是否讀取到了Kafka Topic分區中的最新位點資料,如果已經讀到了Kafka Topic所有分區中的最新位點資料,則立即停止任務,否則繼續嘗試從Kafka Topic中拉取資料。

  • 當配置為false時,如果消費者從Kafka拉取資料返回為空白時,不會進行檢查,立即停止任務。

否,預設為false。

說明

相容歷史邏輯,Kafka版本低於V0.10.2無法執行已經讀取Kafka Topic所有分區中的最新位點資料檢查,但線上可能存在個別指令碼模式任務是讀取的版本低於V0.10.2的Kafka資料。

kafkaConfig參數說明如下。

參數

描述

fetch.min.bytes

指定消費者從broker擷取訊息的最小位元組數,即有足夠的資料時,才將其返回給消費者。

fetch.max.wait.ms

等待broker返回資料的最大時間,預設500毫秒。fetch.min.bytesfetch.max.wait.ms先滿足哪個條件,便按照該方式返回資料。

max.partition.fetch.bytes

指定broker從每個partition中返回給消費者的最大位元組數,預設為1 MB。

session.timeout.ms

指定消費者不再接收服務之前,可以與伺服器中斷連線的時間,預設是30秒。

auto.offset.reset

消費者在讀取沒有位移量或者位移量無效的情況下(因為消費者長時間失效,包含位移量的記錄已經過時並被刪除)的處理方式。預設為none(意味著不會自動重設位點),您可以更改為earliest(消費者從起始位置讀取partition的記錄)。

max.poll.records

單次調用poll方法能夠返回的訊息數量。

key.deserializer

訊息Key的還原序列化方法,例如org.apache.kafka.common.serialization.StringDeserializer

value.deserializer

資料Value的還原序列化方法,例如org.apache.kafka.common.serialization.StringDeserializer

ssl.truststore.location

SSL根憑證的路徑。

ssl.truststore.password

根憑證Store的密碼。如果是Aliyun Kafka,則配置為KafkaOnsClient。

security.protocol

接入協議,目前支援使用SASL_SSL協議接入。

sasl.mechanism

SASL鑒權方式,如果是Aliyun Kafka,使用PLAIN。

java.security.auth.login.config

SASL鑒權檔案路徑。

Writer指令碼Demo

向Kafka寫入資料的JSON配置,如下所示。

{
"type":"job",
"version":"2.0",//版本號碼。
"steps":[
{
"stepType":"stream",
"parameter":{},
"name":"Reader",
"category":"reader"
},
{
"stepType":"Kafka",//外掛程式名。
"parameter":{
"server": "ip:9092", //Kafka的server地址。
"keyIndex": 0, //作為Key的列。需遵循駝峰命名規則,k小寫
"valueIndex": 1, //作為Value的某列。目前只支援取來源端資料的一列或者該參數不填(不填表示取來源所有資料)
        //例如想取odps的第2、3、4列資料作為kafkaValue,請建立odps表將原odps表資料做清洗整合寫新odps表後使用新表同步。
"keyType": "Integer", //Kafka的Key的類型。
"valueType": "Short", //Kafka的Value的類型。
"topic": "t08", //Kafka的topic。
"batchSize": 1024 //向kafka一次性寫入的資料量,單位是位元組。
},
"name":"Writer",
"category":"writer"
}
],
"setting":{
"errorLimit":{
"record":"0"//錯誤記錄數。
},
"speed":{
                     "throttle":true,//當throttle值為false時,mbps參數不生效,表示不限流;當throttle值為true時,表示限流。
                     "concurrent":1, //作業並發數。
                     "mbps":"12"//限流,此處1mbps = 1MB/s。
}
},
"order":{
"hops":[
{
"from":"Reader",
"to":"Writer"
}
]
}
}

Writer指令碼參數

參數

描述

是否必選

datasource

資料來源名稱,指令碼模式支援添加資料來源,此配置項填寫的內容必須要與添加的資料來源名稱保持一致。

server

Kafka的server地址,格式為ip:port

topic

Kafka的topic,是Kafka處理資源的訊息源(feeds of messages)的不同分類。

每條發布至Kafka叢集的訊息都有一個類別,該類別被稱為topic,一個topic是對一組訊息的歸納。

valueIndex

Kafka Writer中作為Value的那一列。如果不填寫,預設將所有列拼起來作為Value,分隔字元為fieldDelimiter

writeMode

當未配置valueIndex時,該配置項決定將源端讀取記錄的所有列拼接作為寫入kafka記錄Value的格式,可選值為text和JSON,預設值為text。

  • 配置為text,將所有列按照fieldDelimiter指定分隔字元拼接。

  • 配置為JSON,將所有列按照column參數指定欄位名稱拼接為JSON字串。

例如源端記錄有三列,值為a、b和c,writeMode配置為text、fieldDelimiter配置為#時,寫入kafka的記錄Value為字串a#b#c;writeMode配置為JSON、column配置為[{"name":"col1"},{"name":"col2"},{"name":"col3"}]時,寫入kafka的記錄Value為字串{"col1":"a","col2":"b","col3":"c"}。

如果配置了valueIndex,該配置項無效。

column

目標表需要寫入資料的欄位,欄位間用英文逗號分隔。例如:"column": ["id", "name", "age"]

當未配置valueIndex,並且writeMode選擇JSON時,該配置項定義源端讀取記錄的列值在JSON結構中的欄位名稱。例如,"column": [{"name":id"}, {"name":"name"}, {"name":"age"}]。

  • 當源端讀取記錄列的個數多於column配置的欄位名個數時,寫入時進行截斷。例如:

    源端記錄有三列,值為a、b和c,column配置為[{"name":"col1"},{"name":"col2"}]時,寫入kafka的記錄Value為字串{"col1":"a","col2":"b"}。

  • 當源端讀取記錄列的個數少於column配置的欄位名個數時,多餘column配置欄位名填充null或者nullValueFormat指定的字串。例如:

    源端記錄有兩列,值為a和b,column配置為[{"name":"col1"},{"name":"col2"},{"name":"col3"}]時,寫入kafka的記錄Value為字串{"col1":"a","col2":"b","col3":null}。如果配置了valueIndex,或者writeMode配置為text,該配置項無效。

如果配置了valueIndex,或者writeMode配置為text,該配置項無效。

當未配置valueIndex,並且writeMode配置為JSON時必選

partition

指定寫入Kafka topic指定分區的編號,是一個大於等於0的整數。

keyIndex

Kafka Writer中作為Key的那一列。

keyIndex參數取值範圍是大於等於0的整數,否則任務會出錯。

keyIndexes

源端讀取記錄中作為寫入kafka記錄Key的列的序號數組。

列序號從0開始,例如[0,1,2],會將配置的所有列序號的值用逗號串連作為寫入kafka記錄的Key。如果不填寫,寫入kafka記錄Key為null,資料輪流寫入topic的各個分區中,與keyIndex參數只能二選一。

fieldDelimiter

當writeMode配置為text,並且未配置valueIndex時,將源端讀取記錄的所有列按照該配置項指定資料行分隔符號拼接作為寫入kafka記錄的Value,支援配置單個或者多個字元作為分隔字元,支援以\u0001格式配置unicode字元,支援\t\n等逸出字元。預設值為\t

如果writeMode未配置為text或者配置了valueIndex,該配置項無效。

keyType

Kafka的Key的類型,包括BYTEARRAY、DOUBLE、FLOAT、INTEGER、LONG和SHORT。

valueType

Kafka的Value的類型,包括BYTEARRAY、DOUBLE、FLOAT、INTEGER、LONG和SHORT。

nullKeyFormat

keyIndex或者keyIndexes指定的源端列值為null時,替換為該配置項指定的字串,如果不配置不做替換。

nullValueFormat

當源端列值為null時,組裝寫入kafka記錄Value時替換為該配置項指定的字串,如果不配置不做替換。

acks

初始化Kafka Producer時的acks配置,決定寫入成功的確認方式。預設acks參數為allacks取值如下:

  • 0:不進行寫入成功確認。

  • 1:確認主副本寫入成功。

  • all:確認所有副本寫入成功。

附錄:寫入Kafka訊息格式定義

完成配置即時同步任務的操作後,執行同步任務會將源端資料庫讀取的資料,以JSON格式寫入到Kafka topic中。除了會將設定的源端表中已有資料全部寫入Kafka對應Topic中,還會啟動即時同步將增量資料持續寫入Kafka對應Topic中,同時源端表增量DDL變更資訊也會以JSON格式寫入Kafka對應Topic中。您可以通過附錄:訊息格式擷取寫入Kafka的訊息的狀態及變更等資訊。

說明

通過離線同步任務寫入Kafka的資料JSON結構中的payload.sequenceId、payload.timestamp.eventTImepayload.timestamp.checkpointTime欄位均設定為-1