全部产品
Search
文档中心

实时计算Flink版:表格存储Tablestore(OTS)

更新时间:Aug 26, 2024

本文为您介绍如何使用表格存储Tablestore(OTS)连接器。

背景信息

表格存储Tablestore(又名OTS)面向海量结构化数据提供Serverless表存储服务,同时针对物联网场景深度优化提供一站式的IoTstore解决方案。适用于海量账单、IM消息、物联网、车联网、风控和推荐等场景中的结构化数据存储,提供海量数据低成本存储、毫秒级的在线数据查询和检索以及灵活的数据分析能力。详情请参见表格存储Tablestore

Tablestore连接器支持的信息如下。

类别

详情

运行模式

流模式

API种类

SQL

支持类型

源表、维表和结果表

数据格式

暂不支持

特有监控指标

  • 源表:无

  • 维表:无

  • 结果表:

    • numBytesOut

    • numBytesOutPerSecond

    • numRecordsOut

    • numRecordsOutPerSecond

    • currentSendTime

说明

指标含义详情,请参见监控指标说明

是否支持更新或删除结果表数据

前提条件

已购买Tablestore实例并创建表,详情请参见使用流程

使用限制

仅实时计算引擎VVR 3.0.0及以上版本支持表格存储Tablestore连接器。

语法结构

  • 结果表

    CREATE TABLE ots_sink (
      name VARCHAR,
      age BIGINT,
      birthday BIGINT,
      primary key(name,age) not enforced
    ) WITH (
      'connector'='ots',
      'instanceName'='<yourInstanceName>',
      'tableName'='<yourTableName>',
      'accessId'='${ak_id}',
      'accessKey'='${ak_secret}',
      'endPoint'='<yourEndpoint>',
      'valueColumns'='birthday'
    );
    说明

    Tablestore结果表必须定义有Primary Key,输出数据以Update方式追加Tablestore表。

  • 维表

    CREATE TABLE ots_dim (
      id int,
      len int,
      content STRING
    ) WITH (
      'connector'='ots',
      'endPoint'='<yourEndpoint>',
      'instanceName'='<yourInstanceName>',
      'tableName'='<yourTableName>',
      'accessId'='${ak_id}',
      'accessKey'='${ak_secret}'
    );
  • 源表

    CREATE TABLE tablestore_stream(
      `order` VARCHAR,
      orderid VARCHAR,
      customerid VARCHAR,
      customername VARCHAR
    ) WITH (
      'connector'='ots',
      'endPoint' ='<yourEndpoint>',
      'instanceName' = 'flink-source',
      'tableName' ='flink_source_table',
      'tunnelName' = 'flinksourcestream',
      'accessId' ='${ak_id}',
      'accessKey' ='${ak_secret}',
      'ignoreDelete' = 'false'
    );

    属性列支持读取待消费字段和Tunnel Service,以及返回数据中的OtsRecordTypeOtsRecordTimestamp两个字段。字段说明请参见下表。

    字段名

    Flink映射名

    描述

    OtsRecordType

    type

    数据操作类型。

    OtsRecordTimestamp

    timestamp

    数据操作时间,单位为微秒。

    说明

    全量读取数据时,OtsRecordTimestamp取值为0。

    当需要读取OtsRecordTypeOtsRecordTimestamp字段时,Flink提供了METADATA关键字用于获取源表中的属性字段,具体DDL示例如下。

    CREATE TABLE tablestore_stream(
      `order` VARCHAR,
      orderid VARCHAR,
      customerid VARCHAR,
      customername VARCHAR,
      record_type STRING METADATA FROM 'type',
      record_timestamp BIGINT METADATA FROM 'timestamp'
    ) WITH (
      ...
    );

