
Realtime Compute for Apache Flink: ApsaraMQ for Kafka

Last Updated: Jan 14, 2026

This topic describes how to use the ApsaraMQ for Kafka connector.

Background information

Apache Kafka is an open source, distributed message queue system widely used in big data applications for high-performance data processing, stream analytics, and data integration. The Kafka connector, based on the open source Apache Kafka client, provides high-performance data throughput for Realtime Compute for Apache Flink. It supports reading and writing multiple data formats and offers exactly-once semantics.

Category

Details

Supported type

Source table, sink table, and data ingestion target

Execution mode

Streaming mode

Data format

Supported data formats

  • CSV

  • JSON

  • Apache Avro

  • Confluent Avro

  • Debezium JSON

  • Canal JSON

  • Maxwell JSON

  • Raw

  • Protobuf

Note
  • Only Ververica Runtime (VVR) 8.0.9 and later support the built-in Protobuf data format.

  • Each of the supported data formats has corresponding configuration items that you can use directly in the WITH clause. For more information, see the Flink community documentation.

Specific metrics


  • Source table

    • numRecordsIn

    • numRecordsInPerSecond

    • numBytesIn

    • numBytesInPerSecond

    • currentEmitEventTimeLag

    • currentFetchEventTimeLag

    • sourceIdleTime

    • pendingRecords

  • Sink table

    • numRecordsOut

    • numRecordsOutPerSecond

    • numBytesOut

    • numBytesOutPerSecond

    • currentSendTime

Note

For more information about the metrics, see Metric descriptions.

API type

SQL, DataStream, and data ingestion YAML

Can you update or delete data in a sink table?

The connector does not support updating or deleting data in sink tables. It only supports inserting data.

Note

For information about updating and deleting data, see Upsert Kafka.

Prerequisites

You can connect to a cluster using one of the following methods:

  • Connect to an ApsaraMQ for Kafka cluster

    • The Kafka cluster is version 0.11 or later.

    • You have created an ApsaraMQ for Kafka cluster. For more information, see Create resources.

    • The Flink workspace and the Kafka cluster are in the same VPC, and the CIDR block of the Flink VPC is added to the whitelist of the ApsaraMQ for Kafka cluster. For more information, see Configure a whitelist.

    Important

    Limitations on writing to ApsaraMQ for Kafka:

    • ApsaraMQ for Kafka does not support writing data in the zstd compression format.

    • ApsaraMQ for Kafka supports neither idempotent nor transactional writes. Therefore, you cannot use the exactly-once semantics feature of the Kafka sink table. If you use Ververica Runtime (VVR) 8.0.0 or later, you must add the properties.enable.idempotence=false configuration item to the sink table to disable idempotent writes, as shown in the example after this list. For a comparison of storage engines and feature limitations of ApsaraMQ for Kafka, see Storage engine comparison.

  • Connect to a self-managed Apache Kafka cluster

    • The self-managed Apache Kafka cluster is version 0.11 or later.

    • You have established network connectivity between Flink and the self-managed Apache Kafka cluster. For more information about how to connect to a self-managed cluster over the Internet, see Select a network connection method.

    • Only client configuration items for Apache Kafka 2.8 are supported. For more information, see the Apache Kafka documentation for consumer and producer configurations.
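The following is a minimal sketch of a sink table for ApsaraMQ for Kafka with idempotent writes disabled, as required by the limitation described above. The table schema, topic name, and broker addresses are placeholders.

CREATE TEMPORARY TABLE mq_kafka_sink (
  id INT,
  name STRING
) WITH (
  'connector' = 'kafka',
  'topic' = '<yourTopic>',
  'properties.bootstrap.servers' = '<yourKafkaBrokers>',
  'format' = 'csv',
  -- ApsaraMQ for Kafka does not support idempotent writes; required on VVR 8.0.0 and later.
  'properties.enable.idempotence' = 'false'
);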

Precautions

Transactional writes are not recommended because of design limitations in the Flink and Kafka communities. When you set sink.delivery-guarantee = exactly-once, the Kafka connector enables transactional writes, which has the following three known issues:

  • Each checkpoint generates a transaction ID. If the checkpoint interval is too short, too many transaction IDs are generated. This can cause the coordinator in the Kafka cluster to run out of memory and compromise the stability of the Kafka cluster.

  • Each transaction creates a producer instance. If too many transactions are committed at the same time, the TaskManager may run out of memory and compromise the stability of the Flink job.

  • If multiple Flink jobs use the same sink.transactional-id-prefix, the transaction IDs they generate may conflict. When one job fails to write data, it blocks the Last Stable Offset (LSO) of the Kafka partition from advancing. This affects all consumers that read data from the partition.

If you require exactly-once semantics, use an Upsert Kafka sink table and use primary keys to ensure idempotence. To use transactional writes, see Precautions for EXACTLY_ONCE semantics.
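If you still need transactional writes, the following minimal sketch shows the two related sink options. The table schema, topic, broker addresses, and prefix value are placeholders; keep the prefix unique for each job to avoid the transaction ID conflicts described above.

CREATE TEMPORARY TABLE tx_kafka_sink (
  id INT,
  name STRING
) WITH (
  'connector' = 'kafka',
  'topic' = '<yourTopic>',
  'properties.bootstrap.servers' = '<yourKafkaBrokers>',
  'format' = 'csv',
  'sink.delivery-guarantee' = 'exactly-once',
  -- Use a prefix that is unique across all Flink jobs that write to this Kafka cluster.
  'sink.transactional-id-prefix' = 'my-job-tx'
);

Exactly-once delivery also requires checkpointing to be enabled for the job.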

Troubleshoot network connectivity

If a Flink job reports a Timed out waiting for a node assignment error at startup, it is usually caused by a network connectivity issue between Flink and Kafka.

A Kafka client connects to the server in the following steps:

  1. The client uses the addresses in bootstrap.servers to connect to Kafka.

  2. Kafka returns the metadata of each broker in the cluster, including their connection addresses.

  3. The client then uses these returned addresses to connect to each broker for read and write operations.

Even if the bootstrap.servers addresses are reachable, the client cannot read or write data if Kafka returns incorrect broker addresses. This issue often occurs in network architectures that use proxies, port forwarding, or leased lines.
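In such architectures, the broker must advertise an address that Flink can reach, which may differ from the address it binds to. The following is a minimal sketch of the relevant Kafka broker settings; the host name is a placeholder.

# server.properties on the Kafka broker
# The address the broker binds to.
listeners=PLAINTEXT://0.0.0.0:9092
# The address returned to clients in cluster metadata. It must be reachable from Flink.
advertised.listeners=PLAINTEXT://kafka-broker-1.example.internal:9092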

Troubleshooting steps

ApsaraMQ for Kafka

  1. Confirm the endpoint type

    • Default endpoint (internal network)

    • SASL endpoint (internal network + authentication)

    • Public endpoint (requires a separate request)

    You can use the network probe in the Flink development console to rule out connectivity issues with the bootstrap.servers address.

  2. Check security groups and whitelists

    The CIDR block of the Flink VPC must be added to the whitelist of the Kafka instance. For more information, see View the VPC CIDR block and Configure a whitelist.

  3. Check the SASL configuration (if enabled)

    If you use a SASL_SSL endpoint, you must correctly configure the JAAS, SSL, and SASL mechanisms in the Flink job. Missing authentication can cause the connection to fail during the handshake, which can also appear as a timeout. For more information, see Security and authentication.

Self-managed Kafka (ECS)

  1. Use the network probe in the Flink development console.

    You can use the network probe to rule out connectivity issues with the bootstrap.servers address and confirm that the internal and public endpoints are correct.

  2. Check security groups and whitelists

    • The ECS security group must allow access to the Kafka endpoint port (usually 9092 or 9093).

    • The CIDR block of the Flink VPC must be added to the whitelist of the ECS instance. For more information, see View the VPC CIDR block.

  3. Check the configuration

    1. Log on to the ZooKeeper cluster that Kafka uses. You can use the zkCli.sh or zookeeper-shell.sh tool.

    2. Run a command to retrieve broker metadata. For example, run get /brokers/ids/0. In the result, find the address that Kafka advertises to clients in the `endpoints` field.

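      The following is a minimal sketch of the expected output; the host, port, and timestamp values are placeholders:

      get /brokers/ids/0
      {"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT"},"endpoints":["PLAINTEXT://192.168.XX.XX:9092"],"host":"192.168.XX.XX","port":9092,"jmx_port":-1,"timestamp":"1700000000000","version":4}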

    3. Use the network probe in the Flink development console to test whether the address is reachable.

      Note
      • If the address is not reachable, ask the Kafka Operations and Maintenance (O&M) engineer to check and correct the listeners and advertised.listeners configurations. Ensure that the returned address is accessible to Flink.

      • For more information about the connection between Kafka clients and servers, see Troubleshoot Connectivity.

  4. Check the SASL configuration (if enabled)

    If you use a SASL_SSL endpoint, you must correctly configure the JAAS, SSL, and SASL mechanisms in the Flink job. Missing authentication can cause the connection to fail during the handshake, which can also appear as a timeout. For more information, see Security and authentication.

SQL

You can use the Kafka connector in SQL jobs as a source table or a sink table.

Syntax

CREATE TABLE KafkaTable (
  `user_id` BIGINT,
  `item_id` BIGINT,
  `behavior` STRING,
  `ts` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp' VIRTUAL
) WITH (
  'connector' = 'kafka',
  'topic' = 'user_behavior',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'testGroup',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'csv'
)

Metadata columns

You can define metadata columns in source and sink tables to retrieve or write metadata for Kafka messages. For example, if you define multiple topics in the WITH clause and define a metadata column in the Kafka source table, Flink can identify the topic from which each record is read. The following example shows how to use metadata columns.

CREATE TABLE kafka_source (
  -- Read the topic of the message as the `record_topic` field.
  `record_topic` STRING NOT NULL METADATA FROM 'topic' VIRTUAL,
  -- Read the timestamp from the ConsumerRecord as the `ts` field.
  `ts` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp' VIRTUAL,
  -- Read the offset of the message as the `record_offset` field.
  `record_offset` BIGINT NOT NULL METADATA FROM 'offset' VIRTUAL,
  ...
) WITH (
  'connector' = 'kafka',
  ...
);

CREATE TABLE kafka_sink (
  -- Write the timestamp from the `ts` field to Kafka as the timestamp of the ProducerRecord.
  `ts` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp',
  ...
) WITH (
  'connector' = 'kafka',
  ...
);

The following table lists the supported metadata columns for Kafka source and sink tables.

Key

Data type

Description

Source or sink table

topic

STRING NOT NULL METADATA VIRTUAL

The name of the topic that contains the Kafka message.

Source table

partition

INT NOT NULL METADATA VIRTUAL

The partition ID of the Kafka message.

Source table

headers

MAP<STRING, BYTES> NOT NULL METADATA VIRTUAL

The headers of the Kafka message.

Source and sink tables

leader-epoch

INT NOT NULL METADATA VIRTUAL

The leader epoch of the Kafka message.

Source table

offset

BIGINT NOT NULL METADATA VIRTUAL

The offset of the Kafka message.

Source table

timestamp

TIMESTAMP(3) WITH LOCAL TIME ZONE NOT NULL METADATA VIRTUAL

The timestamp of the Kafka message.

Source and sink tables

timestamp-type

STRING NOT NULL METADATA VIRTUAL

The timestamp type of the Kafka message:

  • NoTimestampType: No timestamp is defined in the message.

  • CreateTime: The time when the message was created.

  • LogAppendTime: The time when the message was appended to the Kafka broker.

Source table

__raw_key__

STRING NOT NULL METADATA VIRTUAL

The key field of the raw Kafka message.

Source and sink tables

__raw_value__

STRING NOT NULL METADATA VIRTUAL

The value field of the raw Kafka message.

Source and sink tables

WITH parameters

  • Common

    Parameter

    Description

    Data type

    Required

    Default value

    Remarks

    connector

    The table type.

    String

    Yes

    None

    The value must be kafka.

    properties.bootstrap.servers

    The Kafka broker address.

    String

    Yes

    None

    The format is host:port,host:port,host:port. Use commas (,) to separate multiple addresses.

    properties.*

    Direct configurations for the Kafka client.

    String

    No

    None

    The suffix must be a producer or consumer configuration defined in the official Kafka documentation.

    Flink removes the properties. prefix and passes the remaining configuration to the Kafka client. For example, you can use 'properties.allow.auto.create.topics'='false' to disable automatic topic creation.

    Do not use this method to modify the following configurations, because they are overwritten by the Kafka connector:

    • key.deserializer

    • value.deserializer

    format

    The format used to read or write the value part of a Kafka message.

    String

    No

    None

    Supported formats:

    • csv

    • json

    • avro

    • debezium-json

    • canal-json

    • maxwell-json

    • avro-confluent

    • raw

    Note

    For more information about format parameters, see Format Parameters.

    key.format

    The format used to read or write the key part of a Kafka message.

    String

    No

    None

    Supported formats:

    • csv

    • json

    • avro

    • debezium-json

    • canal-json

    • maxwell-json

    • avro-confluent

    • raw

    Note

    If you use this configuration, the key.fields configuration is required.

    key.fields

    The fields in the source or sink table that correspond to the key part of the Kafka message.

    String

    No

    None

    Use semicolons (;) to separate multiple field names. For example: field1;field2

    key.fields-prefix

    Specifies a custom prefix for all Kafka message key fields to avoid name conflicts with the value part format fields.

    String

    No

    None

    This configuration item is only used to distinguish column names in source and sink tables. The prefix is removed when parsing and generating the key part of the Kafka message.

    Note

    If you use this configuration, you must set value.fields-include to EXCEPT_KEY.

    value.format

    The format used to read or write the value part of a Kafka message.

    String

    No

    None

    This configuration is equivalent to format. You can only set one of format or value.format. If both are configured, value.format overwrites format.

    value.fields-include

    Specifies whether to include the fields corresponding to the key part of the message when parsing or generating the value part of the Kafka message.

    String

    No

    ALL

    Valid values:

    • ALL (default): All columns are processed as the value part of the Kafka message.

    • EXCEPT_KEY: The remaining fields, excluding those defined in key.fields, are processed as the value part of the Kafka message.

  • Source table

    Parameter

    Description

    Data type

    Required

    Default value

    Remarks

    topic

    The name of the topic to read from.

    String

    No

    None

    Use semicolons (;) to separate multiple topic names, for example, topic-1;topic-2.

    Note

    You can specify only one of the topic and topic-pattern options.

    topic-pattern

    A regular expression that matches the names of topics to read from. All topics that match this regular expression are read when the job runs.

    String

    No

    None

    Note

    You can specify only one of the topic and topic-pattern options.

    properties.group.id

    The consumer group ID.

    String

    No

    KafkaSource-{source_table_name}

    If the specified group ID is used for the first time, you must set properties.auto.offset.reset to earliest or latest to specify the initial start offset.

    scan.startup.mode

    The start offset for reading data from Kafka.

    String

    No

    group-offsets

    Valid values:

    • earliest-offset: Starts reading from the earliest offset in Kafka.

    • latest-offset: Starts reading from the latest offset in Kafka.

    • group-offsets (default): Starts reading from the committed offset of the specified properties.group.id.

    • timestamp: Starts reading from the timestamp specified by scan.startup.timestamp-millis.

    • specific-offsets: Starts reading from the offset specified by scan.startup.specific-offsets.

    Note

    This parameter takes effect when the job starts without a state. When a job restarts from a checkpoint or recovers from a state, it preferentially uses the progress saved in the state to resume reading.

    scan.startup.specific-offsets

    In specific-offsets startup mode, specifies the start offset for each partition.

    String

    No

    None

    For example: partition:0,offset:42;partition:1,offset:300

    scan.startup.timestamp-millis

    In timestamp startup mode, specifies the start offset timestamp.

    Long

    No

    None

    The unit is milliseconds.

    scan.topic-partition-discovery.interval

    The interval for dynamically detecting Kafka topics and partitions.

    Duration

    No

    5 minutes

    The default partition check interval is 5 minutes. To disable this feature, you must explicitly set the partition check interval to a non-positive value. When dynamic partition discovery is enabled, the Kafka source can automatically discover new partitions and read data from them. In topic-pattern mode, it not only reads data from new partitions of existing topics but also reads data from all partitions of new topics that match the regular expression.

    Note

    In VVR 6.0.x, dynamic partition discovery is disabled by default. Starting from VVR 8.0, this feature is enabled by default, and the detection interval is set to 5 minutes.

    scan.header-filter

    Conditionally filters data based on whether the Kafka data contains a specified header.

    String

    No

    None

    Use a colon (:) to separate the header key and value. Use logical operators (&, |) to connect multiple header conditions. The negation logical operator (!) is also supported. For example, depart:toy|depart:book&!env:test retains Kafka data where the header contains depart=toy or depart=book, and does not contain env=test.

    Note
    • This parameter is supported only in VVR 8.0.6 and later.

    • Parenthetical operations are not supported.

    • Logical operations are performed from left to right.

    • The header value is converted to a string in UTF-8 format for comparison with the specified header value.

    scan.check.duplicated.group.id

    Specifies whether to check for duplicate consumer groups specified by properties.group.id.

    Boolean

    No

    false

    Valid values:

    • true: Before starting the job, the system checks for duplicate consumer groups. If a duplicate is found, the job reports an error and stops, avoiding conflicts with existing consumer groups.

    • false: The job starts directly without checking for consumer group conflicts.

    Note

    This parameter is supported only in VVR 6.0.4 and later.

  • Sink table

    Parameter

    Description

    Data type

    Required

    Default value

    Remarks

    topic

    The name of the topic to write to.

    String

    Yes

    None

    None

    sink.partitioner

    The mapping from Flink parallel subtasks to Kafka partitions.

    String

    No

    default

    Valid values:

    • default: Uses the default Kafka partition mode.

    • fixed: Each Flink parallel subtask writes to a fixed Kafka partition.

    • round-robin: Data from each Flink parallel subtask is distributed to Kafka partitions in a round-robin fashion.

    • Custom partition mapping mode: If fixed and round-robin do not meet your needs, you can create a subclass of FlinkKafkaPartitioner to define a custom partition mapping mode. For example: org.mycompany.MyPartitioner

    sink.delivery-guarantee

    The semantic mode of the Kafka sink table.

    String

    No

    at-least-once

    Valid values:

    • none: No guarantee is provided. Data may be lost or duplicated.

    • at-least-once (default): Guarantees that no data is lost, but data may be duplicated.

    • exactly-once: Uses Kafka transactions to guarantee that data is not lost or duplicated.

    Note

    When using exactly-once semantics, the sink.transactional-id-prefix parameter is required.

    sink.transactional-id-prefix

    The prefix for the Kafka transaction ID used in exactly-once semantics.

    String

    No

    None

    This configuration takes effect only when sink.delivery-guarantee is set to exactly-once.

    sink.parallelism

    The degree of parallelism for the Kafka sink table operator.

    Integer

    No

    None

    By default, the framework uses the same degree of parallelism for the sink as for the upstream operator.

Security and authentication

If the Kafka cluster requires a secure connection or authentication, add the relevant security and authentication configurations with the properties. prefix to the WITH clause. The following example shows how to configure a Kafka table to use PLAIN as the SASL mechanism and provide a JAAS configuration.

CREATE TABLE KafkaTable (
  `user_id` BIGINT,
  `item_id` BIGINT,
  `behavior` STRING,
  `ts` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp'
) WITH (
  'connector' = 'kafka',
  ...
  'properties.security.protocol' = 'SASL_PLAINTEXT',
  'properties.sasl.mechanism' = 'PLAIN',
  'properties.sasl.jaas.config' = 'org.apache.flink.kafka.shaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="username" password="password";'
)

The following example shows how to use SASL_SSL as the security protocol and SCRAM-SHA-256 as the SASL mechanism.

CREATE TABLE KafkaTable (
  `user_id` BIGINT,
  `item_id` BIGINT,
  `behavior` STRING,
  `ts` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp'
) WITH (
  'connector' = 'kafka',
  ...
  'properties.security.protocol' = 'SASL_SSL',
  /*SSL configuration*/
  /*Configure the path of the truststore (CA certificate) provided by the server.*/
  /*Files uploaded through File Management are stored in the /flink/usrlib/ path.*/
  'properties.ssl.truststore.location' = '/flink/usrlib/kafka.client.truststore.jks',
  'properties.ssl.truststore.password' = 'test1234',
  /*If client authentication is required, configure the path of the keystore (private key).*/
  'properties.ssl.keystore.location' = '/flink/usrlib/kafka.client.keystore.jks',
  'properties.ssl.keystore.password' = 'test1234',
  /*The algorithm that the client uses to verify the server address. An empty value disables server address verification.*/
  'properties.ssl.endpoint.identification.algorithm' = '',
  /*SASL configuration*/
  /*Configure the SASL mechanism as SCRAM-SHA-256.*/
  'properties.sasl.mechanism' = 'SCRAM-SHA-256',
  /*Configure JAAS*/
  'properties.sasl.jaas.config' = 'org.apache.flink.kafka.shaded.org.apache.kafka.common.security.scram.ScramLoginModule required username="username" password="password";'
)

You can upload the CA certificate and private key mentioned in the example to the platform using the File Management feature in the Realtime Compute console. After you upload the files, they are stored in the /flink/usrlib directory. If the CA certificate file to be used is named my-truststore.jks, you can specify 'properties.ssl.truststore.location' = '/flink/usrlib/my-truststore.jks' in the WITH clause to use this certificate.

Note
  • The preceding examples apply to most configuration scenarios. Before you configure the Kafka connector, contact the Kafka server O&M engineer to obtain the correct security and authentication configuration information.

  • Unlike open source Flink, the SQL editor in Realtime Compute for Apache Flink automatically escapes double quotation marks ("). You do not need to add extra escape characters (\) for double quotation marks in the username and password when you configure properties.sasl.jaas.config.

Source table start offset

Startup mode

You can specify the initial read offset for a Kafka source table by configuring scan.startup.mode:

  • earliest-offset: Starts reading from the earliest offset of the current partition.

  • latest-offset: Starts reading from the latest offset of the current partition.

  • group-offsets: Starts reading from the committed offset of the specified group ID. The group ID is specified by properties.group.id.

  • timestamp: Starts reading from the first message whose timestamp is greater than or equal to the specified time. The timestamp is specified by scan.startup.timestamp-millis.

  • specific-offsets: Starts consuming from the specified partition offset. The offset is specified by scan.startup.specific-offsets.

Note
  • If you do not specify a start offset, consumption starts from the committed offset (group-offsets) by default.

  • scan.startup.mode takes effect only for jobs that start without a state. When a stateful job starts, it consumes from the offset stored in its state.

The following example shows the configuration for each startup mode. In an actual job, specify scan.startup.mode only once:

CREATE TEMPORARY TABLE kafka_source (
  ...
) WITH (
  'connector' = 'kafka',
  ...
  -- Consume from the earliest offset.
  'scan.startup.mode' = 'earliest-offset',
  -- Consume from the latest offset.
  'scan.startup.mode' = 'latest-offset',
  -- Consume from the committed offset of the consumer group "my-group".
  'properties.group.id' = 'my-group',
  'scan.startup.mode' = 'group-offsets',
  'properties.auto.offset.reset' = 'earliest', -- If "my-group" is used for the first time, consume from the earliest offset.
  'properties.auto.offset.reset' = 'latest', -- If "my-group" is used for the first time, consume from the latest offset.
  -- Consume from the specified millisecond timestamp 1655395200000.
  'scan.startup.mode' = 'timestamp',
  'scan.startup.timestamp-millis' = '1655395200000',
  -- Consume from the specified offset.
  'scan.startup.mode' = 'specific-offsets',
  'scan.startup.specific-offsets' = 'partition:0,offset:42;partition:1,offset:300'
);

Start offset priority

The start offset for the source table is determined in the following order of priority, from highest to lowest:

  1. The offset stored in a checkpoint or savepoint.

  2. The start time selected in the Realtime Compute console when the job is started.

  3. The start offset specified by scan.startup.mode in the WITH clause.

If scan.startup.mode is not specified, group-offsets is used by default, and consumption starts from the offset of the corresponding consumer group.

In any of the preceding steps, if the offset is invalid because it has expired or the Kafka cluster has issues, the policy specified by properties.auto.offset.reset is used to reset the offset. If this configuration item is not set, an exception is thrown and requires user intervention.

A common scenario is starting consumption with a new group ID. The source table first queries the Kafka cluster for the committed offset of this group. Because the group ID is being used for the first time, no valid offset is found. The offset is then reset according to the policy specified by the properties.auto.offset.reset parameter. Therefore, when you consume data with a new group ID, you must configure properties.auto.offset.reset to specify the offset reset policy.
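For example, the following minimal sketch consumes with a new group ID and falls back to the earliest offset when no committed offset exists. The table schema, topic, and broker addresses are placeholders.

CREATE TEMPORARY TABLE kafka_source_new_group (
  id INT,
  name STRING
) WITH (
  'connector' = 'kafka',
  'topic' = '<yourTopic>',
  'properties.bootstrap.servers' = '<yourKafkaBrokers>',
  'format' = 'csv',
  'properties.group.id' = 'my-new-group',
  'scan.startup.mode' = 'group-offsets',
  -- The new consumer group has no committed offset yet, so reset to the earliest offset.
  'properties.auto.offset.reset' = 'earliest'
);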

Source table offset commit

The Kafka source table commits the current consumer offset to the Kafka cluster only after a checkpoint is successful. If the checkpoint interval is long, the observed consumer offset in the Kafka cluster will be delayed. During a checkpoint, the Kafka source table stores the current read progress in its state and does not rely on the offset committed to the cluster for fault recovery. Committing the offset is only for monitoring the read progress on the Kafka side. A failed offset commit does not affect the correctness of the data.

Sink table custom partitioner

If the built-in Kafka producer partition modes do not meet your needs, you can implement a custom partition mode to write data to the corresponding partitions. A custom partitioner must inherit FlinkKafkaPartitioner. After development, you can compile the JAR package and upload it to the Realtime Compute console using the File Management feature. After you upload and reference the package, you can set the sink.partitioner parameter in the WITH clause to the full class path of the partitioner, such as org.mycompany.MyPartitioner.
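The following is a minimal sketch of such a subclass. The class name matches the org.mycompany.MyPartitioner example above, and the key-hashing routing logic is only an assumption for illustration. In the open source Kafka connector, FlinkKafkaPartitioner is located in the org.apache.flink.streaming.connectors.kafka.partitioner package; verify the package name for your connector version.

package org.mycompany;

import java.util.Arrays;

import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;

public class MyPartitioner<T> extends FlinkKafkaPartitioner<T> {

    @Override
    public int partition(T record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
        if (key == null) {
            // Records without a key go to the first available partition.
            return partitions[0];
        }
        // Route records with the same serialized key to the same partition.
        return partitions[(Arrays.hashCode(key) & Integer.MAX_VALUE) % partitions.length];
    }
}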

Choose between Kafka, Upsert Kafka, or Kafka JSON catalog

Kafka is an append-only message queue system. It does not support data updates or deletions. Therefore, it cannot handle upstream CDC change data or retraction logic from operators such as aggregation and join in streaming SQL. To write data with changes or retractions to Kafka, use an Upsert Kafka sink table, which is designed to handle change data.

To easily synchronize change data from one or more tables in an upstream database to Kafka in batches, you can use a Kafka JSON catalog. If the data stored in Kafka is in JSON format, using a Kafka JSON catalog eliminates the need to define a schema and WITH parameters. For more information, see Manage a Kafka JSON Catalog.

Usage examples

Example 1: Read data from Kafka and write it to Kafka

Read Kafka data from a topic named 'source' and write it to a topic named 'sink'. The data is in CSV format.

CREATE TEMPORARY TABLE kafka_source (
  id INT,
  name STRING,
  age INT
) WITH (
  'connector' = 'kafka',
  'topic' = 'source',
  'properties.bootstrap.servers' = '<yourKafkaBrokers>',
  'properties.group.id' = '<yourKafkaConsumerGroupId>',
  'format' = 'csv'
);

CREATE TEMPORARY TABLE kafka_sink (
  id INT,
  name STRING,
  age INT
) WITH (
  'connector' = 'kafka',
  'topic' = 'sink',
  'properties.bootstrap.servers' = '<yourKafkaBrokers>',
  'properties.group.id' = '<yourKafkaConsumerGroupId>',
  'format' = 'csv'
);

INSERT INTO kafka_sink SELECT id, name, age FROM kafka_source;

Example 2: Synchronize table schema and data

Synchronize messages from a Kafka topic to Hologres in real time. In this case, you can use the offset and partition ID of the Kafka message as the primary key to ensure that no duplicate messages exist in Hologres during failover.

CREATE TEMPORARY TABLE kafkaTable (
  `offset` BIGINT NOT NULL METADATA,
  `part` INT NOT NULL METADATA FROM 'partition',
  PRIMARY KEY (`part`, `offset`) NOT ENFORCED
) WITH (
  'connector' = 'kafka',
  'properties.bootstrap.servers' = '<yourKafkaBrokers>',
  'topic' = 'kafka_evolution_demo',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json',
  'json.infer-schema.flatten-nested-columns.enable' = 'true'
    -- Optional. Flattens all nested columns.
);

CREATE TABLE IF NOT EXISTS hologres.kafka.`sync_kafka`
WITH (
  'connector' = 'hologres'
) AS TABLE vvp.`default`.kafkaTable;

Example 3: Synchronize table schema and Kafka message key and value data

The key part of a Kafka message already stores relevant information. You can synchronize both the key and value from Kafka.

CREATE TEMPORARY TABLE kafkaTable (
  `key_id` INT NOT NULL,
  `val_name` VARCHAR(200)
) WITH (
  'connector' = 'kafka',
  'properties.bootstrap.servers' = '<yourKafkaBrokers>',
  'topic' = 'kafka_evolution_demo',
  'scan.startup.mode' = 'earliest-offset',
  'key.format' = 'json',
  'value.format' = 'json',
  'key.fields' = 'key_id',
  'key.fields-prefix' = 'key_',
  'value.fields-prefix' = 'val_',
  'value.fields-include' = 'EXCEPT_KEY'
);

CREATE TABLE IF NOT EXISTS hologres.kafka.`sync_kafka`
WITH (
  'connector' = 'hologres'
) AS TABLE vvp.`default`.kafkaTable;
Note

The key part of a Kafka message does not support table schema evolution or type parsing. You need to declare it manually.

Example 4: Synchronize table schema and data and perform computation

When you synchronize Kafka data to Hologres, lightweight computations are often required.

CREATE TEMPORARY TABLE kafkaTable (
  `distinct_id` INT NOT NULL,
  `properties` STRING,
  `timestamp` TIMESTAMP_LTZ METADATA,
  `date` AS CAST(`timestamp` AS DATE)
) WITH (
  'connector' = 'kafka',
  'properties.bootstrap.servers' = '<yourKafkaBrokers>',
  'topic' = 'kafka_evolution_demo',
  'scan.startup.mode' = 'earliest-offset',
  'key.format' = 'json',
  'value.format' = 'json',
  'key.fields' = 'key_id',
  'key.fields-prefix' = 'key_'
);

CREATE TABLE IF NOT EXISTS hologres.kafka.`sync_kafka` WITH (
   'connector' = 'hologres'
) AS TABLE vvp.`default`.kafkaTable
ADD COLUMN
  `order_id` AS COALESCE(JSON_VALUE(`properties`, '$.order_id'), 'default');
-- Use COALESCE to handle null values.

Example 5: Parse nested JSON

JSON message example

{
  "id": 101,
  "name": "VVP",
  "properties": {
    "owner": "Alibaba Cloud",
    "engine": "Flink"
  }
}

To avoid using functions such as JSON_VALUE(payload, '$.properties.owner') to parse fields later, you can define the structure directly in the Source DDL:

CREATE TEMPORARY TABLE kafka_source (
  id          VARCHAR,
  `name`      VARCHAR,
  properties  ROW<`owner` STRING, engine STRING>
) WITH (
  'connector' = 'kafka',
  'topic' = 'xxx',
  'properties.bootstrap.servers' = 'xxx',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json'
);

This way, Flink parses the JSON into structured fields at read time. Subsequent SQL queries can directly use properties.owner without extra function calls, which results in better overall performance.
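For example, a query on this table can reference the nested fields directly. The following is a minimal sketch; the column aliases are arbitrary.

SELECT
  id,
  `name`,
  properties.`owner` AS product_owner,
  properties.engine AS compute_engine
FROM kafka_source;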

DataStream API

Important

When you read and write data using DataStream, you must use the corresponding DataStream connector to connect to Realtime Compute for Apache Flink. For information about how to set up the DataStream connector, see Use a DataStream connector.

  • Build a Kafka source

    The Kafka source provides a builder class to create an instance of the Kafka source. The following example code shows how to build a Kafka source to consume data from the earliest offset of input-topic. The consumer group name is my-group, and the Kafka message body is deserialized as a string.


    KafkaSource<String> source = KafkaSource.<String>builder()
        .setBootstrapServers(brokers)
        .setTopics("input-topic")
        .setGroupId("my-group")
        .setStartingOffsets(OffsetsInitializer.earliest())
        .setValueOnlyDeserializer(new SimpleStringSchema())
        .build();
    
    env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");

    When you build a KafkaSource, you must specify the following parameters.

    Parameter

    Description

    BootstrapServers

    The Kafka broker address. Configure it using the setBootstrapServers(String) method.

    GroupId

    The consumer group ID. Configure it using the setGroupId(String) method.

    Topics or Partition

    The name of the subscribed topic or partition. The Kafka source provides the following three ways to subscribe to topics or partitions:

    • Topic list: Subscribes to all partitions in the topic list.

      KafkaSource.builder().setTopics("topic-a","topic-b")
    • Regular expression matching: Subscribes to all partitions under the topics that match the regular expression.

      KafkaSource.builder().setTopicPattern("topic.*")
    • Partition list: Subscribes to the specified partitions.

      final HashSet<TopicPartition> partitionSet = new HashSet<>(Arrays.asList(
              new TopicPartition("topic-a", 0),    // Partition 0 of topic "topic-a"
              new TopicPartition("topic-b", 5)));  // Partition 5 of topic "topic-b"
      KafkaSource.builder().setPartitions(partitionSet)

    Deserializer

    The deserializer for parsing Kafka messages.

    Specify the deserializer using setDeserializer(KafkaRecordDeserializationSchema), where KafkaRecordDeserializationSchema defines how to parse Kafka's ConsumerRecord. If you only need to parse the data in the message body (value) of a Kafka message, you can do so in one of the following ways:

    • Use the setValueOnlyDeserializer(DeserializationSchema) method in the KafkaSource builder class provided by Flink. DeserializationSchema defines how to parse the binary data in the Kafka message body.

    • Use a parser provided by Kafka, which includes multiple implementation classes. For example, you can use StringDeserializer to parse the Kafka message body into a string.

      import org.apache.kafka.common.serialization.StringDeserializer;
      
      KafkaSource.<String>builder()
              .setDeserializer(KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class));
    Note

    To fully parse the ConsumerRecord, you must implement the KafkaRecordDeserializationSchema interface yourself.


    The Kafka DataStream connector is available in the Maven Central Repository.

    <dependency>
        <groupId>com.alibaba.ververica</groupId>
        <artifactId>ververica-connector-kafka</artifactId>
        <version>${vvr-version}</version>
    </dependency>

    When you use the Kafka DataStream connector, you must understand the following Kafka properties:

    • Start offset

      The Kafka source can specify to start consuming from different offsets using an offset initializer (OffsetsInitializer). The built-in offset initializers include the following.

      Offset initializer

      Code setting

      Consume from the earliest offset.

      KafkaSource.builder().setStartingOffsets(OffsetsInitializer.earliest())

      Consume from the latest offset.

      KafkaSource.builder().setStartingOffsets(OffsetsInitializer.latest())

      Start consuming from data whose timestamp is greater than or equal to the specified time, in milliseconds.

      KafkaSource.builder().setStartingOffsets(OffsetsInitializer.timestamp(1592323200000L))

      Start consuming from the offset committed by the consumer group. If the committed offset does not exist, use the earliest offset.

      KafkaSource.builder().setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))

      Start consuming from the offset committed by the consumer group, without specifying an offset reset policy.

      KafkaSource.builder().setStartingOffsets(OffsetsInitializer.committedOffsets())

      Note
      • If the built-in initializers do not meet your needs, you can also implement a custom offset initializer.

      • If no offset initializer is specified, OffsetsInitializer.earliest() is used by default, which starts consumption from the earliest offset.

    • Streaming and batch modes

      The Kafka source supports both streaming and batch execution modes. By default, the Kafka source is set to run in streaming mode, so the job never stops until the Flink job fails or is canceled. To configure the Kafka source to run in batch mode, you can use setBounded(OffsetsInitializer) to specify a stop offset. When all partitions reach their stop offset, the Kafka source exits.

      Note

      Typically, the Kafka source has no stop offset in streaming mode. For debugging purposes, you can use setUnbounded(OffsetsInitializer) to specify a stop offset in streaming mode. Note that the method names for specifying the stop offset in streaming and batch modes (setUnbounded and setBounded) are different.
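      The following is a minimal sketch of the batch configuration mentioned above; it reuses the placeholder broker address, topic, and group ID from the preceding source example.

      KafkaSource<String> boundedSource = KafkaSource.<String>builder()
              .setBootstrapServers(brokers)
              .setTopics("input-topic")
              .setGroupId("my-group")
              .setStartingOffsets(OffsetsInitializer.earliest())
              // Stop when every partition reaches the latest offset observed when the job starts.
              .setBounded(OffsetsInitializer.latest())
              .setValueOnlyDeserializer(new SimpleStringSchema())
              .build();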

    • Dynamic Partition Check

      To handle scenarios such as topic scale-out or new topic creation without restarting the Flink job, you can enable the dynamic partition discovery feature in the provided topic or partition subscription mode.

      Note

      Dynamic partition discovery is enabled by default, and the partition check interval is 5 minutes. To disable this feature, you must explicitly set the partition check interval to a non-positive value. The following code provides an example.

      KafkaSource.builder()
          .setProperty("partition.discovery.interval.ms", "10000") // Check for new partitions every 10 seconds.
      Important

      The dynamic partition discovery feature relies on the metadata update mechanism of the Kafka cluster. If the Kafka cluster does not update partition information in time, new partitions may not be discovered. Set partition.discovery.interval.ms to a value that matches your actual requirements.

    • Event time and watermarks

      By default, the Kafka source uses the timestamp in the Kafka message as the event time. You can customize a watermark strategy to extract the event time from the message and send watermarks downstream.

      env.fromSource(kafkaSource, new CustomWatermarkStrategy(), "Kafka Source With Custom Watermark Strategy")

      For more information about custom watermark strategies, see Generating Watermarks.

      Note

      If some tasks of a parallel source are idle for a long time (for example, if a Kafka partition has no data input for a long time, or the degree of parallelism of the source exceeds the number of Kafka partitions), the watermark generation mechanism may fail. In this case, the system cannot trigger window calculations normally, which causes the data processing flow to stall.

      To resolve this issue, you can make the following adjustments:

      • Configure a watermark timeout mechanism: Enable the table.exec.source.idle-timeout parameter to force the system to generate a watermark after a specified timeout period, which ensures the progress of the window calculation cycle.

      • Optimize the data source: Maintain a reasonable ratio between the number of Kafka partitions and the degree of parallelism of the source (recommended: number of partitions ≥ source degree of parallelism).

    • Offset commit

      The Kafka source commits the current consumer offset when a checkpoint is completed to ensure consistency between the Flink checkpoint state and the committed offset on the Kafka broker. If checkpointing is not enabled, the Kafka source relies on the internal automatic offset commit logic of the Kafka consumer. The automatic commit feature is configured by the enable.auto.commit and auto.commit.interval.ms Kafka consumer configuration items.

      Note

      The Kafka source does not rely on the offset committed on the broker to recover from a failed job. Committing the offset is only for reporting the consumption progress of the Kafka consumer and consumer group for monitoring on the broker side.
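      If checkpointing is disabled and you want the consumer to commit offsets automatically, a minimal sketch of the related consumer properties is as follows; the interval value is only an example.

      KafkaSource.builder()
              .setProperty("enable.auto.commit", "true")
              .setProperty("auto.commit.interval.ms", "5000"); // Commit offsets every 5 seconds.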

    • Other properties

      In addition to the preceding properties, you can also use setProperties(Properties) and setProperty(String, String) to set any properties for the Kafka source and Kafka consumer. The KafkaSource typically has the following configuration items.

      Configuration item

      Description

      client.id.prefix

      Specifies the client ID prefix for the Kafka consumer.

      partition.discovery.interval.ms

      Defines the interval at which the Kafka source checks for new partitions.

      Note

      partition.discovery.interval.ms is overwritten to -1 in batch mode.

      register.consumer.metrics

      Specifies whether to register the Kafka consumer's metrics in Flink.

      Other Kafka consumer configurations

      For more information about Kafka consumer configurations, see Apache Kafka.

      Important

      The Kafka connector forcibly overwrites some manually configured parameters. The details are as follows:

      • key.deserializer is always overwritten to ByteArrayDeserializer.

      • value.deserializer is always overwritten to ByteArrayDeserializer.

      • auto.offset.reset.strategy is overwritten to OffsetsInitializer#getAutoOffsetResetStrategy().

      The following example shows how to configure a Kafka consumer to use PLAIN as the SASL mechanism and provide a JAAS configuration.

      KafkaSource.builder()
          .setProperty("sasl.mechanism", "PLAIN")
          .setProperty("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"username\" password=\"password\";")
    • Monitoring

      The Kafka source registers metrics in Flink for monitoring and diagnosis.

      • Metric scope

        All metrics of the Kafka source reader are registered under the KafkaSourceReader metric group, which is a subgroup of the operator metric group. Metrics related to a specific topic partition are registered in the KafkaSourceReader.topic.<topic_name>.partition.<partition_id> metric group.

        For example, the current consumer offset (currentOffset) for partition 1 of topic "my-topic" is registered under <some_parent_groups>.operator.KafkaSourceReader.topic.my-topic.partition.1.currentOffset. The number of successful offset commits (commitsSucceeded) is registered under <some_parent_groups>.operator.KafkaSourceReader.commitsSucceeded.

      • Metric list

        Metric name

        Description

        Scope

        currentOffset

        The current consumer offset.

        TopicPartition

        committedOffset

        The current committed offset.

        TopicPartition

        commitsSucceeded

        The number of successful commits.

        KafkaSourceReader

        commitsFailed

        The number of failed commits.

        KafkaSourceReader

      • Kafka consumer metrics

        The Kafka consumer's metrics are registered in the KafkaSourceReader.KafkaConsumer metric group. For example, the Kafka consumer metric records-consumed-total is registered under <some_parent_groups>.operator.KafkaSourceReader.KafkaConsumer.records-consumed-total.

        You can use the register.consumer.metrics configuration item to specify whether to register the Kafka consumer's metrics. By default, this option is set to true. For more information about Kafka consumer metrics, see Apache Kafka.

  • Build a Kafka sink

    The Flink Kafka sink can write stream data to one or more Kafka topics.

    DataStream<String> stream = ...;

    Properties properties = new Properties();
    properties.setProperty("bootstrap.servers", "<yourKafkaBrokers>");

    KafkaSink<String> kafkaSink =
            KafkaSink.<String>builder()
                    .setKafkaProducerConfig(properties) // Kafka producer configuration
                    .setRecordSerializer(
                            KafkaRecordSerializationSchema.builder()
                                    .setTopic("my-topic") // target topic
                                    .setKafkaValueSerializer(StringSerializer.class) // value serializer
                                    .build())
                    .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE) // fault-tolerance semantics
                    .setTransactionalIdPrefix("flink-kafka-sink") // recommended when using EXACTLY_ONCE; keep it unique per job
                    .build();

    stream.sinkTo(kafkaSink);

    You need to configure the following parameters.

    Parameter

    Description

    Topic

    The default topic name to write data to.

    Data serialization

    When building, you need to provide a KafkaRecordSerializationSchema to convert the input data into Kafka's ProducerRecord. Flink provides a schema builder with common components such as message key/value serialization, topic selection, and message partitioning. You can also implement the corresponding interfaces for more control. The ProducerRecord<byte[], byte[]> serialize(T element, KafkaSinkContext context, Long timestamp) method is called for each data record to generate a ProducerRecord to be written to Kafka.

    You can have fine-grained control over how each data record is written to Kafka. With ProducerRecord, you can perform the following operations:

    • Set the name of the topic to write to.

    • Define the message key.

    • Specify the partition to write data to.

    Kafka client properties

    bootstrap.servers is required. It is a comma-separated list of Kafka brokers.

    Fault tolerance semantics

    After enabling Flink's checkpointing, the Flink Kafka sink can guarantee exactly-once semantics. In addition to enabling Flink's checkpointing, you can also specify different fault tolerance semantics using the DeliveryGuarantee parameter. The details of the DeliveryGuarantee parameter are as follows:

    • DeliveryGuarantee.NONE: (default) Flink provides no guarantee. Data may be lost or duplicated.

    • DeliveryGuarantee.AT_LEAST_ONCE: Guarantees that no data is lost, but data may be duplicated.

    • DeliveryGuarantee.EXACTLY_ONCE: Uses Kafka transactions to provide exactly-once semantics.

      Note

      For precautions when using EXACTLY_ONCE semantics, see Precautions for EXACTLY_ONCE semantics.

Data ingestion

You can use the Kafka connector in YAML-based data ingestion job development to read from a source or write to a target.

Limits

  • We recommend that you use Kafka as a data synchronization source for Flink CDC data ingestion only in VVR 11.1 and later.

  • Only the JSON, Debezium JSON, and Canal JSON formats are supported. Other data formats are not.

  • For data sources, only VVR 8.0.11 and later support distributing data from a single table across multiple partitions.

Syntax

source:
  type: kafka
  name: Kafka source
  properties.bootstrap.servers: localhost:9092
  topic: ${kafka.topic}
sink:
  type: kafka
  name: Kafka Sink
  properties.bootstrap.servers: localhost:9092

Configuration items

  • Common

    Parameter

    Description

    Required

    Data type

    Default value

    Remarks

    type

    The source or target type.

    Yes

    String

    None

    The value must be kafka.

    name

    The source or target name.

    No

    String

    None

    None

    properties.bootstrap.servers

    The Kafka broker address.

    Yes

    String

    None

    The format is host:port,host:port,host:port. Use commas (,) to separate multiple addresses.

    properties.*

    Direct configurations for the Kafka client.

    No

    String

    None

    The suffix must be a producer or consumer configuration defined in the official Kafka documentation.

    Flink removes the properties. prefix and passes the remaining configuration to the Kafka client. For example, you can use 'properties.allow.auto.create.topics' = 'false' to disable automatic topic creation.

    key.format

    The format used to read or write the key part of a Kafka message.

    No

    String

    None

    • For Source, only json is supported.

    • For Sink, valid values are:

      • csv

      • json

    Note

    This parameter is supported only in VVR 11.0.0 and later.

    value.format

    The format used to read or write the value part of a Kafka message.

    No

    String

    debezium-json

    Valid values:

    • debezium-json 

    • canal-json

    • json

    Note
    • The debezium-json and canal-json formats are supported only in VVR 8.0.10 and later.

    • The json format is supported only in VVR 11.0.0 and later.

  • Source table

    Parameter

    Description

    Required

    Data type

    Default value

    Remarks

    topic

    The name of the topic to read from.

    No

    String

    None

    Use semicolons (;) to separate multiple topic names, for example, topic-1;topic-2.

    Note

    You can specify only one of the topic and topic-pattern options.

    topic-pattern

    A regular expression that matches the names of topics to read from. All topics that match this regular expression are read when the job runs.

    No

    String

    None

    Note

    You can specify only one of the topic and topic-pattern options.

    properties.group.id

    The consumer group ID.

    No

    String

    None

    If the specified group ID is used for the first time, you must set properties.auto.offset.reset to earliest or latest to specify the initial start offset.

    scan.startup.mode

    The start offset for reading data from Kafka.

    No

    String

    group-offsets

    Valid values:

    • earliest-offset: Starts reading from the earliest offset in Kafka.

    • latest-offset: Starts reading from the latest offset in Kafka.

    • group-offsets (default): Starts reading from the committed offset of the specified properties.group.id.

    • timestamp: Starts reading from the timestamp specified by scan.startup.timestamp-millis.

    • specific-offsets: Starts reading from the offset specified by scan.startup.specific-offsets.

    Note

    This parameter takes effect when the job starts without a state. When a job restarts from a checkpoint or recovers from a state, it preferentially uses the progress saved in the state to resume reading.

    scan.startup.specific-offsets

    In specific-offsets startup mode, specifies the start offset for each partition.

    No

    String

    None

    For example: partition:0,offset:42;partition:1,offset:300

    scan.startup.timestamp-millis

    In timestamp startup mode, specifies the start offset timestamp.

    No

    Long

    None

    The unit is milliseconds.

    scan.topic-partition-discovery.interval

    The interval for dynamically detecting Kafka topics and partitions.

    No

    Duration

    5 minutes

    The default partition check interval is 5 minutes. To disable this feature, you must explicitly set the partition check interval to a non-positive value. When dynamic partition discovery is enabled, the Kafka source can automatically discover new partitions and read data from them. In topic-pattern mode, it not only reads data from new partitions of existing topics but also reads data from all partitions of new topics that match the regular expression.

    scan.check.duplicated.group.id

    Specifies whether to check for duplicate consumer groups specified by properties.group.id.

    No

    Boolean

    false

    Valid values:

    • true: Before starting the job, checks for duplicate consumer groups. If a duplicate is found, the job reports an error to avoid conflicts with existing consumer groups.

    • false: The job starts directly without checking for consumer group conflicts.

    schema.inference.strategy

    The schema parsing strategy.

    No

    String

    continuous

    Valid values:

    • continuous: Parses the schema for each data record. When the schemas are incompatible, it derives a wider schema and generates a schema evolution event.

    • static: Parses the schema only once when the job starts. Subsequent data is parsed based on the initial schema, and no schema evolution events are generated.


    scan.max.pre.fetch.records

    The maximum number of messages to try to consume and parse for each partition during initial schema parsing.

    No

    Int

    50

    Before the job actually reads and processes data, it tries to pre-consume a specified number of the latest messages for each partition to initialize the schema information.

    key.fields-prefix

    A custom prefix added to the field names parsed from the message key to avoid naming conflicts after parsing the Kafka message key.

    No

    String

    None

    Assume this configuration item is set to key_. When the key contains a field named a, the field name after parsing the key is key_a.

    Note

    The value of `key.fields-prefix` cannot be a prefix of the value of `value.fields-prefix`.

    value.fields-prefix

    A custom prefix added to the field names parsed from the message value to avoid naming conflicts after parsing the Kafka message value.

    No

    String

    None

    Assume this configuration item is set to value_. When the value contains a field named b, the field name after parsing the value is value_b.

    Note

    The value of value.fields-prefix cannot be a prefix of key.fields-prefix.

    metadata.list

    The metadata columns to be passed to the downstream.

    No

    String

    None

    Available metadata columns include topic, partition, offset, timestamp, timestamp-type, headers, leader-epoch, __raw_key__, and __raw_value__. Use commas to separate them.

    scan.value.initial-schemas.ddls

    Specifies the initial schema for certain tables using DDL.

    No

    String

    None

    Use a semicolon (;) to connect multiple DDL statements. For example, use CREATE TABLE a.b (id BIGINT, name VARCHAR(10)); CREATE TABLE b.c (id BIGINT); to specify the initial schema for tables a.b and b.c respectively.

    Note

    This configuration is supported in VVR 11.5 and later.

    ingestion.ignore-errors

    Specifies whether to ignore errors during data parsing.

    No

    Boolean

    false

    Note

    This configuration is supported in VVR 11.5 and later.

    ingestion.error-tolerance.max-count

    When ignoring data parsing errors, specifies the cumulative number of errors after which the job fails.

    No

    Integer

    -1

    This takes effect only when ingestion.ignore-errors is enabled. The default value -1 means that parsing exceptions do not cause the job to fail.

    Note

    This configuration is supported in VVR 11.5 and later.

    • Source table Debezium JSON format

      Parameter

      Required

      Data type

      Default value

      Description

      debezium-json.distributed-tables

      No

      Boolean

      false

      If data for a single table in Debezium JSON appears in multiple partitions, you need to enable this option.

      Note

      This configuration item is supported only in VVR 8.0.11 and later.

      Important

      After modifying this configuration item, you need to start the job without a state.

      debezium-json.schema-include

      No

      Boolean

      false

      When you set up Debezium Kafka Connect, you can enable the value.converter.schemas.enable configuration to include the schema in each message. This option indicates whether the Debezium JSON message includes the schema.

      Valid values:

      • true: The Debezium JSON message includes the schema.

      • false: The Debezium JSON message does not include the schema.

      debezium-json.ignore-parse-errors

      No

      Boolean

      false

      Valid values:

      • true: Skips the current row when a parsing error occurs.

      • false (default): Reports an error, and the job fails to start.

      debezium-json.infer-schema.primitive-as-string

      No

      Boolean

      false

      Specifies whether to parse all types as String when parsing the table schema.

      Valid values:

      • true: Parses all primitive data types as String.

      • false (default): Parses according to the basic rules.

    • Source table Canal JSON format

      Parameter

      Required

      Data type

      Default value

      Description

      canal-json.distributed-tables

      No

      Boolean

      false

      If data for a single table in Canal JSON appears in multiple partitions, you need to enable this option.

      Note

      This configuration item is supported only in VVR 8.0.11 and later.

      Important

      After modifying this configuration item, you need to start the job without a state.

      canal-json.database.include

      No

      String

      None

      An optional regular expression that matches the database metadata field in Canal records. It reads only the changelog records of the specified database. The regular expression string is compatible with Java's Pattern.

      canal-json.table.include

      No

      String

      None

      An optional regular expression that matches the table metadata field in Canal records. It reads only the changelog records of the specified table. The regular expression string is compatible with Java's Pattern.

      canal-json.ignore-parse-errors

      No

      Boolean

      false

      Valid values:

      • true: Skips the current row when a parsing error occurs.

      • false (default): Reports an error, and the job fails to start.

      canal-json.infer-schema.primitive-as-string

      No

      Boolean

      false

      Specifies whether to parse all types as String when parsing the table schema.

      Valid values:

      • true: Parses all primitive data types as String.

      • false (default): Parses according to the basic rules.

      canal-json.infer-schema.strategy

      No

      String

      AUTO

      The parsing strategy for the table schema.

      Valid values:

      • AUTO (default): Automatically infers the schema by analyzing the JSON data. If the data does not contain a sqlType field, we recommend AUTO to avoid parsing failures.

      • SQL_TYPE: Parses using the sqlType array in the Canal JSON data. If the data contains a sqlType field and you need a more precise type mapping, we recommend setting canal-json.infer-schema.strategy to SQL_TYPE.

      • MYSQL_TYPE: Parses using the mysqlType array in the Canal JSON data.

      For sqlType mapping rules, see Canal JSON schema parsing.

      Note
      • This configuration is supported in VVR 11.1 and later.

      • MYSQL_TYPE is supported in VVR 11.3 and later.

      canal-json.mysql.treat-mysql-timestamp-as-datetime-enabled

      No

      Boolean

      true

      Specifies whether to map the MySQL timestamp type to the CDC timestamp type:

      • true (default): The MySQL timestamp type is mapped to the CDC timestamp type.

      • false: The MySQL timestamp type is mapped to the CDC timestamp_ltz type.

      canal-json.mysql.treat-tinyint1-as-boolean.enabled

      No

      Boolean

      true

      When parsing with MYSQL_TYPE, specifies whether to map the MySQL tinyint(1) type to the CDC boolean type:

      • true (default): The MySQL tinyint(1) type is mapped to the CDC boolean type.

      • false: The MySQL tinyint(1) type is mapped to the CDC tinyint(1) type.

      This configuration takes effect only when canal-json.infer-schema.strategy is set to MYSQL_TYPE.

    • Source table JSON format

      Parameter

      Required

      Data type

      Default value

      Description

      json.timestamp-format.standard

      No

      String

      SQL

      Specifies the input and output timestamp format. Valid values:

      • SQL: Parses input timestamps in the yyyy-MM-dd HH:mm:ss.s{precision} format, such as 2020-12-30 12:13:14.123.

      • ISO-8601: Parses input timestamps in the yyyy-MM-ddTHH:mm:ss.s{precision} format, such as 2020-12-30T12:13:14.123.

      json.ignore-parse-errors

      No

      Boolean

      false

      Valid values:

      • true: Skips the current row when a parsing error occurs.

      • false (default): Reports an error, and the job fails to start.

      json.infer-schema.primitive-as-string

      No

      Boolean

      false

      Specifies whether to parse all types as String when parsing the table schema.

      Valid values:

      • true: Parses all primitive data types as String.

      • false (default): Parses according to the basic rules.

      json.infer-schema.flatten-nested-columns.enable

      No

      Boolean

      false

      When parsing JSON data, specifies whether to recursively expand nested columns in the JSON. Valid values:

      • true: Recursively expands.

      • false (default): Treats nested columns as String.

      json.decode.parser-table-id.fields

      No

      String

      None

      When parsing JSON data, specifies the JSON fields whose values are used to generate the tableId. Use commas (,) to connect multiple fields. For example, if the JSON data is {"col0":"a", "col1":"b", "col2":"c"}, the results are as follows:

      Configuration → tableId

      • col0 → a

      • col0,col1 → a.b

      • col0,col1,col2 → a.b.c

      json.infer-schema.fixed-types

      No

      String

      None

      When parsing JSON data, specifies the concrete types for certain fields. Use a comma (,) to connect multiple fields. For example: id BIGINT, name VARCHAR(10) specifies that the `id` field in the JSON data has the type BIGINT, and the `name` field has the type VARCHAR(10).

      Note

      This configuration is supported in VVR 11.5 and later.

  • Sink table

    Parameter

    Description

    Required

    Data type

    Default value

    Remarks

    type

    Specifies the target type.

    Yes

    String

    None

    The value must be kafka.

    name

    Specifies the name of the target.

    No

    String

    None

    None

    topic

    Specifies the name of the Kafka topic.

    No

    String

    None

    If this parameter is specified, all data is written to this topic.

    Note

    If this parameter is not specified, each data record is written to a topic whose name is the TableID string. The TableID is generated by concatenating the database name and table name with a period (.). For example, databaseName.tableName.

    partition.strategy

    Specifies the strategy for writing data to Kafka partitions.

    No

    String

    all-to-zero

    Valid values:

    • all-to-zero (default): Writes all data to partition 0.

    • hash-by-key: Writes data to partitions based on the hash value of the primary key. This ensures that data with the same primary key is written to the same partition and remains in order.

    sink.tableId-to-topic.mapping

    Specifies the mapping from upstream table names to downstream Kafka topic names.

    No

    String

    None

    Separate mappings with a semicolon (;). Within each mapping, separate the upstream table name and the downstream Kafka topic name with a colon (:). You can use regular expressions for table names. To map multiple tables to the same topic, separate the table names with a comma (,). For example: mydb.mytable1:topic1;mydb.mytable2:topic2.

    Note

    You can use this parameter to change the mapped topic while preserving the original table name information.

    • Sink table Debezium JSON format

      Parameter

      Required

      Data type

      Default value

      Description

      debezium-json.include-schema.enabled

      No

      Boolean

      false

      Specifies whether to include schema information in the Debezium JSON data.

Usage examples

  • Use Kafka as a data ingestion source:

    source:
      type: kafka
      name: Kafka source
      properties.bootstrap.servers: ${kafka.bootstraps.server}
      topic: ${kafka.topic}
      value.format: ${value.format}
      scan.startup.mode: ${scan.startup.mode}
     
    sink:
      type: hologres
      name: Hologres sink
      endpoint: <yourEndpoint>
      dbname: <yourDbname>
      username: ${secret_values.ak_id}
      password: ${secret_values.ak_secret}
      sink.type-normalize-strategy: BROADEN
  • Use Kafka as a data ingestion target:

    source:
      type: mysql
      name: MySQL Source
      hostname: ${secret_values.mysql.hostname}
      port: ${mysql.port}
      username: ${secret_values.mysql.username}
      password: ${secret_values.mysql.password}
      tables: ${mysql.source.table}
      server-id: 8601-8604
    
    sink:
      type: kafka
      name: Kafka Sink
      properties.bootstrap.servers: ${kafka.bootstraps.server}
    
    route:
      - source-table: ${mysql.source.table}
        sink-table: ${kafka.topic}

    Here, the route module is used to set the topic name for writing the source table to Kafka.
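
  • Configure Kafka sink options such as the target topic and the partition strategy. The following is an illustrative sketch based on the sink parameters described above; the topic name is a placeholder, and debezium-json.include-schema.enabled is included only to show where a sink-side format option is placed:

    source:
      type: mysql
      name: MySQL Source
      hostname: ${secret_values.mysql.hostname}
      port: ${mysql.port}
      username: ${secret_values.mysql.username}
      password: ${secret_values.mysql.password}
      tables: ${mysql.source.table}
      server-id: 8601-8604

    sink:
      type: kafka
      name: Kafka Sink
      properties.bootstrap.servers: ${kafka.bootstraps.server}
      # Write all data to one topic (placeholder name); without this, each table
      # is written to a topic named databaseName.tableName.
      topic: <yourTopic>
      # Distribute rows by primary-key hash so changes to the same key stay in order
      # within one partition.
      partition.strategy: hash-by-key
      # Embed the schema in each Debezium JSON message (sink-side format option).
      debezium-json.include-schema.enabled: true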

Note

ApsaraMQ for Kafka does not enable automatic topic creation by default. For more information, see Issues related to automatic topic creation. When you write to ApsaraMQ for Kafka, you must create the corresponding topic in advance. For more information, see Step 3: Create resources.

Table schema parsing and change synchronization strategy

  • Partition message pre-consumption and table schema initialization

    The Kafka connector maintains the schema of all currently known tables. Before reading Kafka data, the Kafka connector attempts to pre-consume up to scan.max.pre.fetch.records messages in each partition. It parses the schema of each data record and merges these schemas to initialize the table schema information. Then, before consuming the data, it generates corresponding table creation events based on the initialized schema.

    Note

    For the Debezium JSON and Canal JSON formats, the table information is carried in each message, so the pre-consumed scan.max.pre.fetch.records messages may contain data from several tables, and the number of pre-consumed records per table cannot be guaranteed. Pre-consumption and table schema initialization are performed only once, before the messages of each partition are consumed and processed. If data for a new table appears later, the schema parsed from the first record of that table is used as its initial schema; pre-consumption is not repeated for that table.

    Important

    Only VVR 8.0.11 and later support distributing data from a single table across multiple partitions. For this scenario, you must set the configuration item debezium-json.distributed-tables or canal-json.distributed-tables to true.

  • Table information

    • For Canal JSON and Debezium JSON formats, the table information, including the database and table name, is parsed from the specific message.

    • For JSON format, the table information includes only the table name, which is the name of the topic where the data is located.

  • Primary key information

    • For Canal JSON format, the primary key of the table is defined based on the pkNames field in the JSON.

    • For Debezium JSON and JSON formats, the JSON does not contain primary key information. You can manually add a primary key to the table using a transform rule:

      transform:
        - source-table: \.*.\.*
          projection: \*
          primary-keys: key1, key2
  • Schema parsing and schema evolution

    After the table schema is initialized, if schema.inference.strategy is set to static, the Kafka connector parses the value of each message based on the initial table schema and does not generate schema evolution events. If schema.inference.strategy is set to continuous, the Kafka connector parses the value of each Kafka message, extracts the physical columns of the message, and compares them with the currently maintained schema. If the parsed schema is inconsistent with the current schema, it attempts to merge the schemas and generates a table schema evolution event. The merging rules are as follows:

    • If the parsed physical columns contain fields that are not in the current schema, these fields are added to the schema, and an event to add nullable columns is generated.

    • If the parsed physical columns do not contain fields that are already in the current schema, the fields are retained, and their data is filled with NULL. No event to delete columns is generated.

    • If there are columns with the same name, they are handled as follows:

      • If the types are the same but the precision is different, the type with the higher precision is used, and a column type change event is generated.

      • If the types are different, the smallest parent node in the tree structure shown in the following figure is identified and used as the type for the column with the same name. A column type change event is also generated.

        (Figure: type hierarchy tree used to determine the common parent type when merging column types)

  • The currently supported schema evolution strategies are as follows:

    • Add column: Adds the corresponding column to the end of the current schema and synchronizes the data of the new column. The new column is set to be nullable.

    • Delete column: Does not generate a delete column event. Instead, the data of that column is automatically filled with NULL values later.

    • Rename column: Is treated as adding and deleting a column. The renamed column is added to the end of the current schema, and the data of the column before renaming is filled with NULL values.

    • Column type change:

      • For downstream systems that support column type changes, the data ingestion job can change the type of ordinary columns, for example from INT to BIGINT. Which changes are allowed depends on the column type change rules of the downstream sink; different sink tables support different rules, so refer to the documentation of the specific sink table.

      • For downstream systems that do not support column type changes, such as Hologres, you can use broad type mapping: when the job starts, a table with broader types is created in the downstream system. When a column type change occurs, the job determines whether the downstream sink can accept the change, which provides tolerant support for column type changes.

  • Currently unsupported schema changes:

    • Changes to constraints such as primary keys or indexes.

    • Changing from NOT NULL to NULLABLE.

  • Canal JSON schema parsing

    Canal JSON data may contain an optional sqlType field, which records the precise type information of the data columns. To obtain a more accurate schema, set canal-json.infer-schema.strategy to SQL_TYPE so that the types in sqlType are used. The type mapping is as follows; a configuration sketch appears after the mapping list.

    JDBC type (type code) → CDC type:

    • BIT (-7), BOOLEAN (16) → BOOLEAN

    • TINYINT (-6) → TINYINT

    • SMALLINT (5) → SMALLINT

    • INTEGER (4) → INT

    • BIGINT (-5) → BIGINT

    • DECIMAL (3), NUMERIC (2) → DECIMAL(38,18)

    • REAL (7), FLOAT (6) → FLOAT

    • DOUBLE (8) → DOUBLE

    • BINARY (-2), VARBINARY (-3), LONGVARBINARY (-4), BLOB (2004) → BYTES

    • DATE (91) → DATE

    • TIME (92) → TIME

    • TIMESTAMP (93) → TIMESTAMP

    • CHAR (1), VARCHAR (12), LONGVARCHAR (-1), and other JDBC types → STRING
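
  • Configuration sketch

    The following source sketch combines several options from this section for a Canal JSON topic. It is illustrative only: the broker and topic values are placeholders, canal-json.distributed-tables requires VVR 8.0.11 or later, and canal-json.infer-schema.strategy set to SQL_TYPE requires VVR 11.1 or later.

    source:
      type: kafka
      name: Kafka source
      properties.bootstrap.servers: ${kafka.bootstraps.server}
      topic: ${kafka.topic}
      value.format: canal-json
      # Pre-consume up to 200 messages per partition (default 50) when initializing the table schema.
      scan.max.pre.fetch.records: 200
      # Required when a single table's changelog is spread across multiple partitions.
      canal-json.distributed-tables: true
      # Use the sqlType array for a more precise type mapping (see the list above).
      canal-json.infer-schema.strategy: SQL_TYPE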

Table name and topic mapping strategy

When you use Kafka as the target for a data ingestion job, you must configure the table name and topic mapping strategy carefully. This is because the Kafka message format (debezium-json or canal-json) also contains table name information, and downstream systems often use the table name in the data as the actual table name, not the topic name.

Suppose you need to synchronize two tables, mydb.mytable1 and mydb.mytable2, from MySQL. The possible configuration strategies are as follows:

1. Do not configure any mapping strategy

Without any mapping strategy, each table is written to a topic named "database_name.table_name". Therefore, data from mydb.mytable1 is written to a topic named mydb.mytable1, and data from mydb.mytable2 is written to a topic named mydb.mytable2. The following code provides a configuration example:

source:
  type: mysql
  name: MySQL Source
  hostname: ${secret_values.mysql.hostname}
  port: ${mysql.port}
  username: ${secret_values.mysql.username}
  password: ${secret_values.mysql.password}
  tables: mydb.mytable1,mydb.mytable2
  server-id: 8601-8604

sink:
  type: kafka
  name: Kafka Sink
  properties.bootstrap.servers: ${kafka.bootstraps.server}

2. Configure route rules for mapping (not recommended)

In many scenarios, users do not want the topic to be in the "database_name.table_name" format. They want to write data to a specified topic, so they configure route rules for mapping. The following code provides a configuration example:

source:
  type: mysql
  name: MySQL Source
  hostname: ${secret_values.mysql.hostname}
  port: ${mysql.port}
  username: ${secret_values.mysql.username}
  password: ${secret_values.mysql.password}
  tables: mydb.mytable1,mydb.mytable2
  server-id: 8601-8604

sink:
  type: kafka
  name: Kafka Sink
  properties.bootstrap.servers: ${kafka.bootstraps.server}

route:
  - source-table: mydb.mytable1,mydb.mytable2
    sink-table: mytable1

In this case, all data from mydb.mytable1 and mydb.mytable2 is written to the mytable1 topic.

However, when you use a route rule to change the topic name, it also changes the table name information in the Kafka message (debezium-json or canal-json format). In this case, all table names in the Kafka messages become mytable1, which may cause unexpected behavior when other systems consume Kafka messages from this topic.

3. Configure the sink.tableId-to-topic.mapping parameter for mapping (recommended)

To configure the mapping rule between table names and topics while you retain the source table name information, use the sink.tableId-to-topic.mapping parameter. The following code provides a configuration example:

source:
  type: mysql
  name: MySQL Source
  hostname: ${secret_values.mysql.hostname}
  port: ${mysql.port}
  username: ${secret_values.mysql.username}
  password: ${secret_values.mysql.password}
  tables: mydb.mytable1,mydb.mytable2
  server-id: 8601-8604

sink:
  type: kafka
  name: Kafka Sink
  properties.bootstrap.servers: ${kafka.bootstraps.server}
  sink.tableId-to-topic.mapping: mydb.mytable1,mydb.mytable2:mytable

or

source:
  type: mysql
  name: MySQL Source
  hostname: ${secret_values.mysql.hostname}
  port: ${mysql.port}
  username: ${secret_values.mysql.username}
  password: ${secret_values.mysql.password}
  tables: mydb.mytable1,mydb.mytable2
  server-id: 8601-8604

sink:
  type: kafka
  name: Kafka Sink
  properties.bootstrap.servers: ${kafka.bootstraps.server}
  sink.tableId-to-topic.mapping: mydb.mytable1:mytable;mydb.mytable2:mytable

In this case, all data from mydb.mytable1 and mydb.mytable2 is written to the mytable topic, while the table name information in the Kafka messages (debezium-json or canal-json format) remains mydb.mytable1 or mydb.mytable2. When other systems consume messages from this topic, they can still retrieve the source table name correctly.

Precautions for EXACTLY_ONCE semantics

  • Configure consumer isolation level

    All applications that consume Kafka data must set isolation.level:

    • read_committed: Reads only committed data.

    • read_uncommitted (default): Can read uncommitted data.

    End-to-end EXACTLY_ONCE semantics depend on read_committed. Otherwise, consumers may read uncommitted data, which breaks consistency. See the configuration sketch at the end of this section.

  • Transaction timeout and data loss

    When Flink recovers from a checkpoint, it relies only on transactions that were committed before that checkpoint began. If the time from job crash to restart exceeds the Kafka transaction timeout, Kafka automatically aborts the transaction, which causes data loss.

    • The default transaction.max.timeout.ms on the Kafka broker is 15 minutes.

    • The default transaction.timeout.ms for the Flink Kafka sink is 1 hour.

    • You must increase transaction.max.timeout.ms on the broker side to be greater than or equal to the Flink setting.

  • Producer pool and checkpoint concurrency

    EXACTLY_ONCE mode uses a fixed-size pool of Kafka producers. Each checkpoint occupies one producer from the pool. If the number of concurrent checkpoints exceeds the pool size, the job will fail.

    Adjust the producer pool size based on the maximum number of concurrent checkpoints.

  • Parallelism scale-in limitation

    If the job fails before the first checkpoint, no information about the previous producer pool is available when it restarts. Therefore, do not reduce the job's parallelism before the first checkpoint completes. If you must scale in, do not reduce the parallelism by a factor larger than FlinkKafkaProducer.SAFE_SCALE_DOWN_FACTOR.

  • Transaction blocking reads

    In read_committed mode, any unfinished transaction (neither committed nor aborted) blocks reading from the entire topic.

    For example:

    • Transaction 1 writes data.

    • Transaction 2 writes and commits data.

    • As long as transaction 1 is not finished, the data from transaction 2 is not visible to consumers.

    Therefore:

    • During normal operation, the data visibility latency is approximately equal to the checkpoint interval.

    • When a job fails, the topic being written to blocks consumers until the job restarts or the transaction times out. In extreme cases, reads can therefore stall for as long as the transaction timeout.
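
  • Consumer configuration sketch

    A minimal sketch of the consumer-side setting described above, assuming that Kafka consumer properties can be passed through with the properties. prefix, as is done for properties.bootstrap.servers elsewhere in this topic:

    source:
      type: kafka
      name: Kafka source
      properties.bootstrap.servers: ${kafka.bootstraps.server}
      topic: ${kafka.topic}
      value.format: ${value.format}
      # Assumes consumer options are passed through via the properties. prefix.
      # Read only committed data so that transactional (EXACTLY_ONCE) output becomes
      # visible only after the producing job's checkpoint-bound transaction commits.
      properties.isolation.level: read_committed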

FAQ