This topic describes how to use the ApsaraMQ for Kafka connector.
Background information
Apache Kafka is an open source, distributed message queue system widely used in big data applications for high-performance data processing, stream analytics, and data integration. The Kafka connector, based on the open source Apache Kafka client, provides high-performance data throughput for Realtime Compute for Apache Flink. It supports reading and writing multiple data formats and offers exactly-once semantics.
Category | Details |
Supported type | Source table, sink table, and data ingestion target |
Execution mode | Streaming mode |
Data format | |
Specific metrics | |
API type | SQL, DataStream, and data ingestion YAML |
Can you update or delete data in a sink table? | No. The connector only supports inserting data into sink tables. Note: For information about updating and deleting data, see Upsert Kafka. |
Prerequisites
You can connect to a cluster using one of the following methods:
Connect to an ApsaraMQ for Kafka cluster
The Kafka cluster is version 0.11 or later.
You have created an ApsaraMQ for Kafka cluster. For more information, see Create resources.
The Flink workspace and the Kafka cluster are in the same VPC, and the CIDR block of the Flink VPC is added to the whitelist of the ApsaraMQ for Kafka cluster. For more information, see Configure a whitelist.
Important: Limitations on writing to ApsaraMQ for Kafka:
ApsaraMQ for Kafka does not support writing data in the zstd compression format.
ApsaraMQ for Kafka supports neither idempotent nor transactional writes. Therefore, you cannot use the exactly-once semantics feature of the Kafka sink table. If you use Ververica Runtime (VVR) 8.0.0 or later, you must add the properties.enable.idempotence=false configuration item to the sink table to disable idempotent writes. For a comparison of storage engines and feature limitations of ApsaraMQ for Kafka, see Storage engine comparison.
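For reference, a sink table that disables idempotent writes can be declared as follows. This is a minimal sketch: the schema, topic, broker address, and format are placeholders, not values from this topic.
CREATE TEMPORARY TABLE kafka_sink (
  ...
) WITH (
  'connector' = 'kafka',
  'topic' = '<yourTopic>',
  'properties.bootstrap.servers' = '<yourKafkaBrokers>',
  'format' = 'json',
  -- Disable idempotent writes, which ApsaraMQ for Kafka does not support.
  'properties.enable.idempotence' = 'false'
);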
Connect to a self-managed Apache Kafka cluster
The self-managed Apache Kafka cluster is version 0.11 or later.
You have established network connectivity between Flink and the self-managed Apache Kafka cluster. For more information about how to connect to a self-managed cluster over the Internet, see Select a network connection method.
Only client configuration items for Apache Kafka 2.8 are supported. For more information, see the Apache Kafka documentation for consumer and producer configurations.
Precautions
Transactional writes are not recommended because of design limitations in Flink and Kafka. When you set sink.delivery-guarantee = exactly-once, the Kafka connector enables transactional writes, which have the following three known issues:
Each checkpoint generates a transaction ID. If the checkpoint interval is too short, too many transaction IDs are generated. This can cause the coordinator in the Kafka cluster to run out of memory and compromise the stability of the Kafka cluster.
Each transaction creates a producer instance. If too many transactions are committed at the same time, the TaskManager may run out of memory and compromise the stability of the Flink job.
If multiple Flink jobs use the same sink.transactional-id-prefix, the transaction IDs they generate may conflict. When one job fails to write data, it blocks the Log Start Offset (LSO) of the Kafka partition from advancing, which affects all consumers that read data from the partition.
If you require exactly-once semantics, use an Upsert Kafka sink table and use primary keys to ensure idempotence. To use transactional writes, see Precautions for EXACTLY_ONCE semantics.
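For reference, a minimal Upsert Kafka sink sketch with a primary key looks like the following; the schema, topic, and broker address are placeholders.
CREATE TEMPORARY TABLE upsert_kafka_sink (
  user_id BIGINT,
  cnt BIGINT,
  -- The primary key is used to upsert and deduplicate records in the sink.
  PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = '<yourTopic>',
  'properties.bootstrap.servers' = '<yourKafkaBrokers>',
  'key.format' = 'json',
  'value.format' = 'json'
);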
Troubleshoot network connectivity
If a Flink job reports a Timed out waiting for a node assignment error at startup, it is usually caused by a network connectivity issue between Flink and Kafka.
A Kafka client connects to the server in the following steps:
The client uses the addresses in bootstrap.servers to connect to Kafka.
Kafka returns the metadata of each broker in the cluster, including their connection addresses.
The client then uses these returned addresses to connect to each broker for read and write operations.
Even if the bootstrap.servers addresses are reachable, the client cannot read or write data if Kafka returns incorrect broker addresses. This issue often occurs in network architectures that use proxies, port forwarding, or leased lines.
Troubleshooting steps
ApsaraMQ for Kafka
Confirm the endpoint type
Default endpoint (internal network)
SASL endpoint (internal network + authentication)
Public endpoint (requires a separate request)
You can use the network probe in the Flink development console to rule out connectivity issues with the bootstrap.servers address.
Check security groups and whitelists
The CIDR block of the Flink VPC must be added to the whitelist of the Kafka instance. For more information, see View the VPC CIDR block and Configure a whitelist.
Check the SASL configuration (if enabled)
If you use a SASL_SSL endpoint, you must correctly configure the JAAS, SSL, and SASL mechanisms in the Flink job. Missing authentication can cause the connection to fail during the handshake, which can also appear as a timeout. For more information, see Security and authentication.
Self-managed Kafka (ECS)
Use the network probe in the Flink development console.
You can use the network probe to rule out connectivity issues with the bootstrap.servers address and confirm that the internal and public endpoints are correct.
Check security groups and whitelists
The ECS security group must allow access to the Kafka endpoint port (usually 9092 or 9093).
The CIDR block of the Flink VPC must be added to the whitelist of the ECS instance. For more information, see View the VPC CIDR block.
Check the broker configuration
Log on to the ZooKeeper cluster that Kafka uses. You can use the zkCli.sh or zookeeper-shell.sh tool.
Run a command to retrieve broker metadata. For example, run get /brokers/ids/0. In the result, find the address that Kafka advertises to clients in the endpoints field.
Use the network probe in the Flink development console to test whether the address is reachable.
Note: If the address is not reachable, ask the Kafka Operations and Maintenance (O&M) engineer to check and correct the listeners and advertised.listeners configurations. Ensure that the returned address is accessible to Flink.
For more information about the connection between Kafka clients and servers, see Troubleshoot Connectivity.
Check the SASL configuration (if enabled)
If you use a SASL_SSL endpoint, you must correctly configure the JAAS, SSL, and SASL mechanisms in the Flink job. Missing authentication can cause the connection to fail during the handshake, which can also appear as a timeout. For more information, see Security and authentication.
SQL
You can use the Kafka connector in SQL jobs as a source table or a sink table.
Syntax
CREATE TABLE KafkaTable (
`user_id` BIGINT,
`item_id` BIGINT,
`behavior` STRING,
`ts` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp' VIRTUAL
) WITH (
'connector' = 'kafka',
'topic' = 'user_behavior',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'testGroup',
'scan.startup.mode' = 'earliest-offset',
'format' = 'csv'
)
Metadata columns
You can define metadata columns in source and sink tables to retrieve or write metadata for Kafka messages. For example, if you define multiple topics in the WITH clause and define a metadata column in the Kafka source table, Flink can identify the topic from which each record is read. The following example shows how to use metadata columns.
CREATE TABLE kafka_source (
-- Read the topic of the message as the `record_topic` field.
`record_topic` STRING NOT NULL METADATA FROM 'topic' VIRTUAL,
-- Read the timestamp from the ConsumerRecord as the `ts` field.
`ts` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp' VIRTUAL,
-- Read the offset of the message as the `record_offset` field.
`record_offset` BIGINT NOT NULL METADATA FROM 'offset' VIRTUAL,
...
) WITH (
'connector' = 'kafka',
...
);
CREATE TABLE kafka_sink (
-- Write the timestamp from the `ts` field to Kafka as the timestamp of the ProducerRecord.
`ts` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp' VIRTUAL,
...
) WITH (
'connector' = 'kafka',
...
);
The following table lists the supported metadata columns for Kafka source and sink tables.
Key | Data type | Description | Source or sink table |
topic | STRING NOT NULL METADATA VIRTUAL | The name of the topic that contains the Kafka message. | Source table |
partition | INT NOT NULL METADATA VIRTUAL | The partition ID of the Kafka message. | Source table |
headers | MAP<STRING, BYTES> NOT NULL METADATA VIRTUAL | The headers of the Kafka message. | Source and sink tables |
leader-epoch | INT NOT NULL METADATA VIRTUAL | The leader epoch of the Kafka message. | Source table |
offset | BIGINT NOT NULL METADATA VIRTUAL | The offset of the Kafka message. | Source table |
timestamp | TIMESTAMP(3) WITH LOCAL TIME ZONE NOT NULL METADATA VIRTUAL | The timestamp of the Kafka message. | Source and sink tables |
timestamp-type | STRING NOT NULL METADATA VIRTUAL | The timestamp type of the Kafka message: NoTimestampType, CreateTime, or LogAppendTime. | Source table |
__raw_key__ | STRING NOT NULL METADATA VIRTUAL | The key field of the raw Kafka message. | Source and sink tables |
__raw_value__ | STRING NOT NULL METADATA VIRTUAL | The value field of the raw Kafka message. | Source and sink tables |
WITH parameters
Common
Parameter
Description
Data type
Required
Default value
Remarks
connector
The table type.
String
Yes
None
The value must be kafka.
properties.bootstrap.servers
The Kafka broker address.
String
Yes
None
The format is host:port,host:port,host:port. Use commas (,) to separate multiple addresses.
properties.*
Direct configurations for the Kafka client.
String
No
None
The suffix must be a producer or consumer configuration defined in the official Kafka documentation.
Flink removes the properties. prefix and passes the remaining configuration to the Kafka client. For example, you can use 'properties.allow.auto.create.topics' = 'false' to disable automatic topic creation.
Do not use this method to modify the following configurations, because they are overwritten by the Kafka connector:
key.deserializer
value.deserializer
format
The format used to read or write the value part of a Kafka message.
String
No
None
Supported formats:
csv
json
avro
debezium-json
canal-json
maxwell-json
avro-confluent
raw
Note: For more information about format parameters, see Format Parameters.
key.format
The format used to read or write the key part of a Kafka message.
String
No
None
Supported formats:
csv
json
avro
debezium-json
canal-json
maxwell-json
avro-confluent
raw
Note: If you use this configuration, you must also configure key.fields.
key.fields
The fields in the source or sink table that correspond to the key part of the Kafka message.
String
No
None
Use semicolons (;) to separate multiple field names, for example, field1;field2.
key.fields-prefix
Specifies a custom prefix for all Kafka message key fields to avoid name conflicts with the value part format fields.
String
No
None
This configuration item is only used to distinguish column names in source and sink tables. The prefix is removed when parsing and generating the key part of the Kafka message.
Note: If you use this configuration, you must set value.fields-include to EXCEPT_KEY.
value.format
The format used to read or write the value part of a Kafka message.
String
No
None
This configuration is equivalent to format. You can set only one of format and value.format. If both are configured, value.format overwrites format.
value.fields-include
Specifies whether to include the fields corresponding to the key part of the message when parsing or generating the value part of the Kafka message.
String
No
ALL
Valid values:
ALL (default): All columns are processed as the value part of the Kafka message.
EXCEPT_KEY: The remaining fields, excluding those defined in key.fields, are processed as the value part of the Kafka message.
Source table
Parameter
Description
Data type
Required
Default value
Remarks
topic
The name of the topic to read from.
String
No
None
Use semicolons (;) to separate multiple topic names, for example, topic-1;topic-2.
Note: You can specify only one of the topic and topic-pattern options.
topic-pattern
A regular expression that matches the names of topics to read from. All topics that match this regular expression are read when the job runs.
String
No
None
Note: You can specify only one of the topic and topic-pattern options.
properties.group.id
The consumer group ID.
String
No
KafkaSource-{source_table_name}
If the specified group ID is used for the first time, you must set properties.auto.offset.reset to earliest or latest to specify the initial start offset.
scan.startup.mode
The start offset for reading data from Kafka.
String
No
group-offsets
Valid values:
earliest-offset: Starts reading from the earliest offset of each Kafka partition.
latest-offset: Starts reading from the latest offset of each Kafka partition.
group-offsets (default): Starts reading from the committed offset of the consumer group specified by properties.group.id.
timestamp: Starts reading from the timestamp specified by scan.startup.timestamp-millis.
specific-offsets: Starts reading from the offsets specified by scan.startup.specific-offsets.
Note: This parameter takes effect when the job starts without a state. When a job restarts from a checkpoint or recovers from a state, it preferentially uses the progress saved in the state to resume reading.
scan.startup.specific-offsets
In specific-offsets startup mode, specifies the start offset for each partition.
String
No
None
For example:
partition:0,offset:42;partition:1,offset:300
scan.startup.timestamp-millis
In timestamp startup mode, specifies the start offset timestamp.
Long
No
None
The unit is milliseconds.
scan.topic-partition-discovery.interval
The interval for dynamically detecting Kafka topics and partitions.
Duration
No
5 minutes
The default partition check interval is 5 minutes. To disable this feature, you must explicitly set the partition check interval to a non-positive value. When dynamic partition discovery is enabled, the Kafka source can automatically discover new partitions and read data from them. In topic-pattern mode, it not only reads data from new partitions of existing topics but also reads data from all partitions of new topics that match the regular expression.
Note: In VVR 6.0.x, dynamic partition discovery is disabled by default. Starting from VVR 8.0, this feature is enabled by default, and the detection interval is set to 5 minutes.
scan.header-filter
Conditionally filters data based on whether the Kafka data contains a specified header.
String
No
None
Use a colon (:) to separate the header key and value. Use logical operators (&, |) to connect multiple header conditions. The negation logical operator (!) is also supported. For example,
depart:toy|depart:book&!env:test retains Kafka data whose headers contain depart=toy or depart=book and do not contain env=test.
Note: This parameter is supported only in VVR 8.0.6 and later. For a usage sketch, see the example after this table.
Parenthetical operations are not supported.
Logical operations are performed from left to right.
The header value is converted to a string in UTF-8 format for comparison with the specified header value.
scan.check.duplicated.group.id
Specifies whether to check for duplicate consumer groups specified by properties.group.id.
Boolean
No
false
Valid values:
true: Before starting the job, the system checks for duplicate consumer groups. If a duplicate is found, the job reports an error and stops, avoiding conflicts with existing consumer groups.
false: The job starts directly without checking for consumer group conflicts.
Note: This parameter is supported only in VVR 6.0.4 and later.
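The following sketch shows how the scan.header-filter expression described above fits into a source table definition; the schema, topic, and broker address are placeholders.
CREATE TEMPORARY TABLE kafka_source_filtered (
  ...
) WITH (
  'connector' = 'kafka',
  'topic' = '<yourTopic>',
  'properties.bootstrap.servers' = '<yourKafkaBrokers>',
  'format' = 'json',
  -- Keep only messages whose headers contain depart=toy or depart=book and do not contain env=test.
  'scan.header-filter' = 'depart:toy|depart:book&!env:test'
);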
Sink table
Parameter
Description
Data type
Required
Default value
Remarks
topic
The name of the topic to write to.
String
Yes
None
None
sink.partitioner
The mapping mode from Flink parallel subtasks to Kafka partitions.
String
No
default
Valid values:
default: Uses the default Kafka partition mode.
fixed: Each Flink parallel subtask writes to a fixed Kafka partition.
round-robin: Data from Flink parallel subtasks is distributed to Kafka partitions in a round-robin fashion.
Custom partition mapping mode: If fixed and round-robin do not meet your needs, you can create a subclass of FlinkKafkaPartitioner to define a custom partition mapping mode. For example: org.mycompany.MyPartitioner
sink.delivery-guarantee
The semantic mode of the Kafka sink table.
String
No
at-least-once
Valid values:
none: No guarantee is provided. Data may be lost or duplicated.
at-least-once (default): Guarantees that no data is lost, but data may be duplicated.
exactly-once: Uses Kafka transactions to guarantee that data is not lost or duplicated.
Note: When you use exactly-once semantics, the sink.transactional-id-prefix parameter is required.
sink.transactional-id-prefix
The prefix for the Kafka transaction ID used in exactly-once semantics.
String
No
None
This configuration takes effect only when sink.delivery-guarantee is set to exactly-once.
sink.parallelism
The degree of parallelism for the Kafka sink table operator.
Integer
No
None
If you do not set this parameter, the framework uses the same degree of parallelism as the upstream operator.
Security and authentication
If the Kafka cluster requires a secure connection or authentication, add the relevant security and authentication configurations with the properties. prefix to the WITH clause. The following example shows how to configure a Kafka table to use PLAIN as the SASL mechanism and provide a JAAS configuration.
CREATE TABLE KafkaTable (
`user_id` BIGINT,
`item_id` BIGINT,
`behavior` STRING,
`ts` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp'
) WITH (
'connector' = 'kafka',
...
'properties.security.protocol' = 'SASL_PLAINTEXT',
'properties.sasl.mechanism' = 'PLAIN',
'properties.sasl.jaas.config' = 'org.apache.flink.kafka.shaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="username" password="password";'
)
The following example shows how to use SASL_SSL as the security protocol and SCRAM-SHA-256 as the SASL mechanism.
CREATE TABLE KafkaTable (
`user_id` BIGINT,
`item_id` BIGINT,
`behavior` STRING,
`ts` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp'
) WITH (
'connector' = 'kafka',
...
'properties.security.protocol' = 'SASL_SSL',
/*SSL configuration*/
/*Configure the path of the truststore (CA certificate) provided by the server.*/
/*Files uploaded through File Management are stored in the /flink/usrlib/ path.*/
'properties.ssl.truststore.location' = '/flink/usrlib/kafka.client.truststore.jks',
'properties.ssl.truststore.password' = 'test1234',
/*If client authentication is required, configure the path of the keystore (private key).*/
'properties.ssl.keystore.location' = '/flink/usrlib/kafka.client.keystore.jks',
'properties.ssl.keystore.password' = 'test1234',
/*The algorithm that the client uses to verify the server address. An empty value disables server address verification.*/
'properties.ssl.endpoint.identification.algorithm' = '',
/*SASL configuration*/
/*Configure the SASL mechanism as SCRAM-SHA-256.*/
'properties.sasl.mechanism' = 'SCRAM-SHA-256',
/*Configure JAAS*/
'properties.sasl.jaas.config' = 'org.apache.flink.kafka.shaded.org.apache.kafka.common.security.scram.ScramLoginModule required username="username" password="password";'
)
You can upload the CA certificate and private key mentioned in the example to the platform using the File Management feature in the Realtime Compute console. After you upload the files, they are stored in the /flink/usrlib directory. If the CA certificate file to be used is named my-truststore.jks, you can specify 'properties.ssl.truststore.location' = '/flink/usrlib/my-truststore.jks' in the WITH clause to use this certificate.
The preceding examples apply to most configuration scenarios. Before you configure the Kafka connector, contact the Kafka server O&M engineer to obtain the correct security and authentication configuration information.
Unlike open source Flink, the SQL editor in Realtime Compute for Apache Flink automatically escapes double quotation marks ("). You do not need to add extra escape characters (\) for double quotation marks in the username and password when you configure properties.sasl.jaas.config.
Source table start offset
Startup mode
You can specify the initial read offset for a Kafka source table by configuring scan.startup.mode:
earliest-offset: Starts reading from the earliest offset of the current partition.
latest-offset: Starts reading from the latest offset of the current partition.
group-offsets: Starts reading from the committed offset of the specified group ID. The group ID is specified by properties.group.id.
timestamp: Starts reading from the first message whose timestamp is greater than or equal to the specified time. The timestamp is specified by scan.startup.timestamp-millis.
specific-offsets: Starts consuming from the specified partition offset. The offset is specified by scan.startup.specific-offsets.
If you do not specify a start offset, consumption starts from the committed offset (group-offsets) by default.
scan.startup.mode takes effect only for jobs that start without a state. When a stateful job starts, it consumes from the offset stored in its state.
The following code provides an example:
CREATE TEMPORARY TABLE kafka_source (
...
) WITH (
'connector' = 'kafka',
...
-- Consume from the earliest offset.
'scan.startup.mode' = 'earliest-offset',
-- Consume from the latest offset.
'scan.startup.mode' = 'latest-offset',
-- Consume from the committed offset of the consumer group "my-group".
'properties.group.id' = 'my-group',
'scan.startup.mode' = 'group-offsets',
'properties.auto.offset.reset' = 'earliest', -- If "my-group" is used for the first time, consume from the earliest offset.
'properties.auto.offset.reset' = 'latest', -- If "my-group" is used for the first time, consume from the latest offset.
-- Consume from the specified millisecond timestamp 1655395200000.
'scan.startup.mode' = 'timestamp',
'scan.startup.timestamp-millis' = '1655395200000',
-- Consume from the specified offset.
'scan.startup.mode' = 'specific-offsets',
'scan.startup.specific-offsets' = 'partition:0,offset:42;partition:1,offset:300'
);
Start offset priority
The start offset for the source table is prioritized as follows:
1. The offset stored in a checkpoint or savepoint.
2. The start time selected in the Realtime Compute console when you start the job.
3. The start offset specified by scan.startup.mode in the WITH clause.
4. group-offsets, the default if scan.startup.mode is not specified; consumption starts from the committed offset of the corresponding consumer group.
In any of the preceding steps, if the offset is invalid because it has expired or the Kafka cluster has issues, the policy specified by properties.auto.offset.reset is used to reset the offset. If this configuration item is not set, an exception is thrown and user intervention is required.
A common scenario is starting consumption with a new group ID. The source table first queries the Kafka cluster for the committed offset of this group. Because the group ID is being used for the first time, no valid offset is found. The offset is then reset according to the policy specified by the properties.auto.offset.reset parameter. Therefore, when you consume data with a new group ID, you must configure properties.auto.offset.reset to specify the offset reset policy.
Source table offset commit
The Kafka source table commits the current consumer offset to the Kafka cluster only after a checkpoint is successful. If the checkpoint interval is long, the observed consumer offset in the Kafka cluster will be delayed. During a checkpoint, the Kafka source table stores the current read progress in its state and does not rely on the offset committed to the cluster for fault recovery. Committing the offset is only for monitoring the read progress on the Kafka side. A failed offset commit does not affect the correctness of the data.
Sink table custom partitioner
If the built-in Kafka producer partition modes do not meet your needs, you can implement a custom partition mode to write data to the corresponding partitions. A custom partitioner must inherit FlinkKafkaPartitioner. After development, you can compile the JAR package and upload it to the Realtime Compute console using the File Management feature. After you upload and reference the package, you can set the sink.partitioner parameter in the WITH clause to the full class path of the partitioner, such as org.mycompany.MyPartitioner.
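The following Java sketch shows what such a partitioner can look like. The class name org.mycompany.MyPartitioner matches the example above; the routing rule (hashing the serialized message key) is only an assumption for illustration.
package org.mycompany;

import java.util.Arrays;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;

/** Routes each record to a partition based on the hash of its serialized key. */
public class MyPartitioner extends FlinkKafkaPartitioner<Object> {

    @Override
    public int partition(Object record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
        if (key == null) {
            // Records without a key fall back to the first available partition.
            return partitions[0];
        }
        // Use a non-negative hash of the key to pick one of the available partitions.
        return partitions[(Arrays.hashCode(key) & Integer.MAX_VALUE) % partitions.length];
    }
}
After you compile and upload the JAR package, set 'sink.partitioner' = 'org.mycompany.MyPartitioner' in the WITH clause.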
Choose between Kafka, Upsert Kafka, or Kafka JSON catalog
Kafka is an append-only message queue system. It does not support data updates or deletions. Therefore, it cannot handle upstream CDC change data or retraction logic from operators such as aggregation and join in streaming SQL. To write data with changes or retractions to Kafka, use an Upsert Kafka sink table, which is designed to handle change data.
To easily synchronize change data from one or more tables in an upstream database to Kafka in batches, you can use a Kafka JSON catalog. If the data stored in Kafka is in JSON format, using a Kafka JSON catalog eliminates the need to define a schema and WITH parameters. For more information, see Manage a Kafka JSON Catalog.
Usage examples
Example 1: Read data from Kafka and write it to Kafka
Read Kafka data from a topic named 'source' and write it to a topic named 'sink'. The data is in CSV format.
CREATE TEMPORARY TABLE kafka_source (
id INT,
name STRING,
age INT
) WITH (
'connector' = 'kafka',
'topic' = 'source',
'properties.bootstrap.servers' = '<yourKafkaBrokers>',
'properties.group.id' = '<yourKafkaConsumerGroupId>',
'format' = 'csv'
);
CREATE TEMPORARY TABLE kafka_sink (
id INT,
name STRING,
age INT
) WITH (
'connector' = 'kafka',
'topic' = 'sink',
'properties.bootstrap.servers' = '<yourKafkaBrokers>',
'properties.group.id' = '<yourKafkaConsumerGroupId>',
'format' = 'csv'
);
INSERT INTO kafka_sink SELECT id, name, age FROM kafka_source;
Example 2: Synchronize table schema and data
Synchronize messages from a Kafka topic to Hologres in real time. In this case, you can use the offset and partition ID of the Kafka message as the primary key to ensure that no duplicate messages exist in Hologres during failover.
CREATE TEMPORARY TABLE kafkaTable (
`offset` INT NOT NULL METADATA,
`part` BIGINT NOT NULL METADATA FROM 'partition',
PRIMARY KEY (`part`, `offset`) NOT ENFORCED
) WITH (
'connector' = 'kafka',
'properties.bootstrap.servers' = '<yourKafkaBrokers>',
'topic' = 'kafka_evolution_demo',
'scan.startup.mode' = 'earliest-offset',
'format' = 'json',
'json.infer-schema.flatten-nested-columns.enable' = 'true'
-- Optional. Flattens all nested columns.
);
CREATE TABLE IF NOT EXISTS hologres.kafka.`sync_kafka`
WITH (
'connector' = 'hologres'
) AS TABLE vvp.`default`.kafkaTable;
Example 3: Synchronize table schema and Kafka message key and value data
If the key part of a Kafka message stores relevant data, you can synchronize both the key and the value from Kafka.
CREATE TEMPORARY TABLE kafkaTable (
`key_id` INT NOT NULL,
`val_name` VARCHAR(200)
) WITH (
'connector' = 'kafka',
'properties.bootstrap.servers' = '<yourKafkaBrokers>',
'topic' = 'kafka_evolution_demo',
'scan.startup.mode' = 'earliest-offset',
'key.format' = 'json',
'value.format' = 'json',
'key.fields' = 'key_id',
'key.fields-prefix' = 'key_',
'value.fields-prefix' = 'val_',
'value.fields-include' = 'EXCEPT_KEY'
);
CREATE TABLE IF NOT EXISTS hologres.kafka.`sync_kafka`
WITH (
'connector' = 'hologres'
) AS TABLE vvp.`default`.kafkaTable;
The key part of a Kafka message does not support table schema evolution or type parsing. You need to declare it manually.
Example 4: Synchronize table schema and data and perform computation
When you synchronize Kafka data to Hologres, lightweight computations are often required.
CREATE TEMPORARY TABLE kafkaTable (
`distinct_id` INT NOT NULL,
`properties` STRING,
`timestamp` TIMESTAMP_LTZ METADATA,
`date` AS CAST(`timestamp` AS DATE)
) WITH (
'connector' = 'kafka',
'properties.bootstrap.servers' = '<yourKafkaBrokers>',
'topic' = 'kafka_evolution_demo',
'scan.startup.mode' = 'earliest-offset',
'key.format' = 'json',
'value.format' = 'json',
'key.fields' = 'key_id',
'key.fields-prefix' = 'key_'
);
CREATE TABLE IF NOT EXISTS hologres.kafka.`sync_kafka` WITH (
'connector' = 'hologres'
) AS TABLE vvp.`default`.kafkaTable
ADD COLUMN
`order_id` AS COALESCE(JSON_VALUE(`properties`, '$.order_id'), 'default');
-- Use COALESCE to handle null values.
Example 5: Parse nested JSON
JSON message example
{
"id": 101,
"name": "VVP",
"properties": {
"owner": "Alibaba Cloud",
"engine": "Flink"
}
}
To avoid using functions such as JSON_VALUE(payload, '$.properties.owner') to parse fields later, you can define the structure directly in the source DDL:
CREATE TEMPORARY TABLE kafka_source (
id VARCHAR,
`name` VARCHAR,
properties ROW<`owner` STRING, engine STRING>
) WITH (
'connector' = 'kafka',
'topic' = 'xxx',
'properties.bootstrap.servers' = 'xxx',
'scan.startup.mode' = 'earliest-offset',
'format' = 'json'
);
This way, Flink parses the JSON into structured fields at read time. Subsequent SQL queries can directly use properties.owner without extra function calls, which results in better overall performance.
Datastream API
When you read and write data using DataStream, you must use the corresponding DataStream connector to connect to Realtime Compute for Apache Flink. For information about how to set up the DataStream connector, see Use a DataStream connector.
Build a Kafka source
The Kafka source provides a builder class to create an instance of the Kafka source. The following example code shows how to build a Kafka source to consume data from the earliest offset of input-topic. The consumer group name is my-group, and the Kafka message body is deserialized as a string.
Java
KafkaSource<String> source = KafkaSource.<String>builder()
    .setBootstrapServers(brokers)
    .setTopics("input-topic")
    .setGroupId("my-group")
    .setStartingOffsets(OffsetsInitializer.earliest())
    .setValueOnlyDeserializer(new SimpleStringSchema())
    .build();

env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");

When you build a KafkaSource, you must specify the following parameters.
Parameter
Description
BootstrapServers
The Kafka broker address. Configure it using the setBootstrapServers(String) method.
GroupId
The consumer group ID. Configure it using the setGroupId(String) method.
Topics or Partition
The name of the subscribed topic or partition. The Kafka source provides the following three ways to subscribe to topics or partitions:
Topic list: Subscribes to all partitions in the topic list.
KafkaSource.builder().setTopics("topic-a", "topic-b")
Regular expression matching: Subscribes to all partitions of the topics that match the regular expression.
KafkaSource.builder().setTopicPattern("topic.*")
Partition list: Subscribes to the specified partitions.
final HashSet<TopicPartition> partitionSet = new HashSet<>(Arrays.asList(
    new TopicPartition("topic-a", 0),    // Partition 0 of topic "topic-a".
    new TopicPartition("topic-b", 5)));  // Partition 5 of topic "topic-b".
KafkaSource.builder().setPartitions(partitionSet)
Deserializer
The deserializer for parsing Kafka messages.
Specify the deserializer using setDeserializer(KafkaRecordDeserializationSchema), where KafkaRecordDeserializationSchema defines how to parse Kafka's ConsumerRecord. If you only need to parse the data in the message body (value) of a Kafka message, you can do so in one of the following ways:
Use the setValueOnlyDeserializer(DeserializationSchema) method in the KafkaSource builder class provided by Flink. DeserializationSchema defines how to parse the binary data in the Kafka message body.
Use a parser provided by Kafka, which includes multiple implementation classes. For example, you can use StringDeserializer to parse the Kafka message body into a string.
import org.apache.kafka.common.serialization.StringDeserializer;

KafkaSource.<String>builder()
    .setDeserializer(KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class));
Note: To fully parse the ConsumerRecord, you must implement the KafkaRecordDeserializationSchema interface yourself.
XML
The Kafka DataStream connector is available in the Maven Central Repository.
<dependency>
    <groupId>com.alibaba.ververica</groupId>
    <artifactId>ververica-connector-kafka</artifactId>
    <version>${vvr-version}</version>
</dependency>

When you use the Kafka DataStream connector, you must understand the following Kafka properties:
Start offset
The Kafka source can specify to start consuming from different offsets using an offset initializer (OffsetsInitializer). The built-in offset initializers include the following.
Offset initializer
Code setting
Consume from the earliest offset.
KafkaSource.builder().setStartingOffsets(OffsetsInitializer.earliest())
Consume from the latest offset.
KafkaSource.builder().setStartingOffsets(OffsetsInitializer.latest())
Start consuming from data whose timestamp is greater than or equal to the specified time, in milliseconds.
KafkaSource.builder().setStartingOffsets(OffsetsInitializer.timestamp(1592323200000L))
Start consuming from the offset committed by the consumer group. If the committed offset does not exist, use the earliest offset.
KafkaSource.builder().setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
Start consuming from the offset committed by the consumer group, without specifying an offset reset policy.
KafkaSource.builder().setStartingOffsets(OffsetsInitializer.committedOffsets())
Note: If the built-in initializers do not meet your needs, you can also implement a custom offset initializer.
If no offset initializer is specified, OffsetsInitializer.earliest() is used by default, which starts consumption from the earliest offset.
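For example, the built-in OffsetsInitializer.offsets factory starts consumption from explicit per-partition offsets, as in the following sketch; the topic name, partitions, and offsets are assumptions for illustration, and the import paths assume the open source Flink Kafka connector packaging.
import java.util.HashMap;
import java.util.Map;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.kafka.common.TopicPartition;

// Start partition 0 of "input-topic" at offset 42 and partition 1 at offset 300.
Map<TopicPartition, Long> specificOffsets = new HashMap<>();
specificOffsets.put(new TopicPartition("input-topic", 0), 42L);
specificOffsets.put(new TopicPartition("input-topic", 1), 300L);

KafkaSource.builder().setStartingOffsets(OffsetsInitializer.offsets(specificOffsets));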
Streaming and batch modes
The Kafka source supports both streaming and batch execution modes. By default, the Kafka source is set to run in streaming mode, so the job never stops until the Flink job fails or is canceled. To configure the Kafka source to run in batch mode, you can use setBounded(OffsetsInitializer) to specify a stop offset. When all partitions reach their stop offset, the Kafka source exits.
Note: Typically, the Kafka source has no stop offset in streaming mode. For debugging purposes, you can use setUnbounded(OffsetsInitializer) to specify a stop offset in streaming mode. Note that the method names for specifying the stop offset in streaming and batch modes (setUnbounded and setBounded) are different.
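The following is a minimal sketch of a bounded Kafka source that runs in batch mode and stops at the offsets that are the latest when the job starts; the topic and group names follow the earlier example.
KafkaSource<String> boundedSource = KafkaSource.<String>builder()
    .setBootstrapServers(brokers)
    .setTopics("input-topic")
    .setGroupId("my-group")
    .setStartingOffsets(OffsetsInitializer.earliest())
    // Stop when every partition reaches the offset that was the latest at job startup.
    .setBounded(OffsetsInitializer.latest())
    .setValueOnlyDeserializer(new SimpleStringSchema())
    .build();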
Dynamic partition discovery
To handle scenarios such as topic scale-out or new topic creation without restarting the Flink job, you can enable the dynamic partition discovery feature in the provided topic or partition subscription mode.
Note: Dynamic partition discovery is enabled by default, and the partition check interval is 5 minutes. To disable this feature, you must explicitly set the partition check interval to a non-positive value. The following code provides an example.
KafkaSource.builder()
    .setProperty("partition.discovery.interval.ms", "10000") // Check for new partitions every 10 seconds.

Important: The dynamic partition discovery feature relies on the metadata update mechanism of the Kafka cluster. If the Kafka cluster does not update partition information in time, new partitions may not be discovered. Ensure that the partition.discovery.interval.ms configuration matches the actual situation of the Kafka cluster.
Event time and watermarks
By default, the Kafka source uses the timestamp in the Kafka message as the event time. You can customize a watermark strategy to extract the event time from the message and send watermarks downstream.
env.fromSource(kafkaSource, new CustomWatermarkStrategy(), "Kafka Source With Custom Watermark Strategy")

For more information about custom watermark strategies, see Generating Watermarks.
Note: If some tasks of a parallel source are idle for a long time (for example, if a Kafka partition has no data input for a long time, or the degree of parallelism of the source exceeds the number of Kafka partitions), the watermark generation mechanism may fail. In this case, the system cannot trigger window calculations normally, which causes the data processing flow to stall.
To resolve this issue, you can make the following adjustments:
Configure a watermark timeout mechanism: Enable the table.exec.source.idle-timeout parameter to force the system to generate a watermark after a specified timeout period, which ensures the progress of the window calculation cycle.
Optimize the data source: Maintain a reasonable ratio between the number of Kafka partitions and the degree of parallelism of the source (recommended: number of partitions ≥ source degree of parallelism).
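In the DataStream API, a comparable safeguard is to mark idle partitions with WatermarkStrategy.withIdleness, as in the following sketch; the out-of-orderness bound and idle timeout are illustrative values, not recommendations.
import java.time.Duration;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;

WatermarkStrategy<String> watermarkStrategy = WatermarkStrategy
    .<String>forBoundedOutOfOrderness(Duration.ofSeconds(5))
    // Mark a partition as idle after 1 minute without data so that watermarks
    // from the remaining partitions can still advance.
    .withIdleness(Duration.ofMinutes(1));

env.fromSource(kafkaSource, watermarkStrategy, "Kafka Source With Idleness Handling");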
Offset commit
The Kafka source commits the current consumer offset when a checkpoint is completed to ensure consistency between the Flink checkpoint state and the committed offset on the Kafka broker. If checkpointing is not enabled, the Kafka source relies on the internal automatic offset commit logic of the Kafka consumer. The automatic commit feature is configured by the enable.auto.commit and auto.commit.interval.ms Kafka consumer configuration items.
Note: The Kafka source does not rely on the offset committed on the broker to recover from a failed job. Committing the offset is only for reporting the consumption progress of the Kafka consumer and consumer group for monitoring on the broker side.
Other properties
In addition to the preceding properties, you can also use setProperties(Properties) and setProperty(String, String) to set any properties for the Kafka source and Kafka consumer. The KafkaSource typically has the following configuration items.
Configuration item
Description
client.id.prefix
Specifies the client ID prefix for the Kafka consumer.
partition.discovery.interval.ms
Defines the interval at which the Kafka source checks for new partitions.
Note: partition.discovery.interval.ms is overwritten to -1 in batch mode.
register.consumer.metrics
Specifies whether to register the Kafka consumer's metrics in Flink.
Other Kafka consumer configurations
For more information about Kafka consumer configurations, see Apache Kafka.
Important: The Kafka connector forcibly overwrites some manually configured parameters. The details are as follows:
key.deserializer is always overwritten to ByteArrayDeserializer.
value.deserializer is always overwritten to ByteArrayDeserializer.
auto.offset.reset.strategy is overwritten to OffsetsInitializer#getAutoOffsetResetStrategy().
The following example shows how to configure a Kafka consumer to use PLAIN as the SASL mechanism and provide a JAAS configuration.
KafkaSource.builder()
    .setProperty("sasl.mechanism", "PLAIN")
    .setProperty("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"username\" password=\"password\";")

Monitoring
The Kafka source registers metrics in Flink for monitoring and diagnosis.
Metric scope
All metrics of the Kafka source reader are registered under the KafkaSourceReader metric group, which is a subgroup of the operator metric group. Metrics related to a specific topic partition are registered in the KafkaSourceReader.topic.<topic_name>.partition.<partition_id> metric group.
For example, the current consumer offset (currentOffset) for partition 1 of topic "my-topic" is registered under <some_parent_groups>.operator.KafkaSourceReader.topic.my-topic.partition.1.currentOffset. The number of successful offset commits (commitsSucceeded) is registered under <some_parent_groups>.operator.KafkaSourceReader.commitsSucceeded.
Metric list
Metric name | Description | Scope |
currentOffset | The current consumer offset. | TopicPartition |
committedOffset | The current committed offset. | TopicPartition |
commitsSucceeded | The number of successful commits. | KafkaSourceReader |
commitsFailed | The number of failed commits. | KafkaSourceReader |
Kafka consumer metrics
The Kafka consumer's metrics are registered in the KafkaSourceReader.KafkaConsumer metric group. For example, the Kafka consumer metric records-consumed-total is registered under <some_parent_groups>.operator.KafkaSourceReader.KafkaConsumer.records-consumed-total.
You can use the register.consumer.metrics configuration item to specify whether to register the Kafka consumer's metrics. By default, this option is set to true. For more information about Kafka consumer metrics, see Apache Kafka.
Build a Kafka sink
The Flink Kafka sink can write stream data to one or more Kafka topics.
DataStream<String> stream = ...;

Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "<yourKafkaBrokers>");

KafkaSink<String> kafkaSink = KafkaSink.<String>builder()
    .setKafkaProducerConfig(properties)                      // Producer configuration.
    .setRecordSerializer(
        KafkaRecordSerializationSchema.builder()
            .setTopic("my-topic")                            // Target topic.
            .setKafkaValueSerializer(StringSerializer.class) // Value serialization.
            .build())
    .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)    // Fault-tolerance semantics.
    .setTransactionalIdPrefix("my-trx-id-prefix")            // Required for EXACTLY_ONCE.
    .build();

stream.sinkTo(kafkaSink);

You need to configure the following parameters.
Parameter
Description
Topic
The default topic name to write data to.
Data serialization
When you build the sink, you must provide a KafkaRecordSerializationSchema to convert the input data into Kafka's ProducerRecord. Flink provides a schema builder with common components such as message key/value serialization, topic selection, and message partitioning. You can also implement the corresponding interfaces for more control. The ProducerRecord<byte[], byte[]> serialize(T element, KafkaSinkContext context, Long timestamp) method is called for each data record to generate a ProducerRecord that is written to Kafka.
With ProducerRecord, you have fine-grained control over how each data record is written to Kafka. You can perform the following operations (see the sketch after this table):
Set the name of the topic to write to.
Define the message key.
Specify the partition to write data to.
Kafka client properties
bootstrap.servers is required. It is a comma-separated list of Kafka brokers.
Fault tolerance semantics
After enabling Flink's checkpointing, the Flink Kafka sink can guarantee exactly-once semantics. In addition to enabling Flink's checkpointing, you can also specify different fault tolerance semantics using the DeliveryGuarantee parameter. The details of the DeliveryGuarantee parameter are as follows:
DeliveryGuarantee.NONE: (default) Flink provides no guarantee. Data may be lost or duplicated.
DeliveryGuarantee.AT_LEAST_ONCE: Guarantees that no data is lost, but data may be duplicated.
DeliveryGuarantee.EXACTLY_ONCE: Uses Kafka transactions to provide exactly-once semantics.
Note: For precautions when using EXACTLY_ONCE semantics, see Precautions for EXACTLY_ONCE semantics.
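The following is a minimal sketch of a hand-written KafkaRecordSerializationSchema that sets the topic and key for each record and leaves partition selection to Kafka; the record type, topic name, and keying rule are assumptions for illustration. Pass an instance to setRecordSerializer(...) when you build the KafkaSink.
import java.nio.charset.StandardCharsets;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;

/** Serializes each String record, keys it by its first character, and writes it to a fixed topic. */
public class MyRecordSerializer implements KafkaRecordSerializationSchema<String> {

    @Override
    public ProducerRecord<byte[], byte[]> serialize(String element, KafkaSinkContext context, Long timestamp) {
        byte[] key = element.isEmpty()
            ? null
            : element.substring(0, 1).getBytes(StandardCharsets.UTF_8);
        byte[] value = element.getBytes(StandardCharsets.UTF_8);
        // Topic "my-topic"; a null partition lets Kafka choose the partition.
        return new ProducerRecord<>("my-topic", null, key, value);
    }
}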
Data ingestion
You can use the Kafka connector in YAML-based data ingestion job development to read from a source or write to a target.
Limits
We recommend that you use Kafka as a data source for Flink CDC data ingestion only in VVR 11.1 and later.
Only the JSON, Debezium JSON, and Canal JSON formats are supported. Other data formats are not.
For data sources, only VVR 8.0.11 and later support distributing data from a single table across multiple partitions.
Syntax
source:
  type: kafka
  name: Kafka source
  properties.bootstrap.servers: localhost:9092
  topic: ${kafka.topic}

sink:
  type: kafka
  name: Kafka Sink
  properties.bootstrap.servers: localhost:9092

Configuration items
Common
Parameter
Description
Required
Data type
Default value
Remarks
type
The source or target type.
Yes
String
None
The value must be kafka.
name
The source or target name.
No
String
None
None
properties.bootstrap.servers
The Kafka broker address.
Yes
String
None
The format is host:port,host:port,host:port. Use commas (,) to separate multiple addresses.
properties.*
Direct configurations for the Kafka client.
No
String
None
The suffix must be a producer or consumer configuration defined in the official Kafka documentation.
Flink removes the properties. prefix and passes the remaining configuration to the Kafka client. For example, you can use 'properties.allow.auto.create.topics' = 'false' to disable automatic topic creation.
key.format
The format used to read or write the key part of a Kafka message.
No
String
None
For Source, only json is supported.
For Sink, valid values are:
csv
json
Note: This parameter is supported only in VVR 11.0.0 and later.
value.format
The format used to read or write the value part of a Kafka message.
No
String
debezium-json
Valid values:
debezium-json
canal-json
json
Note: The debezium-json and canal-json formats are supported only in VVR 8.0.10 and later.
The json format is supported only in VVR 11.0.0 and later.
Source table
Parameter
Description
Required
Data type
Default value
Remarks
topic
The name of the topic to read from.
No
String
None
Use semicolons (;) to separate multiple topic names, for example, topic-1;topic-2.
Note: You can specify only one of the topic and topic-pattern options.
topic-pattern
A regular expression that matches the names of topics to read from. All topics that match this regular expression are read when the job runs.
No
String
None
Note: You can specify only one of the topic and topic-pattern options.
properties.group.id
The consumer group ID.
No
String
None
If the specified group ID is used for the first time, you must set properties.auto.offset.reset to earliest or latest to specify the initial start offset.
scan.startup.mode
The start offset for reading data from Kafka.
No
String
group-offsets
Valid values:
earliest-offset: Starts reading from the earliest offset of each Kafka partition.
latest-offset: Starts reading from the latest offset in Kafka.
group-offsets (default): Starts reading from the committed offset of the specified properties.group.id.
timestamp: Starts reading from the timestamp specified by scan.startup.timestamp-millis.
specific-offsets: Starts reading from the offset specified by scan.startup.specific-offsets.
Note: This parameter takes effect when the job starts without a state. When a job restarts from a checkpoint or recovers from a state, it preferentially uses the progress saved in the state to resume reading.
scan.startup.specific-offsets
In specific-offsets startup mode, specifies the start offset for each partition.
No
String
None
For example:
partition:0,offset:42;partition:1,offset:300
scan.startup.timestamp-millis
In timestamp startup mode, specifies the start offset timestamp.
No
Long
None
The unit is milliseconds.
scan.topic-partition-discovery.interval
The interval for dynamically detecting Kafka topics and partitions.
No
Duration
5 minutes
The default partition check interval is 5 minutes. To disable this feature, you must explicitly set the partition check interval to a non-positive value. When dynamic partition discovery is enabled, the Kafka source can automatically discover new partitions and read data from them. In topic-pattern mode, it not only reads data from new partitions of existing topics but also reads data from all partitions of new topics that match the regular expression.
scan.check.duplicated.group.id
Specifies whether to check for duplicate consumer groups specified by properties.group.id.
No
Boolean
false
Valid values:
true: Before starting the job, checks for duplicate consumer groups. If a duplicate is found, the job reports an error to avoid conflicts with existing consumer groups.
false: The job starts directly without checking for consumer group conflicts.
schema.inference.strategy
The schema parsing strategy.
No
String
continuous
Valid values:
continuous: Parses the schema for each data record. When the schemas are incompatible, it derives a wider schema and generates a schema evolution event.
static: Parses the schema only once when the job starts. Subsequent data is parsed based on the initial schema, and no schema evolution events are generated.
Note: For more information about schema parsing, see Table schema parsing and change synchronization strategy.
This configuration item is supported only in VVR 8.0.11 and later.
scan.max.pre.fetch.records
The maximum number of messages to try to consume and parse for each partition during initial schema parsing.
No
Int
50
Before the job actually reads and processes data, it tries to pre-consume a specified number of the latest messages for each partition to initialize the schema information.
key.fields-prefix
A custom prefix added to the field names parsed from the message key to avoid naming conflicts after parsing the Kafka message key.
No
String
None
Assume this configuration item is set to key_. When the key contains a field named a, the field name after parsing the key is key_a.
Note: The value of key.fields-prefix cannot be a prefix of the value of value.fields-prefix.
value.fields-prefix
A custom prefix added to the field names parsed from the message value to avoid naming conflicts after parsing the Kafka message value.
No
String
None
Assume this configuration item is set to value_. When the value contains a field named b, the field name after parsing the value is value_b.
Note: The value of value.fields-prefix cannot be a prefix of the value of key.fields-prefix.
metadata.list
The metadata columns to be passed to the downstream.
No
String
None
Available metadata columns include topic, partition, offset, timestamp, timestamp-type, headers, leader-epoch, __raw_key__, and __raw_value__. Use commas (,) to separate them.
scan.value.initial-schemas.ddls
Specifies the initial schema for certain tables using DDL.
No
String
None
Use semicolons (;) to separate multiple DDL statements. For example, CREATE TABLE a.b (id BIGINT, name VARCHAR(10)); CREATE TABLE b.c (id BIGINT); specifies the initial schemas for tables a.b and b.c.
Note: This configuration is supported in VVR 11.5 and later.
ingestion.ignore-errors
Specifies whether to ignore errors during data parsing.
No
Boolean
false
Note: This configuration is supported in VVR 11.5 and later.
ingestion.error-tolerance.max-count
When ignoring data parsing errors, specifies the cumulative number of errors after which the job fails.
No
Integer
-1
This takes effect only when ingestion.ignore-errors is enabled. The default value -1 means that parsing exceptions do not cause the job to fail.
Note: This configuration is supported in VVR 11.5 and later.
Source table Debezium JSON format
Parameter
Required
Data type
Default value
Description
debezium-json.distributed-tables
No
Boolean
false
If data for a single table in Debezium JSON appears in multiple partitions, you need to enable this option.
Note: This configuration item is supported only in VVR 8.0.11 and later.
Important: After you modify this configuration item, you must start the job without a state.
debezium-json.schema-include
No
Boolean
false
When setting up the Debezium Kafka Connect, you can enable the Kafka configuration value.converter.schemas.enable to include the schema in the message. This option indicates whether the Debezium JSON message includes the schema.
Valid values:
true: The Debezium JSON message includes the schema.
false: The Debezium JSON message does not include the schema.
debezium-json.ignore-parse-errors
No
Boolean
false
Valid values:
true: Skips the current row when a parsing error occurs.
false (default): Reports an error, and the job fails to start.
debezium-json.infer-schema.primitive-as-string
No
Boolean
false
Specifies whether to parse all types as String when parsing the table schema.
Valid values:
true: Parses all primitive data types as String.
false (default): Parses according to the basic rules.
Source table Canal JSON format
Parameter
Required
Data type
Default value
Description
canal-json.distributed-tables
No
Boolean
false
If data for a single table in Canal JSON appears in multiple partitions, you need to enable this option.
Note: This configuration item is supported only in VVR 8.0.11 and later.
Important: After you modify this configuration item, you must start the job without a state.
canal-json.database.include
No
String
None
An optional regular expression that matches the database metadata field in Canal records. It reads only the changelog records of the specified database. The regular expression string is compatible with Java's Pattern.
canal-json.table.include
No
String
None
An optional regular expression that matches the table metadata field in Canal records. It reads only the changelog records of the specified table. The regular expression string is compatible with Java's Pattern.
canal-json.ignore-parse-errors
No
Boolean
false
Valid values:
true: Skips the current row when a parsing error occurs.
false (default): Reports an error, and the job fails to start.
canal-json.infer-schema.primitive-as-string
No
Boolean
false
Specifies whether to parse all types as String when parsing the table schema.
Valid values:
true: Parses all primitive data types as String.
false (default): Parses according to the basic rules.
canal-json.infer-schema.strategy
No
String
AUTO
The parsing strategy for the table schema.
Valid values:
AUTO (default): Automatically parses by analyzing the JSON data. If the data does not contain a sqlType field, we recommend using AUTO to avoid parsing failures.
SQL_TYPE: Parses using the sqlType array in the Canal JSON data. If the data contains a sqlType field, we recommend setting canal-json.infer-schema.strategy to SQL_TYPE to get a more precise type.
MYSQL_TYPE: Parses using the mysqlType array in the Canal JSON data.
When the Canal JSON data in Kafka contains a sqlType field and a more precise type mapping is needed, we recommend setting canal-json.infer-schema.strategy to SQL_TYPE.
For sqlType mapping rules, see Canal JSON schema parsing.
Note: This configuration is supported in VVR 11.1 and later.
MYSQL_TYPE is supported in VVR 11.3 and later.
canal-json.mysql.treat-mysql-timestamp-as-datetime-enabled
No
Boolean
true
Specifies whether to map the MySQL timestamp type to the CDC timestamp type:
true (default): The MySQL timestamp type is mapped to the CDC timestamp type.
false: The MySQL timestamp type is mapped to the CDC timestamp_ltz type.
canal-json.mysql.treat-tinyint1-as-boolean.enabled
No
Boolean
true
When parsing with MYSQL_TYPE, specifies whether to map the MySQL tinyint(1) type to the CDC boolean type:
true (default): The MySQL tinyint(1) type is mapped to the CDC boolean type.
false: The MySQL tinyint(1) type is mapped to the CDC tinyint(1) type.
This configuration takes effect only when canal-json.infer-schema.strategy is set to MYSQL_TYPE.
Source table JSON format
Parameter
Required
Data type
Default value
Description
json.timestamp-format.standard
No
String
SQL
Specifies the input and output timestamp format. Valid values:
SQL: Parses input timestamps in the yyyy-MM-dd HH:mm:ss.s{precision} format, such as 2020-12-30 12:13:14.123.
ISO-8601: Parses input timestamps in the yyyy-MM-ddTHH:mm:ss.s{precision} format, such as 2020-12-30T12:13:14.123.
json.ignore-parse-errors
No
Boolean
false
Valid values:
true: Skips the current row when a parsing error occurs.
false (default): Reports an error, and the job fails to start.
json.infer-schema.primitive-as-string
No
Boolean
false
Specifies whether to parse all types as String when parsing the table schema.
Valid values:
true: Parses all primitive data types as String.
false (default): Parses according to the basic rules.
json.infer-schema.flatten-nested-columns.enable
No
Boolean
false
When parsing JSON data, specifies whether to recursively expand nested columns in the JSON. Valid values:
true: Recursively expands.
false (default): Treats nested columns as String.
json.decode.parser-table-id.fields
No
String
None
When parsing JSON data, specifies whether to use certain JSON field values to generate the tableId. Use commas (,) to separate multiple fields. For example, if the JSON data is {"col0":"a", "col1":"b", "col2":"c"}, the results are as follows:
Configuration | tableId |
col0 | a |
col0,col1 | a.b |
col0,col1,col2 | a.b.c |
json.infer-schema.fixed-types
No
String
None
When parsing JSON data, specifies the concrete types for certain fields. Use commas (,) to separate multiple fields. For example, id BIGINT, name VARCHAR(10) specifies that the id field in the JSON data has the type BIGINT and the name field has the type VARCHAR(10).
Note: This configuration is supported in VVR 11.5 and later.
Sink table
Parameter
Description
Required
Data type
Default value
Remarks
type
Specifies the target type.
Yes
String
None
The value must be kafka.
name
Specifies the name of the target.
No
String
None
None
topic
Specifies the name of the Kafka topic.
No
String
None
If this parameter is specified, all data is written to this topic.
Note: If this parameter is not specified, each data record is written to a topic whose name is the TableId string, which is generated by concatenating the database name and table name with a period (.), for example, databaseName.tableName.
partition.strategy
Specifies the strategy for writing data to Kafka partitions.
No
String
all-to-zero
Valid values:
all-to-zero (default): Writes all data to partition 0.
hash-by-key: Writes data to partitions based on the hash value of the primary key. This ensures that data with the same primary key is written to the same partition and remains in order.
sink.tableId-to-topic.mapping
Specifies the mapping from upstream table names to downstream Kafka topic names.
No
String
None
Separate multiple mappings with semicolons (;). Within each mapping, separate the upstream table name and the downstream Kafka topic name with a colon (:). Table names can be regular expressions. To map multiple tables to the same topic, separate the table names with commas (,). For example: mydb.mytable1:topic1;mydb.mytable2:topic2.
Note: You can use this parameter to change the mapped topic while preserving the original table name information.
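A hedged sink sketch that combines the topic and partition.strategy parameters above is shown below; the topic name is a placeholder, and hash-by-key assumes primary keys are available on the upstream tables. The sink.tableId-to-topic.mapping parameter is covered in detail in the Table name and topic mapping strategy section later in this topic.
sink:
  type: kafka
  name: Kafka Sink
  properties.bootstrap.servers: ${kafka.bootstraps.server}
  # Write everything to one topic; omit this to write each table to a topic named databaseName.tableName.
  topic: my_topic
  # Keep records with the same primary key in the same partition and in order.
  partition.strategy: hash-by-key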
Sink table Debezium JSON format
Parameter
Required
Data type
Default value
Description
debezium-json.include-schema.enabled
No
Boolean
false
Specifies whether to include schema information in the Debezium JSON data.
Usage examples
Use Kafka as a data ingestion source:
source:
  type: kafka
  name: Kafka source
  properties.bootstrap.servers: ${kafka.bootstraps.server}
  topic: ${kafka.topic}
  value.format: ${value.format}
  scan.startup.mode: ${scan.startup.mode}
sink:
  type: hologres
  name: Hologres sink
  endpoint: <yourEndpoint>
  dbname: <yourDbname>
  username: ${secret_values.ak_id}
  password: ${secret_values.ak_secret}
  sink.type-normalize-strategy: BROADEN
Use Kafka as a data ingestion target:
source:
  type: mysql
  name: MySQL Source
  hostname: ${secret_values.mysql.hostname}
  port: ${mysql.port}
  username: ${secret_values.mysql.username}
  password: ${secret_values.mysql.password}
  tables: ${mysql.source.table}
  server-id: 8601-8604
sink:
  type: kafka
  name: Kafka Sink
  properties.bootstrap.servers: ${kafka.bootstraps.server}
route:
  - source-table: ${mysql.source.table}
    sink-table: ${kafka.topic}
Here, the route module is used to set the topic name for writing the source table to Kafka.
ApsaraMQ for Kafka does not enable automatic topic creation by default. For more information, see Issues related to automatic topic creation. When you write to ApsaraMQ for Kafka, you must create the corresponding topic in advance. For more information, see Step 3: Create resources.
Table schema parsing and change synchronization strategy
Partition message pre-consumption and table schema initialization
The Kafka connector maintains the schema of all currently known tables. Before reading Kafka data, the Kafka connector attempts to pre-consume up to scan.max.pre.fetch.records messages in each partition. It parses the schema of each data record and merges these schemas to initialize the table schema information. Then, before consuming the data, it generates corresponding table creation events based on the initialized schema.
Note: For Debezium JSON and Canal JSON formats, the table information is carried in each message, so the pre-consumed scan.max.pre.fetch.records messages may contain data from several tables and the number of pre-consumed records per table cannot be guaranteed. Pre-consumption and schema initialization are performed only once per partition, before its messages are consumed and processed. If data from a new table appears later, the schema parsed from the first record of that table is used as its initial schema; no additional pre-consumption or initialization is performed for that table.
Important: Only VVR 8.0.11 and later support distributing data from a single table across multiple partitions. For this scenario, you must set the configuration item debezium-json.distributed-tables or canal-json.distributed-tables to true.
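For illustration only, a source that reads Canal JSON data distributed across multiple partitions could be configured roughly as follows; the pre-fetch count of 50 is an arbitrary example, not a recommended value:
source:
  type: kafka
  name: Kafka source
  properties.bootstrap.servers: ${kafka.bootstraps.server}
  topic: ${kafka.topic}
  value.format: canal-json
  # Required in VVR 8.0.11 and later when one table's data spans multiple partitions.
  canal-json.distributed-tables: true
  # Upper bound on the number of messages pre-consumed per partition for schema initialization.
  scan.max.pre.fetch.records: 50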
Table information
For Canal JSON and Debezium JSON formats, the table information, including the database and table name, is parsed from the specific message.
For JSON format, the table information includes only the table name, which is the name of the topic where the data is located.
Primary key information
For Canal JSON format, the primary key of the table is defined based on the pkNames field in the JSON.
For Debezium JSON and JSON formats, the JSON does not contain primary key information. You can manually add a primary key to the table using a transform rule:
transform:
  - source-table: \.*.\.*
    projection: \*
    primary-keys: key1, key2
Schema parsing and schema evolution
After the table schema is initialized, if schema.inference.strategy is set to static, the Kafka connector parses the value of each message based on the initial table schema and does not generate schema evolution events. If schema.inference.strategy is set to continuous, the Kafka connector parses the value of each Kafka message, extracts the physical columns of the message, and compares them with the currently maintained schema. If the parsed schema is inconsistent with the current schema, it attempts to merge the schemas and generates a table schema evolution event. The merging rules are as follows:
If the parsed physical columns contain fields that are not in the current schema, these fields are added to the schema, and an event to add nullable columns is generated.
If the parsed physical columns do not contain fields that are already in the current schema, the fields are retained, and their data is filled with NULL. No event to delete columns is generated.
If there are columns with the same name, they are handled as follows:
If the types are the same but the precision is different, the type with the higher precision is used, and a column type change event is generated.
If the types are different, the smallest parent node in the tree structure shown in the following figure is identified and used as the type for the column with the same name. A column type change event is also generated.

The currently supported schema evolution strategies are as follows:
Add column: Adds the corresponding column to the end of the current schema and synchronizes the data of the new column. The new column is set to be nullable.
Delete column: Does not generate a delete column event. Instead, the data of that column is automatically filled with NULL values later.
Rename column: Is treated as adding and deleting a column. The renamed column is added to the end of the current schema, and the data of the column before renaming is filled with NULL values.
Column type change:
For downstream systems that support column type changes, the data ingestion job can change the type of ordinary columns, for example, from INT to BIGINT. Whether a specific change is allowed depends on the column type change rules of the downstream sink, and these rules differ between sink tables. Refer to the documentation of the specific sink table for its supported column type change rules.
For downstream systems that do not support column type changes, such as Hologres, you can use broad type mapping: when the job starts, a table with broader types is created in the downstream system, and when a column type change occurs, the job determines whether the downstream sink can still accept the change. This provides tolerant support for column type changes.
Currently unsupported schema changes:
Changes to constraints such as primary keys or indexes.
Changing from NOT NULL to NULLABLE.
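The parsing behavior described above is selected through schema.inference.strategy on the Kafka source. A minimal sketch, assuming the option is set alongside the other source parameters:
source:
  type: kafka
  name: Kafka source
  properties.bootstrap.servers: ${kafka.bootstraps.server}
  topic: ${kafka.topic}
  value.format: debezium-json
  # continuous: keep merging parsed schemas and emit schema evolution events.
  # static: stick to the schema inferred during initialization.
  schema.inference.strategy: continuous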
Canal JSON schema parsing
Canal JSON data may contain an optional sqlType field, which records the precise type information of the data columns. To obtain a more accurate schema, you can set canal-json.infer-schema.strategy to SQL_TYPE to use the types in sqlType. The type mapping relationships are as follows:
JDBC Type | Type Code | CDC Type |
BIT | -7 | BOOLEAN |
BOOLEAN | 16 | BOOLEAN |
TINYINT | -6 | TINYINT |
SMALLINT | 5 | SMALLINT |
INTEGER | 4 | INT |
BIGINT | -5 | BIGINT |
DECIMAL | 3 | DECIMAL(38,18) |
NUMERIC | 2 | DECIMAL(38,18) |
REAL | 7 | FLOAT |
FLOAT | 6 | FLOAT |
DOUBLE | 8 | DOUBLE |
BINARY | -2 | BYTES |
VARBINARY | -3 | BYTES |
LONGVARBINARY | -4 | BYTES |
BLOB | 2004 | BYTES |
DATE | 91 | DATE |
TIME | 92 | TIME |
TIMESTAMP | 93 | TIMESTAMP |
CHAR | 1 | STRING |
VARCHAR | 12 | STRING |
LONGVARCHAR | -1 | STRING |
Other types | | STRING |
Table name and topic mapping strategy
When you use Kafka as the target for a data ingestion job, you must configure the table name and topic mapping strategy carefully. This is because the Kafka message format (debezium-json or canal-json) also contains table name information, and downstream systems often use the table name in the data as the actual table name, not the topic name.
Suppose you need to synchronize two tables, mydb.mytable1 and mydb.mytable2, from MySQL. The possible configuration strategies are as follows:
1. Do not configure any mapping strategy
Without any mapping strategy, each table is written to a topic named "database_name.table_name". Therefore, data from mydb.mytable1 is written to a topic named mydb.mytable1, and data from mydb.mytable2 is written to a topic named mydb.mytable2. The following code provides a configuration example:
source:
  type: mysql
  name: MySQL Source
  hostname: ${secret_values.mysql.hostname}
  port: ${mysql.port}
  username: ${secret_values.mysql.username}
  password: ${secret_values.mysql.password}
  tables: mydb.mytable1,mydb.mytable2
  server-id: 8601-8604
sink:
  type: kafka
  name: Kafka Sink
  properties.bootstrap.servers: ${kafka.bootstraps.server}
2. Configure route rules for mapping (not recommended)
In many scenarios, users do not want the topic to be in the "database_name.table_name" format. They want to write data to a specified topic, so they configure route rules for mapping. The following code provides a configuration example:
source:
  type: mysql
  name: MySQL Source
  hostname: ${secret_values.mysql.hostname}
  port: ${mysql.port}
  username: ${secret_values.mysql.username}
  password: ${secret_values.mysql.password}
  tables: mydb.mytable1,mydb.mytable2
  server-id: 8601-8604
sink:
  type: kafka
  name: Kafka Sink
  properties.bootstrap.servers: ${kafka.bootstraps.server}
route:
  - source-table: mydb.mytable1,mydb.mytable2
    sink-table: mytable1
In this case, all data from mydb.mytable1 and mydb.mytable2 is written to the mytable1 topic.
However, when you use a route rule to change the topic name, it also changes the table name information in the Kafka message (debezium-json or canal-json format). In this case, all table names in the Kafka messages become mytable1, which may cause unexpected behavior when other systems consume Kafka messages from this topic.
3. Configure the sink.tableId-to-topic.mapping parameter for mapping (recommended)
To configure the mapping rule between table names and topics while you retain the source table name information, use the sink.tableId-to-topic.mapping parameter. The following code provides a configuration example:
source:
  type: mysql
  name: MySQL Source
  hostname: ${secret_values.mysql.hostname}
  port: ${mysql.port}
  username: ${secret_values.mysql.username}
  password: ${secret_values.mysql.password}
  tables: mydb.mytable1,mydb.mytable2
  server-id: 8601-8604
sink:
  type: kafka
  name: Kafka Sink
  properties.bootstrap.servers: ${kafka.bootstraps.server}
  sink.tableId-to-topic.mapping: mydb.mytable1,mydb.mytable2:mytable
or
source:
  type: mysql
  name: MySQL Source
  hostname: ${secret_values.mysql.hostname}
  port: ${mysql.port}
  username: ${secret_values.mysql.username}
  password: ${secret_values.mysql.password}
  tables: mydb.mytable1,mydb.mytable2
  server-id: 8601-8604
sink:
  type: kafka
  name: Kafka Sink
  properties.bootstrap.servers: ${kafka.bootstraps.server}
  sink.tableId-to-topic.mapping: mydb.mytable1:mytable;mydb.mytable2:mytable
In this case, all data from mydb.mytable1 and mydb.mytable2 is written to the mytable topic. The table name information in the Kafka messages (debezium-json or canal-json format) remains mydb.mytable1 or mydb.mytable2, so other systems that consume messages from this topic can correctly retrieve the source table name.
Precautions for EXACTLY_ONCE semantics
Configure consumer isolation level
All applications that consume Kafka data must set isolation.level:
read_committed: Reads only committed data.
read_uncommitted (default): Can read uncommitted data.
EXACTLY_ONCE depends on read_committed. Otherwise, consumers may see uncommitted data, which compromises consistency.
Transaction timeout and data loss
When Flink recovers from a checkpoint, it must commit the transactions that were opened before that checkpoint was taken. If the time from job crash to restart exceeds the Kafka transaction timeout, Kafka automatically aborts those transactions, which causes data loss.
The default transaction.max.timeout.ms on the Kafka broker is 15 minutes.
The default transaction.timeout.ms for the Flink Kafka sink is 1 hour.
You must increase transaction.max.timeout.ms on the broker side to be greater than or equal to the Flink setting.
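As a hedged sketch, both the consumer-side isolation level and the producer-side transaction timeout are standard Kafka client options that can be forwarded through the properties. prefix, in the same way as properties.bootstrap.servers in the examples above; the values shown here are illustrative:
source:
  type: kafka
  name: Kafka source
  properties.bootstrap.servers: ${kafka.bootstraps.server}
  topic: ${kafka.topic}
  value.format: ${value.format}
  # Downstream consumers must read only committed data for EXACTLY_ONCE to hold.
  properties.isolation.level: read_committed
If the broker's transaction.max.timeout.ms cannot be raised, lowering the sink-side transaction timeout (for example, through properties.transaction.timeout.ms) so that it stays within the broker limit is a possible alternative, at the cost of a shorter recovery window.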
Producer pool and checkpoint concurrency
EXACTLY_ONCE mode uses a fixed-size pool of Kafka producers. Each checkpoint occupies one producer from the pool. If the number of concurrent checkpoints exceeds the pool size, the job will fail.
Adjust the producer pool size based on the maximum number of concurrent checkpoints.
Parallelism scale-in limitation
If the job fails before the first checkpoint, the original producer pool information is not retained upon restart. Therefore, do not reduce the job's degree of parallelism before the first checkpoint is completed. If you must scale in, do not reduce the degree of parallelism by a factor greater than FlinkKafkaProducer.SAFE_SCALE_DOWN_FACTOR.
Transaction blocking reads
In read_committed mode, any unfinished transaction (neither committed nor aborted) blocks consumers from reading past it. For example:
Transaction 1 writes data.
Transaction 2 writes and commits data.
As long as transaction 1 is not finished, the data from transaction 2 is not visible to consumers.
Therefore:
During normal operation, the data visibility latency is approximately equal to the checkpoint interval.
When a job fails, the topic being written to blocks consumers until the job restarts or the transaction times out. In extreme cases, a long transaction timeout can therefore block reads for an extended period.