全部产品
Search
文档中心

大数据开发治理平台 DataWorks:Tablestore数据源

更新时间:Oct 23, 2024

表格存储Tablestore是构建在阿里云飞天分布式系统之上的NoSQL数据存储服务,Tablestore数据源为您提供读取和写入Tablestore双向通道的功能,本文为您介绍DataWorks的Tablestore数据同步的能力支持情况。

使用限制

  • Tablestore Reader和Writer插件实现了从Tablestore读取和写入数据,包含行模式列模式两种数据读取与写入方式,可针对宽表与时序表进行数据读取与写入。

    • 列模式:在Tablestore多版本模型下,表中的数据组织为 > > 版本三级的模式, 一行可以有任意列,列名并不是固定的,每一列可以含有多个版本,每个版本都有一个特定的时间戳(版本号)。列模式会将数据导出为(主键值,列名,时间戳,列值)的四元组格式,列模式下导入的数据也是(主键值,列名,时间戳,列值)的四元组格式。

    • 行模式:该模式将用户每次更新的记录,抽取成行的形式导出,即(主键值,列值)的格式。

      行模式下每一行数据对应TableStore表中的一条数据。写入行模式的数据包含主键列列值、普通列列值两部分。

  • Tablestore列由主键列primaryKey+普通列column组成,源端列顺序需要和Tablestore目的端主键列+普通列保持一致,否则会产生列映射错误。

  • Tablestore Reader会根据一张表中待读取的数据的范围,按照数据同步并发的数目N,将范围等分为N份Task。每个Task都会有一个Tablestore Reader线程来执行。

支持的字段类型

目前Tablestore Reader和Tablestore Writer支持所有Tablestore类型,其针对Tablestore类型的转换列表,如下所示。

类型分类

Tablestore数据类型

整数类

INTEGER

浮点类

DOUBLE

字符串类

STRING

布尔类

BOOLEAN

二进制类

BINARY

说明
  • Tablestore本身不支持日期型类型。应用层通常使用Long保存时间的Unix TimeStamp。

  • 您需要将INTEGER类型的数据,在脚本模式中配置为INT类型,DataWorks会将其转换为INTEGER类型。如果您直接配置为INTEGER类型,日志将会报错,导致任务无法顺利完成。

创建数据源

在进行数据同步任务开发时,您需要在DataWorks上创建一个对应的数据源,操作流程请参见创建并管理数据源详细的配置参数解释可在配置界面查看对应参数的文案提示

数据同步任务开发

数据同步任务的配置入口和通用配置流程可参见下文的配置指导。

单表离线同步任务配置指导

附录一:Reader脚本Demo与参数说明

离线任务脚本配置方式

如果您配置离线任务时使用脚本模式的方式进行配置,您需要按照统一的脚本格式要求,在任务脚本中编写相应的参数,详情请参见通过脚本模式配置离线同步任务,以下为您介绍脚本模式下数据源的参数配置详情。

Reader脚本Demo

行模式读取宽表配置

{
    "type":"job",
    "version":"2.0",//版本号。
    "steps":[
        {
            "stepType":"ots",//插件名。
            "parameter":{
                "datasource":"",//数据源。
                "newVersion":"true",//使用新版otsreader
                "mode": "normal",// 行模式读取数据
                "isTimeseriesTable":"false",// 配置该表为宽表(非时序表)
                "column":[//字段。
                    {
                        "name":"column1"//字段名。
                    },
                    {
                        "name":"column2"
                    },
                    {
                        "name":"column3"
                    },
                    {
                        "name":"column4"
                    },
                    {
                        "name":"column5"
                    }
                ],
                "range":{
                    "split":[
                        {
                            "type":"STRING",
                            "value":"beginValue"
                        },
                        {
                            "type":"STRING",
                            "value":"splitPoint1"
                        },
                        {
                            "type":"STRING",
                            "value":"splitPoint2"
                        },
                        {
                            "type":"STRING",
                            "value":"splitPoint3"
                        },
                        {
                            "type":"STRING",
                            "value":"endValue"
                        }
                    ],
                    "end":[
                        {
                            "type":"STRING",
                            "value":"endValue"
                        },
                        {
                            "type":"INT",
                            "value":"100"
                        },
                        {
                            "type":"INF_MAX"
                        },
                        {
                            "type":"INF_MAX"
                        }
                    ],
                    "begin":[
                        {
                            "type":"STRING",
                            "value":"beginValue"
                        },
                        {
                            "type":"INT",
                            "value":"0"
                        },
                        {
                            "type":"INF_MIN"
                        },
                        {
                            "type":"INF_MIN"
                        }
                    ]
                },
                "table":""//表名。
            },
            "name":"Reader",
            "category":"reader"
        },
        {
            "stepType":"stream",
            "parameter":{},
            "name":"Writer",
            "category":"writer"
        }
    ],
    "setting":{
        "errorLimit":{
            "record":"0"//错误记录数。
        },
        "speed":{
            "throttle":true,//false代表不限流,下面的限流的速度不生效,true代表限流。
            "concurrent":1 //作业并发数。
            "mbps":"12"//限流
        }
    },
    "order":{
        "hops":[
            {
                "from":"Reader",
                "to":"Writer"
            }
        ]
    }
}

