Kafka Reader uses a Kafka SDK to read data from Kafka in real time.
Background information
Kafka Reader can read data from ApsaraMQ for Kafka data sources and self-managed Kafka data sources. However, the version of a self-managed Kafka data source must be in the range of 0.10.2 to 2.2.x.
Self-managed Kafka data sources whose versions are earlier than 0.10.2 do not support querying the offsets of partition data and do not support timestamps. If you use such a Kafka data source in a data synchronization node, the latency data that is displayed in Operation Center for the node may be incorrect, and the offset from which incremental data starts to be synchronized cannot be reset.
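The timestamp-based offset query that this depends on corresponds to the offsetsForTimes API in the Apache Kafka Java client. The following minimal sketch (the broker address and topic name are placeholders, not values from this documentation) shows the kind of lookup that clusters earlier than 0.10.2 cannot serve:
// A minimal sketch, assuming the Apache Kafka Java client (2.x).
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.TopicPartition;

public class OffsetByTimeLookup {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder
        props.put("key.deserializer",
            "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
            "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // Ask the broker for the earliest offset at or after a timestamp
            // (one hour ago). Clusters that predate timestamp support cannot
            // answer this query, which is why latency display and offset
            // resetting are unavailable for them.
            Map<TopicPartition, Long> query = new HashMap<>();
            query.put(new TopicPartition("my_topic", 0),
                System.currentTimeMillis() - 3_600_000L);
            Map<TopicPartition, OffsetAndTimestamp> result =
                consumer.offsetsForTimes(query, Duration.ofSeconds(10));
            result.forEach((tp, ot) -> System.out.println(
                tp + " -> " + (ot == null ? "no offset found" : ot.offset())));
        }
    }
}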
For more information about how to add a Kafka data source, see Add a Kafka data source.
Procedure
Go to the DataStudio page.
Log on to the DataWorks console. In the top navigation bar, select the desired region. In the left-side navigation pane, choose DataStudio. On the page that appears, select the desired workspace from the drop-down list and click Go to Data Development.
In the Scheduled Workflow pane of the DataStudio page, move the pointer over the Create icon and choose Create Node > Data Integration > Real-time synchronization.
Alternatively, find the desired workflow in the Scheduled Workflow pane, right-click the workflow name, and then choose Create Node > Data Integration > Real-time synchronization.
In the Create Node dialog box, set the Sync Method parameter to End-to-end ETL, enter a name in the Name field, and configure the Path parameter.
Important: The node name cannot exceed 128 characters in length and can contain only letters, digits, underscores (_), and periods (.).
Click Confirm.
On the configuration tab of the real-time synchronization node, drag Kafka in the Input section to the canvas on the right.
Click the Kafka node. In the panel that appears, configure the parameters.
Parameter
Description
Data source
The name of the Kafka data source that you added to DataWorks. You can select only a Kafka data source. If no data source is available, click New data source on the right to go to the Data Sources page in Management Center to add a Kafka data source. For more information, see Add a Kafka data source.
Topic
The name of the Kafka topic from which you want to read data. Topics are categories in which Kafka maintains the feeds of messages.
Each message that is published to a Kafka cluster is assigned a topic. Each topic contains a group of messages.
Note: Kafka Reader in each data synchronization node can read data from only one topic.
Key Type
The data type of the keys in the Kafka topic. The value of this parameter determines the setting of key.deserializer that is used to initialize a Kafka consumer. Valid values: String, ByteArray, Double, Float, Integer, Long, and Short.
Value Type
The data type of the values in the Kafka topic. The value of this parameter determines the setting of value.deserializer that is used to initialize a Kafka consumer. Valid values: String, ByteArray, Double, Float, Integer, Long, and Short.
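For reference, the following minimal sketch shows how these type settings map to deserializer classes when a Kafka consumer is initialized. This illustrates the mapping only and is not the actual DataWorks implementation; the broker address is a placeholder:
// Illustrative only: how Key Type and Value Type translate into the
// key.deserializer and value.deserializer consumer settings.
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class ConsumerInitSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder
        // Key Type = String -> StringDeserializer
        props.put("key.deserializer",
            "org.apache.kafka.common.serialization.StringDeserializer");
        // Value Type = Long -> LongDeserializer; ByteArray, Double, Float,
        // Integer, and Short map to their deserializers in the same way.
        props.put("value.deserializer",
            "org.apache.kafka.common.serialization.LongDeserializer");
        try (KafkaConsumer<String, Long> consumer = new KafkaConsumer<>(props)) {
            // The consumer now decodes keys as strings and values as longs.
        }
    }
}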
Output Mode
The mode in which Kafka Reader parses messages in the Kafka topic. Valid values:
Single-row Output: Kafka Reader parses messages as unstructured strings or JSON objects. One message is parsed into one output record.
Multi-row Output: Kafka Reader parses messages as JSON arrays. Each array element is parsed into one output record. Therefore, one message may be parsed into multiple output records.
Note: This parameter is supported only in some regions and will be supported in other regions in the future.
Path of Array
The path of the JSON array in the value of the Kafka message. This parameter is displayed only if you set the Output Mode parameter to Multi-row Output. If you want to reference the fields in a specific JSON object, you can specify a value for this parameter in the
a.a1
format. If you want to reference the fields in a specific JSON array, you can specify a value for this parameter in thea[0].a1
format. If you leave this parameter empty, Kafka Reader parses the value of a message as a JSON array.You must make sure that the JSON array to be parsed is an object array such as
[{"a":"hello"},{"b":"world"}]
, instead of a numeric array or string array such as["a","b"]
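The following sketch illustrates these path semantics. Jackson is used here as a stand-in for the reader's internal parser, which is not public; the path-to-pointer conversion and the object-array check are illustrative assumptions:
// Hypothetical sketch: resolve a Path of Array value such as a.a1 or a[0].a1
// and verify that it points to an object array.
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

public class PathOfArraySketch {
    public static void main(String[] args) throws Exception {
        JsonNode value = new ObjectMapper()
            .readTree("{\"a\":{\"a1\":[{\"x\":\"1\"},{\"x\":\"2\"}]}}");

        // Convert the dotted path to a JSON Pointer: a.a1 -> /a/a1,
        // a[0].a1 -> /a/0/a1. An empty path means the whole value.
        String path = "a.a1";
        String pointer = path.isEmpty()
            ? "" : "/" + path.replace(".", "/").replaceAll("\\[(\\d+)\\]", "/$1");
        JsonNode target = pointer.isEmpty() ? value : value.at(pointer);

        // Multi-row Output requires an object array, not a numeric or string array.
        if (!target.isArray() || (target.size() > 0 && !target.get(0).isObject())) {
            throw new IllegalArgumentException("Path of Array must point to an object array");
        }
        target.forEach(element -> System.out.println("output record: " + element));
    }
}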
Configuration parameters
The extended parameters that you can configure when you create a Kafka consumer. For example, you can configure the bootstrap.servers, auto.commit.interval.ms, and session.timeout.ms parameters. For more information about the parameters that Kafka clusters of different versions support for Kafka consumers, see the documentation of Apache Kafka. You can configure parameters in KafkaConfig to control the data read behavior of a Kafka consumer.
For a real-time synchronization node that synchronizes data from Kafka, the Kafka consumer uses a random string as the value of the group.id parameter. If you want the synchronization offset to be uploaded to a specific group in the Kafka cluster, you can manually specify a value for the group.id parameter. A real-time synchronization node that synchronizes data from Kafka does not manage offsets based on the group information maintained by the Kafka server. Therefore, the setting of the group.id parameter does not affect the synchronization offset after the data synchronization node is started or restarted or after a failover is performed on the node.
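As an illustration (not the DataWorks configuration format itself), the following sketch expresses these extended settings as standard Kafka client properties; the broker addresses and group name are placeholders:
// Illustrative consumer properties matching the extended parameters above.
import java.util.Properties;

public class ReaderConfigSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker1:9092,broker2:9092"); // placeholders
        props.put("auto.commit.interval.ms", "5000"); // offset auto-commit interval
        props.put("session.timeout.ms", "30000");     // consumer session timeout
        // Optional: report the synchronization offset to a specific group.
        // The node's own offset tracking does not depend on this value.
        props.put("group.id", "my_sync_group");
        props.forEach((k, v) -> System.out.println(k + "=" + v));
    }
}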
Output Fields
The output fields, which can be customized.
Click Add more fields. In the fields that appear, enter a field name and select a data type to customize a field.
DataWorks provides two types of methods that Kafka Reader can use to obtain values for fields from messages. You can click the icon to switch between the two types of methods.
Default methods:
value: the values of messages
key: the keys of messages
partition: the IDs of partitions
offset: the offsets of messages
timestamp: the timestamps of messages, in milliseconds
headers: the headers of messages
JSON-based parsing: You can use the .Sub-field or [Element in an array] syntax to obtain content from messages in the JSON format. To ensure that the values of fields are compatible with historical logic, you can use a string that is enclosed in double underscores (__), such as __value__, to obtain specific values for fields from messages. The following code shows the data in a sample Kafka message, and a sketch of these lookups follows the list of examples below:
{ "a": { "a1": "hello" }, "b": "world", "c":[ "xxxxxxx", "yyyyyyy" ], "d":[ { "AA":"this", "BB":"is_data" }, { "AA":"that", "BB":"is_also_data" } ] }
You can use one of the following methods based on the preceding code:
If you want to read the values of messages, use __value__.
If you want to read the keys of messages, use __key__.
If you want to read the partitions that store messages, use __partition__.
If you want to read the offsets of messages, use __offset__.
If you want to read the timestamps of messages, use __timestamp__.
If you want to read the headers of messages, use __headers__.
If you want to read "hello" in the a1 field, use a.a1.
If you want to read "world" in the b field, use b.
If you want to read "yyyyyyy" in the c field, use c[1].
If you want to read "this" in the AA field, use d[0].AA.
To remove a field, move the pointer over the field and click the icon.
Sample scenario: If you set the Output Mode parameter to Multi-row Output, Kafka Reader parses messages as JSON arrays based on the JSON array path that is specified by the Path of Array parameter, extracts each element in the JSON arrays, and then forms output records based on the defined field names and the specified methods to obtain values. The methods to obtain values for the output fields are the same as those used when the Output Mode parameter is set to Single-row Output: you can use the .Sub-field or [Element in an array] syntax to obtain content in the JSON format. The following code shows the data in a sample Kafka message:
{ "c": { "c0": [ { "AA": "this", "BB": "is_data" }, { "AA": "that", "BB": "is_also_data" } ] } }
If you specify c.c0 as the value of the Path of Array parameter and define the output field AA whose method to obtain a value is AA and the output field BB whose method to obtain a value is BB, the following output records are obtained:
Record 1: AA=this, BB=is_data
Record 2: AA=that, BB=is_also_data
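A sketch of this scenario, again with Jackson standing in for the reader's internal parser, shows how the two records are produced:
// Illustrative only: Multi-row Output with Path of Array set to c.c0.
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

public class MultiRowOutputSketch {
    public static void main(String[] args) throws Exception {
        String message = "{\"c\":{\"c0\":[{\"AA\":\"this\",\"BB\":\"is_data\"},"
            + "{\"AA\":\"that\",\"BB\":\"is_also_data\"}]}}";
        // Resolve Path of Array: c.c0 -> /c/c0.
        JsonNode array = new ObjectMapper().readTree(message).at("/c/c0");

        // One output record per array element; AA and BB are resolved
        // relative to each element.
        for (JsonNode element : array) {
            System.out.println("AA=" + element.get("AA").asText()
                + ", BB=" + element.get("BB").asText());
        }
    }
}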
In the top toolbar of the configuration tab of the real-time synchronization node, click the icon to save the node.