This topic describes how to configure a batch synchronization task that can be scheduled by minute, hour, or day to synchronize incremental data from Kafka to the hourly or daily partitions in a MaxCompute table.
Precautions
The version of the Kafka data source that you use must range from 0.10.2 to 2.2.x, and timestamps must be used to record the time when messages are transferred to Kafka.
If messages whose timestamps are earlier than or equal to the start offset of your batch synchronization task are transferred to the specified Kafka topic after the task starts to synchronize incremental data from Kafka, the messages may be missed. In this case, you must pay attention to data missing risks caused by latency or timestamp disorder of messages transferred to the Kafka topic.
You can set the Exit Strategy parameter to Exit when poll nothing in 1 minute for your batch synchronization task in the codeless user interface (UI) only if the following conditions are met. If you configure this setting but the conditions are not met, some incremental data in Kafka may be missed when the task is run.
No data is written to all or some partitions in the specified Kafka topic for 10 or more minutes.
After each auto triggered instance generated for the batch synchronization task is started, no messages whose timestamps are earlier than the value of the End Time parameter are written to the specified Kafka topic.
Make preparations
Create a workspace in which you want to run a batch synchronization task and an exclusive resource group for Data Integration that can be used to synchronize data. For information about how to create an exclusive resource group for Data Integration, see Create and use an exclusive resource group for Data Integration. In this example, a workspace in standard mode and an exclusive resource group for Data Integration are created, and the resource group is associated with the workspace.
Add a Kafka data source to the workspace and test the network connectivity between the Kafka data source and the exclusive resource group for Data Integration. For more information, see Add a Kafka data source.
ImportantWhen you configure a batch synchronization task to synchronize incremental data from Kafka in a workspace in standard mode, you must make sure that the Kafka data source used in the production environment of the workspace contains a topic whose name is the same as a topic in the Kafka data source used in the development environment of the workspace. This way, when the batch synchronization task is run, the task can synchronize data from the topic in the Kafka data source used in the production environment.
Add a MaxCompute data source. For more information, see Add a MaxCompute data source.
Create a batch synchronization task
You can find the required workflow in the Scheduled Workflow pane of the DataStudio page in the DataWorks console and create a batch synchronization task in the workflow. When you create a batch synchronization task, you must configure the parameters such as Path and Name. For more information, see Configure a batch synchronization task by using the codeless UI.
Configure the data source and resource group
After you create a batch synchronization task, you must configure the source, resource group, and destination and test the network connectivity.
Configure the source
Configure parameters related to the source. In this example, a batch synchronization task is created to synchronize incremental data from Kafka to MaxCompute. The following figure shows the parameters that can be configured for the source.
For more information about the parameters that can be configured for a batch synchronization task used to synchronize data from Kafka, see Kafka Reader. The following table describes the parameters that need to be configured in this example.
Parameter | Description |
Data source and Topic | The name of the Kafka topic from which you want to read data. If you use a workspace in standard mode, you must make sure that the Kafka data source used in the production environment of the workspace contains a topic whose name is the same as a topic in the Kafka data source used in the development environment of the workspace. You can select the topic from the Topic drop-down list. Note Before you configure the Topic parameter, take note of the following items:
|
Consumer Group ID | The consumer group ID. The ID can be used to collect statistics on and monitor data consumption in the Kafka data source. You can configure this parameter based on your business requirements. If you configure this parameter, make sure that the ID is unique in the Kafka data source. |
Kafka version | The version range of the Kafka data source. Note The version of the Kafka data source must range from 0.10.2 to 2.2.x. |
Read From, Start Time, Read To, and End Time | You can set Read From and Read To to Specific Offset, Start Time to These parameters specify the start offset and end offset of data synchronization. In this example, the batch synchronization task synchronizes data within the time range specified by the |
Time Zone | You can leave this parameter empty or retain the default value. The default value of this parameter is the time zone of the server in the region where the DataWorks workspace resides. Note If you have asked the technical personnel to change the time zone in which you want to schedule tasks, you can select the new time zone. |
Key Type, Value Type, and Encoding | You can configure these parameters based on your business requirements. |
Exit Strategy | You can set this parameter to Exit when poll nothing in 1 minute only if the following conditions are met. If the following conditions are not met, set this parameter to Exit when reach configured end offset or time.
|
Advanced configuration | You can retain the default value. |
Configure the destination
In the Configure tasks step, you can configure parameters related to the destination. In this example, a batch synchronization task is created to synchronize incremental data from Kafka to MaxCompute. The following figure shows the parameters that can be configured for the destination.
Parameter | Description |
Data source | The name of the MaxCompute data source that you added. |
Table | The name of the table to which you want to write data. If you use a workspace in standard mode, you must make sure that the MaxCompute data source used in the production environment of the workspace contains a table whose name and schema are the same as those of a table in the MaxCompute data source used in the development environment of the workspace. Note Before you configure the Topic parameter, take note of the following items:
If the MaxCompute table to which you want to write data is not created, you can click Generate Destination Table Schema to quickly generate a table creation statement. For more information, see Appendix: Generate the schema of a destination table. |
Partition Key Column | In this example, the destination MaxCompute table contains one partition key column ds, and the Note Whether the Partition Key Column parameter needs to be configured and the number of times the system displays the Partition Key Column parameter vary based on the type of the destination MaxCompute table that you select. If you select a non-partitioned destination MaxCompute table, you do not need to manually configure this parameter. If you select a partitioned destination MaxCompute table, the system displays the Partition Key Column parameter based on the number of partition key columns in the table and the names of the partition key columns. The number of times that the system displays the Partition Key Column parameter is the same as the number of partition key columns in the table. |
You can retain default values for the parameters that are not described in the table.
Configure field mappings
In the Field Mapping step, you can edit fields in the Kafka data source.
The following table describes the default fields in Kafka.
Field name
Description
__key__
The key of a Kafka message.
__value__
The value of a Kafka message.
__partition__
The ID of the partition to which a Kafka message belongs. The ID is an integer that starts from 0.
__headers__
The headers of a Kafka message.
__offset__
The sequence number that is assigned to a Kafka message when the message is transferred to a partition. The sequence number is an integer that starts from 0.
__timestamp__
The timestamp of a Kafka message. The timestamp is a 13-digit integer, in milliseconds.
You can specify a method for parsing JSON-formatted content in Kafka messages based on your business requirements. You can use the
.Sub-field
or[Element in an array]
syntax to obtain data in the JSON-formatted values of a message.ImportantIf the name of a field in the JSON-formatted values of a Kafka message contains periods (.), the value of the field cannot be obtained or synchronized because of a field definition syntax error.
The following code provides an example of data in the JSON-formatted values of a message.
{ "a": { "a1": "hello" }, "b": "world", "c":[ "xxxxxxx", "yyyyyyy" ], "d":[ { "AA":"this", "BB":"is_data" }, { "AA":"that", "BB":"is_also_data" } ], "a.b": "unreachable" }
If you want to synchronize
"hello"
in a1, add thea.a1
field to the Kafka data source.If you want to synchronize
"world"
in b, add theb
field to the Kafka data source.If you want to synchronize
"yyyyyyy"
in c, add thec[1]
field to the Kafka data source.If you want to synchronize
"this"
in AA, add thed[0].AA
field to the Kafka data source.If you add the
a.b
to the Kafka data source,"unreachable"
cannot be synchronized.
In the Field Mapping step, you can configure mappings between fields in the Kafka data source and fields in the MaxCompute data source.
DataWorks allows the existence of fields that have no mapped fields. If some fields in the source have no mapped fields in the destination, the batch synchronization task does not synchronize the fields from the source. If some fields in the destination have no mapped fields in the source, NULL is written to the fields in the destination as values.
Each field in the source can be mapped to only one field in the destination, and each field in the destination can have only one mapped field in the source.
Configure scheduling settings
This section describes the scheduling settings that can be configured for a batch synchronization task. For information about common scheduling settings and all scheduling settings that can be configured for a batch synchronization task, see the topics in the Schedule directory.
Configure scheduling parameters.
In the preceding configurations of the batch synchronization task, the following scheduling parameters are used:
${startTime}
,${endTime}
, and${partition}
. You must specify value formats for the scheduling parameters based on your business requirements when you configure scheduling settings for the task. The following table provides configuration examples for some typical business scenarios.Scenario
Recommended configuration
Sample scenario description
You want to schedule the batch synchronization task every 5 minutes.
startTime=$[yyyymmddhh24mi-8/24/60]00
endTime=$[yyyymmddhh24mi-3/24/60]00
partition=$[yyyymmddhh24mi-8/24/60]
If the batch synchronization task is scheduled at 10:00 on November 22, 2022, the following situations occur:
Data in the time range from 09:52 on November 22, 2022 to 09:57 on November 22, 2022 in the specified Kafka topic is synchronized to MaxCompute. The time range is a left-closed, right-open interval.
The data synchronized from the specified Kafka topic is written to the 202211220952 partition in the MaxCompute table.
The time specified by
endTime
is 3 minutes earlier than the scheduling time of the batch synchronization task. The scheduling time of the batch synchronization task is specified by $[yyyymmddhh24mi]. This configuration helps ensure that data in the specified time range is written to the specified Kafka topic before the task starts to read the data and prevents data missing.
You want to schedule the batch synchronization task every hour.
startTime=$[yyyymmddhh24-1/24]0000
endTime=$[yyyymmddhh24]0000
partition=$[yyyymmddhh24]
NoteIf you want to schedule the batch synchronization task every 2 hours, set startTime to $[yyyymmddhh24-2/24]0000 and retain the settings of the endTime and partition parameters.
If you want to schedule the batch synchronization task every 3 hours, set startTime to $[yyyymmddhh24-3/24]00000 and retain the settings of the endTime and partition parameters.
You can refer to the preceding two configuration examples to configure scheduling parameters for a batch synchronization task that is scheduled by hour.
If the batch synchronization task is scheduled at 10:05 on November 22, 2022, the following situations occur:
Data in the time range from 09:00 on November 22, 2022 to 10:00 on November 22, 2022 in the specified Kafka topic is synchronized to MaxCompute. The time range is a left-closed, right-open interval.
The data synchronized from the specified Kafka topic is written to the 2022112210 partition in the MaxCompute table.
You want to schedule the batch synchronization task every day.
startTime=$[yyyymmdd-1]000000
endTime=$[yyyymmdd]000000
partition=$[yyyymmdd-1]
If the batch synchronization task is scheduled at 00:05 on November 22, 2022, the following situations occur:
Data in the time range from 00:00 on November 21, 2022 to 00:00 on November 22, 2022 in the specified Kafka topic is synchronized to MaxCompute. The time range is a left-closed, right-open interval.
The data synchronized from the specified Kafka topic is written to the 20221121 partition in the MaxCompute table.
You want to schedule the batch synchronization task every week.
startTime=$[yyyymmdd-7]000000
endTime=$[yyyymmdd]000000
partition=$[yyyymmdd-1]
If the batch synchronization task is scheduled at 00:05 on November 22, 2022, the following situations occur:
Data in the time range from 00:00 on November 15, 2022 to 00:00 on November 22, 2022 in the specified Kafka topic is synchronized to MaxCompute. The time range is a left-closed, right-open interval.
The data synchronized from the specified Kafka topic is written to the 20221121 partition in the MaxCompute table.
You want to schedule the batch synchronization task every month.
startTime=$[add_months(yyyymmdd,-1)]000000
endTime=$[yyyymmdd]000000
partition=$[yyyymmdd-1]
If the batch synchronization task is scheduled at 00:05 on November 22, 2022, the following situations occur:
Data in the time range from 00:00 on October 22, 2022 to 00:00 on November 22, 2022 in the specified Kafka topic is synchronized to MaxCompute. The time range is a left-closed, right-open interval.
The data synchronized from the specified Kafka topic is written to the 20221121 partition in the MaxCompute table.
Configure a scheduling cycle.
You can configure a scheduling cycle for the batch synchronization task based on the frequency at which you want to schedule the task.
Scenario
Recommended configuration
Sample scenario description
You want to schedule the batch synchronization task every 5 minutes.
Scheduling Cycle: Set this parameter to Minute.
Start From: Set this parameter to 00:00.
Interval: Set this parameter to 05. Unit: minutes.
End At: Set this parameter to 23:59.
N/A.
You want to schedule the batch synchronization task every hour.
Scheduling Cycle: Set this parameter to Hour.
Start From: Set this parameter to 00:15.
Interval: Set this parameter to 1. Unit: hours.
End At: Set this parameter to 23:59.
You can specify a point in time that is later than 00:00, such as 00:15, for the Start From parameter. This ensures that data in the specified time range is written to the specified Kafka topic before the batch synchronization task starts to read data.
You want to schedule the batch synchronization task every day.
Scheduling Cycle: Set this parameter to Day.
Run At: Set this parameter to 00:15.
You can specify a point in time that is later than 00:00, such as 00:15, for the Run At parameter. This ensures that data in the specified time range is written to the specified Kafka topic before the batch synchronization task starts to read data.
You want to schedule the batch synchronization task every week.
Scheduling Cycle: Set this parameter to Week.
Run Every: Set this parameter to Monday.
Run At: Set this parameter to 00:15.
You can specify a point in time that is later than 00:00, such as 00:15, for the Run At parameter. This ensures that data in the specified time range is written to the specified Kafka topic before the batch synchronization task starts to read data.
You want to schedule the batch synchronization task every month.
Scheduling Cycle: Set this parameter to Month.
Run Every: Set this parameter to Day 1.
Run At: Set this parameter to 00:15.
You can specify a point in time that is later than 00:00, such as 00:15, for the Run At parameter. This ensures that data in the specified time range is written to the specified Kafka topic before the batch synchronization task starts to read data.
ImportantIf messages whose timestamps are earlier than or equal to the start offset of your batch synchronization task are written to the specified Kafka topic after the task starts to synchronize data from Kafka, the data may be missed. In this case, you must pay attention to the data missing risks caused by latency or timestamp disorder of data transferred to the Kafka topic.
Configure the rerun attribute. We recommend that you select the Auto Rerun upon Error check box and set the Number of re-runs parameter to 3 and the Rerun interval parameter to 2. This way, if an error occurs on the batch synchronization task, the task can automatically rerun.
Configure a resource group for scheduling.
You need to select a resource group for scheduling. We recommend that you use an exclusive resource group for scheduling.
Configure scheduling dependencies.
The batch synchronization task configured in this example does not depend on other tasks. You need to only click Add Root Node in the Dependencies section of the Properties tab to configure scheduling dependencies for the task.
Configure a resource group for Data Integration
You can select the resource group for Data Integration that is connected to the Kafka and MaxCompute data sources.
Test the task
After the preceding configuration is complete, you can test the batch synchronization task on the DataStudio page to check whether the task can run as expected.
Click the icon in the top toolbar of the configuration tab of the batch synchronization task. In the dialog box that appears, assign values to the
${startTime}
,${endTime}
, and${partition}
scheduling parameters and select a resource group for scheduling.Check the running results of the batch synchronization task.
Create an ad hoc query task and execute the following statements to check whether all data that is read from Kafka is correctly written to MaxCompute:
select * from test_project.test_table where ds=2022112200 limit 10; select count(*) from test_project.test_table where ds=2022112200;
Commit and deploy the batch synchronization task
If the test on the batch synchronization task is successful, you can save the configurations of the task and commit and deploy the task to Operation Center. The batch synchronization task periodically synchronizes data from Kafka to MaxCompute. For information about how to deploy a task, see Deploy nodes.
Appendix: Generate the schema of a destination table
After you click Generate Destination Table Schema when you configure the destination, the statement that can be used to create a destination MaxCompute table is generated. The statement defines the name of the table and the fields in the table.
The destination MaxCompute table name defined in the statement is the same as the name of the Kafka topic that you specified.
The destination MaxCompute table contains six fields, and the fields map to the following Kafka fields.
Field name
Description
__key__
The key of a Kafka message.
__value__
The value of a Kafka message.
__partition__
The ID of the partition to which a Kafka message belongs. The ID is an integer that starts from 0.
__headers__
The headers of a Kafka message.
__offset__
The sequence number that is assigned to a Kafka message when the message is transferred to a partition. The sequence number is an integer that starts from 0.
__timestamp__
The timestamp of a Kafka message. The timestamp is a 13-digit integer, in milliseconds.
By default, the lifecycle of a MaxCompute table is 100 years.
You can modify the items defined in the statement based on your business requirements. In addition, you can configure the batch synchronization task to parse the JSON-formatted values of Kafka messages and add the fields obtained from the parsing results to the default table creation statement based on your business requirements.