全部产品
Search
文档中心

大数据开发治理平台 DataWorks:通过OpenAPI创建MySQL到Hologres的整库实时同步任务

更新时间:Sep 24, 2024

本文介绍如何使用OpenAPI实现从MySQL数据源到Hologres目标源的整库实时数据同步。通过本文,您将学会如何创建、启动、监控、停止以及删除数据集成任务,确保数据在两个系统间高效准确地同步流转,并掌握妥善管理这些任务的方法,包括适时停止与彻底删除不再需要的任务。

前提条件

配置环境依赖及账号认证

  • 配置Maven依赖。

    打开Maven项目下的pom.xml文件,添加如下内容。

    <dependency>
      <groupId>com.aliyun</groupId>
      <artifactId>dataworks_public20200518</artifactId>
      <version>5.6.0</version>
    </dependency>
    <dependency>
      <groupId>com.aliyun</groupId>
      <artifactId>tea-openapi</artifactId>
      <version>0.3.2</version>
    </dependency>
    <dependency>
      <groupId>com.aliyun</groupId>
      <artifactId>tea-console</artifactId>
      <version>0.0.1</version>
    </dependency>
    <dependency>
      <groupId>com.aliyun</groupId>
      <artifactId>tea-util</artifactId>
      <version>0.2.21</version>
    </dependency>
    说明

    以上dataworks_public20200518依赖可通过Maven中央存储库获取。

  • 配置阿里云账号AccessKey环境变量,用于调用接口时进行身份认证。具体操作,请参见“在Linux、macOS和Windows系统配置环境变量”。

操作步骤

步骤一:创建数据集成任务

调用CreateDIJob接口,创建数据集成任务。以下代码仅展示了部分参数的配置,更多详细参数请参见CreateDIJob

/**
 * Creates a data integration job by calling the CreateDIJob API and returns its ID.
 *
 * <p>The request is filled with fixed sample values: project 3058, a job name made of
 * "api" plus the current time in milliseconds, a MySQL source data source
 * ("dw_mysql_online"), a Hologres destination data source ("dw_holo_test"), one resource
 * group used for both the offline and the realtime settings, and a single table mapping
 * whose selection rules name one data source, one database and one table.
 *
 * @return the DIJobId read from the CreateDIJob response body
 * @throws Exception if creating the client or calling CreateDIJob fails
 */