行模式读时序表配置

{
    "type":"job",
    "version":"2.0",//版本号。
    "steps":[
        {
            "stepType":"ots",//插件名。
            "parameter":{
                "datasource":"",//数据源。
                "table": "",//表名
                // 读时序数据mode必须为normal
                "mode": "normal",
                // 读时序数据newVersion必须为true
                "newVersion": "true",
                // 配置该表为时序表
                "isTimeseriesTable":"true",
                // measurementName:配置需要读取时序数据的度量名称,非必需,为空则读取全表数据
                "measurementName":"measurement_1",
                "column": [
                  {
                    "name": "_m_name"
                  },
                  {
                    "name": "tagA",
                    "is_timeseries_tag":"true"
                  },
                  {
                    "name": "double_0",
                    "type":"DOUBLE"
                  },
                  {
                    "name": "string_0",
                    "type":"STRING"
                  },
                  {
                    "name": "long_0",
                    "type":"INT"
                  },
                  {
                    "name": "binary_0",
                    "type":"BINARY"
                  },
                  {
                    "name": "bool_0",
                    "type":"BOOL"
                  },
                  {
                    "type":"STRING",
                    "value":"testString"
                  }
                ]
            },
            "name":"Reader",
            "category":"reader"
        },
        {
            "stepType":"stream",
            "parameter":{},
            "name":"Writer",
            "category":"writer"
        }
    ],
    "setting":{
        "errorLimit":{
            "record":"0"//错误记录数。
        },
        "speed":{
            "throttle":true,//false代表不限流,下面的限流的速度不生效,true代表限流。
            "concurrent":1 //作业并发数。
            "mbps":"12"//限流
        }
    },
    "order":{
        "hops":[
            {
                "from":"Reader",
                "to":"Writer"
            }
        ]
    }
}

列模式读取宽表配置

{
    "type":"job",
    "version":"2.0",//版本号。
    "steps":[
        {
            "stepType":"ots",//插件名。
            "parameter":{
                "datasource":"",//数据源。
                "table":"",//表名
                "newVersion":"true",//新版otsreader
                "mode": "multiversion",//多版本模式
                "column":[//配置需要导出的列名称(必须是非主键列)
                    {"name":"mobile"},
                    {"name":"name"},
                    {"name":"age"},
                    {"name":"salary"},
                    {"name":"marry"}
                ],
                "range":{//导出的范围
                    "begin":[
                        {"type":"INF_MIN"},
                        {"type":"INF_MAX"}
                    ],
                    "end":[
                        {"type":"INF_MAX"},
                        {"type":"INF_MIN"}
                    ],
                    "split":[
                    ]
                },

            },
            "name":"Reader",
            "category":"reader"
        },
        {
            "stepType":"stream",
            "parameter":{},
            "name":"Writer",
            "category":"writer"
        }
    ],
    "setting":{
        "errorLimit":{
            "record":"0"//错误记录数。
        },
        "speed":{
            "throttle":true,//false代表不限流,下面的限流的速度不生效,true代表限流。
            "concurrent":1 //作业并发数。
            "mbps":"12"//限流
        }
    },
    "order":{
        "hops":[
            {
                "from":"Reader",
                "to":"Writer"
            }
        ]
    }
}

Reader脚本参数通用配置

参数

描述

是否必选

默认值

endpoint

Tablestore Server的EndPoint(服务地址),详情请参见服务地址

accessId

Tablestore的AccessKey ID。

accessKey

Tablestore的AccessKey Secret。

instanceName

Tablestore的实例名称,实例是您使用和管理Tablestore服务的实体。

