DataHub is fully compatible with the Kafka protocol. You can use a native Kafka client to read data from and write data to DataHub.
Background information
Mapping from Kafka to DataHub
Topic type
The topic expansion mode in Kafka is different from that in DataHub. To adapt to the expansion mode in Kafka, you must set the ExpandMode parameter to ONLY_EXTEND when you create a topic in DataHub. A topic whose ExpandMode parameter is set to ONLY_EXTEND does not support shard split or merge operations: you can add shards but cannot remove them.
Topic naming
A topic name in Kafka maps to a project name and a topic name in DataHub, separated by a period (.). For example, a Kafka topic named test_project.test_topic maps to a topic named test_topic in a project named test_project in DataHub. If a Kafka topic name contains multiple periods (.), the part before the first period (.) is the DataHub project name and the remaining part is the DataHub topic name, with the other periods (.) and hyphens (-) replaced by underscores (_).
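For reference, the following minimal Java sketch (illustrative only; the class name is hypothetical and not part of any SDK) shows how a Kafka topic name is split into DataHub names according to these rules:
public class KafkaNameMappingDemo {
    public static void main(String[] args) {
        String kafkaTopic = "test_project.test.sub-topic";
        int firstDot = kafkaTopic.indexOf('.');
        // The part before the first period is the DataHub project name.
        String project = kafkaTopic.substring(0, firstDot);
        // The remaining part is the DataHub topic name; the other periods and hyphens become underscores.
        String topic = kafkaTopic.substring(firstDot + 1).replace('.', '_').replace('-', '_');
        System.out.println(project + " / " + topic); // prints: test_project / test_sub_topic
    }
}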
Partition
Each active shard in DataHub corresponds to a partition in Kafka. For example, if a DataHub topic has five active shards, the corresponding Kafka topic has five partitions, and you can write to a specific partition by specifying a partition ID from 0 to 4. If you do not specify a partition, the Kafka client determines the partition to which the data is written.
TUPLE topic
When you write a key-value pair from Kafka to a TUPLE topic in DataHub, the schema of the TUPLE topic must contain one or two fields of the STRING type. Otherwise, the write fails. If the schema contains only one field, only the value of the key-value pair is written and the key is discarded. If the schema contains two fields, the value is written to one field and the key is written to the other. If you write binary data to a TUPLE topic, the data is displayed as garbled text in the topic. We recommend that you write binary data to a BLOB topic.
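The following sketch creates such a TUPLE topic with a two-field STRING schema by using the DataHub SDK. The field names are examples, datahubClient is a DatahubClient built as in the "Create a topic by using code" section below, and the exact createTopic overload that accepts both a schema and ExpandMode may vary by SDK version:
RecordSchema schema = new RecordSchema();
// One STRING field receives the Kafka value; the other receives the Kafka key.
schema.addField(new Field("kafka_value", FieldType.STRING));
schema.addField(new Field("kafka_key", FieldType.STRING));
// ONLY_EXTEND is required for Kafka compatibility; verify this overload against your SDK version.
datahubClient.createTopic("test_project", "test_tuple_topic", 1, 7, RecordType.TUPLE, schema, "comment", ExpandMode.ONLY_EXTEND);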
BLOB topic
When you write a key-value pair from Kafka to a BLOB topic in DataHub, the value is written to the BLOB topic. If the key is not NULL, the key is written to DataHub as an attribute. The key of the attribute is __kafka_key__ and the value is the key of the Kafka data.
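If you later read such a BLOB topic with the DataHub SDK, the Kafka key can be recovered from that attribute. A minimal sketch, assuming a DatahubClient built as in the examples below and example values for the shard ID and record limit:
String shardId = "0";
String cursor = datahubClient.getCursor("test_project", "test_topic", shardId, CursorType.OLDEST).getCursor();
GetRecordsResult result = datahubClient.getRecords("test_project", "test_topic", shardId, cursor, 10);
for (RecordEntry entry : result.getRecords()) {
    BlobRecordData data = (BlobRecordData) entry.getRecordData();
    // The Kafka key, if it was not NULL, is stored in the __kafka_key__ attribute.
    String kafkaKey = entry.getAttributes().get("__kafka_key__");
    System.out.println(kafkaKey + " -> " + new String(data.getData()));
}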
Header
A header in Kafka corresponds to an attribute in DataHub. However, a header whose value is NULL is ignored in Kafka. We recommend that you do not use __kafka_key__ as the key of a header.
Consumer group
In DataHub, a consumer group maps to a subscription ID and can subscribe to only one topic at a time, whereas a Kafka group can subscribe to multiple topics at a time. To better support the Kafka subscription model, DataHub allows you to create a group in a project, bind the group to the topics that you want to subscribe to, and then use the group to subscribe to multiple topics in the project. In DataHub, a group is a set of subscriptions managed by the server. After a group is bound to a topic, you can view the subscription that is automatically created for the group on the Subscription List tab of the topic details page. If you delete that subscription, the group can no longer subscribe to the topic and the saved consumption offsets are lost.
A group can subscribe to a maximum of 50 topics. If you need to subscribe to more topics for a group, submit a ticket.
Kafka configuration parameters
In the C/P/S column, C = Consumer, P = Producer, S = Streams, and * = all three.
| Parameter | C/P/S | Valid value | Required | Description |
| --- | --- | --- | --- | --- |
| bootstrap.servers | * | For more information, see the "Kafka endpoints" section of this topic. | Yes | The Kafka endpoint of DataHub. |
| security.protocol | * | SASL_SSL | Yes | To ensure secure data transmission, Secure Sockets Layer (SSL) is used for encryption when data is written from a Kafka client to DataHub. |
| sasl.mechanism | * | PLAIN | Yes | The AccessKey authentication mode. Set this parameter to PLAIN. |
| compression.type | P | LZ4 | No | Specifies whether to enable compressed transmission. Only the LZ4 compression algorithm is supported. |
| group.id | C | project.topic:subId or project.group | Yes | The ID of the consumer group. If you use the project.topic:subId format, the value must match the topic that you subscribe to; otherwise, data cannot be read. We recommend that you use the project.group format. |
| partition.assignment.strategy | C | org.apache.kafka.clients.consumer.RangeAssignor | No | The partition assignment policy. The default policy in Kafka is RangeAssignor, which is also the only policy supported by DataHub. Do not modify this parameter. |
| session.timeout.ms | C/S | [60000, 180000] | No | The session timeout period. The default session timeout period in Kafka is 10,000 milliseconds, but the minimum session timeout period in DataHub is 60,000 milliseconds. Therefore, the default value of this parameter is 60000. |
| heartbeat.interval.ms | C/S | We recommend that you set this parameter to two-thirds of the session timeout period. | No | The heartbeat interval. The default heartbeat interval in Kafka is 3,000 milliseconds. Because the default value of the session.timeout.ms parameter in DataHub is 60000, we recommend that you set this parameter to 40000. |
| application.id | S | project.topic:subId or project.group | Yes | The application ID. If you use the project.topic:subId format, the value must match the topic that you subscribe to; otherwise, data reads fail. We recommend that you use the project.group format. |
The preceding table describes the parameters that require special attention when you write data from a Kafka client to DataHub. Client-only parameters such as retries and batch.size take effect on the client as usual. Server-related parameters do not change the behavior of the DataHub server. For example, regardless of the value of the acks parameter, DataHub returns a response only after the data is written.
Kafka endpoints
| Region | Region ID | Public endpoint | ECS endpoint on the classic network | ECS endpoint in the VPC |
| --- | --- | --- | --- | --- |
| China (Hangzhou) | cn-hangzhou | dh-cn-hangzhou.aliyuncs.com:9092 | dh-cn-hangzhou.aliyun-inc.com:9093 | dh-cn-hangzhou-int-vpc.aliyuncs.com:9094 |
| China (Shanghai) | cn-shanghai | dh-cn-shanghai.aliyuncs.com:9092 | dh-cn-shanghai.aliyun-inc.com:9093 | dh-cn-shanghai-int-vpc.aliyuncs.com:9094 |
| China (Beijing) | cn-beijing | dh-cn-beijing.aliyuncs.com:9092 | dh-cn-beijing.aliyun-inc.com:9093 | dh-cn-beijing-int-vpc.aliyuncs.com:9094 |
| China (Shenzhen) | cn-shenzhen | dh-cn-shenzhen.aliyuncs.com:9092 | dh-cn-shenzhen.aliyun-inc.com:9093 | dh-cn-shenzhen-int-vpc.aliyuncs.com:9094 |
| China (Zhangjiakou) | cn-zhangjiakou | dh-cn-zhangjiakou.aliyuncs.com:9092 | dh-cn-zhangjiakou.aliyun-inc.com:9093 | dh-cn-zhangjiakou-int-vpc.aliyuncs.com:9094 |
| Singapore | ap-southeast-1 | dh-ap-southeast-1.aliyuncs.com:9092 | dh-ap-southeast-1.aliyun-inc.com:9093 | dh-ap-southeast-1-int-vpc.aliyuncs.com:9094 |
| Malaysia (Kuala Lumpur) | ap-southeast-3 | dh-ap-southeast-3.aliyuncs.com:9092 | dh-ap-southeast-3.aliyun-inc.com:9093 | dh-ap-southeast-3-int-vpc.aliyuncs.com:9094 |
| India (Mumbai) (closing down) | ap-south-1 | dh-ap-south-1.aliyuncs.com:9092 | dh-ap-south-1.aliyun-inc.com:9093 | dh-ap-south-1-int-vpc.aliyuncs.com:9094 |
| Germany (Frankfurt) | eu-central-1 | dh-eu-central-1.aliyuncs.com:9092 | dh-eu-central-1.aliyun-inc.com:9093 | dh-eu-central-1-int-vpc.aliyuncs.com:9094 |
| China East 2 Finance | cn-shanghai-finance-1 | dh-cn-shanghai-finance-1.aliyuncs.com:9092 | dh-cn-shanghai-finance-1.aliyun-inc.com:9093 | dh-cn-shanghai-finance-1-int-vpc.aliyuncs.com:9094 |
| China (Hong Kong) | cn-hongkong | dh-cn-hongkong.aliyuncs.com:9092 | dh-cn-hongkong.aliyun-inc.com:9093 | dh-cn-hongkong-int-vpc.aliyuncs.com:9094 |
Examples
Create a topic by using code
Note: You cannot create a topic by calling a Kafka API operation. To create a topic, use the DataHub SDK and set the ExpandMode parameter to ONLY_EXTEND. The version of the aliyun-sdk-datahub Maven dependency must be 2.19.0 or later.
You must configure the endpoint, the AccessKey ID, and the AccessKey secret in your project. We recommend that you store them in a configuration file or in environment variables instead of in your code. For example, add the following settings to the configuration file:
datahub.endpoint=<yourEndpoint>
datahub.accessId=<yourAccessKeyId>
datahub.accessKey=<yourAccessKeySecret>
The AccessKey pair of an Alibaba Cloud account has permissions on all API operations, so using it directly is a high-risk practice. We recommend that you use a RAM user to call API operations or perform routine O&M.
Do not hard-code the AccessKey ID and AccessKey secret into your project code. Otherwise, the AccessKey pair may be leaked, which compromises the security of all resources within your account.
<dependency>
<groupId>com.aliyun.datahub</groupId>
<artifactId>aliyun-sdk-datahub</artifactId>
<version>2.19.0-public</version>
</dependency>
@Value("${datahub.endpoint}")
String endpoint ;
@Value("${datahub.accessId}")
String accessId;
@Value("${datahub.accessKey}")
String accessKey;
public class CreateTopic {
public static void main(String[] args) {
DatahubClient datahubClient = DatahubClientBuilder.newBuilder()
.setDatahubConfig(
new DatahubConfig(endpoint,
new AliyunAccount(accessId, accessKey)))
.build();
int shardCount = 1;
int lifeCycle = 7;
try {
datahubClient.createTopic("test_project", "test_topic", shardCount, lifeCycle, RecordType.BLOB, "comment", ExpandMode.ONLY_EXTEND);
} catch (DatahubClientException e) {
e.printStackTrace();
}
}
}
Create a group by using code
The version of the aliyun-sdk-datahub Maven dependency must be 2.21.6 or later.
<dependency>
<groupId>com.aliyun.datahub</groupId>
<artifactId>aliyun-sdk-datahub</artifactId>
<version>2.21.6-public</version>
</dependency>
@Value("${datahub.endpoint}")
String endpoint ;
@Value("${datahub.accessId}")
String accessId;
@Value("${datahub.accessKey}")
String accessKey;
public class CreateGroup {
public static void main(String[] args) {
DatahubClient datahubClient = DatahubClientBuilder.newBuilder()
.setDatahubConfig(
new DatahubConfig(endpoint,
new AliyunAccount(accessId, accessKey)))
.build();
List<String> topicList = new ArrayList<>();
topicList.add("test_project.topic1");
topicList.add("test_project.topic2");
topicList.add("test_project.topic3");
try {
// Create a Kafka group.
datahubClient.createKafkaGroup("test_project", "test_topic", "test comment");
// Bind the desired topic for subscription to the group.
datahubClient.updateTopicsForKafkaGroup("test_project", "test_topic", topicList, UpdateKafkaGroupMode.ADD);
} catch (DatahubClientException e) {
e.printStackTrace();
}
}
}
Sample producer
Generate the kafka_client_producer_jaas.conf file
Create the kafka_client_producer_jaas.conf file and save it to a directory, for example src/main/resources. The file contains the following content. Replace accessId and accessKey with your AccessKey ID and AccessKey secret.
KafkaClient {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="accessId"
password="accessKey";
};
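Alternatively, if your Kafka client is V0.10.2.0 or later, you can skip the separate JAAS file and pass the same login module directly through the sasl.jaas.config client property when you build the Properties object in the samples below (placeholders shown for the AccessKey pair):
properties.put("sasl.jaas.config",
        "org.apache.kafka.common.security.plain.PlainLoginModule required "
        + "username=\"<yourAccessKeyId>\" password=\"<yourAccessKeySecret>\";");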
Maven dependency
The version of the Kafka client must be V0.10.0.0 or later. The recommended version is V2.4.0.
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.4.0</version>
</dependency>
Sample code
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.internals.RecordHeader;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class ProducerExample {
    static {
        // Point the JVM at the JAAS file that contains the AccessKey pair.
        System.setProperty("java.security.auth.login.config", "src/main/resources/kafka_client_producer_jaas.conf");
    }

    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "dh-cn-hangzhou.aliyuncs.com:9092");
        properties.put("security.protocol", "SASL_SSL");
        properties.put("sasl.mechanism", "PLAIN");
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("compression.type", "lz4");
        String kafkaTopicName = "test_project.test_topic";
        Producer<String, String> producer = new KafkaProducer<String, String>(properties);
        try {
            // Kafka headers are written to DataHub as attributes.
            List<Header> headers = new ArrayList<>();
            headers.add(new RecordHeader("key1", "value1".getBytes()));
            headers.add(new RecordHeader("key2", "value2".getBytes()));
            // Write to partition 0. Pass null as the partition to let the client choose one.
            ProducerRecord<String, String> record = new ProducerRecord<>(kafkaTopicName, 0, "key", "Hello DataHub!", headers);
            // Send synchronously.
            producer.send(record).get();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        } finally {
            producer.close();
        }
    }
}
Execution results
After the execution is complete, sample the data in the topic to check whether it was written to DataHub.
Sample consumer
For information about how to generate the kafka_client_producer_jaas.conf file and about the Maven dependency, see the "Sample producer" section of this topic.
After you add a consumer, wait about 10 seconds for shard allocation to be complete. Then, the consumer can consume data.
Sample code
(Recommended) Example of using a Kafka group
package com.aliyun.datahub.kafka.demo;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
public class ConsumerExample2 {
static {
System.setProperty("java.security.auth.login.config", "src/main/resources/kafka_client_producer_jaas.conf");
}
public static void main(String[] args) {
Properties properties = new Properties();
properties.put("bootstrap.servers", "dh-cn-hangzhou.aliyuncs.com:9092");
properties.put("security.protocol", "SASL_SSL");
properties.put("sasl.mechanism", "PLAIN");
// Set the group.id parameter to project.group.
properties.put("group.id", "test_project.test_kafka_group");
properties.put("auto.offset.reset", "earliest");
properties.put("session.timeout.ms", "60000");
properties.put("heartbeat.interval.ms", "40000");
properties.put("ssl.endpoint.identification.algorithm", "");
properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(properties);
List<String> topicList = new ArrayList<>();
topicList.add("test_project.test_topic1");
topicList.add("test_project.test_topic2");
topicList.add("test_project.test_topic3");
// Multiple topics can be subscribed to at a time if you use a Kafka group.
kafkaConsumer.subscribe(topicList);
while (true) {
ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofSeconds(5));
for (ConsumerRecord<String, String> record : records) {
System.out.println(record.toString());
}
}
}
}
Example of using project.topic:subId
package com.aliyun.datahub.kafka.demo;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
public class ConsumerExample {
static {
System.setProperty("java.security.auth.login.config", "src/main/resources/kafka_client_producer_jaas.conf");
}
public static void main(String[] args) {
Properties properties = new Properties();
properties.put("bootstrap.servers", "dh-cn-hangzhou.aliyuncs.com:9092");
properties.put("security.protocol", "SASL_SSL");
properties.put("sasl.mechanism", "PLAIN");
// Set the group.id parameter to project.topic:subId.
properties.put("group.id", "test_project.test_topic:1611039998153N71KM");
properties.put("auto.offset.reset", "earliest");
properties.put("session.timeout.ms", "60000");
properties.put("heartbeat.interval.ms", "40000");
properties.put("ssl.endpoint.identification.algorithm", "");
properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(properties);
// Only a single topic can be subscribed to if you use project.topic:subId.
kafkaConsumer.subscribe(Collections.singletonList("test_project.test_topic"));
while (true) {
ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofSeconds(5));
for (ConsumerRecord<String, String> record : records) {
System.out.println(record.toString());
}
}
}
}
Execution results
After the execution is complete, you can view the read data on the consumer client.
ConsumerRecord(topic = test_project.test_topic, partition = 0, leaderEpoch = 0, offset = 0, LogAppendTime = 1611040892661, serialized key size = 3, serialized value size = 14, headers = RecordHeaders(headers = [RecordHeader(key = key1, value = [118, 97, 108, 117, 101, 49]), RecordHeader(key = key2, value = [118, 97, 108, 117, 101, 50])], isReadOnly = false), key = key, value = Hello DataHub!)
Note: All records returned by one read request share the same LogAppendTime value, which is the latest timestamp among those records.
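If you need the timestamp on the consumer side, it is available on each record. A minimal sketch that fits inside the poll loop of the consumer examples above:
for (ConsumerRecord<String, String> record : records) {
    // For data read from DataHub, the timestamp type is LOG_APPEND_TIME and all records
    // returned by the same fetch share the same timestamp value.
    System.out.println(record.timestampType() + ": " + record.timestamp());
}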
Sample Streams task
Maven dependency
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.4.0</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>2.4.0</version>
</dependency>
Sample code
The following sample code reads data from the input topic in test_project, converts the keys and values to lowercase, and writes the results to the output topic.
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.Produced;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;

public class StreamExample {
    static {
        System.setProperty("java.security.auth.login.config", "src/main/resources/kafka_client_producer_jaas.conf");
    }

    public static void main(final String[] args) {
        final String input = "test_project.input";
        final String output = "test_project.output";
        final Properties properties = new Properties();
        properties.put("bootstrap.servers", "dh-cn-hangzhou.aliyuncs.com:9092");
        // Set the application.id parameter to project.topic:subId or project.group.
        properties.put("application.id", "test_project.input:1611293595417QH0WL");
        properties.put("security.protocol", "SASL_SSL");
        properties.put("sasl.mechanism", "PLAIN");
        properties.put("session.timeout.ms", "60000");
        properties.put("heartbeat.interval.ms", "40000");
        properties.put("auto.offset.reset", "earliest");
        final StreamsBuilder builder = new StreamsBuilder();
        TestMapper testMapper = new TestMapper();
        builder.stream(input, Consumed.with(Serdes.String(), Serdes.String()))
                .map(testMapper)
                .to(output, Produced.with(Serdes.String(), Serdes.String()));
        final KafkaStreams streams = new KafkaStreams(builder.build(), properties);
        final CountDownLatch latch = new CountDownLatch(1);
        // Close the Streams task gracefully when the JVM exits.
        Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
            @Override
            public void run() {
                streams.close();
                latch.countDown();
            }
        });
        try {
            streams.start();
            latch.await();
        } catch (final Throwable e) {
            System.exit(1);
        }
        System.exit(0);
    }

    static class TestMapper implements KeyValueMapper<String, String, KeyValue<String, String>> {
        @Override
        public KeyValue<String, String> apply(String key, String value) {
            // Convert both the key and the value to lowercase (null-safe).
            return new KeyValue<>(toLower(key), toLower(value));
        }

        private static String toLower(String s) {
            return s == null ? null : s.toLowerCase();
        }
    }
}
Execution results
After you start a Streams task, wait about 1 minute for the shard allocation to be complete. Then, you can view the number of tasks in the DataHub console. The number of tasks is consistent with the number of shards in the input topic. In this example, the input topic contains three shards.
currently assigned active tasks: [0_0, 0_1, 0_2]
currently assigned standby tasks: []
revoked active tasks: []
revoked standby tasks: []
After the shard allocation is complete, write the following test data to the input topic: (AAAA,BBBB), (CCCC,DDDD), and (EEEE,FFFF). Then, sample the data in the output topic to check whether the data was processed as expected.
Usage notes
Transactions and idempotence are not supported.
A Kafka client cannot automatically create a DataHub topic. Before you write data from a Kafka client to DataHub, make sure that a DataHub topic is created.
Each consumer can subscribe to only one topic if you use the project.topic:subId format.
The timestamp of the data read by a consumer is the LogAppendTime value, which indicates the time when the data was written to DataHub. All records returned by one read request share the same timestamp, which is the latest timestamp among those records. Therefore, the timestamp that you obtain may be later than the time when an individual record was actually written to DataHub.
Each Streams task supports only one input topic and multiple output topics.
Streams tasks are stateless.
Supported Kafka versions are from V0.10.0 to V2.4.0.
FAQ
1. Why is a connection closed during a data write?
Selector - [Producer clientId=producer-1] Connection with dh-cn-shenzhen.aliyuncs.com disconnected
java.io.EOFException
at org.apache.kafka.common.network.SslTransportLayer.read(SslTransportLayer.java:573)
...
In Kafka, a meta request and a data write request do not use the same connection. When the first meta request is sent, a connection is established. When a data write request is sent, a new connection is established to the broker that was returned for the meta request. All subsequent requests are sent over the second connection, and the first connection becomes idle. If a connection remains idle for longer than a specific time limit, the server automatically closes it. Therefore, you can ignore this error if it does not affect data writes.
2. What should I do if my Kafka client fails to be started?
Caused by: org.apache.kafka.common.errors.SslAuthenticationException: SSL handshake failed
Caused by: javax.net.ssl.SSLHandshakeException: No subject alternative names matching IP address 100.67.134.161 found
If your Kafka client fails to be started with this error, disable hostname verification by adding the following configuration: properties.put("ssl.endpoint.identification.algorithm", "");
3. Why does a DisconnectException error appear during data consumption on a consumer client?
[INFO][Consumer clientId=client-id, groupId=consumer-project.topic:subid] Error sending fetch request (sessionId=INVALID, epoch=INITIAL) to node 1: {}.
org.apache.kafka.common.errors.DisconnectException
The Kafka client must maintain a TCP-based persistent connection to the server. In most cases, the DisconnectException error is caused by network jitters. This error does not affect data consumption on the client because retry logic is configured on the client.