This topic describes how to use the DataWorks OpenAPI to create, update, and delete Data Integration tasks that synchronize data from a source to a destination.
Prerequisites
A workflow is created. For more information, see Create a scheduled workflow.
The data sources required for the synchronization task are created.
Limitations
You can call the CreateDISyncTask operation to create a Data Integration task, but you can configure the task content only in the code editor.
You cannot use the OpenAPI to create a workflow in DataWorks. Instead, you must use an existing workflow for the data synchronization task.
Obtain an SDK
Obtain the latest SDK from the Alibaba Cloud OpenAPI Portal.
Procedure
After setting up your environment, follow these steps to create and manage a data synchronization task with the DataWorks API:
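The code snippets in the following steps share an initialized DataWorks client. The following minimal sketch, based on the sample code at the end of this topic, shows one way to build that client; the region is an example, the class name is illustrative, and the credentials are read from environment variables.
import com.aliyuncs.DefaultAcsClient;
import com.aliyuncs.IAcsClient;
import com.aliyuncs.exceptions.ClientException;
import com.aliyuncs.profile.DefaultProfile;
import com.aliyuncs.profile.IClientProfile;

public class DataWorksClientFactory {
    // Builds the IAcsClient that the snippets in the following steps refer to as client.
    public static IAcsClient createClient() throws ClientException {
        String regionId = "cn-hangzhou"; // Example region. Use the region of your workspace.
        // Make sure that the ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET environment variables are set.
        IClientProfile profile = DefaultProfile.getProfile(regionId,
                System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"),
                System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"));
        DefaultProfile.addEndpoint(regionId, "dataworks-public", "dataworks." + regionId + ".aliyuncs.com");
        return new DefaultAcsClient(profile);
    }
}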
Steps
Create a Data Integration task.
Call the CreateDISyncTask operation to create the task. The following sample code shows how to configure several key parameters. For more information about the parameters, see CreateDISyncTask.
public void createFile() throws ClientException {
    CreateDISyncTaskRequest request = new CreateDISyncTaskRequest();
    request.setProjectId(181565L);
    request.setTaskType("DI_OFFLINE");
    request.setTaskContent("{\"type\":\"job\",\"version\":\"2.0\",\"steps\":[{\"stepType\":\"mysql\",\"parameter\":{\"envType\":1,\"datasource\":\"dh_mysql\",\"column\":[\"id\",\"name\"],\"tableComment\":\"Comment for the table same\",\"connection\":[{\"datasource\":\"dh_mysql\",\"table\":[\"same\"]}],\"where\":\"\",\"splitPk\":\"id\",\"encoding\":\"UTF-8\"},\"name\":\"Reader\",\"category\":\"reader\"},{\"stepType\":\"odps\",\"parameter\":{\"partition\":\"pt=${bizdate}\",\"truncate\":true,\"datasource\":\"odps_source\",\"envType\":1,\"column\":[\"id\",\"name\"],\"emptyAsNull\":false,\"tableComment\":\"Comment for the table same\",\"table\":\"same\"},\"name\":\"Writer\",\"category\":\"writer\"}],\"setting\":{\"errorLimit\":{\"record\":\"\"},\"speed\":{\"throttle\":false,\"concurrent\":2}},\"order\":{\"hops\":[{\"from\":\"Reader\",\"to\":\"Writer\"}]}}");
    request.setTaskParam("{\"FileFolderPath\":\"Workflow/new_biz/Data Integration\",\"ResourceGroup\":\"S_res_group_280749521950784_1602767279794\"}");
    request.setTaskName("new_di_task_0607_1416");
    String regionId = "cn-hangzhou";
    // Make sure that the ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET environment variables are set.
    // For more information, see https://www.alibabacloud.com/help/zh/alibaba-cloud-sdk-262060/latest/configure-credentials-378659
    IClientProfile profile = DefaultProfile.getProfile(regionId, System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"), System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"));
    DefaultProfile.addEndpoint(regionId, "dataworks-public", "dataworks." + regionId + ".aliyuncs.com");
    IAcsClient client = new DefaultAcsClient(profile);
    CreateDISyncTaskResponse response1 = client.getAcsResponse(request);
    Gson gson1 = new Gson();
    System.out.println(gson1.toJson(response1));
}
Call the UpdateFile operation to update the scheduling parameters.
The following table describes the request parameters, and a brief example follows the table.
Name
Type
Required
Example
Description
Action
String
Yes
UpdateFile
The operation to perform.
FileFolderPath
String
No
Workflows/MyFirstWorkflow/DataIntegration/Folder1/Folder2
The path of the file.
ProjectId
Long
No
10000
The ID of the DataWorks workspace. Go to the workspace management page in the DataWorks Console to find the workspace ID.
FileName
String
No
ods_user_info_d
The name of the file. To rename a file, set a new value for this parameter.
You can find the required FileId by using the ListFiles operation.
FileDescription
String
No
This is a file description
The description of the file.
Content
String
No
SELECT "1";
The code of the file. The code format varies based on the file type (fileType). To see an example, right-click a task in Operation Center and select View Code.
AutoRerunTimes
Integer
Yes
3
The number of times to automatically rerun the task after an error.
AutoRerunIntervalMillis
Integer
No
120000
The interval between automatic reruns, in milliseconds. The maximum value is 1,800,000 (30 minutes).
This parameter corresponds to rerun interval on the Scheduling tab in the DataWorks Console.
The console displays this value in minutes. Ensure that you convert the unit when using the API.
RerunMode
String
No
ALL_ALLOWED
The rerun policy. Valid values:
ALL_ALLOWED: Reruns are allowed after both successful and failed runs.
FAILURE_ALLOWED: Reruns are allowed only after a failed run.
ALL_DENIED: Reruns are not allowed.
This parameter corresponds to rerun property on the Scheduling tab in the DataWorks console.
Stop
Boolean
No
false
Specifies whether to pause the scheduling of the task. Valid values:
true: Pauses the scheduling.
false: Does not pause the scheduling.
This parameter corresponds to the pause scheduling setting on the Scheduling tab in the DataWorks Console.
ParaValue
String
No
x=a y=b z=c
The scheduling parameters.
This parameter corresponds to parameters on the Scheduling tab in the DataWorks Console. For information about how to configure scheduling parameters, see Configure scheduling parameters.
StartEffectDate
Long
No
936923400000
The time to start automatic scheduling, specified as a Unix timestamp in milliseconds.
This parameter corresponds to the start time specified for effective date on the Scheduling tab in the DataWorks console.
EndEffectDate
Long
No
4155787800000
The time to stop automatic scheduling, specified as a Unix timestamp in milliseconds.
This parameter corresponds to the end time specified for effective date on the Scheduling tab in the DataWorks console.
CronExpress
String
No
00 00-59/5 1-23 * * ?
The cron expression for periodic scheduling. This parameter corresponds to cron expression on the Scheduling tab in the DataWorks Console. After you configure the schedule and scheduled time, DataWorks automatically generates a cron expression.
Examples:
Run the task at 05:30 every day: 00 30 05 * * ?
Run the task at 15 minutes past every hour: 00 15 * * * ?
Run the task every 10 minutes: 00 00/10 * * * ?
Run the task every 10 minutes from 08:00 to 17:00 every day: 00 00-59/10 8-17 * * ?
Run the task at 00:20 on the first day of each month: 00 20 00 1 * ?
Run the task every 3 months, starting from 00:10 on January 1: 00 10 00 1 1-12/3 ?
Run the task at 00:05 on every Tuesday and Friday: 00 05 00 * * 2,5
The DataWorks scheduling system has the following limits for cron expressions:
The minimum scheduling interval is 5 minutes.
The earliest time that a task can be scheduled to run is 00:05 each day.
CycleType
String
No
NOT_DAY
The type of the scheduling cycle. Valid values are NOT_DAY (minute or hour) and DAY (day, week, or month).
This parameter corresponds to the schedule on the Scheduling tab in the DataWorks Console.
DependentType
String
No
USER_DEFINE
The cross-cycle dependency mode. Valid values:
SELF: The current node is selected as the dependent node.
CHILD: The first-level child nodes are selected as dependent nodes.
USER_DEFINE: Other nodes are selected as dependent nodes.
NONE: No dependent nodes are selected. The task does not depend on the previous cycle.
DependentNodeIdList
String
No
5,10,15,20
If DependentType is USER_DEFINE, this parameter is required. Specify the IDs of dependent nodes, separated by commas (,).
This parameter corresponds to the settings for USER_DEFINE that appear when you configure cross-cycle dependency on the Scheduling tab in the DataWorks console.
InputList
String
No
project_root,project.file1,project.001_out
The output names of the upstream nodes this node depends on. Separate multiple names with commas (,).
This parameter corresponds to upstream node output name on the Scheduling tab in the DataWorks console.
Note: This parameter is required when creating a batch synchronization task with the CreateDISyncTask or UpdateFile operation.
ProjectIdentifier
String
No
dw_project
The name of the DataWorks workspace. You can log on to the DataWorks Console and go to the workspace configuration page to obtain the workspace name.
You must set either this parameter or the ProjectId parameter to specify the DataWorks workspace.
FileId
Long
Yes
100000001
The ID of the file. You can call the ListFiles operation to obtain the file ID.
OutputList
String
No
dw_project.ods_user_info_d
The output name of the file.
This parameter corresponds to current node output name on the Scheduling tab in the DataWorks console.
ResourceGroupIdentifier
String
No
default_group
The resource group where the task runs after it is deployed. You can call the ListResourceGroups operation to obtain the available resource groups in the workspace.
ConnectionName
String
No
odps_source
The identifier of the data source that the task uses for execution. You can call the ListDataSources operation to obtain the list of available data sources.
Owner
String
No
18023848927592
The user ID of the file owner.
AutoParsing
Boolean
No
true
Specifies whether to enable automatic parsing for the file. Valid values:
true: The file automatically parses code.
false: The file does not automatically parse code.
This parameter corresponds to parse input/output from code on the Scheduling tab in the DataWorks Console.
SchedulerType
String
No
NORMAL
The type of scheduling. Valid values:
NORMAL: A normally scheduled task.
MANUAL: A manual task. It runs only when manually triggered and is not scheduled automatically.
PAUSE: A paused task.
SKIP: A dry-run task. Dry-run tasks are scheduled on a regular basis, but the tasks are immediately set to successful when the scheduling starts.
AdvancedSettings
String
No
{"queue":"default","SPARK_CONF":"--conf spark.driver.memory=2g"}
The advanced settings for the task.
This parameter corresponds to advanced settings on the editing page of an EMR Spark Streaming or EMR Streaming SQL task in the DataWorks Console.
Currently, this parameter is supported only for EMR Spark Streaming and EMR Streaming SQL tasks. The parameter value must be in the JSON format.
StartImmediately
Boolean
No
true
Specifies whether to start the task immediately after it is deployed. Valid values:
true: Starts the task immediately after it is deployed.
false: Does not start the task immediately after it is deployed.
This parameter corresponds to the startup mode on the Scheduling tab on the editing page of an EMR Spark Streaming or EMR Streaming SQL task in the DataWorks Console.
InputParameters
String
No
[{"ValueSource": "project_001.first_node:bizdate_param","ParameterName": "bizdate_input"}]
The context input parameters for the node. The value is a JSON string. For information about the fields, see the InputContextParameterList parameter returned by the GetFile operation.
This parameter corresponds to current node input parameters on the Scheduling tab in the DataWorks Console.
OutputParameters
String
No
[{"Type": 1,"Value": "${bizdate}","ParameterName": "bizdate_param"}]
The context output parameters for the node. The value is a JSON string. For information about the fields, see the OutputContextParameterList parameter returned by the GetFile operation.
This parameter corresponds to current node output parameters on the Scheduling tab in the DataWorks Console.
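The following brief sketch shows how some of the parameters in the preceding table map to an UpdateFile call. It mirrors the fuller updateFile method in the sample code at the end of this topic; the workspace ID, schedule, and resource group identifier are example values, and client is the shared client initialized earlier.
public void updateFileScheduling(Long fileId) throws ClientException {
    UpdateFileRequest request = new UpdateFileRequest();
    request.setProjectId(181565L);                        // ProjectId: the DataWorks workspace ID
    request.setFileId(fileId);                            // FileId: returned by CreateDISyncTask
    request.setCronExpress("00 30 05 * * ?");             // CronExpress: run at 05:30 every day
    request.setCycleType("DAY");                          // CycleType: day-level scheduling
    request.setRerunMode("FAILURE_ALLOWED");              // RerunMode: rerun only after failures
    request.setAutoRerunTimes(3);                         // AutoRerunTimes: rerun up to 3 times
    request.setAutoRerunIntervalMillis(120000);           // AutoRerunIntervalMillis: 2 minutes
    request.setInputList("project_root");                 // InputList: output name of the upstream node
    request.setResourceGroupIdentifier("default_group");  // ResourceGroupIdentifier: example value
    UpdateFileResponse response = client.getAcsResponse(request);
    System.out.println(new Gson().toJson(response));
}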
Submit the Data Integration task.
Call the SubmitFile operation to submit the Data Integration task to the development environment of the scheduling system. The response returns a deploymentId. Use this ID with the GetDeployment operation to retrieve deployment details.
public void submitFile() throws ClientException {
    SubmitFileRequest request = new SubmitFileRequest();
    request.setProjectId(78837L);
    request.setProjectIdentifier("zxy_8221431");
    // This node ID is the ID that is returned when you create the node. It corresponds to the file_id in the File table of the database.
    request.setFileId(501576542L);
    request.setComment("Comment");
    SubmitFileResponse acsResponse = client.getAcsResponse(request);
    // The DeploymentId is the return value of the commit or publish operation.
    Long deploymentId = acsResponse.getData();
    log.info(acsResponse.toString());
}
The preceding code provides an example of how to configure some of the parameters. For more information about the parameters, see SubmitFile and GetDeployment.
Deploy the synchronization task to the production environment.
Call the DeployFile operation to publish the Data Integration synchronization task to the production environment.
Note: This operation applies only to workspaces in Standard mode.
public void deploy() throws ClientException {
    DeployFileRequest request = new DeployFileRequest();
    request.setProjectIdentifier("zxy_8221431");
    request.setFileId(501576542L);
    request.setComment("Comment");
    // Specify either NodeId or file_id. The value of NodeId is the node ID in the basic properties of the scheduling configuration.
    request.setNodeId(700004537241L);
    DeployFileResponse acsResponse = client.getAcsResponse(request);
    // The DeploymentId is the return value of the commit or publish operation.
    Long deploymentId = acsResponse.getData();
    log.info(acsResponse.getData().toString());
}
The preceding code provides an example of how to configure some of the parameters. For more information about the parameters, see DeployFile.
Obtain the details of the deployment package.
The SubmitFile and DeployFile operations return a deploymentId. Use this ID with the GetDeployment operation to check the deployment status. A Status of 1 indicates a successful deployment.
public void getDeployment() throws ClientException {
    GetDeploymentRequest request = new GetDeploymentRequest();
    request.setProjectId(78837L);
    request.setProjectIdentifier("zxy_8221431");
    // The DeploymentId is the return value of the commit or publish operation. Call the GetDeployment operation to get the details of this deployment.
    request.setDeploymentId(2776067L);
    GetDeploymentResponse acsResponse = client.getAcsResponse(request);
    log.info(acsResponse.getData().toString());
}
The preceding code provides an example of how to configure some of the parameters. For more information about the parameters, see GetDeployment.
Modify the configuration of a synchronization task
To modify a task, call the UpdateDISyncTask operation to update the task content or to change the exclusive resource group for Data Integration by using the TaskParam parameter. After you update the task, you must submit and deploy it again. For more information, see Procedure.
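The following minimal sketch shows one way to call UpdateDISyncTask with the shared client from the preceding steps. The setter names follow the pattern of CreateDISyncTask and should be checked against the UpdateDISyncTask API reference; the workspace ID and resource group identifier are placeholders, and newTaskContent is the updated task configuration in the same JSON format that CreateDISyncTask accepts.
public void updateDISyncTask(Long fileId, String newTaskContent) throws ClientException {
    UpdateDISyncTaskRequest request = new UpdateDISyncTaskRequest();
    request.setProjectId(181565L);          // The DataWorks workspace ID (placeholder).
    request.setTaskType("DI_OFFLINE");      // Batch synchronization task.
    request.setFileId(fileId);              // The file ID returned by CreateDISyncTask.
    request.setTaskContent(newTaskContent); // The updated task content.
    // Optionally switch the exclusive resource group for Data Integration (placeholder identifier).
    request.setTaskParam("{\"ResourceGroup\":\"S_res_group_XXX\"}");
    UpdateDISyncTaskResponse response = client.getAcsResponse(request);
    System.out.println(new Gson().toJson(response));
}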
Delete a synchronization task
To delete a Data Integration synchronization task, call the DeleteFile operation. The deleteTask method in the following sample code shows an example.
Sample code
import com.alibaba.fastjson.JSONObject;
import com.aliyuncs.DefaultAcsClient;
import com.aliyuncs.IAcsClient;
import com.aliyuncs.profile.DefaultProfile;
import com.aliyuncs.profile.IClientProfile;
import com.aliyuncs.dataworks_public.model.v20200518.*;
public class createofflineTask {
static Long createTask(String fileName) throws Exception {
Long projectId = 2043L;
String taskType = "DI_OFFLINE";
String taskContent = "{\n" +
" \"type\": \"job\",\n" +
" \"version\": \"2.0\",\n" +
" \"steps\": [\n" +
" {\n" +
" \"stepType\": \"mysql\",\n" +
" \"parameter\": {\n" +
" \"envType\": 0,\n" +
" \"datasource\": \"mysql_autotest_dev\",\n" +
" \"column\": [\n" +
" \"id\",\n" +
" \"name\"\n" +
" ],\n" +
" \"connection\": [\n" +
" {\n" +
" \"datasource\": \"mysql_autotest_dev\",\n" +
" \"table\": [\n" +
" \"user\"\n" +
" ]\n" +
" }\n" +
" ],\n" +
" \"where\": \"\",\n" +
" \"splitPk\": \"\",\n" +
" \"encoding\": \"UTF-8\"\n" +
" },\n" +
" \"name\": \"Reader\",\n" +
" \"category\": \"reader\"\n" +
" },\n" +
" {\n" +
" \"stepType\": \"odps\",\n" +
" \"parameter\": {\n" +
" \"partition\": \"pt=${bizdate}\",\n" +
" \"truncate\": true,\n" +
" \"datasource\": \"odps_source\",\n" +
" \"envType\": 0,\n" +
" \"column\": [\n" +
" \"id\",\n" +
" \"name\"\n" +
" ],\n" +
" \"emptyAsNull\": false,\n" +
" \"tableComment\": \"null\",\n" +
" \"table\": \"user\"\n" +
" },\n" +
" \"name\": \"Writer\",\n" +
" \"category\": \"writer\"\n" +
" }\n" +
" ],\n" +
" \"setting\": {\n" +
" \"executeMode\": null,\n" +
" \"errorLimit\": {\n" +
" \"record\": \"\"\n" +
" },\n" +
" \"speed\": {\n" +
" \"concurrent\": 2,\n" +
" \"throttle\": false\n" +
" }\n" +
" },\n" +
" \"order\": {\n" +
" \"hops\": [\n" +
" {\n" +
" \"from\": \"Reader\",\n" +
" \"to\": \"Writer\"\n" +
" }\n" +
" ]\n" +
" }\n" +
"}";
CreateDISyncTaskRequest request = new CreateDISyncTaskRequest();
request.setProjectId(projectId);
request.setTaskType(taskType);
request.setTaskContent(taskContent);
request.setTaskName(fileName);
request.setTaskParam("{\"FileFolderPath\":\"Workflow/Automated_Testing_Workspace_Do_Not_Modify/Data_Integration\",\"ResourceGroup\":\"S_res_group_XXX\"}");
// Use an exclusive resource group for Data Integration.
CreateDISyncTaskResponse response1 = client.getAcsResponse(request);
return response1.getData().getFileId();
}
public static void updateFile(Long fileId) throws Exception {
UpdateFileRequest request = new UpdateFileRequest();
request.setProjectId(2043L);
request.setFileId(fileId);
request.setAutoRerunTimes(3);
request.setRerunMode("FAILURE_ALLOWED");
request.setCronExpress("00 30 05 * * ?");
request.setCycleType("DAY");
request.setResourceGroupIdentifier("S_res_group_XXX");
// Use an exclusive resource group for scheduling.
request.setInputList("dataworks_di_autotest_root");
request.setAutoParsing(true);
request.setDependentNodeIdList("5,10,15,20");
request.setDependentType("SELF");
request.setStartEffectDate(0L);
request.setEndEffectDate(4155787800000L);
request.setFileDescription("description");
request.setStop(false);
request.setParaValue("x=a y=b z=c");
request.setSchedulerType("NORMAL");
request.setAutoRerunIntervalMillis(120000);
UpdateFileResponse response1 = client.getAcsResponse(request);
}
static void deleteTask(Long fileId) throws Exception {
DeleteFileRequest request = new DeleteFileRequest();
Long projectId = 63845L;
request.setProjectId(projectId);
request.setFileId(fileId);
String akId = "XXX";
String akSecret = "XXXX";
String regionId = "cn-hangzhou";
IClientProfile profile = DefaultProfile.getProfile(regionId, akId, akSecret);
DefaultProfile.addEndpoint("cn-hangzhou","dataworks-public","dataworks.cn-hangzhou.aliyuncs.com");
IAcsClient client;
client = new DefaultAcsClient(profile);
DeleteFileResponse response1 = client.getAcsResponse(request);
System.out.println(JSONObject.toJSONString(response1));
}
static IAcsClient client;
public static void main(String[] args) throws Exception {
String akId = "XX";
// Please ensure that the environment variables ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET are set.https://www.alibabacloud.com/help/zh/alibaba-cloud-sdk-262060/latest/configure-credentials-378659
IClientProfile profile = DefaultProfile.getProfile("cn-shanghai", System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"), System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"));
DefaultProfile.addEndpoint(regionId, "dataworks-public", "dataworks." + regionId + ".aliyuncs.com");
client = new DefaultAcsClient(profile);
String taskName = "offline_job_0930_1648";
Long fileId = createTask(taskName); // Create a data integration task.
updateFile(fileId); // Modify the scheduling properties of the data integration task.
}
}