This topic describes how to use the Upsert Kafka connector.
Background information
The Upsert Kafka connector can be used to read data from and write data to Kafka topics in the upsert fashion.
When the Upsert Kafka connector is used for a source table, it converts the data stored in a Kafka topic into a changelog stream, in which each data record represents an update or delete event. If the key of a data record already exists in the Kafka topic, the value of the data record overwrites the last value of that key, and the record is interpreted as UPDATE. If the key does not exist in the topic yet, the value is inserted and the record is interpreted as INSERT. In other words, each data record in the changelog stream is interpreted as UPSERT (INSERT or UPDATE) because an existing row with the same key is always overwritten. A data record whose value is null is interpreted as DELETE for the corresponding key.
When the Upsert Kafka connector is used for a sink table or a data ingestion sink, it can consume the changelog stream that is produced upstream. The connector writes INSERT and UPDATE_AFTER data to Kafka topics as normal Kafka messages, and writes DELETE data as Kafka messages with null values, which serve as tombstones that indicate the messages for the corresponding keys are deleted. Flink partitions data based on the values of the primary key columns, so the ordering of messages with the same primary key is guaranteed. Therefore, UPDATE and DELETE records that share a primary key are written to the same partition.
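For example, assume a topic whose message key is user_region; the keys and values in the following sketch are made up for illustration only and show how the Upsert Kafka source interprets a sequence of Kafka messages as changelog events.
-- Kafka message (key => value)             Interpretation by the Upsert Kafka source
-- 'beijing' => {"pv": 5, "uv": 3}          INSERT: the key appears for the first time.
-- 'beijing' => {"pv": 7, "uv": 4}          UPDATE: the existing value of the key is overwritten.
-- 'beijing' => null                        DELETE: a null value acts as a tombstone for the key.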
Item | Description |
Table type | Source table, sink table, data ingestion sink |
Running mode | Streaming mode |
Data format | avro, avro-confluent, csv, json, and raw |
Metric | |
API type | SQL API and data ingestion YAML API |
Data update or deletion in a sink table | Yes |
Prerequisites
A Kafka cluster is created. For more information, see Create a Dataflow Kafka cluster or Create resources in Kafka.
A network connection is established between Realtime Compute for Apache Flink and the Kafka cluster. For more information about how to establish a network connection between Realtime Compute for Apache Flink and a Kafka cluster that is created in E-MapReduce (EMR), see Create and manage a VPC and Overview. For more information about how to establish a network connection between Realtime Compute for Apache Flink and an ApsaraMQ for Kafka cluster, see Configure a whitelist.
Limits
Only Realtime Compute for Apache Flink that uses Ververica Runtime (VVR) 2.0.0 or later supports the Upsert Kafka connector.
The Upsert Kafka connector can be used only to read data from and write data to clusters that run Apache Kafka 0.10 or later.
The Upsert Kafka connector supports only the client parameters of Apache Kafka 2.8. For more information about the configuration parameters of the Kafka producer and consumer, see Consumer Configs and Producer Configs.
If an Upsert Kafka sink table uses exactly-once semantics, the Kafka transaction mechanism must be enabled to write data to the Kafka cluster, and the Kafka cluster must run Apache Kafka 0.11 or later.
SQL
In SQL drafts, the Upsert Kafka connector can be used to read data from and write data to Kafka topics in the upsert fashion.
Syntax
CREATE TABLE upsert_kafka_sink(
user_region STRING,
pv BIGINT,
uv BIGINT,
PRIMARY KEY(user_region) NOT ENFORCED
)WITH(
'connector'='upsert-kafka',
'topic'='<yourTopicName>',
'properties.bootstrap.servers'='...',
'key.format'='avro',
'value.format'='avro'
);
Parameters in the WITH clause
Common parameters
Parameter | Description | Data type | Required | Default value | Remarks |
connector | The type of the table. | String | Yes | No default value | Set the value to upsert-kafka. |
properties.bootstrap.servers | The IP addresses or endpoints and port numbers of Kafka brokers. | String | Yes | No default value | Format: host:port,host:port,host:port. Separate multiple host:port pairs with commas (,). |
properties.* | The parameters that are configured for the Kafka client. | String | No | No default value | The suffix of this parameter must comply with the rules that are defined in Producer Configs and Consumer Configs. Flink removes the properties. prefix and passes the transformed keys and values to the Kafka client. For example, you can set properties.allow.auto.create.topics to false to disable automatic topic creation. You cannot modify the key.deserializer or value.deserializer parameter by adding the properties. prefix, because the values of these parameters are overwritten when the Upsert Kafka connector is used. |
key.format | The format used to read or write the key field of Kafka messages. | String | Yes | No default value | If you configure this parameter, you must also configure the key.fields or key.fields-prefix parameter. Valid values: csv, json, avro, debezium-json, canal-json, maxwell-json, avro-confluent, and raw. |
key.fields-prefix | A custom prefix for all key fields in Kafka messages. You can configure this parameter to prevent name conflicts with the value fields. | String | No | No default value | This parameter is used only to distinguish the column names of source tables and sink tables. The prefix is removed from the column names when the key fields of Kafka messages are parsed and generated. Note: If you configure this parameter, you must set the value.fields-include parameter to EXCEPT_KEY. See the example after this table. |
value.format | The format used to read or write the value field of Kafka messages. | String | Yes | No default value | The configuration of this parameter is equivalent to that of the format parameter. The format parameter cannot be used together with the value.format parameter. If you configure both parameters, a conflict occurs. |
value.fields-include | Specifies whether to include the fields that correspond to message keys when the value fields of Kafka messages are parsed or generated. | String | Yes | ALL | Valid values: ALL (default): all fields are processed as value fields of Kafka messages. EXCEPT_KEY: all fields except the fields specified by the key.fields parameter are processed as value fields of Kafka messages. |
topic | The name of the topic from which data is read or to which data is written. | String | Yes | No default value | N/A |
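The following DDL statement is a minimal sketch that shows how the key format, the key.fields-prefix parameter, and a pass-through client property fit together. The table name, field names, topic, and broker addresses are placeholders for illustration only.
CREATE TABLE upsert_kafka_prefixed_key(
  key_user_region STRING,
  pv BIGINT,
  uv BIGINT,
  PRIMARY KEY(key_user_region) NOT ENFORCED
)WITH(
  'connector'='upsert-kafka',
  'topic'='<yourTopicName>',
  'properties.bootstrap.servers'='...',
  -- Passed to the Kafka client as allow.auto.create.topics=false after the properties. prefix is removed.
  'properties.allow.auto.create.topics'='false',
  'key.format'='json',
  -- The key_ prefix is removed when the key fields of Kafka messages are parsed and generated.
  'key.fields-prefix'='key_',
  -- Required when key.fields-prefix is configured.
  'value.fields-include'='EXCEPT_KEY',
  'value.format'='json'
);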
Parameters only for sink tables
Parameter | Description | Data type | Required | Default value | Remarks |
sink.parallelism | The parallelism of operators in the Kafka sink table. | Integer | No | The parallelism of upstream operators, which is determined by the framework. | N/A |
sink.buffer-flush.max-rows | The maximum number of data records that can be cached before the cache is flushed. | Integer | No | 0 (disabled) | If the sink table receives a large number of updates on the same key, only the last data record of the key is retained in the cache. Data caching in the sink table therefore reduces the amount of data that is written to Kafka topics and prevents potential tombstone messages from being sent to Kafka topics. Note: To enable data caching for sink tables, you must set both the sink.buffer-flush.max-rows and sink.buffer-flush.interval parameters to values greater than 0. See the example after this table. |
sink.buffer-flush.interval | The interval at which the cache is flushed. | Duration | No | 0 (disabled) | The unit can be milliseconds, seconds, minutes, or hours. For example, you can configure 'sink.buffer-flush.interval'='1 s'. If the sink table receives a large number of updates on the same key, only the last data record of the key is retained in the cache. Data caching in the sink table therefore reduces the amount of data that is written to Kafka topics and prevents potential tombstone messages from being sent to Kafka topics. Note: To enable data caching for sink tables, you must set both the sink.buffer-flush.max-rows and sink.buffer-flush.interval parameters to values greater than 0. |
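The following DDL statement is a minimal sketch of an Upsert Kafka sink table with data caching enabled. The table name, field names, topic, and broker addresses are placeholders for illustration only.
CREATE TABLE buffered_pageviews_sink(
  user_region STRING,
  pv BIGINT,
  PRIMARY KEY(user_region) NOT ENFORCED
)WITH(
  'connector'='upsert-kafka',
  'topic'='<yourTopicName>',
  'properties.bootstrap.servers'='...',
  'key.format'='json',
  'value.format'='json',
  -- Both parameters must be greater than 0 to enable caching:
  -- the cache is flushed when it holds 1,000 records or at least once per second.
  'sink.buffer-flush.max-rows'='1000',
  'sink.buffer-flush.interval'='1 s'
);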
Data ingestion
The Upsert Kafka connector can be used to develop YAML drafts for data ingestion and serves as the sink of a data ingestion pipeline. Data is written to the sink in the JSON format, and the primary key fields are included in the message body.
Syntax
sink:
  type: upsert-kafka
  name: upsert-kafka Sink
  properties.bootstrap.servers: localhost:9092
  # The following parameters are required only when data is written to ApsaraMQ for Kafka.
  aliyun.kafka.accessKeyId: ${secret_values.kafka-ak}
  aliyun.kafka.accessKeySecret: ${secret_values.kafka-sk}
  aliyun.kafka.instanceId: ${instance-id}
  aliyun.kafka.endpoint: ${endpoint}
  aliyun.kafka.regionId: ${region-id}
Parameters
Parameter | Description | Data type | Required | Default value | Remarks |
type | The connector type of the sink. | STRING | Yes | No default value | Set the value to upsert-kafka. |
name | The connector name of the sink. | STRING | No | No default value | N/A |
properties.bootstrap.servers | The IP addresses or endpoints and port numbers of Kafka brokers. | STRING | Yes | No default value | Format: host:port,host:port,host:port. Separate multiple host:port pairs with commas (,). |
properties.* | The parameters that are configured for the Kafka client. | STRING | No | No default value | The suffix of this parameter must comply with the rules that are defined in Producer Configs. Flink removes the properties. prefix and passes the transformed keys and values to the Kafka client. For example, you can set properties.allow.auto.create.topics to false to disable automatic topic creation. |
sink.delivery-guarantee | The delivery semantics of the Kafka sink. | STRING | No | at-least-once | Valid values: at-least-once (default) and exactly-once. |
sink.add-tableId-to-header-enabled | Specifies whether to write table information to the headers of Kafka messages. | BOOLEAN | No | false | If you set this parameter to true, the namespace, schemaName, and tableName are written to the message header. |
aliyun.kafka.accessKeyId | The AccessKey ID of your Alibaba Cloud account. | STRING | No | No default value | For more information, see Obtain an AccessKey pair. Note: You must configure this parameter when you synchronize data to ApsaraMQ for Kafka. |
aliyun.kafka.accessKeySecret | The AccessKey secret of your Alibaba Cloud account. | STRING | No | No default value | For more information, see Obtain an AccessKey pair. Note: You must configure this parameter when you synchronize data to ApsaraMQ for Kafka. |
aliyun.kafka.instanceId | The ID of the ApsaraMQ for Kafka instance. | STRING | No | No default value | You can view the instance ID on the Instance Details page. Note: You must configure this parameter when you synchronize data to ApsaraMQ for Kafka. |
aliyun.kafka.endpoint | The endpoint of ApsaraMQ for Kafka. | STRING | No | No default value | For more information, see Endpoints. Note: You must configure this parameter when you synchronize data to ApsaraMQ for Kafka. |
aliyun.kafka.regionId | The region ID of the instance in which you want to create a topic. | STRING | No | No default value | For more information, see Endpoints. Note: You must configure this parameter when you synchronize data to ApsaraMQ for Kafka. |
Sample code
Sample code for a source table
Create a Kafka source table that contains the browsing data of website users.
CREATE TABLE pageviews(
  user_id BIGINT,
  page_id BIGINT,
  viewtime TIMESTAMP,
  user_region STRING,
  WATERMARK FOR viewtime AS viewtime - INTERVAL '2' SECOND
)WITH(
  'connector'='kafka',
  'topic'='<yourTopicName>',
  'properties.bootstrap.servers'='...',
  'format'='json'
);
Sample code for a sink table
Create an Upsert Kafka sink table.
CREATE TABLE pageviews_per_region(
  user_region STRING,
  pv BIGINT,
  uv BIGINT,
  PRIMARY KEY(user_region) NOT ENFORCED
)WITH(
  'connector'='upsert-kafka',
  'topic'='<yourTopicName>',
  'properties.bootstrap.servers'='...',
  'key.format'='avro',
  'value.format'='avro'
);
Write the browsing data of website users to the sink table.
INSERT INTO pageviews_per_region
SELECT
  user_region,
  COUNT(*),
  COUNT(DISTINCT user_id)
FROM pageviews
GROUP BY user_region;
Data ingestion sink
source:
  type: mysql
  name: MySQL Source
  hostname: ${mysql.hostname}
  port: ${mysql.port}
  username: ${mysql.username}
  password: ${mysql.password}
  tables: ${mysql.source.table}
  server-id: 8601-8604

sink:
  type: upsert-kafka
  name: Upsert Kafka Sink
  properties.bootstrap.servers: ${upsert.kafka.bootstraps.server}
  aliyun.kafka.accessKeyId: ${upsert.kafka.aliyun.ak}
  aliyun.kafka.accessKeySecret: ${upsert.kafka.aliyun.sk}
  aliyun.kafka.instanceId: ${upsert.kafka.aliyun.instanceid}
  aliyun.kafka.endpoint: ${upsert.kafka.aliyun.endpoint}
  aliyun.kafka.regionId: ${upsert.kafka.aliyun.regionid}

route:
  - source-table: ${mysql.source.table}
    sink-table: ${upsert.kafka.topic}