背景資訊
Apache Kafka是一款開源的分布式訊息佇列系統,廣泛用於高效能資料處理、流式分析、Data Integration等巨量資料領域。Kafka連接器基於開源Apache Kafka用戶端,為阿里雲Realtime ComputeFlink提供高效能的資料吞吐、多種資料格式的讀寫和精確一次語義的支援。
類別 | 詳情 |
支援類型 | 源表和結果表,資料攝入目標端 |
運行模式 | 流模式 |
資料格式 | CSV JSON Apache Avro Confluent Avro Debezium JSON Canal JSON Maxwell JSON Raw Protobuf
|
特有監控指標 | 源表 numRecordsIn numRecordsInPerSecond numBytesIn numBytesInPerScond currentEmitEventTimeLag currentFetchEventTimeLag sourceIdleTime pendingRecords
結果表 numRecordsOut numRecordsOutPerSecond numBytesOut numBytesOutPerSecond currentSendTime
|
API種類 | SQL,Datastream和資料攝入YAML |
是否支援更新或刪除結果表資料 | 不支援更新和刪除結果表資料,只支援插入資料。 |
前提條件
您可以根據需求選擇以下任意一種方式串連叢集:
串連阿里雲雲訊息佇列Kafka版叢集
串連自建Apache Kafka叢集
自建Apache Kafka叢集版本在0.11及以上。
Flink與自建Apache Kafka叢集之間的網路已打通。如何通過公網串連自建叢集,詳情請參見如何訪問公網?
僅支援Apache Kafka 2.8版本的用戶端配置項,詳情請參見Apache Kafka消費者和生產者配置項文檔。
使用限制
CREATE TABLE AS(CTAS)的使用限制
僅Realtime Compute引擎VVR 4.0.12及以上版本支援Kafka作為CREATE TABLE AS(CTAS)的同步資料來源。
僅支援JSON格式的類型推導和schema變更,其它資料格式暫不支援。
僅支援Kafka中value部分的類型推導和表結構變更。如果您需要同步Kafka key部分的列,則需要您手動在DDL中進行指定。詳情請參見樣本三。
資料攝入YAML的使用限制
僅Realtime Compute引擎VVR 8.0.10及以上版本支援Kafka作為資料攝入YAML的同步資料來源。
僅支援Debezium JSON和Canal JSON格式,其他資料格式暫不支援。
對於資料來源,僅Realtime Compute引擎VVR 8.0.11及以上版本支援同一張表的資料分布在多個分區。
注意事項
Realtime Compute引擎8.0.11及以下版本,由於Flink社區和Kafka社區的設計缺陷,不推薦使用事務寫入。當Kafka Connector設定 'sink.delivery-guarantee' = 'exactly-once'
時會使用Kafka事務寫入。事務寫入存在如下問題:
Kafka Connector 當前的 Sink 實現,會在 Flink作業的每個 Checkpoint 期間產生一個 Transaction ID用於寫入資料的事務提交,如果Flink作業的Checkpoint 間隔設定過小,可能會導致下遊Kafka叢集Coordinator組件因為Transaction ID過多引發記憶體不足,進而影響整個下遊Kafka叢集的穩定性。
Kafka Connector 當前的 Sink 實現,會為每個事務建立一個 Producer 執行個體,如果同時提交的Kafka寫入事務較多,可能導致 Flink 作業的 TaskManager 組件因為Transaction ID執行個體過多引發記憶體不足,影響當前Flink作業的穩定性。
如果多個Flink作業使用相同的事務ID首碼(sink.transactional-id-prefix),不同Flink作業通過事務ID首碼+Checkpoint ID產生的事務ID可能相互影響,在某個Flink作業寫入異常時,可能會阻塞Kafka叢集上的分區LSO(Log Start Offset)位點前進,影響Kafka叢集上各個分區的資料消費。
如果需要Exactly-Once語義,可以通過Upsert Kafka向主鍵表寫入,以主鍵來保證等冪性。
網路連接排查
如果您的Flink作業在啟動時出現Timed out waiting for a node assignment
錯誤,一般是Flink和Kafka之間的網路連通問題導致的。
Kafka用戶端與服務端建立串連的過程如下所示。
用戶端使用您指定的properties.bootstrap.servers地址串連Kafka服務端,Kafka服務端根據配置向用戶端返回叢集中各台broker的元資訊,包括各台broker的串連地址。
用戶端使用第一步broker返回的串連地址串連各台broker進行讀取或寫入。
如果Kafka服務端沒有正確配置,用戶端在第一步收到的串連地址有誤,即使properties.bootstrap.servers配置的地址可以串連上,也無法正常讀取或寫入資料。該問題經常在Flink與Kafka之間存在代理、連接埠轉寄、專線等網路轉寄機制時發生。
您可以按照以下步驟檢查Kafka叢集是否配置正確。
使用Zookeeper命令列工具(zkCli.sh或zookeeper-shell.sh)登入到您Kafka所使用的Zookeeper叢集。
根據您的叢集實際情況執行正確的命令,來擷取您的Kafka broker元資訊。通常可以使用get /brokers/ids/0
命令來擷取Kafka broker元資訊。Kafka broker的串連地址位於endpoints欄位中,該地址即為上述串連過程中服務端向用戶端返回的串連地址,資訊如下圖所示。
使用ping或telnet等命令來測試endpoint中顯示的地址與Flink的連通性。如果無法連通該地址,請聯絡您的Kafka營運修改Kafka配置,為Flink單獨配置listeners和advertised.listeners。
SQL
Kafka連接器可以在SQL作業中使用,作為源表或者結果表。
文法結構
CREATE TABLE KafkaTable (
`user_id` BIGINT,
`item_id` BIGINT,
`behavior` STRING,
`ts` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp' VIRTUAL
) WITH (
'connector' = 'kafka',
'topic' = 'user_behavior',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'testGroup',
'scan.startup.mode' = 'earliest-offset',
'format' = 'csv'
)
元資訊列
您可以在源表和結果表中定義元資訊列,以擷取或寫入Kafka訊息的元資訊。例如,當WITH參數中定義了多個topic時,如果在Kafka源表中定義了元資訊列,那麼Flink讀取到的資料就會被標識是從哪個topic中讀取的資料。元資訊列的使用樣本如下。
CREATE TABLE kafka_source (
`record_topic` STRING NOT NULL METADATA FROM 'topic' VIRTUAL,
`ts` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp' VIRTUAL,
`record_offset` BIGINT NOT NULL METADATA FROM 'offset' VIRTUAL,
...
) WITH (
'connector' = 'kafka',
...
);
CREATE TABLE kafka_sink (
`ts` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp' VIRTUAL,
...
) WITH (
'connector' = 'kafka',
...
);
下表列出了Kafka源表和結果表所支援的元資訊列。
Key | 資料類型 | 說明 | 源表或結果表 |
topic | STRING NOT NULL METADATA VIRTUAL | Kafka訊息所在的Topic名稱。 | 源表 |
partition | INT NOT NULL METADATA VIRTUAL | Kafka訊息所在的Partition ID。 | 源表 |
headers | MAP<STRING, BYTES> NOT NULL METADATA VIRTUAL | Kafka訊息的訊息頭(header)。 | 源表和結果表 |
leader-epoch | INT NOT NULL METADATA VIRTUAL | Kafka訊息的Leader epoch。 | 源表 |
offset | BIGINT NOT NULL METADATA VIRTUAL | Kafka訊息的位移量(offset)。 | 源表 |
timestamp | TIMESTAMP(3) WITH LOCAL TIME ZONE NOT NULL METADATA VIRTUAL | Kafka訊息的時間戳記。 | 源表和結果表 |
timestamp-type | STRING NOT NULL METADATA VIRTUAL | Kafka訊息的時間戳記類型: | 源表 |
安全與認證
如果您的Kafka叢集要求安全連線或認證,請將相關的安全與認證配置添加properties.
首碼後設定在WITH參數中。配置Kafka表以使用PLAIN作為SASL機制,並提供JAAS配置的樣本如下。
CREATE TABLE KafkaTable (
`user_id` BIGINT,
`item_id` BIGINT,
`behavior` STRING,
`ts` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp'
) WITH (
'connector' = 'kafka',
...
'properties.security.protocol' = 'SASL_PLAINTEXT',
'properties.sasl.mechanism' = 'PLAIN',
'properties.sasl.jaas.config' = 'org.apache.flink.kafka.shaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="username" password="password";'
)
使用SASL_SSL作為安全性通訊協定,並使用SCRAM-SHA-256作為SASL機制的樣本如下。
CREATE TABLE KafkaTable (
`user_id` BIGINT,
`item_id` BIGINT,
`behavior` STRING,
`ts` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp'
) WITH (
'connector' = 'kafka',
...
'properties.security.protocol' = 'SASL_SSL',
'properties.ssl.truststore.location' = '/flink/usrlib/kafka.client.truststore.jks',
'properties.ssl.truststore.password' = 'test1234',
'properties.ssl.keystore.location' = '/flink/usrlib/kafka.client.keystore.jks',
'properties.ssl.keystore.password' = 'test1234',
'properties.ssl.endpoint.identification.algorithm' = '',
'properties.sasl.mechanism' = 'SCRAM-SHA-256',
'properties.sasl.jaas.config' = 'org.apache.flink.kafka.shaded.org.apache.kafka.common.security.scram.ScramLoginModule required username="username" password="password";'
)
樣本中提到的CA認證和私密金鑰可使用Realtime Compute控制台的檔案管理功能上傳至平台。例如,上傳後檔案存放在/flink/usrlib
目錄下,需要使用的CA認證檔案名稱為my-truststore.jks,則上傳後您可以在WITH參數中指定'properties.ssl.truststore.location' = '/flink/usrlib/my-truststore.jks'
來使用該認證。
源表起始位點
啟動模式
Kafka源表可通過配置scan.startup.mode來指定初始讀取位點:
最早位點(earliest-offset):從當前分區的最早位點開始讀取。
最末尾位點(latest-offset):從當前分區的最末尾位點開始讀取。
已提交位點(group-offsets):從指定group id的已提交位點開始讀取,group id通過properties.group.id指定。
指定時間戳記(timestamp):從時間戳記大於等於指定時間的第一條訊息開始讀取,時間戳記通過scan.startup.timestamp-millis指定。
特錨點(specific-offsets):從您指定的分區位點開始消費,位點通過scan.startup.specific-offsets指定。
程式碼範例如下:
CREATE TEMPORARY TABLE kafka_source (
...
) WITH (
'connector' = 'kafka',
...
'scan.startup.mode' = 'earliest-offset',
'scan.startup.mode' = 'latest-offset',
'properties.group.id' = 'my-group',
'scan.startup.mode' = 'group-offsets',
'properties.auto.offset.reset' = 'earliest',
'properties.auto.offset.reset' = 'latest',
'scan.startup.mode' = 'timestamp',
'scan.startup.timestamp-millis' = '1655395200000',
'scan.startup.mode' = 'specific-offsets',
'scan.startup.specific-offsets' = 'partition:0,offset:42;partition:1,offset:300'
);
起始位點優先順序
源表起始位點的優先順序為:
Checkpoint或Savepoint中儲存的位點。
您在Realtime Compute控制台指定的啟動時間。
您在WITH參數中通過scan.startup.mode指定的啟動位點。
未指定scan.startup.mode的情況下使用group-offsets。
在以上任何一個步驟中,如果位點到期或Kafka叢集發生問題等原因導致位點無效,則會使用properties.auto.offset.reset指定的策略進行位點重設,如果您未設定該配置項,則會產生異常要求您介入。
一種常見情況是使用全新的group id開始消費。首先源表會向Kafka叢集查詢該group的已提交位點,由於該group id是第一次使用,不會查詢到有效位點,所以會通過properties.auto.offset.reset參數配置的策略進行重設。因此在使用全新group id進行消費時,必須配置properties.auto.offset.reset來指錨點重設策略。
源表位點提交
Kafka源表只在checkpoint成功後將當前消費位點提交至Kafka叢集。如果您的checkpoint間隔設定較長,您在Kafka叢集側觀察到的消費位點會有延遲。在進行checkpoint時,Kafka源表會將當前讀取進度儲存在狀態中,並不依賴於提交到叢集上的位點進行故障恢複,提交位點僅僅是為了在Kafka側能夠監控到讀取進度,位點提交失敗不會對資料正確性產生任何影響。
結果表自訂分區器
如果內建的Kafka Producer分區模式無法滿足您的需求,您可以實現自訂分區模式將資料寫入對應的分區。自訂分區器需要繼承FlinkKafkaPartitioner,開發完成後編譯JAR包,使用檔案管理功能上傳至Realtime Compute控制台。上傳並引用完成後,請在WITH參數中設定sink.partitioner參數,參數值為分區器完整的類路徑,如org.mycompany.MyPartitioner
。
Kafka、Upsert Kafka或Kafka JSON catalog的選擇
Kafka是一種只能添加資料的訊息佇列系統,無法進行資料的更新和刪除操作,因此在流式SQL計算中無法處理上遊的CDC變更資料和彙總、聯合等運算元的回撤邏輯。如果您需要將含有變更或回撤類型的資料寫入Kafka,請使用對變更資料進行特殊處理的Upsert Kafka結果表。
為了方便您將上遊資料庫中一個或多個資料表中的變更資料批量同步到Kafka中,您可以使用Kafka JSON catalog。如果您的Kafka中儲存的資料格式為JSON,使用Kafka JSON catalog可以省去定義schema和WITH參數的步驟。詳情可參見管理Kafka JSON Catalog。
作為CTAS資料來源
CTAS語句支援將訊息佇列Kafka,且format為JSON的表作為資料來源。在資料同步過程中,如果某些欄位並未出現在預定義的表結構中,Flink會嘗試自動推導該列的類型。如果自動推導的類型不能滿足您的使用需求,您也可以通過輔助推導的方式對某些列的解析類型進行聲明。
類型推導
在類型推導過程中,Flink預設只展開JSON文本中的第一層資料,根據其類型和數值,按照基本規則進行類型推導。類型映射基本規則如下表所示。
JSON類型 | Flink SQL類型 |
BOOLEAN | BOOLEAN |
STRING | DATE、TIMESTAMP、TIMESTAMP_LTZ、TIME或 STRING |
INT或LONG | BIGINT |
BIGINT | DECIMAL或STRING 說明 Flink中DECIMAL的類型存在精度限制。因此,如果整數的實際取值超過了DECIMAL類型最大精度,Flink會自動推導其類型為STRING,避免精度的損失。 |
FLOAT、DOUBLE或BIG DECIMAL | DOUBLE |
ARRAY | STRING |
OBJECT | STRING |
樣本
JSON文本
{
"id": 101,
"name": "VVP",
"properties": {
"owner": "阿里雲",
"engine": "Flink"
}
"type": ["巨量資料"]
}
Flink寫入到下遊儲存的表資訊為
id | name | properties | type |
101 | VVP |
{
"owner": "阿里雲",
"engine": "Flink"
}
| ["巨量資料"] |
輔助推導
當基本規則不符合您的實際需要時,您可以在源表的DDL中聲明特定列的解析類型。通過該方式,Flink會優先使用您聲明的列類型去解析目標欄位。針對以下樣本,Flink會使用DECIMAL的方式去解析price欄位,而不是使用預設的基本規則將其轉換為DOUBLE類型。
CREATE TABLE evolvingKafkaSource (
price DECIMAL(18, 2)
) WITH (
'connector' = 'kafka',
'properties.bootstrap.servers' = 'localhost:9092',
'topic' = 'evolving_kafka_demo',
'scan.startup.mode' = 'earliest-offset',
'format' = 'json'
);
但是,當您在DDL中指定的類型和實際資料中的類型不一致時,可以按照以下方式進行處理:
通常,Kafka topic中的JSON文本帶有嵌套結構。如果您需要提取JSON文本中的嵌套列,則可以通過以下兩種方式:
在源表DDL中聲明'json.infer-schema.flatten-nested-columns.enable'='true'
,來展開嵌套列中的所有元素至頂層。通過該方式,所有的嵌套列都會被依次展開。為了避免列名衝突,Flink採用索引到該列的路徑作為展開後列名字。
重要
目前不支援解決列名衝突。如果發生列名衝突,請在源表的DDL中聲明json.ignore-parse-errors為true,來忽略存在衝突的資料。
在DDL中CTAS文法中添加計算資料行`rowkey` AS JSON_VALUE(`properties`, `$.rowkey`)
,來指定要展開的列。詳情請參見樣本四:同步表結構和資料並進行計算。
使用樣本
樣本一:從Kafka中讀取資料後寫入Kafka
從名稱為源表的Topic中讀取Kafka資料,再寫入名稱為結果表的Topic,資料使用CSV格式。
CREATE TEMPORARY TABLE kafka_source (
id INT,
name STRING,
age INT
) WITH (
'connector' = 'kafka',
'topic' = 'source',
'properties.bootstrap.servers' = '<yourKafkaBrokers>',
'properties.group.id' = '<yourKafkaConsumerGroupId>',
'format' = 'csv'
);
CREATE TEMPORARY TABLE kafka_sink (
id INT,
name STRING,
age INT
) WITH (
'connector' = 'kafka',
'topic' = 'sink',
'properties.bootstrap.servers' = '<yourKafkaBrokers>',
'properties.group.id' = '<yourKafkaConsumerGroupId>',
'format' = 'csv'
);
INSERT INTO kafka_sink SELECT id, name, age FROM kafka_source;
樣本二:同步表結構以及資料
將Kafka Topic中的訊息即時同步到Hologres中。在該情況下,您可以將Kafka訊息的offset和partition id作為主鍵,從而保證在Failover時,Hologres中不會有重複訊息。
CREATE TEMPORARY TABLE kafkaTable (
`offset` INT NOT NULL METADATA,
`part` BIGINT NOT NULL METADATA FROM 'partition',
PRIMARY KEY (`part`, `offset`) NOT ENFORCED
) WITH (
'connector' = 'kafka',
'properties.bootstrap.servers' = '<yourKafkaBrokers>',
'topic' = 'kafka_evolution_demo',
'scan.startup.mode' = 'earliest-offset',
'format' = 'json',
'json.infer-schema.flatten-nested-columns.enable' = 'true'
);
CREATE TABLE IF NOT EXISTS hologres.kafka.`sync_kafka`
WITH (
'connector' = 'hologres'
) AS TABLE vvp.`default`.kafkaTable;
樣本三:同步表結構以及Kafka訊息的key和value資料
Kafka訊息中的key部分已經儲存了相關資訊,您可以同時同步Kafka中的key和value。
CREATE TEMPORARY TABLE kafkaTable (
`key_id` INT NOT NULL,
`val_name` VARCHAR(200)
) WITH (
'connector' = 'kafka',
'properties.bootstrap.servers' = '<yourKafkaBrokers>',
'topic' = 'kafka_evolution_demo',
'scan.startup.mode' = 'earliest-offset',
'key.format' = 'json',
'value.format' = 'json',
'key.fields' = 'key_id',
'key.fields-prefix' = 'key_',
'value.fields-prefix' = 'val_',
'value.fields-include' = 'EXCEPT_KEY'
);
CREATE TABLE IF NOT EXISTS hologres.kafka.`sync_kafka`(
WITH (
'connector' = 'hologres'
) AS TABLE vvp.`default`.kafkaTable;
說明
Kafka訊息中的key部分不支援表結構變更和類型推導,需要您手動聲明。
樣本四:同步表結構和資料並進行計算
在同步Kafka資料到Hologres時,往往需要一些輕量級的計算。
CREATE TEMPORARY TABLE kafkaTable (
`distinct_id` INT NOT NULL,
`properties` STRING,
`timestamp` TIMESTAMP_LTZ METADATA,
`date` AS CAST(`timestamp` AS DATE)
) WITH (
'connector' = 'kafka',
'properties.bootstrap.servers' = '<yourKafkaBrokers>',
'topic' = 'kafka_evolution_demo',
'scan.startup.mode' = 'earliest-offset',
'key.format' = 'json',
'value.format' = 'json',
'key.fields' = 'key_id',
'key.fields-prefix' = 'key_'
);
CREATE TABLE IF NOT EXISTS hologres.kafka.`sync_kafka` WITH (
'connector' = 'hologres'
) AS TABLE vvp.`default`.kafkaTable
ADD COLUMN
`order_id` AS COALESCE(JSON_VALUE(`properties`, '$.order_id'), 'default');
Datastream API
重要
通過DataStream的方式讀寫資料時,則需要使用對應的DataStream連接器串連Realtime ComputeFlink版,DataStream連接器設定方法請參見DataStream連接器使用方法。
構建Kafka Source
Kafka Source提供了構建類來建立Kafka Source的執行個體。我們將通過以下範例程式碼為您介紹如何構建Kafka Source來消費input-topic最早位點的資料,消費組名稱為my-group,並將Kafka訊息體還原序列化為字串。
KafkaSource<String> source = KafkaSource.<String>builder()
.setBootstrapServers(brokers)
.setTopics("input-topic")
.setGroupId("my-group")
.setStartingOffsets(OffsetsInitializer.earliest())
.setValueOnlyDeserializer(new SimpleStringSchema())
.build();
env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
Maven中央庫中已經放置了Kafka DataStream連接器。
<dependency>
<groupId>com.alibaba.ververica</groupId>
<artifactId>ververica-connector-kafka</artifactId>
<version>${vvr-version}</version>
</dependency>
在構建KafkaSource時,必須指定以下參數。
參數 | 說明 |
BootstrapServers | Kafka Broker地址,通過setBootstrapServers(String)方法配置。 |
GroupId | 消費者組ID,通過setGroupId(String)方法配置。 |
Topics或Partition | 訂閱的Topic或Partition名稱。Kafka Source提供了以下三種Topic或Partition的訂閱者式: Topic列表,訂閱Topic列表中所有Partition。
KafkaSource.builder().setTopics("topic-a","topic-b")
Regex匹配,訂閱與Regex所匹配的Topic下的所有Partition。
KafkaSource.builder().setTopicPattern("topic.*")
Partition列表,訂閱指定的Partition。
final HashSet<TopicPartition> partitionSet = new HashSet<>(Arrays.asList(
new TopicPartition("topic-a", 0),
new TopicPartition("topic-b", 5)));
KafkaSource.builder().setPartitions(partitionSet)
|
Deserializer | 解析Kafka訊息的還原序列化器。 還原序列化器通過setDeserializer(KafkaRecordDeserializationSchema)來指定,其中KafkaRecordDeserializationSchema定義了如何解析Kafka的ConsumerRecord。如果只解析Kafka訊息中的訊息體(Value)的資料,則您可以通過以下任何一種方式實現: 說明 如果需要完整地解析ConsumerRecord,則需要您自己實現KafkaRecordDeserializationSchema介面。 |
在使用Kafka DataStream連接器時,您還需要瞭解以下Kafka屬性:
構建Kafka Sink
Flink Kafka Sink可以實現將流資料寫入一個或多個Kafka Topic。
DataStream<String> stream = ...
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", );
KafkaSink<String> kafkaSink =
KafkaSink.<String>builder()
.setKafkaProducerConfig(kafkaProperties)
.setRecordSerializer(
KafkaRecordSerializationSchema.builder()
.setTopic("my-topic")
.setKafkaValueSerializer(StringSerializer.class)
.build())
.setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
.build();
stream.sinkTo(kafkaSink);
您需要配置以下參數。
參數 | 說明 |
Topic | 資料寫入的預設Topic名稱。 |
資料序列化 | 構建時需要提供KafkaRecordSerializationSchema 來將輸入資料轉換為 Kafka 的ProducerRecord 。Flink 提供了 schema 構建器 以提供一些通用的組件,例如訊息鍵(key)/訊息體(value)序列化、topic 選擇、訊息分區,同樣也可以通過實現對應的介面來進行更豐富的控制。ProducerRecord<byte[], byte[]> serialize(T element, KafkaSinkContext context, Long timestamp)方法會在每條資料流入的時候調用,來產生ProducerRecord寫入Kafka。 使用者可以對每條資料如何寫入Kafka進行細粒度地控制。通過ProducerRecord可以進行以下操作: 設定寫入的Topic名稱。 定義訊息鍵(Key)。 指定資料寫入的Partition。
|
Kafka用戶端屬性 | bootstrap.servers必填,以逗號分隔的Kafka Broker列表。 |
容錯語義 | 啟用Flink的Checkpoint後,Flink Kafka Sink可以保證精確一次的語義。除了啟用Flink的Checkpoint外,您還可以通過DeliveryGuarantee參數來指定不同的容錯語義,DeliveryGuarantee參數詳情如下: DeliveryGuarantee.NONE:(預設設定)Flink不做任何保證,資料可能會丟失或重複。 DeliveryGuarantee.AT_LEAST_ONCE:保證不會丟失任何資料,但可能會重複。 DeliveryGuarantee.EXACTLY_ONCE:使用Kafka事務提供精確一次的語義保證。
|
資料攝入
Kafka連接器可以用於資料攝入YAML作業開發,作為源端讀取或目標端寫入。
文法結構
source:
type: kafka
name: Kafka source
properties.bootstrap.servers: localhost:9092
topic: ${kafka.topic}
sink:
type: kafka
name: Kafka Sink
properties.bootstrap.servers: localhost:9092
配置項
通用
參數 | 說明 | 是否必填 | 資料類型 | 預設值 | 備忘 |
type | 源端或目標端類型。 | 是 | String | 無 | 固定值為kafka |
name | 源端或目標端名稱。 | 否 | String | 無 | 無 |
properties.bootstrap.servers | Kafka broker地址。 | 是 | String | 無 | 格式為host:port,host:port,host:port ,以英文逗號(,)分割。 |
properties.* | 對Kafka用戶端的直接配置。 | 否 | String | 無 | 尾碼名必須是Kafka官方文檔中定義的生產者和消費者配置。 Flink會將properties.首碼移除,並將剩餘的配置傳遞給Kafka用戶端。例如可以通過'properties.allow.auto.create.topics' = 'false' 來禁用自動建立topic。 |
value.format | 讀取或寫入Kafka訊息value部分時使用的格式。 | 否 | String | debezium-json | 取值如下: |
源表
參數 | 說明 | 是否必填 | 資料類型 | 預設值 | 備忘 |
topic | 讀取的topic名稱。 | 否 | String | 無 | 以英文分號 (;) 分隔多個topic名稱,例如topic-1和topic-2 說明 topic和topic-pattern兩個選項只能指定其中一個。 |
topic-pattern | 匹配讀取topic名稱的Regex。所有匹配該Regex的topic在作業運行時均會被讀取。 | 否 | String | 無 | 說明 topic和topic-pattern兩個選項只能指定其中一個。 |
properties.group.id | 消費組ID。 | 否 | String | 無 | 如果指定的group id為首次使用,則必須將properties.auto.offset.reset設定為earliest或latest以指定初次開機位點。 |
scan.startup.mode | Kafka讀取資料的啟動位點。 | 否 | String | group-offsets | 取值如下: earliest-offset:從Kafka最早分區開始讀取。 latest-offset:從Kafka最新位點開始讀取。 group-offsets(預設值):從指定的properties.group.id已提交的位點開始讀取。 timestamp:從scan.startup.timestamp-millis指定的時間戳記開始讀取。 specific-offsets:從scan.startup.specific-offsets指定的位移量開始讀取。
說明 該參數在作業無狀態啟動時生效。作業在從checkpoint重啟或狀態恢複時,會優先使用狀態中儲存的進度恢複讀取。 |
scan.startup.specific-offsets | specific-offsets啟動模式下,指定每個分區的啟動位移量。 | 否 | String | 無 | 例如partition:0,offset:42;partition:1,offset:300 |
scan.startup.timestamp-millis | timestamp啟動模式下,指定啟動位點時間戳記。 | 否 | Long | 無 | 單位為毫秒 |
scan.topic-partition-discovery.interval | 動態檢測Kafka topic和partition的時間間隔。 | 否 | Duration | 5分鐘 | 分區檢查間隔預設為5分鐘。需要顯式地設定分區檢查間隔為非正數才能關閉此功能。開啟動態分區發現後,Kafka Source 可以自動地發現新增的分區並自動讀取對應分區上的資料。在topic-pattern模式下,不僅讀取已有topic的新增分區資料,也會讀取符合正則匹配的新增topic的所有分區資料。 |
scan.check.duplicated.group.id | 是否檢查通過properties.group.id 指定的消費者組有重複。 | 否 | Boolean | false | 參數取值如下: |
schema.inference.strategy | Schema推導策略。 | 否 | String | continuous | 取值如下: |
scan.max.pre.fetch.records | Schema初始推導時,對每個分區最多嘗試消費解析的訊息數量 | 否 | Int | 50 | 在作業實際讀取並處理資料前,對每個分區嘗試提前消費指定數量的最新訊息,用於初始化Schema資訊。 |
源表 Debezium JSON
參數 | 是否必填 | 資料類型 | 預設值 | 描述 |
debezium-json.distributed-tables | 否 | Boolean | false | 如果Debezium JSON內單張表資料會出現在多個分區,則需要開啟此選項。 |
debezium-json.schema-include | 否 | Boolean | false | 設定Debezium Kafka Connect時,可以啟用Kafka配置value.converter.schemas.enable,以在訊息中包含schema。此選項表明Debezium JSON訊息是否包含schema。 參數取值如下: |
debezium-json.ignore-parse-errors | 否 | Boolean | false | 參數取值如下: true:當解析異常時,跳過當前行。 false(預設值):報出錯誤,作業啟動失敗。
|
debezium-json.infer-schema.primitive-as-string | 否 | Boolean | false | 解析表結構時,是否推導所有類型為String類型。 參數取值如下: true:推導所有基本類型為String。 false(預設值):按照基本規則進行推導。
|
源表 Canal JSON
參數 | 是否必填 | 資料類型 | 預設值 | 描述 |
canal-json.distributed-tables | 否 | Boolean | false | 如果Canal JSON內單張表資料會出現在多個分區,則需要開啟此選項。 |
canal-json.database.include | 否 | String | 無 | 一個可選的Regex,通過正則匹配Canal記錄中的database元欄位,僅讀取指定資料庫的changelog記錄。正則字串與Java的Pattern相容。 |
canal-json.table.include | 否 | String | 無 | 一個可選的Regex,通過正則匹配Canal記錄中的table元欄位,僅讀取指定表的changelog記錄。正則字串與Java的Pattern相容。 |
canal-json.ignore-parse-errors | 否 | Boolean | false | 參數取值如下: true:當解析異常時,跳過當前行。 false(預設值):報出錯誤,作業啟動失敗。
|
canal-json.infer-schema.primitive-as-string | 否 | Boolean | false | 解析表結構時,是否推導所有類型為String類型。 參數取值如下: true:推導所有基本類型為String。 false(預設值):按照基本規則進行推導。
|
結果表
參數 | 說明 | 是否必填 | 資料類型 | 預設值 | 備忘 |
type | 目標端類型。 | 是 | String | 無 | 固定值為kafka |
name | 目標端名稱。 | 否 | String | 無 | 無 |
key.format | 寫入Kafka訊息key部分時使用的格式。 | 否 | String | 無 | 取值如下: |
topic | Kafka Topic名稱。 | 否 | String | 無 | 開啟時,所有的資料都會寫入這個Topic。 說明 如果沒有開啟,每條資料會寫入到其TableID對應字串(通過. 拼接產生)的Topic,例如databaseName.tableName 。 |
partition.strategy | 資料寫入Kafka分區的策略。 | 否 | String | all-to-zero | 取值如下: |
表結構推導和變更同步策略
分區訊息預消費和表結構初始化
Kafka連接器會維護當前已知的所有表的Schema。在讀取Kafka中Debezium JSON或Canal JSON格式的資料前,Kafka連接器會預先在每個分區中嘗試消費最多scan.max.pre.fetch.records條訊息,解析每條資料的Schema,再將這些Schema合并,用於初始化表結構資訊。後續在實際消費資料前會根據初始化的Schema產生對應的建表事件。
說明
對於Debezium JSON和Canal JSON格式,表資訊在具體訊息中,提前消費的scan.max.pre.fetch.records條訊息中可能包含了若干個表的資料,因此對每張表而言,提前消費的資料條數無法確定。預消費和初始化表結構資訊只會在實際消費和處理每個分區的訊息前進行一次,若後續有新表資料,該表的第一條資料解析出的表結構會作為初始表結構,不會重新預消費和初始化對應的表結構。
重要
僅VVR 8.0.11及以上版本支援單表資料分布在多個分區,對於該情境需要將配置項debezium-json.distributed-tables或canal-json.distributed-tables設為true。
主鍵資訊
Schema推導和Schema變更
在表結構初始化完成後,若schema.inference.strategy配置為static,Kafka連接器會根據初始的表結構解析每個訊息的訊息體(Value),不會產生Schema變更事件。若schema.inference.strategy配置為continuous,Kafka連接器會解析每個Kafka訊息的訊息體,推匯出訊息的物理列,並與當前維護的Schema比對,若推匯出的Schema與當前Schema不一致時,會嘗試將Schema合并,同時產生對應的表結構變更事件,合并規則如下:
當前支援的Schema變更策略如下:
添加列:會在當前Schema末尾添加對應的列,並同步新增列的資料,新增的列會設定為可空列。
刪除列:不會產生刪除列事件,而是後續將該列的資料自動填滿為NULL值。
重新命名列:被看作為添加列和刪除列,在當前Schema末尾添加重新命名後的列,並將重新命名前的列資料填充為NULL值。
列類型變更:
對於支援列類型變更的下遊系統,在下遊Sink支援處理列類型變更後,資料攝入作業支援普通列的類型變更,例如,從INT類型變更到BIGINT類型。此類變更依賴於下遊Sink支援的列類型變更規則,不同的結果表支援的列類型變更規則也不相同,請參考結果表文檔擷取其支援的列類型變更規則。
對於不支援列類型變更的下遊系統,比如Hologres,此類情境可以使用寬類型映射,即作業啟動時在下遊系統建立類型更加寬泛的表,在列類型變更發生時判斷該類型變更下遊Sink是否可以接受從而實現寬容的列類型變更支援。
當前暫不支援的Schema變更:
主鍵或索引等約束的變更。
從NOT NULL轉為NULLABLE變更。
EXACTLY_ONCE語義注意事項
當使用事務寫入Kafka時,請為所有消費Kafka資料的應用配置isolation.level參數。該參數取值如下:
DeliveryGuarantee.EXACTLY_ONCE模式依賴於在從某個Checkpoint恢複後,且在該Checkpoint開始之前所提交的事務。如果Flink作業崩潰與完成重啟之間的時間大於Kafka的事務逾時時間,則會有資料丟失,因為Kafka會自動中止超過逾時時間的事務。因此,請根據您的預期停機時間適當地配置您的事務逾時。
Kafka Broker預設的transaction.max.timeout.ms設定為15分鐘,Producer設定的事務逾時不能超過Broker指定的時間。Flink Kafka Sink預設會將Kafka Producer配置中的transaction.timeout.ms屬性設定為1小時,因此在使用DeliveryGuarantee.EXACTLY_ONCE模式前,需要增加Broker端的transaction.max.timeout.ms值。
DeliveryGuarantee.EXACTLY_ONCE模式為在Flink Kafka Producer執行個體中使用一個固定大小的Kafka Producer池。每個Checkpoint會使用池中的一個Kafka Producer。如果並發的Checkpoint數量超過Producer池的大小,Flink Kafka Producer會拋出異常並使整個作業失敗。請相應地配置Producer池大小和最大並發的Checkpoint數量。
DeliveryGuarantee.EXACTLY_ONCE會儘可能清除阻止Consumer從Kafka topic中讀取資料的殘留事務。但如果Flink作業在第一個 Checkpoint之前就出現故障,則在重啟該作業後並不會保留重啟前Producer池的資訊。因此,在第一個Checkpoint完成之前縮減Flink作業的並行度是不安全的,即使要縮減並行度,也不能小於FlinkKafkaProducer.SAFE_SCALE_DOWN_FACTOR。
對於在read_committed模式下啟動並執行Kafka Consumer,任何未結束(既未中止也未完成)的事務都將阻塞對該Kafka Topic的所有讀取。如果您按照以下步驟進行了操作:
使用者開啟事務1並通過該事務寫入了一些資料。
使用者開啟事務2並通過該事務寫入了更多的資料。
使用者提交事務2。
即使來自事務2的資料已經提交,在事務1提交或中止之前,事務2的資料對消費者是不可見的。因此: