全部產品
Search
文件中心

Realtime Compute for Apache Flink:Upsert Kafka

更新時間:Oct 25, 2024

本文為您介紹如何使用Upsert Kafka連接器。

背景資訊

Upsert Kafka連接器支援以upsert方式從Kafka topic中讀取資料並將資料寫入Kafka topic。

  • 作為源表,此連接器可以將Kafka中儲存的資料轉換為changelog流,其中每條資料記錄代表一個更新或刪除事件。更準確地說,資料記錄中的value被解釋為同一key的最後一個value的UPDATE,如果有這個key,如果不存在相應的key,則該更新被視為INSERT。用表來類比,changelog流中的資料記錄被解釋為UPSERT,也稱為INSERT或UPDATE,因為任何具有相同key的現有行都被覆蓋。另外,value為空白的訊息將會被視作為DELETE訊息。

  • 作為結果表或資料攝入目標端,此連接器可以消費上遊計算邏輯產生的changelog流。它會將INSERT或UPDATE_AFTER資料作為正常的Kafka訊息寫入,並將DELETE資料以value為空白的Kafka訊息寫入,表示對應key的訊息被刪除。Flink將根據主鍵列的值對資料進行分區,從而保證主鍵上的訊息有序,因此同一主鍵上的更新或刪除訊息將落在同一分區中。

類別

詳情

支援類型

源表和結果表,資料攝入目標端

運行模式

流模式

資料格式

avro、avro-confluent、csv、json和raw

特有監控指標

  • 源表

    • numRecordsIn

    • numRecordsInPerSecond

    • numBytesIn

    • numBytesInPerScond

    • currentEmitEventTimeLag

    • currentFetchEventTimeLag

    • sourceIdleTime

    • pendingRecords

  • 結果表

    • numRecordsOut

    • numRecordsOutPerSecond

    • numBytesOut

    • numBytesOutPerSecond

    • currentSendTime

API種類

SQL,資料攝入YAML作業

是否支援更新或刪除結果表資料

前提條件

使用限制

  • 僅Flink計算引擎VVR 2.0.0及以上版本支援訊息佇列Kafka連接器。

  • 僅支援讀取和寫入Apache Kafka 0.10及以上版本的資料。

  • 僅支援Apache Kafka 2.8版本的用戶端配置項,詳情請參見Apache Kafka消費者生產者配置項文檔。

  • Upsert Kafka結果表在使用精確一次語義時,寫入的Kafka叢集必須開啟事務功能,且僅支援Apache Kafka 0.11及以上版本的叢集。

SQL

Upsert Kafka連接器支援以upsert方式從Kafka topic中讀取資料並將資料寫入Kafka topic。

文法結構

CREATE TABLE upsert_kafka_sink(
user_region STRING,
pv BIGINT,
uv BIGINT,
PRIMARY KEY(user_region) NOT ENFORCED
)WITH(
'connector'='upsert-kafka',
'topic'='<yourTopicName>',
'properties.bootstrap.servers'='...',
'key.format'='avro',
'value.format'='avro'
);

WITH參數

  • 通用

    參數

    說明

    資料類型

    是否必填

    預設值

    備忘

    connector

    表類型。

    String

    固定值為upsert-kafka。

    properties.bootstrap.servers

    Kafka broker地址。

    String

    格式為host:port,host:port,host:port,以英文逗號(,)分割。

    properties.*

    對Kafka用戶端的直接配置。

    String

    尾碼名必須是Kafka官方文檔中定義的生產者消費者配置。

    Flink會將properties.首碼移除,並將剩餘的配置傳遞給Kafka用戶端。例如可以通過'properties.allow.auto.create.topics' = 'false' 來禁用自動建立topic。

    不能通過該方式修改以下配置,因為它們會被Kafka連接器覆蓋:

    • key.deserializer

    • value.deserializer

    key.format

    讀取或寫入Kafka訊息key部分時使用的格式。

    String

    當使用該配置時,key.fieldskey.fields-prefix配置是必填的。

    參數取值如下:

    • csv

    • json

    • avro

    • debezium-json

    • canal-json

    • maxwell-json

    • avro-confluent

    • raw

    key.fields-prefix

    為所有Kafka訊息key部分指定自訂首碼,以避免與訊息value部分格式欄位重名。

    String

    該配置項僅用於源表和結果表的列名區分,解析和產生Kafka訊息key部分時,該首碼會被移除。

    說明

    使用該配置時,value.fields-include必須配置為EXCEPT_KEY。

    value.format

    讀取或寫入Kafka訊息value部分時使用的格式。

    String

    該配置等同於format,因此formatvalue.format 只能配置其中一個,如果同時配置兩個會產生衝突。

    value.fields-include

    在解析或產生Kafka訊息value部分時,是否要包含訊息key部分對應的欄位。

    String

    ALL

    參數取值如下:

    • ALL(預設值):所有列都會作為Kafka訊息value部分處理。

    • EXCEPT_KEY:除去key.fields定義的欄位,剩餘欄位作為Kafka訊息value部分處理。

    topic

    讀取或寫入topic名稱。

    String

    無。

  • 結果表

    參數

    說明

    資料類型

    是否必填

    預設值

    備忘

    sink.parallelism

    Kafka結果表運算元的並發數。

    Integer

    上遊運算元的並發,由架構決定

    無。

    sink.buffer-flush.max-rows

    緩衝重新整理前,最多能緩衝多少條記錄。

    Integer

    0(未開啟)

    當結果表收到很多同key上的更新時,緩衝將保留同key的最後一條記錄,因此結果表緩衝能協助減少發往Kafka topic的資料量,以及避免發送潛在的tombstone訊息。

    說明

    如果要開啟結果表緩衝,需要同時設定sink.buffer-flush.max-rowssink.buffer-flush.interval兩個選項為大於零的值。

    sink.buffer-flush.interval

    緩衝重新整理的間隔時間。

    Duration

    0(未開啟)

    單位可以為毫秒(ms)、秒(s)、分鐘(min)或小時(h)。例如'sink.buffer-flush.interval'='1 s'

    當結果表收到很多同key上的更新時,緩衝將保留同key的最後一條記錄,因此結果表緩衝能協助減少發往Kafka topic的資料量,以及避免發送潛在的tombstone訊息。

    說明

    如果要開啟結果表緩衝,需要同時設定sink.buffer-flush.max-rowssink.buffer-flush.interval兩個選項為大於零的值。

