DataHub数据源作为数据中枢,为您提供读取和写入DataHub数据库的双向通道,能够快速解决海量数据的计算问题。本文为您介绍DataWorks的DataHub数据同步的能力支持情况。
支持的版本
DataHub Reader通过DataHub的Java SDK读取DataHub中的数据,具体使用的Java SDK版本,如下所示。
<dependency>
<groupId>com.aliyun.DataHub</groupId>
<artifactId>aliyun-sdk-DataHub</artifactId>
<version>2.9.1</version>
</dependency>
DataHub Writer通过DataHub服务的Java SDK向DataHub写入数据,使用的日志服务Java SDK版本如下。
<dependency>
<groupId>com.aliyun.datahub</groupId>
<artifactId>aliyun-sdk-datahub</artifactId>
<version>2.5.1</version>
</dependency>
使用限制
离线读写
STRING字符串仅支持UTF-8编码,单个STRING列最长允许1MB。
实时读写
全增量实时写
运行同步任务后,生成的离线同步任务将全量数据写入DataHub,待全量数据执行完成后,启动实时同步任务,将源端增量数据实时同步至目标端。数据写入格式如下:
支持的字段类型
DataHub同步数据时,会根据DataHub Field的数据类型同步到对应的数据类型中,DataHub仅支持BIGINT、STRING、BOOLEAN、DOUBLE、TIMESTAMP、DECIMAL数据类型。
创建数据源
在进行数据同步任务开发时,您需要在DataWorks上创建一个对应的数据源,操作流程请参见创建并管理数据源,详细的配置参数解释可在配置界面查看对应参数的文案提示。
数据同步任务开发
数据同步任务的配置入口和通用配置流程可参见下文的配置指导。
单表离线同步任务配置指导
单表、整库实时同步任务配置指导
单表、整库全增量(实时)同步配置指导
操作流程请参见数据集成侧同步任务配置。
附录:脚本Demo与参数说明
离线任务脚本配置方式
如果您配置离线任务时使用脚本模式的方式进行配置,您需要按照统一的脚本格式要求,在任务脚本中编写相应的参数,详情请参见通过脚本模式配置离线同步任务,以下为您介绍脚本模式下数据源的参数配置详情。
Reader脚本Demo
{
"type":"job",
"version":"2.0",//版本号
"steps":[
{
"job": {
"content": [
{
"reader": {
"name": "DataHubreader",
"parameter": {
"endpoint": "xxx" //DataHub的endpoint。
"accessId": "xxx", //访问DataHub的用户accessId。
"accessKey": "xxx", //访问DataHub的用户accessKey。
"project": "xxx", //目标DataHub的项目名称。
"topic": "xxx" //目标DataHub的topic名称。
"batchSize": 1000, //一次读取的数据量。
"beginDateTime": "20180910111214", //数据消费的开始时间位点。
"endDateTime": "20180910111614", //数据消费的结束时间位点。
"column": [
"col0",
"col1",
"col2",
"col3",
"col4"
]
}
},
"writer": {
"name": "streamwriter",
"parameter": {
"print": false
}
}
}
]
}
}
],
"setting":{
"errorLimit":{
"record":"0"//错误记录数
},
"speed":{
"throttle":true,//当throttle值为false时,mbps参数不生效,表示不限流;当throttle值为true时,表示限流。
"concurrent":1,//并发数
"mbps":"12"//限流,此处1mbps = 1MB/s。
}
},
"order":{
"hops":[
{
"from":"Reader",
"to":"Writer"
}
]
}
}
Reader脚本参数
参数 | 描述 | 是否必选 |
endpoint | DataHub的endpoint。 | 是 |
accessId | 访问DataHub的用户accessId。 | 是 |
accessKey | 访问DataHub的用户accessKey。 | 是 |
project | 目标DataHub的项目名称。project是DataHub中的资源管理单元,用于资源隔离和控制。 | 是 |
topic | 目标DataHub的topic名称。 | 是 |
batchSize | 一次读取的数据量,默认为1,024条。 | 否 |
beginDateTime | 数据消费的开始时间位点。该参数是时间范围(左闭右开)的左边界,yyyyMMddHHmmss格式的时间字符串,可以和DataWorks的调度时间参数配合使用。
说明 beginDateTime和endDateTime需要互相组合配套使用。 | 是 |
endDateTime | 数据消费的结束时间位点。该参数是时间范围(左闭右开)的右边界,yyyyMMddHHmmss格式的时间字符串,可以和DataWorks的调度时间参数配合使用。
说明 beginDateTime和endDateTime需要互相组合配套使用。 | 是 |
Writer脚本Demo
{
"type": "job",
"version": "2.0",//版本号。
"steps": [
{
"stepType": "stream",
"parameter": {},
"name": "Reader",
"category": "reader"
},
{
"stepType": "datahub",//插件名。
"parameter": {
"datasource": "",//数据源。
"topic": "",//Topic是DataHub订阅和发布的最小单位,您可以用Topic来表示一类或者一种流数据。
"maxRetryCount": 500,//任务失败的重试的最多次数。
"maxCommitSize": 1048576//待积累的数据Buffer大小达到maxCommitSize大小(单位Byte)时,批量提交至目的端。
//datahub侧对于一次request请求写入的数据条数限制是10000条,超出10000条数据会超出限制导致任务出错,请根据您单条数据平均数据量*10000条数据的数据总量来从侧方面进行单次写入datahub的数据条数控制。比如每条数据10 k,那么此参数的设置值要低于10*10000 k。
},
"name": "Writer",
"category": "writer"
}
],
"setting": {
"errorLimit": {
"record": ""//错误记录数。
},
"speed": {
"throttle":true,//当throttle值为false时,mbps参数不生效,表示不限流;当throttle值为true时,表示限流。
"concurrent":20, //作业并发数。
"mbps":"12"//限流,此处1mbps = 1MB/s。
}
},
"order": {
"hops": [
{
"from": "Reader",
"to": "Writer"
}
]
}
}
Writer脚本参数
参数 | 描述 | 是否必选 | 默认值 |
accessId | DataHub的accessId。 | 是 | 无 |
accessKey | DataHub的accessKey。 | 是 | 无 |
endPoint | 对DataHub资源的访问请求,需要根据资源所属服务,选择正确的域名。 | 是 | 无 |
maxRetryCount | 任务失败的最多重试次数。 | 否 | 无 |
mode | Value是STRING类型时,写入的模式。 | 是 | 无 |
parseContent | 解析内容。 | 是 | 无 |
project | 项目(Project)是DataHub数据的基本组织单元,一个Project下包含多个Topic。
说明 DataHub的项目空间与MaxCompute的项目相互独立,您在MaxCompute中创建的项目不能复用于DataHub,需要单独创建。 | 是 | 无 |
topic | Topic是DataHub订阅和发布的最小单位,您可以用Topic来表示一类或者一种流数据。 | 是 | 无 |
maxCommitSize | 为提高写出效率,DataX会积累Buffer数据,待积累的数据大小达到maxCommitSize 大小(单位Byte)时,批量提交到目的端。默认是1,048,576,即1 MB数据。另外datahub侧对于一次request请求写入的数据条数限制是10000条,超出10000条数据会超出限制导致任务出错,请根据您单条数据平均数据量*10000条的数据总量来从侧方面进行写入datahub的数据条数控制。 | 否 | 1MB |