本文以表格存储Tablestore中的宽表作为上游数据源为例介绍如何使用实时计算Flink写数据到Tablestore的时序表中。
背景信息
Tablestore的时序模型是针对时间序列数据的特点进行设计,适用于物联网设备监控、设备采集数据、机器监控数据等场景。更多信息,请参见时序模型介绍。
在Tablestore的时序模型中,采用一张二维的时序表来存储时序数据。
每行代表一个时间线在某个时间点的数据,该行的主键部分为时间线标识和时间戳,该行的数据列部分为该时间线在该时间戳下的数据点,可以有多个数据列。其中度量名称(measurement)、数据源(data source)和标签(tags)组成了一个时间线标识,时间戳(time)则标识具体的时间点。
注意事项
Flink中的每个TaskManager建议配置2 CPU和4GB内存,此配置可以充分发挥每个TaskManager的计算能力。单个TaskManager能达到1万/s的写入速率。
在source表分区数目足够多的情况下,建议Flink中并发配置在16以内,写入速率随并发线性增长。
Flink与Tablestore实例必须处于同一专有网络VPC。Tablestore实例的服务地址必须使用VPC地址。
当前支持使用此功能的地域有华东1(杭州)、华东2(上海)、华北2(北京)、华北3(张家口)、华北6(乌兰察布)、华南1(深圳)、中国香港、德国(法兰克福)、美国(弗吉尼亚)、新加坡。
Tablestore数据结果表
Flink支持使用Tablestore时序表存储输出结果。更多信息,请参见表格存储Tablestore连接器。
时序模型主要有_m_name
、_data_source
、_tags
、_time
四个主键,因此时序表作为结果表时需要指定四个主键,其余配置与数据表作为结果表时的配置相同。目前支持WITH参数,SINK表主键和Map格式主键三种方式指定时序表主键。三种方式_tags列的转换优先级为WITH参数方式的优先级最高,Map格式主键与SINK表主键方式次之。
WITH参数
使用WITH参数方式定义DDL的示例如下:
--创建源表的临时表。
CREATE TEMPORARY TABLE timeseries_source (
measurement STRING,
datasource STRING,
tag_a STRING,
`time` BIGINT,
binary_value BINARY,
bool_value BOOLEAN,
double_value DOUBLE,
long_value BIGINT,
string_value STRING,
tag_b STRING,
tag_c STRING,
tag_d STRING,
tag_e STRING,
tag_f STRING
)
WITH (
'connector' = 'ots',
'endPoint' = 'https://xxx.cn-hangzhou.vpc.tablestore.aliyuncs.com',
'instanceName' = 'xxx',
'tableName' = 'test_widecolume_source_table',
'tunnelName' = 'test_widecolume_source_tunnel',
'accessId' ='xxxxxxxxxxx',
'accessKey' ='xxxxxxxxxxxxxxxxxxxxxxxxxxxx',
'ignoreDelete' = 'true', --是否忽略delete操作的数据。
);
--创建结果表的临时表。
CREATE TEMPORARY TABLE timeseries_sink (
measurement STRING,
datasource STRING,
tag_a STRING,
`time` BIGINT,
binary_value BINARY,
bool_value BOOLEAN,
double_value DOUBLE,
long_value BIGINT,
string_value STRING,
tag_b STRING,
tag_c STRING,
tag_d STRING,
tag_e STRING,
tag_f STRING,
PRIMARY KEY(measurement, datasource, tag_a, `time`) NOT ENFORCED
)
WITH (
'connector' = 'ots',
'endPoint' = 'https://xxx.cn-hangzhou.vpc.tablestore.aliyuncs.com',
'instanceName' = 'xxx',
'tableName' = 'test_timeseries_sink_table',
'accessId' ='xxxxxxxxxxx',
'accessKey' ='xxxxxxxxxxxxxxxxxxxxxxxxxxxx',
'storageType' = 'TIMESERIES',
'timeseriesSchema' = '{"measurement":"_m_name","datasource":"_data_source","tag_a":"_tags","tag_b":"_tags","tag_c":"_tags","tag_d":"_tags","tag_e":"_tags","tag_f":"_tags","time":"_time"}'
);
--将源表数据插入到结果表。
INSERT INTO timeseries_sink
select
measurement,
datasource,
tag_a,
`time`,
binary_value,
bool_value,
double_value,
long_value,
string_value,
tag_b,
tag_c,
tag_d,
tag_e,
tag_f
from
timeseries_source;
WITH参数说明请参见下表。
参数 | 适用模型 | 是否必填 | 说明 |
connector | 通用参数 | 是 | 连接器类型。固定取值为ots。 |
endPoint | 通用参数 | 是 | Tablestore实例的服务地址,必须使用实例的VPC地址。更多信息,请参见服务地址。 |
instanceName | 通用参数 | 是 | Tablestore实例的名称。 |
tableName | 通用参数 | 是 | Tablestore的数据表或者时序表名称。 数据表作为源表时填写数据表名称,时序表作为结果表时填写时序表名称。 |
tunnelName | 宽表模型 | 是 | Tablestore数据表的数据通道名称。关于创建通道的具体操作,请参见创建数据通道。 |
accessId | 通用参数 | 是 | 阿里云账号或者RAM用户的AccessKey(包括AccessKey ID和AccessKey Secret)。关于获取AccessKey的具体操作,请参见创建AccessKey。 |
accessKey | 通用参数 | 是 | |
ignoreDelete | 宽表模型 | 否 | 是否忽略DELETE操作类型的实时数据,可选配置。默认值为false。数据表作为源表时可以根据需要配置。 |
storageType | 通用参数 | 是 | 数据存储类型。取值范围如下:
|
timeseriesSchema | 时序模型 | 是 | 需要指定为时序表主键的列。以JSON的key-value格式来指定时序表主键,例如 配置的主键类型必须与时序表中主键类型一致。其中tags主键可以支持同时包含多列。 |
SINK表主键
时序结果表的DDL定义示例如下所示。主键定义中的第一位measurement为_m_name列,第二位datasource为_data_source列,最后一位time为time列,中间的多列为tag列。
使用SINK表主键方式定义DDL的示例如下:
CREATE TEMPORARY TABLE timeseries_sink (
measurement STRING,
datasource STRING,
tag_a STRING,
`time` BIGINT,
binary_value BINARY,
bool_value BOOLEAN,
double_value DOUBLE,
long_value BIGINT,
string_value STRING,
tag_b STRING,
tag_c STRING,
tag_d STRING,
tag_e STRING,
tag_f STRING
PRIMARY KEY(measurement, datasource, tag_a,tag_b,tag_c,tag_d,tag_e,tag_f `time`) NOT ENFORCED
) WITH (
'connector' = 'ots',
'endPoint' = 'https://xxx.cn-hangzhou.vpc.tablestore.aliyuncs.com',
'instanceName' = 'xxx',
'tableName' = 'test_timeseries_sink_table',
'accessId' ='xxxxxxxxxxx',
'accessKey' ='xxxxxxxxxxxxxxxxxxxxxxxxxxxx',
'storageType' = 'TIMESERIES',
);
Map格式的主键
对于时序Sink表主键,Tablestore引入了Flink的Map类型便于生成时序模型中时序表的_tags列,Map类型可以支持列的改名、简单函数等映射操作。使用Map时必须保证其中的_tags主键声明位置在第三位。
--创建源表的临时表。
CREATE TEMPORARY TABLE timeseries_source (
measurement STRING,
datasource STRING,
tag_a STRING,
`time` BIGINT,
binary_value BINARY,
bool_value BOOLEAN,
double_value DOUBLE,
long_value BIGINT,
string_value STRING,
tag_b STRING,
tag_c STRING,
tag_d STRING,
tag_e STRING,
tag_f STRING
)
WITH (
'connector' = 'ots',
'endPoint' = 'https://xxx.cn-hangzhou.vpc.tablestore.aliyuncs.com',
'instanceName' = 'xxx',
'tableName' = 'test_widecolume_source_table',
'tunnelName' = 'test_widecolume_source_tunnel',
'accessId' ='xxxxxxxxxxx',
'accessKey' ='xxxxxxxxxxxxxxxxxxxxxxxxxxxx',
'ignoreDelete' = 'true', --是否忽略delete操作的数据。
);
--创建结果表的临时表。
CREATE TEMPORARY TABLE timeseries_sink (
measurement STRING,
datasource STRING,
tags Map<String, String>,
`time` BIGINT,
binary_value BINARY,
bool_value BOOLEAN,
double_value DOUBLE,
long_value BIGINT,
string_value STRING,
tag_b STRING,
tag_c STRING,
tag_d STRING,
tag_e STRING,
tag_f STRING,
PRIMARY KEY(measurement, datasource, tags, `time`) NOT ENFORCED
)
WITH (
'connector' = 'ots',
'endPoint' = 'https://xxx.cn-hangzhou.vpc.tablestore.aliyuncs.com',
'instanceName' = 'xxx',
'tableName' = 'test_timeseries_sink_table',
'accessId' ='xxxxxxxxxxx',
'accessKey' ='xxxxxxxxxxxxxxxxxxxxxxxxxxxx',
'storageType' = 'TIMESERIES',
);
--将源表数据插入到结果表。
INSERT INTO timeseries_sink
select
m_name,
data_source,
MAP["tag_a":tag_a,"tag_b":tag_b,"tag_c":tag_c,"tag_d":tag_d,"tag_e":tag_e,"tag_f":tag_f] AS tags,
`time`,
cpu_sys,
cpu_user,
disk_0,
disk_1,
disk_2,
memory_used,
net_in,
net_out
from
timeseries_source;
实时计算作业开发流程
前提条件
已创建AccessKey。具体操作,请参见创建AccessKey。
已为Tablestore数据表(源表)创建数据通道。具体操作,请参见创建数据通道。
步骤一:创建作业
登录实时计算控制台。
在Flink全托管页签,单击目标工作空间操作列下的控制台。
在左侧导航栏,单击SQL开发。
单击新建。
在新建作业草稿对话框中,单击空白的流作业草稿。
Flink全托管也为您提供了丰富的代码模板和数据同步,每种代码模板都为您提供了具体的使用场景、代码示例和使用指导。您可以直接单击对应的模板快速地了解Flink产品功能和相关语法,实现您的业务逻辑,详情请参见代码模板和数据同步模板。
单击下一步。
填写作业配置信息。
作业参数
说明
示例
文件名称
作业的名称。
说明作业名称在当前项目中必须保持唯一。
flink-test
存储位置
指定该作业的代码文件所属的文件夹。
您还可以在现有文件夹右侧,单击图标,新建子文件夹。
作业草稿
引擎版本
当前作业使用的Flink的引擎版本。引擎版本号含义、版本对应关系和生命周期重要时间点详情请参见引擎版本介绍。
vvr-6.0.4-flink-1.15
单击创建。
步骤二:编写作业代码
创建一个源表(Tablestore数据表)和结果表(Tablestore时序表)的临时表。
说明在生产作业中,建议您尽量减少临时表的使用,直接使用元数据管理中已经注册的表。
创建一个timeseries_source和timeseries_sink临时表代码示例如下:
CREATE TEMPORARY TABLE timeseries_source ( measurement STRING, datasource STRING, tag_a STRING, `time` BIGINT, binary_value BINARY, bool_value BOOLEAN, double_value DOUBLE, long_value BIGINT, string_value STRING, tag_b STRING, tag_c STRING, tag_d STRING, tag_e STRING, tag_f STRING ) WITH ( 'connector' = 'ots', 'endPoint' = 'https://xxx.cn-hangzhou.vpc.tablestore.aliyuncs.com', 'instanceName' = 'xxx', 'tableName' = 'test_widecolume_source_table', 'tunnelName' = 'test_widecolume_source_tunnel', 'accessId' = 'xxxxxxxxxxx', 'accessKey' = 'xxxxxxxxxxxxxxxxxxxxxxxxxxxx', --是否忽略delete操作的数据。 'ignoreDelete' = 'true', ); CREATE TEMPORARY TABLE timeseries_sink ( measurement STRING, datasource STRING, tag_a STRING, `time` BIGINT, binary_value BINARY, bool_value BOOLEAN, double_value DOUBLE, long_value BIGINT, string_value STRING, tag_b STRING, tag_c STRING, tag_d STRING, tag_e STRING, tag_f STRING, PRIMARY KEY(measurement, datasource, tag_a, `time`) NOT ENFORCED ) WITH ( 'connector' = 'ots', 'endPoint' = 'https://xxx.cn-hangzhou.vpc.tablestore.aliyuncs.com', 'instanceName' = 'xxx', 'tableName' = 'test_timeseries_sink_table', 'accessId' = 'xxxxxxxxxxx', 'accessKey' = 'xxxxxxxxxxxxxxxxxxxxxxxxxxxx', 'storageType' = 'TIMESERIES', 'timeseriesSchema' = '{"measurement":"_m_name","datasource":"_data_source","tag_a":"_tags","tag_b":"_tags","tag_c":"_tags","tag_d":"_tags","tag_e":"_tags","tag_f":"_tags","time":"_time"}' );
编写作业逻辑。
将源表数据插入到结果表的代码示例如下:
INSERT INTO timeseries_sink select measurement, datasource, tag_a, `time`, binary_value, bool_value, double_value, long_value, string_value, tag_b, tag_c, tag_d, tag_e, tag_f from timeseries_source;
步骤三:进行更多配置
在作业开发页面右侧,单击更多配置后,您可以填写以下参数信息:
引擎版本:修改您创建作业时选择的Flink引擎版本。
说明 从VVR 3.0.3版本(对应Flink 1.12版本)开始,VVP支持同时运行多个不同引擎版本的SQL作业。如果您的作业已使用了Flink 1.12及更早版本的引擎,您需要按照以下情况进行处理:- Flink 1.12版本:停止后启动作业,系统将自动将引擎升级为vvr-3.0.3-flink-1.12版本。
- Flink 1.11或Flink 1.10版本:手动将作业引擎版本升级到vvr-3.0.3-flink-1.12或vvr-4.0.7-flink-1.13版本后重启作业,否则会在启动作业时超时报错。
附加依赖文件:作业中需要使用到的附加依赖,例如临时函数等。
说明如果没有VVR的权限,您可以下载VVR依赖,并在资源上传页面进行上传,然后选择附加依赖文件为上传的VVR依赖即可。具体操作,请参见附录:配置VVR依赖。
步骤四:进行深度检查
在作业开发页面顶部,单击深度检查,进行语法检查。
(可选)步骤五:进行作业调试
在作业开发页面顶部,单击调试。
您可以使用作业调试功能模拟作业运行、检查输出结果,验证SELECT或INSERT业务逻辑的正确性,提升开发效率,降低数据质量风险,详情请参见作业调试。
步骤六:作业部署
在作业开发页面顶部,单击部署,在部署新版本对话框,可根据需要填写或选中相关内容,单击确认。
Session集群适用于非生产环境的开发测试环境,您可以使用Session集群模式部署或调试作业,提高作业JM(Job Manager)资源利用率和提高作业启动速度。但不推荐您将作业提交至Session集群中,因为会存在业务稳定性问题。具体操作,请参见步骤一:创建Session集群。
步骤七:启动并查看Flink计算结果
如果您对作业进行了修改(例如更改SQL代码、增删改WITH参数、更改作业版本等),且希望修改生效,则需要先上线,然后停止再启动。另外,如果作业无法复用State,希望作业全新启动时,也需要停止后再启动作业。作业停止详情请参见作业停止。
在左侧导航栏,单击作业运维。
单击目标作业名称操作列中的启动。
作业启动参数配置详情请参见作业启动。单击启动后,您可以看到作业状态变为运行中,则代表作业运行正常。
在作业运维详情页面,查看Flink计算结果。
在作业运维页面,单击目标作业名称。
单击作业探查。
在运行日志页签,单击运行Task Managers页签下的Path, ID。
单击日志,在页面搜索Sink相关的日志信息。