Use API operations to create and manage a real-time synchronization task used to synchronize data from a MySQL database to Hologres

Updated at: 2024-10-12 09:54

This topic describes how to use API operations to create, start, view, stop, and delete a real-time synchronization task that synchronizes data from a MySQL database to Hologres. The examples call the API operations by using an SDK for Java. You can use the same operations to stop and delete synchronization tasks that are no longer required.

Prerequisites

Configure environment dependencies and perform account authentication

  • Configure Maven dependencies.

    Open the pom.xml file of your Maven project and add the following content to the file:

    <dependency>
      <groupId>com.aliyun</groupId>
      <artifactId>dataworks_public20200518</artifactId>
      <version>5.6.0</version>
    </dependency>
    <dependency>
      <groupId>com.aliyun</groupId>
      <artifactId>tea-openapi</artifactId>
      <version>0.3.2</version>
    </dependency>
    <dependency>
      <groupId>com.aliyun</groupId>
      <artifactId>tea-console</artifactId>
      <version>0.0.1</version>
    </dependency>
    <dependency>
      <groupId>com.aliyun</groupId>
      <artifactId>tea-util</artifactId>
      <version>0.2.21</version>
    </dependency>
    Note

    You can obtain the dataworks_public20200518 dependency in the preceding code from the Maven central repository.

  • Configure environment variables for an Alibaba Cloud AccessKey pair. The environment variables are used for identity authentication when you call API operations. For more information, see Configure environment variables in Linux, macOS, and Windows.
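Before you call any API operation, it can help to confirm that the AccessKey environment variables are actually visible to the Java process. The following is a minimal sketch of such a check, not part of the SDK. The class name AccessKeyEnv and the variable names ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET are assumptions; replace them with the names that you configured.

```java
import java.util.Map;

/** Sketch: fail fast when the AccessKey environment variables are missing. */
final class AccessKeyEnv {
    // Assumed variable names; use the names that you actually configured.
    static final String ID_VAR = "ALIBABA_CLOUD_ACCESS_KEY_ID";
    static final String SECRET_VAR = "ALIBABA_CLOUD_ACCESS_KEY_SECRET";

    /** Returns the value of a required variable or throws a descriptive error. */
    static String require(Map<String, String> env, String name) {
        String value = env.get(name);
        if (value == null || value.trim().isEmpty()) {
            throw new IllegalStateException("Environment variable " + name + " is not set");
        }
        return value;
    }

    public static void main(String[] args) {
        Map<String, String> env = System.getenv();
        try {
            require(env, ID_VAR);
            require(env, SECRET_VAR);
            // Report only that the values exist; never print the secret itself.
            System.out.println("AccessKey environment variables are set");
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Passing the environment as a Map keeps the check easy to exercise with sample values, and the values returned by require can be handed to the client configuration instead of hard-coded strings.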

Procedure

Step 1: Create a real-time synchronization task

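Call the CreateDIJob operation to create a real-time synchronization task. The following sample code shows only the configuration of some parameters. For information about more parameters, see CreateDIJob. The createClient() method that the snippets in this topic call is defined in the complete sample in the Sample code section.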

public static Long createDIJob() throws Exception {
        System.out.println("create job started ...");
        CreateDIJobRequest request = new CreateDIJobRequest();
        // Configuration of basic environments
        request.setProjectId(3058L);
        request.setJobName("api"+System.currentTimeMillis());
        request.setMigrationType("FullAndRealtimeIncremental");
        request.setSourceDataSourceType("MySQL");
        request.setDestinationDataSourceType("Hologres");

        // Configuration of data sources
        CreateDIJobRequest.CreateDIJobRequestSourceDataSourceSettings srcDatasourceSetting = new com.aliyun.dataworks_public20200518.models.CreateDIJobRequest.CreateDIJobRequestSourceDataSourceSettings();
        srcDatasourceSetting.setDataSourceName("dw_mysql_online");
        Map<String, String> props = new HashMap<>();
        props.put("TimeZone", "Asia/Shanghai");
        props.put("Encoding", "utf-8");
        srcDatasourceSetting.setDataSourceProperties(props);
        request.setSourceDataSourceSettings(Arrays.asList(srcDatasourceSetting));
        com.aliyun.dataworks_public20200518.models.CreateDIJobRequest.CreateDIJobRequestDestinationDataSourceSettings dstDatasourceSettings = new com.aliyun.dataworks_public20200518.models.CreateDIJobRequest.CreateDIJobRequestDestinationDataSourceSettings();
        dstDatasourceSettings.setDataSourceName("dw_holo_test");
        request.setDestinationDataSourceSettings(Arrays.asList(dstDatasourceSettings));

        // Configuration of a resource group
        com.aliyun.dataworks_public20200518.models.CreateDIJobRequest.CreateDIJobRequestResourceSettings resourceSettings = new com.aliyun.dataworks_public20200518.models.CreateDIJobRequest.CreateDIJobRequestResourceSettings();
        com.aliyun.dataworks_public20200518.models.CreateDIJobRequest.CreateDIJobRequestResourceSettingsOfflineResourceSettings offlineResourceSettings = new com.aliyun.dataworks_public20200518.models.CreateDIJobRequest.CreateDIJobRequestResourceSettingsOfflineResourceSettings();
        offlineResourceSettings.setResourceGroupIdentifier("S_res_group_195820716552192_1695299272182");
        resourceSettings.setOfflineResourceSettings(offlineResourceSettings);
        com.aliyun.dataworks_public20200518.models.CreateDIJobRequest.CreateDIJobRequestResourceSettingsRealtimeResourceSettings realtimeResourceSettings = new com.aliyun.dataworks_public20200518.models.CreateDIJobRequest.CreateDIJobRequestResourceSettingsRealtimeResourceSettings();
        realtimeResourceSettings.setResourceGroupIdentifier("S_res_group_195820716552192_1695299272182");
        resourceSettings.setRealtimeResourceSettings(realtimeResourceSettings);
        request.setResourceSettings(resourceSettings);

        // Table mappings
        com.aliyun.dataworks_public20200518.models.CreateDIJobRequest.CreateDIJobRequestTableMappings tableMapping = new com.aliyun.dataworks_public20200518.models.CreateDIJobRequest.CreateDIJobRequestTableMappings();
        com.aliyun.dataworks_public20200518.models.CreateDIJobRequest.CreateDIJobRequestTableMappingsSourceObjectSelectionRules datasource = new com.aliyun.dataworks_public20200518.models.CreateDIJobRequest.CreateDIJobRequestTableMappingsSourceObjectSelectionRules();
        datasource.setObjectType("Datasource");
        datasource.setExpression("dw_mysql_online");
        com.aliyun.dataworks_public20200518.models.CreateDIJobRequest.CreateDIJobRequestTableMappingsSourceObjectSelectionRules database = new com.aliyun.dataworks_public20200518.models.CreateDIJobRequest.CreateDIJobRequestTableMappingsSourceObjectSelectionRules();
        database.setObjectType("Database");
        database.setExpression("cx_db_1");
        com.aliyun.dataworks_public20200518.models.CreateDIJobRequest.CreateDIJobRequestTableMappingsSourceObjectSelectionRules table = new com.aliyun.dataworks_public20200518.models.CreateDIJobRequest.CreateDIJobRequestTableMappingsSourceObjectSelectionRules();
        table.setObjectType("Table");
        table.setExpression("cx_table_1");
        tableMapping.setSourceObjectSelectionRules(Arrays.asList(datasource, database, table));
        request.setTableMappings(Arrays.asList(tableMapping));

        // Execution
        com.aliyun.dataworks_public20200518.models.CreateDIJobResponse createDIJobResponse = createClient().createDIJob(request);
        System.out.println("create job finished, response is...");
        System.out.println(new Gson().toJson(createDIJobResponse.getBody()));
        System.out.println("create job finished, response job id is " + createDIJobResponse.getBody().getDIJobId());
        return createDIJobResponse.getBody().getDIJobId();
    }
Note

You can record the value of the DIJobId parameter in the response parameters. The DIJobId parameter indicates the ID of the real-time synchronization task. The ID is required when you start, view, stop, or delete the real-time synchronization task.

Step 2: Start the real-time synchronization task

Call the StartDIJob operation to start the real-time synchronization task. The following sample code shows only the configuration of some parameters. For information about more parameters, see StartDIJob.

public static void startJob(Long jobId) throws Exception {
        StartDIJobRequest start = new StartDIJobRequest();
        start.setDIJobId(jobId);
        StartDIJobResponse response = createClient().startDIJob(start);
        System.out.println("start job finished, response is...");
        System.out.println(new Gson().toJson(response.getBody()));
    }

Step 3 (Optional): View the real-time synchronization task

After you start the real-time synchronization task, you can periodically check the status of the task to ensure that data is synchronized as expected. You can call the GetDIJob operation to view the status of the real-time synchronization task. The following sample code shows only the configuration of some parameters. For information about more parameters, see GetDIJob.

public static GetDIJobResponseBody.GetDIJobResponseBodyData getJob(Long jobId, boolean detail) throws Exception {
        System.out.println("get job started, jobId=" + jobId);
        // Obtain details.
        GetDIJobRequest request = new GetDIJobRequest();
        request.setDIJobId(jobId);
        request.setWithDetails(detail);
        GetDIJobResponse response = createClient().getDIJob(request);
        System.out.println("get job finished, response is...");
        System.out.println(new Gson().toJson(response.getBody()));
        return response.getBody().getData();
    }
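The periodic status check described above can be sketched as a bounded polling loop. This is an illustration under stated assumptions, not an SDK feature: JobStatusPoller and waitFor are hypothetical names, and the status strings that you pass in are placeholders because this topic does not list the values that GetDIJob returns. Check the GetDIJob reference for the actual status field and values.

```java
import java.util.Set;
import java.util.function.Supplier;

/** Sketch of a bounded polling loop; not part of the DataWorks SDK. */
final class JobStatusPoller {

    /**
     * Calls fetchStatus until it returns one of the target statuses or
     * maxAttempts calls have been made, and returns the last status seen.
     */
    static String waitFor(Supplier<String> fetchStatus, Set<String> targets,
                          int maxAttempts, long intervalMillis) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        String status = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            status = fetchStatus.get();
            if (status != null && targets.contains(status)) {
                return status;
            }
            if (attempt < maxAttempts) {
                try {
                    Thread.sleep(intervalMillis);
                } catch (InterruptedException e) {
                    // Restore the interrupt flag and stop polling early.
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        }
        return status;
    }
}
```

To use the sketch with the code in this topic, pass a Supplier that calls getJob, handles its checked exception, and extracts the status from the returned data. The attempt limit keeps the loop from running indefinitely if the task never reaches the expected status.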

Other operations

Stop the real-time synchronization task

If you want to stop the real-time synchronization task, you can call the StopDIJob operation. The following sample code shows only the configuration of some parameters. For information about more parameters, see StopDIJob.

public static void stopJob(Long jobId) throws Exception {
        System.out.println("stop job started, jobId=" + jobId);
        StopDIJobRequest stop = new StopDIJobRequest();
        stop.setDIJobId(jobId);
        StopDIJobResponse response = createClient().stopDIJob(stop);
        System.out.println("stop job finished, response is...");
        System.out.println(new Gson().toJson(response.getBody()));
    }

Delete the real-time synchronization task

If you want to delete the real-time synchronization task after the task is undeployed, you can call the DeleteDIJob operation. The following sample code shows only the configuration of some parameters. For information about more parameters, see DeleteDIJob.

public static void deleteJob(Long jobId) throws Exception {
        System.out.println("delete job started, jobId=" + jobId);
        DeleteDIJobRequest request = new DeleteDIJobRequest();
        request.setDIJobId(jobId);
        DeleteDIJobResponse response = createClient().deleteDIJob(request);
        System.out.println("delete job finished, response is...");
        System.out.println(new Gson().toJson(response.getBody()));
    }
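Because the task is deleted only after it is no longer running, the stop and delete calls are usually issued in that order for the same task ID. The following sketch shows one way to enforce the order: the delete call is skipped when the stop call fails. JobCleanup, JobAction, and stopThenDelete are hypothetical names introduced for illustration, and the sketch assumes that a stop call that returns without an exception is enough to proceed. If the stop takes effect asynchronously, you may also need to check the task status before you delete the task.

```java
/** Sketch of an ordered cleanup; the names here are hypothetical. */
final class JobCleanup {

    /** An action on a job ID that may throw, such as stopJob or deleteJob. */
    interface JobAction {
        void run(Long jobId) throws Exception;
    }

    /**
     * Stops the job and deletes it only if the stop call did not throw.
     * Returns true if the delete call completed.
     */
    static boolean stopThenDelete(Long jobId, JobAction stop, JobAction delete) {
        try {
            stop.run(jobId);
        } catch (Exception e) {
            System.err.println("stop failed for job " + jobId + ", skipping delete: " + e);
            return false;
        }
        try {
            delete.run(jobId);
            return true;
        } catch (Exception e) {
            System.err.println("delete failed for job " + jobId + ": " + e);
            return false;
        }
    }
}
```

With the methods in this topic, the call could look like JobCleanup.stopThenDelete(jobId, Sample::stopJob, Sample::deleteJob), where jobId is the DIJobId value returned when the task was created.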

Sample code

You can create a real-time synchronization task used to synchronize data from a MySQL database to Hologres based on the following Project Object Model (POM) dependency information and sample code for calling an SDK for Java.

POM dependencies

<dependency>
  <groupId>com.aliyun</groupId>
  <artifactId>dataworks_public20200518</artifactId>
  <version>5.6.0</version>
</dependency>
<dependency>
  <groupId>com.aliyun</groupId>
  <artifactId>tea-openapi</artifactId>
  <version>0.3.2</version>
</dependency>
<dependency>
  <groupId>com.aliyun</groupId>
  <artifactId>tea-console</artifactId>
  <version>0.0.1</version>
</dependency>
<dependency>
  <groupId>com.aliyun</groupId>
  <artifactId>tea-util</artifactId>
  <version>0.2.21</version>
</dependency>

Sample code for calling an SDK for Java

package com.aliyun.sample;

import com.aliyun.tea.*;

import com.aliyun.dataworks_public20200518.*;
import com.aliyun.dataworks_public20200518.models.*;
// Client configuration class used by createClient().
import com.aliyun.teaopenapi.models.Config;
// Used to print response bodies as JSON.
import com.google.gson.Gson;

import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

public class Sample {

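    // Read the AccessKey pair from the environment variables configured in Prerequisites
    // instead of hard-coding it. The variable names are assumptions; use the names that you configured.
    private static final String AK_ID = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID");
    private static final String AK_SECRET = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET");
    private static final String ENDPOINT = "dataworks.cn-shanghai.aliyuncs.com";
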
    public static Client createClient() throws Exception {
        Config config = new Config();
        config.setAccessKeyId(AK_ID);
        config.setAccessKeySecret(AK_SECRET);
        config.setEndpoint(ENDPOINT);
        Client client = new Client(config);
        return client;
    }


    public static Long createDIJob() throws Exception {
        System.out.println("create job started ...");
        CreateDIJobRequest request = new CreateDIJobRequest();
        // Configuration of basic environments
        request.setProjectId(3058L);
        request.setJobName("api"+System.currentTimeMillis());
        request.setMigrationType("FullAndRealtimeIncremental");
        request.setSourceDataSourceType("MySQL");
        request.setDestinationDataSourceType("Hologres");

        // Configuration of data sources
        CreateDIJobRequest.CreateDIJobRequestSourceDataSourceSettings srcDatasourceSetting = new com.aliyun.dataworks_public20200518.models.CreateDIJobRequest.CreateDIJobRequestSourceDataSourceSettings();
        srcDatasourceSetting.setDataSourceName("dw_mysql_online");
        Map<String, String> props = new HashMap<>();
        props.put("TimeZone", "Asia/Shanghai");
        props.put("Encoding", "utf-8");
        srcDatasourceSetting.setDataSourceProperties(props);
        request.setSourceDataSourceSettings(Arrays.asList(srcDatasourceSetting));
        com.aliyun.dataworks_public20200518.models.CreateDIJobRequest.CreateDIJobRequestDestinationDataSourceSettings dstDatasourceSettings = new com.aliyun.dataworks_public20200518.models.CreateDIJobRequest.CreateDIJobRequestDestinationDataSourceSettings();
        dstDatasourceSettings.setDataSourceName("dw_holo_test");
        request.setDestinationDataSourceSettings(Arrays.asList(dstDatasourceSettings));

        // Configuration of a resource group
        com.aliyun.dataworks_public20200518.models.CreateDIJobRequest.CreateDIJobRequestResourceSettings resourceSettings = new com.aliyun.dataworks_public20200518.models.CreateDIJobRequest.CreateDIJobRequestResourceSettings();
        com.aliyun.dataworks_public20200518.models.CreateDIJobRequest.CreateDIJobRequestResourceSettingsOfflineResourceSettings offlineResourceSettings = new com.aliyun.dataworks_public20200518.models.CreateDIJobRequest.CreateDIJobRequestResourceSettingsOfflineResourceSettings();
        offlineResourceSettings.setResourceGroupIdentifier("S_res_group_195820716552192_1695299272182");
        resourceSettings.setOfflineResourceSettings(offlineResourceSettings);
        com.aliyun.dataworks_public20200518.models.CreateDIJobRequest.CreateDIJobRequestResourceSettingsRealtimeResourceSettings realtimeResourceSettings = new com.aliyun.dataworks_public20200518.models.CreateDIJobRequest.CreateDIJobRequestResourceSettingsRealtimeResourceSettings();
        realtimeResourceSettings.setResourceGroupIdentifier("S_res_group_195820716552192_1695299272182");
        resourceSettings.setRealtimeResourceSettings(realtimeResourceSettings);
        request.setResourceSettings(resourceSettings);

        // Table mappings
        com.aliyun.dataworks_public20200518.models.CreateDIJobRequest.CreateDIJobRequestTableMappings tableMapping = new com.aliyun.dataworks_public20200518.models.CreateDIJobRequest.CreateDIJobRequestTableMappings();
        com.aliyun.dataworks_public20200518.models.CreateDIJobRequest.CreateDIJobRequestTableMappingsSourceObjectSelectionRules datasource = new com.aliyun.dataworks_public20200518.models.CreateDIJobRequest.CreateDIJobRequestTableMappingsSourceObjectSelectionRules();
        datasource.setObjectType("Datasource");
        datasource.setExpression("dw_mysql_online");
        com.aliyun.dataworks_public20200518.models.CreateDIJobRequest.CreateDIJobRequestTableMappingsSourceObjectSelectionRules database = new com.aliyun.dataworks_public20200518.models.CreateDIJobRequest.CreateDIJobRequestTableMappingsSourceObjectSelectionRules();
        database.setObjectType("Database");
        database.setExpression("cx_db_1");
        com.aliyun.dataworks_public20200518.models.CreateDIJobRequest.CreateDIJobRequestTableMappingsSourceObjectSelectionRules table = new com.aliyun.dataworks_public20200518.models.CreateDIJobRequest.CreateDIJobRequestTableMappingsSourceObjectSelectionRules();
        table.setObjectType("Table");
        table.setExpression("cx_table_1");
        tableMapping.setSourceObjectSelectionRules(Arrays.asList(datasource, database, table));
        request.setTableMappings(Arrays.asList(tableMapping));


        // Execution
        com.aliyun.dataworks_public20200518.models.CreateDIJobResponse createDIJobResponse = createClient().createDIJob(request);
        System.out.println("create job finished, response is...");
        System.out.println(new Gson().toJson(createDIJobResponse.getBody()));
        System.out.println("create job finished, response job id is " + createDIJobResponse.getBody().getDIJobId());
        return createDIJobResponse.getBody().getDIJobId();
    }

    public static void startJob(Long jobId) throws Exception {
        StartDIJobRequest start = new StartDIJobRequest();
        start.setDIJobId(jobId);
        StartDIJobResponse response = createClient().startDIJob(start);
        System.out.println("start job finished, response is...");
        System.out.println(new Gson().toJson(response.getBody()));
    }
    public static void stopJob(Long jobId) throws Exception {
        System.out.println("stop job started, jobId=" + jobId);
        StopDIJobRequest stop = new StopDIJobRequest();
        stop.setDIJobId(jobId);
        StopDIJobResponse response = createClient().stopDIJob(stop);
        System.out.println("stop job finished, response is...");
        System.out.println(new Gson().toJson(response.getBody()));
    }
    public static void deleteJob(Long jobId) throws Exception {
        System.out.println("delete job started, jobId=" + jobId);
        DeleteDIJobRequest request = new DeleteDIJobRequest();
        request.setDIJobId(jobId);
        DeleteDIJobResponse response = createClient().deleteDIJob(request);
        System.out.println("delete job finished, response is...");
        System.out.println(new Gson().toJson(response.getBody()));
    }

    public static GetDIJobResponseBody.GetDIJobResponseBodyData getJob(Long jobId, boolean detail) throws Exception {
        System.out.println("get job started, jobId=" + jobId);
        // Obtain details.
        GetDIJobRequest request = new GetDIJobRequest();
        request.setDIJobId(jobId);
        request.setWithDetails(detail);
        GetDIJobResponse response = createClient().getDIJob(request);
        System.out.println("get job finished, response is...");
        System.out.println(new Gson().toJson(response.getBody()));
        return response.getBody().getData();
    }
    public static void main(String[] args) throws Exception {
        try {
            // Create a task.
            Long jobId = createDIJob();
            // View the task.
            getJob(jobId, true);
            // Start the task.
            startJob(jobId);
            Thread.sleep(100000);
            // Stop the task that was created above.
            stopJob(jobId);
            Thread.sleep(10000);
            // Delete the task.
            deleteJob(jobId);
        } catch (Exception e) {
            // Print the failure instead of silently discarding it.
            e.printStackTrace();
        }
    }
}
