
DataWorks:Change the data source of a synchronization task from MySQL to PolarDB by using API

Last Updated: Feb 27, 2026

Use the DataWorks SDK for Java to programmatically change the reader source of an offline data synchronization task from MySQL to PolarDB. The task destination remains MaxCompute.

When migrating databases from MySQL to PolarDB, existing data synchronization tasks in DataWorks still point to the original MySQL data source. Instead of manually reconfiguring each task in the console, use the DataWorks API to update task configurations.

Limitations

This method supports changing the reader source only from a MySQL data source to a PolarDB data source. Conversions between other data source types are not supported.

Warning

Modifying the JSON configuration file through the API carries risks. Do not reuse the modified configuration file for other business purposes, and do not use this approach to modify other configuration files. Incorrect modifications may cause data synchronization task failures and data quality issues.

Prerequisites

Before you begin, make sure that you have:

  • Java Development Kit (JDK) installed

  • A Maven-based Java project

  • An Alibaba Cloud account with an AccessKey ID and AccessKey secret

  • A DataWorks workspace with the target data synchronization task already created

  • The PolarDB data source already registered in your DataWorks workspace

  • Sufficient permissions to call DataWorks API operations (ListFiles, GetFile, UpdateDISyncTask, SubmitFile, DeployFile, ListNodes, RunCycleDagNodes)

API call sequence

The following diagram shows the API call sequence:

ListFiles          Find the sync task file by path and name
    |
GetFile            Retrieve the current JSON configuration
    |
modifyContent      Replace MySQL reader config with PolarDB (local logic)
    |
UpdateDISyncTask   Save the updated configuration to DataWorks
    |
SubmitFile         Submit the file for deployment review
    |
GetDeployment      Check submission status
    |
DeployFile         Deploy the submitted file to production
    |
GetDeployment      Confirm deployment status
    |
ListNodes          Look up the node ID by node name
    |
RunCycleDagNodes   Trigger a retroactive run of the updated task

Each step is required because DataWorks separates the development lifecycle into distinct phases: edit, submit, deploy, and run. Skipping any step causes the change to remain in draft state or not take effect in production.

Add Maven dependencies

Add the following dependencies to your pom.xml file:

<dependency>
    <groupId>com.aliyun</groupId>
    <artifactId>aliyun-java-sdk-core</artifactId>
    <version>4.5.20</version>
</dependency>
<dependency>
    <groupId>com.aliyun</groupId>
    <artifactId>aliyun-java-sdk-dataworks-public</artifactId>
    <version>3.4.4</version>
</dependency>

Configuration changes

The modifyContent method finds the reader step (where category is reader and stepType is mysql) and updates three fields:

Field        Location                                    Before (MySQL)       After (PolarDB)
stepType     steps[].stepType                            mysql                polardb
datasource   steps[].parameter.datasource                mysql_from_polardb   Name of your PolarDB data source
datasource   steps[].parameter.connection[].datasource   mysql_from_polardb   Name of your PolarDB data source

All other fields remain unchanged, including columns, table names, writer settings, and speed settings.

Before: MySQL reader configuration

{
    "stepType": "mysql",
    "parameter": {
        "envType": 0,
        "datasource": "mysql_from_polardb",
        "column": [
            "id",
            "name",
            "create_time",
            "create_user"
        ],
        "tableComment": "Test",
        "connection": [
            {
                "selectedDatabase": "polardb_db1",
                "datasource": "mysql_from_polardb",
                "table": [
                    "lcl_test_demo"
                ]
            }
        ],
        "where": "",
        "splitPk": "id",
        "encoding": "UTF-8"
    },
    "name": "Reader",
    "category": "reader"
}

After: PolarDB reader configuration

{
    "stepType": "polardb",
    "parameter": {
        "envType": 0,
        "datasource": "polardb",
        "column": [
            "id",
            "name",
            "create_time",
            "create_user"
        ],
        "tableComment": "Test",
        "connection": [
            {
                "selectedDatabase": "polardb_db1",
                "datasource": "polardb",
                "table": [
                    "lcl_test_demo"
                ]
            }
        ],
        "where": "",
        "splitPk": "id",
        "encoding": "UTF-8"
    },
    "name": "Reader",
    "category": "reader"
}

The writer step (stepType: odps, datasource: odps_source) remains unchanged. Only the reader step is modified.

Complete Java sample code

The following code demonstrates the full workflow: locate the task, retrieve and modify its configuration, deploy the changes, and trigger a run.

Replace the following placeholder values before running:

Placeholder                       Description
akId, akSecret                    Your AccessKey ID and AccessKey secret. Store credentials securely and avoid hardcoding them in production code.
regionId                          The region of your DataWorks workspace (for example, cn-chengdu).
setProjectId                      The ID of your DataWorks workspace. The sample uses 1911L as an example.
folderPath                        The folder path of your data synchronization task in DataWorks.
filename                          The name of the data synchronization task file.
modifyContent arguments 2 and 3   The new step type (polardb) and the name of your PolarDB data source as registered in DataWorks.

package com.alibaba.eas;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.aliyuncs.DefaultAcsClient;
import com.aliyuncs.IAcsClient;
import com.aliyuncs.dataworks_public.model.v20200518.*;
import com.aliyuncs.profile.DefaultProfile;
import com.aliyuncs.profile.IClientProfile;
import java.util.List;

public class UpdateOfflineTask {

    // Step 1: Find the task file by folder path and file name
    public static ListFilesResponse.Data.File ListFiles(String filePath, String fileName) throws Exception {
        ListFilesRequest request = new ListFilesRequest();
        request.setProjectId(1911L);
        request.setFileFolderPath(filePath);
        request.setKeyword(fileName);
        ListFilesResponse response = client.getAcsResponse(request);
        List<ListFilesResponse.Data.File> files = response.getData().getFiles();
        // Return the first matching file, or null if nothing matched
        return (files == null || files.isEmpty()) ? null : files.get(0);
    }

    // Step 2: Retrieve the JSON configuration of the task
    public static String GetFiles(Long fileId) throws Exception {
        GetFileRequest request = new GetFileRequest();
        request.setProjectId(1911L);
        request.setFileId(fileId);
        GetFileResponse response1 = client.getAcsResponse(request);
        return response1.getData().getFile().getContent();
    }

    // Step 4: Save the modified configuration back to DataWorks
    // taskType must be "DI_OFFLINE" for offline (batch) synchronization tasks
    public static void UpdateDISyncTask(Long fileId, String content) throws Exception {
        UpdateDISyncTaskRequest request = new UpdateDISyncTaskRequest();
        request.setProjectId(1911L);
        request.setFileId(fileId);
        request.setTaskContent(content);
        request.setTaskType("DI_OFFLINE");
        UpdateDISyncTaskResponse response1 = client.getAcsResponse(request);
    }

    // Step 5: Submit the file for deployment review
    // Returns a deploymentId used to track the submission status
    public static Long submitFile(Long fileId) throws Exception {
        SubmitFileRequest request = new SubmitFileRequest();
        request.setProjectId(1911L);
        request.setFileId(fileId);
        SubmitFileResponse acsResponse = client.getAcsResponse(request);
        Long deploymentId = acsResponse.getData();
        return deploymentId;
    }

    // Step 6: Check the status of a submission or deployment
    // Call this after submitFile or deploy to confirm completion
    public static void getDeployment(Long deploymentId) throws Exception {
        GetDeploymentRequest request = new GetDeploymentRequest();
        request.setProjectId(1911L);
        request.setDeploymentId(deploymentId);
        GetDeploymentResponse acsResponse = client.getAcsResponse(request);
        System.out.println(acsResponse.getData().getDeployment().getStatus());
    }

    // Step 7: Deploy the submitted file to the production environment
    // Returns a deploymentId used to track the deployment status
    public static Long deploy(Long fileId) throws Exception {
        DeployFileRequest request = new DeployFileRequest();
        request.setProjectId(1911L);
        request.setFileId(fileId);
        DeployFileResponse acsResponse = client.getAcsResponse(request);
        Long deploymentId = acsResponse.getData();
        return deploymentId;
    }

    // Step 8: Look up the node ID by node name in the production environment
    public static Long listNode(String nodeName) throws Exception {
        ListNodesRequest request = new ListNodesRequest();
        request.setProjectId(1911L);
        request.setNodeName(nodeName);
        request.setProjectEnv("PROD");
        ListNodesResponse acsResponse = client.getAcsResponse(request);
        List<ListNodesResponse.Data.NodesItem> nodesItemList = acsResponse.getData().getNodes();
        return nodesItemList.get(0).getNodeId();
    }

    // Step 9: Trigger a retroactive run for the updated task
    // Adjust startBizDate and endBizDate to the date range you want to rerun
    public static void RunCycleDagNodes(Long nodeId) throws Exception {
        RunCycleDagNodesRequest request = new RunCycleDagNodesRequest();
        request.setIncludeNodeIds(nodeId.toString());
        request.setName("rerun_job");
        request.setParallelism(false);
        request.setProjectEnv("PROD");
        request.setRootNodeId(nodeId);
        request.setStartBizDate("2021-09-29 00:00:00");
        request.setEndBizDate("2021-09-29 00:00:00");
        RunCycleDagNodesResponse acsResponse = client.getAcsResponse(request);
    }

    // Step 3: Modify the JSON configuration to replace MySQL reader with PolarDB
    // - Finds the reader step (category="reader", stepType="mysql")
    // - Updates stepType to the new value (e.g., "polardb")
    // - Updates datasource in both parameter and connection objects
    public static String modifyContent(String content, String newStepType, String newDatasource){
        JSONObject jsonObject = JSON.parseObject(content);
        JSONArray steps = jsonObject.getJSONArray("steps");
        if (steps != null) {
            for (int i = 0; i < steps.size(); ++i) {
                JSONObject step = steps.getJSONObject(i);
                if (step != null && step.getString("category") != null && "reader".equals(step.getString("category"))) {
                    if (step.getString("stepType") != null && "mysql".equals(step.getString("stepType"))) {
                        step.put("stepType", newStepType);
                        JSONObject parameter = step.getJSONObject("parameter");
                        if (parameter != null) {
                            parameter.put("datasource", newDatasource);
                            JSONArray connections = parameter.getJSONArray("connection");
                            if (connections != null) {
                                for (int j = 0; j < connections.size(); ++j) {
                                    JSONObject connection = connections.getJSONObject(j);
                                    if (connection != null) {
                                        connection.put("datasource", newDatasource);
                                    }
                                }
                            }
                        }
                    }
                }
            }
        }
        return jsonObject.toJSONString();
    }

    static IAcsClient client;

    public static void main(String[] args) throws Exception {
        // Initialize the API client with your credentials and region
        String akId = "XXX";
        String akSecret = "XXX";
        String regionId = "cn-chengdu";
        IClientProfile profile = DefaultProfile.getProfile(regionId, akId, akSecret);
        DefaultProfile.addEndpoint(regionId, "dataworks-public", "dataworks." + regionId + ".aliyuncs.com");

        client = new DefaultAcsClient(profile);

        // Locate the data synchronization task file
        String folderPath = "Business Flow/Test Workflow/Data Integration/";
        String filename = "mysql_to_odps";
        ListFilesResponse.Data.File file = ListFiles(folderPath, filename);
        Long fileId = file.getFileId();
        System.out.println(file.getFileId());

        // Get the current task configuration and modify it
        String content = GetFiles(fileId);
        // "polardb" is the new step type; "polardb_datasource" is the PolarDB data source name
        // registered in your DataWorks workspace
        String contentModified = modifyContent(content, "polardb", "polardb_datasource");

        // Save, submit, and deploy the updated task
        UpdateDISyncTask(file.getFileId(), contentModified);
        Long deployId = submitFile(fileId);
        getDeployment(deployId);
        Thread.sleep(10000);   // Wait for submission to complete
        getDeployment(deployId);
        deployId = deploy(fileId);
        getDeployment(deployId);
        Thread.sleep(10000);   // Wait for deployment to complete
        getDeployment(deployId);

        // Trigger a retroactive run to execute the updated task
        Long nodeId = listNode(filename);
        RunCycleDagNodes(nodeId);
    }
}
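The hardcoded akId and akSecret placeholders in the sample above are for illustration only. A safer pattern, sketched below, reads the credentials from environment variables before passing them to DefaultProfile.getProfile. The variable names ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET follow the common Alibaba Cloud credentials convention; adjust them to your environment.

```java
public class CredentialEnv {
    // Reads a required credential from the environment and fails fast if it is missing.
    static String requireEnv(String name) {
        String value = System.getenv(name);
        if (value == null || value.isEmpty()) {
            throw new IllegalStateException("Missing environment variable: " + name);
        }
        return value;
    }

    public static void main(String[] args) {
        String akId = requireEnv("ALIBABA_CLOUD_ACCESS_KEY_ID");
        String akSecret = requireEnv("ALIBABA_CLOUD_ACCESS_KEY_SECRET");
        // Pass akId and akSecret to DefaultProfile.getProfile(regionId, akId, akSecret)
        // instead of hardcoding the values in source code.
    }
}
```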

API operations reference

API operation      Purpose                                              Key parameters
ListFiles          Find the sync task file by folder path and keyword   projectId, fileFolderPath, keyword
GetFile            Retrieve the JSON configuration of a task file       projectId, fileId
UpdateDISyncTask   Save updated task configuration                      projectId, fileId, taskContent, taskType (DI_OFFLINE)
SubmitFile         Submit the file for deployment review                projectId, fileId
GetDeployment      Check submission or deployment status                projectId, deploymentId
DeployFile         Deploy the submitted file to production              projectId, fileId
ListNodes          Find the production node ID by name                  projectId, nodeName, projectEnv (PROD)
RunCycleDagNodes   Trigger a retroactive run                            includeNodeIds, name, parallelism, projectEnv, rootNodeId, startBizDate, endBizDate

Verify the result

After running the code:

  1. Check deployment status. The getDeployment output should show a success status. If the status indicates failure, check whether the PolarDB data source is correctly registered in your DataWorks workspace.

  2. Verify in the DataWorks console. Open the data synchronization task in Data Development and confirm that the reader source now shows PolarDB instead of MySQL.

  3. Check the retroactive run. In Operation Center, find the task instance triggered by RunCycleDagNodes and verify that it completed successfully with data written to the MaxCompute destination table.

Troubleshooting

ListFiles returns empty results

The folder path or filename does not match any file in the specified workspace. Double-check the folderPath value (for example, Business Flow/Test Workflow/Data Integration/) and the filename.

UpdateDISyncTask fails

Possible causes:

  • The taskContent JSON is malformed. Validate the JSON before submitting.

  • The PolarDB data source name specified in modifyContent does not match a registered data source in the workspace.

  • The taskType is not set to DI_OFFLINE.

Deployment status shows failure

  • The submitted file may have validation errors. Open the task in the DataWorks console and check for error messages.

  • Make sure the PolarDB data source connectivity is working before deploying.

RunCycleDagNodes does not run

  • The startBizDate and endBizDate values must use the format yyyy-MM-dd HH:mm:ss.

  • Verify that the node exists in the PROD environment by checking the ListNodes response.
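The required timestamp shape can be produced with java.time rather than assembled by hand; a minimal sketch:

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

public class BizDateFormat {
    // Formats a business date in the yyyy-MM-dd HH:mm:ss shape
    // expected by RunCycleDagNodes.
    static String formatBizDate(LocalDateTime dateTime) {
        return dateTime.format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
    }

    public static void main(String[] args) {
        System.out.println(formatBizDate(LocalDateTime.of(2021, 9, 29, 0, 0)));
        // prints 2021-09-29 00:00:00
    }
}
```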

Usage notes

  • The projectId (1911L in the sample) must match your DataWorks workspace ID. Find this value in the DataWorks console on the Workspace Details page.

  • The Thread.sleep(10000) calls (10 seconds) are simple wait intervals. In production, poll GetDeployment in a loop until the status indicates completion or failure, rather than relying on fixed wait times.

  • Store your AccessKey ID and AccessKey secret securely. Do not hardcode credentials in source files. Use environment variables or a credentials provider instead.

  • The endpoint follows the pattern dataworks.{regionId}.aliyuncs.com. Replace {regionId} with the region of your workspace.
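The fixed Thread.sleep waits in the sample can be replaced by a generic polling loop. The sketch below stands in for the real status source with a Supplier; in actual code the supplier would call getDeployment and return the deployment status, and the "RUNNING"/"SUCCESS" values here are placeholders — check the status codes that GetDeployment actually returns in your SDK version.

```java
import java.util.function.Supplier;

public class DeploymentPoller {
    // Polls a status source until it reports something other than "RUNNING",
    // or gives up after timeoutMs milliseconds.
    static String waitForTerminal(Supplier<String> status, long timeoutMs, long intervalMs)
            throws InterruptedException {
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (System.currentTimeMillis() < deadline) {
            String current = status.get();
            if (!"RUNNING".equals(current)) {
                return current;
            }
            Thread.sleep(intervalMs);
        }
        return "TIMEOUT";
    }

    public static void main(String[] args) throws InterruptedException {
        // Simulated deployment: reports RUNNING twice, then SUCCESS.
        int[] calls = {0};
        String result = waitForTerminal(() -> ++calls[0] < 3 ? "RUNNING" : "SUCCESS", 5_000, 10);
        System.out.println(result); // prints SUCCESS
    }
}
```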