WITH参数

  • 通用

    参数

    说明

    数据类型

    是否必填

    默认值

    备注

    connector

    表类型。

    String

    固定值为ots

    instanceName

    实例名。

    String

    无。

    endPoint

    实例访问地址。

    String

    请参见服务地址

    tableName

    表名。

    String

    无。

    accessId

    阿里云账号或者RAM用户的AccessKey ID。

    String

    详情请参见如何查看AccessKey ID和AccessKey Secret信息?

    重要

    为了避免您的AK信息泄露,建议您通过密钥管理的方式填写AccessKey ID取值,详情请参见变量和密钥管理

    accessKey

    阿里云账号或者RAM用户的AccessKey Secret。

    String

    详情请参见如何查看AccessKey ID和AccessKey Secret信息?

    重要

    为了避免您的AK信息泄露,建议您通过密钥管理的方式填写AccessKey Secret取值,详情请参见变量和密钥管理

    connectTimeout

    连接器连接Tablestore的超时时间。

    Integer

    30000

    单位为毫秒。

    socketTimeout

    连接器连接Tablestore的Socket超时时间。

    Integer

    30000

    单位为毫秒。

    ioThreadCount

    IO线程数量。

    Integer

    4

    无。

    callbackThreadPoolSize

    回调线程池大小。

    Integer

    4

    无。

  • 源表独有

    参数

    说明

    数据类型

    是否必填

    默认值

    备注

    tunnelName

    表格存储数据表的数据通道名称。

    String

    您需要提前在表格存储产品侧创建好通道名称和对应的通道类型(增量、全量和全量加增量)。关于创建通道的具体操作,请参见创建数据通道

    ignoreDelete

    是否忽略DELETE操作类型的实时数据。

    Boolean

    false

    参数取值如下:

    • true:忽略。

    • false(默认值):不忽略。

    skipInvalidData

    是否忽略脏数据。如果不忽略脏数据,则处理脏数据时会进行报错。

    Boolean

    false

    参数取值如下:

    • true:忽略脏数据。

    • false(默认值):不忽略脏数据。

    说明

    仅实时计算引擎VVR 8.0.4及以上版本支持该参数。

    retryStrategy

    重试策略。

    Enum

    TIME

    参数取值如下:

    • TIME:在超时时间retryTimeoutMs内持续进行重试。

    • COUNT:在最大重试次数retryCount内持续进行重试。

    retryCount

    重试次数。

    Integer

    3

    当retryStrategy设置为COUNT时,可以设置重试次数。

    retryTimeoutMs

    重试的超时时间。

    Integer

    180000

    当retryStrategy设置为TIME时,可以设置重试的超时时间,单位为毫秒。

    streamOriginColumnMapping

    原始列名到真实列名的映射。

    String

    原始列名与真实列名之间,请使用半角冒号(:)隔开;多组映射之间,请使用半角逗号(,)隔开。例如origin_col1:col1,origin_col2:col2

    outputSpecificRowType

    是否透传具体的RowType。

    Boolean

    false

    参数取值如下:

    • false:不透传,所有数据RowType均为INSERT。

    • true:透传,将根据透传的类型相应设置为INSERT、DELETE或UPDATE_AFTER。

  • 结果表独有

    参数

    说明

    数据类型

    是否必填

    默认值

    备注

    retryIntervalMs

    重试间隔时间。

    Integer

    1000

    单位为毫秒。

    maxRetryTimes

    最大重试次数。

    Integer

    10

    无。

    valueColumns

    插入字段的列名。

    String

    多个字段以半角逗号(,)分割,例如ID或NAME。

    bufferSize

    流入多少条数据后开始输出。

    Integer

    5000

    无。

    batchWriteTimeoutMs

    写入超时的时间。

    Integer

    5000

    单位为毫秒。表示如果缓存中的数据在等待batchWriteTimeoutMs秒后,依然没有达到输出条件,系统会自动输出缓存中的所有数据。

    batchSize

    一次批量写入的条数。

    Integer

    100

    最大值为200。

    ignoreDelete

    是否忽略DELETE操作。

    Boolean

    False

    无。

    autoIncrementKey

    当结果表中包含主键自增列时,通过该参数指定主键自增列的列名称。

    String

    当结果表没有主键自增列时,请不要设置此参数。

    说明

    仅实时计算引擎VVR 8.0.4及以上版本支持该参数。

    overwriteMode

    数据覆盖模式。

    Enum

    PUT

    参数取值如下:

    • PUT:以PUT方式将数据写入到Tablestore表。

    • UPDATE:以UPDATE方式写入到Tablestore表。

    说明

    动态列模式下只支持UPDATE模式。

    defaultTimestampInMillisecond

    设定写入Tablestrore数据的默认时间戳。

    Long

    -1

    如果不指定,则会使用系统当前的毫秒时间戳。

    dynamicColumnSink

    是否开启动态列模式。

    Boolean

    false

    动态列模式适用于在表定义中无需指定列名,根据作业运行情况动态插入数据列的场景。建表语句中主键需要定义为前若干列,最后两列中前一列的值作为列名变量,且类型必须为String,后一列的值作为该列对应的值。

    说明

    开启动态列模式时,不支持主键自增列,且参数overwriteMode必须设置为UPDATE。

    checkSinkTableMeta

    是否检查结果表元数据。

    Boolean

    true

    若设置为true,会检查Tablestore表的主键列和此处的建表语句中指定的主键是否一致。

    enableRequestCompression

    数据写入过程中是否开启数据压缩。

    Boolean

    false

    无。

  • 维表独有

    参数

    说明

    数据类型

    是否必填

    默认值

    备注

    retryIntervalMs

    重试间隔时间。

    Integer

    1000

    单位为毫秒。

    maxRetryTimes

    最大重试次数。

    Integer

    10

    无。

    cache

    缓存策略。

    String

    ALL

    目前Tablestore维表支持以下三种缓存策略:

    • None:无缓存。

    • LRU:缓存维表里的部分数据。源表的每条数据都会触发系统先在Cache中查找数据,如果没有找到,则去物理维表中查找。

      需要配置相关参数:缓存大小(cacheSize)和缓存更新时间间隔(cacheTTLMs)。

    • ALL(默认值):缓存维表里的所有数据。在Job运行前,系统会将维表中所有数据加载到Cache中,之后所有的维表查找数据都会通过Cache进行。如果在Cache中无法找到数据,则KEY不存在,并在Cache过期后重新加载一遍全量Cache。

      适用于远程表数据量小且MISS KEY(源表数据和维表JOIN时,ON条件无法关联)特别多的场景。需要配置相关参数:缓存更新时间间隔cacheTTLMs,更新时间黑名单cacheReloadTimeBlackList

      说明

      因为系统会异步加载维表数据,所以在使用CACHE ALL时,需要增加维表JOIN节点的内存,增加的内存大小为远程表数据量的两倍。

    cacheSize

    缓存大小。

    Integer

    当缓存策略选择LRU时,可以设置缓存大小。

    说明

    单位为数据条数。

    cacheTTLMs

    缓存失效时间。

    Integer

    单位为毫秒。cacheTTLMs配置和cache有关:

    • 如果cache配置为None,则cacheTTLMs可以不配置,表示缓存不超时。

    • 如果cache配置为LRU,则cacheTTLMs为缓存超时时间。默认不过期。

    • 如果cache配置为ALL,则cacheTTLMs为缓存加载时间。默认不重新加载。

    cacheEmpty

    是否缓存空结果。

    Boolean

    • true:缓存

    • false:不缓存

    cacheReloadTimeBlackList

    更新时间黑名单。在缓存策略选择为ALL时,启用更新时间黑名单,防止在此时间内做Cache更新(例如双11场景)。

    String

    格式为2017-10-24 14:00 -> 2017-10-24 15:00,2017-11-10 23:30 -> 2017-11-11 08:00。分隔符的使用情况如下所示:

    • 用半角逗号(,)来分隔多个黑名单。

    • 用箭头(->)来分割黑名单的起始结束时间。

    async

    是否异步返回数据。

    Boolean

    false

    • true:表示异步返回数据。异步返回数据默认是无序的。

    • false(默认值):表示不进行异步返回数据。