public static Long createDIJob() throws Exception {
    System.out.println("create job started ...");

    // Basic job settings.
    CreateDIJobRequest jobRequest = new CreateDIJobRequest();
    jobRequest.setProjectId(3058L);
    jobRequest.setJobName("api" + System.currentTimeMillis());
    jobRequest.setMigrationType("FullAndRealtimeIncremental");
    jobRequest.setSourceDataSourceType("MySQL");
    jobRequest.setDestinationDataSourceType("Hologres");

    // Source data source, with its connection properties.
    Map<String, String> sourceProperties = new HashMap<>();
    sourceProperties.put("TimeZone", "Asia/Shanghai");
    sourceProperties.put("Encoding", "utf-8");
    CreateDIJobRequest.CreateDIJobRequestSourceDataSourceSettings sourceSettings =
            new CreateDIJobRequest.CreateDIJobRequestSourceDataSourceSettings();
    sourceSettings.setDataSourceName("dw_mysql_online");
    sourceSettings.setDataSourceProperties(sourceProperties);
    jobRequest.setSourceDataSourceSettings(Arrays.asList(sourceSettings));

    // Destination data source.
    CreateDIJobRequest.CreateDIJobRequestDestinationDataSourceSettings destinationSettings =
            new CreateDIJobRequest.CreateDIJobRequestDestinationDataSourceSettings();
    destinationSettings.setDataSourceName("dw_holo_test");
    jobRequest.setDestinationDataSourceSettings(Arrays.asList(destinationSettings));

    // Resource groups: the same identifier is used for the offline and realtime parts.
    CreateDIJobRequest.CreateDIJobRequestResourceSettingsOfflineResourceSettings offlineGroup =
            new CreateDIJobRequest.CreateDIJobRequestResourceSettingsOfflineResourceSettings();
    offlineGroup.setResourceGroupIdentifier("S_res_group_195820716552192_1695299272182");
    CreateDIJobRequest.CreateDIJobRequestResourceSettingsRealtimeResourceSettings realtimeGroup =
            new CreateDIJobRequest.CreateDIJobRequestResourceSettingsRealtimeResourceSettings();
    realtimeGroup.setResourceGroupIdentifier("S_res_group_195820716552192_1695299272182");
    CreateDIJobRequest.CreateDIJobRequestResourceSettings resources =
            new CreateDIJobRequest.CreateDIJobRequestResourceSettings();
    resources.setOfflineResourceSettings(offlineGroup);
    resources.setRealtimeResourceSettings(realtimeGroup);
    jobRequest.setResourceSettings(resources);

    // Table mapping: one selection rule each for data source, database and table.
    CreateDIJobRequest.CreateDIJobRequestTableMappingsSourceObjectSelectionRules datasourceRule =
            new CreateDIJobRequest.CreateDIJobRequestTableMappingsSourceObjectSelectionRules();
    datasourceRule.setObjectType("Datasource");
    datasourceRule.setExpression("dw_mysql_online");
    CreateDIJobRequest.CreateDIJobRequestTableMappingsSourceObjectSelectionRules databaseRule =
            new CreateDIJobRequest.CreateDIJobRequestTableMappingsSourceObjectSelectionRules();
    databaseRule.setObjectType("Database");
    databaseRule.setExpression("cx_db_1");
    CreateDIJobRequest.CreateDIJobRequestTableMappingsSourceObjectSelectionRules tableRule =
            new CreateDIJobRequest.CreateDIJobRequestTableMappingsSourceObjectSelectionRules();
    tableRule.setObjectType("Table");
    tableRule.setExpression("cx_table_1");
    CreateDIJobRequest.CreateDIJobRequestTableMappings mapping =
            new CreateDIJobRequest.CreateDIJobRequestTableMappings();
    mapping.setSourceObjectSelectionRules(Arrays.asList(datasourceRule, databaseRule, tableRule));
    jobRequest.setTableMappings(Arrays.asList(mapping));

    // Submit the request and report the result.
    com.aliyun.dataworks_public20200518.models.CreateDIJobResponse jobResponse =
            createClient().createDIJob(jobRequest);
    System.out.println("create job finished, response is...");
    System.out.println(new Gson().toJson(jobResponse.getBody()));
    Long createdJobId = jobResponse.getBody().getDIJobId();
    System.out.println("create job finished, response job id is" + createdJobId);
    return createdJobId;
}
说明

请记录返回参数中的数据集成任务ID(DIJobId),后续启动、查看、停止、删除任务等操作均需使用该ID。

步骤二:启动数据集成任务

调用StartDIJob接口,启动数据集成任务。以下代码仅展示了部分参数的配置,更多详细参数请参见StartDIJob

/**
 * Starts a data integration job by calling the StartDIJob API and prints the
 * response body as JSON.
 *
 * @param jobId ID of the data integration job to start
 * @throws Exception if creating the client or calling StartDIJob fails
 */
public static void startJob(Long jobId) throws Exception {
    StartDIJobRequest startRequest = new StartDIJobRequest();
    startRequest.setDIJobId(jobId);

    StartDIJobResponse startResponse = createClient().startDIJob(startRequest);

    System.out.println("start job finished, response is...");
    String bodyJson = new Gson().toJson(startResponse.getBody());
    System.out.println(bodyJson);
}

(可选)步骤三:查看数据集成任务

启动任务后,可以定期检查任务的状态,以确保同步过程按预期进行。您可通过调用GetDIJob接口,查看数据集成任务状态。以下代码仅展示了部分参数的配置,更多详细参数请参见GetDIJob

/**
 * Fetches a data integration job by calling the GetDIJob API, prints the response
 * body as JSON and returns its data section.
 *
 * @param jobId  ID of the data integration job to look up
 * @param detail value passed to the request's WithDetails field
 * @return the data object of the GetDIJob response body
 * @throws Exception if creating the client or calling GetDIJob fails
 */
public static GetDIJobResponseBody.GetDIJobResponseBodyData getJob(Long jobId, boolean detail) throws Exception {
    System.out.println("get job started, jobId=" + jobId);

    GetDIJobRequest query = new GetDIJobRequest();
    query.setDIJobId(jobId);
    query.setWithDetails(detail);

    GetDIJobResponse queryResult = createClient().getDIJob(query);

    System.out.println("get job finished, response is...");
    String bodyJson = new Gson().toJson(queryResult.getBody());
    System.out.println(bodyJson);
    return queryResult.getBody().getData();
}

其他步骤

停止数据集成任务

如需停止数据集成任务,您可以调用StopDIJob接口。以下代码仅展示了部分参数的配置,更多详细参数请参见StopDIJob。

