
DataWorks:Best practices for performing ETL operations for data

Last Updated:Dec 25, 2023

This topic describes how to call DataWorks API operations to complete a data development process.

Background information

For example, developers want to synchronize data from an RDS database to a partitioned MaxCompute table and display the analysis results in reports on a page of a self-managed system. The developers can call DataWorks API operations to complete the entire data development process, which consists of the steps that are described in this topic.

Prerequisites

DataWorks OpenAPI SDK is installed. For more information, see Install SDKs for Java.

Note

In addition to Java, DataWorks provides OpenAPI SDKs for programming languages such as Python, PHP, C#, and Go. You can install the SDK for a specific programming language based on your business requirements.

Precautions

By default, you do not need to explicitly specify the endpoint of the DataWorks API. However, if you use an earlier version of aliyun-java-sdk-core, the endpoint of the DataWorks API may not be found. In this case, you can use the following code to make requests without upgrading aliyun-java-sdk-core.

// Make sure that the ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET environment variables are set. For more information, see https://www.alibabacloud.com/help/doc-detail/378657.html.
IClientProfile profile = DefaultProfile.getProfile("cn-shanghai", System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"), System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"));
DefaultProfile.addEndpoint("cn-shanghai","dataworks-public", "dataworks.cn-shanghai.aliyuncs.com");
IAcsClient client = new DefaultAcsClient(profile);

The preceding code explicitly specifies the endpoint of the DataWorks API. An endpoint in the dataworks.${regionId}.aliyuncs.com format is accessed over the Internet. If you want to call DataWorks API operations from a virtual private cloud (VPC) in which the Internet is not accessible, specify an endpoint in the dataworks-vpc.${regionId}.aliyuncs.com format instead. regionId indicates the ID of your region. For more information, see Regions and Zones.
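
For example, the following minimal sketch shows the VPC configuration, assuming the same credentials setup as above; cn-shanghai is used only as an example region.

// A minimal sketch: the same client setup, but with the VPC endpoint.
// Replace cn-shanghai with your own region ID.
IClientProfile profile = DefaultProfile.getProfile("cn-shanghai", System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"), System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"));
DefaultProfile.addEndpoint("cn-shanghai", "dataworks-public", "dataworks-vpc.cn-shanghai.aliyuncs.com");
IAcsClient client = new DefaultAcsClient(profile);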

Step 1: Add an RDS data source

You can call tenant-related API operations to create compute engines and data sources, and to view information about objects such as workspaces. In the business scenario in this topic, the partitioned MaxCompute table is stored in a MaxCompute compute engine. When you associate a MaxCompute compute engine with your workspace in the DataWorks console, DataWorks generates a default MaxCompute data source in the workspace. Therefore, you need to call only the CreateConnection operation to create an RDS data source.

        CreateConnectionRequest createRequest = new CreateConnectionRequest();
        createRequest.setProjectId(-1L); // Replace -1L with the ID of your workspace.
        createRequest.setName("TEST_CONNECTION");
        createRequest.setConnectionType("MYSQL");
        createRequest.setEnvType(1);
        createRequest.setContent("{\"password\":\"12345\"}"); // The connection configuration; only the password field is shown in this example.
        Long connectionId;

        try {
            // Create the data source and obtain its ID.
            CreateConnectionResponse createResponse = client.getAcsResponse(createRequest);
            Assert.assertNotNull(createResponse.getData());
            connectionId = createResponse.getData();

            // Modify the description of the data source.
            UpdateConnectionRequest updateRequest = new UpdateConnectionRequest();
            updateRequest.setConnectionId(connectionId);
            updateRequest.setDescription("1");
            UpdateConnectionResponse acsResponse = client.getAcsResponse(updateRequest);
            Assert.assertTrue(acsResponse.getData());

            // Delete the data source.
            DeleteConnectionRequest deleteRequest = new DeleteConnectionRequest();
            deleteRequest.setConnectionId(connectionId);
            DeleteConnectionResponse deleteResponse = client.getAcsResponse(deleteRequest);
            Assert.assertTrue(deleteResponse.getData());
        } catch (ClientException e) {
            e.printStackTrace();
            Assert.fail();
        }

As the preceding code shows, you can call the UpdateConnection operation to modify a data source and the DeleteConnection operation to delete a data source.

Note

The API operations that can be called to manage members in a workspace are CreateProjectMember, DeleteProjectMember, RemoveProjectMemberFromRole, and ListProjectMembers.
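
As a hedged illustration, the following minimal sketch calls CreateProjectMember to add a member to a workspace. It assumes that client and projectId are defined as in the preceding examples and that the user is identified by the UID of an Alibaba Cloud account; the placeholder UID must be replaced with a real value.

        // A minimal sketch: add a user to the workspace.
        CreateProjectMemberRequest memberRequest = new CreateProjectMemberRequest();
        memberRequest.setProjectId(projectId); // Assumed to be the ID of your workspace.
        memberRequest.setUserId("19512839xxxxxxxx"); // Placeholder UID of the Alibaba Cloud account to add.
        CreateProjectMemberResponse memberResponse = client.getAcsResponse(memberRequest);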

Step 2: Develop, deploy, and schedule tasks

You can call DataStudio API operations to manage files, and to commit and deploy files to generate auto triggered tasks. The auto triggered tasks are scheduled to run at specific points in time. The FileType field determines the type of a file and has multiple valid values. You can call the ListProgramTypeCount operation of Operation Center to query the supported task types.
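
The following minimal sketch shows such a query. It assumes that client is the IAcsClient instance created earlier and that projectId identifies your workspace; the exact structure of the response data may vary by SDK version.

        // A minimal sketch: query the task types that the workspace supports.
        ListProgramTypeCountRequest listRequest = new ListProgramTypeCountRequest();
        listRequest.setProjectId(projectId); // Assumed to be the ID of your workspace.
        listRequest.setProjectEnv("PROD");   // Or "DEV", depending on the environment to query.
        ListProgramTypeCountResponse listResponse = client.getAcsResponse(listRequest);
        System.out.println(new Gson().toJson(listResponse.getData()));

After you determine the file type, call the CreateFile operation to create a script file: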

        // Make sure that the ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET environment variables are set. For more information, see https://www.alibabacloud.com/help/doc-detail/378657.html.
        IClientProfile profile = DefaultProfile.getProfile("cn-shanghai", System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"), System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"));
        IAcsClient client = new DefaultAcsClient(profile);
        CreateFileRequest createFileRequest = new CreateFileRequest();
        createFileRequest.setFileType(DefaultNodeType.ODPS_SQL.getCode()); // The code of the ODPS SQL file type.
        createFileRequest.setInputList(projectIdentifier + "_root");       // The output name of the parent node; the workspace root node is used here.
        createFileRequest.setContent(content);                             // The script content of the task.
        createFileRequest.setFileName("create_file_" + caseId);
        createFileRequest.setFileFolderPath("Workflow/POP API test/MaxCompute/test_folder_3");
        createFileRequest.setFileDescription("create file " + caseId);
        createFileRequest.setRerunMode("ALL_ALLOWED");                     // The task can be rerun regardless of its status.
        CreateFileResponse createFileResponse = client.getAcsResponse(createFileRequest);
The Content field stores the scripts of SQL tasks, Shell tasks, and Data Integration tasks. For information about the script format of a Data Integration task, see Configure a batch synchronization task by using the code editor. After you call the CreateFile operation to create a script, you can call the UpdateFile operation to modify the script and the DeleteFile operation to delete the script. Similar to the procedure in the DataWorks console, after you develop a file in the code editor, you must commit and deploy the file to generate auto triggered task instances. Note that you must poll the GetDeployment operation by using the DeploymentId value that the SubmitFile operation returns. The file is committed only after the GetDeployment operation returns a status for which status.finished() is true.
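
For example, a minimal UpdateFile sketch, assuming that client, projectId, and the fileId returned by CreateFile are defined as in the preceding examples; the new script content is a trivial placeholder:

        // A minimal sketch: update the script content of an existing file.
        UpdateFileRequest updateFileRequest = new UpdateFileRequest();
        updateFileRequest.setProjectId(projectId); // Assumed to be the ID of your workspace.
        updateFileRequest.setFileId(fileId);       // The ID returned by the CreateFile operation.
        updateFileRequest.setContent("SELECT 1;"); // Placeholder script content.
        UpdateFileResponse updateFileResponse = client.getAcsResponse(updateFileRequest);

The following code commits the file and then polls the commit status: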

        // Make sure that the ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET environment variables are set. For more information, see https://www.alibabacloud.com/help/doc-detail/378657.html.
        IClientProfile profile = DefaultProfile.getProfile("cn-shanghai", System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"), System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"));
        IAcsClient client = new DefaultAcsClient(profile);
        // Commit the file to generate a deployment.
        SubmitFileRequest submitFileRequest = new SubmitFileRequest();
        submitFileRequest.setFileId(fileId);
        submitFileRequest.setComment("submit file");
        SubmitFileResponse submitFileResponse = client.getAcsResponse(submitFileRequest);

        // Poll the commit result until the deployment reaches a final status.
        DeploymentStatus status = null;
        GetDeploymentResponse.Data.Deployment deployment = null;
        int retryTimes = 0;
        while (retryTimes < 6) {
            // getDeploymentRequest() is a helper in this sample that builds a GetDeploymentRequest from the deployment ID that SubmitFile returns.
            GetDeploymentRequest getDeploymentRequest = getDeploymentRequest(submitFileResponse.getData());
            GetDeploymentResponse getDeploymentResponse = client.getAcsResponse(getDeploymentRequest);
            LOGGER.info("Deployment status got - RequestId[{}]", getDeploymentResponse.getRequestId());
            Assert.assertNotNull(getDeploymentResponse.getData());
            deployment = getDeploymentResponse.getData().getDeployment();
            Assert.assertNotNull(deployment);
            Assert.assertTrue(deployment.getName().equalsIgnoreCase(baseId));
            Assert.assertTrue(deployment.getCreatorId().equalsIgnoreCase(baseId));
            Assert.assertTrue(deployment.getHandlerId().equalsIgnoreCase(baseId));
            Assert.assertEquals((int) deployment.getFromEnvironment(), DatastudioEnvironment.LOCAL.value());
            Assert.assertEquals((int) deployment.getToEnvironment(), DatastudioEnvironment.DEV.value());
            Assert.assertTrue(StringUtils.isBlank(deployment.getErrorMessage()));
            status = Enums.find(DeploymentStatus.class, deployment.getStatus());
            Assert.assertNotNull(status);
            if (status.finished()) {
                LOGGER.info("Deployment finished - FinalStatus[{}]", status);
                break;
            }
            LOGGER.info("Deployment not finished. Sleep for a while. - CurrentStatus[{}]", status);
            retryTimes++;
            SleepUtils.seconds(10L);
        }

In a workspace in standard mode, after you develop and commit a file, you must deploy the file to the scheduling system to generate auto triggered tasks. As with committing a file, you must poll the GetDeployment operation by using the deployment ID that the DeployFile operation returns until the deployment reaches a final status.

    // Deploy the committed file to the scheduling system.
    DeployFileRequest deployFileRequest = new DeployFileRequest();
    deployFileRequest.setFileId(fileId);
    deployFileRequest.setComment("deploy file");
    DeployFileResponse deployFileResponse = client.getAcsResponse(deployFileRequest);
    Long deploymentId = deployFileResponse.getData();
    // Poll the deployment result until the deployment reaches a final status.
    DeploymentStatus status = null;
    GetDeploymentResponse.Data.Deployment deployment = null;
    int retryTimes = 0;
    while (retryTimes < 6) {
        GetDeploymentRequest getDeploymentRequest = getDeploymentRequest(deploymentId);
        GetDeploymentResponse getDeploymentResponse = client.getAcsResponse(getDeploymentRequest);
        LOGGER.info("Deployment status got - RequestId[{}]", getDeploymentResponse.getRequestId());
        Assert.assertNotNull(getDeploymentResponse.getData());
        deployment = getDeploymentResponse.getData().getDeployment();
        Assert.assertNotNull(deployment);
        LOGGER.info("Deployment information got - DeploymentId[{}] - DeploymentDetail[{}]",
                deploymentId, new Gson().toJson(deployment));
        Assert.assertTrue(deployment.getCreatorId().equalsIgnoreCase(baseId));
        Assert.assertTrue(StringUtils.isBlank(deployment.getErrorMessage()));
        status = Enums.find(DeploymentStatus.class, deployment.getStatus());
        Assert.assertNotNull(status);
        if (status.finished()) {
            LOGGER.info("Deployment finished - FinalStatus[{}]", status);
            break;
        }
        LOGGER.info("Deployment not finished. Sleep for a while. - CurrentStatus[{}]", status);
        retryTimes++;
        SleepUtils.seconds(10L);
    }
Note

You can also call the DataStudio API operations to manage folders, resources, and functions.
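
As a hedged illustration, the following minimal sketch calls the CreateFolder operation to create a folder for scripts. It assumes that client and projectId are defined as in the preceding examples; the field names follow the common naming pattern of the SDK and may vary by version.

        // A minimal sketch: create a folder in the DataStudio directory tree.
        CreateFolderRequest folderRequest = new CreateFolderRequest();
        folderRequest.setProjectId(projectId); // Assumed to be the ID of your workspace.
        folderRequest.setFolderPath("Workflow/POP API test/MaxCompute/test_folder_4"); // Placeholder path.
        CreateFolderResponse folderResponse = client.getAcsResponse(folderRequest);
        System.out.println(folderResponse.getData()); // The ID of the new folder.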

Step 3: Configure parameters for O&M and monitoring

After you call the preceding API operations to generate auto triggered tasks, DataWorks generates instances for the tasks every day and schedules the instances to run at specific points in time. You can then call Operation Center API operations to perform O&M operations on the auto triggered tasks and their instances. For example, you can call the GetNode, GetInstance, and ListInstances operations to view the tasks and instances and to monitor instance status. Sample code:

        GetInstanceRequest request = new GetInstanceRequest();
        request.setInstanceId(INSTANCE_ID);
        request.setProjectEnv(PROJECT_ENV); // PROD or DEV.
        try {
            GetInstanceResponse response = client.getAcsResponse(request);
            // ReturnModelParser, GsonUtils, and BizInstanceDto are helper classes in this sample project.
            Object data = ReturnModelParser.parse("getInstanceSuccess", gson.toJson(response));
            BizInstanceDto bizInstanceDto = GsonUtils.jsonToBean(data.toString(), BizInstanceDto.class);
            // The following assertions check a sample instance; replace the expected values with your own.
            Assert.assertEquals("NOT_RUN", bizInstanceDto.getStatus().toString());
            Assert.assertEquals(1590416703313L, bizInstanceDto.getModifyTime().getTime());
            Assert.assertEquals(INSTANCE_ID, bizInstanceDto.getInstanceId());
            Assert.assertEquals("DAILY", bizInstanceDto.getDagType().toString());
            Assert.assertEquals("kzh", bizInstanceDto.getNodeName());
            Assert.assertEquals("", bizInstanceDto.getParamValues());
            Assert.assertEquals(1590416703313L, bizInstanceDto.getCreateTime().getTime());
            Assert.assertEquals(1590422400000L, bizInstanceDto.getCycTime().getTime());
            Assert.assertEquals(338450167L, bizInstanceDto.getDagId().longValue());
            Assert.assertEquals(1590336000000L, bizInstanceDto.getBizdate().getTime());
            Assert.assertEquals(33115L, bizInstanceDto.getNodeId().longValue());
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail();
        }
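
To list instances in batches rather than query them one by one, you can call the ListInstances operation. The following is a minimal sketch that assumes client is defined as earlier and projectId identifies your workspace; the structure of the response data may vary by SDK version.

        // A minimal sketch: list the auto triggered task instances in the production environment.
        ListInstancesRequest listInstancesRequest = new ListInstancesRequest();
        listInstancesRequest.setProjectId(projectId); // Assumed to be the ID of your workspace.
        listInstancesRequest.setProjectEnv("PROD");
        listInstancesRequest.setPageNumber(1);
        listInstancesRequest.setPageSize(10);
        ListInstancesResponse listInstancesResponse = client.getAcsResponse(listInstancesRequest);
        System.out.println(new Gson().toJson(listInstancesResponse.getData()));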

If an instance does not run as expected, you can call the RestartInstance, SetSuccessInstance, SuspendInstance, or ResumeInstance operation to handle the exception. You can also call API operations such as CreateRemind and UpdateRemind to create custom alert rules. These rules help ensure that data, such as baseline data, is generated as expected every day; if an exception occurs, an alert notification is sent to the specified alert contact.
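
For example, the following minimal sketch reruns a failed instance. It assumes that client and INSTANCE_ID are defined as in the preceding example; the response data is assumed to indicate whether the rerun is triggered.

        // A minimal sketch: rerun an instance that failed.
        RestartInstanceRequest restartRequest = new RestartInstanceRequest();
        restartRequest.setInstanceId(INSTANCE_ID);
        restartRequest.setProjectEnv("PROD"); // The environment of the instance.
        RestartInstanceResponse restartResponse = client.getAcsResponse(restartRequest);
        Assert.assertTrue(restartResponse.getData());

The following code creates a custom alert rule: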

        CreateRemindRequest createRemindRequest = new CreateRemindRequest();
        createRemindRequest.setRemindName("REMIND_CREATE_TEST");
        createRemindRequest.setRemindUnit(PopRemindUnit.NODE.name());    // Monitor at the node level.
        createRemindRequest.setRemindType(RemindType.ERROR.name());      // Trigger an alert when an error occurs.
        createRemindRequest.setAlertUnit(PopAlertUnit.OTHER.name());
        createRemindRequest.setDndEnd("08:00");                          // End time of the do-not-disturb period.
        createRemindRequest.setNodeIds("-1");                            // Replace -1 with the IDs of the nodes to monitor.
        createRemindRequest.setMaxAlertTimes(1);
        createRemindRequest.setAlertInterval(1800);                      // Minimum alert interval, in seconds.
        createRemindRequest.setAlertMethods(PopAlertMethod.MAIL.name()); // Send alert notifications by email.
        createRemindRequest.setAlertTargets(MosadConstants.POP_UID);     // MosadConstants is a helper class in this sample project.
        try {
            CreateRemindResponse createResponse = client.getAcsResponse(createRemindRequest);
            MosadReturnModelParser.parse("createRemindTest", gson.toJson(createResponse));
            Assert.assertTrue(createResponse.getData() > 0);
        } catch (Exception ex) {
            ex.printStackTrace();
            return;
        }
Note

Operation Center provides API operations for auto triggered tasks, manually triggered workflows, baseline queries, and configuration and queries of alert rules.

Step 4: Configure a data quality monitoring rule

In the business scenario in this topic, the preceding API operations synchronize data from an RDS database to a partitioned MaxCompute table at a scheduled point in time every day. You can call Data Quality API operations to prevent dirty data or missing data from affecting your business. When a data quality monitoring rule detects an exception in table data generation, the subscribers to the rule are notified at the earliest opportunity.

        CreateQualityRuleRequest request = new CreateQualityRuleRequest();
        request.setBlockType(0);
        request.setComment("test-createTemplateRuleSuccess");
        request.setCriticalThreshold("50");    // The threshold that triggers a critical alert.
        request.setEntityId(entityId);         // The ID of the monitored entity.
        request.setOperator("abs");
        request.setPredictType(0);
        request.setProjectName(PROJECT_NAME);
        request.setProperty("table_count");
        request.setPropertyType("table");
        request.setRuleName("createTemplateRuleSuccess");
        request.setRuleType(0);
        request.setTemplateId(7);
        request.setWarningThreshold("10");     // The threshold that triggers a warning alert.
        try {
            CreateQualityRuleResponse response = client.getAcsResponse(request);
            Object data = ReturnModelParser.parse("createTemplateRuleSuccess", gson.toJson(response));
            Long templateRuleId = Long.parseLong(data.toString());
            Assert.assertTrue(templateRuleId > 0);
            return templateRuleId;
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail();
            return null;
        }
Note

You can call Data Quality API operations, such as CreateQualityRule, GetQualityFollower, and CreateQualityRelativeNode, to manage data quality monitoring rules. For more information, see the Data Quality API documentation.
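
As a hedged illustration, the following minimal sketch calls GetQualityFollower to query the subscribers of the rules that are configured for a monitored entity. It assumes that client, PROJECT_NAME, and entityId are defined as in the preceding example; the request fields are assumptions based on the SDK naming pattern and may vary by version.

        // A minimal sketch: query the subscribers of data quality rules.
        GetQualityFollowerRequest followerRequest = new GetQualityFollowerRequest();
        followerRequest.setProjectName(PROJECT_NAME); // The name of the compute engine project.
        followerRequest.setEntityId(entityId);        // The ID of the monitored entity.
        GetQualityFollowerResponse followerResponse = client.getAcsResponse(followerRequest);
        System.out.println(new Gson().toJson(followerResponse.getData()));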

Step 5: Generate a DataService Studio API

After you use the API operations of the preceding DataWorks modules to synchronize data from the RDS database to the partitioned MaxCompute table, you can call DataService Studio API operations to generate a DataService Studio API from the data in the table. The generated API can then be used to serve the table data through DataService Studio.

        // Variables such as tenantId, projectId, apiMode, apiName, apiPath, groupId, visibleRange, protocols, requestMethod, and responseType are assumed to be defined by the caller.
        CreateDataServiceApiRequest createRequest = new CreateDataServiceApiRequest();
        createRequest.setTenantId(tenantId);
        createRequest.setProjectId(projectId);
        createRequest.setApiMode(apiMode);
        createRequest.setApiName(apiName);
        createRequest.setApiPath(apiPath);
        createRequest.setApiDescription("test");
        createRequest.setGroupId(groupId);
        createRequest.setVisibleRange(visibleRange);
        createRequest.setTimeout(10000); // Timeout period, in milliseconds.
        createRequest.setProtocols(protocols);
        createRequest.setRequestMethod(requestMethod);
        createRequest.setResponseContentType(responseType);

        CreateDataServiceApiResponse createResponse = client.getAcsResponse(createRequest);
        Long apiId = createResponse.getData();
        Assert.assertNotNull(apiId);

        // Query the API that was just created.
        GetDataServiceApiRequest getRequest = new GetDataServiceApiRequest();
        getRequest.setTenantId(tenantId);
        getRequest.setProjectId(projectId);
        getRequest.setApiId(apiId);
        GetDataServiceApiResponse getResponse = client.getAcsResponse(getRequest);
        GetDataServiceApiResponse.Data data = getResponse.getData();
        Assert.assertEquals(apiId, data.getApiId());
        Assert.assertEquals(0L, data.getFolderId().longValue());

You can call the CreateDataServiceApi and PublishDataServiceApi operations to convert table data into DataService Studio APIs; a minimal sketch of the publish call follows. This completes the end-to-end data production process. The preceding API operations help you achieve seamless integration between your self-managed system and DataWorks.
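
The sketch assumes that client, tenantId, projectId, and the apiId returned by CreateDataServiceApi are defined as in the preceding example; the response data is assumed to indicate whether the publish operation succeeds.

        // A minimal sketch: publish the API that was created in the preceding example.
        PublishDataServiceApiRequest publishRequest = new PublishDataServiceApiRequest();
        publishRequest.setTenantId(tenantId);
        publishRequest.setProjectId(projectId);
        publishRequest.setApiId(apiId); // The ID returned by the CreateDataServiceApi operation.
        PublishDataServiceApiResponse publishResponse = client.getAcsResponse(publishRequest);
        Assert.assertTrue(publishResponse.getData());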