您在开通Tablestore服务后,需要通过管理控制台来创建实例,然后在实例内进行表的创建和管理。

实例是Tablestore资源管理的基础单元,Tablestore对应用程序的访问控制和资源计量都在实例级别完成。

table

所选取的需要抽取的表名称,这里有且只能填写一张表。在Tablestore不存在多表同步的需求。

newVersion

定义了使用的Tablestore Reader插件的版本。

  • false:旧版本Tablestore Reader,仅支持行模式读取宽表

  • true:新版本Tablestore Reader,支持行模式列模式时序表宽表

新版Tablestore Reader除了支持新的功能,对系统资源的开销也相对较低,因此推荐使用新版Tablestore Reader。

新版本插件配置兼容了旧版本插件的配置,即旧任务增加newVersion=true配置之后,可以正常运行。

false

mode

定义了读取数据的模式,当前支持两种模式:

  • normal:行模式读取数据,数据格式为{主键列值,普通列值}。

  • multiVersion:列模式读取数据,数据格式为{主键列,普通列名,时间戳,普通列名对应列值}。

本配置仅在新版Tablestore Reader(newVersion:true)配置下生效。

旧版Tablestore Reader会忽略mode配置,并仅支持行模式读取。

normal

isTimeseriesTable

定义了操作的数据表是否为时序数据表:

  • false:数据表为普通的宽表。

  • true:数据表为时序数据表。

本配置仅在newVersion:true & mode:normal配置下生效。

旧版Tablestore Reader不支持时序表,且时序表无法按列模式读取。

false

Reader脚本参数附加配置

Tablestore Reader支持行模式读取宽表、行模式读取时序表、列模式读取宽表。下面为您介绍各模式的附加配置。

行模式读宽表参数

参数

描述

是否必选

默认值

column

所配置的表中需要同步的列名集合,使用JSON的数组描述字段信息。由于Tablestore本身是NoSQL系统,在Tablestore Reader抽取数据过程中,必须指定相应的字段名称。

  • 支持普通的列读取,例如{"name":"col1"}。

  • 支持部分列读取,如果您不配置该列,则Tablestore Reader不予读取。

  • 支持常量列读取,例如{"type":"STRING", "value":"DataX"}。使用type描述常量类型,目前支持String、Int、Double、Bool、Binary(使用Base64编码填写)、INF_MIN(Tablestore的系统限定最小值,如果使用该值,您不能填写value属性,否则报错)、INF_MAX(Tablestore的系统限定最大值,如果使用该值,您不能填写value属性,否则报错)。

  • 不支持函数或者自定义表达式,由于Tablestore本身不提供类似SQL的函数或者表达式功能,Tablestore Reader也不能提供函数或表达式列功能。

beginend

beginend配置项用于表示抽取Tablestore表数据的范围。

beginend描述的是Tablestore PrimaryKey的区间分布状态,对于无限大小的区间,您可以使用{"type":"INF_MIN"}{"type":"INF_MAX"}分别指代beginend,其中type表示抽取数据的类型。

说明
  • beginend配置项默认值为[INF_MIN,INF_MAX),即读取全部数据。

  • 当配置的beginend数量小于主键数量时,依据最左主键匹配原则,未配置的主键范围默认为[INF_MIN,INF_MAX)

  • 当只配置了beginend中的一项时,导出的数据范围为[begin, INF_MAX)[INF_MIN, end)

  • 当仅有一个主键列时,beginend配置导出数据遵循左闭右开规则。

  • 当有多个主键列时,最后一个主键列导出遵循左闭右开规则,其他主键列为左右闭区间。

