All Products
Search
Document Center

DataWorks:Synchronize incremental data from Kafka to MaxCompute

Last Updated:Jul 08, 2024

This topic describes how to configure a batch synchronization task that can be scheduled by minute, hour, or day to synchronize incremental data from Kafka to the hourly or daily partitions in a MaxCompute table.

Precautions

  • The version of the Kafka data source that you use must range from 0.10.2 to 2.2.x, and timestamps must be used to record the time when messages are transferred to Kafka.

  • If messages whose timestamps are earlier than or equal to the start offset of your batch synchronization task are transferred to the specified Kafka topic after the task starts to synchronize incremental data from Kafka, the messages may be missed. In this case, you must pay attention to data missing risks caused by latency or timestamp disorder of messages transferred to the Kafka topic.

  • You can set the Exit Strategy parameter to Exit when poll nothing in 1 minute for your batch synchronization task in the codeless user interface (UI) only if the following conditions are met. If you configure this setting but the conditions are not met, some incremental data in Kafka may be missed when the task is run.

    • No data is written to all or some partitions in the specified Kafka topic for 10 or more minutes.

    • After each auto triggered instance generated for the batch synchronization task is started, no messages whose timestamps are earlier than the value of the End Time parameter are written to the specified Kafka topic.

Make preparations

  • Create a workspace in which you want to run a batch synchronization task and an exclusive resource group for Data Integration that can be used to synchronize data. For information about how to create an exclusive resource group for Data Integration, see Create and use an exclusive resource group for Data Integration. In this example, a workspace in standard mode and an exclusive resource group for Data Integration are created, and the resource group is associated with the workspace.

  • Add a Kafka data source to the workspace and test the network connectivity between the Kafka data source and the exclusive resource group for Data Integration. For more information, see Add a Kafka data source.

    Important

    When you configure a batch synchronization task to synchronize incremental data from Kafka in a workspace in standard mode, you must make sure that the Kafka data source used in the production environment of the workspace contains a topic whose name is the same as a topic in the Kafka data source used in the development environment of the workspace. This way, when the batch synchronization task is run, the task can synchronize data from the topic in the Kafka data source used in the production environment.

  • Add a MaxCompute data source. For more information, see Add a MaxCompute data source.

Create a batch synchronization task

You can find the required workflow in the Scheduled Workflow pane of the DataStudio page in the DataWorks console and create a batch synchronization task in the workflow. When you create a batch synchronization task, you must configure the parameters such as Path and Name. For more information, see Configure a batch synchronization task by using the codeless UI.

Configure the data source and resource group

After you create a batch synchronization task, you must configure the source, resource group, and destination and test the network connectivity.

Configure the source

Configure parameters related to the source. In this example, a batch synchronization task is created to synchronize incremental data from Kafka to MaxCompute. The following figure shows the parameters that can be configured for the source.数据来源kafka

Note

For more information about the parameters that can be configured for a batch synchronization task used to synchronize data from Kafka, see Kafka Reader. The following table describes the parameters that need to be configured in this example.

Parameter

Description

Data source and Topic

The name of the Kafka topic from which you want to read data. If you use a workspace in standard mode, you must make sure that the Kafka data source used in the production environment of the workspace contains a topic whose name is the same as a topic in the Kafka data source used in the development environment of the workspace. You can select the topic from the Topic drop-down list.

Note

Before you configure the Topic parameter, take note of the following items:

  • If the Kafka data source used in the development environment does not contain the topic, the topic is not displayed in the drop-down list when you configure the Topic parameter.

  • If the Kafka data source used in the production environment does not contain the topic, the batch synchronization task fails after the task is committed and deployed. This is because the task cannot find the topic in the production environment.

Consumer Group ID

The consumer group ID. The ID can be used to collect statistics on and monitor data consumption in the Kafka data source. You can configure this parameter based on your business requirements. If you configure this parameter, make sure that the ID is unique in the Kafka data source.

