DataHub資料來源作為資料中樞,為您提供讀取和寫入DataHub資料庫的雙向通道,能夠快速解決海量資料的計算問題。本文為您介紹DataWorks的DataHub資料同步的能力支援情況。
支援的版本
DataHub Reader通過DataHub的Java SDK讀取DataHub中的資料,具體使用的Java SDK版本,如下所示。
<dependency>
<groupId>com.aliyun.DataHub</groupId>
<artifactId>aliyun-sdk-DataHub</artifactId>
<version>2.9.1</version>
</dependency>
DataHub Writer通過DataHub服務的Java SDK向DataHub寫入資料,使用的Log ServiceJava SDK版本如下。
<dependency>
<groupId>com.aliyun.datahub</groupId>
<artifactId>aliyun-sdk-datahub</artifactId>
<version>2.5.1</version>
</dependency>
使用限制
離線讀寫
STRING字串僅支援UTF-8編碼,單個STRING列最長允許1MB。
即時讀寫
全增量即時寫
運行同步任務後,產生的離線同步任務將全量資料寫入DataHub,待全量資料執行完成後,啟動即時同步任務,將源端增量資料即時同步至目標端。資料寫入格式如下:
支援的欄位類型
DataHub同步資料時,會根據DataHub Field的資料類型同步到對應的資料類型中,DataHub僅支援BIGINT、STRING、BOOLEAN、DOUBLE、TIMESTAMP、DECIMAL資料類型。
建立資料來源
在進行資料同步任務開發時,您需要在DataWorks上建立一個對應的資料來源,操作流程請參見建立並管理資料來源,詳細的配置參數解釋可在配置介面查看對應參數的文案提示。
資料同步任務開發
資料同步任務的配置入口和通用配置流程可參見下文的配置指導。
附錄:指令碼Demo與參數說明
離線任務指令碼配置方式
如果您配置離線任務時使用指令碼模式的方式進行配置,您需要按照統一的指令碼格式要求,在任務指令碼中編寫相應的參數,詳情請參見通過指令碼模式配置離線同步任務,以下為您介紹指令碼模式下資料來源的參數配置詳情。
Reader指令碼Demo
{
"type":"job",
"version":"2.0",//版本號碼
"steps":[
{
"job": {
"content": [
{
"reader": {
"name": "DataHubreader",
"parameter": {
"endpoint": "xxx" //DataHub的endpoint。
"accessId": "xxx", //訪問DataHub的使用者accessId。
"accessKey": "xxx", //訪問DataHub的使用者accessKey。
"project": "xxx", //目標DataHub的專案名稱。
"topic": "xxx" //目標DataHub的topic名稱。
"batchSize": 1000, //一次讀取的資料量。
"beginDateTime": "20180910111214", //資料消費的開始時間位點。
"endDateTime": "20180910111614", //資料消費的結束時間位點。
"column": [
"col0",
"col1",
"col2",
"col3",
"col4"
]
}
},
"writer": {
"name": "streamwriter",
"parameter": {
"print": false
}
}
}
]
}
}
],
"setting":{
"errorLimit":{
"record":"0"//錯誤記錄數
},
"speed":{
"throttle":true,//當throttle值為false時,mbps參數不生效,表示不限流;當throttle值為true時,表示限流。
"concurrent":1,//並發數
"mbps":"12"//限流,此處1mbps = 1MB/s。
}
},
"order":{
"hops":[
{
"from":"Reader",
"to":"Writer"
}
]
}
}
Reader指令碼參數
參數 | 描述 | 是否必選 |
endpoint | DataHub的endpoint。 | 是 |
accessId | 訪問DataHub的使用者accessId。 | 是 |
accessKey | 訪問DataHub的使用者accessKey。 | 是 |
project | 目標DataHub的專案名稱。project是DataHub中的資源嵌入式管理單元,用於資源隔離和控制。 | 是 |
topic | 目標DataHub的topic名稱。 | 是 |
batchSize | 一次讀取的資料量,預設為1,024條。 | 否 |
beginDateTime | 資料消費的開始時間位點。該參數是時間範圍(左閉右開)的左邊界,yyyyMMddHHmmss格式的時間字串,可以和DataWorks的調度時間參數配合使用。
說明 beginDateTime和endDateTime需要互相組合配套使用。 | 是 |
endDateTime | 資料消費的結束時間位點。該參數是時間範圍(左閉右開)的右邊界,yyyyMMddHHmmss格式的時間字串,可以和DataWorks的調度時間參數配合使用。
說明 beginDateTime和endDateTime需要互相組合配套使用。 | 是 |
Writer指令碼Demo
{
"type": "job",
"version": "2.0",//版本號碼。
"steps": [
{
"stepType": "stream",
"parameter": {},
"name": "Reader",
"category": "reader"
},
{
"stepType": "datahub",//外掛程式名。
"parameter": {
"datasource": "",//資料來源。
"topic": "",//Topic是DataHub訂閱和發布的最小單位,您可以用Topic來表示一類或者一種流資料。
"maxRetryCount": 500,//任務失敗的重試的最多次數。
"maxCommitSize": 1048576//待積累的資料Buffer大小達到maxCommitSize大小(單位Byte)時,批量提交至目的端。
//datahub側對於一次request請求寫入的資料條數限制是10000條,超出10000條資料會超出限制導致任務出錯,請根據您單條資料平均資料量*10000條資料的資料總量來從側方面進行單次寫入datahub的資料條數控制。比如每條資料10 k,那麼此參數的設定值要低於10*10000 k。
},
"name": "Writer",
"category": "writer"
}
],
"setting": {
"errorLimit": {
"record": ""//錯誤記錄數。
},
"speed": {
"throttle":true,//當throttle值為false時,mbps參數不生效,表示不限流;當throttle值為true時,表示限流。
"concurrent":20, //作業並發數。
"mbps":"12"//限流,此處1mbps = 1MB/s。
}
},
"order": {
"hops": [
{
"from": "Reader",
"to": "Writer"
}
]
}
}
Writer指令碼參數
參數 | 描述 | 是否必選 | 預設值 |
accessId | DataHub的accessId。 | 是 | 無 |
accessKey | DataHub的accessKey。 | 是 | 無 |
endPoint | 對DataHub資源的訪問請求,需要根據資源所屬服務,選擇正確的網域名稱。 | 是 | 無 |
maxRetryCount | 任務失敗的最多重試次數。 | 否 | 無 |
mode | Value是STRING類型時,寫入的模式。 | 是 | 無 |
parseContent | 解析內容。 | 是 | 無 |
project | 專案(Project)是DataHub資料的基主要組織單元,一個Project下包含多個Topic。
說明 DataHub的專案空間與MaxCompute的專案相互獨立,您在MaxCompute中建立的專案不能複用於DataHub,需要單獨建立。 | 是 | 無 |
topic | Topic是DataHub訂閱和發布的最小單位,您可以用Topic來表示一類或者一種流資料。 | 是 | 無 |
maxCommitSize | 為提高寫出效率,DataX會積累Buffer資料,待積累的資料大小達到maxCommitSize 大小(單位Byte)時,批量提交到目的端。預設是1,048,576,即1 MB資料。另外datahub側對於一次request請求寫入的資料條數限制是10000條,超出10000條資料會超出限制導致任務出錯,請根據您單條資料平均資料量*10000條的資料總量來從側方面進行寫入datahub的資料條數控制。 | 否 | 1MB |