例如,对一张主键为[Hundreds, Tens,Ones]的三主键Tablestore表进行数据抽取,表中的数据的主键分别为:(0,0,0)(0,0,1)(0,0,2)(0,0,3)......(9,9,8)(9,9,9),共1000列。beginend的配置如下所示。

  • 示例一:抽取Hundreds的范围为[3,5]Tens的范围为[4,6]。抽取的数据主键包含(3,4,0)(3,4,1)...(4,4,0),(4,0,1)...(5,6,8)(5,6,9)。配置如下:

    "range": {
          "begin": [
            {"type":"INT", "value":"3"},  //指定抽取Hundreds的最小值。
            {"type":"INT", "value":"4"}  //指定抽取Tens的最小值。
          ],
          "end": [
            {"type":"INT", "value":"5"}, //指定抽取Hundreds的最大值。
            {"type":"INT", "value":"6"} //指定抽取Tens的最大值。
          ]
        }
  • 示例二:抽取Hundreds的范围为[3,5]Tens的范围为[4,6]Ones范围为[5,7)。抽取的数据主键包含(3,4,5)(3,4,6)...(4,4,5),(4,4,6)...(5,6,5)(5,6,6)。配置如下:

    "range": {
          "begin": [
            {"type":"INT", "value":"3"},  //指定抽取Hundreds的最小值。
            {"type":"INT", "value":"4"},  //指定抽取Tens的最小值。
            {"type":"INT", "value":"5"}  //指定抽取Ones的最小值。
          ],
          "end": [
            {"type":"INT", "value":"5"}, //指定抽取Hundreds的最大值。
            {"type":"INT", "value":"6"}, //指定抽取Tens的最大值。
            {"type":"INT", "value":"7"}  //指定抽取Ones的最大值。
          ]
        }

(INF_MIN,INF_MAX)

split

该配置项属于高级配置项,是您自己定义切分配置信息,普通情况下不建议使用。

通过配置split参数可以自己定义分片的数据范围,通常在Tablestore 数据存储发生热点,可以使用自定义的切分规则,以下任务配置为例:

