全部產品
Search
文件中心

DataWorks:DataHub資料來源

更新時間:Jun 19, 2024

DataHub資料來源作為資料中樞,為您提供讀取和寫入DataHub資料庫的雙向通道,能夠快速解決海量資料的計算問題。本文為您介紹DataWorks的DataHub資料同步的能力支援情況。

支援的版本

  • DataHub Reader通過DataHub的Java SDK讀取DataHub中的資料,具體使用的Java SDK版本,如下所示。

    <dependency>
        <groupId>com.aliyun.DataHub</groupId>
        <artifactId>aliyun-sdk-DataHub</artifactId>
        <version>2.9.1</version>
    </dependency>
  • DataHub Writer通過DataHub服務的Java SDK向DataHub寫入資料,使用的Log ServiceJava SDK版本如下。

    <dependency>
        <groupId>com.aliyun.datahub</groupId>
        <artifactId>aliyun-sdk-datahub</artifactId>
        <version>2.5.1</version>
    </dependency>

使用限制

離線讀寫

STRING字串僅支援UTF-8編碼,單個STRING列最長允許1MB。

即時讀寫

  • 即時資料同步任務僅支援使用獨享Data Integration資源群組

  • 即時同步至DataHub時,按源端校正雜湊值,同一個雜湊值的資料會同步到同一個shard中。

全增量即時寫

運行同步任務後,產生的離線同步任務將全量資料寫入DataHub,待全量資料執行完成後,啟動即時同步任務,將源端增量資料即時同步至目標端。資料寫入格式如下:

  • 僅支援將資料寫入DataHub Tuple類型的Topic中。關於DataHub TUPLE資料類型說明,詳情請參見:資料類型介紹

  • 即時同步至DataHub會在源表欄位基礎上,新增5個附加欄位,並支援您在配任務配置時,自行添加額外的欄位。最終發送給DataHub的訊息格式,詳情請參見:附錄:DataHub訊息格式

支援的欄位類型

DataHub同步資料時,會根據DataHub Field的資料類型同步到對應的資料類型中,DataHub僅支援BIGINTSTRINGBOOLEANDOUBLETIMESTAMPDECIMAL資料類型。

資料同步任務開發

DataHub資料同步任務的配置入口和通用配置流程指導可參見下文的配置指導,詳細的配置參數解釋可在配置介面查看對應參數的文案提示。

建立資料來源

在進行資料同步任務開發時,您需要在DataWorks上建立一個對應的資料來源,操作流程請參見建立並管理資料來源

單表離線同步任務配置指導

單表、整庫即時同步任務配置指導

操作流程請參見DataStudio側即時同步任務配置

說明

DataHub不同資料類型對應操作的支援情況,不同資料類型的分區策略、資料格式及相關訊息樣本。詳情請參見:附錄:DataHub訊息格式

單表、整庫全增量(即時)同步配置指導

操作流程請參見Data Integration側同步任務配置

常見問題

寫入DataHub時,一次性寫入資料超限導致寫入失敗如何處理?

附錄:指令碼Demo與參數說明

附錄:離線任務指令碼配置方式

如果您配置離線任務時使用指令碼模式的方式進行配置,您需要在任務指令碼中按照指令碼的統一格式要求編寫指令碼中的reader參數和writer參數,指令碼模式的統一要求請參見通過指令碼模式配置離線同步任務,以下為您介紹指令碼模式下的資料來源的Reader參數和Writer參數的指導詳情。

Reader指令碼Demo

{
    "type":"job",
    "version":"2.0",//版本號碼
    "steps":[
        {
         "job": {
           "content": [
            {
                "reader": {
                    "name": "DataHubreader",
                    "parameter": {
                        "endpoint": "xxx" //DataHub的endpoint。
                        "accessId": "xxx", //訪問DataHub的使用者accessId。
                        "accessKey": "xxx", //訪問DataHub的使用者accessKey。
                        "project": "xxx", //目標DataHub的專案名稱。
                        "topic": "xxx" //目標DataHub的topic名稱。
                        "batchSize": 1000, //一次讀取的資料量。
                        "beginDateTime": "20180910111214", //資料消費的開始時間位點。
                        "endDateTime": "20180910111614", //資料消費的結束時間位點。
                        "column": [
                            "col0",
                            "col1",
                            "col2",
                            "col3",
                            "col4"
                                  ]
                                }
                           },
                "writer": {
                    "name": "streamwriter",
                    "parameter": {
                        "print": false
                                 }
                            }
             }
           ]
         }
       }
     ],
    "setting":{
        "errorLimit":{
            "record":"0"//錯誤記錄數
        },
        "speed":{
            "throttle":true,//當throttle值為false時,mbps參數不生效,表示不限流;當throttle值為true時,表示限流。
            "concurrent":1,//並發數
            "mbps":"12"//限流,此處1mbps = 1MB/s。
        }
    },
    "order":{
        "hops":[
            {
                "from":"Reader",
                "to":"Writer"
            }
        ]
    }
}