/**
 * Stops a data integration job by calling the StopDIJob API and prints the
 * response body as JSON.
 *
 * @param jobId ID of the data integration job to stop
 * @throws Exception if creating the client or calling StopDIJob fails
 */
public static void stopJob(Long jobId) throws Exception {
    System.out.println("stop job started, jobId=" + jobId);

    StopDIJobRequest stopRequest = new StopDIJobRequest();
    stopRequest.setDIJobId(jobId);

    StopDIJobResponse stopResponse = createClient().stopDIJob(stopRequest);

    System.out.println("stop job finished, response is...");
    String bodyJson = new Gson().toJson(stopResponse.getBody());
    System.out.println(bodyJson);
}

删除数据集成任务

任务下线时,您可以调用DeleteDIJob接口删除该数据集成任务。以下代码仅展示了部分参数的配置,更多详细参数请参见DeleteDIJob。

/**
 * Deletes a data integration job by calling the DeleteDIJob API and prints the
 * response body as JSON.
 *
 * @param jobId ID of the data integration job to delete
 * @throws Exception if creating the client or calling DeleteDIJob fails
 */
public static void deleteJob(Long jobId) throws Exception {
    System.out.println("delete job started, jobId=" + jobId);

    DeleteDIJobRequest deleteRequest = new DeleteDIJobRequest();
    deleteRequest.setDIJobId(jobId);

    DeleteDIJobResponse deleteResponse = createClient().deleteDIJob(deleteRequest);

    System.out.println("delete job finished, response is...");
    String bodyJson = new Gson().toJson(deleteResponse.getBody());
    System.out.println(bodyJson);
}

Sample代码

您可以根据以下提供的POM依赖信息和JAVA SDK示例代码,完成从MySQL到Hologres的整库实时同步任务的创建。

创建POM依赖

<dependency>
  <groupId>com.aliyun</groupId>
  <artifactId>dataworks_public20200518</artifactId>
  <version>5.6.0</version>
</dependency>
<dependency>
  <groupId>com.aliyun</groupId>
  <artifactId>tea-openapi</artifactId>
  <version>0.3.2</version>
</dependency>
<dependency>
  <groupId>com.aliyun</groupId>
  <artifactId>tea-console</artifactId>
  <version>0.0.1</version>
</dependency>
<dependency>
  <groupId>com.aliyun</groupId>
  <artifactId>tea-util</artifactId>
  <version>0.2.21</version>
</dependency>

调用Java SDK示例代码

package com.aliyun.sample;

import com.aliyun.dataworks_public20200518.*;
import com.aliyun.dataworks_public20200518.models.*;
import com.aliyun.tea.*;
import com.aliyun.teaopenapi.models.Config;
import com.google.gson.Gson;

import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

/**
 * End-to-end sample that creates, inspects, starts, stops and deletes a
 * MySQL-to-Hologres data integration job through the DataWorks OpenAPI.
 *
 * <p>The AccessKey pair is read from the environment variables
 * {@code ALIBABA_CLOUD_ACCESS_KEY_ID} and {@code ALIBABA_CLOUD_ACCESS_KEY_SECRET}
 * rather than being hard-coded in the source.
 */
public class Sample {

    /** Name of the environment variable that holds the AccessKey ID. */
    private static final String AK_ID_ENV = "ALIBABA_CLOUD_ACCESS_KEY_ID";
    /** Name of the environment variable that holds the AccessKey secret. */
    private static final String AK_SECRET_ENV = "ALIBABA_CLOUD_ACCESS_KEY_SECRET";
    /** DataWorks OpenAPI endpoint the client connects to. */
    private static final String ENDPOINT = "dataworks.cn-shanghai.aliyuncs.com";

    /** Name of the MySQL source data source; also used by the "Datasource" selection rule. */
    private static final String SOURCE_DATA_SOURCE = "dw_mysql_online";
    /** Name of the Hologres destination data source. */
    private static final String DESTINATION_DATA_SOURCE = "dw_holo_test";
    /** Resource group identifier used for both the offline and the realtime settings. */
    private static final String RESOURCE_GROUP = "S_res_group_195820716552192_1695299272182";

    /** How long {@link #main} lets the job run before stopping it. */
    private static final long RUN_WAIT_MILLIS = 100000L;
    /** How long {@link #main} waits after stopping the job before deleting it. */
    private static final long STOP_WAIT_MILLIS = 10000L;