Kafka version

The version range of the Kafka data source.

Note

The version of the Kafka data source must range from 0.10.2 to 2.2.x.

Read From, Start Time, Read To, and End Time

You can set Read From and Read To to Specific Offset, Start Time to ${startTime}, and End Time to ${endTime}. ${startTime} and ${endTime} are scheduling parameters.

These parameters specify the start offset and end offset of data synchronization. In this example, the batch synchronization task synchronizes data within the time range specified by the ${startTime} and ${endTime} scheduling parameters. When the batch synchronization task is run, the system replaces ${startTime} and ${endTime} with actual values based on the scheduling configurations of the task.

Time Zone

You can leave this parameter empty or retain the default value. The default value of this parameter is the time zone of the server in the region where the DataWorks workspace resides.

Note

If you have asked the technical personnel to change the time zone in which you want to schedule tasks, you can select the new time zone.

Key Type, Value Type, and Encoding

You can configure these parameters based on your business requirements.

Exit Strategy

You can set this parameter to Exit when poll nothing in 1 minute only if the following conditions are met. If the following conditions are not met, set this parameter to Exit when reach configured end offset or time.

  • No data is written to all or some partitions in the specified Kafka topic for 10 or more minutes.

  • After each auto triggered instance generated for the batch synchronization task is started, no messages whose timestamps are earlier than the value of the End Time parameter are written to the specified Kafka topic.

Advanced configuration

You can retain the default value.

Configure the destination

In the Configure tasks step, you can configure parameters related to the destination. In this example, a batch synchronization task is created to synchronize incremental data from Kafka to MaxCompute. The following figure shows the parameters that can be configured for the destination.去向MaxCompute

Parameter

Description

Data source

The name of the MaxCompute data source that you added.

Table

The name of the table to which you want to write data. If you use a workspace in standard mode, you must make sure that the MaxCompute data source used in the production environment of the workspace contains a table whose name and schema are the same as those of a table in the MaxCompute data source used in the development environment of the workspace.

Note

Before you configure the Topic parameter, take note of the following items:

  • If the MaxCompute data source used in the development environment does not contain the table, the table is not displayed in the drop-down list when you configure the Table parameter.

  • If the MaxCompute data source used in the production environment does not contain the table, the batch synchronization task fails after the task is committed and deployed. This is because the task cannot find the table in the production environment.

  • If the schema of the table in the production environment is different from the schema of the table in the development environment, field mappings that are established when the batch synchronization task is run may be different from the field mappings that are configured in the Mappings step. As a result, data may be incorrectly written to the destination.

If the MaxCompute table to which you want to write data is not created, you can click Generate Destination Table Schema to quickly generate a table creation statement. For more information, see Appendix: Generate the schema of a destination table.

Partition Key Column

In this example, the destination MaxCompute table contains one partition key column ds, and the ${partition} scheduling parameter is entered in the Partition Key Column field. This way, each time the batch synchronization task is run, data is written to the partition specified by the ${partition} scheduling parameter. The system replaces the ${partition} scheduling parameter with the actual value based on the scheduling configurations of the task.

Note

Whether the Partition Key Column parameter needs to be configured and the number of times the system displays the Partition Key Column parameter vary based on the type of the destination MaxCompute table that you select. If you select a non-partitioned destination MaxCompute table, you do not need to manually configure this parameter. If you select a partitioned destination MaxCompute table, the system displays the Partition Key Column parameter based on the number of partition key columns in the table and the names of the partition key columns. The number of times that the system displays the Partition Key Column parameter is the same as the number of partition key columns in the table.

You can retain default values for the parameters that are not described in the table.

