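This topic describes how to use OpenAPI to synchronize an entire MySQL database to Hologres in real time. You will learn how to create, start, and monitor a Data Integration task so that data flows accurately and efficiently between the two systems, and how to stop and delete the task when it is no longer needed.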
Prerequisites
A Maven project is created. For more information, see Create a new Maven project.
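The MySQL data source and the Hologres data source required by the synchronization task are created. For more information, see MySQL data source and Hologres data source.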
Configure dependencies and account authentication
Configure Maven dependencies.
Open the pom.xml file of your Maven project and add the following content:
<dependency>
<groupId>com.aliyun</groupId>
<artifactId>dataworks_public20200518</artifactId>
<version>5.6.0</version>
</dependency>
<dependency>
<groupId>com.aliyun</groupId>
<artifactId>tea-openapi</artifactId>
<version>0.3.2</version>
</dependency>
<dependency>
<groupId>com.aliyun</groupId>
<artifactId>tea-console</artifactId>
<version>0.0.1</version>
</dependency>
<dependency>
<groupId>com.aliyun</groupId>
<artifactId>tea-util</artifactId>
<version>0.2.21</version>
</dependency>
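Note: The dataworks_public20200518 dependency above is available from the Maven Central Repository.
Configure the AccessKey of your Alibaba Cloud account as environment variables. The AccessKey is used for identity authentication when you call the API. For details, see Configure environment variables in Linux, macOS, and Windows.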
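A minimal sketch of reading these variables in Java, assuming the variable names ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET used in the Alibaba Cloud environment-variable guide. The EnvCredentials class and its Credentials holder are hypothetical helpers, not part of the SDK; they only fail fast when either variable is missing, so that a client is never built with empty credentials.

```java
import java.util.Map;

/**
 * Hypothetical helper: loads the AccessKey pair from environment variables
 * and fails fast if either variable is missing or blank.
 */
public class EnvCredentials {
    public static final String ID_VAR = "ALIBABA_CLOUD_ACCESS_KEY_ID";
    public static final String SECRET_VAR = "ALIBABA_CLOUD_ACCESS_KEY_SECRET";

    /** Illustrative holder for the AccessKey pair; this is not an SDK type. */
    public static final class Credentials {
        public final String accessKeyId;
        public final String accessKeySecret;

        Credentials(String accessKeyId, String accessKeySecret) {
            this.accessKeyId = accessKeyId;
            this.accessKeySecret = accessKeySecret;
        }
    }

    /** Reads both variables from the given map (normally System.getenv()). */
    public static Credentials fromEnv(Map<String, String> env) {
        String id = env.get(ID_VAR);
        String secret = env.get(SECRET_VAR);
        if (isBlank(id) || isBlank(secret)) {
            throw new IllegalStateException(
                    "Set " + ID_VAR + " and " + SECRET_VAR + " before calling the DataWorks OpenAPI");
        }
        return new Credentials(id, secret);
    }

    private static boolean isBlank(String s) {
        return s == null || s.trim().isEmpty();
    }

    public static void main(String[] args) {
        try {
            Credentials credentials = fromEnv(System.getenv());
            // Report only the length; avoid printing the credentials themselves
            System.out.println("AccessKey ID loaded (" + credentials.accessKeyId.length() + " characters)");
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

The returned values can then be passed to config.setAccessKeyId(...) and config.setAccessKeySecret(...) when the client is created.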
Procedure
Step 1: Create a Data Integration task
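Call the CreateDIJob operation to create a Data Integration task. The following code configures only some of the parameters. For the full parameter list, see CreateDIJob.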
public static Long createDIJob() throws Exception {
System.out.println("create job started ...");
CreateDIJobRequest request = new CreateDIJobRequest();
// Basic settings
request.setProjectId(3058L); // Replace with your DataWorks workspace ID
request.setJobName("api" + System.currentTimeMillis()); // The timestamp suffix keeps the job name unique
request.setMigrationType("FullAndRealtimeIncremental"); // Full synchronization followed by real-time incremental synchronization
request.setSourceDataSourceType("MySQL");
request.setDestinationDataSourceType("Hologres");
// Source data source: the MySQL data source created in the prerequisites
CreateDIJobRequest.CreateDIJobRequestSourceDataSourceSettings srcDatasourceSetting = new CreateDIJobRequest.CreateDIJobRequestSourceDataSourceSettings();
srcDatasourceSetting.setDataSourceName("dw_mysql_online");
Map<String, String> props = new HashMap<>();
props.put("TimeZone", "Asia/Shanghai");
props.put("Encoding", "utf-8");
srcDatasourceSetting.setDataSourceProperties(props);
request.setSourceDataSourceSettings(Arrays.asList(srcDatasourceSetting));
// Destination data source: the Hologres data source created in the prerequisites
CreateDIJobRequest.CreateDIJobRequestDestinationDataSourceSettings dstDatasourceSettings = new CreateDIJobRequest.CreateDIJobRequestDestinationDataSourceSettings();
dstDatasourceSettings.setDataSourceName("dw_holo_test");
request.setDestinationDataSourceSettings(Arrays.asList(dstDatasourceSettings));
// Resource group settings: this example uses one resource group for both the offline (full) and real-time parts
String resourceGroupId = "S_res_group_195820716552192_1695299272182"; // Replace with your resource group identifier
CreateDIJobRequest.CreateDIJobRequestResourceSettings resourceSettings = new CreateDIJobRequest.CreateDIJobRequestResourceSettings();
CreateDIJobRequest.CreateDIJobRequestResourceSettingsOfflineResourceSettings offlineResourceSettings = new CreateDIJobRequest.CreateDIJobRequestResourceSettingsOfflineResourceSettings();
offlineResourceSettings.setResourceGroupIdentifier(resourceGroupId);
resourceSettings.setOfflineResourceSettings(offlineResourceSettings);
CreateDIJobRequest.CreateDIJobRequestResourceSettingsRealtimeResourceSettings realtimeResourceSettings = new CreateDIJobRequest.CreateDIJobRequestResourceSettingsRealtimeResourceSettings();
realtimeResourceSettings.setResourceGroupIdentifier(resourceGroupId);
resourceSettings.setRealtimeResourceSettings(realtimeResourceSettings);
request.setResourceSettings(resourceSettings);
// Table mappings: narrow the source from data source to database to table
CreateDIJobRequest.CreateDIJobRequestTableMappings tableMapping = new CreateDIJobRequest.CreateDIJobRequestTableMappings();
CreateDIJobRequest.CreateDIJobRequestTableMappingsSourceObjectSelectionRules datasource = new CreateDIJobRequest.CreateDIJobRequestTableMappingsSourceObjectSelectionRules();
datasource.setObjectType("Datasource");
datasource.setExpression("dw_mysql_online");
CreateDIJobRequest.CreateDIJobRequestTableMappingsSourceObjectSelectionRules database = new CreateDIJobRequest.CreateDIJobRequestTableMappingsSourceObjectSelectionRules();
database.setObjectType("Database");
database.setExpression("cx_db_1");
CreateDIJobRequest.CreateDIJobRequestTableMappingsSourceObjectSelectionRules table = new CreateDIJobRequest.CreateDIJobRequestTableMappingsSourceObjectSelectionRules();
table.setObjectType("Table");
table.setExpression("cx_table_1");
tableMapping.setSourceObjectSelectionRules(Arrays.asList(datasource, database, table));
request.setTableMappings(Arrays.asList(tableMapping));
// Send the request
CreateDIJobResponse createDIJobResponse = createClient().createDIJob(request);
System.out.println("create job finished, response is...");
System.out.println(new Gson().toJson(createDIJobResponse.getBody()));
Long jobId = createDIJobResponse.getBody().getDIJobId();
System.out.println("create job finished, response job id is " + jobId);
return jobId;
}
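The three SourceObjectSelectionRules entries above narrow the source from the data source to a database and then to a table. The sketch below models that ordering with a hypothetical local class, SelectionRule, rather than the SDK type, so that the rule list for another database and table pair can be built and checked in one call. It is an illustration under that assumption, not the SDK model.

```java
import java.util.Arrays;
import java.util.List;

/**
 * Illustrative model of the three source selection rules built in createDIJob().
 * SelectionRule is a hypothetical stand-in for the SDK class
 * CreateDIJobRequest.CreateDIJobRequestTableMappingsSourceObjectSelectionRules.
 */
public class SelectionRules {
    public static final class SelectionRule {
        public final String objectType;
        public final String expression;

        SelectionRule(String objectType, String expression) {
            // Reject empty names early instead of sending an incomplete rule
            if (expression == null || expression.isEmpty()) {
                throw new IllegalArgumentException(objectType + " expression must not be empty");
            }
            this.objectType = objectType;
            this.expression = expression;
        }

        @Override
        public String toString() {
            return objectType + "=" + expression;
        }
    }

    /** Returns the rules in the order used in this topic: data source, database, table. */
    public static List<SelectionRule> forTable(String dataSource, String database, String table) {
        return Arrays.asList(
                new SelectionRule("Datasource", dataSource),
                new SelectionRule("Database", database),
                new SelectionRule("Table", table));
    }

    public static void main(String[] args) {
        // Prints [Datasource=dw_mysql_online, Database=cx_db_1, Table=cx_table_1]
        System.out.println(forTable("dw_mysql_online", "cx_db_1", "cx_table_1"));
    }
}
```

To feed the SDK, each rule's objectType and expression would be copied into setObjectType and setExpression on the SDK selection-rule object, as createDIJob() does.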
Record the Data Integration task ID (DIJobId) returned in the response. You need it to start, view, stop, and delete the task later.
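One way to keep the DIJobId available to later runs, instead of hard-coding it, is to save it to a local file. The sketch below assumes a hypothetical file name, dijob.id, and a hypothetical JobIdStore helper; it relies only on java.nio.file.Files.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

/** Hypothetical helper that persists a DIJobId between runs. */
public class JobIdStore {
    /** Writes the job ID as text, creating the file or replacing its previous content. */
    public static void save(Path file, long jobId) throws IOException {
        Files.write(file, Long.toString(jobId).getBytes(StandardCharsets.UTF_8));
    }

    /** Reads the job ID back, ignoring surrounding whitespace such as a trailing newline. */
    public static long load(Path file) throws IOException {
        String text = new String(Files.readAllBytes(file), StandardCharsets.UTF_8).trim();
        return Long.parseLong(text);
    }

    public static void main(String[] args) throws IOException {
        Path file = Paths.get("dijob.id"); // hypothetical file name
        save(file, 12345L); // placeholder ID; in practice, pass the value returned by createDIJob()
        System.out.println("Saved DIJobId " + load(file) + " to " + file.toAbsolutePath());
    }
}
```

A later stop or delete run can then call JobIdStore.load(...) and pass the result to stopJob or deleteJob.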
Step 2: Start the Data Integration task
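Call the StartDIJob operation to start the Data Integration task. The following code configures only some of the parameters. For the full parameter list, see StartDIJob.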
public static void startJob(Long jobId) throws Exception {
StartDIJobRequest start = new StartDIJobRequest();
start.setDIJobId(jobId);
StartDIJobResponse response = createClient().startDIJob(start);
System.out.println("start job finished, response is...");
System.out.println(new Gson().toJson(response.getBody()));
}
(Optional) Step 3: View the Data Integration task
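After the task starts, check its status periodically to make sure that synchronization proceeds as expected. Call the GetDIJob operation to view the status of the task. The following code configures only some of the parameters. For the full parameter list, see GetDIJob.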
public static GetDIJobResponseBody.GetDIJobResponseBodyData getJob(Long jobId, boolean detail) throws Exception {
System.out.println("get job started, jobId=" + jobId);
// Query the task; WithDetails controls whether the response includes the task details
GetDIJobRequest request = new GetDIJobRequest();
request.setDIJobId(jobId);
request.setWithDetails(detail);
GetDIJobResponse response = createClient().getDIJob(request);
System.out.println("get job finished, response is...");
System.out.println(new Gson().toJson(response.getBody()));
return response.getBody().getData();
}
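Because each GetDIJob call returns a single snapshot, checking the status periodically means calling it in a loop. The following generic polling sketch (StatusPoller is a hypothetical helper, not part of the SDK) calls a probe at a fixed interval until a predicate accepts the result or a timeout elapses.

```java
import java.util.function.Predicate;
import java.util.function.Supplier;

/** Hypothetical helper: polls a status probe until it satisfies a condition or times out. */
public class StatusPoller {
    public static <T> T pollUntil(Supplier<T> probe, Predicate<T> done,
                                  long intervalMillis, long timeoutMillis) throws InterruptedException {
        long deadline = System.currentTimeMillis() + timeoutMillis;
        while (true) {
            T value = probe.get();
            if (done.test(value)) {
                return value; // condition met, return the last observed value
            }
            // Give up if the next attempt would start after the deadline
            if (System.currentTimeMillis() + intervalMillis > deadline) {
                throw new IllegalStateException("Timed out; last observed value: " + value);
            }
            Thread.sleep(intervalMillis);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        final int[] calls = {0};
        // Fake probe standing in for getJob(...): reports "Running" on the third call
        String status = pollUntil(() -> ++calls[0] >= 3 ? "Running" : "Starting",
                s -> s.equals("Running"), 100, 5000);
        System.out.println("Status after " + calls[0] + " calls: " + status);
    }
}
```

To use it with getJob, wrap the call in a Supplier that rethrows checked exceptions as RuntimeException. Which field of GetDIJobResponseBodyData carries the job status, and which values mean the job is running, are assumptions to confirm in the GetDIJob reference.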
Other operations
Stop a Data Integration task
To stop a Data Integration task, call the StopDIJob operation. The following code configures only some of the parameters. For the full parameter list, see StopDIJob.
public static void stopJob(Long jobId) throws Exception {
System.out.println("stop job started, jobId=" + jobId);
StopDIJobRequest stop = new StopDIJobRequest();
stop.setDIJobId(jobId);
StopDIJobResponse response = createClient().stopDIJob(stop);
System.out.println("stop job finished, response is...");
System.out.println(new Gson().toJson(response.getBody()));
}
Delete a Data Integration task
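When a task is no longer needed, call the DeleteDIJob operation to delete it and take it offline. The following code configures only some of the parameters. For the full parameter list, see DeleteDIJob.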
public static void deleteJob(Long jobId) throws Exception {
System.out.println("delete job started, jobId=" + jobId);
DeleteDIJobRequest request = new DeleteDIJobRequest();
request.setDIJobId(jobId);
DeleteDIJobResponse response = createClient().deleteDIJob(request);
System.out.println("delete job finished, response is...");
System.out.println(new Gson().toJson(response.getBody()));
}
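The Sample code later in this topic waits a fixed 10 seconds between stopping and deleting the task. A sketch of an alternative ordering: stop the job, poll until it reports a stopped status, and only then delete it. JobOps is a hypothetical interface whose methods would delegate to stopJob, getJob, and deleteJob, and the status string passed as stoppedStatus is an assumption; check the GetDIJob reference for the actual status values.

```java
/** Hypothetical teardown helper: delete a job only after it reports a stopped status. */
public class SafeTeardown {
    /** Hypothetical abstraction over the stopJob, getJob, and deleteJob methods in this topic. */
    public interface JobOps {
        void stop(long jobId) throws Exception;
        String status(long jobId) throws Exception;
        void delete(long jobId) throws Exception;
    }

    public static void stopThenDelete(JobOps ops, long jobId, String stoppedStatus,
                                      long intervalMillis, long timeoutMillis) throws Exception {
        ops.stop(jobId);
        long deadline = System.currentTimeMillis() + timeoutMillis;
        String status = ops.status(jobId);
        // Wait for the stop to take effect instead of sleeping a fixed time
        while (!stoppedStatus.equals(status)) {
            if (System.currentTimeMillis() >= deadline) {
                throw new IllegalStateException(
                        "Job " + jobId + " is still in status " + status + "; not deleting it");
            }
            Thread.sleep(intervalMillis);
            status = ops.status(jobId);
        }
        ops.delete(jobId);
    }
}
```

If the timeout expires, the job is left in place so that it can be inspected before a retry.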
Sample code
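Use the following POM dependencies and Java SDK sample code to create a real-time synchronization task that synchronizes an entire MySQL database to Hologres, and then start, stop, and delete it.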
POM dependencies
<dependency>
<groupId>com.aliyun</groupId>
<artifactId>dataworks_public20200518</artifactId>
<version>5.6.0</version>
</dependency>
<dependency>
<groupId>com.aliyun</groupId>
<artifactId>tea-openapi</artifactId>
<version>0.3.2</version>
</dependency>
<dependency>
<groupId>com.aliyun</groupId>
<artifactId>tea-console</artifactId>
<version>0.0.1</version>
</dependency>
<dependency>
<groupId>com.aliyun</groupId>
<artifactId>tea-util</artifactId>
<version>0.2.21</version>
</dependency>
Sample Java SDK code
package com.aliyun.sample;
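import com.aliyun.tea.*;
import com.aliyun.teaopenapi.models.Config;
import com.aliyun.dataworks_public20200518.*;
import com.aliyun.dataworks_public20200518.models.*;
import com.google.gson.Gson;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;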
public class Sample {
// Endpoint of the region where your DataWorks workspace resides
private static final String ENDPOINT = "dataworks.cn-shanghai.aliyuncs.com";
public static Client createClient() throws Exception {
Config config = new Config();
// Read the AccessKey pair from the environment variables configured earlier instead of hard-coding it in the source code
config.setAccessKeyId(System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"));
config.setAccessKeySecret(System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"));
config.setEndpoint(ENDPOINT);
return new Client(config);
}
public static Long createDIJob()throws Exception{
System.out.println("create job started ...");
CreateDIJobRequest request = new CreateDIJobRequest();
// Basic settings
request.setProjectId(3058L); // Replace with your DataWorks workspace ID
request.setJobName("api"+System.currentTimeMillis());
request.setMigrationType("FullAndRealtimeIncremental");
request.setSourceDataSourceType("MySQL");
request.setDestinationDataSourceType("Hologres");
// Data source settings
CreateDIJobRequest.CreateDIJobRequestSourceDataSourceSettings srcDatasourceSetting = new com.aliyun.dataworks_public20200518.models.CreateDIJobRequest.CreateDIJobRequestSourceDataSourceSettings();
srcDatasourceSetting.setDataSourceName("dw_mysql_online");
Map<String, String> props = new HashMap<>();
props.put("TimeZone", "Asia/Shanghai");
props.put("Encoding", "utf-8");
srcDatasourceSetting.setDataSourceProperties(props);
request.setSourceDataSourceSettings(Arrays.asList(srcDatasourceSetting));
com.aliyun.dataworks_public20200518.models.CreateDIJobRequest.CreateDIJobRequestDestinationDataSourceSettings dstDatasourceSettings = new com.aliyun.dataworks_public20200518.models.CreateDIJobRequest.CreateDIJobRequestDestinationDataSourceSettings();
dstDatasourceSettings.setDataSourceName("dw_holo_test");
request.setDestinationDataSourceSettings(Arrays.asList(dstDatasourceSettings));
// Resource group settings
com.aliyun.dataworks_public20200518.models.CreateDIJobRequest.CreateDIJobRequestResourceSettings resourceSettings = new com.aliyun.dataworks_public20200518.models.CreateDIJobRequest.CreateDIJobRequestResourceSettings();
com.aliyun.dataworks_public20200518.models.CreateDIJobRequest.CreateDIJobRequestResourceSettingsOfflineResourceSettings offlineResourceSettings = new com.aliyun.dataworks_public20200518.models.CreateDIJobRequest.CreateDIJobRequestResourceSettingsOfflineResourceSettings();
offlineResourceSettings.setResourceGroupIdentifier("S_res_group_195820716552192_1695299272182");
resourceSettings.setOfflineResourceSettings(offlineResourceSettings);
com.aliyun.dataworks_public20200518.models.CreateDIJobRequest.CreateDIJobRequestResourceSettingsRealtimeResourceSettings realtimeResourceSettings = new com.aliyun.dataworks_public20200518.models.CreateDIJobRequest.CreateDIJobRequestResourceSettingsRealtimeResourceSettings();
realtimeResourceSettings.setResourceGroupIdentifier("S_res_group_195820716552192_1695299272182");
resourceSettings.setRealtimeResourceSettings(realtimeResourceSettings);
request.setResourceSettings(resourceSettings);
// Table mappings
com.aliyun.dataworks_public20200518.models.CreateDIJobRequest.CreateDIJobRequestTableMappings tableMapping = new com.aliyun.dataworks_public20200518.models.CreateDIJobRequest.CreateDIJobRequestTableMappings();
com.aliyun.dataworks_public20200518.models.CreateDIJobRequest.CreateDIJobRequestTableMappingsSourceObjectSelectionRules datasource = new com.aliyun.dataworks_public20200518.models.CreateDIJobRequest.CreateDIJobRequestTableMappingsSourceObjectSelectionRules();
datasource.setObjectType("Datasource");
datasource.setExpression("dw_mysql_online");
com.aliyun.dataworks_public20200518.models.CreateDIJobRequest.CreateDIJobRequestTableMappingsSourceObjectSelectionRules database = new com.aliyun.dataworks_public20200518.models.CreateDIJobRequest.CreateDIJobRequestTableMappingsSourceObjectSelectionRules();
database.setObjectType("Database");
database.setExpression("cx_db_1");
com.aliyun.dataworks_public20200518.models.CreateDIJobRequest.CreateDIJobRequestTableMappingsSourceObjectSelectionRules table = new com.aliyun.dataworks_public20200518.models.CreateDIJobRequest.CreateDIJobRequestTableMappingsSourceObjectSelectionRules();
table.setObjectType("Table");
table.setExpression("cx_table_1");
tableMapping.setSourceObjectSelectionRules(Arrays.asList(datasource, database, table));
request.setTableMappings(Arrays.asList(tableMapping));
// Send the request
com.aliyun.dataworks_public20200518.models.CreateDIJobResponse createDIJobResponse = createClient().createDIJob(request);
System.out.println("create job finished, response is...");
System.out.println(new Gson().toJson(createDIJobResponse.getBody()));
System.out.println("create job finished, response job id is " + createDIJobResponse.getBody().getDIJobId());
return createDIJobResponse.getBody().getDIJobId();
}
public static void startJob(Long jobId) throws Exception {
StartDIJobRequest start = new StartDIJobRequest();
start.setDIJobId(jobId);
StartDIJobResponse response = createClient().startDIJob(start);
System.out.println("start job finished, response is...");
System.out.println(new Gson().toJson(response.getBody()));
}
public static void stopJob(Long jobId) throws Exception {
System.out.println("stop job started, jobId=" + jobId);
StopDIJobRequest stop = new StopDIJobRequest();
stop.setDIJobId(jobId);
StopDIJobResponse response = createClient().stopDIJob(stop);
System.out.println("stop job finished, response is...");
System.out.println(new Gson().toJson(response.getBody()));
}
public static void deleteJob(Long jobId) throws Exception {
System.out.println("delete job started, jobId=" + jobId);
DeleteDIJobRequest request = new DeleteDIJobRequest();
request.setDIJobId(jobId);
DeleteDIJobResponse response = createClient().deleteDIJob(request);
System.out.println("delete job finished, response is...");
System.out.println(new Gson().toJson(response.getBody()));
}
public static GetDIJobResponseBody.GetDIJobResponseBodyData getJob(Long jobId, boolean detail) throws Exception {
System.out.println("get job started, jobId=" + jobId);
// Query the task; WithDetails controls whether the response includes the task details
GetDIJobRequest request = new GetDIJobRequest();
request.setDIJobId(jobId);
request.setWithDetails(detail);
GetDIJobResponse response = createClient().getDIJob(request);
System.out.println("get job finished, response is...");
System.out.println(new Gson().toJson(response.getBody()));
return response.getBody().getData();
}
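public static void main(String[] args) throws Exception {
try {
// Create the task and use the returned ID for every later call
Long jobId = createDIJob();
// View the task
getJob(jobId, true);
// Start the task
startJob(jobId);
// Let the task run for 100 seconds
Thread.sleep(100000);
// Stop the task
stopJob(jobId);
// Wait 10 seconds before deleting the task
Thread.sleep(10000);
// Delete the task
deleteJob(jobId);
} catch (Exception e) {
// Report the failure instead of silently swallowing it
e.printStackTrace();
throw e;
}
}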
}
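The main method above runs each step in sequence, so a failure in startJob leaves the created task behind. A minimal sketch of a variant that always stops and deletes the task it created, using try/finally. DIJobClient is a hypothetical interface whose methods would delegate to createDIJob, startJob, stopJob, and deleteJob; it is not part of the SDK.

```java
/** Hypothetical lifecycle runner that always cleans up the task it created. */
public class JobLifecycle {
    /** Hypothetical abstraction over createDIJob, startJob, stopJob, and deleteJob. */
    public interface DIJobClient {
        long create() throws Exception;
        void start(long jobId) throws Exception;
        void stop(long jobId) throws Exception;
        void delete(long jobId) throws Exception;
    }

    /** Creates and starts a job, lets it run, then stops and deletes the same job. */
    public static void runOnce(DIJobClient client, long runMillis) throws Exception {
        long jobId = client.create(); // this ID, not a hard-coded one, is cleaned up below
        try {
            client.start(jobId);
            Thread.sleep(runMillis);
        } finally {
            try {
                client.stop(jobId);
            } finally {
                // Attempt deletion even if stopping failed
                client.delete(jobId);
            }
        }
    }
}
```

If stop or delete also throws, that exception replaces the original one from start; attach the original with addSuppressed if both need to be reported.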