DataWorks: Kafka data source

Last Updated: Sep 09, 2024

DataWorks provides Kafka Reader and Kafka Writer for you to read data from and write data to Kafka data sources. This topic describes the capabilities of synchronizing data from or to Kafka data sources.

Supported Kafka versions

ApsaraMQ for Kafka data sources and self-managed Kafka data sources are supported. However, the versions of self-managed Kafka data sources must range from 0.10.2 to 2.2.X.

Note

Self-managed Kafka data sources whose versions are earlier than 0.10.2 do not support querying the offsets of partition data and do not support timestamps. As a result, data synchronization is not supported for these data sources.

Resource evaluation

Real-time data read

  • If you want to use a subscription serverless resource group, you must select a resource group whose specifications meet your business requirements. This ensures that tasks can run as expected.

    A Kafka topic is estimated to require one compute unit (CU). In addition, you need to estimate the number of required CUs based on the read throughput:

    • If Kafka data is not compressed, one CU is required for a throughput of 10 MB/s.

    • If Kafka data is compressed, two CUs are required for a throughput of 10 MB/s.

    • If Kafka data is compressed and needs to be parsed into JSON-formatted data, three CUs are required for a throughput of 10 MB/s.

  • Take note of the following items when you use subscription serverless resource groups and old-version exclusive resource groups for Data Integration:

    • If your cluster has a high tolerance for failover, we recommend that the slot usage does not exceed 80%.

    • If your cluster has a low tolerance for failover, we recommend that the slot usage does not exceed 70%.

Note

The actual number of required CUs varies based on several factors, such as the data format. You can adjust the number of CUs based on your business requirements after the evaluation.
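For example, based on these estimates, a task that reads compressed data from one Kafka topic at a throughput of 30 MB/s would require roughly 1 CU for the topic plus 3 × 2 CUs for the throughput, or about 7 CUs in total.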

Limits

Kafka data sources support serverless resource groups (recommended) and old-version exclusive resource groups for Data Integration.

Batch data read of a single table

If you configure both parameter.groupId and parameter.kafkaConfig.group.id, parameter.groupId has a higher priority than parameter.kafkaConfig.group.id.
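The following fragment is a minimal sketch of this behavior. The consumer group names are hypothetical:

"parameter": {
    "groupId": "group_used",              // This consumer group takes effect.
    "kafkaConfig": {
        "group.id": "group_ignored"       // Ignored because parameter.groupId is configured.
    }
}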

Real-time data write of a single table

Deduplication is not supported for data that you want to write to Kafka. If you reset the offset for your synchronization task or your synchronization task is restarted after a failover, duplicate data may be written to Kafka.

Real-time data write of a single database

  • You can use serverless resource groups (recommended) and old-version exclusive resource groups for Data Integration to run real-time synchronization tasks.

  • If a source table has a primary key, the values of the primary key are used as the keys in Kafka records during data synchronization. This ensures that changes of data with the same primary key value in the source table are written to the same partition in Kafka in an orderly manner.

  • If you select source tables that do not have a primary key for synchronization when you configure the destination, empty strings are used as the keys in Kafka records during data synchronization. To ensure that data changes in the source table can be sequentially written to Kafka, you must make sure that the Kafka topic to which the data changes are written contains only one partition. You can specify a custom primary key for a source table that does not have a primary key when you configure a destination table. In this case, a field or a combination of multiple fields in the source table are used as the primary key. The values of the primary key are used as the keys in Kafka records during data synchronization.

  • To ensure that changes of data that use the same primary key value in the source table are sequentially written to the same partition in Kafka when a response exception occurs on the Kafka data source, you must add the following configurations to extended parameters when you add the Kafka data source to DataWorks:

    {"max.in.flight.requests.per.connection":1,"buffer.memory": 100554432}

    Important

    After you add the configurations to the extended parameters of the Kafka data source, data synchronization performance is significantly degraded. You must balance the performance and order of the data write operation.

  • For more information about the format of a Kafka message, format of a heartbeat message that is generated by a synchronization task, and format of a Kafka message for data changes in the source, see Appendix: Message formats.

Data type mappings

Kafka stores unstructured data. The data modules in a Kafka record include key, value, offset, timestamp, header, and partition. When you read data from or write data to Kafka, DataWorks processes data based on the following policies.

Batch data read

DataWorks allows you to parse data in the JSON format that you read from Kafka. The following list describes how the different data modules in a Kafka record are processed.

  • key: The data type depends on the keyType parameter that you configure for a synchronization task. For more information about the keyType parameter, see the Parameters in code for Kafka Reader section in this topic.

  • value: The data type depends on the valueType parameter that you configure for a synchronization task. For more information about the valueType parameter, see the Parameters in code for Kafka Reader section in this topic.

  • offset: Long.

  • timestamp: Long.

  • headers: String.

  • partition: Long.
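For example, if the value of a Kafka record is the JSON string {"event_id": 101, "tag": {"desc": "click"}} (a hypothetical record), a column configuration of ["__key__", "event_id", "tag.desc"] produces three column values for that record: the record key, 101, and click.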

Batch data write

DataWorks allows you to write data in the JSON or text format to Kafka. The policies for processing the data that you want to write to Kafka vary based on the type of synchronization task. The following list describes the policies.

Important
  • When data in the text format is written to Kafka, field names are not written to Kafka, and a delimiter is used to separate the values of different fields.

  • When you use a real-time synchronization task to write data to Kafka, the data is written to Kafka in the JSON format, and the data contains the data about changes to databases, data timestamp, and DDL messages. For more information, see Appendix: Message formats.

  • Batch synchronization task

    • Data written to the value data module in the JSON format:

      • String: A UTF-8 encoded string
      • Boolean: A UTF-8 encoded string, which can be true or false
      • Time/Date: A UTF-8 encoded string in the yyyy-MM-dd HH:mm:ss format
      • Number: A UTF-8 encoded string
      • Byte stream: A UTF-8 encoded string

    • Data written to the value data module in the text format:

      • String: A UTF-8 encoded string
      • Boolean: A UTF-8 encoded string, which can be true or false
      • Time/Date: A UTF-8 encoded string in the yyyy-MM-dd HH:mm:ss format
      • Number: A UTF-8 encoded string
      • Byte stream: A UTF-8 encoded string

  • Real-time synchronization task

    • Data written to the value data module in the JSON format:

      • String: A UTF-8 encoded string
      • Boolean: Boolean type in JSON
      • Time/Date: For time values with a precision of milliseconds or coarser, a timestamp in milliseconds, which is a 13-digit JSON-formatted integer. For time values with microsecond or nanosecond precision, a JSON-formatted number that consists of a 13-digit integer part in milliseconds and a 6-digit decimal part that carries the sub-millisecond precision.
      • Number: Number type in JSON
      • Byte stream: A UTF-8 encoded string

    • Data written to the value data module in the text format:

      • String: A UTF-8 encoded string
      • Boolean: A UTF-8 encoded string, which can be true or false
      • Time/Date: A UTF-8 encoded string in the yyyy-MM-dd HH:mm:ss format
      • Number: A UTF-8 encoded string
      • Byte stream: A UTF-8 encoded string

  • Real-time synchronization task for synchronizing incremental data from a database to Kafka

    • Data written to the value data module in the JSON format:

      • String: A UTF-8 encoded string
      • Boolean: Boolean type in JSON
      • Time/Date: A 13-digit timestamp in milliseconds
      • Number: Number type in JSON
      • Byte stream: A UTF-8 encoded string

  • Synchronization task for synchronizing full data and incremental data to Kafka

    • Data written to the value data module in the JSON format:

      • String: A UTF-8 encoded string
      • Boolean: Boolean type in JSON
      • Time/Date: A 13-digit timestamp in milliseconds
      • Number: Number type in JSON
      • Byte stream: A UTF-8 encoded string
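For example (the values are for illustration only), the Time/Date value 2024-09-09 00:00:00 UTC is written as the 13-digit millisecond timestamp 1725840000000 in the JSON format, and a microsecond-precision value such as 2024-09-09 00:00:00.123456 UTC is written as 1725840000123.456.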

Develop a data synchronization task

For information about the entry point for and the procedure of configuring a data synchronization task, see the following sections. For information about the parameter settings, view the infotip of each parameter on the configuration tab of the task.

Add a data source

Before you configure a data synchronization task to synchronize data from or to a specific data source, you must add the data source to DataWorks. For more information, see Add and manage data sources.

Configure a batch synchronization task to synchronize data of a single table

Configure a real-time synchronization task to synchronize data of a single table or synchronize all data of a database

For more information about the configuration procedure, see Configure a real-time synchronization task in DataStudio.

Configure synchronization settings to implement real-time synchronization of full and incremental data in a single table or a database

For more information about the configuration procedure, see Configure a synchronization task in Data Integration.

Authentication configuration

SSL

If you set Authentication to SSL or SASL_SSL when you add a Kafka cluster to DataWorks as a data source, SSL-based authentication is enabled for the Kafka cluster. In this case, you must upload a client-side truststore file and specify a password for the truststore file.

  • If you want to add an ApsaraMQ for Kafka instance as a Kafka data source, you can refer to Update the SSL certificate algorithm to download a valid truststore file. The password of the file is KafkaOnsClient.

  • If you want to add an EMR Kafka cluster as a Kafka data source, you can refer to Use SSL to encrypt Kafka data to download a valid truststore file and obtain the password of the file.

  • If you want to add a self-managed Kafka cluster as a Kafka data source, you must upload a valid truststore file and specify the password of the file.

The Keystore certificate file, Keystore certificate password, and SSL-based key password need to be configured only when SSL-based mutual authentication is enabled. The information is used to authenticate the identity of a client on a Kafka broker. If ssl.client.auth=required is specified in the server.properties configuration file, SSL-based mutual authentication is enabled. For more information, see Use SSL to encrypt Kafka data.
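For reference, the underlying Kafka client properties that correspond to these settings look similar to the following sketch when passed through the extended parameters (kafkaConfig). The file paths and passwords are placeholders, and the keystore entries apply only when SSL-based mutual authentication (ssl.client.auth=required) is enabled:

"kafkaConfig": {
    "security.protocol": "SSL",
    "ssl.truststore.location": "/path/to/client.truststore.jks",
    "ssl.truststore.password": "xxx",
    "ssl.keystore.location": "/path/to/client.keystore.jks",   // Required only for mutual authentication.
    "ssl.keystore.password": "xxx",                            // Required only for mutual authentication.
    "ssl.key.password": "xxx"                                  // Required only for mutual authentication.
}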

GSSAPI

If you set Sasl Mechanism to GSSAPI(Kerberos) when you add a Kafka cluster to DataWorks as a data source, you must upload the following authentication files: JAAS configuration file, Kerberos configuration file, and Keytab file, and configure host and DNS settings for a specified exclusive resource group. The following content describes the authentication files and the methods that are used to configure host and DNS settings for a specified exclusive resource group.

  • JAAS configuration file

    The content of a JAAS configuration file must start with KafkaClient. Then, a pair of braces {} are used to enclose all configuration items.

    • The first line in braces {} defines the logon module class. The logon module class is fixed for different SASL authentication mechanisms. The other configuration items are all specified in the key=value format.

    • All configuration items except for the last configuration item cannot contain a semicolon (;) at the end.

    • The last configuration item must end with a semicolon (;). A semicolon (;) must also be added after the right brace }.

    If the content of a JAAS configuration file does not meet the preceding format requirements, the JAAS configuration file fails to be parsed. The following example provides a typical configuration format of a JAAS configuration file. Replace xxx in the following content based on your business requirements.

    KafkaClient {
       com.sun.security.auth.module.Krb5LoginModule required
       useKeyTab=true
       keyTab="xxx"
       storeKey=true
       serviceName="kafka-server"
       principal="kafka-client@EXAMPLE.COM";
    };

    • Logon module class: Configure com.sun.security.auth.module.Krb5LoginModule as the logon module class.

    • useKeyTab: Set the value to true.

    • keyTab: Specifies a path. When a synchronization task runs, the Keytab file that is uploaded when you add the Kafka data source is automatically downloaded, and the path in which the downloaded Keytab file is stored is used as the value of this configuration item.

    • storeKey: Specifies whether to save the key at the client. You can set this configuration item to true or false. The setting does not affect data synchronization.

    • serviceName: This configuration item corresponds to the sasl.kerberos.service.name configuration item in the server.properties configuration file on a Kafka broker. Specify this configuration item based on your business requirements.

    • principal: The Kerberos principal used by the Kafka client. Specify this configuration item based on your business requirements. Make sure that the uploaded Keytab file contains the key of the principal.

  • Kerberos configuration file

    A Kerberos configuration file must contain the following modules: [libdefaults] and [realms].

    • The [libdefaults] module contains configuration items that are relevant to Kerberos authentication. Each configuration item is specified in the key=value format.

    • The [realms] module specifies a Key Distribution Center (KDC) address and can contain multiple realm submodules. Each realm submodule starts with a realm name that is followed by an equal sign (=).

    A pair of braces {} are used to enclose all configuration items for each realm submodule. Each configuration item is specified in the key=value format. The following example provides a typical configuration format of a Kerberos configuration file. Replace xxx in the following content based on your business requirements.

    [libdefaults]
      default_realm = xxx
    
    [realms]
      xxx = {
        kdc = xxx
      }

    • [libdefaults].default_realm: The default realm that is used to access a Kafka cluster. In most cases, the default realm is the same as the realm of the client principal that is specified in the JAAS configuration file.

    • Other configuration items in [libdefaults]: The [libdefaults] module can also specify other configuration items that are relevant to Kerberos authentication, such as ticket_lifetime. Specify these configuration items based on your business requirements.

    • [realms].Realm name: The realm name must be the same as the realm of the client principal that is specified in the JAAS configuration file and the value of [libdefaults].default_realm. If the two realms are different, specify two realm submodules, one for each realm.

    • [realms].Realm name.kdc: Specifies the IP address and port number of a KDC in the IP address:Port number format. Example: kdc=10.0.0.1:88. If you do not specify a port for a KDC, port 88 is used by default. Example: kdc=10.0.0.1.

  • Keytab file

    A Keytab file must contain the key of the principal that is specified in a JAAS configuration file and must be able to pass the KDC verification. For example, a Keytab file named client.keytab exists in the current working directory on your on-premises machine. You can run the following command to check whether the Keytab file contains the key of a specified principal:

    klist -ket ./client.keytab
    
    Keytab name: FILE:client.keytab
    KVNO Timestamp           Principal
    ---- ------------------- ------------------------------------------------------
       7 2018-07-30T10:19:16 test@demo.com (des-cbc-md5)
  • Configuration of DNS and host settings for a specified exclusive resource group

    A Kafka cluster with Kerberos authentication enabled uses the hostname of each node as part of the principal that the node registers in the KDC, which is the Kerberos Key Distribution Center. When a client accesses a node in the Kafka cluster, the client infers the principal of the node based on the local DNS and host settings and then obtains the access credential of the node from the KDC. Therefore, when you use an exclusive resource group to access a Kafka cluster with Kerberos authentication enabled, you must configure DNS and host settings to ensure that the client can obtain the access credential of the desired node in the Kafka cluster.

    • DNS settings

      In the virtual private cloud (VPC) in which a specified exclusive resource group is deployed, if you use PrivateZone to configure DNS settings for a node in a Kafka cluster, you can add the 100.100.2.136 and 100.100.2.138 IP addresses as custom routes on the VPC Binding tab of the specified exclusive resource group in the DataWorks console. This way, the DNS settings take effect for the exclusive resource group.

    • Host settings

      In the VPC in which a specified exclusive resource group is deployed, if you do not use PrivateZone to configure DNS settings for a node in a Kafka cluster, you must configure mappings between IP addresses of nodes in the Kafka cluster and hostnames on the Hostname-to-IP Mapping tab of the specified exclusive resource group on the Exclusive Resource Groups tab in the DataWorks console.
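      For example, such mappings are typically maintained in the common IP address-to-hostname form. The addresses and hostnames below are hypothetical:

      192.168.0.11 kafka-broker-1.example.com
      192.168.0.12 kafka-broker-2.example.com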

PLAIN

If you set Sasl Mechanism to PLAIN when you add a Kafka cluster to DataWorks as a data source, you must upload a JAAS configuration file. The content of the JAAS configuration file must start with KafkaClient. Then, a pair of braces {} are used to enclose all configuration items.

  • The first line in braces {} defines the logon module class. The logon module class is fixed for different SASL authentication mechanisms. The other configuration items are all specified in the key=value format.

  • All configuration items except for the last configuration item cannot contain a semicolon (;) at the end.

  • The last configuration item must end with a semicolon (;). A semicolon (;) must also be added after the right brace }.

If the content of a JAAS configuration file does not meet the preceding format requirements, the JAAS configuration file fails to be parsed. The following example provides a typical configuration format of a JAAS configuration file. Replace xxx in the following content based on your business requirements.

KafkaClient {
  org.apache.kafka.common.security.plain.PlainLoginModule required
  username="xxx"
  password="xxx";
};

  • Logon module class: Configure org.apache.kafka.common.security.plain.PlainLoginModule as the logon module class.

  • username: The username. Specify this configuration item based on your business requirements.

  • password: The password. Specify this configuration item based on your business requirements.

FAQ

Appendix: Code and parameters

Appendix: Configure a batch synchronization task by using the code editor

If you use the code editor to configure a batch synchronization task, you must configure parameters for the reader and writer of the related data source based on the format requirements in the code editor. For more information about the format requirements, see Configure a batch synchronization task by using the code editor. The following information describes the configuration details of parameters for the reader and writer in the code editor.

Code for Kafka Reader

In the following JSON code, a synchronization task is configured to read data from a Kafka topic:

{
    "type": "job",
    "steps": [
        {
            "stepType": "kafka",
            "parameter": {
                "server": "host:9093",
                "column": [
                    "__key__",
                    "__value__",
                    "__partition__",
                    "__offset__",
                    "__timestamp__",
                    "'123'",
                    "event_id",
                    "tag.desc"
                ],
                "kafkaConfig": {
                    "group.id": "demo_test"
                },
                "topic": "topicName",
                "keyType": "ByteArray",
                "valueType": "ByteArray",
                "beginDateTime": "20190416000000",
                "endDateTime": "20190416000006",
                "skipExceedRecord": "true"
            },
            "name": "Reader",
            "category": "reader"
        },
        {
            "stepType": "stream",
            "parameter": {
                "print": false,
                "fieldDelimiter": ","
            },
            "name": "Writer",
            "category": "writer"
        }
    ],
    "version": "2.0",
    "order": {
        "hops": [
            {
                "from": "Reader",
                "to": "Writer"
            }
        ]
    },
    "setting": {
        "errorLimit": {
            "record": "0"
        },
        "speed": {
            "throttle": true,// Specifies whether to enable throttling. The value false indicates that throttling is disabled, and the value true indicates that throttling is enabled. The mbps parameter takes effect only when the throttle parameter is set to true. 
            "concurrent": 1,// The maximum number of parallel threads.
            "mbps":"12"// The maximum transmission rate. Unit: MB/s. 
        }
    }
}

Parameters in code for Kafka Reader

  • datasource

    Description: The name of the data source. It must be the same as the name of the added data source. You can add data sources by using the code editor.

    Required: Yes.

  • server

    Description: The address of a Kafka broker in your Kafka cluster, in the IP address:Port number format. You can specify one or more broker addresses in the server parameter. You must make sure that DataWorks can use the specified addresses to access the related brokers.

    Required: Yes.

  • topic

    Description: The name of the Kafka topic from which you want to read data. Kafka maintains feeds of messages in topics.

    Required: Yes.

  • column

    Description: The names of the columns from which you want to read data. Constant columns, data columns, and property columns are supported.

    • Constant column: a column whose name is enclosed in single quotation marks ('). Example: ["'abc'", "'123'"].

    • Data column:

      • If your data is in the JSON format, you can obtain JSON properties. Example: ["event_id"].

      • If your data is in the JSON format, you can obtain the properties of nested objects in the data. Example: ["tag.desc"].

    • Property column:

      • __key__: the key of a Kafka record.

      • __value__: the complete content of a Kafka record.

      • __partition__: the partition where a Kafka record resides.

      • __headers__: the headers of a Kafka record.

      • __offset__: the offset of a Kafka record.

      • __timestamp__: the timestamp of a Kafka record.

    The following code provides a configuration example of the column parameter:

    "column": [
        "__key__",
        "__value__",
        "__partition__",
        "__offset__",
        "__timestamp__",
        "'123'",
        "event_id",
        "tag.desc"
    ]

    Required: Yes.

  • keyType

    Description: The data type of the key in the Kafka topic. Valid values: BYTEARRAY, DOUBLE, FLOAT, INTEGER, LONG, and SHORT.

    Required: Yes.

  • valueType

    Description: The data type of the value in the Kafka topic. Valid values: BYTEARRAY, DOUBLE, FLOAT, INTEGER, LONG, and SHORT.

    Required: Yes.

  • beginDateTime

    Description: The start time of data consumption. This parameter specifies the left boundary of a left-closed, right-open interval. Specify the time in the yyyymmddhhmmss format. This parameter can be used together with the scheduling parameters in DataWorks. For more information, see Supported formats of scheduling parameters.

    Note: This parameter is supported by Kafka 0.10.2 and later.

    Required: You must configure either the beginDateTime or beginOffset parameter. The beginDateTime and endDateTime parameters must be used in pairs.

  • endDateTime

    Description: The end time of data consumption. This parameter specifies the right boundary of a left-closed, right-open interval. Specify the time in the yyyymmddhhmmss format. This parameter can be used together with the scheduling parameters in DataWorks. For more information, see Supported formats of scheduling parameters.

    Note: This parameter is supported by Kafka 0.10.2 and later.

    Required: You must configure either the endDateTime or endOffset parameter. The beginDateTime and endDateTime parameters must be used in pairs.

  • beginOffset

    Description: The offset from which data consumption starts. The following formats are supported:

    • Numeric string: Data consumption starts from the specified offset, such as 15553274.

    • seekToBeginning: Data is consumed from the start offset.

    • seekToLast: Data is consumed from the offset of the consumer group that is specified by the group.id parameter in kafkaConfig. The offset of the consumer group is automatically committed to the Kafka server on a regular basis. If a synchronization task fails and is rerun, data may be duplicated or lost. In this case, if you set the skipExceedRecord parameter to true, some data records that were last read may be discarded. As a result, the synchronization task cannot read the discarded data in the next cycle because the offset of the consumer group for the discarded data is already committed to the Kafka server.

    • seekToEnd: Data is consumed from the end offset. In this case, null may be read.

    Required: You must configure either the beginOffset or beginDateTime parameter.

  • endOffset

    Description: The offset at which data consumption ends.

    Required: You must configure either the endOffset or endDateTime parameter.

  • skipExceedRecord

    Description: The Kafka consumer uses public ConsumerRecords<K, V> poll(final Duration timeout) to poll for data. However, the data obtained in a poll may be beyond the boundary that is specified by the endOffset or endDateTime parameter. The skipExceedRecord parameter specifies whether to poll for the excess data. Kafka Reader automatically commits offsets for data consumption. Therefore, we recommend that you configure the skipExceedRecord parameter based on the following instructions:

    • If the data source from which you want to read data is of a version earlier than Kafka 0.10.2, set the skipExceedRecord parameter to false.

    • If the data source from which you want to read data is of Kafka 0.10.2 or later, set the skipExceedRecord parameter to true.

    Required: No. Default value: false.

  • partition

    Description: If a Kafka topic contains multiple partitions, Kafka Reader reads data in a specific offset interval from all partitions. If you want Kafka Reader to read data from a specific partition only, use the partition parameter to specify that partition.

    Required: No. This parameter does not have a default value.

  • kafkaConfig

    Description: The extended parameters that are specified when the Kafka consumer is created, such as bootstrap.servers, auto.commit.interval.ms, and session.timeout.ms. You can configure the parameters in kafkaConfig to manage the data consumption of the Kafka consumer.

    Required: No.

  • encoding

    Description: If the keyType or valueType parameter is set to STRING, strings are parsed based on the value of the encoding parameter.

    Required: No. Default value: UTF-8.

  • waitTIme

    Description: The maximum duration during which the Kafka consumer waits to poll for data from Kafka each time. Unit: seconds.

    Required: No. Default value: 60.

  • stopWhenPollEmpty

    Description: You can set this parameter to true or false. The Kafka consumer may poll null from Kafka because all data in the Kafka topic has been polled or because an error occurs on the network or Kafka cluster. If you set this parameter to true and the Kafka consumer polls null, the related synchronization task immediately stops. If you set this parameter to false and the Kafka consumer polls null, the Kafka consumer keeps attempting to poll for data until data is obtained.

    Required: No. Default value: true.

  • stopWhenReachEndOffset

    Description: This parameter takes effect only when the stopWhenPollEmpty parameter is set to true. You can set this parameter to true or false.

    • If you set this parameter to true and null is returned from Kafka when the Kafka consumer polls for data, the system checks whether the data at the latest offset in all partitions of the Kafka topic has been read. If it has, the related synchronization task immediately stops. Otherwise, the Kafka consumer keeps attempting to poll for data until data is obtained.

    • If you set this parameter to false and null is returned from Kafka when the Kafka consumer polls for data, the system does not perform this check and the related synchronization task immediately stops.

    Required: No. Default value: false.

Note

This check exists for compatibility with historical logic. For Kafka topics of versions earlier than 0.10.2, the system cannot check whether the data at the latest offset in all partitions has been read. However, tasks that are configured by using the code editor may still read data from such topics.
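The following fragments are minimal sketches of the two common ways to bound the read range. The topic name, time values, and offsets are hypothetical, and ${bizdate} is a DataWorks scheduling parameter:

// Bound the range by time, together with scheduling parameters:
"parameter": {
    "topic": "topicName",
    "keyType": "ByteArray",
    "valueType": "ByteArray",
    "beginDateTime": "${bizdate}000000",
    "endDateTime": "${bizdate}235959",
    "skipExceedRecord": "true"
}

// Bound the range by offsets, resuming from the committed offset of a consumer group:
"parameter": {
    "topic": "topicName",
    "keyType": "ByteArray",
    "valueType": "ByteArray",
    "beginOffset": "seekToLast",
    "endOffset": "15553274",
    "skipExceedRecord": "true",
    "kafkaConfig": {
        "group.id": "demo_test"
    }
}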

The following list describes the parameters in kafkaConfig.

  • fetch.min.bytes: The minimum number of bytes that the consumer can obtain from the broker. The broker returns data to the consumer only after the number of bytes reaches the specified value.

  • fetch.max.wait.ms: The maximum duration during which the consumer waits for data from the broker. Default value: 500. Unit: milliseconds. The broker returns data to the consumer when one of the conditions that are specified by the fetch.min.bytes and fetch.max.wait.ms parameters is met.

  • max.partition.fetch.bytes: The maximum number of bytes in each partition that the broker returns to the consumer. Default value: 1. Unit: MB.

  • session.timeout.ms: The timeout period of a connection session between the consumer and the broker. If this limit is exceeded, the consumer can no longer receive data from the broker. Default value: 30. Unit: seconds.

  • auto.offset.reset: The handling method that is used when no offset is available or the offset is invalid. This occurs if the consumer times out or the record with the specified offset expires and is deleted. The default value of this parameter is none, which indicates that the offset cannot be automatically reset. You can set this parameter to earliest, which indicates that the consumer reads partition data from the start offset.

  • max.poll.records: The number of Kafka records that can be returned for a single poll.

  • key.deserializer: The method that is used to deserialize the Kafka record key, such as org.apache.kafka.common.serialization.StringDeserializer.

  • value.deserializer: The method that is used to deserialize the Kafka record value, such as org.apache.kafka.common.serialization.StringDeserializer.

  • ssl.truststore.location: The path of the SSL root certificate.

  • ssl.truststore.password: The password of the truststore in the SSL root certificate. If you use ApsaraMQ for Kafka, set this parameter to KafkaOnsClient.

  • security.protocol: The access protocol. Set this parameter to SASL_SSL.

  • sasl.mechanism: The Simple Authentication and Security Layer (SASL) authentication mode. If you use ApsaraMQ for Kafka, set this parameter to PLAIN.

  • java.security.auth.login.config: The path of the SASL authentication file.
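For reference, a kafkaConfig block that combines several of these parameters might look like the following sketch. The values are illustrative only and should be tuned to your workload:

"kafkaConfig": {
    "group.id": "demo_test",
    "auto.offset.reset": "earliest",      // Start from the earliest offset if no committed offset exists.
    "max.poll.records": 500,              // Return at most 500 records per poll.
    "fetch.min.bytes": 1024,              // The broker waits until at least 1 KB is available...
    "fetch.max.wait.ms": 500,             // ...or until 500 ms have passed, whichever comes first.
    "key.deserializer": "org.apache.kafka.common.serialization.StringDeserializer",
    "value.deserializer": "org.apache.kafka.common.serialization.StringDeserializer"
}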

Code for Kafka Writer

In the following code, a synchronization task is configured to write data to Kafka:

{
    "type": "job",
    "version": "2.0",// The version number. 
    "steps": [
        {
            "stepType": "stream",
            "parameter": {},
            "name": "Reader",
            "category": "reader"
        },
        {
            "stepType": "Kafka",// The plug-in name. 
            "parameter": {
                "server": "ip:9092", // The address of a Kafka broker. 
                "keyIndex": 0, // The sequence number of the column that is used as the key. You must use the lower camel case for the column name.
                "valueIndex": 1, // The sequence number of the column that is used as the value. You can specify only one column. If you leave this parameter empty, all columns obtained from the reader are used as the value.
                // If you want to use the second, third, and fourth columns in a MaxCompute table as the value, cleanse and integrate the data in the table. Create a MaxCompute table, write the processed data to the new table, and then use the new table to synchronize data. 
                "keyType": "Integer", // The data type of the key in the Kafka topic. 
                "valueType": "Short", // The data type of the value in the Kafka topic. 
                "topic": "t08", // The name of the Kafka topic to which you want to write data. 
                "batchSize": 1024 // The number of data records to write at a time. Unit: bytes. 
            },
            "name": "Writer",
            "category": "writer"
        }
    ],
    "setting": {
        "errorLimit": {
            "record": "0"// The maximum number of dirty data records allowed. 
        },
        "speed": {
            "throttle": true,// Specifies whether to enable throttling. The value false indicates that throttling is disabled, and the value true indicates that throttling is enabled. The mbps parameter takes effect only when the throttle parameter is set to true. 
            "concurrent": 1, // The maximum number of parallel threads. 
            "mbps": "12"// The maximum transmission rate. Unit: MB/s. 
        }
    },
    "order": {
        "hops": [
            {
                "from": "Reader",
                "to": "Writer"
            }
        ]
    }
}

Parameters in code for Kafka Writer

  • datasource

    Description: The name of the data source. It must be the same as the name of the added data source. You can add data sources by using the code editor.

    Required: Yes.

  • server

    Description: The address of a Kafka broker in your Kafka cluster, in the IP address:Port number format.

    Required: Yes.

  • topic

    Description: The name of the Kafka topic to which you want to write data. Topics are categories in which Kafka maintains feeds of messages. Each message that is published to a Kafka cluster is assigned to a topic. Each topic contains a group of messages.

    Required: Yes.

  • valueIndex

    Description: The sequence number of the column that is obtained from a reader and used as the value in the Kafka topic. If you leave this parameter empty, all columns obtained from the reader are concatenated by using the delimiter specified by the fieldDelimiter parameter to form the value.

    Required: No.

  • writeMode

    Description: The write mode. If you leave the valueIndex parameter empty, you can use the writeMode parameter to specify the format in which Kafka Writer concatenates all columns obtained from the reader. Default value: text. Valid values:

    • text: Kafka Writer uses the delimiter specified by the fieldDelimiter parameter to concatenate all columns obtained from the reader.

    • JSON: Kafka Writer concatenates all columns obtained from the reader into a JSON string based on the column names specified by the column parameter.

    For example, three columns are obtained from the reader, and the values in the three columns are a, b, and c. If you set the writeMode parameter to text and the fieldDelimiter parameter to a number sign (#), Kafka Writer concatenates the columns into the string a#b#c and writes this string to Kafka. If you set the writeMode parameter to JSON and the column parameter to [{"name":"col1"},{"name":"col2"},{"name":"col3"}], Kafka Writer concatenates the columns into the JSON string {"col1":"a","col2":"b","col3":"c"} and writes this JSON string to Kafka.

    If you configure the valueIndex parameter, the writeMode parameter is invalid.

    Required: No.

  • column

    Description: The names of the columns to which you want to write data. Separate the names with commas (,), such as "column": ["id", "name", "age"].

    If you leave the valueIndex parameter empty and set the writeMode parameter to JSON, the column parameter determines the names of the columns in the JSON string into which the columns obtained from the reader are concatenated, such as "column": [{"name":"id"}, {"name":"name"}, {"name":"age"}].

    • If the number of columns that are obtained from the reader is greater than the number of columns that are specified in the column parameter, Kafka Writer truncates the columns that are obtained from the reader. Example: Three columns are obtained from the reader, and the values in the columns are a, b, and c. If the column parameter is set to [{"name":"col1"},{"name":"col2"}], Kafka Writer concatenates the columns into the JSON string {"col1":"a","col2":"b"} and writes this JSON string to Kafka.

    • If the number of columns that are obtained from the reader is less than the number of columns that are specified in the column parameter, Kafka Writer sets the values of the excess columns in Kafka to null or the string that is specified by the nullValueFormat parameter. Example: Two columns are obtained from the reader, and the values in the columns are a and b. If the column parameter is set to [{"name":"col1"},{"name":"col2"},{"name":"col3"}], Kafka Writer concatenates the columns into the JSON string {"col1":"a","col2":"b","col3":null} and writes this JSON string to Kafka.

    If you configure the valueIndex parameter or set the writeMode parameter to text, the column parameter is invalid.

    Required: Yes, if the valueIndex parameter is not configured and the writeMode parameter is set to JSON.

  • partition

    Description: The ID of the partition to which you want to write data. The value of this parameter must be an integer that is greater than or equal to 0.

    Required: No.

  • keyIndex

    Description: The sequence number of the column that is obtained from the reader and used as the key in the Kafka topic. The value of this parameter must be an integer that is greater than or equal to 0. If you set this parameter to an integer less than 0, an error occurs.

    Required: No.

  • keyIndexes

    Description: The sequence numbers of the columns that are obtained from the reader and used as the key in the Kafka topic. The sequence numbers must start from 0 and be separated by commas (,), such as [0,1,2]. If you leave this parameter empty, the key in the Kafka topic is null, and data is written to each partition in the Kafka topic in turn. You can configure either the keyIndex or keyIndexes parameter.

    Required: No.

  • fieldDelimiter

    Description: The column delimiter. If you set the writeMode parameter to text and leave the valueIndex parameter empty, Kafka Writer uses the column delimiter you specify for the fieldDelimiter parameter to concatenate all columns that are obtained from the reader to form the value. You can use a single character or multiple characters as the column delimiter. The characters can be Unicode characters (such as \u0001) or escape characters (such as \t or \n). Default value: \t.

    If the writeMode parameter is not set to text or the valueIndex parameter is configured, the fieldDelimiter parameter is invalid.

    Required: No.

  • keyType

    Description: The data type of the key in the Kafka topic. Valid values: BYTEARRAY, DOUBLE, FLOAT, INTEGER, LONG, and SHORT.

    Required: Yes.

  • valueType

    Description: The data type of the value in the Kafka topic. Valid values: BYTEARRAY, DOUBLE, FLOAT, INTEGER, LONG, and SHORT.

    Required: Yes.

  • nullKeyFormat

    Description: If the column specified in the keyIndex or keyIndexes parameter contains the value null, Kafka Writer replaces null with the value of the nullKeyFormat parameter. If you leave the nullKeyFormat parameter empty, Kafka Writer retains the value null.

    Required: No.

  • nullValueFormat

    Description: If a column obtained from the reader contains the value null, Kafka Writer replaces null with the value of the nullValueFormat parameter. If you leave the nullValueFormat parameter empty, Kafka Writer retains the value null.

    Required: No.

  • acks

    Description: The acknowledgment configuration used when the Kafka producer is initialized. This parameter specifies the method used to confirm that data is written to Kafka. Default value: all. Valid values:

    • 0: A Kafka producer does not acknowledge whether data is written to the destination.

    • 1: A Kafka producer acknowledges that the write operation is successful if data is written to the primary replica.

    • all: A Kafka producer acknowledges that the write operation is successful if data is written to all replicas.

    Required: No.
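To make the interplay of these parameters concrete, the following fragment is a minimal sketch of a Kafka Writer parameter block that writes JSON values and uses the first two reader columns as the record key. The names and values are illustrative only, and other required parameters such as datasource, keyType, and valueType are omitted for brevity:

"parameter": {
    "server": "ip:9092",
    "topic": "t08",
    "writeMode": "JSON",                      // Concatenate reader columns into a JSON string.
    "column": [
        {"name": "col1"},
        {"name": "col2"},
        {"name": "col3"}
    ],
    "keyIndexes": [0, 1],                     // The first two reader columns form the Kafka record key.
    "nullValueFormat": "null",                // Replace null values with the string "null".
    "acks": "all"
}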

Appendix: Configure the formats of messages written to Kafka

After you configure a real-time synchronization task, you can run the task to read all the existing data in the source and write the data to the destination Kafka topics in the JSON format. You can also run the task to write incremental data to Kafka in real time. In addition, incremental DDL-based data changes in the source are also written to Kafka in the JSON format in real time. For more information about the formats of messages written to Kafka, see Appendix: Message formats.

Note

If you run a batch synchronization task to synchronize data to Kafka, the payload.sequenceId, payload.timestamp.eventTIme, and payload.timestamp.checkpointTime fields are set to -1 in the messages written to Kafka. The messages are in the JSON format.