Use the DataWorks SDK for Java to programmatically change the reader source of an offline data synchronization task from MySQL to PolarDB. The task destination remains MaxCompute.
When migrating databases from MySQL to PolarDB, existing data synchronization tasks in DataWorks still point to the original MySQL data source. Instead of manually reconfiguring each task in the console, use the DataWorks API to update task configurations.
Limitations
This method only supports changing the source from a MySQL data source to a PolarDB data source. Other data source type conversions are not supported.
Modifying the JSON configuration file through the API carries risks. Do not reuse the modified configuration file for other business purposes, and do not use this approach to modify other configuration files. Incorrect modifications may cause data synchronization task failures and data quality issues.
Prerequisites
Before you begin, make sure that you have:
Java Development Kit (JDK) installed
A Maven-based Java project
An Alibaba Cloud account with an AccessKey ID and AccessKey secret
A DataWorks workspace with the target data synchronization task already created
The PolarDB data source already registered in your DataWorks workspace
Sufficient permissions to call DataWorks API operations (ListFiles, GetFile, UpdateDISyncTask, SubmitFile, GetDeployment, DeployFile, ListNodes, RunCycleDagNodes)
API call sequence
The following diagram shows the API call sequence:
ListFiles Find the sync task file by path and name
|
GetFile Retrieve the current JSON configuration
|
modifyContent Replace MySQL reader config with PolarDB (local logic)
|
UpdateDISyncTask Save the updated configuration to DataWorks
|
SubmitFile Submit the file for deployment review
|
GetDeployment Check submission status
|
DeployFile Deploy the submitted file to production
|
GetDeployment Confirm deployment status
|
ListNodes Look up the node ID by node name
|
RunCycleDagNodes Trigger a retroactive run of the updated task

Each step is required because DataWorks separates the development lifecycle into distinct phases: edit, submit, deploy, and run. Skipping a step leaves the change in draft state or prevents it from taking effect in production.
Add Maven dependencies
Add the following dependencies to your pom.xml file:
<dependency>
<groupId>com.aliyun</groupId>
<artifactId>aliyun-java-sdk-core</artifactId>
<version>4.5.20</version>
</dependency>
<dependency>
<groupId>com.aliyun</groupId>
<artifactId>aliyun-java-sdk-dataworks-public</artifactId>
<version>3.4.4</version>
</dependency>

Configuration changes
The modifyContent method finds the reader step (where category is reader and stepType is mysql) and updates three fields:
| Field | Location | Before (MySQL) | After (PolarDB) |
|---|---|---|---|
| stepType | steps[].stepType | mysql | polardb |
| datasource | steps[].parameter.datasource | mysql_from_polardb | Name of your PolarDB data source |
| datasource | steps[].parameter.connection[].datasource | mysql_from_polardb | Name of your PolarDB data source |
All other fields remain unchanged, including columns, table names, writer settings, and speed settings.
Before: MySQL reader configuration
{
"stepType": "mysql",
"parameter": {
"envType": 0,
"datasource": "mysql_from_polardb",
"column": [
"id",
"name",
"create_time",
"create_user"
],
"tableComment": "Test",
"connection": [
{
"selectedDatabase": "polardb_db1",
"datasource": "mysql_from_polardb",
"table": [
"lcl_test_demo"
]
}
],
"where": "",
"splitPk": "id",
"encoding": "UTF-8"
},
"name": "Reader",
"category": "reader"
}

After: PolarDB reader configuration
{
"stepType": "polardb",
"parameter": {
"envType": 0,
"datasource": "polardb",
"column": [
"id",
"name",
"create_time",
"create_user"
],
"tableComment": "Test",
"connection": [
{
"selectedDatabase": "polardb_db1",
"datasource": "polardb",
"table": [
"lcl_test_demo"
]
}
],
"where": "",
"splitPk": "id",
"encoding": "UTF-8"
},
"name": "Reader",
"category": "reader"
}

The writer step (stepType: odps, datasource: odps_source) remains unchanged. Only the reader step is modified.
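The field replacements described above can be sketched on plain java.util collections. The full sample later in this article uses fastjson; this dependency-free version only illustrates the same traversal and replacements, and ReaderStepRewriter is a hypothetical helper name, not part of any SDK:

```java
import java.util.List;
import java.util.Map;

public class ReaderStepRewriter {
    // Walk the steps array, find the MySQL reader step, and replace
    // stepType plus the datasource fields in parameter and connection.
    @SuppressWarnings("unchecked")
    public static void rewrite(Map<String, Object> root, String newStepType, String newDatasource) {
        List<Map<String, Object>> steps = (List<Map<String, Object>>) root.get("steps");
        if (steps == null) {
            return;
        }
        for (Map<String, Object> step : steps) {
            if (!"reader".equals(step.get("category")) || !"mysql".equals(step.get("stepType"))) {
                continue; // leave writer steps and non-MySQL readers untouched
            }
            step.put("stepType", newStepType);
            Map<String, Object> parameter = (Map<String, Object>) step.get("parameter");
            if (parameter == null) {
                continue;
            }
            parameter.put("datasource", newDatasource);
            List<Map<String, Object>> connections =
                (List<Map<String, Object>>) parameter.get("connection");
            if (connections != null) {
                for (Map<String, Object> connection : connections) {
                    connection.put("datasource", newDatasource);
                }
            }
        }
    }
}
```

Everything outside the matched reader step, including the odps writer, passes through untouched, which mirrors the behavior described in the table above.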
Complete Java sample code
The following code demonstrates the full workflow: locate the task, retrieve and modify its configuration, deploy the changes, and trigger a run.
Replace the following placeholder values before running:
| Placeholder | Description |
|---|---|
| akId, akSecret | Your AccessKey ID and AccessKey secret. Store credentials securely and avoid hardcoding them in production code. |
| regionId | The region of your DataWorks workspace (for example, cn-chengdu). |
| setProjectId | The ID of your DataWorks workspace. The sample uses 1911L as an example. |
| folderPath | The folder path of your data synchronization task in DataWorks. |
| filename | The name of the data synchronization task file. |
| Second and third arguments of modifyContent | The new step type (polardb) and the name of your PolarDB data source as registered in DataWorks. |
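Instead of hardcoding akId and akSecret, the credentials can be read from environment variables. A minimal sketch, assuming the common ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET variable names (a convention, not a requirement of the SDK):

```java
public class Credentials {
    // Read a required value from the environment and fail fast with a
    // descriptive error if it is missing, so a misconfigured deployment
    // does not proceed with empty credentials.
    public static String require(String name) {
        String value = System.getenv(name);
        if (value == null || value.isEmpty()) {
            throw new IllegalStateException("Missing environment variable: " + name);
        }
        return value;
    }
}
// Usage in main():
//   String akId = Credentials.require("ALIBABA_CLOUD_ACCESS_KEY_ID");
//   String akSecret = Credentials.require("ALIBABA_CLOUD_ACCESS_KEY_SECRET");
```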
package com.alibaba.eas;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.aliyuncs.DefaultAcsClient;
import com.aliyuncs.IAcsClient;
import com.aliyuncs.dataworks_public.model.v20200518.*;
import com.aliyuncs.profile.DefaultProfile;
import com.aliyuncs.profile.IClientProfile;
import java.util.List;
public class UpdateOfflineTask {
// Step 1: Find the task file by folder path and file name
public static ListFilesResponse.Data.File ListFiles(String filePath, String fileName) throws Exception {
ListFilesRequest request = new ListFilesRequest();
request.setProjectId(1911L);
request.setFileFolderPath(filePath);
request.setKeyword(fileName);
ListFilesResponse response1 = client.getAcsResponse(request);
// Return the first matching file, or null if nothing matched
List<ListFilesResponse.Data.File> files = response1.getData().getFiles();
return files.isEmpty() ? null : files.get(0);
}
// Step 2: Retrieve the JSON configuration of the task
public static String GetFiles(Long fileId) throws Exception {
GetFileRequest request = new GetFileRequest();
request.setProjectId(1911L);
request.setFileId(fileId);
GetFileResponse response1 = client.getAcsResponse(request);
return response1.getData().getFile().getContent();
}
// Step 4: Save the modified configuration back to DataWorks
// taskType must be "DI_OFFLINE" for offline (batch) synchronization tasks
public static void UpdateDISyncTask(Long fileId, String content) throws Exception {
UpdateDISyncTaskRequest request = new UpdateDISyncTaskRequest();
request.setProjectId(1911L);
request.setFileId(fileId);
request.setTaskContent(content);
request.setTaskType("DI_OFFLINE");
UpdateDISyncTaskResponse response1 = client.getAcsResponse(request);
}
// Step 5: Submit the file for deployment review
// Returns a deploymentId used to track the submission status
public static Long submitFile(Long fileId) throws Exception {
SubmitFileRequest request = new SubmitFileRequest();
request.setProjectId(1911L);
request.setFileId(fileId);
SubmitFileResponse acsResponse = client.getAcsResponse(request);
Long deploymentId = acsResponse.getData();
return deploymentId;
}
// Step 6: Check the status of a submission or deployment
// Call this after submitFile or deploy to confirm completion
public static void getDeployment(Long deploymentId) throws Exception {
GetDeploymentRequest request = new GetDeploymentRequest();
request.setProjectId(1911L);
request.setDeploymentId(deploymentId);
GetDeploymentResponse acsResponse = client.getAcsResponse(request);
System.out.println(acsResponse.getData().getDeployment().getStatus());
}
// Step 7: Deploy the submitted file to the production environment
// Returns a deploymentId used to track the deployment status
public static Long deploy(Long fileId) throws Exception {
DeployFileRequest request = new DeployFileRequest();
request.setProjectId(1911L);
request.setFileId(fileId);
DeployFileResponse acsResponse = client.getAcsResponse(request);
Long deploymentId = acsResponse.getData();
return deploymentId;
}
// Step 8: Look up the node ID by node name in the production environment
public static Long listNode(String nodeName) throws Exception {
ListNodesRequest request = new ListNodesRequest();
request.setProjectId(1911L);
request.setNodeName(nodeName);
request.setProjectEnv("PROD");
ListNodesResponse acsResponse = client.getAcsResponse(request);
List<ListNodesResponse.Data.NodesItem> nodesItemList = acsResponse.getData().getNodes();
return nodesItemList.get(0).getNodeId();
}
// Step 9: Trigger a retroactive run for the updated task
// Adjust startBizDate and endBizDate to the date range you want to rerun
public static void RunCycleDagNodes(Long nodeId) throws Exception {
RunCycleDagNodesRequest request = new RunCycleDagNodesRequest();
request.setIncludeNodeIds(nodeId.toString());
request.setName("rerun_job");
request.setParallelism(false);
request.setProjectEnv("PROD");
request.setRootNodeId(nodeId);
request.setStartBizDate("2021-09-29 00:00:00");
request.setEndBizDate("2021-09-29 00:00:00");
RunCycleDagNodesResponse acsResponse = client.getAcsResponse(request);
}
// Step 3: Modify the JSON configuration to replace MySQL reader with PolarDB
// - Finds the reader step (category="reader", stepType="mysql")
// - Updates stepType to the new value (e.g., "polardb")
// - Updates datasource in both parameter and connection objects
public static String modifyContent(String content, String newStepType, String newDatasource){
JSONObject jsonObject = JSON.parseObject(content);
JSONArray steps = jsonObject.getJSONArray("steps");
if (steps != null) {
for (int i = 0; i < steps.size(); ++i) {
JSONObject step = steps.getJSONObject(i);
if (step != null && step.getString("category") != null && "reader".equals(step.getString("category"))) {
if (step.getString("stepType") != null && "mysql".equals(step.getString("stepType"))) {
step.put("stepType", newStepType);
JSONObject parameter = step.getJSONObject("parameter");
if (parameter != null) {
parameter.put("datasource", newDatasource);
JSONArray connections = parameter.getJSONArray("connection");
if (connections != null) {
for (int j = 0; j < connections.size(); ++j) {
JSONObject connection = connections.getJSONObject(j);
if (connection != null) {
connection.put("datasource", newDatasource);
}
}
}
}
}
}
}
}
return jsonObject.toJSONString();
}
static IAcsClient client;
public static void main(String[] args) throws Exception {
// Initialize the API client with your credentials and region
String akId = "XXX";
String akSecret = "XXX";
String regionId = "cn-chengdu";
IClientProfile profile = DefaultProfile.getProfile(regionId, akId, akSecret);
DefaultProfile.addEndpoint(regionId, "dataworks-public", "dataworks." + regionId + ".aliyuncs.com");
client = new DefaultAcsClient(profile);
// Locate the data synchronization task file
String folderPath = "Business Flow/Test Workflow/Data Integration/";
String filename = "mysql_to_odps";
ListFilesResponse.Data.File file = ListFiles(folderPath, filename);
if (file == null) {
    throw new IllegalStateException("No sync task file found under " + folderPath + " named " + filename);
}
Long fileId = file.getFileId();
System.out.println(fileId);
// Get the current task configuration and modify it
String content = GetFiles(fileId);
// "polardb" is the new step type; "polardb_datasource" is the PolarDB data source name
// registered in your DataWorks workspace
String contentModified = modifyContent(content, "polardb", "polardb_datasource");
// Save, submit, and deploy the updated task
UpdateDISyncTask(file.getFileId(), contentModified);
Long deployId = submitFile(fileId);
getDeployment(deployId);
Thread.sleep(10000); // Wait for submission to complete
getDeployment(deployId);
deployId = deploy(fileId);
getDeployment(deployId);
Thread.sleep(10000); // Wait for deployment to complete
getDeployment(deployId);
// Trigger a retroactive run to execute the updated task
Long nodeId = listNode(filename);
RunCycleDagNodes(nodeId);
}
}

API operations reference
| API operation | Purpose | Key parameters |
|---|---|---|
| ListFiles | Find the sync task file by folder path and keyword | projectId, fileFolderPath, keyword |
| GetFile | Retrieve the JSON configuration of a task file | projectId, fileId |
| UpdateDISyncTask | Save the updated task configuration | projectId, fileId, taskContent, taskType (DI_OFFLINE) |
| SubmitFile | Submit the file for deployment review | projectId, fileId |
| GetDeployment | Check submission or deployment status | projectId, deploymentId |
| DeployFile | Deploy the submitted file to production | projectId, fileId |
| ListNodes | Find the production node ID by name | projectId, nodeName, projectEnv (PROD) |
| RunCycleDagNodes | Trigger a retroactive run | includeNodeIds, name, parallelism, projectEnv, rootNodeId, startBizDate, endBizDate |
Verify the result
After running the code:
Check deployment status. The getDeployment output should show a success status. If the status indicates failure, check whether the PolarDB data source is correctly registered in your DataWorks workspace.
Verify in the DataWorks console. Open the data synchronization task in Data Development and confirm that the reader source now shows PolarDB instead of MySQL.
Check the retroactive run. In Operation Center, find the task instance triggered by RunCycleDagNodes and verify that it completed successfully with data written to the MaxCompute destination table.
Troubleshooting
ListFiles returns empty results
The folder path or filename does not match any file in the specified workspace. Double-check the folderPath value (for example, Business Flow/Test Workflow/Data Integration/) and the filename.
UpdateDISyncTask fails
Possible causes:
The taskContent JSON is malformed. Validate the JSON before submitting.
The PolarDB data source name specified in modifyContent does not match a registered data source in the workspace.
The taskType is not set to DI_OFFLINE.
Deployment status shows failure
The submitted file may have validation errors. Open the task in the DataWorks console and check for error messages.
Make sure the PolarDB data source connectivity is working before deploying.
RunCycleDagNodes does not run
The startBizDate and endBizDate values must use the format yyyy-MM-dd HH:mm:ss.
Verify that the node exists in the PROD environment by checking the ListNodes response.
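The date format requirement above can be checked before the API call by parsing the strings with the expected pattern. BizDateCheck is a hypothetical helper for this article, not part of the SDK:

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeParseException;

public class BizDateCheck {
    private static final DateTimeFormatter BIZ_DATE =
        DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    // Returns true only if the value matches the yyyy-MM-dd HH:mm:ss
    // pattern that RunCycleDagNodes expects for startBizDate and endBizDate.
    public static boolean isValidBizDate(String value) {
        try {
            LocalDateTime.parse(value, BIZ_DATE);
            return true;
        } catch (DateTimeParseException e) {
            return false;
        }
    }
}
```

Validating the strings locally turns a confusing remote failure into an immediate, descriptive error.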
Usage notes
The projectId (1911L in the sample) must match your DataWorks workspace ID. Find this value in the DataWorks console on the Workspace Details page.
The Thread.sleep(10000) calls (10 seconds) are simple wait intervals. In production, poll GetDeployment in a loop until the status indicates completion or failure, rather than relying on fixed wait times.
Store your AccessKey ID and AccessKey secret securely. Do not hardcode credentials in source files. Use environment variables or a credentials provider instead.
The endpoint follows the pattern dataworks.{regionId}.aliyuncs.com. Replace {regionId} with the region of your workspace.
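The polling advice above can be sketched as a small helper that replaces the fixed Thread.sleep calls. The status source and the "in progress" code are passed in by the caller, because the exact GetDeployment status values depend on your SDK version; check the API reference before wiring this in:

```java
import java.util.function.IntSupplier;

public class DeploymentPoller {
    // Poll a status source (e.g. a lambda wrapping GetDeployment) until it
    // leaves the "in progress" state or the attempt budget is exhausted.
    // Returns the terminal status code for the caller to interpret.
    public static int pollUntilDone(IntSupplier status, int inProgressCode,
                                    int maxAttempts, long intervalMillis) {
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            int current = status.getAsInt();
            if (current != inProgressCode) {
                return current; // terminal state: success or failure
            }
            try {
                Thread.sleep(intervalMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Interrupted while polling deployment", e);
            }
        }
        throw new IllegalStateException(
            "Deployment still in progress after " + maxAttempts + " attempts");
    }
}
```

Bounding the attempts keeps a stuck deployment from hanging the migration script indefinitely, which a fixed sleep cannot guarantee.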