本文為您介紹如何使用Upsert Kafka連接器。
背景資訊
Upsert Kafka連接器支援以upsert方式從Kafka topic中讀取資料並將資料寫入Kafka topic。
作為源表,此連接器可以將Kafka中儲存的資料轉換為changelog流,其中每條資料記錄代表一個更新或刪除事件。更準確地說,資料記錄中的value被解釋為同一key的最後一個value的UPDATE,如果有這個key,如果不存在相應的key,則該更新被視為INSERT。用表來類比,changelog流中的資料記錄被解釋為UPSERT,也稱為INSERT或UPDATE,因為任何具有相同key的現有行都被覆蓋。另外,value為空白的訊息將會被視作為DELETE訊息。
作為結果表或資料攝入目標端,此連接器可以消費上遊計算邏輯產生的changelog流。它會將INSERT或UPDATE_AFTER資料作為正常的Kafka訊息寫入,並將DELETE資料以value為空白的Kafka訊息寫入,表示對應key的訊息被刪除。Flink將根據主鍵列的值對資料進行分區,從而保證主鍵上的訊息有序,因此同一主鍵上的更新或刪除訊息將落在同一分區中。
類別 | 詳情 |
支援類型 | 源表和結果表,資料攝入目標端 |
運行模式 | 流模式 |
資料格式 | avro、avro-confluent、csv、json和raw |
特有監控指標 |
|
API種類 | SQL,資料攝入YAML作業 |
是否支援更新或刪除結果表資料 | 是 |
前提條件
您需要建立Kafka叢集,詳情請參見建立DataFlow Kafka叢集或在Kafka建立資源。
您需要串連Realtime ComputeFlink與Kafka叢集之間網路。Kafka on EMR可參見文檔配置建立和管理專用網路和安全性群組概述,ApsaraMQ for Kafka需要配置白名單。
使用限制
SQL
Upsert Kafka連接器支援以upsert方式從Kafka topic中讀取資料並將資料寫入Kafka topic。
文法結構
CREATE TABLE upsert_kafka_sink(
user_region STRING,
pv BIGINT,
uv BIGINT,
PRIMARY KEY(user_region) NOT ENFORCED
)WITH(
'connector'='upsert-kafka',
'topic'='<yourTopicName>',
'properties.bootstrap.servers'='...',
'key.format'='avro',
'value.format'='avro'
);
WITH參數
通用
參數
說明
資料類型
是否必填
預設值
備忘
connector
表類型。
String
是
無
固定值為upsert-kafka。
properties.bootstrap.servers
Kafka broker地址。
String
是
無
格式為
host:port,host:port,host:port
,以英文逗號(,)分割。properties.*
對Kafka用戶端的直接配置。
String
否
無
Flink會將properties.首碼移除,並將剩餘的配置傳遞給Kafka用戶端。例如可以通過
'properties.allow.auto.create.topics' = 'false'
來禁用自動建立topic。不能通過該方式修改以下配置,因為它們會被Kafka連接器覆蓋:
key.deserializer
value.deserializer
key.format
讀取或寫入Kafka訊息key部分時使用的格式。
String
是
無
當使用該配置時,key.fields或key.fields-prefix配置是必填的。
參數取值如下:
csv
json
avro
debezium-json
canal-json
maxwell-json
avro-confluent
raw
key.fields-prefix
為所有Kafka訊息key部分指定自訂首碼,以避免與訊息value部分格式欄位重名。
String
否
無
該配置項僅用於源表和結果表的列名區分,解析和產生Kafka訊息key部分時,該首碼會被移除。
說明使用該配置時,value.fields-include必須配置為EXCEPT_KEY。
value.format
讀取或寫入Kafka訊息value部分時使用的格式。
String
是
無
該配置等同於format,因此format和value.format 只能配置其中一個,如果同時配置兩個會產生衝突。
value.fields-include
在解析或產生Kafka訊息value部分時,是否要包含訊息key部分對應的欄位。
String
是
ALL
參數取值如下:
ALL(預設值):所有列都會作為Kafka訊息value部分處理。
EXCEPT_KEY:除去key.fields定義的欄位,剩餘欄位作為Kafka訊息value部分處理。
topic
讀取或寫入topic名稱。
String
是
無
無。
結果表
參數
說明
資料類型
是否必填
預設值
備忘
sink.parallelism
Kafka結果表運算元的並發數。
Integer
否
上遊運算元的並發,由架構決定
無。
sink.buffer-flush.max-rows
緩衝重新整理前,最多能緩衝多少條記錄。
Integer
否
0(未開啟)
當結果表收到很多同key上的更新時,緩衝將保留同key的最後一條記錄,因此結果表緩衝能協助減少發往Kafka topic的資料量,以及避免發送潛在的tombstone訊息。
說明如果要開啟結果表緩衝,需要同時設定sink.buffer-flush.max-rows和sink.buffer-flush.interval兩個選項為大於零的值。
sink.buffer-flush.interval
緩衝重新整理的間隔時間。
Duration
否
0(未開啟)
單位可以為毫秒(ms)、秒(s)、分鐘(min)或小時(h)。例如
'sink.buffer-flush.interval'='1 s'
。當結果表收到很多同key上的更新時,緩衝將保留同key的最後一條記錄,因此結果表緩衝能協助減少發往Kafka topic的資料量,以及避免發送潛在的tombstone訊息。
說明如果要開啟結果表緩衝,需要同時設定sink.buffer-flush.max-rows和sink.buffer-flush.interval兩個選項為大於零的值。
資料攝入
Upsert Kafka連接器可以用於資料攝入YAML作業開發,作為目標端寫入。寫入時使用JSON格式,主鍵欄位也會放入訊息體中。
文法結構
sink:
type: upsert-kafka
name: upsert-kafka Sink
properties.bootstrap.servers: localhost:9092
# 阿里雲訊息佇列 Kafka 版
aliyun.kafka.accessKeyId: ${secret_values.kafka-ak}
aliyun.kafka.accessKeySecret: ${secret_values.kafka-sk}
aliyun.kafka.instanceId: ${instancd-id}
aliyun.kafka.endpoint: ${endpoint}
aliyun.kafka.regionId: ${region-id}
配置項
參數 | 說明 | 資料類型 | 是否必填 | 預設值 | 備忘 |
type | 目標端類型。 | STRING | 是 | 無 | 固定值為upsert-kafka。 |
name | 目標端名稱。 | STRING | 否 | 無 | 無。 |
properties.bootstrap.servers | Kafka broker地址。 | STRING | 是 | 無 | 格式為 |
properties.* | 對Kafka用戶端的直接配置。 | STRING | 否 | 無 | 尾碼名必須是Kafka官方文檔中定義的生產者配置。 Flink會將properties.首碼移除,並將剩餘的配置傳遞給Kafka用戶端。例如可以通過 |
sink.delivery-guarantee | 寫入時的語義模式。 | STRING | 否 | at-least-once | 取值如下:
|
sink.add-tableId-to-header-enabled | 是否將table資訊寫入header。 | BOOLEAN | 否 | false | 開啟時,namespace、schemaName和tableName會分別寫入header。 |
aliyun.kafka.accessKeyId | 阿里雲帳號AccessKey ID。 | STRING | 否 | 無 | 詳情請參見建立AccessKey。 說明 同步資料到阿里雲訊息佇列Kafka版時需要配置。 |
aliyun.kafka.accessKeySecret | 阿里雲帳號AccessKey Secret。 | STRING | 否 | 無 | 詳情請參見建立AccessKey。 說明 同步資料到阿里雲訊息佇列Kafka版時需要配置。 |
aliyun.kafka.instanceId | 阿里雲Kafka訊息佇列執行個體ID。 | STRING | 否 | 無 | 請在阿里雲Kafka執行個體詳情介面查看。 說明 同步資料到阿里雲訊息佇列Kafka版時需要配置。 |
aliyun.kafka.endpoint | 阿里雲Kafka API服務接入地址。 | STRING | 否 | 無 | 詳情請參見服務存取點。 說明 同步資料到阿里雲訊息佇列Kafka版時需要配置。 |
aliyun.kafka.regionId | Topic所在執行個體的地區ID。 | STRING | 否 | 無 | 詳情請參見服務存取點。 說明 同步資料到阿里雲訊息佇列Kafka版時需要配置。 |
使用樣本
源表
建立Kafka資料來源表,源表中包含網站使用者的瀏覽資料。
CREATE TABLE pageviews( user_id BIGINT, page_id BIGINT, viewtime TIMESTAMP, user_region STRING, WATERMARK FOR viewtime AS viewtime - INTERVAL '2' SECOND )WITH( 'connector'='kafka', 'topic'='<yourTopicName>', 'properties.bootstrap.servers'='...', 'format'='json' );
結果表
建立Upsert Kafka結果表。
CREATE TABLE pageviews_per_region( user_region STRING, pv BIGINT, uv BIGINT, PRIMARY KEY(user_region) NOT ENFORCED )WITH( 'connector'='upsert-kafka', 'topic'='<yourTopicName>', 'properties.bootstrap.servers'='...', 'key.format'='avro', 'value.format'='avro' );
將統計網站使用者的瀏覽資料寫入結果表中。
INSERT INTO pageviews_per_region SELECT user_region, COUNT(*), COUNT(DISTINCTuser_id) FROM pageviews GROUP BY user_region;
資料攝入目標端
source: type: mysql name: MySQL Source hostname: ${mysql.hostname} port: ${mysql.port} username: ${mysql.username} password: ${mysql.password} tables: ${mysql.source.table} server-id: 8601-8604 sink: type: upsert-kafka name: Upsert Kafka Sink properties.bootstrap.servers: ${upsert.kafka.bootstraps.server} aliyun.kafka.accessKeyId: ${upsert.kafka.aliyun.ak} aliyun.kafka.accessKeySecret: ${upsert.kafka.aliyun.sk} aliyun.kafka.instanceId: ${upsert.kafka.aliyun.instanceid} aliyun.kafka.endpoint: ${upsert.kafka.aliyun.endpoint} aliyun.kafka.regionId: ${upsert.kafka.aliyun.regionid} route: - source-table: ${mysql.source.table} sink-table: ${upsert.kafka.topic}