資料攝入

Upsert Kafka連接器可以用於資料攝入YAML作業開發,作為目標端寫入。寫入時使用JSON格式,主鍵欄位也會放入訊息體中。

文法結構

sink:
  type: upsert-kafka
  name: upsert-kafka Sink
  properties.bootstrap.servers: localhost:9092
  # 阿里雲訊息佇列 Kafka 版
  aliyun.kafka.accessKeyId: ${secret_values.kafka-ak}
  aliyun.kafka.accessKeySecret: ${secret_values.kafka-sk}
  aliyun.kafka.instanceId: ${instancd-id}
  aliyun.kafka.endpoint: ${endpoint}
  aliyun.kafka.regionId: ${region-id}

配置項

參數

說明

資料類型

是否必填

預設值

備忘

type

目標端類型。

STRING

固定值為upsert-kafka。

name

目標端名稱。

STRING

無。

properties.bootstrap.servers

Kafka broker地址。

STRING

格式為host:port,host:port,host:port,以英文逗號(,)分割。

properties.*

對Kafka用戶端的直接配置。

STRING

尾碼名必須是Kafka官方文檔中定義的生產者配置。

Flink會將properties.首碼移除,並將剩餘的配置傳遞給Kafka用戶端。例如可以通過'properties.allow.auto.create.topics' = 'false' 來禁用自動建立topic。

sink.delivery-guarantee

寫入時的語義模式。

STRING

at-least-once

取值如下:

  • none:不保證任何語義,資料可能會丟失或重複。

  • at-least-once(預設值):保證資料不丟失,但可能會重複。

  • exactly-once:使用Kafka事務保證資料不會丟失和重複。

sink.add-tableId-to-header-enabled

是否將table資訊寫入header。

BOOLEAN

false

開啟時,namespace、schemaName和tableName會分別寫入header。

aliyun.kafka.accessKeyId

阿里雲帳號AccessKey ID。

STRING

詳情請參見建立AccessKey

說明

同步資料到阿里雲訊息佇列Kafka版時需要配置。

aliyun.kafka.accessKeySecret

阿里雲帳號AccessKey Secret。

STRING

詳情請參見建立AccessKey

說明

同步資料到阿里雲訊息佇列Kafka版時需要配置。

aliyun.kafka.instanceId

阿里雲Kafka訊息佇列執行個體ID。

STRING

請在阿里雲Kafka執行個體詳情介面查看。

說明

同步資料到阿里雲訊息佇列Kafka版時需要配置。

aliyun.kafka.endpoint

阿里雲Kafka API服務接入地址。

STRING

詳情請參見服務存取點

說明

同步資料到阿里雲訊息佇列Kafka版時需要配置。

aliyun.kafka.regionId

Topic所在執行個體的地區ID。

STRING

詳情請參見服務存取點

說明

同步資料到阿里雲訊息佇列Kafka版時需要配置。

使用樣本

  • 源表

    建立Kafka資料來源表,源表中包含網站使用者的瀏覽資料。

    CREATE TABLE pageviews(
    user_id BIGINT,
    page_id BIGINT,
    viewtime TIMESTAMP,
    user_region STRING,
    WATERMARK FOR viewtime AS viewtime - INTERVAL '2' SECOND
    )WITH(
    'connector'='kafka',
    'topic'='<yourTopicName>',
    'properties.bootstrap.servers'='...',
    'format'='json'
    );
  • 結果表

    • 建立Upsert Kafka結果表。

      CREATE TABLE pageviews_per_region(
      user_region STRING,
      pv BIGINT,
      uv BIGINT,
      PRIMARY KEY(user_region) NOT ENFORCED
      )WITH(
      'connector'='upsert-kafka',
      'topic'='<yourTopicName>',
      'properties.bootstrap.servers'='...',
      'key.format'='avro',
      'value.format'='avro'
      );
    • 將統計網站使用者的瀏覽資料寫入結果表中。

      INSERT INTO pageviews_per_region
      SELECT
      user_region,
      COUNT(*),
      COUNT(DISTINCTuser_id)
      FROM pageviews
      GROUP BY user_region;
  • 資料攝入目標端

    source:
      type: mysql
      name: MySQL Source
      hostname: ${mysql.hostname}
      port: ${mysql.port}
      username: ${mysql.username}
      password: ${mysql.password}
      tables: ${mysql.source.table}
      server-id: 8601-8604
    
    sink:
      type: upsert-kafka
      name: Upsert Kafka Sink
      properties.bootstrap.servers: ${upsert.kafka.bootstraps.server}
      aliyun.kafka.accessKeyId: ${upsert.kafka.aliyun.ak}
      aliyun.kafka.accessKeySecret: ${upsert.kafka.aliyun.sk}
      aliyun.kafka.instanceId: ${upsert.kafka.aliyun.instanceid}
      aliyun.kafka.endpoint: ${upsert.kafka.aliyun.endpoint}
      aliyun.kafka.regionId: ${upsert.kafka.aliyun.regionid}
    
    route:
      - source-table: ${mysql.source.table}
        sink-table: ${upsert.kafka.topic}

最佳實務