类型映射

  • 源表

    Tablestore字段类型

    Flink字段类型

    INTEGER

    BIGINT

    STRING

    STRING

    BOOLEAN

    BOOLEAN

    DOUBLE

    DOUBLE

    BINARY

    BINARY

  • 结果表

    Flink字段类型

    Tablestore字段类型

    BINARY

    BINARY

    VARBINARY

    CHAR

    STRING

    VARCHAR

    TINYINT

    INTEGER

    SMALLINT

    INTEGER

    BIGINT

    FLOAT

    DOUBLE

    DOUBLE

    BOOLEAN

    BOOLEAN

使用示例

CREATE TEMPORARY TABLE tablestore_stream(
 `order` VARCHAR,
  orderid VARCHAR,
  customerid VARCHAR,
  customername VARCHAR
) WITH 
  'connector'='ots',
  'endPoint' ='<yourEndpoint>',
  'instanceName' = 'flink-source',
  'tableName' ='flink_source_table',
  'tunnelName' = 'flinksourcestream',
  'accessId' ='${ak_id}',
  'accessKey' ='${ak_secret}',
  'ignoreDelete' = 'false',
  'skipInvalidData' ='false' 
);

CREATE TEMPORARY TABLE ots_sink (
  `order` VARCHAR,
  orderid VARCHAR,
  customerid VARCHAR,
  customername VARCHAR,
  PRIMARY KEY (`order`,orderid) NOT ENFORCED
) WITH (
  'connector'='ots',
  'endPoint'='<yourEndpoint>',
  'instanceName'='flink-sink',
  'tableName'='flink_sink_table',
  'accessId'='${ak_id}',
  'accessKey'='${ak_secret}',
  'valueColumns'='customerid,customername',
  'autoIncrementKey'='${auto_increment_primary_key_name}' 
);

INSERT INTO ots_sink
SELECT `order`, orderid, customerid, customername FROM tablestore_stream;