    /** Shared serializer used to print response bodies. */
    private static final Gson GSON = new Gson();

    /**
     * Builds a DataWorks client for {@link #ENDPOINT} using the AccessKey pair taken
     * from the environment.
     *
     * @return a newly constructed client
     * @throws IllegalStateException if either credential environment variable is missing or blank
     * @throws Exception if the client cannot be constructed
     */
    public static Client createClient() throws Exception {
        Config config = new Config();
        config.setAccessKeyId(requireEnv(AK_ID_ENV));
        config.setAccessKeySecret(requireEnv(AK_SECRET_ENV));
        config.setEndpoint(ENDPOINT);
        return new Client(config);
    }

    /** Returns the value of the named environment variable, failing fast when it is unset or blank. */
    private static String requireEnv(String name) {
        String value = System.getenv(name);
        if (value == null || value.trim().isEmpty()) {
            throw new IllegalStateException("Environment variable " + name + " is not set");
        }
        return value;
    }

    /**
     * Creates a data integration job by calling the CreateDIJob API.
     *
     * <p>The request uses fixed sample values: project 3058, a job name made of "api"
     * plus the current time in milliseconds, a MySQL source, a Hologres destination,
     * one resource group for both offline and realtime settings, and a single table
     * mapping that selects one data source, one database and one table.
     *
     * @return the DIJobId read from the CreateDIJob response body
     * @throws Exception if creating the client or calling CreateDIJob fails
     */
    public static Long createDIJob() throws Exception {
        System.out.println("create job started ...");
        CreateDIJobRequest request = new CreateDIJobRequest();

        // Basic job settings.
        request.setProjectId(3058L);
        request.setJobName("api" + System.currentTimeMillis());
        request.setMigrationType("FullAndRealtimeIncremental");
        request.setSourceDataSourceType("MySQL");
        request.setDestinationDataSourceType("Hologres");

        // Source data source, with its connection properties.
        CreateDIJobRequest.CreateDIJobRequestSourceDataSourceSettings sourceSettings =
                new CreateDIJobRequest.CreateDIJobRequestSourceDataSourceSettings();
        sourceSettings.setDataSourceName(SOURCE_DATA_SOURCE);
        Map<String, String> sourceProperties = new HashMap<>();
        sourceProperties.put("TimeZone", "Asia/Shanghai");
        sourceProperties.put("Encoding", "utf-8");
        sourceSettings.setDataSourceProperties(sourceProperties);
        request.setSourceDataSourceSettings(Arrays.asList(sourceSettings));

        // Destination data source.
        CreateDIJobRequest.CreateDIJobRequestDestinationDataSourceSettings destinationSettings =
                new CreateDIJobRequest.CreateDIJobRequestDestinationDataSourceSettings();
        destinationSettings.setDataSourceName(DESTINATION_DATA_SOURCE);
        request.setDestinationDataSourceSettings(Arrays.asList(destinationSettings));

        // Resource groups.
        request.setResourceSettings(buildResourceSettings());

        // Table mapping: one selection rule each for data source, database and table.
        CreateDIJobRequest.CreateDIJobRequestTableMappings tableMapping =
                new CreateDIJobRequest.CreateDIJobRequestTableMappings();
        tableMapping.setSourceObjectSelectionRules(Arrays.asList(
                selectionRule("Datasource", SOURCE_DATA_SOURCE),
                selectionRule("Database", "cx_db_1"),
                selectionRule("Table", "cx_table_1")));
        request.setTableMappings(Arrays.asList(tableMapping));

        // Submit the request and report the result.
        CreateDIJobResponse response = createClient().createDIJob(request);
        System.out.println("create job finished, response is...");
        System.out.println(GSON.toJson(response.getBody()));
        Long jobId = response.getBody().getDIJobId();
        System.out.println("create job finished, response job id is " + jobId);
        return jobId;
    }

    /** Builds resource settings that point the offline and realtime parts at {@link #RESOURCE_GROUP}. */
    private static CreateDIJobRequest.CreateDIJobRequestResourceSettings buildResourceSettings() {
        CreateDIJobRequest.CreateDIJobRequestResourceSettingsOfflineResourceSettings offline =
                new CreateDIJobRequest.CreateDIJobRequestResourceSettingsOfflineResourceSettings();
        offline.setResourceGroupIdentifier(RESOURCE_GROUP);

        CreateDIJobRequest.CreateDIJobRequestResourceSettingsRealtimeResourceSettings realtime =
                new CreateDIJobRequest.CreateDIJobRequestResourceSettingsRealtimeResourceSettings();
        realtime.setResourceGroupIdentifier(RESOURCE_GROUP);

        CreateDIJobRequest.CreateDIJobRequestResourceSettings settings =
                new CreateDIJobRequest.CreateDIJobRequestResourceSettings();
        settings.setOfflineResourceSettings(offline);
        settings.setRealtimeResourceSettings(realtime);
        return settings;
    }

