DataWorks: Configure Kafka Reader

Last Updated: Nov 27, 2024

Kafka Reader uses a Kafka SDK to read data from Kafka in real time.

Background information

Note
  • Kafka Reader can read data from ApsaraMQ for Kafka data sources and self-managed Kafka data sources. However, the version of a self-managed Kafka data source must be in the range of 0.10.2 to 2.2.x.

  • Self-managed Kafka data sources whose versions are earlier than 0.10.2 do not support querying partition offsets by timestamp. If you use such a Kafka data source in a data synchronization node, the latency data that is displayed in Operation Center for the node may be incorrect, and the offset from which incremental data starts to be synchronized cannot be reset. The sketch below shows the lookup that such a reset relies on.
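
    For context, this kind of reset depends on the timestamp-based offset lookup of the Kafka consumer API (offsetsForTimes), which older clusters cannot serve. The following Java sketch illustrates the lookup only; the broker address, topic name, and time window are placeholders:

        import java.util.Collections;
        import java.util.Map;
        import java.util.Properties;

        import org.apache.kafka.clients.consumer.KafkaConsumer;
        import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
        import org.apache.kafka.common.TopicPartition;

        public class OffsetByTimestamp {
            public static void main(String[] args) {
                Properties props = new Properties();
                props.put("bootstrap.servers", "broker:9092"); // placeholder address
                props.put("key.deserializer",
                    "org.apache.kafka.common.serialization.ByteArrayDeserializer");
                props.put("value.deserializer",
                    "org.apache.kafka.common.serialization.ByteArrayDeserializer");

                try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
                    TopicPartition tp = new TopicPartition("my_topic", 0); // placeholder topic
                    long oneHourAgo = System.currentTimeMillis() - 3_600_000L;
                    // offsetsForTimes asks the broker for the earliest offset whose
                    // message timestamp is at or after the given time; clusters that
                    // predate timestamp support cannot serve this request.
                    Map<TopicPartition, OffsetAndTimestamp> offsets =
                        consumer.offsetsForTimes(Collections.singletonMap(tp, oneHourAgo));
                    OffsetAndTimestamp found = offsets.get(tp);
                    if (found != null) {
                        consumer.assign(Collections.singletonList(tp));
                        consumer.seek(tp, found.offset()); // resume from the looked-up offset
                    }
                }
            }
        }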

For more information about how to add a Kafka data source, see Add a Kafka data source.

Procedure

  1. Go to the DataStudio page.

    Log on to the DataWorks console. In the top navigation bar, select the desired region. In the left-side navigation pane, choose Data Development and Governance > Data Development. On the page that appears, select the desired workspace from the drop-down list and click Go to Data Development.

  2. In the Scheduled Workflow pane of the DataStudio page, move the pointer over the Create icon and choose Create Node > Data Integration > Real-time Synchronization.

    Alternatively, find the desired workflow in the Scheduled Workflow pane, right-click the workflow name, and then choose Create Node > Data Integration > Real-time Synchronization.

  3. In the Create Node dialog box, set the Sync Method parameter to End-to-end ETL and configure the Name and Path parameters.

    Important

    The node name cannot exceed 128 characters in length and can contain only letters, digits, underscores (_), and periods (.).

  4. Click Confirm.

  5. On the configuration tab of the real-time synchronization node, drag Kafka in the Input section to the canvas on the right.

  6. Click the Kafka node. In the panel that appears, configure the parameters.

    The following parameters are available:

    Data source

    The name of the Kafka data source that you added to DataWorks. You can select only a Kafka data source. If no data source is available, click New data source on the right to go to the Data Sources page in Management Center to add a Kafka data source. For more information, see Add a Kafka data source.

    Topic

    The name of the Kafka topic from which you want to read data. Topics are categories in which Kafka maintains the feeds of messages.

    Each message that is published to a Kafka cluster is assigned a topic. Each topic contains a group of messages.

    Note

    Kafka Reader in each data synchronization node can read data from only one topic.

    Key Type

    The data type of the keys in the Kafka topic. The value of this parameter determines the setting of key.deserializer that is used to initialize a Kafka consumer. Valid values: String, ByteArray, Double, Float, Integer, Long, and Short.

    Value Type

    The data type of the values in the Kafka topic. The value of this parameter determines the setting of value.deserializer that is used to initialize a Kafka consumer. Valid values: String, ByteArray, Double, Float, Integer, Long, and Short.
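
    For context, the Key Type and Value Type settings correspond to the key.deserializer and value.deserializer properties of a Kafka consumer. The following Java sketch shows the mapping; the broker address and group ID are placeholders:

        import java.util.Properties;

        import org.apache.kafka.clients.consumer.KafkaConsumer;

        public class DeserializerMapping {
            public static void main(String[] args) {
                Properties props = new Properties();
                props.put("bootstrap.servers", "broker:9092"); // placeholder address
                props.put("group.id", "demo");                 // placeholder group
                // Key Type = String maps to key.deserializer = StringDeserializer.
                props.put("key.deserializer",
                    "org.apache.kafka.common.serialization.StringDeserializer");
                // Value Type = Long maps to value.deserializer = LongDeserializer.
                props.put("value.deserializer",
                    "org.apache.kafka.common.serialization.LongDeserializer");
                try (KafkaConsumer<String, Long> consumer = new KafkaConsumer<>(props)) {
                    // The consumer now decodes message keys as strings and values as longs.
                }
            }
        }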

    Output Mode

    The mode in which Kafka Reader parses messages in the Kafka topic. Valid values:

    • Single-row Output: Kafka Reader parses messages as unstructured strings or JSON objects. One message is parsed into one output record.

    • Multi-row Output: Kafka Reader parses messages as JSON arrays. Each array element is parsed into one output record. Therefore, one message may be parsed into multiple output records.

    Note

    This parameter is supported only in some regions and will be supported in other regions in the future.

    Path of Array

    The path of the JSON array in the value of the Kafka message. This parameter is displayed only if you set the Output Mode parameter to Multi-row Output. If you want to reference the fields in a specific JSON object, you can specify a value for this parameter in the a.a1 format. If you want to reference the fields in a specific JSON array, you can specify a value for this parameter in the a[0].a1 format. If you leave this parameter empty, Kafka Reader parses the value of a message as a JSON array.

    You must make sure that the JSON array to be parsed is an object array such as [{"a":"hello"},{"b":"world"}], instead of a numeric array or string array such as ["a","b"].
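
    If you are unsure whether a payload meets this requirement, the following Jackson-based Java sketch (an illustration, not part of the product) checks that every element of an array is a JSON object:

        import com.fasterxml.jackson.databind.JsonNode;
        import com.fasterxml.jackson.databind.ObjectMapper;

        public class ArrayCheck {
            public static void main(String[] args) throws Exception {
                ObjectMapper mapper = new ObjectMapper();
                JsonNode ok  = mapper.readTree("[{\"a\":\"hello\"},{\"b\":\"world\"}]");
                JsonNode bad = mapper.readTree("[\"a\",\"b\"]");
                System.out.println(isObjectArray(ok));  // true: every element is an object
                System.out.println(isObjectArray(bad)); // false: elements are plain strings
            }

            static boolean isObjectArray(JsonNode node) {
                if (!node.isArray()) return false;
                for (JsonNode element : node) {
                    if (!element.isObject()) return false;
                }
                return true;
            }
        }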

    Configuration parameters

    The extended parameters that you can configure when you create a Kafka consumer, such as bootstrap.servers, auto.commit.interval.ms, and session.timeout.ms. For more information about the parameters supported by Kafka clusters of different versions for Kafka consumers, see Documentation of Apache Kafka. You can configure parameters in KafkaConfig to control the data read behavior of a Kafka consumer.

    For a real-time synchronization node that synchronizes data from Kafka, the Kafka consumer uses a random string as the value of the group.id parameter. If you want the synchronization offset to be uploaded to a specific group in the Kafka cluster, you can manually specify a value for group.id. However, such a node does not manage offsets based on the group information maintained by the Kafka server. Therefore, the setting of group.id does not affect the synchronization offset after the node is started or restarted or after a failover is performed on the node.
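
    The following Java sketch shows what these extended settings correspond to on a plain Kafka consumer. The values are placeholders; in DataWorks you enter the same keys as configuration parameters instead of writing code:

        import java.util.Properties;

        public class ExtendedConsumerConfig {
            public static void main(String[] args) {
                Properties props = new Properties();
                props.put("bootstrap.servers", "broker1:9092,broker2:9092"); // placeholder brokers
                props.put("auto.commit.interval.ms", "5000");  // offset auto-commit interval
                props.put("session.timeout.ms", "30000");      // consumer session timeout
                // Optional: upload the synchronization offset to a specific consumer group.
                // This does not change how the synchronization node tracks offsets itself.
                props.put("group.id", "my_sync_group");
                System.out.println(props);
            }
        }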

    Output Fields

    The output fields, which can be customized.

    • Click Add more fields. In the fields that appear, enter a field name and select a data type to customize a field.

      DataWorks provides two types of methods that Kafka Reader can use to obtain values for fields from messages. You can click the arrow icon to switch between the two types of methods.

      • Default methods:

        • value: the values of messages

        • key: the keys of messages

        • partition: the IDs of partitions

        • offset: the offsets of messages

        • timestamp: the timestamps of messages, in milliseconds

        • headers: the headers of messages

      • JSON-based parsing: You can use the .Sub-field or [Element in an array] syntax to obtain content from JSON-formatted messages. To ensure that the values of fields are compatible with historical logic, you can use a string that starts and ends with two underscores (__), such as __value__, to obtain specific values for fields from messages. The following code shows the data in a sample Kafka message:

        {
            "a": {
                "a1": "hello"
            },
            "b": "world",
            "c": [
                "xxxxxxx",
                "yyyyyyy"
            ],
            "d": [
                {
                    "AA": "this",
                    "BB": "is_data"
                },
                {
                    "AA": "that",
                    "BB": "is_also_data"
                }
            ]
        }

        • You can use one of the following methods based on the preceding code:

          • If you want to read the values of messages, use __value__.

          • If you want to read the keys of messages, use __key__.

          • If you want to read the partitions that store messages, use __partition__.

          • If you want to read the offsets of messages, use __offset__.

          • If you want to read the timestamps of messages, use __timestamp__.

          • If you want to read the headers of messages, use __headers__.

          • If you want to read "hello" in the a1 field, use a.a1.

          • If you want to read "world" in the b field, use b.

          • If you want to read "yyyyyyy" in the c field, use c[1].

          • If you want to read "this" in the AA field, use d[0].AA.

    • To remove a field, move the pointer over the field and click the delete icon.
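
    As a rough illustration (not the product's internal implementation), the following Java sketch uses Jackson's JSON Pointer notation to perform the same extractions on the sample message shown above:

        import com.fasterxml.jackson.databind.JsonNode;
        import com.fasterxml.jackson.databind.ObjectMapper;

        public class FieldExtraction {
            public static void main(String[] args) throws Exception {
                String value = "{\"a\":{\"a1\":\"hello\"},\"b\":\"world\","
                    + "\"c\":[\"xxxxxxx\",\"yyyyyyy\"],"
                    + "\"d\":[{\"AA\":\"this\",\"BB\":\"is_data\"},"
                    + "{\"AA\":\"that\",\"BB\":\"is_also_data\"}]}";
                JsonNode root = new ObjectMapper().readTree(value);
                System.out.println(root.at("/a/a1").asText());  // a.a1    -> "hello"
                System.out.println(root.at("/b").asText());     // b       -> "world"
                System.out.println(root.at("/c/1").asText());   // c[1]    -> "yyyyyyy"
                System.out.println(root.at("/d/0/AA").asText()); // d[0].AA -> "this"
            }
        }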

    Sample scenario: If you set the Output Mode parameter to Multi-row Output, Kafka Reader parses messages as JSON arrays based on the JSON array path that is specified by the Path of Array parameter, extracts each element in the arrays, and then forms output records based on the defined field names and the specified methods to obtain values. The methods to obtain values for the output fields are the same as those used when the Output Mode parameter is set to Single-row Output: you can use the .Sub-field or [Element in an array] syntax to obtain content in the JSON format. The following code shows the data in a sample Kafka message:

    {
        "c": {
            "c0": [
                {
                    "AA": "this",
                    "BB": "is_data"
                },
                {
                    "AA": "that",
                    "BB": "is_also_data"
                }
            ]
        }
    }

    If you set the Path of Array parameter to c.c0, define the output field AA whose method to obtain a value is AA, and define the output field BB whose method to obtain a value is BB, two output records are generated: one with AA set to "this" and BB set to "is_data", and the other with AA set to "that" and BB set to "is_also_data". The sketch below illustrates this parsing.
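
    The following Jackson-based Java sketch (an illustration, not the product's internal implementation) shows how the c.c0 path locates the object array and how each element becomes one output record:

        import com.fasterxml.jackson.databind.JsonNode;
        import com.fasterxml.jackson.databind.ObjectMapper;

        public class MultiRowOutput {
            public static void main(String[] args) throws Exception {
                String value = "{\"c\":{\"c0\":[{\"AA\":\"this\",\"BB\":\"is_data\"},"
                    + "{\"AA\":\"that\",\"BB\":\"is_also_data\"}]}}";
                // Path of Array = c.c0 locates the object array inside the message value.
                JsonNode array = new ObjectMapper().readTree(value).at("/c/c0");
                // Each array element becomes one output record with fields AA and BB.
                for (JsonNode element : array) {
                    System.out.println(element.get("AA").asText()
                        + ", " + element.get("BB").asText());
                }
                // Output:
                // this, is_data
                // that, is_also_data
            }
        }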

  7. In the top toolbar of the configuration tab of the real-time synchronization node, click the save icon to save the node.

    Note

    Kafka Reader in each data synchronization node can read data from only one topic.