本文为您介绍如何使用表格存储Tablestore(OTS)连接器。
背景信息
表格存储Tablestore(又名OTS)面向海量结构化数据提供Serverless表存储服务,同时针对物联网场景深度优化提供一站式的IoTstore解决方案。适用于海量账单、IM消息、物联网、车联网、风控和推荐等场景中的结构化数据存储,提供海量数据低成本存储、毫秒级的在线数据查询和检索以及灵活的数据分析能力。详情请参见表格存储Tablestore。
Tablestore连接器支持的信息如下。
类别 | 详情 |
运行模式 | 流模式 |
API种类 | SQL |
支持类型 | 源表、维表和结果表 |
数据格式 | 暂不支持 |
特有监控指标 |
说明 指标含义详情,请参见监控指标说明。 |
是否支持更新或删除结果表数据 | 是 |
前提条件
已购买Tablestore实例并创建表,详情请参见使用流程。
使用限制
仅实时计算引擎VVR 3.0.0及以上版本支持表格存储Tablestore连接器。
语法结构
结果表
CREATE TABLE ots_sink ( name VARCHAR, age BIGINT, birthday BIGINT, primary key(name,age) not enforced ) WITH ( 'connector'='ots', 'instanceName'='<yourInstanceName>', 'tableName'='<yourTableName>', 'accessId'='${ak_id}', 'accessKey'='${ak_secret}', 'endPoint'='<yourEndpoint>', 'valueColumns'='birthday' );
说明Tablestore结果表必须定义有Primary Key,输出数据以Update方式追加Tablestore表。
维表
CREATE TABLE ots_dim ( id int, len int, content STRING ) WITH ( 'connector'='ots', 'endPoint'='<yourEndpoint>', 'instanceName'='<yourInstanceName>', 'tableName'='<yourTableName>', 'accessId'='${ak_id}', 'accessKey'='${ak_secret}' );
源表
CREATE TABLE tablestore_stream( `order` VARCHAR, orderid VARCHAR, customerid VARCHAR, customername VARCHAR ) WITH ( 'connector'='ots', 'endPoint' ='<yourEndpoint>', 'instanceName' = 'flink-source', 'tableName' ='flink_source_table', 'tunnelName' = 'flinksourcestream', 'accessId' ='${ak_id}', 'accessKey' ='${ak_secret}', 'ignoreDelete' = 'false' );
属性列支持读取待消费字段和Tunnel Service,以及返回数据中的
OtsRecordType
和OtsRecordTimestamp
两个字段。字段说明请参见下表。字段名
Flink映射名
描述
OtsRecordType
type
数据操作类型。
OtsRecordTimestamp
timestamp
数据操作时间,单位为微秒。
说明全量读取数据时,OtsRecordTimestamp取值为0。
当需要读取
OtsRecordType
和OtsRecordTimestamp
字段时,Flink提供了METADATA关键字用于获取源表中的属性字段,具体DDL示例如下。CREATE TABLE tablestore_stream( `order` VARCHAR, orderid VARCHAR, customerid VARCHAR, customername VARCHAR, record_type STRING METADATA FROM 'type', record_timestamp BIGINT METADATA FROM 'timestamp' ) WITH ( ... );
WITH参数
通用
参数
说明
数据类型
是否必填
默认值
备注
connector
表类型。
String
是
无
固定值为
ots
。instanceName
实例名。
String
是
无
无。
endPoint
实例访问地址。
String
是
无
请参见服务地址。
tableName
表名。
String
是
无
无。
accessId
阿里云账号或者RAM用户的AccessKey ID。
String
是
无
详情请参见如何查看AccessKey ID和AccessKey Secret信息?
重要为了避免您的AK信息泄露,建议您通过密钥管理的方式填写AccessKey ID取值,详情请参见变量和密钥管理。
accessKey
阿里云账号或者RAM用户的AccessKey Secret。
String
是
无
详情请参见如何查看AccessKey ID和AccessKey Secret信息?
重要为了避免您的AK信息泄露,建议您通过密钥管理的方式填写AccessKey Secret取值,详情请参见变量和密钥管理。
connectTimeout
连接器连接Tablestore的超时时间。
Integer
否
30000
单位为毫秒。
socketTimeout
连接器连接Tablestore的Socket超时时间。
Integer
否
30000
单位为毫秒。
ioThreadCount
IO线程数量。
Integer
否
4
无。
callbackThreadPoolSize
回调线程池大小。
Integer
否
4
无。
源表独有
参数
说明
数据类型
是否必填
默认值
备注
tunnelName
表格存储数据表的数据通道名称。
String
是
无
您需要提前在表格存储产品侧创建好通道名称和对应的通道类型(增量、全量和全量加增量)。关于创建通道的具体操作,请参见创建数据通道。
ignoreDelete
是否忽略DELETE操作类型的实时数据。
Boolean
否
false
参数取值如下:
true:忽略。
false(默认值):不忽略。
skipInvalidData
是否忽略脏数据。如果不忽略脏数据,则处理脏数据时会进行报错。
Boolean
否
false
参数取值如下:
true:忽略脏数据。
false(默认值):不忽略脏数据。
说明仅实时计算引擎VVR 8.0.4及以上版本支持该参数。
retryStrategy
重试策略。
Enum
否
TIME
参数取值如下:
TIME:在超时时间retryTimeoutMs内持续进行重试。
COUNT:在最大重试次数retryCount内持续进行重试。
retryCount
重试次数。
Integer
否
3
当retryStrategy设置为COUNT时,可以设置重试次数。
retryTimeoutMs
重试的超时时间。
Integer
否
180000
当retryStrategy设置为TIME时,可以设置重试的超时时间,单位为毫秒。
streamOriginColumnMapping
原始列名到真实列名的映射。
String
否
无
原始列名与真实列名之间,请使用半角冒号(:)隔开;多组映射之间,请使用半角逗号(,)隔开。例如
origin_col1:col1,origin_col2:col2
。outputSpecificRowType
是否透传具体的RowType。
Boolean
否
false
参数取值如下:
false:不透传,所有数据RowType均为INSERT。
true:透传,将根据透传的类型相应设置为INSERT、DELETE或UPDATE_AFTER。
结果表独有
参数
说明
数据类型
是否必填
默认值
备注
retryIntervalMs
重试间隔时间。
Integer
否
1000
单位为毫秒。
maxRetryTimes
最大重试次数。
Integer
否
10
无。
valueColumns
插入字段的列名。
String
是
无
多个字段以半角逗号(,)分割,例如ID或NAME。
bufferSize
流入多少条数据后开始输出。
Integer
否
5000
无。
batchWriteTimeoutMs
写入超时的时间。
Integer
否
5000
单位为毫秒。表示如果缓存中的数据在等待batchWriteTimeoutMs秒后,依然没有达到输出条件,系统会自动输出缓存中的所有数据。
batchSize
一次批量写入的条数。
Integer
否
100
最大值为200。
ignoreDelete
是否忽略DELETE操作。
Boolean
否
False
无。
autoIncrementKey
当结果表中包含主键自增列时,通过该参数指定主键自增列的列名称。
String
否
无
当结果表没有主键自增列时,请不要设置此参数。
说明仅实时计算引擎VVR 8.0.4及以上版本支持该参数。
overwriteMode
数据覆盖模式。
Enum
否
PUT
参数取值如下:
PUT:以PUT方式将数据写入到Tablestore表。
UPDATE:以UPDATE方式写入到Tablestore表。
说明动态列模式下只支持UPDATE模式。
defaultTimestampInMillisecond
设定写入Tablestrore数据的默认时间戳。
Long
否
-1
如果不指定,则会使用系统当前的毫秒时间戳。
dynamicColumnSink
是否开启动态列模式。
Boolean
否
false
动态列模式适用于在表定义中无需指定列名,根据作业运行情况动态插入数据列的场景。建表语句中主键需要定义为前若干列,最后两列中前一列的值作为列名变量,且类型必须为String,后一列的值作为该列对应的值。
说明开启动态列模式时,不支持主键自增列,且参数overwriteMode必须设置为UPDATE。
checkSinkTableMeta
是否检查结果表元数据。
Boolean
否
true
若设置为true,会检查Tablestore表的主键列和此处的建表语句中指定的主键是否一致。
enableRequestCompression
数据写入过程中是否开启数据压缩。
Boolean
否
false
无。
维表独有
参数
说明
数据类型
是否必填
默认值
备注
retryIntervalMs
重试间隔时间。
Integer
否
1000
单位为毫秒。
maxRetryTimes
最大重试次数。
Integer
否
10
无。
cache
缓存策略。
String
否
ALL
目前Tablestore维表支持以下三种缓存策略:
None:无缓存。
LRU:缓存维表里的部分数据。源表的每条数据都会触发系统先在Cache中查找数据,如果没有找到,则去物理维表中查找。
需要配置相关参数:缓存大小(cacheSize)和缓存更新时间间隔(cacheTTLMs)。
ALL(默认值):缓存维表里的所有数据。在Job运行前,系统会将维表中所有数据加载到Cache中,之后所有的维表查找数据都会通过Cache进行。如果在Cache中无法找到数据,则KEY不存在,并在Cache过期后重新加载一遍全量Cache。
适用于远程表数据量小且MISS KEY(源表数据和维表JOIN时,ON条件无法关联)特别多的场景。需要配置相关参数:缓存更新时间间隔cacheTTLMs,更新时间黑名单cacheReloadTimeBlackList。
说明因为系统会异步加载维表数据,所以在使用CACHE ALL时,需要增加维表JOIN节点的内存,增加的内存大小为远程表数据量的两倍。
cacheSize
缓存大小。
Integer
否
无
当缓存策略选择LRU时,可以设置缓存大小。
说明单位为数据条数。
cacheTTLMs
缓存失效时间。
Integer
否
无
单位为毫秒。cacheTTLMs配置和cache有关:
如果cache配置为None,则cacheTTLMs可以不配置,表示缓存不超时。
如果cache配置为LRU,则cacheTTLMs为缓存超时时间。默认不过期。
如果cache配置为ALL,则cacheTTLMs为缓存加载时间。默认不重新加载。
cacheEmpty
是否缓存空结果。
Boolean
否
无
true:缓存
false:不缓存
cacheReloadTimeBlackList
更新时间黑名单。在缓存策略选择为ALL时,启用更新时间黑名单,防止在此时间内做Cache更新(例如双11场景)。
String
否
无
格式为2017-10-24 14:00 -> 2017-10-24 15:00,2017-11-10 23:30 -> 2017-11-11 08:00。分隔符的使用情况如下所示:
用半角逗号(,)来分隔多个黑名单。
用箭头(->)来分割黑名单的起始结束时间。
async
是否异步返回数据。
Boolean
否
false
true:表示异步返回数据。异步返回数据默认是无序的。
false(默认值):表示不进行异步返回数据。
类型映射
源表
Tablestore字段类型
Flink字段类型
INTEGER
BIGINT
STRING
STRING
BOOLEAN
BOOLEAN
DOUBLE
DOUBLE
BINARY
BINARY
结果表
Flink字段类型
Tablestore字段类型
BINARY
BINARY
VARBINARY
CHAR
STRING
VARCHAR
TINYINT
INTEGER
SMALLINT
INTEGER
BIGINT
FLOAT
DOUBLE
DOUBLE
BOOLEAN
BOOLEAN
使用示例
CREATE TEMPORARY TABLE tablestore_stream(
`order` VARCHAR,
orderid VARCHAR,
customerid VARCHAR,
customername VARCHAR
) WITH
'connector'='ots',
'endPoint' ='<yourEndpoint>',
'instanceName' = 'flink-source',
'tableName' ='flink_source_table',
'tunnelName' = 'flinksourcestream',
'accessId' ='${ak_id}',
'accessKey' ='${ak_secret}',
'ignoreDelete' = 'false',
'skipInvalidData' ='false'
);
CREATE TEMPORARY TABLE ots_sink (
`order` VARCHAR,
orderid VARCHAR,
customerid VARCHAR,
customername VARCHAR,
PRIMARY KEY (`order`,orderid) NOT ENFORCED
) WITH (
'connector'='ots',
'endPoint'='<yourEndpoint>',
'instanceName'='flink-sink',
'tableName'='flink_sink_table',
'accessId'='${ak_id}',
'accessKey'='${ak_secret}',
'valueColumns'='customerid,customername',
'autoIncrementKey'='${auto_increment_primary_key_name}'
);
INSERT INTO ots_sink
SELECT `order`, orderid, customerid, customername FROM tablestore_stream;