    /** Builds one source-object selection rule with the given object type and expression. */
    private static CreateDIJobRequest.CreateDIJobRequestTableMappingsSourceObjectSelectionRules selectionRule(
            String objectType, String expression) {
        CreateDIJobRequest.CreateDIJobRequestTableMappingsSourceObjectSelectionRules rule =
                new CreateDIJobRequest.CreateDIJobRequestTableMappingsSourceObjectSelectionRules();
        rule.setObjectType(objectType);
        rule.setExpression(expression);
        return rule;
    }

    /**
     * Starts a data integration job by calling the StartDIJob API and prints the response body.
     *
     * @param jobId ID of the data integration job to start
     * @throws Exception if creating the client or calling StartDIJob fails
     */
    public static void startJob(Long jobId) throws Exception {
        StartDIJobRequest start = new StartDIJobRequest();
        start.setDIJobId(jobId);
        StartDIJobResponse response = createClient().startDIJob(start);
        System.out.println("start job finished, response is...");
        System.out.println(GSON.toJson(response.getBody()));
    }

    /**
     * Stops a data integration job by calling the StopDIJob API and prints the response body.
     *
     * @param jobId ID of the data integration job to stop
     * @throws Exception if creating the client or calling StopDIJob fails
     */
    public static void stopJob(Long jobId) throws Exception {
        System.out.println("stop job started, jobId=" + jobId);
        StopDIJobRequest stop = new StopDIJobRequest();
        stop.setDIJobId(jobId);
        StopDIJobResponse response = createClient().stopDIJob(stop);
        System.out.println("stop job finished, response is...");
        System.out.println(GSON.toJson(response.getBody()));
    }

    /**
     * Deletes a data integration job by calling the DeleteDIJob API and prints the response body.
     *
     * @param jobId ID of the data integration job to delete
     * @throws Exception if creating the client or calling DeleteDIJob fails
     */
    public static void deleteJob(Long jobId) throws Exception {
        System.out.println("delete job started, jobId=" + jobId);
        DeleteDIJobRequest request = new DeleteDIJobRequest();
        request.setDIJobId(jobId);
        DeleteDIJobResponse response = createClient().deleteDIJob(request);
        System.out.println("delete job finished, response is...");
        System.out.println(GSON.toJson(response.getBody()));
    }

    /**
     * Fetches a data integration job by calling the GetDIJob API, prints the response
     * body and returns its data section.
     *
     * @param jobId  ID of the data integration job to look up
     * @param detail value passed to the request's WithDetails field
     * @return the data object of the GetDIJob response body
     * @throws Exception if creating the client or calling GetDIJob fails
     */
    public static GetDIJobResponseBody.GetDIJobResponseBodyData getJob(Long jobId, boolean detail) throws Exception {
        System.out.println("get job started, jobId=" + jobId);
        GetDIJobRequest request = new GetDIJobRequest();
        request.setDIJobId(jobId);
        request.setWithDetails(detail);
        GetDIJobResponse response = createClient().getDIJob(request);
        System.out.println("get job finished, response is...");
        System.out.println(GSON.toJson(response.getBody()));
        return response.getBody().getData();
    }

    /**
     * Runs the full lifecycle on one job: create, inspect, start, wait, stop, wait, delete.
     *
     * <p>Every step operates on the ID returned by {@link #createDIJob()}. Failures are
     * not caught here, so the JVM reports them and exits with a non-zero status instead
     * of finishing silently.
     *
     * @param args unused
     * @throws Exception if any API call fails or the thread is interrupted while waiting
     */
    public static void main(String[] args) throws Exception {
        // Create the job.
        Long jobId = createDIJob();
        // Inspect the job.
        getJob(jobId, true);
        // Start the job and let it run for a while.
        startJob(jobId);
        Thread.sleep(RUN_WAIT_MILLIS);
        // Stop the job that was just created, then wait before deleting it.
        stopJob(jobId);
        Thread.sleep(STOP_WAIT_MILLIS);
        // Delete the job.
        deleteJob(jobId);
    }
}