表格存储支持作为实时计算Flink的源表和结果表使用,您可以将表格存储数据表中的数据经过Flink处理后得到的结果保存到表格存储的另一张数据表中。
背景信息
实时计算Flink能将Tunnel Service的数据通道作为流式数据的输入,每条数据类似一个JSON格式。示例如下:
{
"OtsRecordType": "PUT",
"OtsRecordTimestamp": 1506416585740836,
"PrimaryKey": [
{
"ColumnName": "pk_1",
"Value": 1506416585881590900
},
{
"ColumnName": "pk_2",
"Value": "string_pk_value"
}
],
"Columns": [
{
"OtsColumnType": "Put",
"ColumnName": "attr_0",
"Value": "hello_table_store",
},
{
"OtsColumnType": "DELETE_ONE_VERSION",
"ColumnName": "attr_1"
}
]
}
字段名 | 描述 |
OtsRecordType | 数据操作类型,取值范围如下:
|
OtsRecordTimestamp | 数据操作时间,单位为微秒。全量数据时取值为0。 |
PrimaryKey | 主键列信息,以JSON格式数组表示。支持配置1~4列,请以实际主键列为准。包括如下选项:
|
Columns | 属性列信息,以JSON格式的数组表示。包括如下选项:
|
Tablestore数据源表
存储在Tablestore中数据的主键和属性列值均可以在Flink中通过数据源表DDL以列名与相应的类型映射进行读取。更多信息,请参见表格存储Tablestore连接器。
DDL定义
数据源表的DDL定义示例如下:
CREATE TABLE tablestore_stream(
`order` VARCHAR,
orderid VARCHAR,
customerid VARCHAR,
customername VARCHAR
) WITH (
'connector'='ots',
'endPoint' ='https://flink-source.cn-hangzhou.ots.aliyuncs.com',
'instanceName' = 'flink-source',
'tableName' ='flink_source_table',
'tunnelName' = 'flinksourcestream',
'accessId' ='xxxxxxxxxxx',
'accessKey' ='xxxxxxxxxxxxxxxxxxxxxxxxxxxx',
'ignoreDelete' = 'false' //是否忽略delete操作的数据.
);
除了待消费的用户数据外,Tunnel Service返回数据中的OtsRecordType、OtsRecordTimestamp字段均支持通过属性字段的方式读取。字段说明请参见下表。
字段名 | Flink映射名 | 描述 |
OtsRecordType | type | 数据操作类型。 |
OtsRecordTimestamp | timestamp | 数据操作时间,单位为微秒。全量数据时取值为0。 |
当需要读取OtsRecordType和OtsRecordTimestamp字段时,Flink提供了METADATA关键字用于获取源表中的属性字段,具体DDL示例如下:
CREATE TABLE tablestore_stream(
`order` VARCHAR,
orderid VARCHAR,
customerid VARCHAR,
customername VARCHAR,
record_type STRING METADATA FROM 'type',
record_timestamp BIGINT METADATA FROM 'timestamp'
) WITH (
...
);
WITH参数
参数 | 是否必填 | 描述 |
connector | 是 | 源表类型。固定取值为ots。 |
endPoint | 是 | 表格存储实例的服务地址。更多信息,请参见服务地址。 |
instanceName | 是 | 表格存储的实例名称。 |
tableName | 是 | 表格存储的数据表名称。 |
tunnelName | 是 | 表格存储数据表的数据通道名称。关于创建通道的具体操作,请参见创建数据通道。 |
accessId | 是 | 阿里云账号或者RAM用户的AccessKey(包括AccessKey ID和AccessKey Secret)。获取AccessKey的具体操作,请参见获取AccessKey。 |
accessKey | 是 | |
ignoreDelete | 否 | 是否忽略DELETE操作类型的实时数据。默认值为false,表示不忽略DELETE操作类型的实时数据。 |
skipInvalidData | 否 | 是否忽略脏数据。默认值为false,表示不忽略脏数据。 如果不忽略脏数据,则处理脏数据时会进行报错。如果需要忽略脏数据,请设置此参数为true。 |
源表字段类型映射
Tablestore字段类型 | Flink字段类型 |
INTEGER | BIGINT |
STRING | STRING |
BOOLEAN | BOOLEAN |
DOUBLE | DOUBLE |
BINARY | BINARY |
Tablestore数据结果表
Flink支持使用Tablestore存储输出结果。更多信息,请参见表格存储Tablestore连接器。
DDL定义
结果表的DDL定义示例如下:
在Tablestore数据结果表定义中除了主键列外,需要包含至少一个属性列。
CREATE TABLE ots_sink (
`order` VARCHAR,
orderid VARCHAR,
customerid VARCHAR,
customername VARCHAR,
PRIMARY KEY (`order`,orderid) NOT ENFORCED
) WITH (
...
);
WITH参数
参数 | 是否必填 | 描述 |
connector | 是 | 结果表类型。固定取值为ots。 |
endPoint | 是 | 表格存储实例的服务地址。更多信息,请参见服务地址。 |
instanceName | 是 | 表格存储的实例名称。 |
tableName | 是 | 表格存储的数据表名称。 |
tunnelName | 是 | 表格存储数据表的数据通道名称。关于创建通道的具体操作,请参见创建数据通道。 |
accessId | 是 | 阿里云账号或者RAM用户的AccessKey(包括AccessKey ID和AccessKey Secret)。获取AccessKey的具体操作,请参见获取AccessKey。 |
accessKey | 是 | |
valueColumns | 是 | 指定插入的字段列名。插入多个字段以半角逗号(,)分隔。例如 |
bufferSize | 否 | 流入多少条数据后开始输出。默认值为5000,表示输入的数据达到5000条就开始输出。 |
batchWriteTimeoutMs | 否 | 写入超时的时间。单位为毫秒。默认值为5000,表示如果缓存中的数据在等待5秒后,依然没有达到输出条件,系统会自动输出缓存中的所有数据。 |
batchSize | 否 | 一次批量写入的条数。默认值为100。 |
retryIntervalMs | 否 | 重试间隔时间,单位毫秒。默认值为1000。 |
maxRetryTimes | 否 | 最大重试次数。默认值为100。 |
ignoreDelete | 否 | 是否忽略DELETE操作类型的实时数据。默认值为false,表示不忽略DELETE操作类型的实时数据。 |
autoIncrementKey | 否 | 当结果表中包含主键自增列时,通过该参数指定主键自增列的列名称。 |
defaultTimestampInMillisecond | 否 | 写入结果表的数据的版本号,单位为毫秒。当不进行配置时,版本号取决于写入的时间。 |
结果表字段类型映射
Flink字段类型 | Tablestore字段类型 |
BINARY | BINARY |
VARBINARY | BINARY |
CHAR | STRING |
VARCHAR | STRING |
TINYINT | INTEGER |
SMALLINT | INTEGER |
INTEGER | INTEGER |
BIGINT | INTEGER |
FLOAT | DOUBLE |
DOUBLE | DOUBLE |
BOOLEAN | BOOLEAN |
SQL示例
读取数据源表的数据
批量从数据源表ots source中读取数据,SQL示例如下:
CREATE TEMPORARY TABLE tablestore_stream(
`order` VARCHAR,
orderid VARCHAR,
customerid VARCHAR,
customername VARCHAR
) WITH (
'connector'='ots',
'endPoint' ='https://flink-source.cn-hangzhou.ots.aliyuncs.com',
'instanceName' = 'flink-source',
'tableName' ='flink_source_table',
'tunnelName' = 'flinksourcestream',
'accessId' ='xxxxxxxxxxx',
'accessKey' ='xxxxxxxxxxxxxxxxxxxxxxxxxxxx',
'ignoreDelete' = 'false' //是否忽略delete操作的数据。
);
SELECT * FROM tablestore_stream LIMIT 100;
数据同步到结果表
ots sink数据会以updateRow的方式写入结果表,SQL示例如下:
CREATE TEMPORARY TABLE tablestore_stream(
`order` VARCHAR,
orderid VARCHAR,
customerid VARCHAR,
customername VARCHAR
) WITH (
'connector'='ots',
'endPoint' ='https://flink-source.cn-hangzhou.ots.aliyuncs.com',
'instanceName' = 'flink-source',
'tableName' ='flink_source_table',
'tunnelName' = 'flinksourcestream',
'accessId' ='xxxxxxxxxxx',
'accessKey' ='xxxxxxxxxxxxxxxxxxxxxxxxxxxx',
'ignoreDelete' = 'false' //是否忽略delete操作的数据。
);
CREATE TEMPORARY TABLE ots_sink (
`order` VARCHAR,
orderid VARCHAR,
customerid VARCHAR,
customername VARCHAR,
PRIMARY KEY (`order`,orderid) NOT ENFORCED
) WITH (
'connector'='ots',
'endPoint'='https://flink-sink.cn-hangzhou.ots.aliyuncs.com',
'instanceName'='flink-sink',
'tableName'='flink_sink_table',
'accessId'='xxxxxxxxxxx',
'accessKey'='xxxxxxxxxxxxxxxxxxxxxxxxxxxx',
'valueColumns'='customerid,customername'
);
INSERT INTO ots_sink
SELECT `order`, orderid, customerid, customername FROM tablestore_stream;
实时计算作业开发流程
前提条件
已创建AccessKey。具体操作,请参见创建AccessKey。
已为表格存储数据表(源表)创建数据通道。具体操作,请参见创建数据通道。
步骤一:创建作业
登录实时计算控制台。
在Flink全托管页签,单击目标工作空间操作列下的控制台。
在左侧导航栏,单击SQL开发。
单击新建。
在新建作业草稿对话框中,单击空白的流作业草稿。
Flink全托管也为您提供了丰富的代码模板和数据同步,每种代码模板都为您提供了具体的使用场景、代码示例和使用指导。您可以直接单击对应的模板快速地了解Flink产品功能和相关语法,实现您的业务逻辑,详情请参见代码模板和数据同步模板。
单击下一步。
填写作业配置信息。
作业参数
说明
示例
文件名称
作业的名称。
说明作业名称在当前项目中必须保持唯一。
flink-test
存储位置
指定该作业的代码文件所属的文件夹。
您还可以在现有文件夹右侧,单击图标,新建子文件夹。
作业草稿
引擎版本
当前作业使用的Flink的引擎版本。引擎版本号含义、版本对应关系和生命周期重要时间点详情请参见引擎版本介绍。
vvr-6.0.4-flink-1.15
单击创建。
步骤二:编写作业代码
创建一个源表和结果表的临时表。
说明在生产作业中,建议您尽量减少临时表的使用,直接使用元数据管理中已经注册的表。
创建一个tablestore_stream和ots_sink临时表代码示例如下:
CREATE TEMPORARY TABLE tablestore_stream( `order` VARCHAR, orderid VARCHAR, customerid VARCHAR, customername VARCHAR ) WITH ( 'connector'='ots', 'endPoint' ='https://flink-source.cn-hangzhou.ots.aliyuncs.com', 'instanceName' = 'flink-source', 'tableName' ='flink_source_table', 'tunnelName' = 'flinksourcestream', 'accessId' ='xxxxxxxxxxx', 'accessKey' ='xxxxxxxxxxxxxxxxxxxxxxxxxxxx', 'ignoreDelete' = 'false' //是否忽略delete操作的数据。 ); CREATE TEMPORARY TABLE ots_sink ( `order` VARCHAR, orderid VARCHAR, customerid VARCHAR, customername VARCHAR, PRIMARY KEY (`order`,orderid) NOT ENFORCED ) WITH ( 'connector'='ots', 'endPoint'='https://flink-sink.cn-hangzhou.ots.aliyuncs.com', 'instanceName'='flink-sink', 'tableName'='flink_sink_table', 'accessId'='xxxxxxxxxxx', 'accessKey'='xxxxxxxxxxxxxxxxxxxxxxxxxxxx', 'valueColumns'='customerid,customername' );
编写作业逻辑。
将源表数据插入到结果表的代码示例如下:
INSERT INTO ots_sink SELECT `order`, orderid, customerid, customername FROM tablestore_stream;
步骤三:进行更多配置
在作业开发页面右侧,单击更多配置后,您可以填写以下参数信息:
引擎版本:修改您创建作业时选择的Flink引擎版本。
说明 从VVR 3.0.3版本(对应Flink 1.12版本)开始,VVP支持同时运行多个不同引擎版本的SQL作业。如果您的作业已使用了Flink 1.12及更早版本的引擎,您需要按照以下情况进行处理:- Flink 1.12版本:停止后启动作业,系统将自动将引擎升级为vvr-3.0.3-flink-1.12版本。
- Flink 1.11或Flink 1.10版本:手动将作业引擎版本升级到vvr-3.0.3-flink-1.12或vvr-4.0.7-flink-1.13版本后重启作业,否则会在启动作业时超时报错。
附加依赖文件:作业中需要使用到的附加依赖,例如临时函数等。
步骤四:进行深度检查
在作业开发页面顶部,单击深度检查,进行语法检查。
步骤五:(可选)进行作业调试
在作业开发页面顶部,单击调试。
您可以使用作业调试功能模拟作业运行、检查输出结果,验证SELECT或INSERT业务逻辑的正确性,提升开发效率,降低数据质量风险。具体操作,请参见作业调试。
步骤六:作业部署
在作业开发页面顶部,单击部署,在部署新版本对话框,可根据需要填写或选中相关内容,单击确认。
Session集群适用于非生产环境的开发测试环境,您可以使用Session集群模式部署或调试作业,提高作业JM(Job Manager)资源利用率和提高作业启动速度。但不推荐您将作业提交至Session集群中,因为会存在业务稳定性问题。具体操作,请参见步骤一:创建Session集群。
步骤七:启动并查看Flink计算结果
如果您对作业进行了修改(例如更改SQL代码、增删改WITH参数、更改作业版本等),且希望修改生效,则需要先上线,然后停止再启动。另外,如果作业无法复用State,希望作业全新启动时,也需要停止后再启动作业。关于作业停止的具体操作,请参见作业停止。
在左侧导航栏,单击作业运维。
单击目标作业名称操作列中的启动。
作业启动参数配置详情请参见作业启动。单击启动后,您可以看到作业状态变为运行中,则代表作业运行正常。
在作业运维详情页面,查看Flink计算结果。
在作业运维页面,单击目标作业名称。
单击作业探查。
在运行日志页签,单击运行Task Managers页签下的Path, ID。
单击日志,在页面搜索Sink相关的日志信息。