全部产品
Search
文档中心

大数据开发治理平台 DataWorks:DataHub数据源

更新时间:Oct 23, 2024

DataHub数据源作为数据中枢,为您提供读取和写入DataHub数据库的双向通道,能够快速解决海量数据的计算问题。本文为您介绍DataWorks的DataHub数据同步的能力支持情况。

支持的版本

  • DataHub Reader通过DataHub的Java SDK读取DataHub中的数据,具体使用的Java SDK版本,如下所示。

    <dependency>
        <groupId>com.aliyun.DataHub</groupId>
        <artifactId>aliyun-sdk-DataHub</artifactId>
        <version>2.9.1</version>
    </dependency>
  • DataHub Writer通过DataHub服务的Java SDK向DataHub写入数据,使用的日志服务Java SDK版本如下。

    <dependency>
        <groupId>com.aliyun.datahub</groupId>
        <artifactId>aliyun-sdk-datahub</artifactId>
        <version>2.5.1</version>
    </dependency>

使用限制

离线读写

STRING字符串仅支持UTF-8编码,单个STRING列最长允许1MB。

实时读写

全增量实时写

运行同步任务后,生成的离线同步任务将全量数据写入DataHub,待全量数据执行完成后,启动实时同步任务,将源端增量数据实时同步至目标端。数据写入格式如下:

  • 仅支持将数据写入DataHub Tuple类型的Topic中。关于DataHub TUPLE数据类型说明,详情请参见:数据类型介绍

  • 实时同步至DataHub会在源表字段基础上,新增5个附加字段,并支持您在配任务配置时,自行添加额外的字段。最终发送给DataHub的消息格式,详情请参见:附录:DataHub消息格式

支持的字段类型

DataHub同步数据时,会根据DataHub Field的数据类型同步到对应的数据类型中,DataHub仅支持BIGINTSTRINGBOOLEANDOUBLETIMESTAMPDECIMAL数据类型。

创建数据源

在进行数据同步任务开发时,您需要在DataWorks上创建一个对应的数据源,操作流程请参见创建并管理数据源详细的配置参数解释可在配置界面查看对应参数的文案提示

数据同步任务开发

数据同步任务的配置入口和通用配置流程可参见下文的配置指导。

单表离线同步任务配置指导

单表、整库实时同步任务配置指导

操作流程请参见DataStudio侧实时同步任务配置

说明

DataHub不同数据类型对应操作的支持情况,不同数据类型的分片策略、数据格式及相关消息示例。详情请参见:附录:DataHub消息格式

单表、整库全增量(实时)同步配置指导

操作流程请参见数据集成侧同步任务配置

常见问题

写入DataHub时,一次性写入数据超限导致写入失败如何处理?

附录:脚本Demo与参数说明

离线任务脚本配置方式

如果您配置离线任务时使用脚本模式的方式进行配置,您需要按照统一的脚本格式要求,在任务脚本中编写相应的参数,详情请参见通过脚本模式配置离线同步任务,以下为您介绍脚本模式下数据源的参数配置详情。

Reader脚本Demo

{
    "type":"job",
    "version":"2.0",//版本号
    "steps":[
        {
         "job": {
           "content": [
            {
                "reader": {
                    "name": "DataHubreader",
                    "parameter": {
                        "endpoint": "xxx" //DataHub的endpoint。
                        "accessId": "xxx", //访问DataHub的用户accessId。
                        "accessKey": "xxx", //访问DataHub的用户accessKey。
                        "project": "xxx", //目标DataHub的项目名称。
                        "topic": "xxx" //目标DataHub的topic名称。
                        "batchSize": 1000, //一次读取的数据量。
                        "beginDateTime": "20180910111214", //数据消费的开始时间位点。
                        "endDateTime": "20180910111614", //数据消费的结束时间位点。
                        "column": [
                            "col0",
                            "col1",
                            "col2",
                            "col3",
                            "col4"
                                  ]
                                }
                           },
                "writer": {
                    "name": "streamwriter",
                    "parameter": {
                        "print": false
                                 }
                            }
             }
           ]
         }
       }
     ],
    "setting":{
        "errorLimit":{
            "record":"0"//错误记录数
        },
        "speed":{
            "throttle":true,//当throttle值为false时,mbps参数不生效,表示不限流;当throttle值为true时,表示限流。
            "concurrent":1,//并发数
            "mbps":"12"//限流,此处1mbps = 1MB/s。
        }
    },
    "order":{
        "hops":[
            {
                "from":"Reader",
                "to":"Writer"
            }
        ]
    }
}