Reader指令碼參數

參數

描述

是否必選

endpoint

DataHub的endpoint

accessId

訪問DataHub的使用者accessId

accessKey

訪問DataHub的使用者accessKey

project

目標DataHub的專案名稱。project是DataHub中的資源嵌入式管理單元,用於資源隔離和控制。

topic

目標DataHub的topic名稱。

batchSize

一次讀取的資料量,預設為1,024條。

beginDateTime

資料消費的開始時間位點。該參數是時間範圍(左閉右開)的左邊界,yyyyMMddHHmmss格式的時間字串,可以和DataWorks的調度時間參數配合使用。

說明

beginDateTimeendDateTime需要互相組合配套使用。

endDateTime

資料消費的結束時間位點。該參數是時間範圍(左閉右開)的右邊界,yyyyMMddHHmmss格式的時間字串,可以和DataWorks的調度時間參數配合使用。

說明

beginDateTimeendDateTime需要互相組合配套使用。

Writer指令碼Demo

{
    "type": "job",
    "version": "2.0",//版本號碼。
    "steps": [
        { 
            "stepType": "stream",
            "parameter": {},
            "name": "Reader",
            "category": "reader"
        },
        {
            "stepType": "datahub",//外掛程式名。
            "parameter": {
                "datasource": "",//資料來源。
                "topic": "",//Topic是DataHub訂閱和發布的最小單位,您可以用Topic來表示一類或者一種流資料。
                "maxRetryCount": 500,//任務失敗的重試的最多次數。
                "maxCommitSize": 1048576//待積累的資料Buffer大小達到maxCommitSize大小(單位Byte)時,批量提交至目的端。
                 //datahub側對於一次request請求寫入的資料條數限制是10000條,超出10000條資料會超出限制導致任務出錯,請根據您單條資料平均資料量*10000條資料的資料總量來從側方面進行單次寫入datahub的資料條數控制。比如每條資料10 k,那麼此參數的設定值要低於10*10000 k。
            },
            "name": "Writer",
            "category": "writer"
        }
    ],
    "setting": {
        "errorLimit": {
            "record": ""//錯誤記錄數。
        },
        "speed": {
            "throttle":true,//當throttle值為false時,mbps參數不生效,表示不限流;當throttle值為true時,表示限流。
            "concurrent":20, //作業並發數。
            "mbps":"12"//限流,此處1mbps = 1MB/s。
        }
    },
    "order": {
        "hops": [
            {
                "from": "Reader",
                "to": "Writer"
            }
        ]
    }
}

Writer指令碼參數

參數

描述

是否必選

預設值

accessId

DataHub的accessId

accessKey

DataHub的accessKey

endPoint

對DataHub資源的訪問請求,需要根據資源所屬服務,選擇正確的網域名稱。

maxRetryCount

任務失敗的最多重試次數。

mode

Value是STRING類型時,寫入的模式。

parseContent

解析內容。

project

專案(Project)是DataHub資料的基主要組織單元,一個Project下包含多個Topic。

說明

DataHub的專案空間與MaxCompute的專案相互獨立,您在MaxCompute中建立的專案不能複用於DataHub,需要單獨建立。

topic

Topic是DataHub訂閱和發布的最小單位,您可以用Topic來表示一類或者一種流資料。

maxCommitSize

為提高寫出效率,DataX會積累Buffer資料,待積累的資料大小達到maxCommitSize 大小(單位Byte)時,批量提交到目的端。預設是1,048,576,即1 MB資料。另外datahub側對於一次request請求寫入的資料條數限制是10000條,超出10000條資料會超出限制導致任務出錯,請根據您單條資料平均資料量*10000條的資料總量來從側方面進行寫入datahub的資料條數控制。

1MB