Kafka資料來源為您提供讀取和寫入Kafka的雙向通道,本文為您介紹DataWorks的Kafka資料同步的能力支援情況。
支援的版本
支援阿里雲Kafka,以及=0.10.2且<=2.2.x的自建Kafka版本。
對於<0.10.2版本Kafka,由於Kafka不支援檢索分區資料offset,並且Kafka資料結構可能不支援時間戳記,進而無法支援資料同步。
資源評估
即時讀取
使用訂用帳戶Serverless資源群組時,請提前預估Serverless資源群組規格,避免資源群組規格不足影響任務運行:
一個topic預估需要1 CU,除此之外,還需根據流量進行評估:
Kafka資料不壓縮,按10 MB/s預估需要1 CU。
Kafka資料壓縮,按10 MB/s預估需要2 CU。
Kafka資料壓縮並且進行JSON解析,按10MB/s預估需要3 CU。
使用訂用帳戶Serverless資源群組和舊版獨享Data Integration資源群組時:
對Failover容忍度高,叢集槽位的水位建議不超過80%。
對Failover容忍度低,叢集槽位的水位建議不超過70%。
實際佔用和資料內容格式等相關,評估後您可以再根據實際運行情況進行調整。
使用限制
Kafka資料來源目前支援使用Serverless資源群組(推薦)和舊版獨享Data Integration資源群組。
單表離線讀
同時配置parameter.groupId和parameter.kafkaConfig.group.id時,parameter.groupId優先順序高於kafkaConfig配置資訊中的group.id。
單表即時寫
寫入資料不支援去重,即如果任務重設位點或者Failover後再啟動,會導致出現重複資料寫入。
整庫即時寫
即時資料同步任務支援使用Serverless資源群組(推薦)和舊版獨享Data Integration資源群組。
對於源端同步表有主鍵的情境,同步時會使用主索引值作為kafka記錄的key,確保同主鍵的變更有序寫入kafka的同一分區。
對於源端同步表無主鍵的情境,如果選擇了支援無主鍵表同步選項,則同步時kafka記錄的key為空白。如果要確保表的變更有序寫入kafka,則選擇寫入的kafka topic必須是單分區。如果選擇了自訂同步主鍵,則同步時使用其他非主鍵的一個或幾個欄位的聯合,代替主鍵作為kafka記錄的key。
如果在kafka叢集發生響應異常的情況下,仍要確保有主鍵表同主鍵的變更有序寫入kafka的同一分區,則需要在配置kafka資料來源時,在擴充參數表單中加入如下配置。
{"max.in.flight.requests.per.connection":1,"buffer.memory": 100554432}
重要添加配置後同步效能會大幅下降,需要在效能和嚴格保序可靠性之間做好權衡。
即時同步寫入kafka的訊息總體格式、同步任務心跳訊息格式及源端更改資料對應的訊息格式,詳情請參見:附錄:訊息格式。
支援的欄位類型
Kafka的資料存放區為非結構化的儲存,通常Kafka記錄的資料模組有key、value、offset、timestamp、headers、partition。DataWorks在對Kafka資料進行讀寫時,會按照以下的策略進行資料處理。
離線讀資料
DataWorks讀取Kafka資料時,支援對Kafka資料進行JSON格式的解析,各資料模組的處理方式如下。
Kafka記錄資料模組 | 處理後的資料類型 |
key | 取決於資料同步任務配置的keyType配置項,keyType參數介紹請參見下文的全量參數說明章節。 |
value | 取決於資料同步任務配置的valueType配置項,valueType參數介紹請參見下文的全量參數說明章節。 |
offset | Long |
timestamp | Long |
headers | String |
partition | Long |
離線寫資料
DataWorks將資料寫入Kafka時,支援寫入JSON格式或text格式的資料,不同的資料同步方案往Kafka資料來源中寫入資料時,對資料的處理策略不一致,詳情如下。
寫入text格式的資料時,不會寫入欄位名資料,使用分隔字元來分隔欄位取值。
即時同步寫入資料到Kafka時,寫入的格式為內建的JSON格式,寫入資料為包含資料庫變更訊息的資料、業務時間和DDL資訊的所有資料,資料格式詳情請參見附錄:訊息格式。
同步任務類型 | 寫入Kafka value的格式 | 源端欄位類型 | 寫入時的處理方式 |
離線同步 DataStudio的離線同步節點 | json | 字串 | UTF8編碼字串 |
布爾 | 轉換為UTF8編碼字串"true"或者"false" | ||
時間/日期 | yyyy-MM-dd HH:mm:ss格式UTF8編碼字串 | ||
數值 | UTF8編碼數值字串 | ||
位元組流 | 位元組流會被視為UTF8編碼的字串,被轉換成字串 | ||
text | 字串 | UTF8編碼字串 | |
布爾 | 轉換為UTF8編碼字串"true"或者"false" | ||
時間/日期 | yyyy-MM-dd HH:mm:ss格式UTF8編碼字串 | ||
數值 | UTF8編碼數值字串 | ||
位元組流 | 位元組流會被視為UTF8編碼的字串,被轉換成字串 | ||
即時同步:ETL即時同步至Kafka DataStudio的即時同步節點 | json | 字串 | UTF8編碼字串 |
布爾 | json布爾類型 | ||
時間/日期 |
| ||
數值 | json數實值型別 | ||
位元組流 | 位元組流會進行Base64編碼後轉換成UTF8編碼的字串 | ||
text | 字串 | UTF8編碼字串 | |
布爾 | 轉換為UTF8編碼字串"true"或者"false" | ||
時間/日期 | yyyy-MM-dd HH:mm:ss格式UTF8編碼字串 | ||
數值 | UTF8編碼數值字串 | ||
位元組流 | 位元組流會進行Base64編碼後轉換成UTF8編碼字串 | ||
即時同步:整庫即時同步至Kafka 純即時同步增量資料 | 內建JSON格式 | 字串 | UTF8編碼字串 |
布爾 | json布爾類型 | ||
時間/日期 | 13位毫秒時間戳記 | ||
數值 | json數值 | ||
位元組流 | 位元組流會進行Base64編碼後轉換成UTF8編碼字串 | ||
同步解決方案:一鍵即時同步至Kafka 離線全量+即時增量 | 內建JSON格式 | 字串 | UTF8編碼字串 |
布爾 | json布爾類型 | ||
時間/日期 | 13位毫秒時間戳記 | ||
數值 | json數值 | ||
位元組流 | 位元組流會進行Base64編碼後轉換成UTF8編碼字串 |
建立資料來源
在進行資料同步任務開發時,您需要在DataWorks上建立一個對應的資料來源,操作流程請參見建立並管理資料來源,詳細的配置參數解釋可在配置介面查看對應參數的文案提示。
資料同步任務開發
資料同步任務的配置入口和通用配置流程可參見下文的配置指導。
單表離線同步任務配置指導
操作流程請參見通過嚮導模式配置離線同步任務、通過指令碼模式配置離線同步任務。
指令碼模式配置的全量參數和指令碼Demo請參見下文的附錄:指令碼Demo與參數說明。
單表、整庫即時同步任務配置指導
操作流程請參見DataStudio側即時同步任務配置。
單表、整庫全增量即時同步任務配置指導
操作流程請參見Data Integration側同步任務配置。
啟用認證配置說明
SSL
配置Kafka資料來源時,特殊認證方式選擇SSL或者SASL_SSL時,表明Kafka叢集開啟了SSL認證,您需要上傳用戶端truststore認證檔案並填寫truststore認證密碼。
如果Kafka叢集為alikafka執行個體,可以參考SSL認證演算法升級說明下載正確的truststore認證檔案,truststore認證密碼為KafkaOnsClient。
如果Kafka叢集為EMR執行個體,可以參考使用SSL加密Kafka連結下載正確的truststore認證檔案並擷取truststore認證密碼。
如果是自建叢集,請自行上傳正確的truststore認證,填寫正確的truststore認證密碼。
keystore認證檔案、keystore認證密碼和SSL密鑰密碼只有在Kafka叢集開啟雙向SSL認證時才需要進行配置,用於Kafka叢集服務端認證用戶端身份,Kafka叢集server.properties中ssl.client.auth=required時開啟雙向SSL認證,詳情請參見使用SSL加密Kafka連結。
GSSAPI
配置Kafka資料來源時,當Sasl機制選擇GSSAPI時,需要上傳三個認證檔案,分別是JAAS設定檔、Kerberos設定檔以及Keytab檔案,並在獨享資源群組進行DNS/HOST設定,下面分別介紹三種檔案以及獨享資源群組DNS、HOST的配置方式。
Serverless資源群組需要通過內網DNS解析配置Host地址資訊,更多資訊,請參見內網DNS解析(PrivateZone)。
JAAS設定檔
JAAS檔案必須以KafkaClient開頭,之後使用一個大括弧包含所有配置項:
大括弧內第一行定義使用的登入組件類,對於各類Sasl認證機制,登入組件類是固定的,後續的每個配置項以key=value格式書寫。
除最後一個配置項,其他配置項結尾不能有分號。
最後一個配置項結尾必須有分號,在大括弧"}"之後也必須加上一個分號。
不符合以上格式要求將導致JAAS設定檔解析出錯,典型的JAAS設定檔格式如下(根據實際情況替換以下內容中的xxx):
KafkaClient { com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true keyTab="xxx" storeKey=true serviceName="kafka-server" principal="kafka-client@EXAMPLE.COM"; };
配置項
說明
登入模組
必須配置com.sun.security.auth.module.Krb5LoginModule。
useKeyTab
必須指定為true。
keyTab
可以指定任意路徑,在同步任務運行時會自動下載資料來源配置時上傳的keyTab檔案到本地,並使用本地檔案路徑填充keyTab配置項。
storeKey
決定用戶端是否儲存密鑰,配置true或者false均可,不影響資料同步。
serviceName
對應Kafka服務端server.properties設定檔中的sasl.kerberos.service.name配置項,請根據實際情況配置該項。
principal
Kafka用戶端使用的kerberos principal,請根據實際情況配置該項,並確保上傳的keyTab檔案包含該principal的密鑰。
Kerberos設定檔
Kerberos設定檔必須包含兩個模組[libdefaults]和[realms]
[libdefaults]模組指定Kerberos認證參數,模組中每個配置項以key=value格式書寫。
[realms]模組指定kdc地址,可以包含多個realm子模組,每個realm子模組以realm名稱=開頭。
後面緊跟一組用大括弧包含配置項,每個配置項也以key=value格式書寫,典型的Kerberos設定檔格式如下(根據實際情況替換以下內容中的xxx):
[libdefaults] default_realm = xxx [realms] xxx = { kdc = xxx }
配置項
說明
[libdefaults].default_realm
訪問Kafka叢集節點時預設使用的realm,一般情況下與JAAS設定檔中指定用戶端principal所在realm一致。
[libdefaults]其他參數
[libdefaults]模組可以指定其他一些kerberos認證參數,例如ticket_lifetime等,請根據實際需要配置。
[realms].realm名稱
需要與JAAS設定檔中指定用戶端principal所在realm,以及[libdefaults].default_realm一致,如果JAAS設定檔中指定用戶端principal所在realm和[libdefaults].default_realm不一致,則需要包含兩組realms子模組分別對應JAAS設定檔中指定用戶端principal所在realm和[libdefaults].default_realm。
[realms].realm名稱.kdc
以ip:port格式指定kdc地址和連接埠,例如kdc=10.0.0.1:88,連接埠如果省略預設使用88連接埠,例如kdc=10.0.0.1。
Keytab檔案
Keytab檔案需要包含JAAS設定檔指定principal的密鑰,並且能夠通過kdc的驗證。例如本地當前工作目錄有名為client.keytab的檔案,可以通過以下命令驗證Keytab檔案是否包含指定principal的密鑰。
klist -ket ./client.keytab Keytab name: FILE:client.keytab KVNO Timestamp Principal ---- ------------------- ------------------------------------------------------ 7 2018-07-30T10:19:16 test@demo.com (des-cbc-md5)
獨享資源群組DNS、HOST配置
開啟Kerberos認證的Kafka叢集,會使用Kafka叢集中節點的hostname作為節點在kdc(Kerberos的服務端程式,即密鑰分發中心)中註冊的principal的一部分,而用戶端訪問Kafka叢集節點時,會根據本地的DNS、HOST設定,推導Kafka叢集節點的principal,進而從kdc擷取節點的訪問憑證。使用獨享資源群組訪問開啟Kerberos認證的Kafka叢集時,需要正確配置DNS、HOST,以確保順利從kdc擷取Kafka叢集節點的訪問憑證:
DNS設定
當獨享資源群組綁定的VPC中,使用PrivateZone執行個體進行了Kafka叢集節點的網域名稱解析設定,則可以在DataWorks管控台,獨享資源群組對應的VPC繫結項目增加100.100.2.136和100.100.2.138兩個IP的自訂路由,即可使PrivateZone針對Kafka叢集節點的網域名稱解析設定對獨享資源群組生效。
HOST設定
當獨享資源群組綁定的VPC中,未使用PrivateZone執行個體進行了Kafka叢集節點的網域名稱解析設定,則需要在DataWorks管控台,獨享資源群組網路設定中逐個將Kafka叢集節點的IP地址與網域名稱映射添加到Host配置中。
PLAIN
配置Kafka資料來源時,當Sasl機制選擇PLAIN時,JAAS檔案必須以KafkaClient開頭,之後使用一個大括弧包含所有配置項。
大括弧內第一行定義使用的登入組件類,對於各類Sasl認證機制,登入組件類是固定的,後續的每個配置項以key=value格式書寫。
除最後一個配置項,其他配置項結尾不能有分號。
最後一個配置項結尾必須有分號,在大括弧"}"之後也必須加上一個分號。
不符合以上格式要求將導致JAAS設定檔解析出錯,典型的JAAS設定檔格式如下(根據實際情況替換以下內容中的xxx):
KafkaClient {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="xxx"
password="xxx";
};
配置項 | 說明 |
登入模組 | 必須配置org.apache.kafka.common.security.plain.PlainLoginModul |
username | 使用者名稱,請根據實際情況配置該項。 |
password | 密碼,請根據實際情況配置該項。 |
常見問題
附錄:指令碼Demo與參數說明
離線任務指令碼配置方式
如果您配置離線任務時使用指令碼模式的方式進行配置,您需要按照統一的指令碼格式要求,在任務指令碼中編寫相應的參數,詳情請參見通過指令碼模式配置離線同步任務,以下為您介紹指令碼模式下資料來源的參數配置詳情。
Reader指令碼Demo
Reader指令碼參數
Writer指令碼Demo
Writer指令碼參數
附錄:寫入Kafka訊息格式定義
完成配置即時同步任務的操作後,執行同步任務會將源端資料庫讀取的資料,以JSON格式寫入到Kafka topic中。除了會將設定的源端表中已有資料全部寫入Kafka對應Topic中,還會啟動即時同步將增量資料持續寫入Kafka對應Topic中,同時源端表增量DDL變更資訊也會以JSON格式寫入Kafka對應Topic中。您可以通過附錄:訊息格式擷取寫入Kafka的訊息的狀態及變更等資訊。
通過離線同步任務寫入Kafka的資料JSON結構中的payload.sequenceId、payload.timestamp.eventTIme和payload.timestamp.checkpointTime欄位均設定為-1。