Reader脚本参数

参数

描述

是否必选

endpoint

DataHub的endpoint

accessId

访问DataHub的用户accessId

accessKey

访问DataHub的用户accessKey

project

目标DataHub的项目名称。project是DataHub中的资源管理单元,用于资源隔离和控制。

topic

目标DataHub的topic名称。

batchSize

一次读取的数据量,默认为1,024条。

beginDateTime

数据消费的开始时间位点。该参数是时间范围(左闭右开)的左边界,yyyyMMddHHmmss格式的时间字符串,可以和DataWorks的调度时间参数配合使用。

说明

beginDateTimeendDateTime需要互相组合配套使用。

endDateTime

数据消费的结束时间位点。该参数是时间范围(左闭右开)的右边界,yyyyMMddHHmmss格式的时间字符串,可以和DataWorks的调度时间参数配合使用。

说明

beginDateTimeendDateTime需要互相组合配套使用。

Writer脚本Demo

{
    "type": "job",
    "version": "2.0",//版本号。
    "steps": [
        { 
            "stepType": "stream",
            "parameter": {},
            "name": "Reader",
            "category": "reader"
        },
        {
            "stepType": "datahub",//插件名。
            "parameter": {
                "datasource": "",//数据源。
                "topic": "",//Topic是DataHub订阅和发布的最小单位,您可以用Topic来表示一类或者一种流数据。
                "maxRetryCount": 500,//任务失败的重试的最多次数。
                "maxCommitSize": 1048576//待积累的数据Buffer大小达到maxCommitSize大小(单位Byte)时,批量提交至目的端。
                 //datahub侧对于一次request请求写入的数据条数限制是10000条,超出10000条数据会超出限制导致任务出错,请根据您单条数据平均数据量*10000条数据的数据总量来从侧方面进行单次写入datahub的数据条数控制。比如每条数据10 k,那么此参数的设置值要低于10*10000 k。
            },
            "name": "Writer",
            "category": "writer"
        }
    ],
    "setting": {
        "errorLimit": {
            "record": ""//错误记录数。
        },
        "speed": {
            "throttle":true,//当throttle值为false时,mbps参数不生效,表示不限流;当throttle值为true时,表示限流。
            "concurrent":20, //作业并发数。
            "mbps":"12"//限流,此处1mbps = 1MB/s。
        }
    },
    "order": {
        "hops": [
            {
                "from": "Reader",
                "to": "Writer"
            }
        ]
    }
}

Writer脚本参数

参数

描述

是否必选

默认值

accessId

DataHub的accessId

accessKey

DataHub的accessKey

endPoint

对DataHub资源的访问请求,需要根据资源所属服务,选择正确的域名。

maxRetryCount

任务失败的最多重试次数。

mode

Value是STRING类型时,写入的模式。

parseContent

解析内容。

project

项目(Project)是DataHub数据的基本组织单元,一个Project下包含多个Topic。

说明

DataHub的项目空间与MaxCompute的项目相互独立,您在MaxCompute中创建的项目不能复用于DataHub,需要单独创建。

topic

Topic是DataHub订阅和发布的最小单位,您可以用Topic来表示一类或者一种流数据。

maxCommitSize

为提高写出效率,DataX会积累Buffer数据,待积累的数据大小达到maxCommitSize 大小(单位Byte)时,批量提交到目的端。默认是1,048,576,即1 MB数据。另外datahub侧对于一次request请求写入的数据条数限制是10000条,超出10000条数据会超出限制导致任务出错,请根据您单条数据平均数据量*10000条的数据总量来从侧方面进行写入datahub的数据条数控制。

1MB