
Realtime Compute for Apache Flink:Kafka connector

Last Updated:Nov 22, 2024

This topic describes how to use the Kafka connector.

Background

Apache Kafka is an open source distributed message queue service. This service is widely used in big data fields, such as high-performance data processing, streaming analytics, and data integration. The Kafka connector supports high-performance data throughput, read and write operations on data in various formats, and exactly-once semantics for Realtime Compute for Apache Flink based on the Apache Kafka client.

Item

Description

Table type

Source table, sink table, data ingestion sink

Running mode

Streaming mode

Data format

  • CSV

  • JSON

  • Apache Avro

  • Confluent Avro

  • Debezium JSON

  • Canal JSON

  • Maxwell JSON

  • Raw

  • Protobuf

Note
  • Protobuf is supported only for Realtime Compute for Apache Flink that uses Ververica Runtime (VVR) 8.0.9 or later.

  • You can configure the parameters related to each data format in the WITH clause. For more information, see Formats.

Metric

  • Metrics for source tables

    • numRecordsIn

    • numRecordsInPerSecond

    • numBytesIn

    • numBytesInPerSecond

    • currentEmitEventTimeLag

    • currentFetchEventTimeLag

    • sourceIdleTime

    • pendingRecords

  • Metrics for sink tables:

    • numRecordsOut

    • numRecordsOutPerSecond

    • numBytesOut

    • numBytesOutPerSecond

    • currentSendTime

Note

For more information about the metrics, see Metrics.

API type

SQL, DataStream API, and data ingestion YAML API

Data update or deletion in a sink table

Data in a sink table cannot be updated or deleted. Data can only be inserted into a sink table.

Note

If you want to update or delete data in a sink table, see Upsert Kafka connector.

Prerequisites

The prerequisites vary based on the type of the Kafka cluster to which Realtime Compute for Apache Flink needs to connect.

  • ApsaraMQ for Kafka cluster

    • An ApsaraMQ for Kafka cluster is created. For more information, see Step 3: Create resources.

    • The ApsaraMQ for Kafka cluster resides in the same VPC as your Realtime Compute for Apache Flink workspace and includes the workspace's CIDR block in its whitelist. For more information, see Configure whitelists.

    Important

    Take note of the following items for writing data into ApsaraMQ for Kafka:

    • ApsaraMQ for Kafka does not support the data compression algorithm Zstandard for data writing.

    • ApsaraMQ for Kafka does not support idempotent or transactional write operations. Therefore, you cannot use the exactly-once semantics supported by Kafka sink tables when you write data to ApsaraMQ for Kafka. If your Realtime Compute for Apache Flink uses VVR 8.0.0 or later, you must disable the idempotent write feature by configuring properties.enable.idempotence=false for the sink table (see the sketch at the end of this section). For more information about the comparison between the storage engines of ApsaraMQ for Kafka and the limits on the storage engines, see Comparison between storage engines.

  • Self-managed Apache Kafka cluster

    • The version of the self-managed Apache Kafka cluster is 0.11 or later.

    • A network connection is established between Realtime Compute for Apache Flink and the self-managed Apache Kafka cluster. For more information about how to connect Realtime Compute for Apache Flink to a self-managed Apache Kafka cluster over the Internet, see FAQ about network connectivity.

    • Only the client parameters of Apache Kafka 2.8 are supported. For more information about the configuration parameters of Kafka producers and consumers, see Consumer Configs and Producer Configs.
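The following is a minimal sketch of a sink table DDL that disables idempotent writes for an ApsaraMQ for Kafka cluster, as described above. The table name, field names, topic name, and broker addresses are placeholders; adjust them to your environment.

CREATE TEMPORARY TABLE apsara_kafka_sink (
  id INT,
  name STRING
) WITH (
  'connector' = 'kafka',
  'topic' = '<yourTopic>',
  'properties.bootstrap.servers' = '<yourKafkaBrokers>',
  'format' = 'json',
  -- Disable idempotent writes because ApsaraMQ for Kafka does not support them.
  -- Required for VVR 8.0.0 or later.
  'properties.enable.idempotence' = 'false'
);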

Limits

CREATE TABLE AS (CTAS)

  • Only Realtime Compute for Apache Flink whose compute engine is vvr-4.0.12-flink-1.13 or later allows you to use Kafka as a data source for the CTAS statement.

  • The CTAS statement can infer column data types only from a JSON source and synchronize schema changes of such a table.

  • The CTAS statement can infer the data types only for value fields from a Kafka source and synchronize schema changes to those fields. To synchronize data of key fields from a Kafka source, you must specify them in the DDL statement. For more information, see Example 3.

Troubleshoot network connectivity

If the error message Timed out waiting for a node assignment appears when you start a deployment, it indicates a network connectivity problem between your Realtime Compute for Apache Flink workspace and the Kafka cluster.

The process of establishing a connection between a Kafka client and a Kafka broker is as follows:

  1. The client initiates a connection to a broker by using the IP address and port number specified by the properties.bootstrap.servers parameter. The broker then returns the metadata of each broker in the cluster, including its endpoint, to the client based on the broker configuration.

  2. The client uses the acquired endpoint to connect to a broker to produce or consume data.

If the broker is misconfigured, the client receives an incorrect endpoint. In this case, even if the initial connection can be established by using the IP address and port number specified in the properties.bootstrap.servers parameter, data cannot be read from or written to Kafka. In most cases, this issue occurs if a forwarding mechanism, such as a proxy, port forwarding, or a leased line, is used to establish the connection between Realtime Compute for Apache Flink and Kafka.

You can perform the following steps to check whether the configuration of your Kafka cluster is valid:

  1. Use the ZooKeeper command line tool, zkCli.sh or zookeeper-shell.sh, to log on to the ZooKeeper service that is used by your Kafka cluster.

  2. Run a command based on the information about your Kafka cluster to obtain the metadata of your Kafka broker. In most cases, you can run the get /brokers/ids/0 command to obtain the metadata of a Kafka broker. The endpoint that the Kafka server returns for the broker is specified in the endpoints field of the command output.

  3. Run commands such as ping or telnet to test the connectivity to the returned endpoint. If the endpoint cannot be connected, contact the O&M engineers of Kafka to configure the listeners and advertised.listeners properties of the Kafka broker for Realtime Compute for Apache Flink.

Note

For more information about how the Kafka client is connected to a Kafka broker, see Troubleshoot Connectivity.

SQL

The Kafka connector can be used in SQL drafts to read data from and write data into Kafka topics.

Syntax

CREATE TABLE KafkaTable (
  `user_id` BIGINT,
  `item_id` BIGINT,
  `behavior` STRING,
  `ts` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp' VIRTUAL
) WITH (
  'connector' = 'kafka',
  'topic' = 'user_behavior',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'testGroup',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'csv'
)

Metadata columns

You can define metadata columns in a Kafka source table or a Kafka sink table to obtain the metadata of Kafka messages. For example, if multiple topics are defined in the WITH clause for a Kafka source table and a metadata column is defined in the source table, the topic from which Flink reads data is marked. The following sample code provides an example on how to use metadata columns:

CREATE TABLE kafka_source (
  -- Read the topic to which the message belongs as the value of the record_topic field.
  `record_topic` STRING NOT NULL METADATA FROM 'topic' VIRTUAL,
  -- Read the timestamp in ConsumerRecord as the value of the ts field.
  `ts` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp' VIRTUAL,
  -- Read the offset of the message as the value of the record_offset field.
  `record_offset` BIGINT NOT NULL METADATA FROM 'offset' VIRTUAL,
  ...
) WITH (
  'connector' = 'kafka',
  ...
);

CREATE TABLE kafka_sink (
  -- Write the timestamp in the ts field as the timestamp of ProducerRecord to Kafka.
  `ts` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp' VIRTUAL,
  ...
) WITH (
  'connector' = 'kafka',
  ...
);

The following table describes the metadata columns supported by Kafka source tables and sink tables.

Key

Data type

Description

Source table or sink table

topic

STRING NOT NULL METADATA VIRTUAL

The name of the topic to which the Kafka message belongs.

Source table

partition

INT NOT NULL METADATA VIRTUAL

The ID of the partition to which the Kafka message belongs.

Source table

headers

MAP<STRING, BYTES> NOT NULL METADATA VIRTUAL

Headers of the Kafka message.

Source table and sink table

leader-epoch

INT NOT NULL METADATA VIRTUAL

The leader epoch of the Kafka message.

Source table

offset

BIGINT NOT NULL METADATA VIRTUAL

The offset of the Kafka message.

Source table

timestamp

TIMESTAMP(3) WITH LOCAL TIME ZONE NOT NULL METADATA VIRTUAL

The timestamp of the Kafka message.

Source table and sink table

timestamp-type

STRING NOT NULL METADATA VIRTUAL

The timestamp type of the Kafka message. Valid values:

  • NoTimestampType: indicates that no timestamp is defined in the message.

  • CreateTime: indicates the time when the message was generated.

  • LogAppendTime: indicates the time when the message was added to Kafka brokers.

Source table
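The following sketch reads several of the metadata columns listed above in a source table. The table name, topic name, and broker addresses are placeholders.

CREATE TEMPORARY TABLE kafka_metadata_source (
  -- Read the ID of the partition to which the message belongs.
  `record_partition` INT NOT NULL METADATA FROM 'partition' VIRTUAL,
  -- Read the headers of the message.
  `record_headers` MAP<STRING, BYTES> NOT NULL METADATA FROM 'headers' VIRTUAL,
  -- Read the timestamp type of the message.
  `record_timestamp_type` STRING NOT NULL METADATA FROM 'timestamp-type' VIRTUAL,
  `content` STRING
) WITH (
  'connector' = 'kafka',
  'topic' = '<yourTopic>',
  'properties.bootstrap.servers' = '<yourKafkaBrokers>',
  -- The raw format maps the message value to the single content column.
  'format' = 'raw',
  'scan.startup.mode' = 'earliest-offset'
);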

Parameters in WITH clause

  • Common parameters

    Parameter

    Description

    Data type

    Required

    Default value

    Remarks

    connector

    The table type.

    String

    Yes

    No default value

    Set the value to kafka.

    properties.bootstrap.servers

    The IP address and port number of a Kafka broker.

    String

    Yes

    No default value

    Format: host:port,host:port,host:port. Separate multiple host:port pairs with commas (,).

    properties.*

    The parameters that are configured for the Kafka client.

    String

    No

    No default value

    The suffix of this parameter must comply with the rules that are defined in Producer Configs and Consumer Configs.

    Realtime Compute for Apache Flink removes the "properties." prefix and passes the transformed key and values to the Kafka client. For example, you can configure properties.allow.auto.create.topics=false to disable automatic topic creation.

    You cannot add the "properties." prefix to modify the configurations of the following parameters because the values of the parameters are overwritten after you use the Kafka connector:

    • key.deserializer

    • value.deserializer

    format

    The format of the Kafka message value.

    String

    No

    No default value

    Valid values:

    • csv

    • json

    • avro

    • debezium-json

    • canal-json

    • maxwell-json

    • avro-confluent

    • raw

    Note

    For more information about the format parameter, see Format Options.

    key.format

    The format of the Kafka message key.

    String

    No

    No default value

    Valid values:

    • csv

    • json

    • avro

    • debezium-json

    • canal-json

    • maxwell-json

    • avro-confluent

    • raw

    Note

    If you configure this parameter, you must also configure the key.fields parameter.

    key.fields

    The key fields in the source table or sink table that correspond to the key fields of Kafka messages.

    String

    No

    No default value

    Separate multiple field names with semicolons (;), such as field1;field2.

    key.fields-prefix

    The custom prefix for all key fields in Kafka messages. You can configure this parameter to prevent name conflicts with the value fields.

    String

    No

    No default value

    This parameter is used only to distinguish the column names of source tables and sink tables. The prefix is removed from the column names when the key fields of Kafka messages are parsed or generated.

    Note

    If you configure this parameter, you must set the value.fields-include parameter to EXCEPT_KEY.

    value.format

    The format of the Kafka message value.

    String

    No

    No default value

    The configuration of this parameter is equivalent to the configuration of the format parameter. The format parameter cannot be used together with the value.format parameter. If you configure both parameters, a conflict occurs.

    value.fields-include

    Specifies whether to include the message key fields when Kafka message values are parsed or generated.

    String

    No

    ALL

    Valid values:

    • ALL: All fields are processed as value fields of Kafka messages.

    • EXCEPT_KEY: All fields except for the fields specified by the key.fields parameter are processed as the Kafka message value.

  • Source table parameters

    Parameter

    Description

    Data type

    Required

    Default value

    Remarks

    topic

    The name of the topic from which you want to read data.

    String

    No

    No default value

    Separate multiple topic names with semicolons (;), such as topic-1;topic-2.

    Note

    The topic parameter cannot be used together with the topic-pattern parameter.

    topic-pattern

    The regular expression used to match topics. Data of all topics whose names match the specified regular expression is read when a deployment is running.

    String

    No

    No default value

    Note
    • Only Realtime Compute for Apache Flink that uses VVR 3.0.0 or later supports this parameter.

    • The topic parameter cannot be used together with the topic-pattern parameter.

    properties.group.id

    The consumer group ID.

    String

    No

    KafkaSource-{Name of the source table}

    If the specified group ID is used for the first time, you must set the properties.auto.offset.reset parameter to "earliest" or "latest" to specify the initial start offset.

    scan.startup.mode

    The start offset from which data is read from Kafka.

    String

    No

    group-offsets

    Valid values:

    • earliest-offset: reads data from the earliest offset of each Kafka partition.

    • latest-offset: reads data from the latest offset of each Kafka partition.

    • group-offsets: reads data from the offset that is committed by the consumer group with the ID that is specified by the properties.group.id parameter. This is the default value.

    • timestamp: reads data from the timestamp that is specified by the scan.startup.timestamp-millis parameter.

    • specific-offsets: reads data from the offset that is specified by the scan.startup.specific-offsets parameter.

    Note

    This parameter takes effect when the deployment is started without states. When the deployment is restarted from a checkpoint or resumes from the specified state, the deployment preferentially starts to read data at the progress that is saved in the state data.

    scan.startup.specific-offsets

    The start offset of each partition when the scan.startup.mode parameter is set to specific-offsets.

    String

    No

    No default value

    Example: partition:0,offset:42;partition:1,offset:300.

    scan.startup.timestamp-millis

    The timestamp of the start offset when the scan.startup.mode parameter is set to timestamp.

    Long

    No

    No default value

    Unit: milliseconds.

    scan.topic-partition-discovery.interval

    The time interval for dynamically detecting Kafka topics and partitions.

    Duration

    No

    5 minutes

    The default partition discovery interval is 5 minutes. To disable the dynamic partition discovery feature, you must explicitly set this parameter to a non-positive value. After the dynamic partition discovery feature is enabled, the Kafka source can automatically discover new partitions and read data from the partitions. In topic-pattern mode, the Kafka source reads data from new partitions of existing topics and data from all partitions of new topics that match the regular expression.

    Note

    For Realtime Compute for Apache Flink that uses VVR 6.0.X, the dynamic partition discovery feature is disabled by default. For Realtime Compute for Apache Flink that uses VVR 8.0 or later, this feature is enabled by default. The default partition discovery interval is 5 minutes.

    scan.header-filter

    Filters Kafka data based on whether a message contains the specified header (see the sketch after this parameter list).

    String

    No

    No default value

    Separate a header key and the value with a colon (:). Separate multiple headers with logical operators such as AND (&) or OR (|). The logical operator NOT (!) is supported. For example, depart:toy|depart:book&!env:test indicates that the Kafka data whose header contains depart=toy or depart=book and does not contain env=test is retained.

    Note
    • Only Realtime Compute for Apache Flink that uses VVR 8.0.6 or later supports this parameter.

    • Parenthesis operations are not supported.

    • Logical operations are performed from left to right in sequence.

    • The header value in the UTF-8 format is converted into a string and compared with the header value specified by the scan.header-filter parameter.

    scan.check.duplicated.group.id

    Specifies whether to check duplicate consumer groups specified by the properties.group.id parameter.

    Boolean

    No

    false

    Valid values:

    • true: Duplicate consumer groups are checked before a deployment starts. If duplicate consumer groups exist, an error is reported to prevent conflicts with existing consumer groups.

    • false: Duplicate consumer groups are not checked before a deployment starts.

    Note

    Only Realtime Compute for Apache Flink that uses VVR 6.0.4 or later supports this parameter.

  • Sink table parameters

    Parameter

    Description

    Data type

    Required

    Default value

    Remarks

    topic

    The name of the topic to which data is written.

    String

    Yes

    No default value

    N/A

    sink.partitioner

    The mapping pattern between Realtime Compute for Apache Flink partitions and Kafka partitions.

    String

    No

    default

    Valid values:

    • default: The default Kafka partitioner is used to partition data.

    • fixed: Each Flink partition corresponds to a fixed Kafka partition.

    • round-robin: Data in a Flink partition is distributed to the Kafka partitions in round-robin sequence.

    • Custom partition mapping pattern: You can create a subclass of FlinkKafkaPartitioner to configure a custom partition mapping pattern, such as org.mycompany.MyPartitioner.

    sink.delivery-guarantee

    The delivery semantics for the Kafka sink table.

    String

    No

    at-least-once

    Valid values:

    • none: The delivery semantics is not ensured. Data may be lost or duplicated.

    • at-least-once: It ensures that no data is lost. However, duplicate data may exist.

    • exactly-once: Kafka transactions are used to ensure the exactly-once semantics. This ensures that data is not lost or duplicated.

    Note

    You must configure the sink.transactional-id-prefix parameter if you set this parameter to exactly-once.

    sink.transactional-id-prefix

    The prefix of the Kafka transaction ID that is used in the exactly-once semantics.

    String

    No

    No default value

    This parameter takes effect only when the sink.delivery-guarantee parameter is set to exactly-once.

    sink.parallelism

    The parallelism of operators for the Kafka sink table.

    Integer

    No

    No default value

    By default, the parallelism of the upstream operators is used, as determined by the framework.

  • CTAS parameters

    Parameter

    Description

    Data type

    Required

    Default value

    Remarks

    json.infer-schema.flatten-nested-columns.enable

    Specifies whether to recursively expand nested columns in a JSON text.

    Boolean

    No

    false

    Valid values:

    • true: Nested columns are recursively expanded. Flink uses the path-based approach to name expanded columns. For example, in the JSON {"nested": {"col": true}}, the column col becomes nested.col after expansion.

    • false: Nested types are parsed as the STRING type.

    json.infer-schema.primitive-as-string

    Specifies whether to infer all basic types as String.

    Boolean

    No

    false

    Valid values:

    • true: All basic types are inferred as the String type.

    • false: Data types are inferred based on basic rules.

    You can add the properties. prefix to the names of all parameters that are supported by Kafka consumers and producers and configure the parameters in the WITH clause. For example, if you want to set the request.timeout.ms parameter of Kafka consumers or producers to 60000 (measured in milliseconds), you can configure 'properties.request.timeout.ms'='60000' in the WITH clause. For more information about the parameters supported by Kafka consumers and producers, see Apache Kafka official documentation.
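The following sketch combines several of the source table parameters described above, as referenced in the scan.header-filter remarks. The table name, topic name, broker addresses, consumer group ID, and header value are placeholders; the scan.header-filter parameter requires VVR 8.0.6 or later.

CREATE TEMPORARY TABLE kafka_filtered_source (
  id INT,
  name STRING
) WITH (
  'connector' = 'kafka',
  'topic' = '<yourTopic>',
  'properties.bootstrap.servers' = '<yourKafkaBrokers>',
  'properties.group.id' = '<yourConsumerGroupId>',
  'format' = 'json',
  -- Start from the committed group offset. If the group ID has no committed offset,
  -- fall back to the earliest offset.
  'scan.startup.mode' = 'group-offsets',
  'properties.auto.offset.reset' = 'earliest',
  -- Check for new topics and partitions every 5 minutes.
  'scan.topic-partition-discovery.interval' = '5 min',
  -- Keep only messages whose headers contain depart=toy.
  'scan.header-filter' = 'depart:toy'
);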

Security and authentication

If your Kafka cluster requires a secure connection or authentication, add the properties. prefix to the names of the parameters related to security and authentication and configure the parameters in the WITH clause. The following sample code provides an example on how to configure a Kafka table to use PLAIN as the Simple Authentication and Security Layer (SASL) mechanism and provide Java Authentication and Authorization Service (JAAS) configurations.

CREATE TABLE KafkaTable (
  `user_id` BIGINT,
  `item_id` BIGINT,
  `behavior` STRING,
  `ts` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp'
) WITH (
  'connector' = 'kafka',
  ...
  'properties.security.protocol' = 'SASL_PLAINTEXT',
  'properties.sasl.mechanism' = 'PLAIN',
  'properties.sasl.jaas.config' = 'org.apache.flink.kafka.shaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="username" password="password";'
)

The following sample code provides an example on how to configure a Kafka table to use SASL_SSL as the security protocol and SCRAM-SHA-256 as the SASL mechanism.

CREATE TABLE KafkaTable (
  `user_id` BIGINT,
  `item_id` BIGINT,
  `behavior` STRING,
  `ts` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp'
) WITH (
  'connector' = 'kafka',
  ...
  'properties.security.protocol' = 'SASL_SSL',
  /*Configure Secure Sockets Layer (SSL).*/
  /*Specify the path of the CA certificate truststore provided by the server.*/
  'properties.ssl.truststore.location' = '/flink/usrlib/kafka.client.truststore.jks',
  'properties.ssl.truststore.password' = 'test1234',
  /*Specify the path of the private key file keystore if client authentication is required.*/
  'properties.ssl.keystore.location' = '/flink/usrlib/kafka.client.keystore.jks',
  'properties.ssl.keystore.password' = 'test1234',
  /*The algorithm used by the client to verify the server address. A null value indicates that server address verification is disabled.*/
  'properties.ssl.endpoint.identification.algorithm' = '',
  /*Configure SASL.*/
  /*Configure SCRAM-SHA-256 as the SASL mechanism.*/
  'properties.sasl.mechanism' = 'SCRAM-SHA-256',
  /*Configure JAAS.*/
  'properties.sasl.jaas.config' = 'org.apache.flink.kafka.shaded.org.apache.kafka.common.security.scram.ScramLoginModule required username="username" password="password";'
)

You can click Upload Artifact on the Artifacts page in the development console of Realtime Compute for Apache Flink to upload the CA certificate file and private key file. The uploaded files are stored in the /flink/usrlib directory. For example, if the name of the CA certificate file that you want to use is my-truststore.jks, you can configure 'properties.ssl.truststore.location' = '/flink/usrlib/my-truststore.jks' in the WITH clause to use the certificate after you upload the certificate file.

Note
  • The preceding code snippets provide examples of the configurations that are used in most cases. Before you configure the Apache Kafka connector, contact Kafka server O&M personnel to obtain the correct security and authentication configuration information.

  • Unlike Apache Flink, the SQL editor of Realtime Compute for Apache Flink escapes double quotation marks (") by default. Therefore, you do not need to add backslashes (\) as escape characters to the double quotation marks (") that are used to enclose the username and password when you configure the properties.sasl.jaas.config parameter.

Start offset for a Kafka source table

Startup Mode

You can configure the scan.startup.mode parameter to specify the data reading startup mode for a Kafka source table.

  • earliest-offset: reads data from the earliest offset of the current partition.

  • latest-offset: reads data from the latest offset of the current partition.

  • group-offsets: reads data from the offset that is committed by the consumer group that has the ID specified by the properties.group.id parameter.

  • timestamp: reads data from the first message whose timestamp is greater than or equal to the timestamp that is specified by the scan.startup.timestamp-millis parameter.

  • specific-offsets: reads data from the partition offset that is specified by the scan.startup.specific-offsets parameter.

Note
  • If you do not specify the start offset, the Kafka source table starts to read data from the offset that is committed by the consumer group that has the specified ID.

  • The scan.startup.mode parameter takes effect only on the deployments that are started without states. If a deployment that uses the required states is started, data is read from the offset that is stored in a state backend.

Sample code:

CREATE TEMPORARY TABLE kafka_source (
  ...
) WITH (
  'connector' = 'kafka',
  ...
  -- Consume data from the earliest offset.
  'scan.startup.mode' = 'earliest-offset',
  -- Consume data from the latest offset.
  'scan.startup.mode' = 'latest-offset',
  -- Consume data from the offset that is committed by the consumer group my-group.
  'properties.group.id' = 'my-group',
  'scan.startup.mode' = 'group-offsets',
  'properties.auto.offset.reset' = 'earliest', -- If my-group is used for the first time, the consumption starts from the earliest offset.
  'properties.auto.offset.reset' = 'latest', -- If my-group is used for the first time, the consumption starts from the latest offset
  -- Consume data from the timestamp 1655395200000, in milliseconds.
  'scan.startup.mode' = 'timestamp',
  'scan.startup.timestamp-millis' = '1655395200000',
  -- Consume data from the specified offset.
  'scan.startup.mode' = 'specific-offsets',
  'scan.startup.specific-offsets' = 'partition:0,offset:42;partition:1,offset:300'
);

Priority of start offsets

The source table consumes data based on the following priorities of start offsets in descending order:

  1. The offset that is stored in checkpoints or savepoints.

  2. The start time that you specified in the Realtime Compute for Apache Flink console.

  3. The start offset that is specified by the scan.startup.mode parameter in the WITH clause.

  4. group-offsets is used if the scan.startup.mode parameter is not configured.

If one of the preceding offsets becomes invalid due to expiration or an issue in the Kafka cluster, the reset strategy that is specified by the properties.auto.offset.reset parameter is used to reset the offset. If you do not configure this parameter, an exception occurs.

In most cases, a Kafka source table reads data from a consumer group with a new group ID. When the source table queries the offset committed by this consumer group in the Kafka cluster, no valid offset is returned because the group ID is used for the first time. In this case, the reset strategy specified by the properties.auto.offset.reset parameter is used to initialize the offset. Therefore, you must configure the properties.auto.offset.reset parameter when the Kafka source table reads data by using a consumer group with a new group ID.

Kafka source offset committing

The Kafka source table commits a consumer offset to the Kafka cluster only after the checkpointing operation is successful. If the checkpoint interval that you specify is excessively large, the consumer offset is committed to the Kafka cluster with a delay. During the checkpointing operation, the Kafka source table stores the current data reading progress in the state backend. The offset that is committed to the Kafka cluster is not used for fault recovery. The committed offset is used only to monitor the data reading progress in Kafka. Data accuracy is not affected even if the offset fails to be committed.

Custom partitioner for a sink table

If the built-in Kafka producer cannot meet your business requirements, you can use a custom Kafka partitioner to write data to the related partitions. A custom partitioner needs to inherit FlinkKafkaPartitioner. After you configure a custom partitioner, compile the JAR package for the partitioner and click Upload Artifact on the Artifacts page in the development console of Realtime Compute for Apache Flink to upload the package. After the package is uploaded and referenced, you need to configure the sink.partitioner parameter in the WITH clause. The value of this parameter must be a complete classpath of the partitioner, such as org.mycompany.MyPartitioner.
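For example, after the JAR package that contains a custom partitioner is uploaded and referenced, the sink table DDL can reference the class as shown in the following sketch. The class name org.mycompany.MyPartitioner is the example classpath used above; the table name, topic name, and broker addresses are placeholders.

CREATE TEMPORARY TABLE kafka_custom_partition_sink (
  id INT,
  name STRING
) WITH (
  'connector' = 'kafka',
  'topic' = '<yourTopic>',
  'properties.bootstrap.servers' = '<yourKafkaBrokers>',
  'format' = 'json',
  -- The fully qualified class name of the custom FlinkKafkaPartitioner subclass.
  'sink.partitioner' = 'org.mycompany.MyPartitioner'
);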

Comparison among Kafka, Upsert Kafka, and a Kafka JSON catalog

Kafka is a message queue system that allows for only data addition and does not support data updates or deletions. Therefore, Kafka cannot process Change Data Capture (CDC) data of upstream systems and the retraction logic of operators such as aggregate and join during streaming SQL computing. If you want to write data that contains change data or retraction data to Kafka, use an Upsert Kafka sink table that can perform special processing on change data.

If you want to synchronize change data from one or more data tables in an upstream database to Kafka in batches, you can use a Kafka JSON catalog. If data that is stored in Kafka is in the JSON format, you can use a Kafka JSON catalog. This way, you do not need to configure the schema and parameters in the WITH clause. For more information, see Manage Kafka JSON catalogs.
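For comparison, the following is a minimal sketch of an Upsert Kafka sink table. A primary key is required so that change data and retraction data can be written as upserts; the table name, field names, topic name, and broker addresses are placeholders. For the complete parameter description, see Upsert Kafka connector.

CREATE TEMPORARY TABLE upsert_kafka_sink (
  user_id BIGINT,
  pv BIGINT,
  PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = '<yourTopic>',
  'properties.bootstrap.servers' = '<yourKafkaBrokers>',
  -- Both the key format and the value format are required for the Upsert Kafka connector.
  'key.format' = 'json',
  'value.format' = 'json'
);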

Data synchronization with CTAS

The CTAS statement can perform data synchronization from a Kafka source table in the JSON format. If specific fields do not exist in a predefined table schema during data synchronization, Flink attempts to automatically infer the data types of the columns. If the inferred data types do not meet your business requirements, you can perform auxiliary type inference to declare the data types of the columns.

Note

For more information about the JSON format, see JSON Format.

  • Type inference

    By default, Realtime Compute for Apache Flink parses only the first level of the JSON text during type inference. Realtime Compute for Apache Flink infers the SQL data types based on the JSON data types, values, and the data type mappings. The following table lists the mappings between JSON data types and Flink SQL data types.

    JSON data type

    Flink SQL data type

    BOOLEAN

    BOOLEAN

    STRING

    DATE, TIMESTAMP, TIMESTAMP_LTZ, TIME, or STRING

    INT or LONG

    BIGINT

    BIGINT

    DECIMAL or STRING

    Note

    The precision of the DECIMAL type in Realtime Compute for Apache Flink is limited. If an integer exceeds the maximum allowed digits for DECIMAL, Realtime Compute for Apache Flink casts it to STRING to avoid precision loss.

    FLOAT, DOUBLE, or BIG DECIMAL

    DOUBLE

    ARRAY

    STRING

    OBJECT

    STRING

    Example

    • JSON text

      {
        "id": 101,
        "name": "VVP",
        "properties": {
          "owner": "Alibaba Cloud",
          "engine": "Flink"
        },
        "type": ["Big data"]
      }
    • The following table is written into the downstream system:

      id

      name

      properties

      type

      101

      VVP

      {
           "owner": "Alibaba Cloud",
           "engine": "Flink"
      }

      ["Big data"]

  • Auxiliary type inference

    If type inference based on the preceding rules does not meet your needs, you can explicitly declare the data type of a specific column in the source table's DDL statement. Realtime Compute for Apache Flink prioritizes the declared type during parsing. In the following example, Realtime Compute for Apache Flink parses the price field as DECIMAL instead of converting it to DOUBLE based on the data type mappings.

    CREATE TABLE evolvingKafkaSource (
      price DECIMAL(18, 2)
    ) WITH (
      'connector' = 'kafka',
      'properties.bootstrap.servers' = 'localhost:9092',
      'topic' = 'evolving_kafka_demo',
      'scan.startup.mode' = 'earliest-offset',
      'format' = 'json'
    );

    However, if the data type that you specify in the DDL statement is different from the actual data type, you can use one of the following methods to handle the issue:

    • If a column's declared data type has a wider range than the actual data type, the field is parsed according to the declared data type. For example, if a column declared as the DOUBLE type is used to hold BIGINT data, the column's data type is parsed as DOUBLE.

    • If the actual data type has a wider range than or is incompatible with the declared data type, an error is returned because CTAS does not support type changes. To resolve this, you must restart your deployment and declare a correct data type.

      The data types form a hierarchy that determines their ranges and mutual compatibility.

      Note
      • In this hierarchy, data types closer to the root have wider ranges. Data types on different branches are incompatible with each other.

      • Auxiliary type inference is not supported for complex data types, such as ROW, ARRAY, MAP, and MULTISET.

      • By default, Realtime Compute for Apache Flink parses complex data types as STRING.

In most cases, the JSON text in Kafka topics has a nested structure. If you want to extract nested columns from JSON texts, you can use one of the following methods:

  • Declare 'json.infer-schema.flatten-nested-columns.enable'='true' in the source table's DDL statement. This way, all nested columns are expanded to the top level in sequence. To prevent column name conflicts, Realtime Compute for Apache Flink uses a path-based approach to name the expanded columns.

    Important

    Column name conflicts cannot be directly resolved. However, you can work around this by setting 'json.ignore-parse-errors' to 'true' in the source table's DDL statement to ignore conflicts.

  • Add `rowkey` AS JSON_VALUE(`properties`, '$.rowkey') to the CTAS syntax in the DDL statement to add a computed column and specify the column to expand. For more information, see Example 4: Synchronize the table schema and data and perform calculation.

Examples

Example 1: Read data from a Kafka topic and write the data into another Kafka topic

The following code sample reads data from the source Kafka topic and then writes the data into the sink Kafka topic. The data is in the CSV format.

CREATE TEMPORARY TABLE kafka_source (
  id INT,
  name STRING,
  age INT
) WITH (
  'connector' = 'kafka',
  'topic' = 'source',
  'properties.bootstrap.servers' = '<yourKafkaBrokers>',
  'properties.group.id' = '<yourKafkaConsumerGroupId>',
  'format' = 'csv'
);

CREATE TEMPORARY TABLE kafka_sink (
  id INT,
  name STRING,
  age INT
) WITH (
  'connector' = 'kafka',
  'topic' = 'sink',
  'properties.bootstrap.servers' = '<yourKafkaBrokers>',
  'properties.group.id' = '<yourKafkaConsumerGroupId>',
  'format' = 'csv'
);

INSERT INTO kafka_sink SELECT id, name, age FROM kafka_source;

Example 2: Synchronize table schema and data

Use the Kafka connector to synchronize messages from a Kafka topic to Hologres in real time. If you configure the offset and partition ID of Kafka messages as the primary key, duplicate messages are not written to Hologres if a failover occurs.

CREATE TEMPORARY TABLE kafkaTable (
  `offset` INT NOT NULL METADATA,
  `part` BIGINT NOT NULL METADATA FROM 'partition',
  PRIMARY KEY (`part`, `offset`) NOT ENFORCED
) WITH (
  'connector' = 'kafka',
  'properties.bootstrap.servers' = '<yourKafkaBrokers>',
  'topic' = 'kafka_evolution_demo',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json',
  'json.infer-schema.flatten-nested-columns.enable' = 'true'
    -- Optional. Expand all nested columns. 
);

CREATE TABLE IF NOT EXISTS hologres.kafka.`sync_kafka`
WITH (
  'connector' = 'hologres'
) AS TABLE vvp.`default`.kafkaTable;

Example 3: Synchronize the table schema and data in the key and value columns of Kafka messages

The key fields of Kafka messages store relevant information. You can synchronize data in the key and value columns of Kafka messages at the same time.

CREATE TEMPORARY TABLE kafkaTable (
  `key_id` INT NOT NULL,
  `val_name` VARCHAR(200)
) WITH (
  'connector' = 'kafka',
  'properties.bootstrap.servers' = '<yourKafkaBrokers>',
  'topic' = 'kafka_evolution_demo',
  'scan.startup.mode' = 'earliest-offset',
  'key.format' = 'json',
  'value.format' = 'json',
  'key.fields' = 'key_id',
  'key.fields-prefix' = 'key_',
  'value.fields-prefix' = 'val_',
  'value.fields-include' = 'EXCEPT_KEY'
);

CREATE TABLE IF NOT EXISTS hologres.kafka.`sync_kafka`
WITH (
  'connector' = 'hologres'
) AS TABLE vvp.`default`.kafkaTable;
Note

The key columns of Kafka messages do not support table schema changes and type inference. You must declare the key columns manually.

Example 4: Synchronize table schema and data and perform calculation

This example synchronizes data from Kafka to Hologres and performs lightweight calculation during synchronization.

CREATE TEMPORARY TABLE kafkaTable (
  `distinct_id` INT NOT NULL,
  `properties` STRING,
  `timestamp` TIMESTAMP_LTZ METADATA,
  `date` AS CAST(`timestamp` AS DATE)
) WITH (
  'connector' = 'kafka',
  'properties.bootstrap.servers' = '<yourKafkaBrokers>',
  'topic' = 'kafka_evolution_demo',
  'scan.startup.mode' = 'earliest-offset',
  'key.format' = 'json',
  'value.format' = 'json',
  'key.fields' = 'key_id',
  'key.fields-prefix' = 'key_'
);

CREATE TABLE IF NOT EXISTS hologres.kafka.`sync_kafka` WITH (
   'connector' = 'hologres'
) AS TABLE vvp.`default`.kafkaTable
ADD COLUMN
  `order_id` AS COALESCE(JSON_VALUE(`properties`, '$.order_id'), 'default');
-- Use COALESCE to handle null values.

DataStream API

Important

If you want to call a DataStream API to read or write data, you must use a DataStream connector of the related type to connect to Flink. For more information about how to configure a DataStream connector, see Usage of DataStream connectors.

  • Create a Kafka source

    The Kafka source provides a builder class for constructing the instance of KafkaSource. The following sample code shows how to create a Kafka source to consume messages from the earliest offset of the "input-topic" topic. The consumer group is my-group. The value of each message is deserialized as a string.

    Java

    KafkaSource<String> source = KafkaSource.<String>builder()
        .setBootstrapServers(brokers)
        .setTopics("input-topic")
        .setGroupId("my-group")
        .setStartingOffsets(OffsetsInitializer.earliest())
        .setValueOnlyDeserializer(new SimpleStringSchema())
        .build();
    
    env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");

    XML

    The Kafka DataStream connectors of different versions are stored in the Maven central repository.

    <dependency>
        <groupId>com.alibaba.ververica</groupId>
        <artifactId>ververica-connector-kafka</artifactId>
        <version>${vvr-version}</version>
    </dependency>

    The following table describes the parameters that you must configure when you create a Kafka source.

    Parameter

    Description

    BootstrapServers

    The addresses of Kafka brokers. You can call the setBootstrapServers(String) method to configure the addresses.

    GroupId

    The ID of the consumer group. You can call the setGroupId(String) method to configure the ID.

    Topics or Partition

    The topics or names of the partitions to which you subscribe. You can configure a Kafka source to subscribe to topics or partitions by using one of the following subscription patterns:

    • Topic list. After you configure a topic list, the Kafka source subscribes to all partitions of the specified topics.

      KafkaSource.builder().setTopics("topic-a","topic-b")
    • Topic pattern. After you specify a regular expression, the Kafka source subscribes to all partitions of the topics that match the specified regular expression.

      KafkaSource.builder().setTopicPattern("topic.*")
    • Partition list. After you configure a partition list, the Kafka source subscribes to the specified partitions.

      final HashSet<TopicPartition> partitionSet = new HashSet<>(Arrays.asList(
              new TopicPartition("topic-a", 0),    // Partition 0 of topic "topic-a"
              new TopicPartition("topic-b", 5)));  // Partition 5 of topic "topic-b"
      KafkaSource.builder().setPartitions(partitionSet)

    Deserializer

    A deserializer that deserializes Kafka messages.

    You can call the setDeserializer(KafkaRecordDeserializationSchema) method to specify a deserializer. The KafkaRecordDeserializationSchema interface defines how a ConsumerRecord object is deserialized. You can use one of the following methods to deserialize only the Value fields in the Kafka messages of the ConsumerRecord object:

    • A Kafka source provides the setValueOnlyDeserializer(DeserializationSchema) method. The DeserializationSchema class defines how a Kafka message that is stored as a binary value is deserialized.

    • Use the classes that implement the Deserializer interface of Kafka. For example, you can use the StringDeserializer class to deserialize a message into a string.

      import org.apache.kafka.common.serialization.StringDeserializer;
      
      KafkaSource.<String>builder()
              .setDeserializer(KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class));
    Note

    If you want to deserialize a ConsumerRecord object, you must create a class that implements the KafkaRecordDeserializationSchema interface.

    When you use a Kafka DataStream connector, you must configure the following Kafka properties:

    • Start offset

      You can use an offset initializer to specify an offset for a Kafka source when the Kafka source starts to read data. An offset initializer is an object that is based on the OffsetsInitializer interface. The KafkaSource class provides the following built-in offset initializers.

      Offset initializer

      Code

      Specifies that the Kafka source starts to consume messages from the earliest record of each partition.

      KafkaSource.builder().setStartingOffsets(OffsetsInitializer.earliest())

      Specifies that the Kafka source starts to consume messages from the latest record of each partition.

      KafkaSource.builder().setStartingOffsets(OffsetsInitializer.latest())

      Specifies that the Kafka source starts to consume messages from the first record of each partition. The first record has a timestamp that is greater than or equal to the specified timestamp. Unit: milliseconds.

      KafkaSource.builder().setStartingOffsets(OffsetsInitializer.timestamp(1592323200000L))

      Specifies that the Kafka source starts to consume messages from the committed offset of each partition and a reset strategy is specified. If a partition does not have a committed offset, the reset strategy resets the offset and the Kafka source starts from the earliest record of the partition.

      KafkaSource.builder().setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))

      Specifies that the Kafka source starts to consume messages from the committed offset of each partition and no reset strategy is specified.

      KafkaSource.builder().setStartingOffsets(OffsetsInitializer.committedOffsets())

      Note
      • If the built-in offset initializers do not meet your business requirements, you can create custom offset initializers.

      • If you do not specify an offset initializer, the OffsetsInitializer.earliest() offset initializer is used by default.

    • Streaming execution mode and batch execution mode

      A Kafka source can operate in streaming mode or batch mode. By default, a Kafka source operates in streaming mode. In this mode, the deployment continues to run until the deployment fails or is canceled. If you want a Kafka source to operate in batch mode, you can call the setBounded(OffsetsInitializer) method to specify a stop offset. When all partitions reach their stop offsets, the Kafka source exits.

      Note

      In most cases, a Kafka source that operates in streaming mode does not have a stop offset. If you want to debug a Kafka source that operates in streaming mode, you can call the setUnbounded(OffsetsInitializer) method to specify a stop offset. The methods that you can use to specify a stop offset vary based on whether you use the streaming mode or batch mode.

    • Dynamic partition discovery

      If you want a running deployment to process data from new topics and from new partitions that match your subscription pattern without the need to restart the deployment, you can enable the dynamic partition discovery feature on the Kafka source.

      Note

      By default, the dynamic partition discovery feature is enabled, with an interval of 5 minutes. To disable this feature, you must explicitly set this parameter to a non-positive value. The following sample code shows how to configure the partition.discovery.interval.ms parameter:

      KafkaSource.builder()
          .setProperty("partition.discovery.interval.ms", "10000") // Discover new partitions per 10 seconds.
    • Event time and watermarks

      By default, a Kafka source uses the timestamp that is attached to a record as the event time for the record. You can define a watermark strategy based on the event time of each record and send the watermarks to downstream services.

      env.fromSource(kafkaSource, new CustomWatermarkStrategy(), "Kafka Source With Custom Watermark Strategy")

      For more information about how to define a watermark strategy, see Generating Watermarks.

    • Consumer offsets

      When a checkpoint is generated, a Kafka source commits the Kafka consumer offset of each partition to Kafka brokers. This way, the Kafka consumer offsets that are recorded on Kafka brokers are consistent with the state of the checkpoint. The Kafka consumer can also automatically commit the offsets of each partition to Kafka brokers on a regular basis. You can configure this automatic offset commit behavior by using the enable.auto.commit and auto.commit.interval.ms parameters. If you disable the checkpointing feature, a Kafka source relies on the Kafka consumer to commit the offsets to Kafka brokers.

      Note

      Kafka sources do not use the committed offsets that are recorded on Kafka brokers for fault tolerance. When you commit offsets, Kafka brokers can monitor the progress of record consumption on each partition.

    • Additional properties

      You can call the setProperties(Properties) and setProperty(String, String) methods to configure additional properties for the Kafka source and Kafka consumer. The following table describes the properties of a Kafka source.

      Property

      Description

      client.id.prefix

      Specifies the prefix for the client ID of the Kafka consumer.

      partition.discovery.interval.ms

      Specifies the time interval at which the Kafka source checks for new partitions.

      Note

      If the Kafka source operates in batch mode, the property is automatically set to -1.

      register.consumer.metrics

      Specifies whether to register metrics for the Kafka consumer in Realtime Compute for Apache Flink.

      Additional properties for a Kafka consumer

      For more information about the properties of a Kafka consumer, see Apache Kafka.

      Important

      The Kafka DataStream connector overwrites the values of the following properties:

      • key.deserializer: The value of this property is set to ByteArrayDeserializer.

      • value.deserializer: The value of this property is set to ByteArrayDeserializer.

      • auto.offset.reset.strategy: The value of this property is set to OffsetsInitializer#getAutoOffsetResetStrategy().

      The following sample code shows how the Kafka consumer connects to the Kafka cluster by using a JAAS configuration and the SASL/PLAIN authentication mechanism.

      KafkaSource.builder()
          .setProperty("sasl.mechanism", "PLAIN")
          .setProperty("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"username\" password=\"password\";")
    • Monitoring

      Kafka sources register metrics in Realtime Compute for Apache Flink for monitoring and diagnosis.

      • Metric scope

        All metrics of a Kafka source are registered under the KafkaSourceReader metric group. KafkaSourceReader is a subgroup of the operator metric group. The metrics for a specific partition are registered in the KafkaSourceReader.topic.<topic_name>.partition.<partition_id> metric group.

        For example, a topic is named my-topic and the partition of the topic is named 1. The consumer offset of the partition is reported by the <some_parent_groups>.operator.KafkaSourceReader.topic.my-topic.partition.1.currentOffset metric. The number of successful commits of consumer offsets is measured by the <some_parent_groups>.operator.KafkaSourceReader.commitsSucceeded metric.

      • Metrics

        Metric

        Description

        Scope

        currentOffset

        Reports the Kafka consumer offset of a partition.

        TopicPartition

        committedOffset

        Reports the committed consumer offset of a partition.

        TopicPartition

        commitsSucceeded

        Reports the number of successful commits of consumer offsets.

        KafkaSourceReader

        commitsFailed

        Reports the number of failed commits of consumer offsets.

        KafkaSourceReader

      • Metrics for a Kafka consumer

        The metrics for a Kafka consumer are registered in the KafkaSourceReader.KafkaConsumer metric group. For example, the records-consumed-total metric is registered at <some_parent_groups>.operator.KafkaSourceReader.KafkaConsumer.records-consumed-total.

        You can configure the register.consumer.metrics parameter to specify whether to register metrics for the Kafka consumer. By default, the register.consumer.metrics parameter is set to true. For more information about the metrics for a Kafka consumer, see Apache Kafka.

  • Create a Kafka producer

    A Kafka producer can write data from multiple streams to one or more Kafka topics.

    DataStream<String> stream = ...
    
    Properties properties = new Properties();
    properties.setProperty("bootstrap.servers", "localhost:9092");
    
    FlinkKafkaProducer<String> myProducer = new FlinkKafkaProducer<>(
            "my-topic",                  // target topic
            new SimpleStringSchema(),    // serialization schema
            properties,                  // producer config
            FlinkKafkaProducer.Semantic.EXACTLY_ONCE); // fault-tolerance
    
    stream.addSink(myProducer);

    The following table describes the parameters.

    Parameter

    Description

    Topic

    The name of the topic to which data is written.

    Serialization schema

    The serialization schema. The schema is specified by the SerializationSchema or KafkaSerializationSchema class.

    A producer converts Java or Scala objects into binary data and then writes the data to the required topic. You can use the KafkaSerializationSchema or SerializationSchema class to specify the schema that the Kafka producer uses during data serialization. A Kafka producer calls the ProducerRecord<byte[], byte[]> serialize(T element, Long timestamp) method for each incoming record to generate a ProducerRecord object that represents the serialized record. Then, the Kafka producer writes the ProducerRecord object to the required topic.

    The ProducerRecord class provides properties that you can configure to manage behavior when the Kafka producer writes a record to the required Kafka topic. You can configure the following properties of the ProducerRecord class:

    • The name of the topic to which the record is written.

    • The key for the record.

    • The name of the partition to which the record is written.

    Properties of the Kafka client

    The bootstrap.servers property is required. This property specifies the addresses of the Kafka brokers. Separate multiple addresses with a comma (,).

    Fault tolerance semantics

    After you enable the checkpointing feature, a Kafka producer can ensure exactly-once delivery. You can also configure the semantic parameter of a Kafka producer to specify one of the following fault tolerance semantics:

    • Semantic.NONE: The Kafka producer does not ensure the delivery of data. Data may be lost or duplicated.

    • Semantic.AT_LEAST_ONCE (default): The Kafka producer ensures that data is not lost. However, data may be duplicated.

    • Semantic.EXACTLY_ONCE: The Kafka producer ensures that data is not lost or duplicated. The Kafka transaction mechanism is used to ensure exactly-once delivery.

      Note

      For more information about the exactly-once semantics, see Usage notes of Semantic.EXACTLY_ONCE.

Data ingestion

You can use the Kafka connector in a YAML draft to write data into a Kafka topic.

Syntax

sink:
  type: kafka
  name: Kafka Sink
  properties.bootstrap.servers: localhost:9092

Parameters

Parameter

Description

Required

Data type

Default value

Remarks

type

The type of the sink system.

Yes

String

No default value

Set the value to kafka.

name

The name of the sink.

No

String

No default value

N/A

properties.bootstrap.servers

The IP address and port number of a Kafka broker.

Yes

String

No default value

Format: host:port,host:port,host:port. Separate multiple host:port pairs with commas (,).

properties.*

The parameters that are configured for the Kafka client.

No

String

No default value

The suffix of this parameter must comply with the rules that are defined in Producer Configs.

Flink removes the "properties." prefix and passes the transformed key and values to the Kafka client. For example, you can set properties.allow.auto.create.topics to false to disable automatic topic creation.

key.format

The format of the Kafka message key.

No

String

No default value

Valid values:

  • csv

  • json

value.format

The format of the Kafka message value.

No

String

debezium-json

Valid values:

  • debezium-json 

  • canal-json

topic

The name of the Kafka topic.

No

String

No default value

If this parameter is specified, all data will be written into this topic.

Note

If this parameter is not specified, data is written into the topic whose name is derived from the table ID, which is the database name and the table name joined with a period (.), such as databaseName.tableName.

partition.strategy

The Kafka partitioning strategy.

No

String

all-to-zero

Valid values:

  • all-to-zero: Writes all data to partition 0.

  • hash-by-key: Writes data to partitions based on the hash value of the primary key. This ensures data with the same primary key is in the same partition and in order.

Example

Ingest data into Kafka:

source:
  type: mysql
  name: MySQL Source
  hostname: ${secret_values.mysql.hostname}
  port: ${mysql.port}
  username: ${secret_values.mysql.username}
  password: ${secret_values.mysql.password}
  tables: ${mysql.source.table}
  server-id: 8601-8604

sink:
  type: kafka
  name: Kafka Sink
  properties.bootstrap.servers: ${kafka.bootstraps.server}

route:
  - source-table: ${mysql.source.table}
    sink-table: ${kafka.topic}

In the route section, specify the name of the target Kafka topic.

Usage notes of Semantic.EXACTLY_ONCE

  • When you use the transaction mechanism to write data into a Kafka topic, you must configure the isolation.level parameter for all Kafka consumers. The parameter has the following valid values:

    • read_committed: Only the committed data is read.

    • read_uncommitted (default): Data that is not committed can be read.

  • Semantic.EXACTLY_ONCE mode relies on the ability to commit transactions that were started before a checkpoint was taken, after recovering from that checkpoint. If the time between a deployment failure and the restart is longer than the Kafka transaction timeout, data loss may occur because Kafka expires the uncommitted transactions. Therefore, we recommend that you configure a transaction timeout that covers your expected downtime.

    The transaction timeout that you configure for a Kafka producer cannot exceed the transaction timeout period of the Kafka brokers, which is set to 15 minutes by default. FlinkKafkaProducer sets the transaction.timeout.ms parameter in the producer configuration to 1 hour by default. Therefore, you must extend the transaction timeout period of the Kafka brokers before you use the Semantic.EXACTLY_ONCE mode (see the sketch at the end of this section).

  • Semantic.EXACTLY_ONCE mode uses a fixed-size pool of producers for each FlinkKafkaProducer instance. One producer from the pool is used per checkpoint. If the number of concurrent checkpoints exceeds the pool size, FlinkKafkaProducer throws an exception and the entire deployment fails. You must configure the maximum pool size and the maximum number of concurrent checkpoints accordingly.

  • Semantic.EXACTLY_ONCE takes all possible measures to avoid leaving lingering transactions that would block consumers from reading from a Kafka topic longer than necessary. However, if a deployment fails before the first checkpoint, the information about previous pool sizes is lost after the deployment restarts. Therefore, do not reduce the deployment parallelism before the first checkpoint completes by a factor greater than FlinkKafkaProducer.SAFE_SCALE_DOWN_FACTOR.

  • For a Kafka consumer for which the isolation.level is set to read_committed, any uncommitted transaction (neither terminated nor completed) will block all reads from the Kafka topic. Example:

    1. A user creates a transaction to write data to a topic.

    2. The user creates another transaction to write data to the topic.

    3. The user commits the second transaction.

    In this case, the Kafka consumer cannot read the data from the second transaction until the first transaction is committed or terminated. Therefore, take note of the following points:

    • Data written into a Kafka topic becomes visible to consumers with a delay that is approximately equal to the average checkpointing interval.

    • In the event of deployment failure, the topics that are written by the deployment block Kafka consumers from reading data until the deployment is restarted or the transactions time out.
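As referenced above, the following sketch shows a Kafka sink table in SQL that uses exactly-once delivery together with an explicit transaction timeout. The table name, field names, topic name, broker addresses, prefix, and timeout value are examples; keep the timeout within the transaction.max.timeout.ms setting of your Kafka brokers.

CREATE TEMPORARY TABLE kafka_exactly_once_sink (
  id INT,
  name STRING
) WITH (
  'connector' = 'kafka',
  'topic' = '<yourTopic>',
  'properties.bootstrap.servers' = '<yourKafkaBrokers>',
  'format' = 'json',
  'sink.delivery-guarantee' = 'exactly-once',
  -- Required when exactly-once is used.
  'sink.transactional-id-prefix' = 'my-flink-job',
  -- Example transaction timeout: 15 minutes. This value cannot exceed the
  -- transaction.max.timeout.ms setting of the Kafka brokers.
  'properties.transaction.timeout.ms' = '900000'
);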

FAQ