Configure field mappings

  1. In the Field Mapping step, you can edit fields in the Kafka data source.

    • The following table describes the default fields in Kafka.

      Field name

      Description

      __key__

      The key of a Kafka message.

      __value__

      The value of a Kafka message.

      __partition__

      The ID of the partition to which a Kafka message belongs. The ID is an integer that starts from 0.

      __headers__

      The headers of a Kafka message.

      __offset__

      The sequence number that is assigned to a Kafka message when the message is transferred to a partition. The sequence number is an integer that starts from 0.

      __timestamp__

      The timestamp of a Kafka message. The timestamp is a 13-digit integer, in milliseconds.

    • You can specify a method for parsing JSON-formatted content in Kafka messages based on your business requirements. You can use the .Sub-field or [Element in an array] syntax to obtain data in the JSON-formatted values of a message.

      Important

      If the name of a field in the JSON-formatted values of a Kafka message contains periods (.), the value of the field cannot be obtained or synchronized because of a field definition syntax error.

      The following code provides an example of data in the JSON-formatted values of a message.

      {
            "a": {
            "a1": "hello"
            },
            "b": "world",
            "c":[
                  "xxxxxxx",
                  "yyyyyyy"
                  ],
            "d":[
                  {
                        "AA":"this",
                        "BB":"is_data"
                  },
                  {
                        "AA":"that",
                        "BB":"is_also_data"
                  }
              ],
           "a.b": "unreachable"
      }
      • If you want to synchronize "hello" in a1, add the a.a1 field to the Kafka data source.

      • If you want to synchronize "world" in b, add the b field to the Kafka data source.

      • If you want to synchronize "yyyyyyy" in c, add the c[1] field to the Kafka data source.

      • If you want to synchronize "this" in AA, add the d[0].AA field to the Kafka data source.

      • If you add the a.b to the Kafka data source, "unreachable" cannot be synchronized.

  2. In the Field Mapping step, you can configure mappings between fields in the Kafka data source and fields in the MaxCompute data source.

    • DataWorks allows the existence of fields that have no mapped fields. If some fields in the source have no mapped fields in the destination, the batch synchronization task does not synchronize the fields from the source. If some fields in the destination have no mapped fields in the source, NULL is written to the fields in the destination as values.

    • Each field in the source can be mapped to only one field in the destination, and each field in the destination can have only one mapped field in the source.

Configure scheduling settings

