本文為您介紹如何使用OpenAPI建立、修改、刪除Data Integration同步任務,同步來源端資料至去向端。
前提條件
使用限制
調用CreateDISyncTask建立Data Integration同步任務,僅支援使用指令碼模式配置同步任務內容,詳情請參見通過指令碼模式配置離線同步任務。
DataWorks暫不支援使用OpenAPI建立商務程序,您需要使用現有的商務程序建立資料同步任務。
配置環境依賴及帳號認證
配置Maven依賴。
開啟Maven專案下的pom.xml檔案,添加
aliyun-java-sdk-core
。<dependency> <groupId>com.aliyun</groupId> <artifactId>aliyun-java-sdk-core</artifactId> <version>4.5.20</version> </dependency>
開啟Maven專案下的pom.xml檔案,添加
aliyun-java-sdk-dataworks-public
。<dependency> <groupId>com.aliyun</groupId> <artifactId>aliyun-java-sdk-dataworks-public</artifactId> <version>3.3.18</version> </dependency>
開啟Maven專案,添加
credentials-java
(推薦使用Credentials最新版本)。<dependency> <groupId>com.aliyun</groupId> <artifactId>credentials-java</artifactId> <version>0.2.11</version> </dependency>
說明通過阿里雲Credentials工具託管AK,關於Credentials工具使用說明,詳情請參見:身分識別驗證配置。
用戶端認證。
使用OpenAPI建立資料同步任務前,需要調用如下語句對登入阿里雲的帳號相關資訊進行認證。如果阿里雲的帳號資訊認證通過,則繼續執行後續任務,如果認證不通過,則該調用會報錯,您需要根據實際報錯處理相關問題。
// Please ensure that the environment variables ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET are set.https://www.alibabacloud.com/help/zh/alibaba-cloud-sdk-262060/latest/configure-credentials-378659 IClientProfile profile = DefaultProfile.getProfile("cn-shanghai", System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"), System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"));
配置流程
完成上述配置環境依賴及帳號認證後,您可以通過OpenAPI調用相關介面,建立資料同步任務,同步來源端資料至去向端。配置流程如下:
配置步驟
建立Data Integration任務。
調用CreateDISyncTask介面,建立Data Integration任務。如下代碼僅樣本部分參數的配置,更多參數詳情請參見CreateDISyncTask。
public void createFile() throws ClientException{ CreateDISyncTaskRequest request = new CreateDISyncTaskRequest(); request.setProjectId(181565L); request.setTaskType("DI_OFFLINE"); request.setTaskContent("{\"type\":\"job\",\"version\":\"2.0\",\"steps\":[{\"stepType\":\"mysql\",\"parameter\":{\"envType\":1,\"datasource\":\"dh_mysql\",\"column\":[\"id\",\"name\"],\"tableComment\":\"same表comment\",\"connection\":[{\"datasource\":\"dh_mysql\",\"table\":[\"same\"]}],\"where\":\"\",\"splitPk\":\"id\",\"encoding\":\"UTF-8\"},\"name\":\"Reader\",\"category\":\"reader\"},{\"stepType\":\"odps\",\"parameter\":{\"partition\":\"pt=${bizdate}\",\"truncate\":true,\"datasource\":\"odps_source\",\"envType\":1,\"column\":[\"id\",\"name\"],\"emptyAsNull\":false,\"tableComment\":\"same表comment\",\"table\":\"same\"},\"name\":\"Writer\",\"category\":\"writer\"}],\"setting\":{\"errorLimit\":{\"record\":\"\"},\"speed\":{\"throttle\":false,\"concurrent\":2}},\"order\":{\"hops\":[{\"from\":\"Reader\",\"to\":\"Writer\"}]}}"); request.setTaskParam("{\"FileFolderPath\":\"商務程序/new_biz/Data Integration\",\"ResourceGroup\":\"S_res_group_280749521950784_1602767279794\"}"); request.setTaskName("new_di_task_0607_1416"); String regionId = "cn-hangzhou"; // Please ensure that the environment variables ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET are set.https://www.alibabacloud.com/help/zh/alibaba-cloud-sdk-262060/latest/configure-credentials-378659 IClientProfile profile = DefaultProfile.getProfile("cn-shanghai", System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"), System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET")); DefaultProfile.addEndpoint("cn-hangzhou","dataworks-public","dataworks.cn-hangzhou.aliyuncs.com"); IAcsClient client; client = new DefaultAcsClient(profile); CreateDISyncTaskResponse response1 = client.getAcsResponse(request); Gson gson1 = new Gson(); System.out.println(gson1.toJson(response1)); }
調用UpdateFile介面,更新調度參數。
請求參數配置詳情如下表所示:
名稱
類型
是否必選
樣本值
描述
Action
String
是
UpdateFile
要執行的操作。
FileFolderPath
String
否
商務程序/第一個商務程序/Data Integration/檔案夾1/檔案夾2
檔案所在的路徑。
ProjectId
Long
否
10000
DataWorks工作空間的ID。您可以登入DataWorks控制台,進入工作空間管理頁面擷取ID。
FileName
String
否
ods_user_info_d
檔案的名稱。您可以通過重新設定FileName的值來修改檔案名稱。
例如,使用ListFiles介面查詢目標目錄下的檔案ID,通過UpdateFile介面,輸入查詢的檔案ID至FileId參數,並配置FileName的參數值,即可修改相應檔案的名稱。
FileDescription
String
否
這裡是檔案描述
檔案的描述。
Content
String
否
SELECT "1";
檔案代碼內容,不同代碼類型(fileType)的檔案,代碼格式不同。您可以在營運中心,按右鍵對應類型的任務,選擇查看代碼,查看具體的代碼格式。
AutoRerunTimes
Integer
是
3
檔案出錯後,自動重跑的次數。
AutoRerunIntervalMillis
Integer
否
120000
出錯自動重跑時間間隔,單位為毫秒。最大為1800000毫秒(30分鐘)。
該參數與DataWorks控制台中,資料開發工作單位的“調度配置>時間屬性>出錯自動重跑”的“重跑間隔”配置對應。
控制台中“重跑間隔”的時間單位為分鐘,請在調用時注意轉換時間。
RerunMode
String
否
ALL_ALLOWED
重跑屬性。取值如下:
ALL_ALLOWED:運行成功或失敗後皆可重跑。
FAILURE_ALLOWED:運行成功後不可重跑,運行失敗後可以重跑。
ALL_DENIED:運行成功或失敗皆不可重跑。
該參數與DataWorks控制台中,資料開發工作單位的“調度配置>時間屬性>重跑屬性”配置內容對應。
Stop
Boolean
否
false
是否暫停調度,取值如下:
true:暫停調度。
false:不暫停調度。
該參數與DataWorks控制台中,資料開發工作單位的“調度配置>時間屬性>調度類型”配置為“暫停調度”時對應。
ParaValue
String
否
x=a y=b z=c
調度參數。
該參數與DataWorks控制台中,資料開發工作單位的“調度配置>參數”對應。您可以參考調度參數配置。
StartEffectDate
Long
否
936923400000
開始自動調度的毫秒時間戳記。
該參數與DataWorks控制台中,資料開發工作單位的“調度配置>時間屬性>生效日期”配置的開始時間的毫秒時間戳記對應。
EndEffectDate
Long
否
4155787800000
停止自動調度的時間戳記,單位為毫秒。
該參數與DataWorks控制台中,資料開發工作單位的“調度配置>時間屬性>生效日期”配置的結束時間的毫秒時間戳記對應。
CronExpress
String
否
00 00-59/5 1-23 * * ?
周期調度的cron運算式,該參數與DataWorks控制台中,資料開發工作單位的“調度配置>時間屬性>cron運算式”對應。配置完“調度周期”及“定時調度時間”後,DataWorks會自動產生相應cron運算式。
樣本如下:
每天淩晨5點30分定時調度:
00 30 05 * * ?
。每個小時的第15分鐘定時調度:
00 15 * * * ?
。每隔十分鐘調度一次:
00 00/10 * * * ?
。每天8點到17點,每隔十分鐘調度一次:
00 00-59/10 8-23 * * * ?
。每月的1日0點20分自動調度:
00 20 00 1 * ?
。從1月1日0點10分開始,每過3個月調度一次:
00 10 00 1 1-12/3 ?
。每周二、周五的0點5分自動調度:
00 05 00 * * 2,5
。
由於DataWorks調度系統的規則,cron運算式有以下限制:
最短調度間隔時間為5分鐘。
每天最早調度時間為0點5分。
CycleType
String
否
NOT_DAY
調度周期的類型,包括NOT_DAY(分鐘、小時)和DAY(日、周、月)。
該參數與DataWorks控制台中,資料開發工作單位的“調度配置>時間屬性>調度周期”對應。
DependentType
String
否
USER_DEFINE
依賴上一周期的方式。取值如下:
SELF:依賴項選擇本節點。
CHILD:依賴項選擇一級子節點。
USER_DEFINE:依賴項選擇其他節點。
NONE:未選擇依賴項,即不會依賴上一周期。
DependentNodeIdList
String
否
5,10,15,20
當DependentType參數配置為USER_DEFINE時,用於設定當前檔案具體依賴的節點ID。依賴多個節點時,使用英文逗號(,)分隔。
該參數與DataWorks控制台中,資料開發工作單位的“調度配置>調度依賴”配置為“跨周期依賴(原上一周期)”後,依賴項選擇“其他節點”時配置的內容對應。
InputList
String
否
project_root,project.file1,project.001_out
檔案依賴的上遊檔案的輸出名稱。依賴多個輸出時,使用英文逗號(,)分隔。
該參數與DataWorks控制台中,資料開發工作單位的“調度配置>調度依賴”中的“上遊節點輸出名”對應。
說明通過CreateDISyncTask和UpdateFile方式建立離線同步任務時此參數必須配置。
ProjectIdentifier
String
否
dw_project
DataWorks工作空間的名稱。您可以登入DataWorks控制台,進入工作空間配置頁面擷取工作空間名稱。
該參數與ProjectId參數,二者必須設定其一,用來確定本次API叫用作業的DataWorks工作空間。
FileId
Long
是
100000001
檔案的ID。您可以調用ListFiles介面擷取檔案ID。
OutputList
String
否
dw_project.ods_user_info_d
檔案的輸出。
該參數與DataWorks控制台中,資料開發工作單位的“調度配置>調度依賴”中的“本節點輸出名稱”對應。
ResourceGroupIdentifier
String
否
default_group
檔案發布成任務後,任務執行時對應的資源群組。您可以調用ListResourceGroups擷取工作空間可用的資源群組列表。
ConnectionName
String
否
odps_source
檔案對應任務執行時,任務使用的資料來源標識符。您可以調用ListDataSources擷取可用的資料來源列表。
Owner
String
否
18023848927592
檔案所有者的使用者ID。
AutoParsing
Boolean
否
true
檔案是否開啟自動解析功能。取值如下:
true:檔案會自動解析代碼。
false:檔案不會自動解析代碼。
該參數與DataWorks控制台中,資料開發工作單位的“調度配置>調度依賴”中的“從代碼解析輸入輸出”對應。
SchedulerType
String
否
NORMAL
調度的類型,取值如下:
NORMAL:正常調度任務。
MANUAL:手動任務,不會被日常調度,對應手動商務程序下的節點。
PAUSE:暫停任務。
SKIP:空跑任務,被日常調度,但啟動調度時直接被置為成功。
AdvancedSettings
String
否
{"queue":"default","SPARK_CONF":"--conf spark.driver.memory=2g"}
任務的進階配置。
該參數與DataWorks控制台中,EMR Spark Streaming和EMR Streaming SQL資料開發工作單位,編輯頁面右側導覽列的“進階設定”對應。
當前僅EMR Spark Streaming和EMR Streaming SQL任務支援配置該參數,並且參數為JSON格式。
StartImmediately
Boolean
否
true
發布後是否立即啟動。取值如下:
true:發布後立即啟動。
false:發布後暫不啟動。
該參數與DataWorks控制台中,EMR Spark Streaming和EMR Streaming SQL資料開發工作單位,編輯頁面右側導覽列的“配置>時間屬性>啟動方式”對應。
InputParameters
String
否
[{"ValueSource": "project_001.first_node:bizdate_param","ParameterName": "bizdate_input"}]
節點的上下文輸入參數。參數為JSON格式,包含的欄位可參考GetFile介面傳回值中的InputContextParameterList參數結構。
該參數與DataWorks控制台中,資料開發工作單位的“調度配置>節點上下文>本節點輸入參數”對應。
OutputParameters
String
否
[{"Type": 1,"Value": "${bizdate}","ParameterName": "bizdate_param"}]
節點的上下文輸出參數。參數為JSON格式,包含的欄位可參考GetFile介面傳回值中的OutputContextParameterList參數結構。
該參數與DataWorks控制台中,資料開發工作單位的“調度配置>節點上下文>本節點輸出參數”對應。
提交Data Integration任務。
調用SubmitFile介面,提交Data Integration任務至調度系統的開發環境。任務提交後,Response會返回deploymentId,您可以調用GetDeployment介面,通過deploymentId擷取本次發布包的詳細資料。
public void submitFile() throws ClientException{ SubmitFileRequest request = new SubmitFileRequest(); request.setProjectId(78837L); request.setProjectIdentifier("zxy_8221431"); // 此節點ID為建立節點時返回的ID,對應資料庫File表的file_id。 request.setFileId(501576542L); request.setComment("備忘"); SubmitFileResponse acsResponse = client.getAcsResponse(request); //DeploymentId為提交或發布的傳回值。 Long deploymentId = acsResponse.getData(); log.info(acsResponse.toString()); }
上述代碼僅樣本部分參數的配置,更多參數詳情請參見SubmitFile、GetDeployment。
發布同步任務到生產環境。
調用DeployFile介面,發布Data Integration同步任務至生產環境。
說明僅標準模式的工作空間涉及執行該發佈動作。
public void deploy() throws ClientException{ DeployFileRequest request = new DeployFileRequest(); request.setProjectIdentifier("zxy_8221431"); request.setFileId(501576542L); request.setComment("備忘"); //NodeId和file_id二選一。NodeId的值為調度配置中基礎屬性的節點ID。 request.setNodeId(700004537241L); DeployFileResponse acsResponse = client.getAcsResponse(request); //DeploymentId為提交或發布的傳回值。 Long deploymentId = acsResponse.getData(); log.info(acsResponse.getData().toString()); }
上述代碼僅樣本部分參數的配置,更多參數詳情請參見DeployFile。
擷取發布包詳情。
任務發布後,Response會返回deploymentId,您可以調用GetDeployment介面,通過deploymentId擷取本次發布包的詳細資料。當GetDeployment介面的返回參數Status取值為1時,則表示此次發布成功。
public void getDeployment() throws ClientException{ GetDeploymentRequest request = new GetDeploymentRequest(); request.setProjectId(78837L); request.setProjectIdentifier("zxy_8221431"); //DeploymentId為提交或發布的傳回值。調用GetDeployment介面,擷取本次發布的具體情況。 request.setDeploymentId(2776067L); GetDeploymentResponse acsResponse = client.getAcsResponse(request); log.info(acsResponse.getData().toString()); }
上述代碼僅樣本部分參數的配置,更多參數詳情請參見GetDeployment。
修改同步任務的相關配置
成功建立Data Integration同步任務後,您可以調用UpdateDISyncTask介面更新任務的Content,或通過TaskParam來更新使用的獨享資源群組。更新後,您需要重新提交、發布同步任務,詳情請參見配置流程。
刪除同步任務
建立Data Integration同步任務後,您可以調用DeleteFile介面刪除同步任務。
Sample代碼
POM 依賴
<dependency>
<groupId>com.aliyun</groupId>
<artifactId>aliyun-java-sdk-core</artifactId>
<version>4.5.20</version>
</dependency>
<dependency>
<groupId>com.aliyun</groupId>
<artifactId>aliyun-java-sdk-dataworks-public</artifactId>
<version>3.4.1</version>
</dependency>
Java Sdk調用代碼
import com.aliyuncs.DefaultAcsClient;
import com.aliyuncs.IAcsClient;
import com.aliyuncs.dataworks_public.model.v20200518.*;
import com.aliyuncs.profile.DefaultProfile;
import com.aliyuncs.profile.IClientProfile;
import java.util.List;
public class createofflineTask {
static Long createTask(String fileName) throws Exception {
Long projectId = 2043L;
String taskType = "DI_OFFLINE";
String taskContent = "{\n" +
" \"type\": \"job\",\n" +
" \"version\": \"2.0\",\n" +
" \"steps\": [\n" +
" {\n" +
" \"stepType\": \"mysql\",\n" +
" \"parameter\": {\n" +
" \"envType\": 0,\n" +
" \"datasource\": \"mysql_autotest_dev\",\n" +
" \"column\": [\n" +
" \"id\",\n" +
" \"name\"\n" +
" ],\n" +
" \"connection\": [\n" +
" {\n" +
" \"datasource\": \"mysql_autotest_dev\",\n" +
" \"table\": [\n" +
" \"user\"\n" +
" ]\n" +
" }\n" +
" ],\n" +
" \"where\": \"\",\n" +
" \"splitPk\": \"\",\n" +
" \"encoding\": \"UTF-8\"\n" +
" },\n" +
" \"name\": \"Reader\",\n" +
" \"category\": \"reader\"\n" +
" },\n" +
" {\n" +
" \"stepType\": \"odps\",\n" +
" \"parameter\": {\n" +
" \"partition\": \"pt=${bizdate}\",\n" +
" \"truncate\": true,\n" +
" \"datasource\": \"odps_source\",\n" +
" \"envType\": 0,\n" +
" \"column\": [\n" +
" \"id\",\n" +
" \"name\"\n" +
" ],\n" +
" \"emptyAsNull\": false,\n" +
" \"tableComment\": \"null\",\n" +
" \"table\": \"user\"\n" +
" },\n" +
" \"name\": \"Writer\",\n" +
" \"category\": \"writer\"\n" +
" }\n" +
" ],\n" +
" \"setting\": {\n" +
" \"executeMode\": null,\n" +
" \"errorLimit\": {\n" +
" \"record\": \"\"\n" +
" },\n" +
" \"speed\": {\n" +
" \"concurrent\": 2,\n" +
" \"throttle\": false\n" +
" }\n" +
" },\n" +
" \"order\": {\n" +
" \"hops\": [\n" +
" {\n" +
" \"from\": \"Reader\",\n" +
" \"to\": \"Writer\"\n" +
" }\n" +
" ]\n" +
" }\n" +
"}";
CreateDISyncTaskRequest request = new CreateDISyncTaskRequest();
request.setProjectId(projectId);
request.setTaskType(taskType);
request.setTaskContent(taskContent);
request.setTaskName(fileName);
request.setTaskParam("{\"FileFolderPath\":\"商務程序/自動化測試空間_勿動/Data Integration\",\"ResourceGroup\":\"S_res_group_XXX\"}");
// 使用Data Integration獨享資源群組,
CreateDISyncTaskResponse response1 = client.getAcsResponse(request);
return response1.getData().getFileId();
}
public static void updateFile(Long fileId) throws Exception {
UpdateFileRequest request = new UpdateFileRequest();
request.setProjectId(2043L);
request.setFileId(fileId);
request.setAutoRerunTimes(3);
request.setRerunMode("FAILURE_ALLOWED");
request.setCronExpress("00 30 05 * * ?");
request.setCycleType("DAY");
request.setResourceGroupIdentifier("S_res_group_XXX");
// 使用調度獨享資源群組
request.setInputList("dataworks_di_autotest_root");
request.setAutoParsing(true);
request.setDependentNodeIdList("5,10,15,20");
request.setDependentType("SELF");
request.setStartEffectDate(0L);
request.setEndEffectDate(4155787800000L);
request.setFileDescription("description");
request.setStop(false);
request.setParaValue("x=a y=b z=c");
request.setSchedulerType("NORMAL");
request.setAutoRerunIntervalMillis(120000);
UpdateFileResponse response1 = client.getAcsResponse(request);
}
static void deleteTask(Long fileId) throws Exception {
DeleteFileRequest request = new DeleteFileRequest();
Long projectId = 63845L;
request.setProjectId(projectId);
request.setFileId(fileId);
String akId = "XXX";
String akSecret = "XXXX";
String regionId = "cn-hangzhou";
IClientProfile profile = DefaultProfile.getProfile(regionId, akId, akSecret);
DefaultProfile.addEndpoint("cn-hangzhou","dataworks-public","dataworks.cn-hangzhou.aliyuncs.com");
IAcsClient client;
client = new DefaultAcsClient(profile);
DeleteFileResponse response1 = client.getAcsResponse(request);
System.out.println(JSONObject.toJSONString(response1));
}
static IAcsClient client;
public static void main(String[] args) throws Exception {
String akId = "XX";
// Please ensure that the environment variables ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET are set.https://www.alibabacloud.com/help/zh/alibaba-cloud-sdk-262060/latest/configure-credentials-378659
IClientProfile profile = DefaultProfile.getProfile("cn-shanghai", System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"), System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"));
DefaultProfile.addEndpoint(regionId, "dataworks-public", "dataworks." + regionId + ".aliyuncs.com");
client = new DefaultAcsClient(profile);
String taskName = "offline_job_0930_1648";
Long fileId = createTask(taskName); // 建立Data Integration任務
updateFile(fileId); // 修改Data Integration任務的調度屬性
}
}