本文介紹如何使用訊息佇列Kafka連接器。
背景資訊
Apache Kafka是一款開源的分布式訊息佇列系統,廣泛用於高效能資料處理、流式分析、Data Integration等巨量資料領域。Kafka連接器基於開源Apache Kafka用戶端,為阿里雲Realtime ComputeFlink提供高效能的資料吞吐、多種資料格式的讀寫和精確一次語義的支援。
類別 | 詳情 |
支援類型 | 源表和結果表,資料攝入目標端 |
運行模式 | 流模式 |
資料格式 | |
特有監控指標 | |
API種類 | SQL,Datastream和資料攝入YAML |
是否支援更新或刪除結果表資料 | 不支援更新和刪除結果表資料,只支援插入資料。 說明 更新和刪除資料相關功能請參見Upsert Kafka。 |
前提條件
請根據需求選擇以下任意一種方式串連叢集:
串連阿里雲雲訊息佇列Kafka版叢集
Kafka叢集版本在0.11及以上。
ApsaraMQ for Kafka叢集已建立。詳情請參見建立資源。
Flink工作空間與Kafka叢集處於同一VPC內,且ApsaraMQ for Kafka已對Flink開放白名單,具體操作請參見配置白名單。
重要寫入阿里雲Kafka的限制:
阿里雲Kafka不支援zstd壓縮格式寫入。
阿里雲Kafka不支援等冪和事務寫入,無法使用Kafka結果表提供的精確一次語義exactly-once semantic功能。在使用Realtime Compute引擎VVR 8.0.0及以上時,需要在結果表中添加配置項
properties.enable.idempotence=false以關閉等冪寫入功能。阿里雲Kafka的儲存引擎對比與功能限制參見儲存引擎對比。
串連自建Apache Kafka叢集
注意事項
目前不推薦使用事務寫入,這是 Flink 社區和 Kafka 社區的設計缺陷所致。當設定sink.delivery-guarantee = exactly-once,Kafka Connector 會啟用事務寫入,存在三個已知問題:
每個 Checkpoint 會產生一個 Transaction ID。如果 Checkpoint 間隔太短,Transaction ID會過多。Kafka 叢集的 Coordinator 可能因此記憶體不足,從而破壞 Kafka 叢集的穩定性。
每個事務會建立一個 Producer 執行個體。如果同時提交的事務太多,TaskManager 的記憶體可能耗盡,從而破壞 Flink 作業的穩定性。
多個 Flink 作業若使用相同的
sink.transactional-id-prefix,它們產生的事務 ID 可能衝突。一個作業寫入失敗時,會阻塞 Kafka 分區的 LSO(Log Start Offset)前進,這會影響所有消費者讀取該分區的資料。
如果你需要 Exactly-Once 語義,改用 Upsert Kafka 寫入主鍵表,並用主鍵保證等冪性。如果需要使用事務寫入,請參見EXACTLY_ONCE語義注意事項。
網路連接排查
Flink作業啟動時若報錯Timed out waiting for a node assignment,通常是因為 Flink 與 Kafka 之間網路不通。
Kafka 用戶端串連服務端如下所示:
用戶端用
bootstrap.servers中的地址串連 Kafka。Kafka 返回叢集中各 broker 的中繼資料,包括它們的串連地址。
用戶端再用這些返回的地址串連各 broker,進行讀寫。
即使bootstrap.servers地址能通,若 Kafka 返回的 broker 地址錯誤,用戶端仍無法讀寫。這類問題常出現在使用代理、連接埠轉寄或專線等網路架構中。
排查步驟
訊息佇列Kafka
自建Kafka(ECS)
使用Flink開發控制台進行網路探測。
排除
bootstrap.servers地址連通性問題,確認內外網存取點正確性。檢查安全性群組與白名單
ECS 安全性群組必須允許存取 Kafka 存取點連接埠(通常為 9092 或 9093)。
ECS 執行個體需將 Flink 所在VPC加入白名單,詳情請參見查看VPC網段。
配置排查
登入 Kafka 所用的 ZooKeeper 叢集,使用 zkCli.sh 或 zookeeper-shell.sh工具。
執行命令擷取 broker 中繼資料。例如:
get /brokers/ids/0。在返回結果的endpoints欄位中,找到 Kafka 向用戶端通告的地址。
Flink開發控制台進行網路探測,測試該地址是否可達。
說明若不可達,請 Kafka 營運人員檢查並修正
listeners和advertised.listeners配置,確保返回的地址對 Flink 可訪問。更多關於Kafka用戶端與服務端的串連資訊,請參見Troubleshoot Connectivity。
檢查 SASL 配置(如啟用)
若使用 SASL_SSL存取點,必須在 Flink 作業中正確配置 JAAS、SSL 與 SASL 機制。缺少認證會導致串連在握手階段失敗,也可能表現為逾時,詳情請參見安全與認證。
SQL
Kafka連接器可以在SQL作業中使用,作為源表或者結果表。
文法結構
CREATE TABLE KafkaTable (
`user_id` BIGINT,
`item_id` BIGINT,
`behavior` STRING,
`ts` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp' VIRTUAL
) WITH (
'connector' = 'kafka',
'topic' = 'user_behavior',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'testGroup',
'scan.startup.mode' = 'earliest-offset',
'format' = 'csv'
)元資訊列
可以在源表和結果表中定義元資訊列,以擷取或寫入Kafka訊息的元資訊。例如,當WITH參數中定義了多個topic時,如果在Kafka源表中定義了元資訊列,那麼Flink讀取到的資料就會被標識是從哪個topic中讀取的資料。元資訊列的使用樣本如下。
CREATE TABLE kafka_source (
--讀取訊息所屬的topic作為`record_topic`欄位
`record_topic` STRING NOT NULL METADATA FROM 'topic' VIRTUAL,
--讀取ConsumerRecord中的時間戳記作為`ts`欄位
`ts` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp' VIRTUAL,
--讀取訊息的offset作為`record_offset`欄位
`record_offset` BIGINT NOT NULL METADATA FROM 'offset' VIRTUAL,
...
) WITH (
'connector' = 'kafka',
...
);
CREATE TABLE kafka_sink (
--將`ts`欄位中的時間戳記作為ProducerRecord的時間戳記寫入Kafka
`ts` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp' VIRTUAL,
...
) WITH (
'connector' = 'kafka',
...
);下表列出了Kafka源表和結果表所支援的元資訊列。
Key | 資料類型 | 說明 | 源表或結果表 |
topic | STRING NOT NULL METADATA VIRTUAL | Kafka訊息所在的Topic名稱。 | 源表 |
partition | INT NOT NULL METADATA VIRTUAL | Kafka訊息所在的Partition ID。 | 源表 |
headers | MAP<STRING, BYTES> NOT NULL METADATA VIRTUAL | Kafka訊息的訊息頭(header)。 | 源表和結果表 |
leader-epoch | INT NOT NULL METADATA VIRTUAL | Kafka訊息的Leader epoch。 | 源表 |
offset | BIGINT NOT NULL METADATA VIRTUAL | Kafka訊息的位移量(offset)。 | 源表 |
timestamp | TIMESTAMP(3) WITH LOCAL TIME ZONE NOT NULL METADATA VIRTUAL | Kafka訊息的時間戳記。 | 源表和結果表 |
timestamp-type | STRING NOT NULL METADATA VIRTUAL | Kafka訊息的時間戳記類型:
| 源表 |
__raw_key__ | STRING NOT NULL METADATA VIRTUAL | Kafka原始訊息的Key欄位。 | 源表和結果表 說明 僅VVR 11.4及以上版本支援該參數。 |
__raw_value__ | STRING NOT NULL METADATA VIRTUAL | Kafka原始訊息的Value欄位。 | 源表和結果表 說明 僅VVR 11.4及以上版本支援該參數。 |
WITH參數
通用
參數
說明
資料類型
是否必填
預設值
備忘
connector
表類型。
String
是
無
固定值為kafka。
properties.bootstrap.servers
Kafka broker地址。
String
是
無
格式為host:port,host:port,host:port,以英文逗號(,)分割。
properties.*
對Kafka用戶端的直接配置。
String
否
無
Flink會將properties.首碼移除,並將剩餘的配置傳遞給Kafka用戶端。例如可以通過
'properties.allow.auto.create.topics'='false'來禁用自動建立topic。不能通過該方式修改以下配置,因為它們會被Kafka連接器覆蓋:
key.deserializer
value.deserializer
format
讀取或寫入Kafka訊息value部分時使用的格式。
String
否
無
支援的格式
csv
json
avro
debezium-json
canal-json
maxwell-json
avro-confluent
raw
說明更多format參數設定請參見Format參數。
key.format
讀取或寫入Kafka訊息key部分時使用的格式。
String
否
無
支援的格式
csv
json
avro
debezium-json
canal-json
maxwell-json
avro-confluent
raw
說明使用該配置時,key.options配置是必填的。
key.fields
Kafka訊息key部分對應的源表或結果表欄位。
String
否
無
多個欄位名以分號(;)分隔。例如
field1;field2key.fields-prefix
為所有Kafka訊息key部分指定自訂首碼,以避免與訊息value部分格式欄位重名。
String
否
無
該配置項僅用於源表和結果表的列名區分,解析和產生Kafka訊息key部分時,該首碼會被移除。
說明使用該配置時,
value.fields-include必須配置為EXCEPT_KEY。value.format
讀取或寫入Kafka訊息value部分時使用的格式。
String
否
無
該配置等同於
format,只能設定format或value.format中的一個。如果同時配置,value.format會覆蓋format。value.fields-include
在解析或產生Kafka訊息value部分時,是否要包含訊息key部分對應的欄位。
String
否
ALL
參數取值如下:
ALL(預設值):所有列都會作為Kafka訊息value部分處理EXCEPT_KEY:除去key.fields定義的欄位,剩餘欄位作為Kafka訊息value部分處理
源表
參數
說明
資料類型
是否必填
預設值
備忘
topic
讀取的topic名稱。
String
否
無
以英文分號 (;) 分隔多個topic名稱,例如topic-1和topic-2
說明topic和topic-pattern兩個選項只能指定其中一個。
topic-pattern
匹配讀取topic名稱的Regex。所有匹配該Regex的topic在作業運行時均會被讀取。
String
否
無
說明topic和topic-pattern兩個選項只能指定其中一個。
properties.group.id
消費組ID。
String
否
KafkaSource-{源表表名}
如果指定的group id為首次使用,則必須將properties.auto.offset.reset設定為earliest或latest以指定初次開機位點。
scan.startup.mode
Kafka讀取資料的啟動位點。
String
否
group-offsets
取值如下:
earliest-offset:從Kafka最早的分區開始讀取。latest-offset:從Kafka最新位點開始讀取。group-offsets(預設值):從指定的properties.group.id已提交的位點開始讀取。timestamp:從scan.startup.timestamp-millis指定的時間戳記開始讀取。specific-offsets:從scan.startup.specific-offsets指定的位移量開始讀取。
說明該參數在作業無狀態啟動時生效。作業在從checkpoint重啟或狀態恢複時,會優先使用狀態中儲存的進度恢複讀取。
scan.startup.specific-offsets
specific-offsets啟動模式下,指定每個分區的啟動位移量。
String
否
無
例如
partition:0,offset:42;partition:1,offset:300scan.startup.timestamp-millis
timestamp啟動模式下,指定啟動位點時間戳記。
Long
否
無
單位為毫秒
scan.topic-partition-discovery.interval
動態檢測Kafka topic和partition的時間間隔。
Duration
否
5分鐘
分區檢查間隔預設為5分鐘。需要顯式地設定分區檢查間隔為非正數才能關閉此功能。開啟動態分區發現後,Kafka Source 可以自動地發現新增的分區並自動讀取對應分區上的資料。在topic-pattern模式下,不僅讀取已有topic的新增分區資料,也會讀取符合正則匹配的新增topic的所有分區資料。
說明在Realtime Compute引擎VVR 6.0.x版本中,動態分區檢測預設為關閉。自8.0版本起該功能預設開啟,活動訊號間隔時間預設設定為5分鐘。
scan.header-filter
根據Kafka資料是否包含指定的訊息頭(Header)對資料進行條件過濾。
String
否
無
Header key和value使用冒號(:)分隔,多個header條件之間使用邏輯運算子(&、|)串連,支援取反邏輯運算子(!)。例如
depart:toy|depart:book&!env:test表示保留header中包含depart=toy或depart=book,且不包含env=test的Kafka資料。說明僅Realtime Compute引擎VVR 8.0.6及以上版本支援配置該參數。
暫不支援括弧運算。
邏輯運算順序為從左至右。
Header value會以UTF-8格式轉換為字串,與參數指定的header value進行比較。
scan.check.duplicated.group.id
是否檢查通過
properties.group.id指定的消費者組有重複。Boolean
否
false
參數取值如下:
true:在啟動作業前,系統會檢查消費者組是否存在重複。若發現重複,作業將報錯並停止運行,從而避免與現有消費者組產生衝突。
false:直接啟動作業,不檢查消費者組衝突。
說明僅VVR 6.0.4及以上版本支援該參數。
結果表
參數
說明
資料類型
是否必填
預設值
備忘
topic
寫入的topic名稱。
String
是
無
無
sink.partitioner
從Flink並發到Kafka分區的映射模式。
String
否
default
取值如下:
default(預設值):使用Kafka預設的分區模式
fixed:每個Flink並發對應一個固定的Kafka分區。
round-robin:Flink並發中的資料將被輪流分配至Kafka的各個分區。
自訂分區映射模式:如果fixed和round-robin不滿足需求,可以建立一個FlinkKafkaPartitioner的子類來自訂分區映射模式。例如org.mycompany.MyPartitioner
sink.delivery-guarantee
Kafka結果表的語義模式。
String
否
at-least-once
取值如下:
none:不保證任何語義,資料可能會丟失或重複。
at-least-once(預設值):保證資料不丟失,但可能會重複。
exactly-once:使用Kafka事務保證資料不會丟失和重複。
說明在使用exactly-once語義時,sink.transactional-id-prefix是必填的。
sink.transactional-id-prefix
在exactly-once語義下使用的Kafka事務ID首碼。
String
否
無
只有sink.delivery-guarantee配置為exactly-once時該配置才會生效。
sink.parallelism
Kafka結果表運算元的並發數。
Integer
否
無
上遊運算元的並發,由架構決定。
安全與認證
如果Kafka叢集要求安全連線或認證,請將相關的安全與認證配置添加properties.首碼後設定在WITH參數中。配置Kafka表以使用PLAIN作為SASL機制,並提供JAAS配置的樣本如下。
CREATE TABLE KafkaTable (
`user_id` BIGINT,
`item_id` BIGINT,
`behavior` STRING,
`ts` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp'
) WITH (
'connector' = 'kafka',
...
'properties.security.protocol' = 'SASL_PLAINTEXT',
'properties.sasl.mechanism' = 'PLAIN',
'properties.sasl.jaas.config' = 'org.apache.flink.kafka.shaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="username" password="password";'
)使用SASL_SSL作為安全性通訊協定,並使用SCRAM-SHA-256作為SASL機制的樣本如下。
CREATE TABLE KafkaTable (
`user_id` BIGINT,
`item_id` BIGINT,
`behavior` STRING,
`ts` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp'
) WITH (
'connector' = 'kafka',
...
'properties.security.protocol' = 'SASL_SSL',
/*SSL配置*/
/*佈建服務端提供的truststore (CA 憑證) 的路徑*/
/*檔案管理上傳的檔案會存放在/flink/usrlib/路徑下*/
'properties.ssl.truststore.location' = '/flink/usrlib/kafka.client.truststore.jks',
'properties.ssl.truststore.password' = 'test1234',
/*如果要求用戶端認證,則需要配置keystore (私密金鑰) 的路徑*/
'properties.ssl.keystore.location' = '/flink/usrlib/kafka.client.keystore.jks',
'properties.ssl.keystore.password' = 'test1234',
/*用戶端驗證伺服器位址的演算法,空值表示禁用伺服器位址驗證*/
'properties.ssl.endpoint.identification.algorithm' = '',
/*SASL配置*/
/*將SASL機制配置為as SCRAM-SHA-256*/
'properties.sasl.mechanism' = 'SCRAM-SHA-256',
/*配置JAAS*/
'properties.sasl.jaas.config' = 'org.apache.flink.kafka.shaded.org.apache.kafka.common.security.scram.ScramLoginModule required username="username" password="password";'
)樣本中提到的CA認證和私密金鑰可使用Realtime Compute控制台的檔案管理功能上傳至平台。上傳後檔案存放在/flink/usrlib目錄下,需要使用的CA認證檔案名稱為my-truststore.jks,則上傳後在WITH參數中有兩種方式可以設定'properties.ssl.truststore.location'來使用該認證:
配置
'properties.ssl.truststore.location' = '/flink/usrlib/my-truststore.jks',採用這種方式Flink運行期間不需要動態下載OSS檔案,但是不支援偵錯模式。Realtime Compute引擎版本為 VVR 11.5 及以上,可以配置
properties.ssl.truststore.location和properties.ssl.keystore.location為OSS絕對路徑地址,檔案路徑格式為oss://flink-fullymanaged-<工作空間ID>/artifacts/namespaces/<專案空間名稱>/<檔案名稱>。採用這種方式會在Flink運行期間動態下載OSS檔案,支援偵錯模式。
配置確認:上文中的樣本僅適用於大多數配置情況。在配置Kafka連接器前,請與Kafka服務端營運人員聯絡,以擷取正確的安全與認證配置資訊。
轉義說明:與開源Flink不同,Realtime ComputeFlink版的SQL編輯器預設已經對雙引號(")進行轉義處理,因此在配置
properties.sasl.jaas.config時無需對使用者名稱和密碼中的雙引號(")添加額外的轉義符(\)。
源表起始位點
啟動模式
Kafka源表可通過配置scan.startup.mode來指定初始讀取位點:
最早位點(earliest-offset):從當前分區的最早位點開始讀取。
最末尾位點(latest-offset):從當前分區的最末尾位點開始讀取。
已提交位點(group-offsets):從指定group id的已提交位點開始讀取,group id通過properties.group.id指定。
指定時間戳記(timestamp):從時間戳記大於等於指定時間的第一條訊息開始讀取,時間戳記通過scan.startup.timestamp-millis指定。
特錨點(specific-offsets):從指定的分區位點開始消費,位點通過scan.startup.specific-offsets指定。
如果不指定啟動位點,則預設會從已提交位點(group-offsets)啟動消費。
scan.startup.mode只針對無狀態啟動的作業生效,有狀態作業啟動時會從狀態中儲存的位點開始消費。
程式碼範例如下:
CREATE TEMPORARY TABLE kafka_source (
...
) WITH (
'connector' = 'kafka',
...
--從最早位點開始消費
'scan.startup.mode' = 'earliest-offset',
--從最末尾位點開始消費
'scan.startup.mode' = 'latest-offset',
--從消費者組"my-group"的已提交位點開始消費
'properties.group.id' = 'my-group',
'scan.startup.mode' = 'group-offsets',
'properties.auto.offset.reset' = 'earliest', -- 如果 "my-group" 為首次使用,則從最早位點開始消費
'properties.auto.offset.reset' = 'latest', -- 如果 "my-group" 為首次使用,則從最末尾位點開始消費
--從指定的毫秒時間戳記1655395200000開始消費
'scan.startup.mode' = 'timestamp',
'scan.startup.timestamp-millis' = '1655395200000',
--從指錨點開始消費
'scan.startup.mode' = 'specific-offsets',
'scan.startup.specific-offsets' = 'partition:0,offset:42;partition:1,offset:300'
);起始位點優先順序
源表起始位點的優先順序為:
優先順序從高到低 | Checkpoint或Savepoint中儲存的位點 |
Realtime Compute控制台作業啟動選擇的啟動時間 | |
WITH參數中通過scan.startup.mode指定的啟動位點 | |
未指定scan.startup.mode的情況下使用group-offsets,使用對應消費組的位點 |
在以上任何一個步驟中,如果位點到期或Kafka叢集發生問題等原因導致位點無效,則會使用properties.auto.offset.reset指定的策略進行位點重設,如果未設定該配置項,則會產生異常要求使用者介入。
一種常見情況是使用全新的group id開始消費。首先源表會向Kafka叢集查詢該group的已提交位點,由於該group id是第一次使用,不會查詢到有效位點,所以會通過properties.auto.offset.reset參數配置的策略進行重設。因此在使用全新group id進行消費時,必須配置properties.auto.offset.reset來指錨點重設策略。
源表位點提交
Kafka源表只在checkpoint成功後將當前消費位點提交至Kafka叢集。如果checkpoint間隔設定較長,在Kafka叢集側觀察到的消費位點會有延遲。在進行checkpoint時,Kafka源表會將當前讀取進度儲存在狀態中,並不依賴於提交到叢集上的位點進行故障恢複,提交位點僅僅是為了在Kafka側能夠監控到讀取進度,位點提交失敗不會對資料正確性產生任何影響。
結果表自訂分區器
如果內建的Kafka Producer分區模式無法滿足需求,可以實現自訂分區模式將資料寫入對應的分區。自訂分區器需要繼承FlinkKafkaPartitioner,開發完成後編譯JAR包,使用檔案管理功能上傳至Realtime Compute控制台。上傳並引用完成後,請在WITH參數中設定sink.partitioner參數,參數值為分區器完整的類路徑,如org.mycompany.MyPartitioner。
Kafka、Upsert Kafka或Kafka JSON catalog的選擇
Kafka是一種只能添加資料的訊息佇列系統,無法進行資料的更新和刪除操作,因此在流式SQL計算中無法處理上遊的CDC變更資料和彙總、聯合等運算元的回撤邏輯。如果需要將含有變更或回撤類型的資料寫入Kafka,請使用對變更資料進行特殊處理的Upsert Kafka結果表。
為了方便將上遊資料庫中一個或多個資料表中的變更資料批量同步到Kafka中,可以使用Kafka JSON catalog。如果Kafka中儲存的資料格式為JSON,使用Kafka JSON catalog可以省去定義schema和WITH參數的步驟。詳情可參見管理Kafka JSON Catalog。
使用樣本
樣本一:從Kafka中讀取資料後寫入Kafka
從名稱為源表的Topic中讀取Kafka資料,再寫入名稱為結果表的Topic,資料使用CSV格式。
CREATE TEMPORARY TABLE kafka_source (
id INT,
name STRING,
age INT
) WITH (
'connector' = 'kafka',
'topic' = 'source',
'properties.bootstrap.servers' = '<yourKafkaBrokers>',
'properties.group.id' = '<yourKafkaConsumerGroupId>',
'format' = 'csv'
);
CREATE TEMPORARY TABLE kafka_sink (
id INT,
name STRING,
age INT
) WITH (
'connector' = 'kafka',
'topic' = 'sink',
'properties.bootstrap.servers' = '<yourKafkaBrokers>',
'properties.group.id' = '<yourKafkaConsumerGroupId>',
'format' = 'csv'
);
INSERT INTO kafka_sink SELECT id, name, age FROM kafka_source;樣本二:同步表結構以及資料
將Kafka Topic中的訊息即時同步到Hologres中。在該情況下,可以將Kafka訊息的offset和partition id作為主鍵,從而保證在Failover時,Hologres中不會有重複訊息。
CREATE TEMPORARY TABLE kafkaTable (
`offset` INT NOT NULL METADATA,
`part` BIGINT NOT NULL METADATA FROM 'partition',
PRIMARY KEY (`part`, `offset`) NOT ENFORCED
) WITH (
'connector' = 'kafka',
'properties.bootstrap.servers' = '<yourKafkaBrokers>',
'topic' = 'kafka_evolution_demo',
'scan.startup.mode' = 'earliest-offset',
'format' = 'json',
'json.infer-schema.flatten-nested-columns.enable' = 'true'
--可選,將嵌套列全部展開。
);
CREATE TABLE IF NOT EXISTS hologres.kafka.`sync_kafka`
WITH (
'connector' = 'hologres'
) AS TABLE vvp.`default`.kafkaTable;樣本三:同步表結構以及Kafka訊息的key和value資料
Kafka訊息中的key部分已經儲存了相關資訊,可以同時同步Kafka中的key和value。
CREATE TEMPORARY TABLE kafkaTable (
`key_id` INT NOT NULL,
`val_name` VARCHAR(200)
) WITH (
'connector' = 'kafka',
'properties.bootstrap.servers' = '<yourKafkaBrokers>',
'topic' = 'kafka_evolution_demo',
'scan.startup.mode' = 'earliest-offset',
'key.format' = 'json',
'value.format' = 'json',
'key.fields' = 'key_id',
'key.fields-prefix' = 'key_',
'value.fields-prefix' = 'val_',
'value.fields-include' = 'EXCEPT_KEY'
);
CREATE TABLE IF NOT EXISTS hologres.kafka.`sync_kafka`(
WITH (
'connector' = 'hologres'
) AS TABLE vvp.`default`.kafkaTable;Kafka訊息中的key部分不支援表結構變更和類型解析,需要手動聲明。
樣本四:同步表結構和資料並進行計算
在同步Kafka資料到Hologres時,往往需要一些輕量級的計算。
CREATE TEMPORARY TABLE kafkaTable (
`distinct_id` INT NOT NULL,
`properties` STRING,
`timestamp` TIMESTAMP_LTZ METADATA,
`date` AS CAST(`timestamp` AS DATE)
) WITH (
'connector' = 'kafka',
'properties.bootstrap.servers' = '<yourKafkaBrokers>',
'topic' = 'kafka_evolution_demo',
'scan.startup.mode' = 'earliest-offset',
'key.format' = 'json',
'value.format' = 'json',
'key.fields' = 'key_id',
'key.fields-prefix' = 'key_'
);
CREATE TABLE IF NOT EXISTS hologres.kafka.`sync_kafka` WITH (
'connector' = 'hologres'
) AS TABLE vvp.`default`.kafkaTable
ADD COLUMN
`order_id` AS COALESCE(JSON_VALUE(`properties`, '$.order_id'), 'default');
--使用COALESCE處理空值情況。樣本五:嵌套JSON解析
JSON訊息樣本
{
"id": 101,
"name": "VVP",
"properties": {
"owner": "阿里雲",
"engine": "Flink"
}
}為避免後續使用 JSON_VALUE(payload, '$.properties.owner') 等函數解析欄位,可直接在 Source DDL 中定義結構:
CREATE TEMPORARY TABLE kafka_source (
id VARCHAR,
`name` VARCHAR,
properties ROW<`owner` STRING, engine STRING>
) WITH (
'connector' = 'kafka',
'topic' = 'xxx',
'properties.bootstrap.servers' = 'xxx',
'scan.startup.mode' = 'earliest-offset',
'format' = 'json'
);這樣,Flink會在讀取階段一次性將 JSON 解析為結構化欄位,後續 SQL 查詢直接使用 properties.owner,無需額外函數調用,整體效能更優。
Datastream API
通過DataStream的方式讀寫資料時,則需要使用對應的DataStream連接器串連Realtime ComputeFlink版,DataStream連接器設定方法請參見DataStream連接器使用方法。
構建Kafka Source
Kafka Source提供了構建類來建立Kafka Source的執行個體。我們將通過以下範例程式碼介紹如何構建Kafka Source來消費input-topic最早位點的資料,消費組名稱為my-group,並將Kafka訊息體還原序列化為字串。
Java
KafkaSource<String> source = KafkaSource.<String>builder() .setBootstrapServers(brokers) .setTopics("input-topic") .setGroupId("my-group") .setStartingOffsets(OffsetsInitializer.earliest()) .setValueOnlyDeserializer(new SimpleStringSchema()) .build(); env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");在構建KafkaSource時,必須指定以下參數。
參數
說明
BootstrapServers
Kafka Broker地址,通過setBootstrapServers(String)方法配置。
GroupId
消費者組ID,通過setGroupId(String)方法配置。
Topics或Partition
訂閱的Topic或Partition名稱。Kafka Source提供了以下三種Topic或Partition的訂閱者式:
Topic列表,訂閱Topic列表中所有Partition。
KafkaSource.builder().setTopics("topic-a","topic-b")Regex匹配,訂閱與Regex所匹配的Topic下的所有Partition。
KafkaSource.builder().setTopicPattern("topic.*")Partition列表,訂閱指定的Partition。
final HashSet<TopicPartition> partitionSet = new HashSet<>(Arrays.asList( new TopicPartition("topic-a", 0), // Partition 0 of topic "topic-a" new TopicPartition("topic-b", 5))); // Partition 5 of topic "topic-b" KafkaSource.builder().setPartitions(partitionSet)
Deserializer
解析Kafka訊息的還原序列化器。
還原序列化器通過setDeserializer(KafkaRecordDeserializationSchema)來指定,其中KafkaRecordDeserializationSchema定義了如何解析Kafka的ConsumerRecord。如果只解析Kafka訊息中的訊息體(Value)的資料,可以通過以下任何一種方式實現:
使用Flink提供的KafkaSource構建類中的setValueOnlyDeserializer(DeserializationSchema)方法,其中DeserializationSchema定義了如何解析Kafka訊息體中的位元據。
使用Kafka提供的解析器,包括多種實作類別。例如,可以使用StringDeserializer來將Kafka訊息體解析成字串。
import org.apache.kafka.common.serialization.StringDeserializer; KafkaSource.<String>builder() .setDeserializer(KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class));
說明如果需要完整地解析ConsumerRecord,則需要自行實現KafkaRecordDeserializationSchema介面。
XML
Maven中央庫中已經放置了Kafka DataStream連接器。
<dependency> <groupId>com.alibaba.ververica</groupId> <artifactId>ververica-connector-kafka</artifactId> <version>${vvr-version}</version> </dependency>在使用Kafka DataStream連接器時,需要瞭解以下Kafka屬性:
起始消費位點
Kafka Source能夠通過位點初始化器(OffsetsInitializer)來指定從不同的位移量開始消費。內建的位點初始化器包括以下內容。
位點初始化器
代碼設定
從最早位點開始消費。
KafkaSource.builder().setStartingOffsets(OffsetsInitializer.earliest())從最末尾位點開始消費。
KafkaSource.builder().setStartingOffsets(OffsetsInitializer.latest())從時間戳記大於等於指定時間的資料開始消費,單位為毫秒。
KafkaSource.builder().setStartingOffsets(OffsetsInitializer.timestamp(1592323200000L))從消費組提交的位點開始消費,如果提交位點不存在,使用最早位點。
KafkaSource.builder().setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))從消費組提交的位點開始消費,不指錨點重設策略。
KafkaSource.builder().setStartingOffsets(OffsetsInitializer.committedOffsets())說明如果以上內建的初始化器不能滿足需求,也可以自己實現自訂的位點初始化器。
如果未指錨點初始化器,則預設使用OffsetsInitializer.earliest(),即最早位點。
流模式和批模式
Kafka Source支援流式和批式兩種運行模式。預設情況下,Kafka Source設定為以流模式運行,因此作業永遠不會停止,直到Flink作業失敗或被取消。如果要配置Kafka Source在批模式下運行,可以使用setBounded(OffsetsInitializer)指定停止位移量,當所有分區都達到其停止位移量時,Kafka Source會退出運行。
說明通常,流模式下Kafka Source沒有停止位移量。為了方便對代碼進行調試,流模式下可以使用 setUnbounded(OffsetsInitializer) 指定停止位移量。請留意指定流模式和批模式停止位移量的方法名(setUnbounded 和 setBounded)是不同的。
動態分區檢查
為了在不重啟Flink作業的情況下,處理Topic擴容或建立Topic等情境,可以在提供的Topic或Partition訂閱模式下,啟用動態分區檢查功能。DataStream連接器中,該功能預設關閉,需要手動顯式配置開啟:
KafkaSource.builder() .setProperty("partition.discovery.interval.ms", "10000") // 每10秒檢查一次新分區。重要動態分區檢查功能依賴於Kafka叢集的元資訊更新機制。如果Kafka叢集未及時更新分區資訊,可能導致新分區無法被發現。請確保Kafka叢集的partition.discovery.interval.ms配置與實際情況匹配。
事件時間和浮水印
Kafka Source預設使用Kafka訊息中的時間戳記作為事件時間。可以自訂浮水印策略(Watermark Strategy)以從訊息中提取事件時間,並向下遊發送浮水印。
env.fromSource(kafkaSource, new CustomWatermarkStrategy(), "Kafka Source With Custom Watermark Strategy")如果需要瞭解自訂浮水印策略(Watermark Strategy),請參見Generating Watermarks。
說明若並行Source的部分任務長期處於空閑狀態(如某個Kafka分區長時間無資料輸入,或Source並發數超過Kafka分區數量),可能導致Watermark產生機制失效。此時系統將無法正常觸發視窗計算,造成資料處理流程停滯。
為解決此類問題,可通過以下方式調整:
配置Watermark逾時機制:啟用table.exec.source.idle-timeout參數,強制系統在指定逾時時間後產生Watermark,確保視窗計算周期的推進。
資料來源最佳化:建議保持Kafka分區數與Source並發度的合理比例(推薦分區數≥Source並行度)。
消費位點提交
Kafka Source在Checkpoint完成時,提交當前的消費位點,以保證Flink的Checkpoint狀態和Kafka Broker上的提交位點一致。如果未開啟Checkpoint,Kafka Source依賴於Kafka Consumer內部的位點定時自動認可邏輯,自動認可功能由enable.auto.commit和 auto.commit.interval.ms兩個Kafka Consumer配置項進行配置。
說明Kafka Source不依賴於Broker上提交的位點來恢複失敗的作業。提交位點只是為了上報Kafka Consumer和消費組的消費進度,以在Broker端進行監控。
其他屬性
除了上述屬性之外,還可以使用setProperties(Properties) 和setProperty(String, String) 為Kafka Source和Kafka Consumer設定任意屬性。KafkaSource通常有以下配置項。
配置項
說明
client.id.prefix
指定用於Kafka Consumer的用戶端ID首碼。
partition.discovery.interval.ms
定義Kafka Source檢查新分區的時間間隔。
說明partition.discovery.interval.ms會在批模式下被覆蓋為-1。
register.consumer.metrics
指定是否在Flink中註冊Kafka Consumer的指標。
其他Kafka Consumer配置
Kafka Consumer的配置詳情,請參見Apache Kafka。
重要Kafka Connector會強制覆蓋部分手動設定的參數項,覆蓋詳情如下:
key.deserializer始終被覆蓋為ByteArrayDeserializer。
value.deserializer始終被覆蓋為ByteArrayDeserializer。
auto.offset.reset.strategy被覆蓋為OffsetsInitializer#getAutoOffsetResetStrategy()。
以下樣本展示如何配置Kafka Consumer,以使用PLAIN作為SASL機制並提供JAAS配置。
KafkaSource.builder() .setProperty("sasl.mechanism", "PLAIN") .setProperty("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"username\" password=\"password\";")監控
Kafka Source在Flink中註冊指標,用於監控和診斷。
指標範圍
Kafka source reader的所有指標都註冊在KafkaSourceReader指標組下,KafkaSourceReader是operator指標組的子組。與特定主題分區相關的指標註冊在KafkaSourceReader.topic.<topic_name>.partition.<partition_id>指標組中。
例如Topic "my-topic"、分區1的當前消費位點(currentOffset)註冊在<some_parent_groups>.operator.KafkaSourceReader.topic.my-topic.partition.1.currentOffset。成功提交位點的次數(commitsSucceeded)註冊在<some_parent_groups>.operator.KafkaSourceReader.commitsSucceeded。
指標列表
指標名稱
描述
範圍
currentOffset
當前消費位點
TopicPartition
committedOffset
當前提交位點
TopicPartition
commitsSucceeded
成功提交的次數
KafkaSourceReader
commitsFailed
失敗的提交次數
KafkaSourceReader
Kafka Consumer指標
Kafka Consumer的指標註冊在指標組KafkaSourceReader.KafkaConsumer。例如Kafka Consumer指標records-consumed-total註冊在<some_parent_groups>.operator.KafkaSourceReader.KafkaConsumer.records-consumed-total。
可以使用配置項register.consumer.metrics配置是否註冊Kafka消費者的指標。預設此選項設定為true。對於Kafka Consumer的指標,可參見Apache Kafka。
構建Kafka Sink
Flink Kafka Sink可以實現將流資料寫入一個或多個Kafka Topic。
DataStream<String> stream = ... Properties properties = new Properties(); properties.setProperty("bootstrap.servers", ); KafkaSink<String> kafkaSink = KafkaSink.<String>builder() .setKafkaProducerConfig(kafkaProperties) // // producer config .setRecordSerializer( KafkaRecordSerializationSchema.builder() .setTopic("my-topic") // target topic .setKafkaValueSerializer(StringSerializer.class) // serialization schema .build()) .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE) // fault-tolerance .build(); stream.sinkTo(kafkaSink);需要配置以下參數。
參數
說明
Topic
資料寫入的預設Topic名稱。
資料序列化
構建時需要提供
KafkaRecordSerializationSchema來將輸入資料轉換為 Kafka的ProducerRecord。Flink提供了schema構建器,以提供一些通用的組件,例如訊息鍵(key)/訊息體(value)序列化、topic選擇、訊息分區,同樣也可以通過實現對應的介面來進行更豐富的控制。ProducerRecord<byte[], byte[]> serialize(T element, KafkaSinkContext context, Long timestamp)方法會在每條資料流入的時候調用,來產生ProducerRecord寫入Kafka。使用者可以對每條資料如何寫入Kafka進行細粒度地控制。通過ProducerRecord可以進行以下操作:
設定寫入的Topic名稱。
定義訊息鍵(Key)。
指定資料寫入的Partition。
Kafka用戶端屬性
bootstrap.servers必填,以逗號分隔的Kafka Broker列表。
容錯語義
啟用Flink的Checkpoint後,Flink Kafka Sink可以保證精確一次的語義。除了啟用Flink的Checkpoint外,還可以通過DeliveryGuarantee參數來指定不同的容錯語義,DeliveryGuarantee參數詳情如下:
DeliveryGuarantee.NONE:(預設設定)Flink不做任何保證,資料可能會丟失或重複。
DeliveryGuarantee.AT_LEAST_ONCE:保證不會丟失任何資料,但可能會重複。
DeliveryGuarantee.EXACTLY_ONCE:使用Kafka事務提供精確一次的語義保證。
說明使用EXACTLY_ONCE語義時,需要注意的事項請參見EXACTLY_ONCE語義注意事項。
資料攝入
Kafka連接器可以用於資料攝入YAML作業開發,作為源端讀取或目標端寫入。
使用限制
建議在Realtime Compute引擎VVR 11.1及以上版本使用Kafka作為Flink CDC資料攝入的同步資料來源。
僅支援JSON、Debezium JSON和Canal JSON格式,其他資料格式暫不支援。
對於資料來源,僅Realtime Compute引擎VVR 8.0.11及以上版本支援同一張表的資料分布在多個分區。
文法結構
source:
type: kafka
name: Kafka source
properties.bootstrap.servers: localhost:9092
topic: ${kafka.topic}sink:
type: kafka
name: Kafka Sink
properties.bootstrap.servers: localhost:9092配置項
通用
參數
說明
是否必填
資料類型
預設值
備忘
type
源端或目標端類型。
是
String
無
固定值為kafka
name
源端或目標端名稱。
否
String
無
無
properties.bootstrap.servers
Kafka broker地址。
是
String
無
格式為
host:port,host:port,host:port,以英文逗號(,)分割。properties.*
對Kafka用戶端的直接配置。
否
String
無
Flink會將properties.首碼移除,並將剩餘的配置傳遞給Kafka用戶端。例如可以通過
'properties.allow.auto.create.topics' = 'false'來禁用自動建立topic。key.format
讀取或寫入Kafka訊息key部分使用的格式。
否
String
無
對Source,僅支援json。
對Sink,取值如下:
csv
json
說明僅Realtime Compute引擎VVR 11.0.0及以上版本支援該參數。
value.format
讀取或寫入Kafka訊息value部分時使用的格式。
否
String
debezium-json
取值如下:
debezium-json
canal-json
json
說明僅Realtime Compute引擎VVR 8.0.10及以上版本支援debezium-json和canal-json格式。
僅Realtime Compute引擎VVR 11.0.0及以上版本支援json格式。
源表
參數
說明
是否必填
資料類型
預設值
備忘
topic
讀取的topic名稱。
否
String
無
以英文分號 (;) 分隔多個topic名稱,例如topic-1和topic-2
說明topic和topic-pattern兩個選項只能指定其中一個。
topic-pattern
匹配讀取topic名稱的Regex。所有匹配該Regex的topic在作業運行時均會被讀取。
否
String
無
說明topic和topic-pattern兩個選項只能指定其中一個。
properties.group.id
消費組ID。
否
String
無
如果指定的group id為首次使用,則必須將properties.auto.offset.reset設定為earliest或latest以指定初次開機位點。
scan.startup.mode
Kafka讀取資料的啟動位點。
否
String
group-offsets
取值如下:
earliest-offset:從Kafka最早分區開始讀取。
latest-offset:從Kafka最新位點開始讀取。
group-offsets(預設值):從指定的properties.group.id已提交的位點開始讀取。
timestamp:從scan.startup.timestamp-millis指定的時間戳記開始讀取。
specific-offsets:從scan.startup.specific-offsets指定的位移量開始讀取。
說明該參數在作業無狀態啟動時生效。作業在從checkpoint重啟或狀態恢複時,會優先使用狀態中儲存的進度恢複讀取。
scan.startup.specific-offsets
specific-offsets啟動模式下,指定每個分區的啟動位移量。
否
String
無
例如
partition:0,offset:42;partition:1,offset:300scan.startup.timestamp-millis
timestamp啟動模式下,指定啟動位點時間戳記。
否
Long
無
單位為毫秒
scan.topic-partition-discovery.interval
動態檢測Kafka topic和partition的時間間隔。
否
Duration
5分鐘
分區檢查間隔預設為5分鐘。需要顯式地設定分區檢查間隔為非正數才能關閉此功能。開啟動態分區發現後,Kafka Source 可以自動地發現新增的分區並自動讀取對應分區上的資料。在topic-pattern模式下,不僅讀取已有topic的新增分區資料,也會讀取符合正則匹配的新增topic的所有分區資料。
scan.check.duplicated.group.id
是否檢查通過
properties.group.id指定的消費者組有重複。否
Boolean
false
參數取值如下:
true:在啟動作業前檢查消費者組是否有重複,如有重複作業將會報錯,避免與現有的消費者組產生衝突。
false:直接啟動作業,不檢查消費者組衝突。
schema.inference.strategy
Schema解析策略。
否
String
continuous
取值如下:
continuous:對每條資料均進行 Schema 解析。在前後 Schema 不相容時,解析出更寬的 Schema 併產生 Schema 變更事件。
static:僅在作業啟動時進行一次Schema解析,後續根據初始Schema解析資料,不會產生Schema變更事件。
說明Schema解析詳情可見表結構解析和變更同步策略。
僅VVR 8.0.11及以上版本支援該配置項。
scan.max.pre.fetch.records
Schema初始解析時,對每個分區最多嘗試消費解析的訊息數量
否
Int
50
在作業實際讀取並處理資料前,對每個分區嘗試提前消費指定數量的最新訊息,用於初始化Schema資訊。
key.fields-prefix
自訂添加到訊息鍵(Key)解析出欄位名稱的首碼,以避免Kafka訊息鍵解析後的命名衝突問題。
否
String
無
假設該配置項設為key_,當key中包含欄位名a時,解析key後該欄位名稱為key_a。
說明key.fields-prefix的配置值不可以是value.fields-prefix的首碼。
value.fields-prefix
自訂添加到訊息體(Value)解析出欄位名稱的首碼,以避免Kafka訊息體解析後的命名衝突問題。
否
String
無
假設該配置項設為value_,當value中包含欄位名b時,解析value後該欄位名稱為value_b。
說明value.fields-prefix的配置值不可以是key.fields-prefix的首碼。
metadata.list
需要傳遞給下遊的中繼資料列
否
String
無
可用的中繼資料列包括
topic、partition、offset、timestamp、timestamp-type、headers、leader-epoch、__raw_key__、__raw_value__,使用英文逗號分隔。scan.value.initial-schemas.ddls
通過DDL方式指定某些表的初始Schema。
否
String
無
多個DDL使用英文
;串連。例如,使用CREATE TABLE db1.t1 (id BIGINT, name VARCHAR(10)); CREATE TABLE db1.t2 (id BIGINT);為表db1.t1和表db1.t2分別指定初始Schema。這裡的DDL表結構應當和寫入目標表保持一致,並且滿足Flink SQL的文法規則。
說明VVR 11.5及以上版本支援該配置。
ingestion.ignore-errors
是否忽略資料解析過程中的報錯。
否
Boolean
false
說明VVR 11.5及以上版本支援該配置。
ingestion.error-tolerance.max-count
忽略資料解析過程報錯時,累計多少次報錯後作業失敗。
否
Integer
-1
僅當ingestion.ignore-errors開啟時生效,預設值-1表示解析異常不觸發作業失敗。
說明VVR 11.5及以上版本支援該配置。
源表 Debezium JSON 格式
參數
是否必填
資料類型
預設值
描述
debezium-json.distributed-tables
否
Boolean
false
如果Debezium JSON內單張表資料會出現在多個分區,則需要開啟此選項。
說明僅VVR 8.0.11及以上版本支援該配置項。
重要修改該配置項後,需要無狀態啟動作業。
debezium-json.schema-include
否
Boolean
false
設定Debezium Kafka Connect時,可以啟用Kafka配置value.converter.schemas.enable,以在訊息中包含schema。此選項表明Debezium JSON訊息是否包含schema。
參數取值如下:
true:Debezium JSON訊息包含schema。
false:Debezium JSON訊息不包含schema。
debezium-json.ignore-parse-errors
否
Boolean
false
參數取值如下:
true:當解析異常時,跳過當前行。
false(預設值):報出錯誤,作業啟動失敗。
debezium-json.infer-schema.primitive-as-string
否
Boolean
false
解析表結構時,是否解析所有類型為String類型。
參數取值如下:
true:解析所有基本類型為String。
false(預設值):按照基本規則進行解析。
源表 Canal JSON 格式
參數
是否必填
資料類型
預設值
描述
canal-json.distributed-tables
否
Boolean
false
如果Canal JSON內單張表資料會出現在多個分區,則需要開啟此選項。
說明僅VVR 8.0.11及以上版本支援該配置項。
重要修改該配置項後,需要無狀態啟動作業。
canal-json.database.include
否
String
無
一個可選的Regex,通過正則匹配Canal記錄中的database元欄位,僅讀取指定資料庫的changelog記錄。正則字串與Java的Pattern相容。
canal-json.table.include
否
String
無
一個可選的Regex,通過正則匹配Canal記錄中的table元欄位,僅讀取指定表的changelog記錄。正則字串與Java的Pattern相容。
canal-json.ignore-parse-errors
否
Boolean
false
參數取值如下:
true:當解析異常時,跳過當前行。
false(預設值):報出錯誤,作業啟動失敗。
canal-json.infer-schema.primitive-as-string
否
Boolean
false
解析表結構時,是否所有類型解析為String類型。
參數取值如下:
true:解析所有基本類型為String。
false(預設值):按照基本規則進行解析。
canal-json.infer-schema.strategy
否
String
AUTO
解析表結構時的解析策略。
參數取值如下:
AUTO(預設值):通過解析JSON資料自動解析。如果資料中不包含sqlType欄位,建議使用AUTO以避免解析失敗。
SQL_TYPE:通過canal json資料中的sqlType數組解析。如果資料包含sqlType欄位時,建議將canal-json.infer-schema.strategy設定為SQL_TYPE以獲得更精確的類型。
MYSQL_TYPE:通過canal json資料中的mysqlType數組解析。
當Kafka中的Canal JSON資料包含sqlType欄位且需要更精確的類型映射時,建議將canal-json.infer-schema.strategy設定為SQL_TYPE。
sqlType類型映射規則請見Canal JSON的Schema解析。
說明VVR 11.1及以上版本支援該配置。
MYSQL_TYPE在VVR 11.3及以上版本支援使用。
canal-json.mysql.treat-mysql-timestamp-as-datetime-enabled
否
Boolean
true
是否將mysql timestamp類型映射到cdc timestamp類型:
true(預設):mysql timestamp類型映射到cdc timestamp類型。
false:mysql timestamp類型映射到cdc timestamp_ltz類型。
canal-json.mysql.treat-tinyint1-as-boolean.enabled
否
Boolean
true
使用MYSQL_TYPE解析時,是否將mysql tinyint(1)類型映射到cdc boolean類型:
true(預設):mysql tinyint(1)類型映射到cdc boolean類型。
false:mysql tinyint(1)類型映射到cdc tinyint(1)類型。
該配置僅當canal-json.infer-schema.strategy配置為MYSQL_TYPE時生效。
源表 JSON 格式
參數
是否必填
資料類型
預設值
描述
json.timestamp-format.standard
否
String
SQL
指定輸入和輸出時間戳記格式。參數取值如下:
SQL:解析yyyy-MM-dd HH:mm:ss.s{precision}格式的輸入時間戳記,例如2020-12-30 12:13:14.123。
ISO-8601:解析yyyy-MM-ddTHH:mm:ss.s{precision}格式的輸入時間戳記,例如2020-12-30T12:13:14.123。
json.ignore-parse-errors
否
Boolean
false
參數取值如下:
true:當解析異常時,跳過當前行。
false(預設值):報出錯誤,作業啟動失敗。
json.infer-schema.primitive-as-string
否
Boolean
false
解析表結構時,是否解析所有類型為String類型。
參數取值如下:
true:解析所有基本類型為String。
false(預設值):按照基本規則進行解析。
json.infer-schema.flatten-nested-columns.enable
否
Boolean
false
解析JSON格式資料時,是否遞迴式地展開JSON中的嵌套列。參數取值如下:
true:遞迴式展開。
false(預設值):將嵌套列當作String處理。
json.decode.parser-table-id.fields
否
String
無
解析JSON格式資料時,是否使用部分JSON欄位值產生tableId,多個欄位使用英文
,串連。例如:JSON資料為{"col0":"a", "col1","b", "col2","c"},產生結果如下:配置
tableId
col0
a
col0,col1
a.b
col0,col1,col2
a.b.c
json.infer-schema.fixed-types
否
String
無
解析JSON格式資料時,指定某些欄位的具體類型,多個欄位使用英文
,串連。例如:id BIGINT, name VARCHAR(10)可以指定JSON資料中的id欄位類型為BIGINT,name欄位類型為VARCHAR(10)。使用這個配置時,需要額外添加
scan.max.pre.fetch.records: 0配置。說明VVR 11.5及以上版本支援該配置。
結果表
參數
說明
是否必填
資料類型
預設值
備忘
type
目標端類型。
是
String
無
固定值為kafka
name
目標端名稱。
否
String
無
無
topic
Kafka Topic名稱。
否
String
無
開啟時,所有的資料都會寫入這個Topic。
說明如果沒有開啟,每條資料會寫入到其TableID對應字串(通過
.拼接產生)的Topic,例如databaseName.tableName。partition.strategy
資料寫入Kafka分區的策略。
否
String
all-to-zero
取值如下:
all-to-zero(預設值):將所有資料寫入 0 號分區。
hash-by-key:根據主鍵的雜湊值將資料寫到多個分區。保證同一個主鍵的資料在同一個分區並且有序。
sink.tableId-to-topic.mapping
上遊表名到下遊 Kafka Topic名的映射關係。
否
String
無
每個映射關係由
;分割,上遊表的表名和下遊 Kafka 的Topic名由:分割,表名可以使用Regex,映射到同一個topic的多張表可以使用,拼接。例如mydb.mytable1:topic1;mydb.mytable2:topic2。說明配置這個參數能夠在保留原始表名資訊的同時修改映射的Topic。
結果表Debezium JSON格式
參數
是否必填
資料類型
預設值
描述
debezium-json.include-schema.enabled
否
Boolean
false
Debezium JSON資料中是否包含Schema資訊。
使用樣本
使用 Kafka 作為資料攝入源端:
source: type: kafka name: Kafka source properties.bootstrap.servers: ${kafka.bootstraps.server} topic: ${kafka.topic} value.format: ${value.format} scan.startup.mode: ${scan.startup.mode} sink: type: hologres name: Hologres sink endpoint: <yourEndpoint> dbname: <yourDbname> username: ${secret_values.ak_id} password: ${secret_values.ak_secret} sink.type-normalize-strategy: BROADEN使用 Kafka 作為資料攝入目標端:
source: type: mysql name: MySQL Source hostname: ${secret_values.mysql.hostname} port: ${mysql.port} username: ${secret_values.mysql.username} password: ${secret_values.mysql.password} tables: ${mysql.source.table} server-id: 8601-8604 sink: type: kafka name: Kafka Sink properties.bootstrap.servers: ${kafka.bootstraps.server} route: - source-table: ${mysql.source.table} sink-table: ${kafka.topic}其中,使用route模組以設定源表寫入Kafka的Topic名稱。
阿里雲Kafka預設不開啟自動建立Topic功能,參見自動化建立Topic相關問題,寫入到阿里雲Kafka時,需要預先建立對應的Topic,詳情請參見步驟三:建立資源。
表結構解析和變更同步策略
Kafka連接器會維護當前已知的所有表的Schema。
表結構資訊初始化
表結構資訊包含欄位和資料類型資訊、庫表資訊和主鍵資訊,這三種資訊的初始化方式如下:
欄位和資料類型資訊
資料攝入作業能夠根據資料自動推匯出表的欄位和資料類型資訊,然而在某些情境下,您可能希望指定某些表的欄位和類型,根據使用者指定欄位類型的粒度,表結構資訊的初始化有以下三種配置策略:
完全由程式推導表結構
在讀取Kafka資料前,Kafka連接器會預先在每個分區中嘗試消費最多scan.max.pre.fetch.records條訊息,解析每條資料的Schema,再將這些Schema合并,用於初始化表結構資訊。後續在實際消費資料前會根據初始化的Schema產生對應的建表事件。
對於Debezium JSON和Canal JSON格式,表資訊在具體訊息中,提前消費的scan.max.pre.fetch.records條訊息中可能包含了若干個表的資料,因此對每張表而言,提前消費的資料條數無法確定。預消費和初始化表結構資訊只會在實際消費和處理每個分區的訊息前進行一次,若後續有新表資料,該表的第一條資料解析出的表結構會作為初始表結構,不會重新預消費和初始化對應的表結構。
僅VVR 8.0.11及以上版本支援單表資料分布在多個分區,對於該情境需要將配置項debezium-json.distributed-tables或canal-json.distributed-tables設為true。
指定初始表結構
在某些情境下,您希望自行指定初始的表結構,例如將 Kafka 的資料寫入到一張預先建立的下遊表中。這時您可以通過添加scan.value.initial-schemas.ddls參數指定初始的表結構。配置樣本如下:
source:
type: kafka
name: Kafka Source
properties.bootstrap.servers: host:9092
topic: test-topic
value.format: json
scan.startup.mode: earliest-offset
# 設定初始表結構
scan.value.initial-schemas.ddls: CREATE TABLE db1.t1 (id BIGINT, name VARCHAR(10)); CREATE TABLE db1.t2 (id BIGINT);這裡的建表語句需要和目標表的表結構保持一致。這樣,我們就為 db1.t1 這張表指定了 id 這個欄位的初始類型為 BIGINT,name 這個欄位的初始類型為 VARCHAR(10) ,為 db1.t2 這張表指定了 id 這個欄位的初始類型為 BIGINT。
這裡的建表語句使用的是 FlinkSQL 的文法。
指定欄位設定為固定類型
在某些情境下,您希望固定特定欄位的資料類型,例如對於某些可能被推導為 TIMESTAMP 類型的欄位,您希望以字串的格式下發,這時您可以通過添加json.infer-schema.fixed-types參數指定初始的表結構(僅在訊息格式為 json 時有效)。配置樣本如下:
source:
type: kafka
name: Kafka Source
properties.bootstrap.servers: host:9092
topic: test-topic
value.format: json
scan.startup.mode: earliest-offset
# 設定特定欄位始終為固定類型
json.infer-schema.fixed-types: id BIGINT, name VARCHAR(10)
scan.max.pre.fetch.records: 0這樣,我們就為所有的 id 欄位指定了類型固定為 BIGINT,所有的 name 欄位指定了類型固定為 VARCHAR(10)。
這裡的類型與 FlinkSQL 的資料類型一致。
庫表資訊
對於Canal JSON和Debezium JSON格式,表資訊由具體訊息解析得到,包括資料庫和表名。
對於JSON格式,預設配置下,表資訊僅包含表名,即資料所在的topic名稱。如果您的資料中包含庫表資訊,可以通過json.infer-schema.fixed-types指定庫表資訊所在的欄位,我們會將這些欄位對應到庫名/表名中。配置樣本如下:
source: type: kafka name: Kafka Source properties.bootstrap.servers: host:9092 topic: test-topic value.format: json scan.startup.mode: earliest-offset # 使用 col1 欄位中的值作為庫名,使用 col2 欄位中的值作為表名 json.decode.parser-table-id.fields: col1,col2這樣,我們會將每條資料下發到庫名為col1欄位的值,表名為col2欄位的值的表中。
主鍵資訊
對於Canal JSON格式,會根據JSON中的pkNames欄位定義表的主鍵。
對於Debezium JSON和JSON格式,JSON中不包含主鍵資訊,可以通過transform規則手動為表添加主鍵:
transform: - source-table: \.*.\.* projection: \* primary-keys: key1, key2
Schema解析和Schema變更
在表結構初始化完成後,若schema.inference.strategy配置為static,Kafka連接器會根據初始的表結構解析每個訊息的訊息體(Value),不會產生Schema變更事件。若schema.inference.strategy配置為continuous,Kafka連接器會解析每個Kafka訊息的訊息體,解析出訊息的物理列,並與當前維護的Schema比對,若解析出的Schema與當前Schema不一致時,會嘗試將Schema合并,同時產生對應的表結構變更事件,合并規則如下:
如果解析出的物理列中包含當前Schema中沒有的欄位,則會將這些欄位加入到Schema中,同時產生新增可空列事件。
如果解析出的物理列中不包含當前Schema中已有的欄位,該欄位仍會保留,該列的資料會填充為NULL,不產生刪除列事件。
如果兩者出現了同名列,則按照以下情境進行處理:
當類型相同且精度不同時,會取兩者中較大精度的類型,同時產生列類型變更事件。
當類型不同時,會按照如下圖的樹形結構找到最小父節點,作為該同名列的類型,同時產生列類型變更事件。

當前支援的Schema變更策略如下:
添加列:會在當前Schema末尾添加對應的列,並同步新增列的資料,新增的列會設定為可空列。
刪除列:不會產生刪除列事件,而是後續將該列的資料自動填滿為NULL值。
重新命名列:被看作為添加列和刪除列,在當前Schema末尾添加重新命名後的列,並將重新命名前的列資料填充為NULL值。
列類型變更:
對於支援列類型變更的下遊系統,在下遊Sink支援處理列類型變更後,資料攝入作業支援普通列的類型變更,例如,從INT類型變更到BIGINT類型。此類變更依賴於下遊Sink支援的列類型變更規則,不同的結果表支援的列類型變更規則也不相同,請參考結果表文檔擷取其支援的列類型變更規則。
對於不支援列類型變更的下遊系統,比如Hologres,此類情境可以使用寬類型映射,即作業啟動時在下遊系統建立類型更加寬泛的表,在列類型變更發生時判斷該類型變更下遊Sink是否可以接受從而實現寬容的列類型變更支援。
當前暫不支援的Schema變更:
主鍵或索引等約束的變更。
從NOT NULL轉為NULLABLE變更。
Canal JSON的Schema解析
Canal JSON資料中可能包含可選的sqlType欄位,其中記錄了資料列的精確類型資訊。為了擷取更準確的Schema,可以通過將canal-json.infer-schema.strategy配置為SQL_TYPE使用sqlType中的類型。類型映射關係如下:
JDBC類型
Type Code
CDC類型
BIT
-7
BOOLEAN
BOOLEAN
16
TINYINT
-6
TINYINT
SMALLINT
-5
SMALLINT
INTEGER
4
INT
BIGINT
-5
BIGINT
DECIMAL
3
DECIMAL(38,18)
NUMERIC
2
REAL
7
FLOAT
FLOAT
6
DOUBLE
8
DOUBLE
BINARY
-2
BYTES
VARBINARY
-3
LONGVARBINARY
-4
BLOB
2004
DATE
91
DATE
TIME
92
TIME
TIMESTAMP
93
TIMESTAMP
CHAR
1
STRING
VARCHAR
12
LONGVARCHAR
-1
其他類型
髒資料容忍與收集
在某些情況下,您的 Kafka 資料來源中可能包含一些格式不正確的資料(髒資料),為了避免同步作業因為這些髒資料導致頻繁失敗重啟,您可以配置忽略這部分異常資料。配置樣本如下:
source:
type: kafka
name: Kafka Source
properties.bootstrap.servers: host:9092
topic: test-topic
value.format: json
scan.startup.mode: earliest-offset
# 開啟髒資料容忍功能
ingestion.ignore-errors: true
# 容忍 1000 條髒資料
ingestion.error-tolerance.max-count: 1000這裡配置了忽略 1000 條髒資料的策略,讓您的作業能夠在存在小批量髒資料時正常運行,而在髒資料條數超過這個閾值時讓作業進入失敗狀態,提醒您對資料進行校正。
如果您希望作業永遠不因為髒資料的存在導致失敗,可以採用如下的配置:
source:
type: kafka
name: Kafka Source
properties.bootstrap.servers: host:9092
topic: test-topic
value.format: json
scan.startup.mode: earliest-offset
# 開啟髒資料容忍功能
ingestion.ignore-errors: true
# 容忍所有的髒資料
ingestion.error-tolerance.max-count: -1髒資料容忍策略保障了作業不會因為異常資料頻繁失敗,您可能還希望進一步的瞭解這些髒資料的資訊以調整 Kafka 資料生產者的行為,參考髒資料收集中介紹的流程,您可以在 TaskManager 日誌中查看到作業的髒資料,配置樣本如下:
source:
type: kafka
name: Kafka Source
properties.bootstrap.servers: host:9092
topic: test-topic
value.format: json
scan.startup.mode: earliest-offset
# 開啟髒資料容忍功能
ingestion.ignore-errors: true
# 容忍所有的髒資料
ingestion.error-tolerance.max-count: -1
pipeline:
dirty-data.collector:
# 將髒資料寫入 TaskManager 的記錄檔中
type: logger表名與Topic的映射策略
在使用kafka作為資料攝入作業的目標端時,由於寫入到Kafka訊息格式(debezium-json或者canal-json)中還包含表名資訊,後續消費Kafka訊息時往往以資料中的表名資訊作為實際表名(而非topic名稱),因此需要謹慎配置表名與Topic的映射策略。
假設在MySQL有mydb.mytable1,mydb.mytable2兩張表需要同步,可能的配置策略有以下幾種:
1. 不配置任何映射策略
在沒有任何映射策略的情況下,每張表會寫入到對應的由“庫名.表名”組成的topic中。因此mydb.mytable1的資料會寫入到名為mydb.mytable1的topic中,mydb.mytable2的資料會寫入到名為mydb.mytable2的topic中。配置樣本如下:
source:
type: mysql
name: MySQL Source
hostname: ${secret_values.mysql.hostname}
port: ${mysql.port}
username: ${secret_values.mysql.username}
password: ${secret_values.mysql.password}
tables: mydb.mytable1,mydb.mytable2
server-id: 8601-8604
sink:
type: kafka
name: Kafka Sink
properties.bootstrap.servers: ${kafka.bootstraps.server}2. 配置route規則進行映射(不推薦)
在很多情境下,使用者不希望寫入的topic直接為“庫名.表名”的格式,希望將資料寫入到指定的topic中,因此會配置route規則進行映射。配置樣本如下:
source:
type: mysql
name: MySQL Source
hostname: ${secret_values.mysql.hostname}
port: ${mysql.port}
username: ${secret_values.mysql.username}
password: ${secret_values.mysql.password}
tables: mydb.mytable1,mydb.mytable2
server-id: 8601-8604
sink:
type: kafka
name: Kafka Sink
properties.bootstrap.servers: ${kafka.bootstraps.server}
route:
- source-table: mydb.mytable1,mydb.mytable2
sink-table: mytable1此時所有來自mydb.mytable1,mydb.mytable2的資料都會寫入到mytable1這一個topic中。
然而,通過route規則修改寫入的topic名稱時,也會修改Kafka訊息(debezium-json或者canal-json格式)中的表名資訊,此時Kafka訊息中所有的表名都為mytable1,在其他系統消費這個topic的Kafka訊息時,可能出現不符合預期的情況。
3. 配置sink.tableId-to-topic.mapping參數進行映射(推薦)
為了在配置表名與Topic的映射規則的同時保留源表表名資訊,可以使用sink.tableId-to-topic.mapping參數完成該需求。配置樣本如下:
source:
type: mysql
name: MySQL Source
hostname: ${secret_values.mysql.hostname}
port: ${mysql.port}
username: ${secret_values.mysql.username}
password: ${secret_values.mysql.password}
tables: mydb.mytable1,mydb.mytable2
server-id: 8601-8604
sink.tableId-to-topic.mapping: mydb.mytable1,mydb.mytable2:mytable
sink:
type: kafka
name: Kafka Sink
properties.bootstrap.servers: ${kafka.bootstraps.server}或者
source:
type: mysql
name: MySQL Source
hostname: ${secret_values.mysql.hostname}
port: ${mysql.port}
username: ${secret_values.mysql.username}
password: ${secret_values.mysql.password}
tables: mydb.mytable1,mydb.mytable2
server-id: 8601-8604
sink.tableId-to-topic.mapping: mydb.mytable1:mytable;mydb.mytable2:mytable
sink:
type: kafka
name: Kafka Sink
properties.bootstrap.servers: ${kafka.bootstraps.server}此時所有來自mydb.mytable1,mydb.mytable2的資料都會寫入到mytable1這一個topic中,並且Kafka訊息(debezium-json或者canal-json格式)中的表名資訊仍然為mydb.mytable1或者mydb.mytable2,在其他系統消費這個topic的Kafka訊息時,能正確擷取源表表名資訊。
EXACTLY_ONCE語義注意事項
配置消費者隔離等級
所有消費 Kafka 資料的應用必須設定 isolation.level:
read_committed:唯讀取已提交的資料。read_uncommitted(預設):可讀取未提交的資料。
EXACTLY_ONCE 依賴
read_committed。否則消費者可能看到未提交資料,破壞一致性。事務逾時與資料丟失
Flink 從 Checkpoint 恢複時,僅依賴該 Checkpoint 開始前已提交的事務。如果作業崩潰到重啟的時間超過 Kafka 事務逾時,Kafka 會自動中止事務,導致資料丟失。
Kafka Broker 預設
transaction.max.timeout.ms= 15 分鐘。Flink Kafka Sink 預設設定
transaction.timeout.ms= 1 小時。你必須在 Broker 端提高
transaction.max.timeout.ms,使其不小於 Flink 的設定。
Producer 池與 Checkpoint 並發
EXACTLY_ONCE 模式使用固定大小的 Kafka Producer 池。每個 Checkpoint 佔用池中的一個 Producer。如果並發 Checkpoint 數超過池大小,作業會失敗。
請根據最大並發 Checkpoint 數調整 Producer 池大小。
並行度縮容限制
如果作業在第一個 Checkpoint 前失敗,重啟後不會保留原有 Producer 池資訊。因此,在第一個 Checkpoint 完成前,不要縮減作業並行度。如必須縮容,並行度不得低於
FlinkKafkaProducer.SAFE_SCALE_DOWN_FACTOR。事務阻塞讀取
在
read_committed模式下,任何未結束(未提交也未中止)的事務會阻塞整個 Topic 的讀取。例如:
事務 1 寫入資料。
事務 2 寫入並提交資料。
只要事務 1 未結束,事務 2 的資料對消費者不可見。
因此:
正常運行時,資料可見延遲約等於 Checkpoint 間隔。
作業失敗時,正在寫入的 Topic 會阻塞消費者,直到作業重啟或事務逾時,極端情況下,事務逾時甚至會影響讀取。