
Realtime Compute for Apache Flink: Upsert Kafka connector

Last Updated: Oct 31, 2024

This topic describes how to use the Upsert Kafka connector.

Background information

The Upsert Kafka connector can be used to read data from and write data to Kafka topics in the upsert fashion.

  • The Upsert Kafka connector for a source table converts the data stored in a Kafka topic into a changelog stream, in which each data record represents an update or delete event. A data record whose key already exists in the topic overwrites the value of that key and is interpreted as UPDATE. A data record whose key does not yet exist in the topic is inserted and interpreted as INSERT. In other words, every record is interpreted as UPSERT (INSERT or UPDATE) because an existing row with the same key is always overwritten. A data record whose value is null is interpreted as DELETE for its key. For an illustration, see the sketch after this list.

  • The Upsert Kafka connector for a sink table or a data ingestion sink consumes the changelog stream produced upstream. It writes INSERT and UPDATE_AFTER data to Kafka topics as normal Kafka messages, and writes DELETE data as Kafka messages with null values, which mark the messages for the corresponding keys as deleted (tombstones). Flink partitions data based on the values of the primary key columns, so UPDATE and DELETE messages with the same primary key are written to the same partition and are therefore consumed in order.
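
For example, the following sketch shows how a sequence of Kafka messages on one key is interpreted as a changelog. The table name, topic name, and broker address are placeholders used for illustration only.

-- A minimal sketch of an Upsert Kafka source table. All names and addresses are placeholders.
CREATE TABLE user_region_pv (
  user_region STRING,
  pv BIGINT,
  PRIMARY KEY (user_region) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'user_region_pv_topic',
  'properties.bootstrap.servers' = '...',
  'key.format' = 'json',
  'value.format' = 'json'
);

-- Kafka messages on the topic (key -> value) and their changelog interpretation:
--   'beijing' -> {"pv": 1}    INSERT: the key is seen for the first time
--   'beijing' -> {"pv": 7}    UPDATE: the existing row for the key is overwritten
--   'beijing' -> null         DELETE: a null value is a tombstone for the key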

The following list summarizes the capabilities of the Upsert Kafka connector.

Table type: Source table, sink table, and data ingestion sink

Running mode: Streaming mode

Data format: avro, avro-confluent, csv, json, and raw

Metrics:

  • Source tables: numRecordsIn, numRecordsInPerSecond, numBytesIn, numBytesInPerSecond, currentEmitEventTimeLag, currentFetchEventTimeLag, sourceIdleTime, and pendingRecords

  • Sink tables: numRecordsOut, numRecordsOutPerSecond, numBytesOut, numBytesOutPerSecond, and currentSendTime

API type: SQL API and data ingestion YAML API

Data update or deletion in a sink table: Supported

Prerequisites

  • A Kafka cluster is created. For more information, see Create a Dataflow Kafka cluster or Create resources in Kafka.

  • A network connection is established between Realtime Compute for Apache Flink and the Kafka cluster. For more information about how to establish a network connection between Realtime Compute for Apache Flink and a Kafka cluster that is created in E-MapReduce (EMR), see Create and manage a VPC and Overview. For more information about how to establish a network connection between Realtime Compute for Apache Flink and an ApsaraMQ for Kafka cluster, see Configure a whitelist.

Limits

  • Only Realtime Compute for Apache Flink that uses Ververica Runtime (VVR) 2.0.0 or later supports the Upsert Kafka connector.

  • The Upsert Kafka connector can read data from and write data to only Apache Kafka 0.10 or later.

  • The Upsert Kafka connector supports only the client parameters of Apache Kafka 2.8. For more information about the configuration parameters of the Kafka producer and consumer, see Consumer Configs and Producer Configs.

  • If an Upsert Kafka sink table uses the exactly-once semantics, you must enable the Kafka transaction mechanism to write data to the Kafka cluster, and the Kafka cluster must run Apache Kafka 0.11 or later.

SQL

The Upsert Kafka connector can be used to read data from and write data to Kafka topics in the upsert fashion.

Syntax

CREATE TABLE upsert_kafka_sink (
  user_region STRING,
  pv BIGINT,
  uv BIGINT,
  PRIMARY KEY (user_region) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = '<yourTopicName>',
  'properties.bootstrap.servers' = '...',
  'key.format' = 'avro',
  'value.format' = 'avro'
);

Parameters in the WITH clause

  • Common parameters

    connector
      Description: The type of the table.
      Data type: String
      Required: Yes
      Default value: No default value
      Remarks: Set the value to upsert-kafka.

    properties.bootstrap.servers
      Description: The IP addresses or endpoints and port numbers of Kafka brokers.
      Data type: String
      Required: Yes
      Default value: No default value
      Remarks: Format: host:port,host:port,host:port. Separate multiple host:port pairs with commas (,).

    properties.*
      Description: The parameters that are configured for the Kafka client.
      Data type: String
      Required: No
      Default value: No default value
      Remarks: The suffix of this parameter must comply with the rules that are defined in Producer Configs and Consumer Configs. Flink removes the properties. prefix and passes the transformed keys and values to the Kafka client. For example, you can set properties.allow.auto.create.topics to false to disable automatic topic creation. You cannot set the following parameters by adding the properties. prefix, because their values are overwritten by the Upsert Kafka connector:
        • key.deserializer
        • value.deserializer

    key.format
      Description: The format used to read or write the key field of Kafka messages.
      Data type: String
      Required: Yes
      Default value: No default value
      Remarks: If you configure this parameter, you must also configure the key.fields or key.fields-prefix parameter. Valid values:
        • csv
        • json
        • avro
        • debezium-json
        • canal-json
        • maxwell-json
        • avro-confluent
        • raw

    key.fields-prefix
      Description: A custom prefix for all key fields in Kafka messages. You can configure this parameter to prevent name conflicts with the value fields.
      Data type: String
      Required: No
      Default value: No default value
      Remarks: This parameter is used only to distinguish the column names of source tables and sink tables. The prefix is removed from the column names when the key fields of Kafka messages are parsed and generated.
      Note: If you configure this parameter, you must set the value.fields-include parameter to EXCEPT_KEY.

    value.format
      Description: The format used to read or write the value field of Kafka messages.
      Data type: String
      Required: Yes
      Default value: No default value
      Remarks: This parameter is equivalent to the format parameter. The two parameters cannot be configured together; if you configure both, a conflict occurs.

    value.fields-include
      Description: Specifies whether to include the fields that correspond to message keys when the value fields of Kafka messages are parsed or generated.
      Data type: String
      Required: Yes
      Default value: ALL
      Remarks: Valid values:
        • ALL: All fields are processed as value fields of Kafka messages. This is the default value.
        • EXCEPT_KEY: All fields except the fields specified by the key.fields parameter are processed as value fields of Kafka messages.

    topic
      Description: The name of the topic from which data is read or to which data is written.
      Data type: String
      Required: Yes
      Default value: No default value
      Remarks: N/A

  • Parameters only for sink tables

    sink.parallelism
      Description: The parallelism of operators in the Kafka sink table.
      Data type: Integer
      Required: No
      Default value: The parallelism of upstream operators, which is determined by the framework.
      Remarks: N/A

    sink.buffer-flush.max-rows
      Description: The maximum number of data records that can be cached before the cache is flushed.
      Data type: Integer
      Required: No
      Default value: 0 (disabled)
      Remarks: If the sink table receives a large number of updates on the same key, only the last data record of each key is retained in the cache. Data caching in the sink table therefore reduces the amount of data that is written to Kafka topics and prevents potential tombstone messages from being sent to Kafka topics.
      Note: To enable data caching for sink tables, you must set both the sink.buffer-flush.max-rows and sink.buffer-flush.interval parameters to values greater than 0.

    sink.buffer-flush.interval
      Description: The interval at which the cache is flushed.
      Data type: Duration
      Required: No
      Default value: 0 (disabled)
      Remarks: The unit can be milliseconds, seconds, minutes, or hours. For example, you can configure 'sink.buffer-flush.interval'='1 s'. If the sink table receives a large number of updates on the same key, only the last data record of each key is retained in the cache. Data caching in the sink table therefore reduces the amount of data that is written to Kafka topics and prevents potential tombstone messages from being sent to Kafka topics.
      Note: To enable data caching for sink tables, you must set both the sink.buffer-flush.max-rows and sink.buffer-flush.interval parameters to values greater than 0.
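
The following sketch shows how several optional parameters can be combined in a sink table declaration. It is an illustrative example only; the table name, topic, broker address, formats, and buffer values are placeholders.

-- An illustrative sink table that combines several optional WITH parameters. All values are placeholders.
CREATE TABLE pageviews_per_region_buffered (
  k_user_region STRING,  -- key column; the k_ prefix prevents a name conflict with value fields
  pv BIGINT,
  uv BIGINT,
  PRIMARY KEY (k_user_region) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = '<yourTopicName>',
  'properties.bootstrap.servers' = '...',
  'properties.allow.auto.create.topics' = 'false',  -- passed through to the Kafka client
  'key.format' = 'json',
  'key.fields-prefix' = 'k_',                       -- requires value.fields-include = EXCEPT_KEY
  'value.format' = 'json',
  'value.fields-include' = 'EXCEPT_KEY',
  'sink.buffer-flush.max-rows' = '1000',            -- caching takes effect only if both buffer
  'sink.buffer-flush.interval' = '1 s'              -- parameters are greater than 0
);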

Data ingestion

The Upsert Kafka connector can be used as the sink in YAML drafts for data ingestion. Data is written to the sink in the JSON format, and the primary key fields are included in the message body.

Syntax

sink:
  type: upsert-kafka
  name: Upsert Kafka Sink
  properties.bootstrap.servers: localhost:9092
  # The following parameters are required only for ApsaraMQ for Kafka.
  aliyun.kafka.accessKeyId: ${secret_values.kafka-ak}
  aliyun.kafka.accessKeySecret: ${secret_values.kafka-sk}
  aliyun.kafka.instanceId: ${instance-id}
  aliyun.kafka.endpoint: ${endpoint}
  aliyun.kafka.regionId: ${region-id}

Parameters

type
  Description: The connector type of the sink.
  Data type: STRING
  Required: Yes
  Default value: No default value
  Remarks: Set the value to upsert-kafka.

name
  Description: The name of the sink.
  Data type: STRING
  Required: No
  Default value: No default value
  Remarks: N/A

properties.bootstrap.servers
  Description: The IP addresses or endpoints and port numbers of Kafka brokers.
  Data type: STRING
  Required: Yes
  Default value: No default value
  Remarks: Format: host:port,host:port,host:port. Separate multiple host:port pairs with commas (,).

properties.*
  Description: The parameters that are configured for the Kafka client.
  Data type: STRING
  Required: No
  Default value: No default value
  Remarks: The suffix of this parameter must comply with the rules that are defined in Producer Configs. Flink removes the properties. prefix and passes the transformed keys and values to the Kafka client. For example, you can set properties.allow.auto.create.topics to false to disable automatic topic creation.

sink.delivery-guarantee
  Description: The delivery semantics of the Kafka sink.
  Data type: STRING
  Required: No
  Default value: at-least-once
  Remarks: Valid values:
    • none: No guarantee is provided. Data may be lost or duplicated.
    • at-least-once: No data is lost, but duplicate data may exist. This is the default value.
    • exactly-once: Kafka transactions are used to ensure that data is neither lost nor duplicated.

sink.add-tableId-to-header-enabled
  Description: Specifies whether to write table information to the message header.
  Data type: BOOLEAN
  Required: No
  Default value: false
  Remarks: If you set this parameter to true, namespace, schemaName, and tableName are written to the header.

aliyun.kafka.accessKeyId
  Description: The AccessKey ID of your Alibaba Cloud account.
  Data type: STRING
  Required: No
  Default value: No default value
  Remarks: For more information, see Obtain an AccessKey pair.
  Note: This parameter is required when you synchronize data to ApsaraMQ for Kafka.

aliyun.kafka.accessKeySecret
  Description: The AccessKey secret of your Alibaba Cloud account.
  Data type: STRING
  Required: No
  Default value: No default value
  Remarks: For more information, see Obtain an AccessKey pair.
  Note: This parameter is required when you synchronize data to ApsaraMQ for Kafka.

aliyun.kafka.instanceId
  Description: The ID of the ApsaraMQ for Kafka instance.
  Data type: STRING
  Required: No
  Default value: No default value
  Remarks: You can view the instance ID on the Instance Details page.
  Note: This parameter is required when you synchronize data to ApsaraMQ for Kafka.

aliyun.kafka.endpoint
  Description: The endpoint of ApsaraMQ for Kafka.
  Data type: STRING
  Required: No
  Default value: No default value
  Remarks: For more information, see Endpoints.
  Note: This parameter is required when you synchronize data to ApsaraMQ for Kafka.

aliyun.kafka.regionId
  Description: The region ID of the instance in which you want to create a topic.
  Data type: STRING
  Required: No
  Default value: No default value
  Remarks: For more information, see Endpoints.
  Note: This parameter is required when you synchronize data to ApsaraMQ for Kafka.
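
The following sketch shows how these parameters can be combined in a data ingestion sink that writes to ApsaraMQ for Kafka with exactly-once delivery. All values are placeholders used for illustration.

sink:
  type: upsert-kafka
  name: Upsert Kafka Sink
  properties.bootstrap.servers: ${kafka.bootstrap.servers}   # placeholder broker address
  sink.delivery-guarantee: exactly-once                      # none | at-least-once | exactly-once
  sink.add-tableId-to-header-enabled: true                   # write namespace, schemaName, and tableName to the header
  # The aliyun.kafka.* parameters are required only when data is synchronized to ApsaraMQ for Kafka.
  aliyun.kafka.accessKeyId: ${secret_values.kafka-ak}
  aliyun.kafka.accessKeySecret: ${secret_values.kafka-sk}
  aliyun.kafka.instanceId: ${instance-id}
  aliyun.kafka.endpoint: ${endpoint}
  aliyun.kafka.regionId: ${region-id}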

Sample code

  • Sample code for a source table

    Create a Kafka source table that contains the browsing data of website users.

    CREATE TABLE pageviews (
      user_id BIGINT,
      page_id BIGINT,
      viewtime TIMESTAMP(3),
      user_region STRING,
      WATERMARK FOR viewtime AS viewtime - INTERVAL '2' SECOND
    ) WITH (
      'connector' = 'kafka',
      'topic' = '<yourTopicName>',
      'properties.bootstrap.servers' = '...',
      'format' = 'json'
    );
  • Sample code for a sink table

    • Create an Upsert Kafka sink table.

      CREATE TABLE pageviews_per_region (
        user_region STRING,
        pv BIGINT,
        uv BIGINT,
        PRIMARY KEY (user_region) NOT ENFORCED
      ) WITH (
        'connector' = 'upsert-kafka',
        'topic' = '<yourTopicName>',
        'properties.bootstrap.servers' = '...',
        'key.format' = 'avro',
        'value.format' = 'avro'
      );
    • Write the browsing data of website users to the sink table.

      INSERT INTO pageviews_per_region
      SELECT
        user_region,
        COUNT(*),
        COUNT(DISTINCT user_id)
      FROM pageviews
      GROUP BY user_region;
  • Data ingestion sink

    source:
      type: mysql
      name: MySQL Source
      hostname: ${mysql.hostname}
      port: ${mysql.port}
      username: ${mysql.username}
      password: ${mysql.password}
      tables: ${mysql.source.table}
      server-id: 8601-8604
    
    sink:
      type: upsert-kafka
      name: Upsert Kafka Sink
      properties.bootstrap.servers: ${upsert.kafka.bootstraps.server}
      aliyun.kafka.accessKeyId: ${upsert.kafka.aliyun.ak}
      aliyun.kafka.accessKeySecret: ${upsert.kafka.aliyun.sk}
      aliyun.kafka.instanceId: ${upsert.kafka.aliyun.instanceid}
      aliyun.kafka.endpoint: ${upsert.kafka.aliyun.endpoint}
      aliyun.kafka.regionId: ${upsert.kafka.aliyun.regionid}
    
    route:
      - source-table: ${mysql.source.table}
        sink-table: ${upsert.kafka.topic}

Best practices