DataWorks: Use API operations to create, modify, and delete a batch synchronization task

Last Updated: Jul 03, 2024

This topic describes how to use API operations to create, modify, and delete a batch synchronization task.

Prerequisites

Limits

Configure environment dependencies and perform account authentication

  • Configure Maven dependencies.

    1. Open the pom.xml file of your Maven project and add aliyun-java-sdk-core to the file.

      <dependency>
        <groupId>com.aliyun</groupId>
        <artifactId>aliyun-java-sdk-core</artifactId>
        <version>4.5.20</version>
      </dependency>

    2. Open the pom.xml file of the Maven project and add aliyun-java-sdk-dataworks-public to the file.

      <dependency>
        <groupId>com.aliyun</groupId>
        <artifactId>aliyun-java-sdk-dataworks-public</artifactId>
        <version>3.3.18</version>
      </dependency>

    3. Open the pom.xml file of the Maven project and add credentials-java to the file. We recommend that you use the latest version of credentials-java.

      <dependency>
        <groupId>com.aliyun</groupId>
        <artifactId>credentials-java</artifactId>
        <version>0.2.11</version>
      </dependency>

      Note

      You can use the Alibaba Cloud Credentials tool to manage your AccessKey pair. For information about how to use the Alibaba Cloud Credentials tool, see Configure credentials.

  • Authenticate an account.

    Before you call an API operation to create a data synchronization task, configure the credentials of the Alibaba Cloud account that you want to use to access DataWorks, as shown in the following code. If the account passes authentication, you can perform subsequent operations. If authentication fails, an error is returned, and you must resolve the issue based on the error message.

    // Make sure that the environment variables ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET are set.
    // See https://www.alibabacloud.com/help/zh/alibaba-cloud-sdk-262060/latest/configure-credentials-378659
    IClientProfile profile = DefaultProfile.getProfile("cn-shanghai", System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"), System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"));
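System.getenv returns null when a variable is not set, so it can be useful to check both variables before you build the profile. The following is a minimal sketch that uses only the JDK. The class EnvCredentials and its methods are hypothetical helpers for illustration and are not part of the Alibaba Cloud SDK.

```java
import java.util.Map;

/** Hypothetical helper (not part of the Alibaba Cloud SDK) that reads the AccessKey pair from an environment map. */
final class EnvCredentials {
    static final String KEY_ID_VAR = "ALIBABA_CLOUD_ACCESS_KEY_ID";
    static final String KEY_SECRET_VAR = "ALIBABA_CLOUD_ACCESS_KEY_SECRET";

    final String accessKeyId;
    final String accessKeySecret;

    private EnvCredentials(String accessKeyId, String accessKeySecret) {
        this.accessKeyId = accessKeyId;
        this.accessKeySecret = accessKeySecret;
    }

    /** Reads both variables from the given map and fails fast if either one is missing or blank. */
    static EnvCredentials from(Map<String, String> env) {
        return new EnvCredentials(require(env, KEY_ID_VAR), require(env, KEY_SECRET_VAR));
    }

    private static String require(Map<String, String> env, String name) {
        String value = env.get(name);
        if (value == null || value.trim().isEmpty()) {
            throw new IllegalStateException("Environment variable " + name + " is not set");
        }
        return value;
    }
}
```

In an application, you could call EnvCredentials.from(System.getenv()) and pass the accessKeyId and accessKeySecret fields to DefaultProfile.getProfile in place of the direct System.getenv calls.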

Overview

Procedure

  1. Create a data synchronization task.

    You can call the CreateDISyncTask operation to create a data synchronization task. The following sample code provides the settings of some parameters. For more information about parameters, see CreateDISyncTask.

    public void createFile() throws ClientException{
            CreateDISyncTaskRequest request = new CreateDISyncTaskRequest();
            request.setProjectId(181565L);
            request.setTaskType("DI_OFFLINE");
            request.setTaskContent("{\"type\":\"job\",\"version\":\"2.0\",\"steps\":[{\"stepType\":\"mysql\",\"parameter\":{\"envType\":1,\"datasource\":\"dh_mysql\",\"column\":[\"id\",\"name\"],\"tableComment\":\"Comment of the same table\",\"connection\":[{\"datasource\":\"dh_mysql\",\"table\":[\"same\"]}],\"where\":\"\",\"splitPk\":\"id\",\"encoding\":\"UTF-8\"},\"name\":\"Reader\",\"category\":\"reader\"},{\"stepType\":\"odps\",\"parameter\":{\"partition\":\"pt=${bizdate}\",\"truncate\":true,\"datasource\":\"odps_source\",\"envType\":1,\"column\":[\"id\",\"name\"],\"emptyAsNull\":false,\"tableComment\":\"Comment of the same table\",\"table\":\"same\"},\"name\":\"Writer\",\"category\":\"writer\"}],\"setting\":{\"errorLimit\":{\"record\":\"\"},\"speed\":{\"throttle\":false,\"concurrent\":2}},\"order\":{\"hops\":[{\"from\":\"Reader\",\"to\":\"Writer\"}]}}");
            request.setTaskParam("{\"FileFolderPath\":\"Business Flow/new_biz/Data Integration\",\"ResourceGroup\":\"S_res_group_280749521950784_1602767279794\"}");
            request.setTaskName("new_di_task_0607_1416");
    
    
            // Use the same region for the profile and the endpoint.
            String regionId = "cn-hangzhou";
            // Make sure that the environment variables ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET are set.
            // See https://www.alibabacloud.com/help/zh/alibaba-cloud-sdk-262060/latest/configure-credentials-378659
            IClientProfile profile = DefaultProfile.getProfile(regionId, System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"), System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"));
            DefaultProfile.addEndpoint(regionId, "dataworks-public", "dataworks." + regionId + ".aliyuncs.com");
            IAcsClient client = new DefaultAcsClient(profile);

            // Send the request and print the response as JSON.
            CreateDISyncTaskResponse response1 = client.getAcsResponse(request);
            Gson gson1 = new Gson();
            System.out.println(gson1.toJson(response1));
    }
  2. Call the UpdateFile operation to configure scheduling settings for the task.

    The following list describes the request parameters of the operation. Each entry shows the parameter name, followed by its type, whether it is required, an example value, and a description.

    Action
      Type: String. Required: Yes. Example: UpdateFile
      The operation that you want to perform.

    FileFolderPath
      Type: String. Required: No. Example: Business Flow/1/Data Integration/Folder 1/Folder 2
      The path of the file.

    ProjectId
      Type: Long. Required: No. Example: 10000
      The ID of the DataWorks workspace. You can log on to the DataWorks console and go to the Workspace page to obtain the workspace ID.

    FileName
      Type: String. Required: No. Example: ods_user_info_d
      The name of the file. You can set the FileName parameter to another value to change the file name.
      To rename a file, call the ListFiles operation to query the ID of the file. Then, call the UpdateFile operation with the FileId parameter set to that ID and the FileName parameter set to the new name.

    FileDescription
      Type: String. Required: No. Example: File description
      The description of the file.

    Content
      Type: String. Required: No. Example: SELECT "1";
      The code of the file. The code format varies based on the file type. To view the code format for a specific file type, go to Operation Center, right-click a task of the file type, and then select View Code.

    AutoRerunTimes
      Type: Integer. Required: Yes. Example: 3
      The number of automatic reruns that are allowed after an error occurs.

    AutoRerunIntervalMillis
      Type: Integer. Required: No. Example: 120000
      The interval between two consecutive automatic reruns after an error occurs. Unit: milliseconds. Maximum value: 1800000 (30 minutes).
      This parameter corresponds to the Rerun interval parameter that is displayed after the Auto Rerun upon Failure check box is selected in the Schedule section of the Properties tab in the DataWorks console.
      The DataWorks console measures this interval in minutes. Convert minutes to milliseconds when you call the operation.

    RerunMode
      Type: String. Required: No. Example: ALL_ALLOWED
      Specifies whether the task that corresponds to the file can be rerun. Valid values:
      • ALL_ALLOWED: The task can be rerun regardless of whether it is successfully run or fails to run.
      • FAILURE_ALLOWED: The task can be rerun only after it fails to run.
      • ALL_DENIED: The task cannot be rerun regardless of whether it is successfully run or fails to run.
      This parameter is equivalent to the Rerun parameter in the Schedule section of the Properties panel in the DataWorks console.

    Stop
      Type: Boolean. Required: No. Example: false
      Specifies whether to suspend the scheduling of the task. Valid values: true and false.
      Setting this parameter to true is equivalent to setting the Recurrence parameter to Skip Execution in the Schedule section of the Properties tab in the DataWorks console.

    ParaValue
      Type: String. Required: No. Example: x=a y=b z=c
      The scheduling parameters of the task.
      Configuring this parameter is equivalent to configuring scheduling parameters in the Scheduling Parameter section of the Properties tab in the DataWorks console. For more information, see Configure scheduling parameters.

    StartEffectDate
      Type: Long. Required: No. Example: 936923400000
      The start time of automatic scheduling. Set this parameter to a UNIX timestamp representing the number of milliseconds that have elapsed since January 1, 1970, 00:00:00 UTC.
      Configuring this parameter is equivalent to specifying a start time after you select Specified Time for the Validity Period parameter in the Schedule section of the Properties tab in the DataWorks console.

    EndEffectDate
      Type: Long. Required: No. Example: 4155787800000
      The end time of automatic scheduling. Set this parameter to a UNIX timestamp representing the number of milliseconds that have elapsed since January 1, 1970, 00:00:00 UTC.
      Configuring this parameter is equivalent to specifying an end time after you select Specified Time for the Validity Period parameter in the Schedule section of the Properties tab in the DataWorks console.

    CronExpress
      Type: String. Required: No. Example: 00 00-59/5 1-23 * * ?
      The cron expression that represents the periodic scheduling policy of the task. This parameter corresponds to the Cron Expression parameter in the Schedule section of the Properties tab in the DataWorks console. After you configure the Scheduling Cycle and Scheduled time parameters in the DataWorks console, DataWorks automatically generates a value for the Cron Expression parameter.
      Examples:
      • A task that is scheduled to run at 05:30 every day: 00 30 05 * * ?
      • A task that is scheduled to run at the fifteenth minute of each hour: 00 15 * * * ?
      • A task that is scheduled to run every 10 minutes: 00 00/10 * * * ?
      • A task that is scheduled to run every 10 minutes from 08:00 to 17:00 every day: 00 00-59/10 8-17 * * ?
      • A task that is scheduled to run at 00:20 on the first day of each month: 00 20 00 1 * ?
      • A task that is scheduled to run every three months starting from 00:10 on January 1: 00 10 00 1 1-12/3 ?
      • A task that is scheduled to run at 00:05 every Tuesday and Friday: 00 05 00 * * 2,5
      The scheduling system of DataWorks imposes the following limits on cron expressions:
      • The minimum interval that can be specified in a cron expression to schedule a task is 5 minutes.
      • The earliest time a task can be scheduled to run every day is 00:05.

    CycleType
      Type: String. Required: No. Example: NOT_DAY
      The type of the scheduling cycle of the task. Valid values: NOT_DAY and DAY. The value NOT_DAY indicates that the task is scheduled to run by minute or hour. The value DAY indicates that the task is scheduled to run by day, week, or month.
      This parameter corresponds to the Scheduling Cycle parameter in the Schedule section of the Properties panel in the DataWorks console.

    DependentType
      Type: String. Required: No. Example: USER_DEFINE
      The type of the cross-cycle scheduling dependency of the task. Valid values:
      • SELF: The instance generated for the task in the current cycle depends on the instance generated for the task in the previous cycle.
      • CHILD: The instance generated for the task in the current cycle depends on the instances generated for the descendant tasks at the nearest level of the task in the previous cycle.
      • USER_DEFINE: The instance generated for the task in the current cycle depends on the instances generated for one or more specified tasks in the previous cycle.
      • NONE: No cross-cycle scheduling dependency type is specified for the task.

    DependentNodeIdList
      Type: String. Required: No. Example: 5,10,15,20
      The IDs of the tasks on which the task that corresponds to the file depends. This parameter takes effect when the DependentType parameter is set to USER_DEFINE. If you specify multiple IDs, separate them with commas (,).
      The value of this parameter corresponds to the ID of the task that you specify after you select Other Nodes for the Cross-Cycle Dependency (Original Previous-Cycle Dependency) parameter in the Dependencies section of the Properties tab in the DataWorks console.

    InputList
      Type: String. Required: No. Example: project_root,project.file1,project.001_out
      The output names of the parent files on which the current file depends. If you specify multiple output names, separate them with commas (,).
      This parameter corresponds to the Output Name of Ancestor Node parameter in the Parent Nodes subsection of the Dependencies section of the Properties tab in the DataWorks console.
      Note: You must configure this parameter when you call the CreateDISyncTask or UpdateFile operation to create a batch synchronization task.

    ProjectIdentifier
      Type: String. Required: No. Example: dw_project
      The name of the DataWorks workspace. You can log on to the DataWorks console and go to the Workspace page to obtain the workspace name.
      You must configure either this parameter or the ProjectId parameter to determine the DataWorks workspace to which the operation is applied.

    FileId
      Type: Long. Required: Yes. Example: 100000001
      The ID of the file. You can call the ListFiles operation to obtain the ID.

    OutputList
      Type: String. Required: No. Example: dw_project.ods_user_info_d
      The output name of the file.
      This parameter corresponds to the Output Name parameter in the Output Name of Current Node subsection of the Dependencies section of the Properties tab in the DataWorks console.

    ResourceGroupIdentifier
      Type: String. Required: No. Example: default_group
      The identifier of the resource group that is used to run the task that corresponds to the file. You can call the ListResourceGroups operation to query the available resource groups in the workspace.

    ConnectionName
      Type: String. Required: No. Example: odps_source
      The name of the data source that is used to run the task. You can call the ListDataSources operation to query the available data sources in the workspace.

    Owner
      Type: String. Required: No. Example: 18023848927592
      The ID of the file owner.

    AutoParsing
      Type: Boolean. Required: No. Example: true
      Specifies whether to enable the automatic parsing feature for the file. Valid values:
      • true: enables the automatic parsing feature for the file.
      • false: disables the automatic parsing feature for the file.
      This parameter corresponds to the Parse Input and Output from Code button in the Dependencies section of the Properties tab in the DataWorks console.

    SchedulerType
      Type: String. Required: No. Example: NORMAL
      The scheduling type of the task. Valid values:
      • NORMAL: The task is an auto triggered task.
      • MANUAL: The task is a manually triggered task. Manually triggered tasks cannot be automatically scheduled. You can go to the Manually Triggered Workflows pane to view manually triggered tasks.
      • PAUSE: The task is a paused task.
      • SKIP: The task is a dry-run task. Dry-run tasks are started as scheduled, but the scheduling system sets their status to successful as soon as it starts to run them.

    AdvancedSettings
      Type: String. Required: No. Example: {"queue":"default","SPARK_CONF":"--conf spark.driver.memory=2g"}
      The advanced configurations of the task. The value of this parameter must be in the JSON format.
      This parameter is valid only for an EMR Spark Streaming task or an EMR Streaming SQL task. This parameter corresponds to the Advanced Settings tab of the task in the DataWorks console.

    StartImmediately
      Type: Boolean. Required: No. Example: true
      Specifies whether to run a task immediately after the task is deployed. Valid values:
      • true: runs the task immediately after the task is deployed.
      • false: does not run the task immediately after the task is deployed.
      This parameter is valid only for an EMR Spark Streaming task or an EMR Streaming SQL task. This parameter corresponds to the Start Method parameter in the Schedule section of the Configure tab in the DataWorks console.

    InputParameters
      Type: String. Required: No. Example: [{"ValueSource": "project_001.first_node:bizdate_param","ParameterName": "bizdate_input"}]
      The input parameters of the task. The value of this parameter must be in the JSON format. For more information about the input parameters, see the InputContextParameterList parameter in the Response parameters section of the GetFile operation.
      This parameter corresponds to the Input Parameters subsection of the Input and Output Parameters section of the Properties tab in the DataWorks console.

    OutputParameters
      Type: String. Required: No. Example: [{"Type": 1,"Value": "${bizdate}","ParameterName": "bizdate_param"}]
      The output parameters of the task. The value of this parameter must be in the JSON format. For more information about the output parameters, see the OutputContextParameterList parameter in the Response parameters section of the GetFile operation.
      This parameter corresponds to the Output Parameters subsection of the Input and Output Parameters section of the Properties tab in the DataWorks console.
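Two of the parameters above involve unit conversions that are easy to get wrong: AutoRerunIntervalMillis (minutes in the console, milliseconds in the API, at most 1800000) and StartEffectDate or EndEffectDate (milliseconds since the UNIX epoch). The following is a minimal sketch of both conversions that uses only the JDK. The class SchedulingValues and its methods are hypothetical, and rejecting non-positive intervals is an assumption that is not stated in the parameter description.

```java
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.util.concurrent.TimeUnit;

/** Hypothetical helpers for preparing UpdateFile scheduling values; not part of the SDK. */
final class SchedulingValues {
    /** Maximum rerun interval from the parameter description: 1800000 ms (30 minutes). */
    static final long MAX_RERUN_INTERVAL_MILLIS = 1_800_000L;

    /** Converts a rerun interval in minutes (console unit) to milliseconds (API unit). */
    static int rerunIntervalMillis(int minutes) {
        long millis = TimeUnit.MINUTES.toMillis(minutes);
        // Assumption: a zero or negative interval is treated as invalid.
        if (minutes <= 0 || millis > MAX_RERUN_INTERVAL_MILLIS) {
            throw new IllegalArgumentException(
                    "Rerun interval must be positive and at most 30 minutes: " + minutes);
        }
        return (int) millis;
    }

    /** Converts a UTC date and time to epoch milliseconds for StartEffectDate or EndEffectDate. */
    static long effectDateMillis(LocalDateTime utcDateTime) {
        return utcDateTime.toInstant(ZoneOffset.UTC).toEpochMilli();
    }
}
```

For example, SchedulingValues.rerunIntervalMillis(2) returns 120000, and SchedulingValues.effectDateMillis(LocalDateTime.of(1999, 9, 10, 0, 30)) returns 936923400000, which match the example values of AutoRerunIntervalMillis and StartEffectDate.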

  3. Commit the task.

    Call the SubmitFile operation to commit the task to the development environment of the scheduling system. After you commit the task, the system returns a response that contains the ID of the related deployment package. You can call the GetDeployment operation to obtain the details of the deployment package based on the ID.

    public void submitFile() throws ClientException {
        SubmitFileRequest request = new SubmitFileRequest();
        request.setProjectId(78837L);
        request.setProjectIdentifier("zxy_8221431");
        // The ID is the value of the FileId parameter that is returned when you create the task.
        request.setFileId(501576542L);
        request.setComment("Comment");
        SubmitFileResponse acsResponse = client.getAcsResponse(request);
        // The ID is the deployment package ID that is returned after you commit or deploy the task.
        Long deploymentId = acsResponse.getData();
        log.info(acsResponse.toString());
    }

    The preceding sample code provides the settings of some parameters. For more information about parameters, see SubmitFile and GetDeployment.

  4. Deploy the task to the production environment.

    Call the DeployFile operation to deploy the task to the production environment.

    Note

    If your workspace is in standard mode, you must perform this step.

    public void deploy() throws ClientException {
        DeployFileRequest request = new DeployFileRequest();
        request.setProjectIdentifier("zxy_8221431");
        request.setFileId(501576542L);
        request.setComment("Comment");
        // You must configure the NodeId or FileId parameter. The value of NodeId is the value of the Node ID parameter in the General section of the Properties tab of the task in the DataWorks console.
        request.setNodeId(700004537241L);
        DeployFileResponse acsResponse = client.getAcsResponse(request);
        // The ID is the deployment package ID that is returned after you commit or deploy the task.
        Long deploymentId = acsResponse.getData();
        log.info(acsResponse.getData().toString());
    }

    The preceding sample code provides the settings of some parameters. For more information about parameters, see DeployFile.

  5. Obtain the details of the deployment package.

    After you deploy the task, the system returns a response that contains the ID of the related deployment package. You can call the GetDeployment operation to obtain the details of the deployment package based on the ID. If the value of the Status parameter in the response of the GetDeployment operation is 1, the task is deployed.

    public void getDeployment() throws ClientException {
        GetDeploymentRequest request = new GetDeploymentRequest();
        request.setProjectId(78837L);
        request.setProjectIdentifier("zxy_8221431");
        // The ID is the deployment package ID that is returned after you commit or deploy the task. You can call the GetDeployment operation to obtain the details of the related deployment package.
        request.setDeploymentId(2776067L);
        GetDeploymentResponse acsResponse = client.getAcsResponse(request);
        log.info(acsResponse.getData().toString());
    }

    The preceding sample code provides the settings of some parameters. For more information about parameters, see GetDeployment.
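Because a deployment package may not reach Status 1 right away, a caller may want to query GetDeployment repeatedly. The following is a minimal polling sketch that uses only the JDK. The class DeploymentPoller is hypothetical, and the statusFetcher argument stands in for your own code that calls GetDeployment and extracts the Status value. Only the meaning of 1 (deployed) comes from this topic. Treating every other value as "not yet deployed" is a simplification, and failure statuses are not handled.

```java
import java.util.function.IntSupplier;

/** Hypothetical polling helper; not part of the SDK. */
final class DeploymentPoller {
    /** Status value that indicates a deployed task, as described for GetDeployment. */
    static final int STATUS_DEPLOYED = 1;

    /**
     * Calls statusFetcher up to maxAttempts times, waiting delayMillis between attempts.
     * Returns true as soon as the fetched status equals STATUS_DEPLOYED, otherwise false.
     */
    static boolean waitUntilDeployed(IntSupplier statusFetcher, int maxAttempts, long delayMillis) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            if (statusFetcher.getAsInt() == STATUS_DEPLOYED) {
                return true;
            }
            if (attempt < maxAttempts) {
                try {
                    Thread.sleep(delayMillis);
                } catch (InterruptedException e) {
                    // Restore the interrupt flag and stop polling.
                    Thread.currentThread().interrupt();
                    return false;
                }
            }
        }
        return false;
    }
}
```

The supplier keeps the sketch independent of the SDK response types. In practice, it would wrap a call such as the getDeployment method above and return the status that the response reports.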

Modify the configuration of the task

After you create and configure a data synchronization task in Data Integration, you can call the UpdateDISyncTask operation to modify the configuration of the task. You can also use the TaskParam parameter to change the exclusive resource group that is used by the task. After you modify the configuration of a data synchronization task, you must commit and deploy the task again. For more information, see the Overview section in this topic.
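The TaskParam value is a JSON string, and hand-escaping it in Java is error-prone. The following is a minimal sketch that builds such a string from key-value pairs by using only the JDK. The class TaskParamJson is hypothetical. The keys FileFolderPath and ResourceGroup are the ones shown in the CreateDISyncTask example in this topic. Check the UpdateDISyncTask reference for the keys that operation accepts. The sketch escapes only backslashes and double quotation marks, so it is not a general-purpose JSON encoder.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

/** Hypothetical helper that renders string key-value pairs as a flat JSON object. */
final class TaskParamJson {
    /** Builds a JSON object string, preserving the iteration order of the map. */
    static String toJson(Map<String, String> params) {
        StringJoiner joiner = new StringJoiner(",", "{", "}");
        for (Map.Entry<String, String> entry : params.entrySet()) {
            joiner.add(quote(entry.getKey()) + ":" + quote(entry.getValue()));
        }
        return joiner.toString();
    }

    /** Wraps a value in double quotes, escaping backslashes and double quotes only. */
    private static String quote(String value) {
        return "\"" + value.replace("\\", "\\\\").replace("\"", "\\\"") + "\"";
    }

    public static void main(String[] args) {
        Map<String, String> params = new LinkedHashMap<>();
        params.put("FileFolderPath", "Business Flow/Test/Data Integration");
        params.put("ResourceGroup", "S_res_group_XXX");
        System.out.println(toJson(params));
    }
}
```

Running main prints {"FileFolderPath":"Business Flow/Test/Data Integration","ResourceGroup":"S_res_group_XXX"}, which can be passed to setTaskParam on the request.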

Delete a data synchronization task

If you no longer require a data synchronization task, you can call the DeleteFile operation to delete the task.

Sample code

POM dependencies

<dependency>
    <groupId>com.aliyun</groupId>
    <artifactId>aliyun-java-sdk-core</artifactId>
    <version>4.5.20</version>
</dependency>
<dependency>
    <groupId>com.aliyun</groupId>
    <artifactId>aliyun-java-sdk-dataworks-public</artifactId>
    <version>3.4.1</version>
</dependency>

SDK for Java

import com.aliyuncs.DefaultAcsClient;
import com.aliyuncs.IAcsClient;
import com.aliyuncs.dataworks_public.model.v20200518.*;
import com.aliyuncs.profile.DefaultProfile;
import com.aliyuncs.profile.IClientProfile;
import java.util.List;

public class createofflineTask {
    static Long createTask(String fileName) throws  Exception {
        Long projectId = 2043L;
        String taskType = "DI_OFFLINE";
        String taskContent = "{\n" +
                "    \"type\": \"job\",\n" +
                "    \"version\": \"2.0\",\n" +
                "    \"steps\": [\n" +
                "        {\n" +
                "            \"stepType\": \"mysql\",\n" +
                "            \"parameter\": {\n" +
                "                \"envType\": 0,\n" +
                "                \"datasource\": \"mysql_autotest_dev\",\n" +
                "                \"column\": [\n" +
                "                    \"id\",\n" +
                "                    \"name\"\n" +
                "                ],\n" +
                "                \"connection\": [\n" +
                "                    {\n" +
                "                        \"datasource\": \"mysql_autotest_dev\",\n" +
                "                        \"table\": [\n" +
                "                            \"user\"\n" +
                "                        ]\n" +
                "                    }\n" +
                "                ],\n" +
                "                \"where\": \"\",\n" +
                "                \"splitPk\": \"\",\n" +
                "                \"encoding\": \"UTF-8\"\n" +
                "            },\n" +
                "            \"name\": \"Reader\",\n" +
                "            \"category\": \"reader\"\n" +
                "        },\n" +
                "        {\n" +
                "            \"stepType\": \"odps\",\n" +
                "            \"parameter\": {\n" +
                "                \"partition\": \"pt=${bizdate}\",\n" +
                "                \"truncate\": true,\n" +
                "                \"datasource\": \"odps_source\",\n" +
                "                \"envType\": 0,\n" +
                "                \"column\": [\n" +
                "                    \"id\",\n" +
                "                    \"name\"\n" +
                "                ],\n" +
                "                \"emptyAsNull\": false,\n" +
                "                \"tableComment\": \"null\",\n" +
                "                \"table\": \"user\"\n" +
                "            },\n" +
                "            \"name\": \"Writer\",\n" +
                "            \"category\": \"writer\"\n" +
                "        }\n" +
                "    ],\n" +
                "    \"setting\": {\n" +
                "        \"executeMode\": null,\n" +
                "        \"errorLimit\": {\n" +
                "            \"record\": \"\"\n" +
                "        },\n" +
                "        \"speed\": {\n" +
                "            \"concurrent\": 2,\n" +
                "            \"throttle\": false\n" +
                "        }\n" +
                "    },\n" +
                "    \"order\": {\n" +
                "        \"hops\": [\n" +
                "            {\n" +
                "                \"from\": \"Reader\",\n" +
                "                \"to\": \"Writer\"\n" +
                "            }\n" +
                "        ]\n" +
                "    }\n" +
                "}";
        CreateDISyncTaskRequest request = new CreateDISyncTaskRequest();
        request.setProjectId(projectId);
        request.setTaskType(taskType);
        request.setTaskContent(taskContent);
        request.setTaskName(fileName);
        // Use an exclusive resource group for Data Integration.
        request.setTaskParam("{\"FileFolderPath\":\"Business Flow/Test/Data Integration\",\"ResourceGroup\":\"S_res_group_XXX\"}");
        CreateDISyncTaskResponse response1 = client.getAcsResponse(request);
        return response1.getData().getFileId();
    }

    public static void updateFile(Long fileId) throws Exception {
        UpdateFileRequest request = new UpdateFileRequest();
        request.setProjectId(2043L);
        request.setFileId(fileId);
        request.setAutoRerunTimes(3);
        request.setRerunMode("FAILURE_ALLOWED");
        request.setCronExpress("00 30 05 * * ?");
        request.setCycleType("DAY");
        // Use an exclusive resource group for scheduling.
        request.setResourceGroupIdentifier("S_res_group_XXX");
        request.setInputList("dataworks_di_autotest_root");

        request.setAutoParsing(true);
        request.setDependentNodeIdList("5,10,15,20");
        request.setDependentType("SELF");
        request.setStartEffectDate(0L);
        request.setEndEffectDate(4155787800000L);
        request.setFileDescription("description");
        request.setStop(false);
        request.setParaValue("x=a y=b z=c");
        request.setSchedulerType("NORMAL");
        request.setAutoRerunIntervalMillis(120000);
        UpdateFileResponse response1 = client.getAcsResponse(request);
    }

    static void deleteTask(Long fileId) throws Exception {
        DeleteFileRequest request = new DeleteFileRequest();
        // Replace this value with the ID of the workspace that contains the file.
        Long projectId = 63845L;
        request.setProjectId(projectId);
        request.setFileId(fileId);

        // Reuse the shared client that main() creates from the environment-variable credentials
        // instead of hard-coding an AccessKey pair in the source code.
        DeleteFileResponse response1 = client.getAcsResponse(request);
        // Print the response as JSON with Gson, as in the createFile example in the Procedure section.
        System.out.println(new com.google.gson.Gson().toJson(response1));
    }

    static IAcsClient client;

    public static void main(String[] args) throws Exception {
        // Replace this value with the region in which your workspace resides.
        String regionId = "cn-shanghai";
        // Make sure that the environment variables ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET are set.
        // See https://www.alibabacloud.com/help/zh/alibaba-cloud-sdk-262060/latest/configure-credentials-378659
        IClientProfile profile = DefaultProfile.getProfile(regionId, System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"), System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"));
        DefaultProfile.addEndpoint(regionId, "dataworks-public", "dataworks." + regionId + ".aliyuncs.com");

        client = new DefaultAcsClient(profile);
        String taskName = "offline_job_0930_1648";
        Long fileId = createTask(taskName); // Create a data synchronization task.
        updateFile(fileId);   // Configure scheduling properties for the data synchronization task.
    }
}