全部產品
Search
文件中心

Realtime Compute for Apache Flink:Upsert Kafka

更新時間:Aug 30, 2024

本文為您介紹如何使用Upsert Kafka連接器。

背景資訊

Upsert Kafka連接器支援以upsert方式從Kafka topic中讀取資料並將資料寫入Kafka topic。

  • 作為源表,此連接器可以將Kafka中儲存的資料轉換為changelog流,其中每條資料記錄代表一個更新或刪除事件。更準確地說,資料記錄中的value被解釋為同一key的最後一個value的UPDATE,如果有這個key,如果不存在相應的key,則該更新被視為INSERT。用表來類比,changelog流中的資料記錄被解釋為UPSERT,也稱為INSERT或UPDATE,因為任何具有相同key的現有行都被覆蓋。另外,value為空白的訊息將會被視作為DELETE訊息。

  • 作為結果表,此連接器可以消費上遊計算邏輯產生的changelog流。它會將INSERT或UPDATE_AFTER資料作為正常的Kafka訊息寫入,並將DELETE資料以value為空白的Kafka訊息寫入,表示對應key的訊息被刪除。Flink將根據主鍵列的值對資料進行分區,從而保證主鍵上的訊息有序,因此同一主鍵上的更新或刪除訊息將落在同一分區中。

類別

詳情

支援類型

源表和結果表

運行模式

流模式

資料格式

avro、avro-confluent、csv、json和raw

特有監控指標

  • 源表

    • numRecordsIn

    • numRecordsInPerSecond

    • numBytesIn

    • numBytesInPerScond

    • currentEmitEventTimeLag

    • currentFetchEventTimeLag

    • sourceIdleTime

    • pendingRecords

  • 結果表

    • numRecordsOut

    • numRecordsOutPerSecond

    • numBytesOut

    • numBytesOutPerSecond

    • currentSendTime

API種類

SQL

是否支援更新或刪除結果表資料

前提條件

使用限制

  • 僅Flink計算引擎VVR 2.0.0及以上版本支援訊息佇列Kafka連接器。

  • 僅支援讀取和寫入Apache Kafka 0.10及以上版本的資料。

  • 僅支援Apache Kafka 2.8版本的用戶端配置項,詳情請參見Apache Kafka消費者生產者配置項文檔。

  • Upsert Kafka結果表在使用精確一次語義時,寫入的Kafka叢集必須開啟事務功能,且僅支援Apache Kafka 0.11及以上版本的叢集。

  • 版本關係限制。

    Kafka叢集版本

    VVR版本

    1.x版本

    6.x版本

    2.x版本,3.x版本

    6.x版本,8.x版本

文法結構

CREATE TABLE upsert_kafka_sink(
user_region STRING,
pv BIGINT,
uv BIGINT,
PRIMARY KEY(user_region) NOT ENFORCED
)WITH(
'connector'='upsert-kafka',
'topic'='<yourTopicName>',
'properties.bootstrap.servers'='...',
'key.format'='avro',
'value.format'='avro'
);

WITH參數

  • 通用

    參數

    說明

    資料類型

    是否必填

    預設值

    備忘

    connector

    表類型。

    String

    固定值為upsert-kafka。

    properties.bootstrap.servers

    Kafka broker地址。

    String

    格式為host:port,host:port,host:port,以英文逗號(,)分割。

    properties.*

    對Kafka用戶端的直接配置。

    String

    尾碼名必須是Kafka官方文檔中定義的生產者消費者配置。

    Flink會將properties.首碼移除,並將剩餘的配置傳遞給Kafka用戶端。例如可以通過'properties.allow.auto.create.topics' = 'false' 來禁用自動建立topic。

    不能通過該方式修改以下配置,因為它們會被Kafka連接器覆蓋:

    • key.deserializer

    • value.deserializer

    key.format

    讀取或寫入Kafka訊息key部分時使用的格式。

    String

    當使用該配置時,key.fieldskey.fields-prefix配置是必填的。

    參數取值如下:

    • csv

    • json

    • avro

    • debezium-json

    • canal-json

    • maxwell-json

    • avro-confluent

    • raw

    key.fields-prefix

    為所有Kafka訊息key部分指定自訂首碼,以避免與訊息value部分格式欄位重名。

    String

    該配置項僅用於源表和結果表的列名區分,解析和產生Kafka訊息key部分時,該首碼會被移除。

    說明

    使用該配置時,value.fields-include必須配置為EXCEPT_KEY。

    value.format

    讀取或寫入Kafka訊息value部分時使用的格式。

    String

    該配置等同於format,因此formatvalue.format 只能配置其中一個,如果同時配置兩個會產生衝突。

    value.fields-include

    在解析或產生Kafka訊息value部分時,是否要包含訊息key部分對應的欄位。

    String

    ALL

    參數取值如下:

    • ALL(預設值):所有列都會作為Kafka訊息value部分處理。

    • EXCEPT_KEY:除去key.fields定義的欄位,剩餘欄位作為Kafka訊息value部分處理。

    topic

    讀取或寫入topic名稱。

    String

    無。

  • 結果表

    參數

    說明

    資料類型

    是否必填

    預設值

    備忘

    sink.parallelism

    Kafka結果表運算元的並發數。

    Integer

    上遊運算元的並發,由架構決定

    無。

    sink.buffer-flush.max-rows

    緩衝重新整理前,最多能緩衝多少條記錄。

    Integer

    0(未開啟)

    當結果表收到很多同key上的更新時,緩衝將保留同key的最後一條記錄,因此結果表緩衝能協助減少發往Kafka topic的資料量,以及避免發送潛在的tombstone訊息。

    說明

    如果要開啟結果表緩衝,需要同時設定sink.buffer-flush.max-rowssink.buffer-flush.interval兩個選項為大於零的值。

    sink.buffer-flush.interval

    緩衝重新整理的間隔時間。

    Duration

    0(未開啟)

    單位可以為毫秒(ms)、秒(s)、分鐘(min)或小時(h)。例如'sink.buffer-flush.interval'='1 s'

    當結果表收到很多同key上的更新時,緩衝將保留同key的最後一條記錄,因此結果表緩衝能協助減少發往Kafka topic的資料量,以及避免發送潛在的tombstone訊息。

    說明

    如果要開啟結果表緩衝,需要同時設定sink.buffer-flush.max-rowssink.buffer-flush.interval兩個選項為大於零的值。

使用樣本

  • 源表

    建立Kafka資料來源表,源表中包含網站使用者的瀏覽資料。

    CREATE TABLE pageviews(
    user_id BIGINT,
    page_id BIGINT,
    viewtime TIMESTAMP,
    user_region STRING,
    WATERMARK FOR viewtime AS viewtime - INTERVAL '2' SECOND
    )WITH(
    'connector'='kafka',
    'topic'='<yourTopicName>',
    'properties.bootstrap.servers'='...',
    'format'='json'
    );
  • 結果表

    • 建立Upsert Kafka結果表。

      CREATE TABLE pageviews_per_region(
      user_region STRING,
      pv BIGINT,
      uv BIGINT,
      PRIMARY KEY(user_region) NOT ENFORCED
      )WITH(
      'connector'='upsert-kafka',
      'topic'='<yourTopicName>',
      'properties.bootstrap.servers'='...',
      'key.format'='avro',
      'value.format'='avro'
      );
    • 將統計網站使用者的瀏覽資料寫入結果表中。

      INSERT INTO pageviews_per_region
      SELECT
      user_region,
      COUNT(*),
      COUNT(DISTINCTuser_id)
      FROM pageviews
      GROUP BY user_region;

最佳實務