本文為您介紹如何使用Upsert Kafka連接器。
背景資訊
Upsert Kafka連接器支援以upsert方式從Kafka topic中讀取資料並將資料寫入Kafka topic。
作為源表,此連接器可以將Kafka中儲存的資料轉換為changelog流,其中每條資料記錄代表一個更新或刪除事件。更準確地說,資料記錄中的value被解釋為同一key的最後一個value的UPDATE,如果有這個key,如果不存在相應的key,則該更新被視為INSERT。用表來類比,changelog流中的資料記錄被解釋為UPSERT,也稱為INSERT或UPDATE,因為任何具有相同key的現有行都被覆蓋。另外,value為空白的訊息將會被視作為DELETE訊息。
作為結果表,此連接器可以消費上遊計算邏輯產生的changelog流。它會將INSERT或UPDATE_AFTER資料作為正常的Kafka訊息寫入,並將DELETE資料以value為空白的Kafka訊息寫入,表示對應key的訊息被刪除。Flink將根據主鍵列的值對資料進行分區,從而保證主鍵上的訊息有序,因此同一主鍵上的更新或刪除訊息將落在同一分區中。
類別 | 詳情 |
支援類型 | 源表和結果表 |
運行模式 | 流模式 |
資料格式 | avro、avro-confluent、csv、json和raw |
特有監控指標 |
|
API種類 | SQL |
是否支援更新或刪除結果表資料 | 是 |
前提條件
您需要建立Kafka叢集,詳情請參見建立DataFlow Kafka叢集或在Kafka建立資源。
您需要串連Realtime ComputeFlink與Kafka叢集之間網路。Kafka on EMR可參見文檔配置建立和管理專用網路和安全性群組概述,ApsaraMQ for Kafka需要配置白名單。
使用限制
文法結構
CREATE TABLE upsert_kafka_sink(
user_region STRING,
pv BIGINT,
uv BIGINT,
PRIMARY KEY(user_region) NOT ENFORCED
)WITH(
'connector'='upsert-kafka',
'topic'='<yourTopicName>',
'properties.bootstrap.servers'='...',
'key.format'='avro',
'value.format'='avro'
);
WITH參數
通用
參數
說明
資料類型
是否必填
預設值
備忘
connector
表類型。
String
是
無
固定值為upsert-kafka。
properties.bootstrap.servers
Kafka broker地址。
String
是
無
格式為
host:port,host:port,host:port
,以英文逗號(,)分割。properties.*
對Kafka用戶端的直接配置。
String
否
無
Flink會將properties.首碼移除,並將剩餘的配置傳遞給Kafka用戶端。例如可以通過
'properties.allow.auto.create.topics' = 'false'
來禁用自動建立topic。不能通過該方式修改以下配置,因為它們會被Kafka連接器覆蓋:
key.deserializer
value.deserializer
key.format
讀取或寫入Kafka訊息key部分時使用的格式。
String
是
無
當使用該配置時,key.fields或key.fields-prefix配置是必填的。
參數取值如下:
csv
json
avro
debezium-json
canal-json
maxwell-json
avro-confluent
raw
key.fields-prefix
為所有Kafka訊息key部分指定自訂首碼,以避免與訊息value部分格式欄位重名。
String
否
無
該配置項僅用於源表和結果表的列名區分,解析和產生Kafka訊息key部分時,該首碼會被移除。
說明使用該配置時,value.fields-include必須配置為EXCEPT_KEY。
value.format
讀取或寫入Kafka訊息value部分時使用的格式。
String
是
無
該配置等同於format,因此format和value.format 只能配置其中一個,如果同時配置兩個會產生衝突。
value.fields-include
在解析或產生Kafka訊息value部分時,是否要包含訊息key部分對應的欄位。
String
是
ALL
參數取值如下:
ALL(預設值):所有列都會作為Kafka訊息value部分處理。
EXCEPT_KEY:除去key.fields定義的欄位,剩餘欄位作為Kafka訊息value部分處理。
topic
讀取或寫入topic名稱。
String
是
無
無。
結果表
參數
說明
資料類型
是否必填
預設值
備忘
sink.parallelism
Kafka結果表運算元的並發數。
Integer
否
上遊運算元的並發,由架構決定
無。
sink.buffer-flush.max-rows
緩衝重新整理前,最多能緩衝多少條記錄。
Integer
否
0(未開啟)
當結果表收到很多同key上的更新時,緩衝將保留同key的最後一條記錄,因此結果表緩衝能協助減少發往Kafka topic的資料量,以及避免發送潛在的tombstone訊息。
說明如果要開啟結果表緩衝,需要同時設定sink.buffer-flush.max-rows和sink.buffer-flush.interval兩個選項為大於零的值。
sink.buffer-flush.interval
緩衝重新整理的間隔時間。
Duration
否
0(未開啟)
單位可以為毫秒(ms)、秒(s)、分鐘(min)或小時(h)。例如
'sink.buffer-flush.interval'='1 s'
。當結果表收到很多同key上的更新時,緩衝將保留同key的最後一條記錄,因此結果表緩衝能協助減少發往Kafka topic的資料量,以及避免發送潛在的tombstone訊息。
說明如果要開啟結果表緩衝,需要同時設定sink.buffer-flush.max-rows和sink.buffer-flush.interval兩個選項為大於零的值。
使用樣本
源表
建立Kafka資料來源表,源表中包含網站使用者的瀏覽資料。
CREATE TABLE pageviews( user_id BIGINT, page_id BIGINT, viewtime TIMESTAMP, user_region STRING, WATERMARK FOR viewtime AS viewtime - INTERVAL '2' SECOND )WITH( 'connector'='kafka', 'topic'='<yourTopicName>', 'properties.bootstrap.servers'='...', 'format'='json' );
結果表
建立Upsert Kafka結果表。
CREATE TABLE pageviews_per_region( user_region STRING, pv BIGINT, uv BIGINT, PRIMARY KEY(user_region) NOT ENFORCED )WITH( 'connector'='upsert-kafka', 'topic'='<yourTopicName>', 'properties.bootstrap.servers'='...', 'key.format'='avro', 'value.format'='avro' );
將統計網站使用者的瀏覽資料寫入結果表中。
INSERT INTO pageviews_per_region SELECT user_region, COUNT(*), COUNT(DISTINCTuser_id) FROM pageviews GROUP BY user_region;