DataWorks:Manage Data Integration tasks with Open API

Last Updated: Jan 07, 2026

This topic describes how to use Open API to create, update, and delete Data Integration tasks that synchronize data from a source to a destination.

Prerequisites

  • A workflow is created. For more information, see Create a scheduled workflow.

  • The data sources required for the synchronization task are created.

Limitations

  • You can call the CreateDISyncTask operation to create a Data Integration task. You can configure the task content only in the code editor.

  • You cannot use Open API to create a workflow in DataWorks. Instead, you must use an existing workflow for the data synchronization task.

Obtain an SDK

Obtain the latest SDK from the Alibaba Cloud Open API Portal.

Procedure

After setting up your environment, follow these steps to create and manage a data synchronization task with the DataWorks API. The snippets in these steps assume an initialized SDK client; a consolidated setup example follows this list:

  1. Create a data integration task.

  2. Configure scheduling dependencies for the task.

  3. Submit the Data Integration task.

  4. Deploy the synchronization task to the production environment.

  5. Obtain the details of the deployment package to verify the deployment status.
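
Each step requires an initialized client. The following setup is consolidated from the samples in this topic; it assumes that your credentials are stored in the ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET environment variables:

    // Initialize a DataWorks client for the cn-hangzhou region.
    // For more information about credentials, see https://www.alibabacloud.com/help/zh/alibaba-cloud-sdk-262060/latest/configure-credentials-378659
    String regionId = "cn-hangzhou";
    IClientProfile profile = DefaultProfile.getProfile(regionId,
            System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"),
            System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"));
    DefaultProfile.addEndpoint(regionId, "dataworks-public", "dataworks." + regionId + ".aliyuncs.com");
    IAcsClient client = new DefaultAcsClient(profile);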

Steps

  1. Create a data integration task.

    Call the CreateDISyncTask operation to create the task. The following sample code shows how to configure several key parameters. For more information about the parameters, see CreateDISyncTask.

    public void createFile() throws ClientException {
        CreateDISyncTaskRequest request = new CreateDISyncTaskRequest();
        request.setProjectId(181565L);
        request.setTaskType("DI_OFFLINE");
        request.setTaskContent("{\"type\":\"job\",\"version\":\"2.0\",\"steps\":[{\"stepType\":\"mysql\",\"parameter\":{\"envType\":1,\"datasource\":\"dh_mysql\",\"column\":[\"id\",\"name\"],\"tableComment\":\"Comment for the table same\",\"connection\":[{\"datasource\":\"dh_mysql\",\"table\":[\"same\"]}],\"where\":\"\",\"splitPk\":\"id\",\"encoding\":\"UTF-8\"},\"name\":\"Reader\",\"category\":\"reader\"},{\"stepType\":\"odps\",\"parameter\":{\"partition\":\"pt=${bizdate}\",\"truncate\":true,\"datasource\":\"odps_source\",\"envType\":1,\"column\":[\"id\",\"name\"],\"emptyAsNull\":false,\"tableComment\":\"Comment for the table same\",\"table\":\"same\"},\"name\":\"Writer\",\"category\":\"writer\"}],\"setting\":{\"errorLimit\":{\"record\":\"\"},\"speed\":{\"throttle\":false,\"concurrent\":2}},\"order\":{\"hops\":[{\"from\":\"Reader\",\"to\":\"Writer\"}]}}");
        request.setTaskParam("{\"FileFolderPath\":\"Workflow/new_biz/Data Integration\",\"ResourceGroup\":\"S_res_group_280749521950784_1602767279794\"}");
        request.setTaskName("new_di_task_0607_1416");

        // Make sure that the ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET environment variables are set.
        // For more information, see https://www.alibabacloud.com/help/zh/alibaba-cloud-sdk-262060/latest/configure-credentials-378659
        String regionId = "cn-hangzhou";
        IClientProfile profile = DefaultProfile.getProfile(regionId, System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"), System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"));
        DefaultProfile.addEndpoint(regionId, "dataworks-public", "dataworks." + regionId + ".aliyuncs.com");
        IAcsClient client = new DefaultAcsClient(profile);

        CreateDISyncTaskResponse response1 = client.getAcsResponse(request);
        Gson gson1 = new Gson();
        System.out.println(gson1.toJson(response1));
    }
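
    The response contains the FileId of the new task (response1.getData().getFileId()). You need this file ID when you configure, submit, and deploy the task in the following steps.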
  2. Call the UpdateFile operation to update the scheduling parameters.

    The following list describes the request parameters of the UpdateFile operation.

    • Action (String, required). Example: UpdateFile. The operation to perform.

    • FileFolderPath (String, optional). Example: Workflows/MyFirstWorkflow/DataIntegration/Folder1/Folder2. The path of the file.

    • ProjectId (Long, optional). Example: 10000. The ID of the DataWorks workspace. Go to the workspace management page in the DataWorks console to find the workspace ID.

    • FileName (String, optional). Example: ods_user_info_d. The name of the file. To rename a file, set a new value for this parameter. You can find the required FileId by using the ListFiles operation.

    • FileDescription (String, optional). Example: This is a file description. The description of the file.

    • Content (String, optional). Example: SELECT "1";. The code of the file. The code format varies based on the file type (fileType). To see an example, right-click a task in Operation Center and select View Code.

    • AutoRerunTimes (Integer, required). Example: 3. The number of times to automatically rerun the task after an error.

    • AutoRerunIntervalMillis (Integer, optional). Example: 120000. The interval between automatic reruns, in milliseconds. The maximum value is 1,800,000 (30 minutes). This parameter corresponds to the rerun interval on the Scheduling tab in the DataWorks console. The console displays this value in minutes, so ensure that you convert the unit when using the API.

    • RerunMode (String, optional). Example: ALL_ALLOWED. The rerun policy. This parameter corresponds to the rerun property on the Scheduling tab in the DataWorks console. Valid values:

      • ALL_ALLOWED: Reruns are allowed after both successful and failed runs.

      • FAILURE_ALLOWED: Reruns are allowed only after a failed run.

      • ALL_DENIED: Reruns are not allowed.

    • Stop (Boolean, optional). Example: false. Specifies whether to pause the scheduling of the task (true pauses it, false does not). This parameter corresponds to the pause scheduling setting on the Scheduling tab in the DataWorks console.

    • ParaValue (String, optional). Example: x=a y=b z=c. The scheduling parameters. This parameter corresponds to the parameters on the Scheduling tab in the DataWorks console. For information about how to configure scheduling parameters, see Configure scheduling parameters.

    • StartEffectDate (Long, optional). Example: 936923400000. The time to start automatic scheduling, specified as a Unix timestamp in milliseconds. This parameter corresponds to the start time specified for the effective date on the Scheduling tab in the DataWorks console.

    • EndEffectDate (Long, optional). Example: 4155787800000. The time to stop automatic scheduling, specified as a Unix timestamp in milliseconds. This parameter corresponds to the end time specified for the effective date on the Scheduling tab in the DataWorks console.

    • CronExpress (String, optional). Example: 00 00-59/5 1-23 * * ?. The cron expression for periodic scheduling. This parameter corresponds to the cron expression on the Scheduling tab in the DataWorks console. After you configure the schedule and scheduled time, DataWorks automatically generates a cron expression. Examples:

      • Run the task at 05:30 every day: 00 30 05 * * ?

      • Run the task at 15 minutes past every hour: 00 15 * * * ?

      • Run the task every 10 minutes: 00 00/10 * * * ?

      • Run the task every 10 minutes from 08:00 to 17:00 every day: 00 00-59/10 8-17 * * ?

      • Run the task at 00:20 on the first day of each month: 00 20 00 1 * ?

      • Run the task every 3 months, starting from 00:10 on January 1: 00 10 00 1 1-12/3 ?

      • Run the task at 00:05 on every Tuesday and Friday: 00 05 00 * * 2,5

      The DataWorks scheduling system has the following limits for cron expressions:

      • The minimum scheduling interval is 5 minutes.

      • The earliest time that a task can be scheduled to run is 00:05 each day.

    • CycleType (String, optional). Example: NOT_DAY. The type of the scheduling cycle. Valid values are NOT_DAY (minute or hour) and DAY (day, week, or month). This parameter corresponds to the schedule on the Scheduling tab in the DataWorks console.

    • DependentType (String, optional). Example: USER_DEFINE. The cross-cycle dependency mode. Valid values:

      • SELF: The current node is selected as the dependent node.

      • CHILD: The first-level child nodes are selected as dependent nodes.

      • USER_DEFINE: Other nodes are selected as dependent nodes.

      • NONE: No dependent nodes are selected. The task does not depend on the previous cycle.

    • DependentNodeIdList (String, optional). Example: 5,10,15,20. The IDs of the dependent nodes, separated by commas (,). This parameter is required if DependentType is set to USER_DEFINE. It corresponds to the settings for USER_DEFINE that appear when you configure the cross-cycle dependency on the Scheduling tab in the DataWorks console.

    • InputList (String, optional). Example: project_root,project.file1,project.001_out. The output names of the upstream nodes that this node depends on. Separate multiple names with commas (,). This parameter corresponds to the upstream node output name on the Scheduling tab in the DataWorks console. Note: This parameter is required when you create a batch synchronization task with the CreateDISyncTask or UpdateFile operation.

    • ProjectIdentifier (String, optional). Example: dw_project. The name of the DataWorks workspace. You can log on to the DataWorks console and go to the workspace configuration page to obtain the workspace name. You must set either this parameter or the ProjectId parameter to specify the DataWorks workspace.

    • FileId (Long, required). Example: 100000001. The ID of the file. You can call the ListFiles operation to obtain the file ID.

    • OutputList (String, optional). Example: dw_project.ods_user_info_d. The output name of the file. This parameter corresponds to the current node output name on the Scheduling tab in the DataWorks console.

    • ResourceGroupIdentifier (String, optional). Example: default_group. The resource group where the task runs after it is deployed. You can call the ListResourceGroups operation to obtain the available resource groups in the workspace.

    • ConnectionName (String, optional). Example: odps_source. The identifier of the data source that the task uses for execution. You can call the ListDataSources operation to obtain the list of available data sources.

    • Owner (String, optional). Example: 18023848927592. The user ID of the file owner.

    • AutoParsing (Boolean, optional). Example: true. Specifies whether to enable automatic parsing for the file (true enables it, false does not). This parameter corresponds to the parse input/output from code setting on the Scheduling tab in the DataWorks console.

    • SchedulerType (String, optional). Example: NORMAL. The type of scheduling. Valid values:

      • NORMAL: A normally scheduled task.

      • MANUAL: A manual task. It runs only when manually triggered and is not scheduled automatically.

      • PAUSE: A paused task.

      • SKIP: A dry-run task. Dry-run tasks are scheduled on a regular basis, but the tasks are immediately set to successful when the scheduling starts.

    • AdvancedSettings (String, optional). Example: {"queue":"default","SPARK_CONF":"--conf spark.driver.memory=2g"}. The advanced settings for the task. The value must be in the JSON format. This parameter corresponds to the advanced settings on the editing page of an EMR Spark Streaming or EMR Streaming SQL task in the DataWorks console and is currently supported only for those task types.

    • StartImmediately (Boolean, optional). Example: true. Specifies whether to start the task immediately after it is deployed (true starts it, false does not). This parameter corresponds to the startup mode on the Scheduling tab on the editing page of an EMR Spark Streaming or EMR Streaming SQL task in the DataWorks console.

    • InputParameters (String, optional). Example: [{"ValueSource": "project_001.first_node:bizdate_param","ParameterName": "bizdate_input"}]. The context input parameters for the node, as a JSON string. For information about the fields, see the InputContextParameterList parameter returned by the GetFile operation. This parameter corresponds to the current node input parameters on the Scheduling tab in the DataWorks console.

    • OutputParameters (String, optional). Example: [{"Type": 1,"Value": "${bizdate}","ParameterName": "bizdate_param"}]. The context output parameters for the node, as a JSON string. For information about the fields, see the OutputContextParameterList parameter returned by the GetFile operation. This parameter corresponds to the current node output parameters on the Scheduling tab in the DataWorks console.
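
    The following method is a minimal sketch of an UpdateFile call that reuses the client from step 1. The workspace ID, file ID, and parameter values are placeholders; replace them with your own.

    public void updateFileScheduling(IAcsClient client) throws ClientException {
        UpdateFileRequest request = new UpdateFileRequest();
        request.setProjectId(181565L);              // Placeholder workspace ID.
        request.setFileId(100000001L);              // The FileId returned by CreateDISyncTask.
        request.setAutoRerunTimes(3);               // Rerun up to three times after an error.
        request.setAutoRerunIntervalMillis(120000); // Wait two minutes between reruns.
        request.setRerunMode("FAILURE_ALLOWED");
        request.setCronExpress("00 30 05 * * ?");   // Run at 05:30 every day.
        request.setCycleType("DAY");
        request.setInputList("project_root");       // Upstream output name; required for batch synchronization tasks.
        request.setAutoParsing(true);

        UpdateFileResponse response = client.getAcsResponse(request);
        System.out.println(new Gson().toJson(response));
    }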

  3. Submit the Data Integration task.

    Call the SubmitFile operation to submit the Data Integration task to the development environment of the scheduling system. The response returns a deploymentId. Use this ID with the GetDeployment operation to retrieve deployment details.

    public void submitFile() throws ClientException {
        SubmitFileRequest request = new SubmitFileRequest();
        request.setProjectId(78837L);
        request.setProjectIdentifier("zxy_8221431");
        // The file ID that is returned when you create the task. It corresponds to the file_id in the File table of the database.
        request.setFileId(501576542L);
        request.setComment("Comment");
        SubmitFileResponse acsResponse = client.getAcsResponse(request);
        // The deploymentId is the return value of the commit or deploy operation.
        Long deploymentId = acsResponse.getData();
        log.info(acsResponse.toString());
    }

    The preceding code provides an example of how to configure some of the parameters. For more information about the parameters, see SubmitFile and GetDeployment.

  4. Deploy the synchronization task to the production environment.

    Call the DeployFile operation to publish the Data Integration synchronization task to the production environment.

    Note

    This operation applies only to workspaces in Standard mode.

    public void deploy() throws ClientException {
        DeployFileRequest request = new DeployFileRequest();
        request.setProjectIdentifier("zxy_8221431");
        request.setFileId(501576542L);
        request.setComment("Comment");
        // Specify either NodeId or FileId. The value of NodeId is the node ID in the basic properties of the scheduling configuration.
        request.setNodeId(700004537241L);
        DeployFileResponse acsResponse = client.getAcsResponse(request);
        // The deploymentId is the return value of the commit or deploy operation.
        Long deploymentId = acsResponse.getData();
        log.info(deploymentId.toString());
    }

    The preceding code provides an example of how to configure some of the parameters. For more information about the parameters, see DeployFile.

  5. Obtain the details of the deployment package.

    The submit and deploy operations return a deploymentId. Use this ID with the GetDeployment operation to check the deployment status. A Status value of 1 indicates a successful deployment.

    public void getDeployment() throws ClientException {
        GetDeploymentRequest request = new GetDeploymentRequest();
        request.setProjectId(78837L);
        request.setProjectIdentifier("zxy_8221431");
        // The deploymentId is the return value of the commit or deploy operation. Call GetDeployment to get the details of this deployment.
        request.setDeploymentId(2776067L);
        GetDeploymentResponse acsResponse = client.getAcsResponse(request);
        log.info(acsResponse.getData().toString());
    }

    The preceding code provides an example of how to configure some of the parameters. For more information about the parameters, see GetDeployment.
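
    To check the result programmatically, read the Status field from the response. The following sketch assumes the Data.Deployment.Status structure of the GetDeployment response model in the current SDK:

    // Read the deployment status (a sketch that reuses the request from the preceding example).
    GetDeploymentResponse acsResponse = client.getAcsResponse(request);
    Integer status = acsResponse.getData().getDeployment().getStatus();
    if (status != null && status == 1) {
        System.out.println("Deployment succeeded.");
    } else {
        System.out.println("Deployment is not finished or failed. Status: " + status);
    }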

Modify the configuration of a synchronization task

To modify a task, call the UpdateDISyncTask operation to update the Content of the task, or use the TaskParam parameter to switch the exclusive resource group. After you update the task, you must submit and deploy it again. For more information, see Procedure.
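
The following method is a minimal sketch of such an update, reusing an initialized client; the workspace ID, file ID, and resource group identifier are placeholders:

public void updateDISyncTask(IAcsClient client) throws ClientException {
    UpdateDISyncTaskRequest request = new UpdateDISyncTaskRequest();
    request.setProjectId(181565L);      // Placeholder workspace ID.
    request.setFileId(100000001L);      // The FileId of the task to update.
    request.setTaskType("DI_OFFLINE");
    // Pass new task content to change the synchronization logic, or pass only
    // TaskParam to switch the exclusive resource group (placeholder identifier).
    request.setTaskParam("{\"ResourceGroup\":\"S_res_group_XXX\"}");

    UpdateDISyncTaskResponse response = client.getAcsResponse(request);
    System.out.println(new Gson().toJson(response));
}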

Delete a synchronization task

To delete a Data Integration synchronization task, call the DeleteFile operation. For an example, see the deleteTask method in the following sample code.

Sample code

import com.alibaba.fastjson.JSONObject;
import com.aliyuncs.DefaultAcsClient;
import com.aliyuncs.IAcsClient;
import com.aliyuncs.dataworks_public.model.v20200518.*;
import com.aliyuncs.profile.DefaultProfile;
import com.aliyuncs.profile.IClientProfile;

public class CreateOfflineTask {
    static Long createTask(String fileName) throws Exception {
        Long projectId = 2043L;
        String taskType = "DI_OFFLINE";
        String taskContent = "{\n" +
                "    \"type\": \"job\",\n" +
                "    \"version\": \"2.0\",\n" +
                "    \"steps\": [\n" +
                "        {\n" +
                "            \"stepType\": \"mysql\",\n" +
                "            \"parameter\": {\n" +
                "                \"envType\": 0,\n" +
                "                \"datasource\": \"mysql_autotest_dev\",\n" +
                "                \"column\": [\n" +
                "                    \"id\",\n" +
                "                    \"name\"\n" +
                "                ],\n" +
                "                \"connection\": [\n" +
                "                    {\n" +
                "                        \"datasource\": \"mysql_autotest_dev\",\n" +
                "                        \"table\": [\n" +
                "                            \"user\"\n" +
                "                        ]\n" +
                "                    }\n" +
                "                ],\n" +
                "                \"where\": \"\",\n" +
                "                \"splitPk\": \"\",\n" +
                "                \"encoding\": \"UTF-8\"\n" +
                "            },\n" +
                "            \"name\": \"Reader\",\n" +
                "            \"category\": \"reader\"\n" +
                "        },\n" +
                "        {\n" +
                "            \"stepType\": \"odps\",\n" +
                "            \"parameter\": {\n" +
                "                \"partition\": \"pt=${bizdate}\",\n" +
                "                \"truncate\": true,\n" +
                "                \"datasource\": \"odps_source\",\n" +
                "                \"envType\": 0,\n" +
                "                \"column\": [\n" +
                "                    \"id\",\n" +
                "                    \"name\"\n" +
                "                ],\n" +
                "                \"emptyAsNull\": false,\n" +
                "                \"tableComment\": \"null\",\n" +
                "                \"table\": \"user\"\n" +
                "            },\n" +
                "            \"name\": \"Writer\",\n" +
                "            \"category\": \"writer\"\n" +
                "        }\n" +
                "    ],\n" +
                "    \"setting\": {\n" +
                "        \"executeMode\": null,\n" +
                "        \"errorLimit\": {\n" +
                "            \"record\": \"\"\n" +
                "        },\n" +
                "        \"speed\": {\n" +
                "            \"concurrent\": 2,\n" +
                "            \"throttle\": false\n" +
                "        }\n" +
                "    },\n" +
                "    \"order\": {\n" +
                "        \"hops\": [\n" +
                "            {\n" +
                "                \"from\": \"Reader\",\n" +
                "                \"to\": \"Writer\"\n" +
                "            }\n" +
                "        ]\n" +
                "    }\n" +
                "}";
        CreateDISyncTaskRequest request = new CreateDISyncTaskRequest();
        request.setProjectId(projectId);
        request.setTaskType(taskType);
        request.setTaskContent(taskContent);
        request.setTaskName(fileName);
        // Use an exclusive resource group for Data Integration.
        request.setTaskParam("{\"FileFolderPath\":\"Workflow/Automated_Testing_Workspace_Do_Not_Modify/Data_Integration\",\"ResourceGroup\":\"S_res_group_XXX\"}");
        CreateDISyncTaskResponse response1 = client.getAcsResponse(request);
        return response1.getData().getFileId();
    }

    public static void updateFile(Long fileId) throws Exception {
        UpdateFileRequest request = new UpdateFileRequest();
        request.setProjectId(2043L);
        request.setFileId(fileId);
        request.setAutoRerunTimes(3);
        request.setRerunMode("FAILURE_ALLOWED");
        request.setCronExpress("00 30 05 * * ?");
        request.setCycleType("DAY");
        // Use an exclusive resource group for scheduling.
        request.setResourceGroupIdentifier("S_res_group_XXX");
        request.setInputList("dataworks_di_autotest_root");

        request.setAutoParsing(true);
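        // DependentNodeIdList takes effect only when DependentType is set to USER_DEFINE.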
        request.setDependentNodeIdList("5,10,15,20");
        request.setDependentType("SELF");
        request.setStartEffectDate(0L);
        request.setEndEffectDate(4155787800000L);
        request.setFileDescription("description");
        request.setStop(false);
        request.setParaValue("x=a y=b z=c");
        request.setSchedulerType("NORMAL");
        request.setAutoRerunIntervalMillis(120000);
        UpdateFileResponse response1 = client.getAcsResponse(request);
    }
    static void deleteTask(Long fileId) throws Exception {
        DeleteFileRequest request = new DeleteFileRequest();
        request.setProjectId(63845L);
        request.setFileId(fileId);

        // Reuse the shared client that is initialized in the main method.
        DeleteFileResponse response1 = client.getAcsResponse(request);
        System.out.println(JSONObject.toJSONString(response1));
    }

    static IAcsClient client;

    public static void main(String[] args) throws Exception {
        String regionId = "cn-hangzhou";
        // Make sure that the ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET environment variables are set.
        // For more information, see https://www.alibabacloud.com/help/zh/alibaba-cloud-sdk-262060/latest/configure-credentials-378659
        IClientProfile profile = DefaultProfile.getProfile(regionId, System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"), System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"));
        DefaultProfile.addEndpoint(regionId, "dataworks-public", "dataworks." + regionId + ".aliyuncs.com");

        client = new DefaultAcsClient(profile);
        String taskName = "offline_job_0930_1648";
        Long fileId = createTask(taskName); // Create a data integration task.
        updateFile(fileId);                 // Modify the scheduling properties of the data integration task.
    }
}