This topic describes how to use API operations to create, modify, and delete a batch synchronization task.
Prerequisites
- A Maven project is created. For more information, see Create a Maven project.
- A workflow is created. For more information, see Create an auto triggered workflow.
- Data sources are created and added to DataWorks for data synchronization. For more information, see Add a data source.
Limits
- You can use only the code editor to configure data synchronization tasks that are created by calling the CreateDISyncTask operation. For more information, see Configure a batch synchronization task by using the code editor.
- DataWorks does not allow you to create workflows by using API operations. You can use an API operation to create a synchronization task only in an existing workflow.
Configure environment dependencies and perform account authentication
Configure Maven dependencies.
Open the pom.xml file of your Maven project and add aliyun-java-sdk-core to the file:
```xml
<dependency>
    <groupId>com.aliyun</groupId>
    <artifactId>aliyun-java-sdk-core</artifactId>
    <version>4.5.20</version>
</dependency>
```
Open the pom.xml file of the Maven project and add aliyun-java-sdk-dataworks-public to the file:
```xml
<dependency>
    <groupId>com.aliyun</groupId>
    <artifactId>aliyun-java-sdk-dataworks-public</artifactId>
    <version>3.3.18</version>
</dependency>
```
Open the pom.xml file of the Maven project and add credentials-java to the file. We recommend that you use the latest version of credentials-java.
```xml
<dependency>
    <groupId>com.aliyun</groupId>
    <artifactId>credentials-java</artifactId>
    <version>0.2.11</version>
</dependency>
```
Note: You can use the Alibaba Cloud Credentials tool to manage your AccessKey pair. For more information, see Configure credentials.
Authenticate an account.
Before you can call an API operation to create a data synchronization task, you must run the following code to authenticate the Alibaba Cloud account that you use to access DataWorks. If the account passes authentication, you can perform subsequent operations. If authentication fails, an error is returned, and you must resolve the issue based on the error message.
```java
// Make sure that the ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET environment variables are set.
// For more information, see https://www.alibabacloud.com/help/zh/alibaba-cloud-sdk-262060/latest/configure-credentials-378659.
IClientProfile profile = DefaultProfile.getProfile("cn-shanghai",
        System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"),
        System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"));
```
Overview
After you complete the preceding preparations, you can call API operations to perform the following steps: create a data synchronization task, configure scheduling settings for the task, commit the task, deploy the task to the production environment, and query the details of the deployment package.
Procedure
Create a data synchronization task.
You can call the CreateDISyncTask operation to create a data synchronization task. The following sample code provides the settings of some parameters. For more information about parameters, see CreateDISyncTask.
```java
public void createFile() throws ClientException {
    CreateDISyncTaskRequest request = new CreateDISyncTaskRequest();
    request.setProjectId(181565L);
    request.setTaskType("DI_OFFLINE");
    request.setTaskContent("{\"type\":\"job\",\"version\":\"2.0\",\"steps\":[{\"stepType\":\"mysql\",\"parameter\":{\"envType\":1,\"datasource\":\"dh_mysql\",\"column\":[\"id\",\"name\"],\"tableComment\":\"Comment of the same table\",\"connection\":[{\"datasource\":\"dh_mysql\",\"table\":[\"same\"]}],\"where\":\"\",\"splitPk\":\"id\",\"encoding\":\"UTF-8\"},\"name\":\"Reader\",\"category\":\"reader\"},{\"stepType\":\"odps\",\"parameter\":{\"partition\":\"pt=${bizdate}\",\"truncate\":true,\"datasource\":\"odps_source\",\"envType\":1,\"column\":[\"id\",\"name\"],\"emptyAsNull\":false,\"tableComment\":\"Comment of the same table\",\"table\":\"same\"},\"name\":\"Writer\",\"category\":\"writer\"}],\"setting\":{\"errorLimit\":{\"record\":\"\"},\"speed\":{\"throttle\":false,\"concurrent\":2}},\"order\":{\"hops\":[{\"from\":\"Reader\",\"to\":\"Writer\"}]}}");
    request.setTaskParam("{\"FileFolderPath\":\"Business Flow/new_biz/Data Integration\",\"ResourceGroup\":\"S_res_group_280749521950784_1602767279794\"}");
    request.setTaskName("new_di_task_0607_1416");
    String regionId = "cn-hangzhou";
    // Make sure that the ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET environment variables are set.
    IClientProfile profile = DefaultProfile.getProfile(regionId,
            System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"),
            System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"));
    DefaultProfile.addEndpoint(regionId, "dataworks-public", "dataworks." + regionId + ".aliyuncs.com");
    IAcsClient client = new DefaultAcsClient(profile);
    CreateDISyncTaskResponse response1 = client.getAcsResponse(request);
    Gson gson1 = new Gson();
    System.out.println(gson1.toJson(response1));
}
```
Call the UpdateFile operation to configure scheduling settings for the task.
The following list describes the request parameters of the operation. Each entry shows the parameter name, its type, whether it is required, an example value, and a description. A minimal sample call follows the list.

Action (String, required)
Example: UpdateFile
The operation that you want to perform.

FileFolderPath (String, optional)
Example: Business Flow/1/Data Integration/Folder 1/Folder 2
The path of the file.

ProjectId (Long, optional)
Example: 10000
The ID of the DataWorks workspace. You can log on to the DataWorks console and go to the Workspace page to obtain the workspace ID.

FileName (String, optional)
Example: ods_user_info_d
The name of the file. You can set the FileName parameter to another value to change the file name. To change the name of a file, call the ListFiles operation to query the ID of the file, set the FileId parameter to the ID, and then set the FileName parameter to the new name when you call the UpdateFile operation.

FileDescription (String, optional)
Example: File description
The description of the file.

Content (String, optional)
Example: SELECT "1";
The code of the file. The code format varies based on the file type. To view the code format for a specific file type, go to Operation Center, right-click a task of the file type, and then select View Code.

AutoRerunTimes (Integer, required)
Example: 3
The number of automatic reruns that are allowed after an error occurs.

AutoRerunIntervalMillis (Integer, optional)
Example: 120000
The interval between two consecutive automatic reruns after an error occurs. Unit: milliseconds. Maximum value: 1800000 (30 minutes). This parameter corresponds to the Rerun Interval parameter that is displayed after the Auto Rerun upon Failure check box is selected in the Schedule section of the Properties tab in the DataWorks console. The interval that you specify in the DataWorks console is measured in minutes. Pay attention to the unit conversion when you call the operation.

RerunMode (String, optional)
Example: ALL_ALLOWED
Specifies whether the task that corresponds to the file can be rerun. Valid values:
- ALL_ALLOWED: The task can be rerun regardless of whether it is successfully run or fails to run.
- FAILURE_ALLOWED: The task can be rerun only after it fails to run.
- ALL_DENIED: The task cannot be rerun regardless of whether it is successfully run or fails to run.
This parameter corresponds to the Rerun parameter in the Schedule section of the Properties tab in the DataWorks console.

Stop (Boolean, optional)
Example: false
Specifies whether to suspend the scheduling of the task. Valid values: true and false. Setting this parameter to true is equivalent to setting the Recurrence parameter to Skip Execution in the Schedule section of the Properties tab in the DataWorks console.

ParaValue (String, optional)
Example: x=a y=b z=c
The scheduling parameters of the task. Configuring this parameter is equivalent to configuring scheduling parameters in the Scheduling Parameter section of the Properties tab in the DataWorks console. For more information, see Configure scheduling parameters.

StartEffectDate (Long, optional)
Example: 936923400000
The start time of automatic scheduling. Set this parameter to a UNIX timestamp that represents the number of milliseconds that have elapsed since 00:00:00 UTC on January 1, 1970. Configuring this parameter is equivalent to specifying a start time after you select Specified Time for the Validity Period parameter in the Schedule section of the Properties tab in the DataWorks console.

EndEffectDate (Long, optional)
Example: 4155787800000
The end time of automatic scheduling. Set this parameter to a UNIX timestamp that represents the number of milliseconds that have elapsed since 00:00:00 UTC on January 1, 1970. Configuring this parameter is equivalent to specifying an end time after you select Specified Time for the Validity Period parameter in the Schedule section of the Properties tab in the DataWorks console.

CronExpress (String, optional)
Example: 00 00-59/5 1-23 * * ?
The cron expression that represents the periodic scheduling policy of the task. This parameter corresponds to the Cron Expression parameter in the Schedule section of the Properties tab in the DataWorks console. After you configure the Scheduling Cycle and Scheduled Time parameters in the DataWorks console, DataWorks automatically generates a value for the Cron Expression parameter. Examples:
- Run at 05:30 every day: 00 30 05 * * ?
- Run at the fifteenth minute of each hour: 00 15 * * * ?
- Run every 10 minutes: 00 00/10 * * * ?
- Run every 10 minutes from 08:00 to 17:00 every day: 00 00-59/10 8-17 * * ?
- Run at 00:20 on the first day of each month: 00 20 00 1 * ?
- Run every three months starting from 00:10 on January 1: 00 10 00 1 1-12/3 ?
- Run at 00:05 every Tuesday and Friday: 00 05 00 * * 2,5
The scheduling system of DataWorks imposes the following limits on cron expressions:
- The minimum interval that can be specified in a cron expression to schedule a task is 5 minutes.
- The earliest time a task can be scheduled to run every day is 00:05.

CycleType (String, optional)
Example: NOT_DAY
The type of the scheduling cycle of the task. Valid values: NOT_DAY and DAY. The value NOT_DAY indicates that the task is scheduled to run by minute or hour. The value DAY indicates that the task is scheduled to run by day, week, or month. This parameter corresponds to the Scheduling Cycle parameter in the Schedule section of the Properties tab in the DataWorks console.

DependentType (String, optional)
Example: USER_DEFINE
The type of the cross-cycle scheduling dependency of the task. Valid values:
- SELF: The instance generated for the task in the current cycle depends on the instance generated for the task in the previous cycle.
- CHILD: The instance generated for the task in the current cycle depends on the instances generated for the descendant tasks at the nearest level of the task in the previous cycle.
- USER_DEFINE: The instance generated for the task in the current cycle depends on the instances generated for one or more specified tasks in the previous cycle.
- NONE: No cross-cycle scheduling dependency type is specified for the task.

DependentNodeIdList (String, optional)
Example: 5,10,15,20
The IDs of the tasks on which the task that corresponds to the file depends when the DependentType parameter is set to USER_DEFINE. Separate multiple IDs with commas (,). The value of this parameter corresponds to the IDs of the tasks that you specify after you select Other Nodes for the Cross-Cycle Dependency (Original Previous-Cycle Dependency) parameter in the Dependencies section of the Properties tab in the DataWorks console.

InputList (String, optional)
Example: project_root,project.file1,project.001_out
The output names of the parent files on which the current file depends. Separate multiple output names with commas (,). This parameter corresponds to the Output Name of Ancestor Node parameter in the Parent Nodes subsection of the Dependencies section of the Properties tab in the DataWorks console.
Note: You must configure this parameter when you call the CreateDISyncTask or UpdateFile operation to create a batch synchronization task.

ProjectIdentifier (String, optional)
Example: dw_project
The name of the DataWorks workspace. You can log on to the DataWorks console and go to the Workspace page to obtain the workspace name. You must configure either this parameter or the ProjectId parameter to determine the DataWorks workspace to which the operation is applied.

FileId (Long, required)
Example: 100000001
The ID of the file. You can call the ListFiles operation to obtain the ID.

OutputList (String, optional)
Example: dw_project.ods_user_info_d
The output name of the file. This parameter corresponds to the Output Name parameter in the Output Name of Current Node subsection of the Dependencies section of the Properties tab in the DataWorks console.

ResourceGroupIdentifier (String, optional)
Example: default_group
The identifier of the resource group that is used to run the task that corresponds to the file. You can call the ListResourceGroups operation to query the available resource groups in the workspace.

ConnectionName (String, optional)
Example: odps_source
The name of the data source that is used to run the task. You can call the ListDataSources operation to query the available data sources in the workspace.

Owner (String, optional)
Example: 18023848927592
The ID of the file owner.

AutoParsing (Boolean, optional)
Example: true
Specifies whether to enable the automatic parsing feature for the file. Valid values:
- true: enables the automatic parsing feature for the file.
- false: disables the automatic parsing feature for the file.
This parameter corresponds to the Parse Input and Output from Code button in the Dependencies section of the Properties tab in the DataWorks console.

SchedulerType (String, optional)
Example: NORMAL
The scheduling type of the task. Valid values:
- NORMAL: The task is an auto triggered task.
- MANUAL: The task is a manually triggered task. Manually triggered tasks cannot be automatically scheduled. You can go to the Manually Triggered Workflows pane to view manually triggered tasks.
- PAUSE: The task is a paused task.
- SKIP: The task is a dry-run task. Dry-run tasks are started as scheduled, but the scheduling system sets their status to successful when it starts to run them.

AdvancedSettings (String, optional)
Example: {"queue":"default","SPARK_CONF":"--conf spark.driver.memory=2g"}
The advanced configurations of the task. The value of this parameter must be in the JSON format. This parameter is valid only for an EMR Spark Streaming task or an EMR Streaming SQL task and corresponds to the Advanced Settings tab of the task in the DataWorks console.

StartImmediately (Boolean, optional)
Example: true
Specifies whether to run the task immediately after it is deployed. Valid values:
- true: runs the task immediately after the task is deployed.
- false: does not run the task immediately after the task is deployed.
This parameter is valid only for an EMR Spark Streaming task or an EMR Streaming SQL task and corresponds to the Start Method parameter in the Schedule section of the Configure tab in the DataWorks console.

InputParameters (String, optional)
Example: [{"ValueSource": "project_001.first_node:bizdate_param","ParameterName": "bizdate_input"}]
The input parameters of the task. The value of this parameter must be in the JSON format. For more information, see the InputContextParameterList parameter in the response parameters of the GetFile operation. This parameter corresponds to the Input Parameters subsection of the Input and Output Parameters section of the Properties tab in the DataWorks console.

OutputParameters (String, optional)
Example: [{"Type": 1,"Value": "${bizdate}","ParameterName": "bizdate_param"}]
The output parameters of the task. The value of this parameter must be in the JSON format. For more information, see the OutputContextParameterList parameter in the response parameters of the GetFile operation. This parameter corresponds to the Output Parameters subsection of the Input and Output Parameters section of the Properties tab in the DataWorks console.
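The following snippet is a minimal sketch of such a call. It sets only a few of the preceding parameters, and the method name updateSchedule, the workspace ID, the file ID, and the parent-node output name are placeholders that you must replace with your own values.
```java
public void updateSchedule(IAcsClient client) throws ClientException {
    UpdateFileRequest request = new UpdateFileRequest();
    request.setProjectId(10000L);             // Placeholder workspace ID.
    request.setFileId(100000001L);            // Placeholder file ID returned by CreateDISyncTask.
    request.setAutoRerunTimes(3);             // Allow three automatic reruns after an error.
    request.setRerunMode("FAILURE_ALLOWED");  // Rerun the task only after it fails.
    request.setCronExpress("00 30 05 * * ?"); // Run at 05:30 every day.
    request.setCycleType("DAY");
    request.setInputList("project_root");     // Placeholder output name of the parent node.
    UpdateFileResponse response = client.getAcsResponse(request);
    System.out.println(response.getRequestId());
}
```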
Commit the task.
Call the SubmitFile operation to commit the task to the development environment of the scheduling system. After you commit the task, the system returns a response that contains the ID of the related deployment package. You can call the GetDeployment operation to obtain the details of the deployment package based on the ID.
```java
public void submitFile() throws ClientException {
    SubmitFileRequest request = new SubmitFileRequest();
    request.setProjectId(78837L);
    request.setProjectIdentifier("zxy_8221431");
    // The value of FileId is the file ID that is returned when you create the task.
    request.setFileId(501576542L);
    request.setComment("Comment");
    SubmitFileResponse acsResponse = client.getAcsResponse(request);
    // The returned data is the ID of the deployment package that is generated when you commit or deploy the task.
    Long deploymentId = acsResponse.getData();
    log.info(acsResponse.toString());
}
```
The preceding sample code provides the settings of some parameters. For more information about parameters, see SubmitFile and GetDeployment.
Deploy the task to the production environment.
Call the DeployFile operation to deploy the task to the production environment.
Note: If your workspace is in standard mode, you must perform this step.
```java
public void deploy() throws ClientException {
    DeployFileRequest request = new DeployFileRequest();
    request.setProjectIdentifier("zxy_8221431");
    request.setFileId(501576542L);
    request.setComment("Comment");
    // You must configure the NodeId or FileId parameter. The value of NodeId is the value of the
    // Node ID parameter in the General section of the Properties tab of the task in the DataWorks console.
    request.setNodeId(700004537241L);
    DeployFileResponse acsResponse = client.getAcsResponse(request);
    // The returned data is the ID of the deployment package that is generated when you commit or deploy the task.
    Long deploymentId = acsResponse.getData();
    log.info(acsResponse.getData().toString());
}
```
The preceding sample code provides the settings of some parameters. For more information about parameters, see DeployFile.
Obtain the details of the deployment package.
After you deploy the task, the system returns a response that contains the ID of the related deployment package. You can call the GetDeployment operation to obtain the details of the deployment package based on the ID. If the value of the Status parameter in the response of the GetDeployment operation is 1, the task is deployed.
```java
public void getDeployment() throws ClientException {
    GetDeploymentRequest request = new GetDeploymentRequest();
    request.setProjectId(78837L);
    request.setProjectIdentifier("zxy_8221431");
    // The value of DeploymentId is the deployment package ID that is returned when you commit or deploy the task.
    request.setDeploymentId(2776067L);
    GetDeploymentResponse acsResponse = client.getAcsResponse(request);
    log.info(acsResponse.getData().toString());
}
```
The preceding sample code provides the settings of some parameters. For more information about parameters, see GetDeployment.
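Deployment may take some time to complete. The following snippet is a minimal sketch that polls the GetDeployment operation until the status becomes 1. The workspace ID is a placeholder, and the accessor chain Data.Deployment.Status is an assumption based on the documented response structure; verify the generated getter names against your SDK version.
```java
public void waitForDeployment(IAcsClient client, Long deploymentId) throws Exception {
    GetDeploymentRequest request = new GetDeploymentRequest();
    request.setProjectId(78837L);          // Placeholder workspace ID.
    request.setDeploymentId(deploymentId); // Deployment package ID returned by SubmitFile or DeployFile.
    for (int i = 0; i < 10; i++) {
        GetDeploymentResponse response = client.getAcsResponse(request);
        // A status of 1 indicates that the task is deployed. The accessor chain below is an
        // assumption based on the documented response structure (Data.Deployment.Status).
        Integer status = response.getData().getDeployment().getStatus();
        if (status != null && status == 1) {
            return;
        }
        Thread.sleep(5000L); // Wait 5 seconds before the next check.
    }
    throw new RuntimeException("The deployment package was not deployed within the expected time.");
}
```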
Modify the configuration of the task
After you create and configure a data synchronization task in Data Integration, you can call the UpdateDISyncTask operation to modify the configuration of the task. You can also use the TaskParam parameter to change the exclusive resource group that is used by the task. After you modify the configuration of a data synchronization task, you must commit and deploy the task again. For more information, see the Overview section in this topic.
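The following snippet is a minimal sketch of an UpdateDISyncTask call that uses the TaskParam parameter to switch the task to another exclusive resource group for Data Integration. The workspace ID, file ID, and resource group identifier are placeholders; see the UpdateDISyncTask reference for the full parameter list.
```java
public void updateDISyncTask(IAcsClient client) throws ClientException {
    UpdateDISyncTaskRequest request = new UpdateDISyncTaskRequest();
    request.setProjectId(181565L);     // Placeholder workspace ID.
    request.setFileId(100000001L);     // Placeholder file ID of the synchronization task.
    request.setTaskType("DI_OFFLINE");
    // Placeholder identifier of the new exclusive resource group for Data Integration.
    request.setTaskParam("{\"ResourceGroup\":\"S_res_group_XXX\"}");
    UpdateDISyncTaskResponse response = client.getAcsResponse(request);
    System.out.println(response.getRequestId());
}
```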
Delete a data synchronization task
If you no longer require a data synchronization task, you can call the DeleteFile operation to delete the task. The deleteTask method in the following sample code shows how to call this operation.
Sample code
POM dependencies
```xml
<dependency>
    <groupId>com.aliyun</groupId>
    <artifactId>aliyun-java-sdk-core</artifactId>
    <version>4.5.20</version>
</dependency>
<dependency>
    <groupId>com.aliyun</groupId>
    <artifactId>aliyun-java-sdk-dataworks-public</artifactId>
    <version>3.4.1</version>
</dependency>
```
SDK for Java
```java
import com.aliyuncs.DefaultAcsClient;
import com.aliyuncs.IAcsClient;
import com.aliyuncs.dataworks_public.model.v20200518.*;
import com.aliyuncs.profile.DefaultProfile;
import com.aliyuncs.profile.IClientProfile;
import com.google.gson.Gson;

public class CreateOfflineTask {
static Long createTask(String fileName) throws Exception {
Long projectId = 2043L;
String taskType = "DI_OFFLINE";
String taskContent = "{\n" +
" \"type\": \"job\",\n" +
" \"version\": \"2.0\",\n" +
" \"steps\": [\n" +
" {\n" +
" \"stepType\": \"mysql\",\n" +
" \"parameter\": {\n" +
" \"envType\": 0,\n" +
" \"datasource\": \"mysql_autotest_dev\",\n" +
" \"column\": [\n" +
" \"id\",\n" +
" \"name\"\n" +
" ],\n" +
" \"connection\": [\n" +
" {\n" +
" \"datasource\": \"mysql_autotest_dev\",\n" +
" \"table\": [\n" +
" \"user\"\n" +
" ]\n" +
" }\n" +
" ],\n" +
" \"where\": \"\",\n" +
" \"splitPk\": \"\",\n" +
" \"encoding\": \"UTF-8\"\n" +
" },\n" +
" \"name\": \"Reader\",\n" +
" \"category\": \"reader\"\n" +
" },\n" +
" {\n" +
" \"stepType\": \"odps\",\n" +
" \"parameter\": {\n" +
" \"partition\": \"pt=${bizdate}\",\n" +
" \"truncate\": true,\n" +
" \"datasource\": \"odps_source\",\n" +
" \"envType\": 0,\n" +
" \"column\": [\n" +
" \"id\",\n" +
" \"name\"\n" +
" ],\n" +
" \"emptyAsNull\": false,\n" +
" \"tableComment\": \"null\",\n" +
" \"table\": \"user\"\n" +
" },\n" +
" \"name\": \"Writer\",\n" +
" \"category\": \"writer\"\n" +
" }\n" +
" ],\n" +
" \"setting\": {\n" +
" \"executeMode\": null,\n" +
" \"errorLimit\": {\n" +
" \"record\": \"\"\n" +
" },\n" +
" \"speed\": {\n" +
" \"concurrent\": 2,\n" +
" \"throttle\": false\n" +
" }\n" +
" },\n" +
" \"order\": {\n" +
" \"hops\": [\n" +
" {\n" +
" \"from\": \"Reader\",\n" +
" \"to\": \"Writer\"\n" +
" }\n" +
" ]\n" +
" }\n" +
"}";
CreateDISyncTaskRequest request = new CreateDISyncTaskRequest();
request.setProjectId(projectId);
request.setTaskType(taskType);
request.setTaskContent(taskContent);
request.setTaskName(fileName);
request.setTaskParam("{\"FileFolderPath\":\"Business Flow/Test/Data Integration\",\"ResourceGroup\":\"S_res_group_XXX\"}");
// Use an exclusive resource group for Data Integration.
CreateDISyncTaskResponse response1 = client.getAcsResponse(request);
return response1.getData().getFileId();
}
public static void updateFile(Long fileId) throws Exception {
UpdateFileRequest request = new UpdateFileRequest();
request.setProjectId(2043L);
request.setFileId(fileId);
request.setAutoRerunTimes(3);
request.setRerunMode("FAILURE_ALLOWED");
request.setCronExpress("00 30 05 * * ?");
request.setCycleType("DAY");
request.setResourceGroupIdentifier("S_res_group_XXX");
// Use an exclusive resource group for scheduling.
request.setInputList("dataworks_di_autotest_root");
request.setAutoParsing(true);
request.setDependentNodeIdList("5,10,15,20");
request.setDependentType("SELF");
request.setStartEffectDate(0L);
request.setEndEffectDate(4155787800000L);
request.setFileDescription("description");
request.setStop(false);
request.setParaValue("x=a y=b z=c");
request.setSchedulerType("NORMAL");
request.setAutoRerunIntervalMillis(120000);
UpdateFileResponse response1 = client.getAcsResponse(request);
}
static void deleteTask(Long fileId) throws Exception {
DeleteFileRequest request = new DeleteFileRequest();
Long projectId = 63845L;
request.setProjectId(projectId);
request.setFileId(fileId);
// Use the shared client that is initialized in the main method.
DeleteFileResponse response1 = client.getAcsResponse(request);
System.out.println(new Gson().toJson(response1));
}
static IAcsClient client;
public static void main(String[] args) throws Exception {
String regionId = "cn-shanghai";
// Make sure that the ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET environment variables are set.
// For more information, see https://www.alibabacloud.com/help/zh/alibaba-cloud-sdk-262060/latest/configure-credentials-378659.
IClientProfile profile = DefaultProfile.getProfile(regionId, System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"), System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"));
DefaultProfile.addEndpoint(regionId, "dataworks-public", "dataworks." + regionId + ".aliyuncs.com");
client = new DefaultAcsClient(profile);
String taskName = "offline_job_0930_1648";
Long fileId = createTask(taskName); // Create a data synchronization task.
updateFile(fileId); // Configure scheduling properties for the data synchronization task.
}
}
```