全部產品
Search
文件中心

:最佳實務:資料ETL操作全流程實踐

更新時間:Nov 20, 2025

本文通過案例為您介紹如何通過DataWorks OpenAPI完成資料開發。

背景資訊

假設一個簡單的情境,開發人員想把RDS庫裡面的資料同步到一張MaxCompute分區表中,然後在自建系統的頁面上展示經過資料分析後的報表資料,我們可以通過DataWorks OpenAPI去完成整個鏈路的實現。具體步驟如下所示。

前提條件

已安裝DataWorks OpenAPI SDK,詳情請參考使用OpenAPI

說明

除了Java語言,我們還支援Python,PHP,C#,Go等語言,請您根據實際開發環境進行安裝。

注意事項

預設情況下我們不需要顯式地指定DataWorks OpenAPI的Endpoint,但是如果aliyun-java-sdk-core版本偏低,可能會找不到DataWorks OpenAPI的Endpoint,這時候可在不升級版本的情況下通過使用如下代碼進行請求。

// Please ensure that the environment variables ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET are set.https://www.alibabacloud.com/help/doc-detail/378657.html
com.aliyun.teaopenapi.models.Config config = new com.aliyun.teaopenapi.models.Config();
config.setEndpoint("dataworks.cn-hangzhou.aliyuncs.com");
config.setAccessKeyId(System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"));
config.setAccessKeySecret(System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"));
com.aliyun.dataworks_public20200518.Client client = new com.aliyun.dataworks_public20200518.Client(config);

如上代碼是顯式地指定了DataWorks OpenAPI的Endpoint,dataworks.${regionId}.aliyuncs.com這樣的網域名稱格式在公網環境下可訪問,但是有些使用者需要在VPC環境下調用OpenAPI,那麼則需要把網域名稱dataworks.${regionId}.aliyuncs.com變更成dataworks-vpc.${regionId}.aliyuncs.com,這樣在VPC網路環境下即使不能訪問公網也能請求到DataWorks OpenAPI。RegionId(地區ID),詳情請參見地區和可用性區域

步驟一:建立RDS資料來源

整合租戶API可建立引擎、建立資料來源、查看專案空間等資訊。在這個業務情境中,MaxCompute分區表存在於MaxCompute引擎中,在DataWorks控制台建立MaxCompute工作空間後會自動建立好MaxCompute引擎的資料來源,所以只需要使用CreateConnection建立好RDS資料來源即可。

CreateDataSourceRequest createRequest = new CreateDataSourceRequest();
createRequest.setProjectId(-1L);
createRequest.setName("TEST_CONNECTION");
createRequest.setDataSourceType("mysql");
createRequest.setEnvType(1);
createRequest.setContent("{\"password\":\"12345\"}");
Long dataSourceId;

try {
    CreateDataSourceResponse createResponse = client.createDataSource(createRequest);
    Assert.assertNotNull(createResponse.getBody().getData());
    dataSourceId = createResponse.getBody().getData();

    UpdateDataSourceRequest updateRequest = new UpdateDataSourceRequest();
    updateRequest.setDataSourceId(dataSourceId);
    updateRequest.setDescription("1");
    updateRequest.setContent("{\"database\":\"xsaxsa\",\"username\":\"xsaxsa\",\"tag\":\"rds\",\"regionId\":\"cn-shanghai\",\"rdsOwnerId\":\"xasxsa\",\"password\":\"xsaxsa\",\"instanceName\":\"rm-xsaxsa\"}");
    UpdateDataSourceResponse updateDataSourceResponse = client.updateDataSource(updateRequest);
    Assert.assertTrue(updateDataSourceResponse.getBody().getData());

    DeleteDataSourceRequest deleteRequest = new DeleteDataSourceRequest();
    deleteRequest.setDataSourceId(dataSourceId);
    DeleteDataSourceResponse deleteResponse = client.deleteDataSource(deleteRequest);
    Assert.assertTrue(deleteResponse.getBody().getData());
} catch (Exception e) {
    e.printStackTrace();
    Assert.fail();
}

UpdateConnectionDeleteConnection可分別修改和刪除資料來源資訊。

說明

對專案空間的成員進行管理的API集是CreateProjectMemberDeleteProjectMemberRemoveProjectMemberFromRoleListProjectMembers

步驟二:任務開發和發布調度

整合資料開發API可管理檔案,並對檔案進行提交和發布後組建循環任務,周期任務會定時調度運行,建立不同類型的檔案是根據FileType這個欄位決定的,目前已支援非常多的FileType,通過營運中心的API ListProgramTypeCount可擷取所有已支援的節點。

// Please ensure that the environment variables ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET are set.https://www.alibabacloud.com/help/doc-detail/378657.html
CreateFileRequest createFileRequest = new CreateFileRequest();
createFileRequest.setFileFolderPath("商務程序/POP介面測試/MaxCompute/test_folder_3");
createFileRequest.setProjectId(-1L);
createFileRequest.setFileName("create_file_name");
createFileRequest.setFileType(10);
String projectIdentifier = "請填入工作空間名稱";
createFileRequest.setInputList(projectIdentifier+"_root");
createFileRequest.setContent("SHOW TABLES;");
createFileRequest.setFileDescription("create file");
createFileRequest.setRerunMode("ALL_ALLOWED");
CreateFileResponse createFileResponse = client.createFile(createFileRequest);

content欄位儲存SQL指令碼、Shell指令碼、Data Integration的指令碼代碼,Data Integration的指令碼格式,請參見指令碼模式配置。使用CreateFile建立完指令碼後,如需修改可使用UpdateFileDeleteFile進行管理。與頁面上操作流程一致的是完成檔案開發後需要提交和發布檔案才會組建循環執行個體,這裡要注意的是需要輪詢SubmitFile返回的DeploymentId,只有當GetDeployment返回的狀態是完成時(status.finished())才表示部署成功。

// Please ensure that the environment variables ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET are set.https://www.alibabacloud.com/help/doc-detail/378657.html
SubmitFileRequest request = new SubmitFileRequest();
request.setFileId(fileId);
request.setComment("submit file");
SubmitFileResponse submitFileResponse = client.submitFile(request);

//檢查提交結果
DeploymentStatus status = null;
GetDeploymentResponseBody.GetDeploymentResponseBodyDataDeployment deployment = null;
int retryTimes = 0;
while (retryTimes < 6) {
  GetDeploymentRequest getDeploymentRequest = new GetDeploymentRequest();
  getDeploymentRequest.setDeploymentId(submitFileResponse.getBody().getData());
  GetDeploymentResponse getDeploymentResponse = client.getDeployment(getDeploymentRequest);
  LOGGER.info("Deployment status got - RequestId[{}]", getDeploymentResponse.getBody().getRequestId());
  Assert.assertNotNull(getDeploymentResponse.getBody().getData());
  deployment = getDeploymentResponse.getBody().getData().getDeployment();
  Assert.assertNotNull(deployment);
  Assert.assertTrue(deployment.getName().equalsIgnoreCase(baseId));
  Assert.assertTrue(deployment.getCreatorId().equalsIgnoreCase(baseId));
  Assert.assertTrue(deployment.getHandlerId().equalsIgnoreCase(baseId));
  Assert.assertEquals((int) deployment.getFromEnvironment(), DatastudioEnvironment.LOCAL.value());
  Assert.assertEquals((int) deployment.getToEnvironment(), DatastudioEnvironment.DEV.value());
  Assert.assertTrue(StringUtils.isBlank(deployment.getErrorMessage()));
  status = Enums.find(DeploymentStatus.class, deployment.getStatus());
  Assert.assertNotNull(status);
  if (status.finished()) {
    LOGGER.info("Deployment finished - FinalStatus[{}]", status);
    break;
  }
  LOGGER.info("Deployment not finished. Sleep for a while. - CurrentStatus[{}]", status);
  retryTimes++;
  SleepUtils.seconds(10L);
}

如果是在標準模式的專案下開發,提交完成後,還需要發布檔案才能最終提交到調度成為周期任務。發布檔案使用DeployFile,和提交檔案一樣,也需要使用GetDeployment輪詢部署狀態。

  DeployFileRequest request = new DeployFileRequest();
request.setFileId(fileId);
request.setComment("deploy file");
DeployFileResponse deployFileResponse = client.deployFile(request);
Long deploymentId = deployFileResponse.getBody().getData();
//檢查發布部署結果
DeploymentStatus status = null;
GetDeploymentResponseBody.GetDeploymentResponseBodyDataDeployment deployment = null;
int retryTimes = 0;
while (retryTimes < 6) {
    GetDeploymentRequest getDeploymentRequest = new GetDeploymentRequest();
    getDeploymentRequest.setDeploymentId(deploymentId);
    GetDeploymentResponse getDeploymentResponse = client.getDeployment(getDeploymentRequest);
    LOGGER.info("Deployment status got - RequestId[{}]", getDeploymentResponse.getBody().getRequestId());
    Assert.assertNotNull(getDeploymentResponse.getBody().getData());
    deployment = getDeploymentResponse.getBody().getData().getDeployment();
    Assert.assertNotNull(deployment);
    LOGGER.info("Deployment information got - DeploymentId[{}] - DeploymentDetail[{}]",
                        deploymentId, new Gson().toJson(deployment));
    Assert.assertTrue(deployment.getCreatorId().equalsIgnoreCase(baseId));
    Assert.assertTrue(StringUtils.isBlank(deployment.getErrorMessage()));
    status = Enums.find(DeploymentStatus.class, deployment.getStatus());
    Assert.assertNotNull(status);
    if (status.finished()) {
        LOGGER.info("Deployment finished - FinalStatus[{}]", status);
        break;
    }
    LOGGER.info("Deployment not finished. Sleep for a while. - CurrentStatus[{}]", status);
    retryTimes++;
    SleepUtils.seconds(10L);
}
說明

資料開發API除了可對檔案管理外,還能管理檔案夾、資源、函數。

步驟三:配置營運監控

通過API完成周期任務的生產之後,會在DataWorks平台每天產生調度執行個體並被定時調度運行,使用營運中心API可對周期任務和周期執行個體進行營運操作,可通過GetNodeGetInstanceListInstances等API查看周期任務和周期執行個體,監控執行個體運行情況。樣本如下。

GetInstanceRequest request = new GetInstanceRequest();
request.setInstanceId(INSTANCE_ID);
request.setProjectEnv(PROJECT_ENV);
try {
  GetInstanceResponse response = client.getInstance(request);
  Object data = ReturnModelParser.parse("getInstanceSuccess", gson.toJson(response));
  BizInstanceDto bizInstanceDto = GsonUtils.jsonToBean(data.toString(), BizInstanceDto.class);
  Assert.assertEquals("NOT_RUN", bizInstanceDto.getStatus().toString());
  Assert.assertEquals(1590416703313L, bizInstanceDto.getModifyTime().getTime());
  Assert.assertEquals(INSTANCE_ID, bizInstanceDto.getInstanceId());
  Assert.assertEquals("DAILY", bizInstanceDto.getDagType().toString());
  Assert.assertEquals("kzh", bizInstanceDto.getNodeName());
  Assert.assertEquals("", bizInstanceDto.getParamValues());
  Assert.assertEquals(1590416703313L, bizInstanceDto.getCreateTime().getTime());
  Assert.assertEquals(1590422400000L, bizInstanceDto.getCycTime().getTime());
  Assert.assertEquals(338450167L, bizInstanceDto.getDagId().longValue());
  Assert.assertEquals(1590336000000L, bizInstanceDto.getBizdate().getTime());
  Assert.assertEquals(33115L, bizInstanceDto.getNodeId().longValue());
} catch (Exception e) {
  e.printStackTrace();
  Assert.fail();
}

如果執行個體運行異常可通過RestartInstanceSetSuccessInstanceSuspendInstanceResumeInstance處理。使用CreateRemindUpdateRemind等API可建立自訂警示規則,確保每Apsara Infrastructure Management Framework線順利產出,一旦異常可警示通知到人工介入。

CreateRemindRequest createRemindRequest = new CreateRemindRequest();
createRemindRequest.setRemindName("REMIND_CREATE_TEST");
createRemindRequest.setRemindUnit(PopRemindUnit.NODE.name());
createRemindRequest.setRemindType(RemindType.ERROR.name());
createRemindRequest.setAlertUnit(PopAlertUnit.OTHER.name());
createRemindRequest.setDndEnd("08:00");
createRemindRequest.setNodeIds("-1");
createRemindRequest.setMaxAlertTimes(1);
createRemindRequest.setAlertInterval(1800);
createRemindRequest.setAlertMethods(PopAlertMethod.MAIL.name());
createRemindRequest.setAlertTargets(MosadConstants.POP_UID);
try {
    CreateRemindResponse createResponse = client.createRemind(createRemindRequest);
    MosadReturnModelParser.parse("createRemindTest", gson.toJson(createResponse));
    Assert.assertTrue(createResponse.getData() > 0);
} catch (Exception ex) {
    ex.printStackTrace();
    return;
}
說明

營運中心主要提供周期任務、手動商務程序、基準查詢、警示配置和查詢等相關API。

步驟四:配置資料品質監控

在這個業務情境中,我們通過前面介紹的API已經可以每天定時把資料從RDS同步到MaxCompute的表中了。如果我們擔心產生髒資料或者資料缺失影響到線上業務,可通過資料品質API,整合DataWorks資料品質監控能力,當表資料產出異常時,可以立刻觸發並通知規則訂閱人。

CreateQualityRuleRequest request = new CreateQualityRuleRequest();
request.setBlockType(0);
request.setComment("test-createTemplateRuleSuccess");
request.setCriticalThreshold("50");
request.setEntityId(entityId);
request.setOperator("abs");
request.setPredictType(0);
request.setProjectName(PROJECT_NAME);
request.setProperty("table_count");
request.setPropertyType("table");
request.setRuleName("createTemplateRuleSuccess");
request.setRuleType(0);
request.setTemplateId(7);
request.setWarningThreshold("10");
try {
  CreateQualityRuleResponse response = client.createQualityRule(request);
  Object data = ReturnModelParser.parse("createTemplateRuleSuccess", gson.toJson(response));
  Long templateRuleId = Long.parseLong(data.toString());
  Assert.assertTrue(templateRuleId > 0);
  return templateRuleId;
} catch (Exception e) {
  e.printStackTrace();
  Assert.assertFalse(true);
  return null;
}
說明

CreateQualityRule、GetQualityFollower、CreateQualityRelativeNode等資料品質API集可管理資料品質規則,具體詳情請參見對應的API。

步驟五:產生資料服務API

我們通過中繼資料API完成了表建立,通過資料開發API完成檔案和周期任務建立,通過資料品質和營運中心API配置好了監控規則,MaxCompute分區表資料亦可順利產生,這時候我們還需要最後一個步驟把MaxCompute分區表的資料通過資料服務OpenAPI產生一個資料服務API向系統提供資料服務。

CreateDataServiceApiRequest createRequest = new CreateDataServiceApiRequest();
createRequest.setTenantId(tenantId);
createRequest.setProjectId(projectId);
createRequest.setApiMode(apiMode);
createRequest.setApiName(apiName);
createRequest.setApiPath(apiPath);
createRequest.setApiDescription("test");
createRequest.setGroupId(groupId);
createRequest.setVisibleRange(visibleRange);
createRequest.setTimeout(10000);
createRequest.setProtocols(protocols);
createRequest.setRequestMethod(requestMethod);
createRequest.setResponseContentType(responseType);

CreateDataServiceApiResponse createResponse = client.createDataServiceApi(createRequest);
Long apiId = createResponse.getBody().getData();
Assert.assertNotNull(apiId);

GetDataServiceApiRequest getRequest = new GetDataServiceApiRequest();
getRequest.setTenantId(tenantId);
getRequest.setProjectId(projectId);
getRequest.setApiId(apiId);
GetDataServiceApiResponse getResponse = client.getDataServiceApi(getRequest);
GetDataServiceApiResponseBody.GetDataServiceApiResponseBodyData data = getResponse.getBody().getData();
Assert.assertEquals(apiId, data.getApiId());
Assert.assertEquals(0L, data.getFolderId().longValue());

使用CreateDataServiceApiPublishDataServiceApi可把表資料轉換成資料服務API,那麼整個資料生產鏈路就完成了,整合以上的DataWorks OpenAPI即完成了本地系統和雲上系統的無縫對接。