This section describes the scheduling settings that can be configured for a batch synchronization task. For information about common scheduling settings and all scheduling settings that can be configured for a batch synchronization task, see the topics in the Schedule directory.

  • Configure scheduling parameters.

    In the preceding configurations of the batch synchronization task, the following scheduling parameters are used: ${startTime}, ${endTime}, and ${partition}. You must specify value formats for the scheduling parameters based on your business requirements when you configure scheduling settings for the task. The following table provides configuration examples for some typical business scenarios.

    Scenario

    Recommended configuration

    Sample scenario description

    You want to schedule the batch synchronization task every 5 minutes.

    调度参数1

    • startTime=$[yyyymmddhh24mi-8/24/60]00

    • endTime=$[yyyymmddhh24mi-3/24/60]00

    • partition=$[yyyymmddhh24mi-8/24/60]

    If the batch synchronization task is scheduled at 10:00 on November 22, 2022, the following situations occur:

    • Data in the time range from 09:52 on November 22, 2022 to 09:57 on November 22, 2022 in the specified Kafka topic is synchronized to MaxCompute. The time range is a left-closed, right-open interval.

    • The data synchronized from the specified Kafka topic is written to the 202211220952 partition in the MaxCompute table.

    • The time specified by endTime is 3 minutes earlier than the scheduling time of the batch synchronization task. The scheduling time of the batch synchronization task is specified by $[yyyymmddhh24mi]. This configuration helps ensure that data in the specified time range is written to the specified Kafka topic before the task starts to read the data and prevents data missing.

    You want to schedule the batch synchronization task every hour.

    调度参数22

    • startTime=$[yyyymmddhh24-1/24]0000

    • endTime=$[yyyymmddhh24]0000

    • partition=$[yyyymmddhh24]

    Note
    • If you want to schedule the batch synchronization task every 2 hours, set startTime to $[yyyymmddhh24-2/24]0000 and retain the settings of the endTime and partition parameters.

    • If you want to schedule the batch synchronization task every 3 hours, set startTime to $[yyyymmddhh24-3/24]00000 and retain the settings of the endTime and partition parameters.

    • You can refer to the preceding two configuration examples to configure scheduling parameters for a batch synchronization task that is scheduled by hour.

    If the batch synchronization task is scheduled at 10:05 on November 22, 2022, the following situations occur:

    • Data in the time range from 09:00 on November 22, 2022 to 10:00 on November 22, 2022 in the specified Kafka topic is synchronized to MaxCompute. The time range is a left-closed, right-open interval.

    • The data synchronized from the specified Kafka topic is written to the 2022112210 partition in the MaxCompute table.

    You want to schedule the batch synchronization task every day.

    调度参数3

    • startTime=$[yyyymmdd-1]000000

    • endTime=$[yyyymmdd]000000

    • partition=$[yyyymmdd-1]

    If the batch synchronization task is scheduled at 00:05 on November 22, 2022, the following situations occur:

    • Data in the time range from 00:00 on November 21, 2022 to 00:00 on November 22, 2022 in the specified Kafka topic is synchronized to MaxCompute. The time range is a left-closed, right-open interval.

    • The data synchronized from the specified Kafka topic is written to the 20221121 partition in the MaxCompute table.

    You want to schedule the batch synchronization task every week.

    调度参数4

    • startTime=$[yyyymmdd-7]000000

    • endTime=$[yyyymmdd]000000

    • partition=$[yyyymmdd-1]

    If the batch synchronization task is scheduled at 00:05 on November 22, 2022, the following situations occur:

    • Data in the time range from 00:00 on November 15, 2022 to 00:00 on November 22, 2022 in the specified Kafka topic is synchronized to MaxCompute. The time range is a left-closed, right-open interval.

    • The data synchronized from the specified Kafka topic is written to the 20221121 partition in the MaxCompute table.

    You want to schedule the batch synchronization task every month.

    调度参数4

    • startTime=$[add_months(yyyymmdd,-1)]000000

    • endTime=$[yyyymmdd]000000

    • partition=$[yyyymmdd-1]

    If the batch synchronization task is scheduled at 00:05 on November 22, 2022, the following situations occur:

    • Data in the time range from 00:00 on October 22, 2022 to 00:00 on November 22, 2022 in the specified Kafka topic is synchronized to MaxCompute. The time range is a left-closed, right-open interval.

    • The data synchronized from the specified Kafka topic is written to the 20221121 partition in the MaxCompute table.

  • Configure a scheduling cycle.

    • You can configure a scheduling cycle for the batch synchronization task based on the frequency at which you want to schedule the task.

      Scenario

      Recommended configuration

      Sample scenario description

      You want to schedule the batch synchronization task every 5 minutes.

      • Scheduling Cycle: Set this parameter to Minute.

      • Start From: Set this parameter to 00:00.

      • Interval: Set this parameter to 05. Unit: minutes.

      • End At: Set this parameter to 23:59.

      N/A.

      You want to schedule the batch synchronization task every hour.

      • Scheduling Cycle: Set this parameter to Hour.

      • Start From: Set this parameter to 00:15.

      • Interval: Set this parameter to 1. Unit: hours.

      • End At: Set this parameter to 23:59.

      You can specify a point in time that is later than 00:00, such as 00:15, for the Start From parameter. This ensures that data in the specified time range is written to the specified Kafka topic before the batch synchronization task starts to read data.

      You want to schedule the batch synchronization task every day.

      • Scheduling Cycle: Set this parameter to Day.

      • Run At: Set this parameter to 00:15.

      You can specify a point in time that is later than 00:00, such as 00:15, for the Run At parameter. This ensures that data in the specified time range is written to the specified Kafka topic before the batch synchronization task starts to read data.

      You want to schedule the batch synchronization task every week.

      • Scheduling Cycle: Set this parameter to Week.

      • Run Every: Set this parameter to Monday.

      • Run At: Set this parameter to 00:15.

      You can specify a point in time that is later than 00:00, such as 00:15, for the Run At parameter. This ensures that data in the specified time range is written to the specified Kafka topic before the batch synchronization task starts to read data.

      You want to schedule the batch synchronization task every month.

      • Scheduling Cycle: Set this parameter to Month.

      • Run Every: Set this parameter to Day 1.

      • Run At: Set this parameter to 00:15.

      You can specify a point in time that is later than 00:00, such as 00:15, for the Run At parameter. This ensures that data in the specified time range is written to the specified Kafka topic before the batch synchronization task starts to read data.

      Important

      If messages whose timestamps are earlier than or equal to the start offset of your batch synchronization task are written to the specified Kafka topic after the task starts to synchronize data from Kafka, the data may be missed. In this case, you must pay attention to the data missing risks caused by latency or timestamp disorder of data transferred to the Kafka topic.

    • Configure the rerun attribute. We recommend that you select the Auto Rerun upon Error check box and set the Number of re-runs parameter to 3 and the Rerun interval parameter to 2. This way, if an error occurs on the batch synchronization task, the task can automatically rerun.

  • Configure a resource group for scheduling.

    You need to select a resource group for scheduling. We recommend that you use an exclusive resource group for scheduling.

  • Configure scheduling dependencies.

    The batch synchronization task configured in this example does not depend on other tasks. You need to only click Add Root Node in the Dependencies section of the Properties tab to configure scheduling dependencies for the task.

