Kafka資料來源為您提供讀取和寫入Kafka的雙向通道,本文為您介紹DataWorks的Kafka資料同步的能力支援情況。
支援的版本
支援阿里雲Kafka,以及=0.10.2且<=2.2.x的自建Kafka版本。
對於<0.10.2版本Kafka,由於Kafka不支援檢索分區資料offset,並且Kafka資料結構可能不支援時間戳記,進而無法支援資料同步。
使用限制
Kafka資料來源目前僅支援使用獨享Data Integration資源群組。
單表離線讀
同時配置parameter.groupId和parameter.kafkaConfig.group.id時,parameter.groupId優先順序高於kafkaConfig配置資訊中的group.id。
單表即時寫
寫入資料不支援去重,即如果任務重設位點或者Failover後再啟動,會導致有重複資料寫入。
整庫即時寫
即時資料同步任務僅支援使用獨享Data Integration資源群組。
對於源端同步表有主鍵的情境,同步時會使用主索引值作為kafka記錄的key,確保同主鍵的變更有序寫入kafka的同一分區。
對於源端同步表無主鍵的情境,如果選擇了支援無主鍵表同步選項,則同步時kafka記錄的key為空白。如果要確保表的變更有序寫入kafka,則選擇寫入的kafka topic必須是單分區。如果選擇了自訂同步主鍵,則同步時使用其他非主鍵的一個或幾個欄位的聯合,代替主鍵作為kafka記錄的key。
如果在kafka叢集發生響應異常的情況下,仍要確保有主鍵表同主鍵的變更有序寫入kafka的同一分區,則需要在配置kafka資料來源時,在擴充參數表單中加入如下配置。
{"max.in.flight.requests.per.connection":1,"buffer.memory": 100554432}
。重要添加配置後同步效能會大幅下降,需要在效能和嚴格保序可靠性之間做好權衡。
即時同步寫入kafka的訊息總體格式、同步任務心跳訊息格式及源端更改資料對應的訊息格式,詳情請參見:附錄:訊息格式。
支援的欄位類型
Kafka的資料存放區為非結構化的儲存,通常Kafka記錄的資料模組有key、value、offset、timestamp、headers、partition。DataWorks在對Kafka資料進行讀寫時,會按照以下的策略進行資料處理。
離線讀資料
DataWorks讀取Kafka資料時,支援對Kafka資料進行JSON格式的解析,各資料模組的處理方式如下。
Kafka記錄資料模組 | 處理後的資料類型 |
key | 取決於資料同步任務配置的keyType配置項,keyType參數介紹請參見下文的全量參數說明章節。 |
value | 取決於資料同步任務配置的valueType配置項,valueType參數介紹請參見下文的全量參數說明章節。 |
offset | Long |
timestamp | Long |
headers | String |
partition | Long |
離線寫資料
DataWorks將資料寫入Kafka時,支援寫入JSON格式或text格式的資料,不同的資料同步方案往Kafka資料來源中寫入資料時,對資料的處理策略不一致,詳情如下。
寫入text格式的資料時,不會寫入欄位名資料,使用分隔字元來分隔欄位取值。
即時同步寫入資料到Kafka時,寫入的格式為內建的JSON格式,寫入資料為包含資料庫變更訊息的資料、業務時間和DDL資訊的所有資料,資料格式詳情請參見附錄:訊息格式。
同步任務類型 | 寫入Kafka value的格式 | 源端欄位類型 | 寫入時的處理方式 |
離線同步 DataStudio的離線同步節點 | json | 字串 | UTF8編碼字串 |
布爾 | 轉換為UTF8編碼字串"true"或者"false" | ||
時間/日期 | yyyy-MM-dd HH:mm:ss格式UTF8編碼字串 | ||
數值 | UTF8編碼數值字串 | ||
位元組流 | 位元組流會被視為UTF8編碼的字串,被轉換成字串 | ||
text | 字串 | UTF8編碼字串 | |
布爾 | 轉換為UTF8編碼字串"true"或者"false" | ||
時間/日期 | yyyy-MM-dd HH:mm:ss格式UTF8編碼字串 | ||
數值 | UTF8編碼數值字串 | ||
位元組流 | 位元組流會被視為UTF8編碼的字串,被轉換成字串 | ||
即時同步:ETL即時同步至Kafka DataStudio的即時同步節點 | json | 字串 | UTF8編碼字串 |
布爾 | json布爾類型 | ||
時間/日期 |
| ||
數值 | json數實值型別 | ||
位元組流 | 位元組流會進行Base64編碼後轉換成UTF8編碼的字串 | ||
text | 字串 | UTF8編碼字串 | |
布爾 | 轉換為UTF8編碼字串"true"或者"false" | ||
時間/日期 | yyyy-MM-dd HH:mm:ss格式UTF8編碼字串 | ||
數值 | UTF8編碼數值字串 | ||
位元組流 | 位元組流會進行Base64編碼後轉換成UTF8編碼字串 | ||
即時同步:整庫即時同步至Kafka 純即時同步增量資料 | 內建JSON格式 | 字串 | UTF8編碼字串 |
布爾 | json布爾類型 | ||
時間/日期 | 13位毫秒時間戳記 | ||
數值 | json數值 | ||
位元組流 | 位元組流會進行Base64編碼後轉換成UTF8編碼字串 | ||
同步解決方案:一鍵即時同步至Kafka 離線全量+即時增量 | 內建JSON格式 | 字串 | UTF8編碼字串 |
布爾 | json布爾類型 | ||
時間/日期 | 13位毫秒時間戳記 | ||
數值 | json數值 | ||
位元組流 | 位元組流會進行Base64編碼後轉換成UTF8編碼字串 |
資料同步任務開發
Kafka資料同步任務的配置入口和通用配置流程指導可參見下文的配置指導,詳細的配置參數解釋可在配置介面查看對應參數的文案提示。
建立資料來源
在進行資料同步任務開發時,您需要在DataWorks上建立一個對應的資料來源,操作流程請參見建立並管理資料來源。
單表離線同步任務配置指導
操作流程請參見通過嚮導模式配置離線同步任務、通過指令碼模式配置離線同步任務。
指令碼模式配置的全量參數和指令碼Demo請參見下文的附錄:指令碼Demo與參數說明。
單表、整庫即時同步任務配置指導
操作流程請參見DataStudio側即時同步任務配置。
單表、整庫全增量即時同步任務配置指導
操作流程請參見Data Integration側同步任務配置。
啟用認證配置說明
SSL
配置Kafka資料來源時,特殊認證方式選擇SSL或者SASL_SSL時,表明Kafka叢集開啟了SSL認證,您需要上傳用戶端truststore認證檔案並填寫truststore認證密碼。
如果Kafka叢集為alikafka執行個體,可以參考SSL認證演算法升級說明下載正確的truststore認證檔案,truststore認證密碼為KafkaOnsClient。
如果Kafka叢集為EMR執行個體,可以參考使用SSL加密Kafka連結下載正確的truststore認證檔案並擷取truststore認證密碼。
如果是自建叢集,請自行上傳正確的truststore認證,填寫正確的truststore認證密碼。
keystore認證檔案、keystore認證密碼和SSL密鑰密碼只有在Kafka叢集開啟雙向SSL認證時才需要進行配置,用於Kafka叢集服務端認證用戶端身份,Kafka叢集server.properties中ssl.client.auth=required時開啟雙向SSL認證,詳情請參見使用SSL加密Kafka連結。
GSSAPI
配置Kafka資料來源時,當Sasl機制選擇GSSAPI時,需要上傳三個認證檔案,分別是JAAS設定檔、Kerberos設定檔以及Keytab檔案,並在獨享資源群組進行DNS/HOST設定,下面分別介紹三種檔案以及獨享資源群組DNS、HOST的配置方式。
JAAS設定檔
JAAS檔案必須以KafkaClient開頭,之後使用一個大括弧包含所有配置項:
大括弧內第一行定義使用的登入組件類,對於各類Sasl認證機制,登入組件類是固定的,後續的每個配置項以key=value格式書寫。
除最後一個配置項,其他配置項結尾不能有分號。
最後一個配置項結尾必須有分號,在大括弧"}"之後也必須加上一個分號。
不符合以上格式要求將導致JAAS設定檔解析出錯,典型的JAAS設定檔格式如下(根據實際情況替換以下內容中的xxx):
KafkaClient { com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true keyTab="xxx" storeKey=true serviceName="kafka-server" principal="kafka-client@EXAMPLE.COM"; };
配置項
說明
登入模組
必須配置com.sun.security.auth.module.Krb5LoginModule。
useKeyTab
必須指定為true。
keyTab
可以指定任意路徑,在同步任務運行時會自動下載資料來源配置時上傳的keyTab檔案到本地,並使用本地檔案路徑填充keyTab配置項。
storeKey
決定用戶端是否儲存密鑰,配置true或者false均可,不影響資料同步。
serviceName
對應Kafka服務端server.properties設定檔中的sasl.kerberos.service.name配置項,請根據實際情況配置該項。
principal
Kafka用戶端使用的kerberos principal,請根據實際情況配置該項,並確保上傳的keyTab檔案包含該principal的密鑰。
Kerberos設定檔
Kerberos設定檔必須包含兩個模組[libdefaults]和[realms]
[libdefaults]模組指定kerberos認證參數,模組中每個配置項以key=value格式書寫。
[realms]模組指定kdc地址,可以包含多個realm子模組,每個realm子模組以realm名稱=開頭。
後面緊跟一組用大括弧包含配置項,每個配置項也以key=value格式書寫,典型的Kerberos設定檔格式如下(根據實際情況替換以下內容中的xxx):
[libdefaults] default_realm = xxx [realms] xxx = { kdc = xxx }
配置項
說明
[libdefaults].default_realm
訪問Kafka叢集節點時預設使用的realm,一般情況下與JAAS設定檔中指定用戶端principal所在realm一致。
[libdefaults]其他參數
[libdefaults]模組可以指定其他一些kerberos認證參數,例如ticket_lifetime等,請根據實際需要配置。
[realms].realm名稱
需要與JAAS設定檔中指定用戶端principal所在realm,以及[libdefaults].default_realm一致,如果JAAS設定檔中指定用戶端principal所在realm和[libdefaults].default_realm不一致,則需要包含兩組realms子模組分別對應JAAS設定檔中指定用戶端principal所在realm和[libdefaults].default_realm。
[realms].realm名稱.kdc
以ip:port格式指定kdc地址和連接埠,例如kdc=10.0.0.1:88,連接埠如果省略預設使用88連接埠,例如kdc=10.0.0.1。
Keytab檔案
Keytab檔案需要包含JAAS設定檔指定principal的密鑰,並且能夠通過kdc的驗證。例如本地當前工作目錄有名為client.keytab的檔案,可以通過以下命令驗證Keytab檔案是否包含指定principal的密鑰。
klist -ket ./client.keytab Keytab name: FILE:client.keytab KVNO Timestamp Principal ---- ------------------- ------------------------------------------------------ 7 2018-07-30T10:19:16 test@demo.com (des-cbc-md5)
獨享資源群組DNS、HOST配置
開啟Kerberos認證的Kafka叢集,會使用Kafka叢集中節點的hostname作為節點在kdc(Kerberos的服務端程式,即密鑰分發中心)中註冊的principal的一部分,而用戶端訪問Kafka叢集節點時,會根據本地的DNS、HOST設定,推導Kafka叢集節點的principal,進而從kdc擷取節點的訪問憑證。使用獨享資源群組訪問開啟Kerberos認證的Kafka叢集時,需要正確配置DNS、HOST,以確保順利從kdc擷取Kafka叢集節點的訪問憑證:
DNS設定
當獨享資源群組綁定的VPC中,使用PrivateZone執行個體進行了Kafka叢集節點的網域名稱解析設定,則可以在DataWorks管控台,獨享資源群組對應的VPC繫結項目增加100.100.2.136和100.100.2.138兩個IP的自訂路由,即可使PrivateZone針對Kafka叢集節點的網域名稱解析設定對獨享資源群組生效。
HOST設定
當獨享資源群組綁定的VPC中,未使用PrivateZone執行個體進行了Kafka叢集節點的網域名稱解析設定,則需要在DataWorks管控台,獨享資源群組網路設定中逐個將Kafka叢集節點的IP地址與網域名稱映射添加到Host配置中。
PLAIN
配置Kafka資料來源時,當Sasl機制選擇PLAIN時,JAAS檔案必須以KafkaClient開頭,之後使用一個大括弧包含所有配置項。
大括弧內第一行定義使用的登入組件類,對於各類Sasl認證機制,登入組件類是固定的,後續的每個配置項以key=value格式書寫。
除最後一個配置項,其他配置項結尾不能有分號。
最後一個配置項結尾必須有分號,在大括弧"}"之後也必須加上一個分號。
不符合以上格式要求將導致JAAS設定檔解析出錯,典型的JAAS設定檔格式如下(根據實際情況替換以下內容中的xxx):
KafkaClient {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="xxx"
password="xxx";
};
配置項 | 說明 |
登入模組 | 必須配置org.apache.kafka.common.security.plain.PlainLoginModul |
username | 使用者名稱,請根據實際情況配置該項。 |
password | 密碼,請根據實際情況配置該項。 |
常見問題
附錄:指令碼Demo與參數說明
附錄:離線任務指令碼配置方式
如果您配置離線任務時使用指令碼模式的方式進行配置,您需要在任務指令碼中按照指令碼的統一格式要求編寫指令碼中的reader參數和writer參數,指令碼模式的統一要求請參見通過指令碼模式配置離線同步任務,以下為您介紹指令碼模式下的資料來源的Reader參數和Writer參數的指導詳情。
Reader指令碼Demo
從Kafka讀取資料的JSON配置,如下所示。
{
"type": "job",
"steps": [
{
"stepType": "kafka",
"parameter": {
"server": "host:9093",
"column": [
"__key__",
"__value__",
"__partition__",
"__offset__",
"__timestamp__",
"'123'",
"event_id",
"tag.desc"
],
"kafkaConfig": {
"group.id": "demo_test"
},
"topic": "topicName",
"keyType": "ByteArray",
"valueType": "ByteArray",
"beginDateTime": "20190416000000",
"endDateTime": "20190416000006",
"skipExceedRecord": "true"
},
"name": "Reader",
"category": "reader"
},
{
"stepType": "stream",
"parameter": {
"print": false,
"fieldDelimiter": ","
},
"name": "Writer",
"category": "writer"
}
],
"version": "2.0",
"order": {
"hops": [
{
"from": "Reader",
"to": "Writer"
}
]
},
"setting": {
"errorLimit": {
"record": "0"
},
"speed": {
"throttle": true,//當throttle值為false時,mbps參數不生效,表示不限流;當throttle值為true時,表示限流。
"concurrent": 1,//並發數
"mbps":"12"//限流,此處1mbps = 1MB/s。
}
}
}
Reader指令碼參數
參數 | 描述 | 是否必選 |
datasource | 資料來源名稱,指令碼模式支援添加資料來源,此配置項填寫的內容必須要與添加的資料來源名稱保持一致。 | 是 |
server | Kafka的broker server地址,格式為ip:port。 您可以只配置一個server,但請務必保證Kafka叢集中所有broker的IP地址都可以連通DataWorks。 | 是 |
topic | Kafka的Topic,是Kafka處理資源的訊息源(feeds of messages)的彙總。 | 是 |
column | 需要讀取的Kafka資料,支援常量列、資料列和屬性列:
| 是 |
keyType | Kafka的Key的類型,包括BYTEARRAY、DOUBLE、FLOAT、INTEGER、LONG和SHORT。 | 是 |
valueType | Kafka的Value的類型,包括BYTEARRAY、DOUBLE、FLOAT、INTEGER、LONG和SHORT。 | 是 |
beginDateTime | 資料消費的開始時間位點,為時間範圍(左閉右開)的左邊界。yyyymmddhhmmss格式的時間字串,可以配合調度參數使用。詳情請參見調度參數支援的格式。 說明 Kafka 0.10.2及以上的版本支援該功能。 | 需要和beginOffset二選一。 說明 beginDateTime和endDateTime配合使用。 |
endDateTime | 資料消費的結束時間位點,為時間範圍(左閉右開)的右邊界。yyyymmddhhmmss格式的時間字串,可以配合調度參數使用。詳情請參見調度參數支援的格式。 說明 Kafka 0.10.2及以上的版本支援該功能。 | 需要和endOffset二選一。 說明 endDateTime和beginDateTime配合使用。 |
beginOffset | 資料消費的開始時間位點,您可以配置以下形式:
| 需要和beginDateTime二選一。 |
endOffset | 資料消費的結束位點,用於控制結束資料消費任務退出的時間。 | 需要和endDateTime二選一。 |
skipExceedRecord | Kafka使用
| 否,預設值為false。 |
partition | Kafka的一個Topic有多個分區(partition),正常情況下資料同步任務是讀取Topic(多個分區)一個點位區間的資料。您也可以指定partition,僅讀取一個分區點位區間的資料。 | 否,無預設值。 |
kafkaConfig | 建立Kafka資料消費用戶端KafkaConsumer可以指定擴充參數,例如bootstrap.servers、auto.commit.interval.ms、session.timeout.ms等,您可以基於kafkaConfig控制KafkaConsumer消費資料的行為。 | 否 |
encoding | 當keyType或valueType配置為STRING時,將使用該配置項指定的編碼解析字串。 | 否,預設為UTF-8。 |
waitTIme | 消費者對象從Kafka拉取一次資料的最大等待時間,單位為秒。 | 否,預設為60。 |
stopWhenPollEmpty | 該配置項可選值為true/false。當配置為true時,如果消費者從Kafka拉取資料返回為空白(一般是已經讀完主題中的全部資料,也可能是網路或者Kafka叢集可用性問題),則立即停止任務,否則持續重試直到再次讀到資料。 | 否,預設為true。 |
stopWhenReachEndOffset | 該配置項只在stopWhenPollEmpty為true時生效,可選值為true/false。
| 否,預設為false。 說明 相容歷史邏輯,Kafka版本低於V0.10.2無法執行已經讀取Kafka Topic所有分區中的最新位點資料檢查,但線上可能存在個別指令碼模式任務是讀取的版本低於V0.10.2的Kafka資料。 |
kafkaConfig參數說明如下。
參數 | 描述 |
fetch.min.bytes | 指定消費者從broker擷取訊息的最小位元組數,即有足夠的資料時,才將其返回給消費者。 |
fetch.max.wait.ms | 等待broker返回資料的最大時間,預設500毫秒。fetch.min.bytes和fetch.max.wait.ms先滿足哪個條件,便按照該方式返回資料。 |
max.partition.fetch.bytes | 指定broker從每個partition中返回給消費者的最大位元組數,預設為1 MB。 |
session.timeout.ms | 指定消費者不再接收服務之前,可以與伺服器中斷連線的時間,預設是30秒。 |
auto.offset.reset | 消費者在讀取沒有位移量或者位移量無效的情況下(因為消費者長時間失效,包含位移量的記錄已經過時並被刪除)的處理方式。預設為none(意味著不會自動重設位點),您可以更改為earliest(消費者從起始位置讀取partition的記錄)。 |
max.poll.records | 單次調用poll方法能夠返回的訊息數量。 |
key.deserializer | 訊息Key的還原序列化方法,例如org.apache.kafka.common.serialization.StringDeserializer。 |
value.deserializer | 資料Value的還原序列化方法,例如org.apache.kafka.common.serialization.StringDeserializer。 |
ssl.truststore.location | SSL根憑證的路徑。 |
ssl.truststore.password | 根憑證Store的密碼。如果是Aliyun Kafka,則配置為KafkaOnsClient。 |
security.protocol | 接入協議,目前支援使用SASL_SSL協議接入。 |
sasl.mechanism | SASL鑒權方式,如果是Aliyun Kafka,使用PLAIN。 |
java.security.auth.login.config | SASL鑒權檔案路徑。 |
Writer指令碼Demo
向Kafka寫入資料的JSON配置,如下所示。
{
"type":"job",
"version":"2.0",//版本號碼。
"steps":[
{
"stepType":"stream",
"parameter":{},
"name":"Reader",
"category":"reader"
},
{
"stepType":"Kafka",//外掛程式名。
"parameter":{
"server": "ip:9092", //Kafka的server地址。
"keyIndex": 0, //作為Key的列。需遵循駝峰命名規則,k小寫
"valueIndex": 1, //作為Value的某列。目前只支援取來源端資料的一列或者該參數不填(不填表示取來源所有資料)
//例如想取odps的第2、3、4列資料作為kafkaValue,請建立odps表將原odps表資料做清洗整合寫新odps表後使用新表同步。
"keyType": "Integer", //Kafka的Key的類型。
"valueType": "Short", //Kafka的Value的類型。
"topic": "t08", //Kafka的topic。
"batchSize": 1024 //向kafka一次性寫入的資料量,單位是位元組。
},
"name":"Writer",
"category":"writer"
}
],
"setting":{
"errorLimit":{
"record":"0"//錯誤記錄數。
},
"speed":{
"throttle":true,//當throttle值為false時,mbps參數不生效,表示不限流;當throttle值為true時,表示限流。
"concurrent":1, //作業並發數。
"mbps":"12"//限流,此處1mbps = 1MB/s。
}
},
"order":{
"hops":[
{
"from":"Reader",
"to":"Writer"
}
]
}
}
Writer指令碼參數
參數 | 描述 | 是否必選 |
datasource | 資料來源名稱,指令碼模式支援添加資料來源,此配置項填寫的內容必須要與添加的資料來源名稱保持一致。 | 是 |
server | Kafka的server地址,格式為ip:port。 | 是 |
topic | Kafka的topic,是Kafka處理資源的訊息源(feeds of messages)的不同分類。 每條發布至Kafka叢集的訊息都有一個類別,該類別被稱為topic,一個topic是對一組訊息的歸納。 | 是 |
valueIndex | Kafka Writer中作為Value的那一列。如果不填寫,預設將所有列拼起來作為Value,分隔字元為fieldDelimiter。 | 否 |
writeMode | 當未配置valueIndex時,該配置項決定將源端讀取記錄的所有列拼接作為寫入kafka記錄Value的格式,可選值為text和JSON,預設值為text。
例如源端記錄有三列,值為a、b和c,writeMode配置為text、fieldDelimiter配置為#時,寫入kafka的記錄Value為字串a#b#c;writeMode配置為JSON、column配置為[{"name":"col1"},{"name":"col2"},{"name":"col3"}]時,寫入kafka的記錄Value為字串{"col1":"a","col2":"b","col3":"c"}。 如果配置了valueIndex,該配置項無效。 | 否 |
column | 目標表需要寫入資料的欄位,欄位間用英文逗號分隔。例如:"column": ["id", "name", "age"]。 當未配置valueIndex,並且writeMode選擇JSON時,該配置項定義源端讀取記錄的列值在JSON結構中的欄位名稱。例如,"column": [{"name":id"}, {"name":"name"}, {"name":"age"}]。
如果配置了valueIndex,或者writeMode配置為text,該配置項無效。 | 當未配置valueIndex,並且writeMode配置為JSON時必選 |
partition | 指定寫入Kafka topic指定分區的編號,是一個大於等於0的整數。 | 否 |
keyIndex | Kafka Writer中作為Key的那一列。 keyIndex參數取值範圍是大於等於0的整數,否則任務會出錯。 | 否 |
keyIndexes | 源端讀取記錄中作為寫入kafka記錄Key的列的序號數組。 列序號從0開始,例如[0,1,2],會將配置的所有列序號的值用逗號串連作為寫入kafka記錄的Key。如果不填寫,寫入kafka記錄Key為null,資料輪流寫入topic的各個分區中,與keyIndex參數只能二選一。 | 否 |
fieldDelimiter | 當writeMode配置為text,並且未配置valueIndex時,將源端讀取記錄的所有列按照該配置項指定資料行分隔符號拼接作為寫入kafka記錄的Value,支援配置單個或者多個字元作為分隔字元,支援以\u0001格式配置unicode字元,支援\t、\n等逸出字元。預設值為\t。 如果writeMode未配置為text或者配置了valueIndex,該配置項無效。 | 否 |
keyType | Kafka的Key的類型,包括BYTEARRAY、DOUBLE、FLOAT、INTEGER、LONG和SHORT。 | 是 |
valueType | Kafka的Value的類型,包括BYTEARRAY、DOUBLE、FLOAT、INTEGER、LONG和SHORT。 | 是 |
nullKeyFormat | keyIndex或者keyIndexes指定的源端列值為null時,替換為該配置項指定的字串,如果不配置不做替換。 | 否 |
nullValueFormat | 當源端列值為null時,組裝寫入kafka記錄Value時替換為該配置項指定的字串,如果不配置不做替換。 | 否 |
acks | 初始化Kafka Producer時的acks配置,決定寫入成功的確認方式。預設acks參數為all。acks取值如下:
| 否 |
附錄:寫入Kafka訊息格式定義
完成配置即時同步任務的操作後,執行同步任務會將源端資料庫讀取的資料,以JSON格式寫入到Kafka topic中。除了會將設定的源端表中已有資料全部寫入Kafka對應Topic中,還會啟動即時同步將增量資料持續寫入Kafka對應Topic中,同時源端表增量DDL變更資訊也會以JSON格式寫入Kafka對應Topic中。您可以通過附錄:訊息格式擷取寫入Kafka的訊息的狀態及變更等資訊。
通過離線同步任務寫入Kafka的資料JSON結構中的payload.sequenceId、payload.timestamp.eventTIme和payload.timestamp.checkpointTime欄位均設定為-1。