{
  "range": {
    "begin": [{"type": "INF_MIN"}],
    "end":   [{"type": "INF_MAX"}],
    "split": [
      {"type": "STRING","value": "1"},
      {"type": "STRING","value": "2"},
      {"type": "STRING","value": "3"},
      {"type": "STRING","value": "4"},
      {"type": "STRING","value": "5"}
    ]
  }

任务运行时,数据会切分成6个段,并发读取。分段数量建议比任务并发数大。

// 第 1 段
[最小值, 1)
// 第 2 段
[1, 2)
// 第 3 段
[2, 3)
// 第 4 段
[3, 4)
// 第 5 段
[4, 5)
// 第 6 段
[5, 最大值)

当split参数未配置时,使用自动切分逻辑。

自动切分逻辑会找出 Partition Key最大、最小值,并进行均匀分段。

Partition Key支持整型和字符串,整型使用整除来分段,字符串使用第一个字符的unicode码来分段。

行模式读时序表参数

参数

描述

是否必选

默认值

column

column是一个数组,每个元素表示一列,可配置常量列与普通列两种类型。

对于常量列,需要配置以下字段:

  • type:字段值类型,必需配置。目前支持string、int、double、bool、binary。

  • value : 字段值,必需配置。

对于普通列,需要配置以下字段:

  • name :列名,必需配置。以下为预定义的字段。

    • 时间线的'度量名称'使用_m_name标识,数据类型为String。

    • 时间线的'数据源'使用_data_source标识,数据类型为String。

    • 时间线的'标签'使用_tags标识,数据类型为String。

    • 时间线的'时间戳'使用_time标识,数据类型为Long。

  • is_timeseries_tag:该列是否为tags字段内部的键值,非必需配置,默认为false。

  • type:字段值类型,非必需配置,默认为string。目前支持 string、int、double、bool、binary。

读出四列数据的脚本示例:

"column": [
  {
    "name": "_m_name"               // 时间线的度量名称字段
  },
  {
    "name": "tag_key",                // 时间线中标签字段内tag_key对应的value
    "is_timeseries_tag":"true"
  },
  {
    "name": "string_column",        // fields中名称为string_column
    "type":"string"                    // 且数据类型为string的列
  },
  {
    "value": "constant_value",        // 常量列,值固定为"constant_value"
    "type":"string"
  }
],

measurementName

配置需要读取时间线的度量名称。若不配置则读取全表数据。

timeRange

请求数据的时间范围,读取的范围是[begin,end),左闭右开的区间,且begin必须小于end,时间戳单位为毫秒。格式如下:

"timeRange":{
    // begin:非必选,默认为0,取值范围是0~LONG_MAX
    "begin":1400000000000,
    // end:非必选,默认为Long Max(9223372036854775807L),取值范围是0~LONG_MAX
    "end"  :1600000000000
},

全部版本

列模式读宽表参数

参数

描述

是否必选

默认值

column

指定要导出的列,在列模式下只支持普通列。

格式:

"column": [
    {"name1":"{your column name1}"},
    {"name2":"{your column name2}"}
],
说明
  • 在列模式下,不支持常量列。

  • PK列不能指定,导出4元组中默认包括完整的PK。

  • 不能重复指定列。

所有列

range

读取的数据范围,读取的范围为[begin,end),左闭右开的区间,并且:

  • begin小于end,表示正序读取数据。

  • begin大于end,表示反序读取数据。

  • begin和end不能相等。

type支持的类型有如下几类:

  • string

  • int

  • binary:输入的方式采用二进制的Base64字符串形式传入。

  • INF_MIN:表示无限小。

  • INF_MAX:表示无限大。

格式:

"range":{
    // 可选,默认表示从无限小开始读取
    // 这个值的输入可以填写空数组,或者PK前缀,亦或者完整的PK,在正序读取数据时,默认填充PK后缀为INF_MIN,反序为INF_MAX
    // 例子:
    // 如果用户的表有2个PK,类型分别为string、int,那么如下3种输入都是合法,如:
    //1. 从表的开始位置读取 -> 读取到表的结束为止:
    //"begin":[],"end":[],
    //2. 从第一列主键值为"a",第二列主键的最小值开始读取 -> 读取到第一列主键值为"b",第二列主键的最大值:
    //"begin":[{"type":"string", "value":"a"}],"end":[{"type":"string", "value":"b"}],
    //3. 从第一列主键值为"a",第二列主键的最小值开始读取 -> 读取到表的结束位置:
    //"begin":[{"type":"string", "value":"a"},{"type":"INF_MIN"}],"end":[],    
    //
    // binary类型的PK列比较特殊,因为Json不支持直接输入二进制数,所以系统定义:用户如果要传入
    // 二进制,必须使用(Java)Base64.encodeBase64String方法,将二进制转换为一个可视化的字符串,然后将这个字符串填入value中
    // 例子(Java):
    //   byte[] bytes = "hello".getBytes();  # 构造一个二进制数据,这里使用字符串hello的byte值
    //   String inputValue = Base64.encodeBase64String(bytes) # 调用Base64方法,将二进制转换为可视化的字符串
    //   上面的代码执行之后,可以获得inputValue为"aGVsbG8="
    //   最终写入配置:{"type":"binary","value" : "aGVsbG8="}

    "begin":[{"type":"string", "value":"a"},{"type":"INF_MIN"}],

    // 默认表示读取到无限大结束
    // 这个值得输入可以填写空数组,或者PK前缀,亦或者完整的PK,在正序读取数据时,默认填充PK后缀为INF_MAX,反序为INF_MIN
    // 可选
    "end":[{"type":"string", "value":"g"},{"type":"INF_MAX"}],

    // 当前用户数据较多时,需要开启并发导出,Split可以将当前范围的数据按照切分点切分为多个并发任务
    // 可选
    //   1. split中的输入值只能PK的第一列(分片建),且值的类型必须和PartitionKey一致
    //   2. 值的范围必须在begin和end之间
    //   3. split内部的值必须根据begin和end的正反序关系而递增或者递减
    "split":[{"type":"string", "value":"b"}, {"type":"string", "value":"c"}]
},

全部数据

timeRange

请求数据的时间范围,读取的范围是[begin,end),左闭右开的区间,且begin必须小于end,时间戳单位为毫秒。

格式:

"timeRange":{
    // begin:非必选,默认为0,取值范围是0~LONG_MAX
    "begin":1400000000000,
    // end:非必选,默认为Long Max(9223372036854775807L),取值范围是0~LONG_MAX
    "end"  :1600000000000
},

全部版本

maxVersion

请求的最大数据版本数,取值范围是1~INT32_MAX。

全部版本

附录二:Writer脚本Demo与参数说明

离线任务脚本配置方式

如果您配置离线任务时使用脚本模式的方式进行配置,您需要按照统一的脚本格式要求,在任务脚本中编写相应的参数,详情请参见通过脚本模式配置离线同步任务,以下为您介绍脚本模式下数据源的参数配置详情。

Writer脚本Demo

行模式写入宽表配置

{
    "type":"job",
    "version":"2.0",//版本号。
    "steps":[
        {
            "stepType":"stream",
            "parameter":{},
            "name":"Reader",
            "category":"reader"
        },
        {
            "stepType":"ots",//插件名。
            "parameter":{
                "datasource":"",//数据源。
                "table":"",//表名。
                "newVersion":"true",//使用新版otswriter
                "mode": "normal",// 行模式写入数据
                "isTimeseriesTable":"false",// 配置该表为宽表(非时序表)
                "primaryKey" : [//Tablestore的主键信息。
                    {"name":"gid", "type":"INT"},
                    {"name":"uid", "type":"STRING"}
                 ],
                "column" : [//字段。
                      {"name":"col1", "type":"INT"},
                      {"name":"col2", "type":"DOUBLE"},
                      {"name":"col3", "type":"STRING"},
                      {"name":"col4", "type":"STRING"},
                      {"name":"col5", "type":"BOOL"}
                  ],
                "writeMode" : "PutRow"    //写入模式。
            },
            "name":"Writer",
            "category":"writer"
        }
    ],
    "setting":{
        "errorLimit":{
            "record":"0"//错误记录数。
        },
        "speed":{
            "throttle":true,//当throttle值为false时,mbps参数不生效,表示不限流;当throttle值为true时,表示限流。
            "concurrent":1, //作业并发数。
            "mbps":"12"//限流,此处1mbps = 1MB/s。
        }
    },
    "order":{
        "hops":[
            {
                "from":"Reader",
                "to":"Writer"
            }
        ]
    }
}

行模式写时序表配置

{
    "type":"job",
    "version":"2.0",//版本号。
    "steps":[
        {
            "stepType":"stream",
            "parameter":{},
            "name":"Reader",
            "category":"reader"
        },
        {
            "stepType":"ots",//插件名。
            "parameter":{
                "datasource":"",//数据源。
                "table": "testTimeseriesTableName01",
                "mode": "normal",
                "newVersion": "true",
                "isTimeseriesTable":"true",
                "timeunit":"microseconds",
                "column": [
                      {
                        "name": "_m_name"
                      },
                      {
                        "name": "_data_source",
                      },
                      {
                        "name": "_tags",
                      },
                      {
                        "name": "_time",
                      },
                      {
                        "name": "string_1",
                        "type":"string"
                      },
                      {
                        "name":"tag3",
                        "is_timeseries_tag":"true",
                      }
                    ]
            },
            "name":"Writer",
            "category":"writer"
        }
    ],
    "setting":{
        "errorLimit":{
            "record":"0"//错误记录数。
        },
        "speed":{
            "throttle":true,//当throttle值为false时,mbps参数不生效,表示不限流;当throttle值为true时,表示限流。
            "concurrent":1, //作业并发数。
            "mbps":"12"//限流,此处1mbps = 1MB/s。
        }
    },
    "order":{
        "hops":[
            {
                "from":"Reader",
                "to":"Writer"
            }
        ]
    }
}

列模式写入宽表配置

{
    "type":"job",
    "version":"2.0",//版本号。
    "steps":[
        {
            "stepType":"stream",
            "parameter":{},
            "name":"Reader",
            "category":"reader"
        },
        {
            "stepType":"ots",//插件名。
            "parameter":{
                "datasource":"",//数据源。
                "table":"",
                "newVersion":"true",
                "mode":"multiVersion",
                "primaryKey" : [
                    "gid",
                    "uid"
                    ]
            },
            "name":"Writer",
            "category":"writer"
        }
    ],
    "setting":{
        "errorLimit":{
            "record":"0"//错误记录数。
        },x`
        "speed":{
            "throttle":true,//当throttle值为false时,mbps参数不生效,表示不限流;当throttle值为true时,表示限流。
            "concurrent":1, //作业并发数。
            "mbps":"12"//限流,此处1mbps = 1MB/s。
        }
    },
    "order":{
        "hops":[
            {
                "from":"Reader",
                "to":"Writer"
            }
        ]
    }
}

Writer脚本参数通用配置

参数

描述

是否必选

默认值

datasource

数据源名称,脚本模式支持添加数据源,该配置项填写的内容必须与添加的数据源名称保持一致。

endPoint

Tablestore Server的EndPoint(服务地址),详情请参见服务地址

accessId

Tablestore的AccessKey ID。

accessKey

Tablestore的AccessKey Secret。

instanceName

Tablestore的实例名称,实例是您使用和管理Tablestore服务的实体。

您在开通Tablestore服务后,需要通过管理控制台来创建实例,然后在实例内进行表的创建和管理。实例是Tablestore资源管理的基础单元,Tablestore对应用程序的访问控制和资源计量都在实例级别完成。

table

所选取的需要抽取的表名称,此处能且只能填写一张表。在Tablestore中不存在多表同步的需求。

newVersion

定义了使用的Tablestore Writer插件版本。

  • false:旧版本Tablestore Writer,仅支持行模式写入宽表。

  • true:新版本Tablestore Writer,支持行模式列模式时序表宽表,且支持主键字增列功能。

新版Tablestore Writer除了支持新的功能,对系统资源的开销也相对较低,因此推荐使用新版Tablestore Writer。

新版插件兼容旧版本插件配置,即旧任务增加newVersion=true配置之后,可以正常运行。

false

mode

定义了写入数据的模式,当前支持两种模式

  • normal:写入普通格式的数据(行模式)。

  • multiVersion:写入多版本格式的数据(列模式)。

本配置仅在newVersion:true配置下生效

旧版Tablestore Writer会忽略mode配置,并仅支持行模式读取。

normal

isTimeseriesTable

定义了操作的数据表是否为时序数据表。

  • false:数据表为普通的宽表。

  • true:数据表为时序数据表。

本配置仅在newVersion:true & mode:normal配置下生效(列模式不兼容时序表)。

false

Writer脚本参数附加配置

Tablestore Writer支持行模式写入宽表、行模式写入时序表、列模式写入宽表。下面介绍各模式的附加配置。

行模式写宽表参数

参数

描述

是否必选

默认值

primaryKey

Tablestore的主键信息,使用JSON的数组描述字段信息。Tablestore本身是NoSQL系统,在Tablestore Writer导入数据过程中,必须指定相应的字段名称。

数据同步系统本身支持类型转换的,因此对于源头数据非STRING/INT,Tablestore Writer会进行数据类型转换。配置示例如下。

"primaryKey" : [
    {"name":"gid", "type":"INT"},
    {"name":"uid", "type":"STRING"}
                 ],
说明

Tablestore的PrimaryKey仅支持STRING和INT类型,因此Tablestore Writer本身也限定填写STRING和INT两种类型。

column

所配置的表中需要同步的列名集合,使用JSON的数组描述字段信息。

配置示例:

"column" : [
     {"name":"col1", "type":"INT"},
     {"name":"col2", "type":"DOUBLE"},
     {"name":"col3", "type":"STRING"},
     {"name":"col4", "type":"BINARY"},
     {"name":"col5", "type":"BOOL"}
              ],

其中的name为写入的Tablestore列名称,type为写入的类型。Tablestore支持STRING、INT、DOUBLE、BOOL和BINARY类型。

说明

写入过程不支持常量、函数或者自定义表达式。

writeMode

数据写入表格存储的模式,目前支持以下两种模式:

  • PutRow:对应于Tablestore API PutRow,插入数据到指定的行。如果该行不存在,则新增一行。如果该行存在,则覆盖原有行。

  • UpdateRow:对应于Tablestore API UpdateRow,更新指定行的数据。如果该行不存在,则新增一行。如果该行存在,则根据请求的内容在这一行中新增、修改或者删除指定列的值。

enableAutoIncrement

是否允许向包含主键自增列的Tablestore表中写入数据。

  • true:插件会自动扫描目的端表的自增列信息,并在写入数据时添加自增列(不需要用户配置自增列名称)。

  • false:写入含主键自增列的表时会报错。

false

requestTotalSizeLimitation

该配置限制写入Tablestore时单行数据的大小,配置类型为数字。

1MB

attributeColumnSizeLimitation

该配置限制写入Tablestore时单个属性列的大小,配置类型为数字。

2MB

primaryKeyColumnSizeLimitation

该配置限制写入Tablestore时单个主键列的大小,配置类型为数字。

1KB

attributeColumnMaxCount

该配置限制写入Tablestore时属性列的个数,配置类型为数字。

1,024

行模式写时序表参数

参数

描述

是否必选

默认值

column

column中的每个元素对应时序数据中的一个字段,每个元素可配置以下参数。

  • name:列名,必需配置。以下为预定义的字段:

    • 时间线的'度量名称'使用_m_name标识,数据类型为String。

    • 时间线的'数据源'使用_data_source标识,数据类型为String。

    • 时间线的'标签'使用_tags标识,数据类型为String,字符串格式为["tagKey1=value1","tagKey2=value2"]。

    • 时间线的'时间戳'使用_time标识,数据类型为Long,默认单位为微秒。

  • is_timeseries_tag : 是否为tags字段内部的键值,非必需配置,默认为false。

  • type : 字段值类型,非必需配置,默认为string。支持string、int、double、bool、binary类型。

由于时序数据的度量名称与时间戳不能为空,因此必须配置_m_name_time字段。

示例:一条待写入数据如下,包含六个字段:

mName1    source1    ["tag1=A","tag2=B"]    1677763080000000    field_value     C

使用如下配置:

"column": [
      {
        "name": "_m_name"
      },
      {
        "name": "_data_source",
      },
      {
        "name": "_tags",
      },
      {
        "name": "_time",
      },
      {
        "name": "string_1",
        "type":"string"
      },
      {
        "name":"tag3",
        "is_timeseries_tag":"true",
      }
    ],

写入Tablestore后,控制台查看结果如下结果

timeunit

所配置的时间戳_time字段的单位,支持NANOSECONDSMICROSECONDSMILLISECONDSSECONDSMINUTES

MICROSECONDS

列模式写宽表参数

参数

描述

是否必选

默认值

primaryKey

表的主键列。

考虑到配置成本,并不需要配置primaryKey在Record(Line)中的位置,但Record的格式需要固定,要求primaryKey一定在行首,primaryKey之后是columnName。record的格式为:{pk0,pk1...}, {columnName}, {timestamp}, {value}

例如,有以下9条数据:

1,pk1,row1,1677699863871,value_0_0
1,pk1,row2,1677699863871,value_0_1
1,pk1,row3,1677699863871,value_0_2
2,pk2,row1,1677699863871,value_1_0
2,pk2,row2,1677699863871,value_1_1
2,pk2,row3,1677699863871,value_1_2
3,pk3,row1,1677699863871,value_2_0
3,pk3,row2,1677699863871,value_2_1
3,pk3,row3,1677699863871,value_2_2

配置示例如下:

"primaryKey" : [
    "gid",
    "uid"
    ],

写入宽行结果:

gid     uid     row1        row2        row3
1        pk1        value_0_0    value_0_1    value_0_2
2        pk2        value_1_0    value_1_1    value_1_2
3        pk3        value_2_0    value_2_1    value_2_2

columnNamePrefixFilter

列名前缀过滤。

Hbase导入的数据,cfqulifier共同组成了columnName,但Tablestore不支持cf,所以需要将cf过滤掉。

配置示例:"columnNamePrefixFilter":"cf:"

说明
  • 该参数选填,如果没有填写或者值为空字符串,表示不对列名进行过滤。

  • 如果通过数据集成添加的数据columnName列不是以前缀开始,则将该Record放入脏数据回收器中。

常见问题

问题一

Q:向包含主键自增列的目标表写入数据,需要如何配置Tablestore Writer?

  1. Tablestore Writer的配置中必须包含以下两条:

    "newVersion": "true",
    "enableAutoIncrement": "true",
  2. Tablestore Writer中不需要配置主键自增列的列名。

  3. Tablestore Writer中配置的primaryKey条数+column条数需要等于上游Tablestore Reader数据的列数。

问题二

Q:在时序模型的配置中,如何理解_tagis_timeseries_tag两个字段?

示例:某条数据共有三个标签,标签为:【手机=小米,内存=8G,镜头=莱卡】。数据

  • 数据导出示例(Tablestore Reader)

    • 如果想将上述标签合并到一起作为一列导出,则配置为:

      "column": [
            {
              "name": "_tags",
            }
          ],

      DataWorks会将标签导出为一列数据,形式如下:

      ["phone=xiaomi","camera=LEICA","RAM=8G"]
    • 如果希望导出phone标签和camera标签,并且每个标签单独作为一列导出,则配置为:

      "column": [
            {
              "name": "phone",
              "is_timeseries_tag":"true",
            },
            {
              "name": "camera",
              "is_timeseries_tag":"true",
            }
          ],

      DataWorks会导出两列数据,形式如下:

      xiaomi, LEICA
  • 数据导入示例(Tablestore Writer)

    现在上游数据源(Reader)有两列数据:

    • 一列数据为:["phone=xiaomi","camera=LEICA","RAM=8G"]

    • 另一列数据为:6499。

    现希望将这两列数据都添加到标签里面,预期的写入后标签字段格式如下所示:格式则配置为:

    "column": [
          {
            "name": "_tags",
          },
          {
            "name": "price",
            "is_timeseries_tag":"true",
          },
        ],
    • 第一列配置将["phone=xiaomi","camera=LEICA","RAM=8G"]整体导入标签字段。

    • 第二列配置将price=6499单独导入标签字段。