Configure a resource group for Data Integration

You can select the resource group for Data Integration that is connected to the Kafka and MaxCompute data sources.

Test the task

After the preceding configuration is complete, you can test the batch synchronization task on the DataStudio page to check whether the task can run as expected.

  1. Click the 带参运行 icon in the top toolbar of the configuration tab of the batch synchronization task. In the dialog box that appears, assign values to the ${startTime}, ${endTime}, and ${partition} scheduling parameters and select a resource group for scheduling.带参运行

  2. Check the running results of the batch synchronization task.

  3. Create an ad hoc query task and execute the following statements to check whether all data that is read from Kafka is correctly written to MaxCompute:

    select * from test_project.test_table where ds=2022112200 limit 10;
    select count(*) from test_project.test_table where ds=2022112200;

Commit and deploy the batch synchronization task

If the test on the batch synchronization task is successful, you can save the configurations of the task and commit and deploy the task to Operation Center. The batch synchronization task periodically synchronizes data from Kafka to MaxCompute. For information about how to deploy a task, see Deploy nodes.

Appendix: Generate the schema of a destination table

After you click Generate Destination Table Schema when you configure the destination, the statement that can be used to create a destination MaxCompute table is generated. The statement defines the name of the table and the fields in the table. 一键生成目标表

  • The destination MaxCompute table name defined in the statement is the same as the name of the Kafka topic that you specified.

  • The destination MaxCompute table contains six fields, and the fields map to the following Kafka fields.

    Field name

    Description

    __key__

    The key of a Kafka message.

    __value__

    The value of a Kafka message.

    __partition__

    The ID of the partition to which a Kafka message belongs. The ID is an integer that starts from 0.

    __headers__

    The headers of a Kafka message.

    __offset__

    The sequence number that is assigned to a Kafka message when the message is transferred to a partition. The sequence number is an integer that starts from 0.

    __timestamp__

    The timestamp of a Kafka message. The timestamp is a 13-digit integer, in milliseconds.

  • By default, the lifecycle of a MaxCompute table is 100 years.

You can modify the items defined in the statement based on your business requirements. In addition, you can configure the batch synchronization task to parse the JSON-formatted values of Kafka messages and add the fields obtained from the parsing results to the default table creation statement based on your business requirements.