借助阿里云在亚洲加速迈向成功
一站式安全合规咨询服务
MLPS 2.0 一站式合规解决方案
依托我们的网络进军中国市场
提升面向互联网应用的性能和安全性
保障您的中国业务安全无忧
通过强大的数据安全框架保护您的数据资产
申请 ICP 备案的流程解读和咨询服务
面向大数据建设、管理及应用的全域解决方案
企业内大数据建设、管理和应用的一站式解决方案
将您的采购和销售置于同一企业级全渠道数字平台上
全渠道内置 AI 驱动、拟人化、多语言对话的聊天机器人
快速搭建在线教育平台
提供域名注册、分析和保护服务
云原生 Kubernetes 容器化应用运行环境
以 Kubernetes 为使用界面的容器服务产品,提供符合容器规范的算力资源
安全的镜像托管服务,支持全生命周期管理
多集群环境下微服务应用流量统一管理
提供任意基础设施上容器集群的统一管控,助您轻松管控分布式云场景
高弹性、高可靠的企业级无服务器 Kubernetes 容器产品
敏捷安全的 Serverless 容器运行服务
为虚拟机和容器提供高可靠性、高性能、低时延的块存储服务
一款海量、安全、低成本、高可靠的云存储服务
可靠、弹性、高性能、多共享的文件存储服务
全托管、可扩展的并行文件系统服务。
全托管的 NoSQL 结构化数据实时存储服务
可抵扣多种存储产品的容量包,兼具灵活性和长期成本优化
让您的应用跨不同可用区资源自动分配访问量
随时绑定和解绑 VPC ECS
云网络公网、跨域流量统一计费
高性价比,可抵扣按流量计费的流量费用
创建云上隔离的网络,在专有环境中运行资源
在 VPC 环境下构建公网流量的出入口
具备网络状态可视化、故障智能诊断能力的自助式网络运维服务。
安全便捷的云上服务专属连接
基于阿里云专有网络的私有 DNS 解析服务
保障在线业务不受大流量 DDoS 攻击影响
系统运维和安全审计管控平台
业务上云的第一个网络安全基础设施
集零信任内网访问、办公数据保护、终端管理等多功能于一体的办公安全管控平台
提供7X24小时安全运维平台
防御常见 Web 攻击,缓解 HTTP 泛洪攻击
实现全站 HTTPS,呈现可信的 WEB 访问
为云上应用提供符合行业标准和密码算法等级的数据加解密、签名验签和数据认证能力
一款发现、分类和保护敏感数据的安全服务
创建、控制和管理您的加密密钥
快速提高应用高可用能力服务
围绕应用和微服务的 PaaS 平台
兼容主流开源微服务生态的一站式平台
多集群环境下微服务应用流量统一管理
Super MySQL 和 PostgreSQL,高度兼容 Oracle 语法
全托管 MySQL、PostgreSQL、SQL Server、MariaDB
兼容 Redis® 的缓存和KV数据库
兼容Apache Cassandra、Apache HBase、Elasticsearch、OpenTSDB 等多种开源接口
文档型数据库,支持副本集和分片架构
100%兼容 Apache HBase 并深度扩展,稳定、易用、低成本的NoSQL数据库。
低成本、高可用、可弹性伸缩的在线时序数据库服务
专为搜索和分析而设计,成本效益达到开源的两倍,采用最新的企业级AI搜索和AI助手功能。
一款兼容PostgreSQL协议的实时交互式分析产品
一种快速、完全托管的 TB/PB 级数据仓库
基于 Flink 为大数据行业提供解决方案
基于Qwen和其他热门模型的一站式生成式AI平台,可构建了解您业务的智能应用程
一站式机器学习平台,满足数据挖掘分析需求
高性能向量检索服务,提供低代码API和高成本效益
帮助您的应用快速构建高质量的个性化推荐服务能力
提供定制化的高品质机器翻译服务
全面的AI计算平台,满足大模型训练等高性能AI计算的算力和性能需求
具备智能会话能力的会话机器人
基于机器学习的智能图像搜索产品
基于阿里云深度学习技术,为用户提供图像分割、视频分割、文字识别等离线SDK能力,支持Android、iOS不同的适用终端。
语音识别、语音合成服务以及自学习平台
一站式智能搜索业务开发平台
助力金融企业快速搭建超低时延、高质量、稳定的行情数据服务
帮助企业快速测算和分析企业的碳排放和产品碳足迹
企业工作流程自动化,全面提高效率
金融级云原生分布式架构的一站式高可用应用研发、运维平台
eKYC 数字远程在线解决方案
可智能检测、大数据驱动的综合性反洗钱 (AML) 解决方案
阿里云APM类监控产品
实时云监控服务,确保应用及服务器平稳运行
为系统运维人员管理云基础架构提供全方位服务的云上自动化运维平台
面向您的云资源的风险检测服务
提升分布式环境下的诊断效率
日志类数据一站式服务,无需开发就能部署
ECS 预留实例
让弹性计算产品的成本和灵活性达到最佳平衡的付费方式。云原生 AI 套件
加速AI平台构建,提高资源效率和交付速度FinOps
实时分析您的云消耗并实现节约SecOps
实施细粒度安全控制DevOps
快速、安全地最大限度提高您的DevOps优势自带IP上云
自带公网 IP 地址上云全球网络互联
端到端的软件定义网络解决方案,可推动跨国企业的业务发展全球应用加速
提升面向互联网应用的性能和安全性全球互联网接入
将IDC网关迁移到云端云原生 AI 套件
加速AI平台构建,提高资源效率和交付速度FinOps
实时分析您的云消耗并实现节约SecOps
实施细粒度安全控制DevOps
快速、安全地最大限度提高您的DevOps优势金融科技云数据库解决方案
利用专为金融科技而设的云原生数据库解决方案游戏行业云数据库解决方案
提供多种成熟架构,解决所有数据问题Oracle 数据库迁移
将 Oracle 数据库顺利迁移到云原生数据库数据库迁移
加速迁移您的数据到阿里云阿里云上的数据湖
实时存储、管理和分析各种规模和类型的数据数码信贷
利用大数据和 AI 降低信贷和黑灰产风险面向企业数据技术的大数据咨询服务
帮助企业实现数据现代化并规划其数字化未来人工智能对话服务
全渠道内置 AI 驱动、拟人化、多语言对话的聊天机器人EasyDispatch 现场服务管理
为现场服务调度提供实时AI决策支持在线教育
快速搭建在线教育平台窄带高清 (HD) 转码
带宽成本降低高达 30%广电级大型赛事直播
为全球观众实时直播大型赛事,视频播放流畅不卡顿直播电商
快速轻松地搭建一站式直播购物平台用于供应链规划的Alibaba Dchain
构建和管理敏捷、智能且经济高效的供应链云胸牌
针对赛事运营的创新型凭证数字服务数字门店中的云 POS 解决方案
将所有操作整合到一个云 POS 系统中元宇宙
元宇宙是下一代互联网人工智能 (AI) 加速
利用阿里云 GPU 技术,为 AI 驱动型业务以及 AI 模型训练和推理加速DevOps
快速、安全地最大限度提高您的DevOps优势数据迁移解决方案
加速迁移您的数据到阿里云企业 IT 治理
在阿里云上构建高效可控的云环境基于日志管理的AIOps
登录到带有智能化日志管理解决方案的 AIOps 环境备份与存档
数据备份、数据存档和灾难恢复用阿里云金融服务加快创新
在云端开展业务,提升客户满意度
为全球资本市场提供安全、准确和数字化的客户体验
利用专为金融科技而设的云原生数据库解决方案
利用大数据和 AI 降低信贷和黑灰产风险
建立快速、安全的全球外汇交易平台
新零售时代下,实现传统零售业转型
利用云服务处理流量波动问题,扩展业务运营、降低成本
快速轻松地搭建一站式直播购物平台
面向大数据建设、管理及应用的全域解决方案
全渠道内置 AI 驱动、拟人化、多语言对话的聊天机器人
以数字化媒体旅程为当今的媒体市场准备就绪您的内容
带宽成本降低高达 30%
快速轻松地搭建一站式直播购物平台
为全球观众实时直播大型赛事,视频播放流畅不卡顿
使用阿里云弹性高性能计算 E-HPC 将本地渲染农场连接到云端
构建发现服务,帮助客户找到最合适的内容
保护您的媒体存档安全
通过统一的数据驱动平台提供一致的全生命周期客户服务
在钉钉上打造一个多功能的电信和数字生活平台
在线存储、共享和管理照片与文件
提供全渠道的无缝客户体验
面向中小型企业,为独立软件供应商提供可靠的IT服务
打造最快途径,助力您的新云业务扬帆起航
先进的SD-WAN平台,可实现WAN连接、实时优化并降低WAN成本
通过自动化和流程标准化实现快速事件响应
针对关键网络安全威胁提供集中可见性并进行智能安全分析
提供大容量、可靠且高度安全的企业文件传输
用智能技术数字化体育赛事
基于人工智能的低成本体育广播服务
专业的广播转码及信号分配管理服务
基于云的音视频内容引入、编辑和分发服务
在虚拟场馆中模拟关键运营任务
针对赛事运营的创新型凭证数字服务
智能和交互式赛事指南
轻松管理云端背包单元的绑定直播流
通过数据加强您的营销工作
元宇宙是下一代互联网
利用生成式 AI 加速创新,创造新的业务佳绩
阿里云高性能开源大模型
借助AI轻松解锁和提炼文档中的知识
通过AI驱动的语音转文本服务获取洞察
探索阿里云人工智能和数据智能的所有功能、新优惠和最新产品
该体验中心提供广泛的用例和产品帮助文档,助您开始使用阿里云 AI 产品和浏览您的业务数据。
利用阿里云 GPU 技术,为 AI 驱动型业务以及 AI 模型训练和推理加速
元宇宙是下一代互联网
构建发现服务,帮助客户找到最合适的内容
全渠道内置 AI 驱动、拟人化、多语言对话的聊天机器人
加速迁移您的数据到阿里云
在阿里云上建立一个安全且易扩容的环境,助力高效率且高成本效益的上云旅程
迁移到完全托管的云数据库
将 Oracle 数据库顺利迁移到云原生数据库
自带公网 IP 地址上云
利用阿里云强大的安全工具集,保障业务安全、应用程序安全、数据安全、基础设施安全和帐户安全
保护、备份和还原您的云端数据资产
MLPS 2.0 一站式合规解决方案
快速高效地将您的业务扩展到中国,同时遵守适用的当地法规
实现对 CloudOps、DevOps、SecOps、AIOps 和 FinOps 的高效、安全和透明的管理
构建您的原生云环境并高效管理集群
快速、安全地最大限度提高您的DevOps优势
实施细粒度安全控制
提供运维效率和总体系统安全性
实时分析您的云消耗并实现节约
实时存储、管理和分析各种规模和类型的数据
登录到带有智能化日志管理解决方案的 AIOps 环境
帮助企业实现数据现代化并规划其数字化未来
帮助零售商快速规划数字化之旅
将全球知名的 CRM 平台引入中国
在线存储、共享和管理照片与文件
构建、部署和管理高可用、高可靠、高弹性的应用程序
快速、安全地最大限度提高您的DevOps优势
将您的采购和销售置于同一企业级全渠道数字平台上
企业内大数据建设、管理和应用的一站式解决方案
帮助企业简化 IT 架构、实现商业价值、加速数字化转型的步伐
快速高效地将您的业务扩展到中国,同时遵守适用的当地法规
快速搜集、处理、分析联网设备产生的数据
0.0.201
本文为您介绍如何使用Flink计算表格存储(Tablestore)的数据,表格存储中的数据表或时序表均可作为实时计算Flink的源表或结果表进行使用。
已开通表格存储服务并创建实例。具体操作,请参见开通服务和创建实例。
已开通Flink工作空间。具体操作,请参见开通实时计算Flink版。
实时计算Flink必须与表格存储服务位于同一地域。实时计算Flink支持的地域,请参见地域列表。
已获取AccessKey信息。
出于安全考虑,强烈建议您通过RAM用户使用表格存储功能。具体操作,请参见创建RAM用户并授权。
进入SQL作业创建页面。
登录实时计算控制台。
单击目标工作空间操作列下的控制台。
在左侧导航栏,单击数据开发 > ETL。
单击新建后,在新建作业草稿对话框,选择空白的流作业草稿,单击下一步。
填写作业信息。
单击创建。
此处以将数据表中的数据同步至另一个数据表为例,为您介绍如何编写SQL作业。更多SQL示例,请参考SQL示例。
分别创建源表(数据表)和结果表(数据表)的临时表。
详细配置信息,请参见附录1:Tablestore连接器。
-- 创建源表(数据表)的临时表 tablestore_stream
CREATE TEMPORARY TABLE tablestore_stream(
`order` VARCHAR,
orderid VARCHAR,
customerid VARCHAR,
customername VARCHAR
) WITH (
'connector' = 'ots', -- 源表的连接器类型。固定取值为ots。
'endPoint' = 'https://xxx.cn-hangzhou.vpc.tablestore.aliyuncs.com', -- 表格存储实例的VPC地址。
'instanceName' = 'xxx', -- 表格存储的实例名称。
'tableName' = 'flink_source_table', -- 表格存储的源表名称。
'tunnelName' = 'flink_source_tunnel', -- 表格存储源表的数据通道名称。
'accessId' = 'xxxxxxxxxxx', -- 阿里云账号或者RAM用户的AccessKey ID。
'accessKey' = 'xxxxxxxxxxxxxxxxxxxxxxxxxxxx', -- 阿里云账号或者RAM用户的AccessKey Secret。
'ignoreDelete' = 'false' -- 是否忽略DELETE操作类型的实时数据:不忽略。
);
-- 创建结果表(数据表)的临时表 tablestore_sink
CREATE TEMPORARY TABLE tablestore_sink(
`order` VARCHAR,
orderid VARCHAR,
customerid VARCHAR,
customername VARCHAR,
PRIMARY KEY (`order`,orderid) NOT ENFORCED -- 主键。
) WITH (
'connector' = 'ots', -- 结果表的连接器类型。固定取值为ots。
'endPoint'='https://xxx.cn-hangzhou.vpc.tablestore.aliyuncs.com', -- 表格存储实例的VPC地址。
'instanceName' = 'xxx', -- 表格存储的实例名称。
'tableName' = 'flink_sink_table', -- 表格存储的结果表名称。
'accessId' = 'xxxxxxxxxxx', -- 阿里云账号或者RAM用户的AccessKey ID。
'accessKey' = 'xxxxxxxxxxxxxxxxxxxxxxxxxxxx', -- 阿里云账号或者RAM用户的AccessKey Secret。
'valueColumns'='customerid,customername' --插入字段的列名。
);
编写作业逻辑。
将源表数据插入到结果表的代码示例如下:
--将源表数据插入到结果表
INSERT INTO tablestore_sink
SELECT `order`, orderid, customerid, customername FROM tablestore_stream;
在SQL编辑区域右侧页签,您可以查看或上传相关配置。
页签名称 | 配置说明 |
页签名称 | 配置说明 |
更多配置 |
|
代码结构 |
|
版本信息 | 您可以在此处查看作业版本信息,操作列下的功能详情请参见管理作业版本。 |
深度检查能够检查作业的SQL语义、网络连通性以及作业使用的表的元数据信息。同时,您可以单击结果区域的SQL优化,展开查看SQL风险问题提示以及对应的SQL优化建议。
在SQL编辑区域右上方,单击深度检查。
在深度检查对话框,单击确认。
您可以使用作业调试功能模拟作业运行、检查输出结果,验证SELECT或INSERT业务逻辑的正确性,提升开发效率,降低数据质量风险。
在SQL编辑区域右上方,单击调试。
在调试对话框,选择调试集群后,单击下一步。
如果没有可用集群则需要创建新的Session集群,Session集群与SQL作业引擎版本需要保持一致并处于运行中。详情请参见创建Session集群。
配置调试数据。
如果您使用线上数据,无需处理。
如果您需要使用调试数据,需要先单击下载调试数据模板,填写调试数据后,上传调试数据。详情请参见作业调试。
确定好调试数据后,单击确定。
在SQL编辑区域右上方,单击部署,在部署新版本对话框,可根据需要填写或选中相关内容,单击确定。
Session集群适用于非生产环境的开发测试环境,通过部署或调试作业提高作业JM(Job Manager)资源利用率和提高作业启动速度。但不推荐您将生产作业提交至Session集群中,可能会导致业务稳定性问题。
在左侧导航栏,单击运维中心 > 作业运维。
单击目标作业操作列中的启动。
选择无状态启动后,单击启动。当作业状态转变为运行中时,代表作业运行正常。作业启动参数配置,详情请参见作业启动。
Flink中的每个TaskManager建议配置2CPU和4GB内存,此配置可以充分发挥每个TaskManager的计算能力。单个TaskManager能达到1万/s的写入速率。
在source表分区数目足够多的情况下,建议Flink中并发配置在16以内,写入速率随并发线性增长。
在作业运维详情页面,查看Flink计算结果。
在运维中心 > 作业运维页面,单击目标作业名称。
在作业日志页签,单击运行Task Managers页签下Path,ID列的目标任务。
单击日志,在页面查看相关的日志信息。
(可选)停止作业。
如果您对作业进行了修改(例如更改代码、增删改WITH参数、更改作业版本等),且希望修改生效,则需要重新部署作业,然后停止再启动。另外,如果作业无法复用State,希望作业全新启动时,或者更新非动态生效的参数配置时,也需要停止后再启动作业。作业停止详情请参见作业停止。
实时计算Flink版内置了表格存储Tablestore连接器,用于Tablestore的数据读写与同步。
数据表作为源表的DDL定义示例如下:
-- 创建源表(数据表)的临时表 tablestore_stream
CREATE TEMPORARY TABLE tablestore_stream(
`order` VARCHAR,
orderid VARCHAR,
customerid VARCHAR,
customername VARCHAR
) WITH (
'connector' = 'ots',
'endPoint' = 'https://xxx.cn-hangzhou.vpc.tablestore.aliyuncs.com',
'instanceName' = 'xxx',
'tableName' = 'flink_source_table',
'tunnelName' = 'flink_source_tunnel',
'accessId' = 'xxxxxxxxxxx',
'accessKey' = 'xxxxxxxxxxxxxxxxxxxxxxxxxxxx',
'ignoreDelete' = 'false'
);
时序表作为源表的DDL定义示例如下:
-- 创建源表(时序表)的临时表 tablestore_stream
CREATE TEMPORARY TABLE tablestore_stream(
_m_name STRING,
_data_source STRING,
_tags STRING,
_time BIGINT,
binary_value BINARY,
bool_value BOOLEAN,
double_value DOUBLE,
long_value BIGINT,
string_value STRING
) WITH (
'connector' = 'ots',
'endPoint' = 'https://xxx.cn-hangzhou.vpc.tablestore.aliyuncs.com',
'instanceName' = 'xxx',
'tableName' = 'flink_source_table',
'tunnelName' = 'flink_source_tunnel',
'accessId' = 'xxxxxxxxxxx',
'accessKey' = 'xxxxxxxxxxxxxxxxxxxxxxxxxxxx'
);
属性列支持读取待消费字段和Tunnel Service,以及返回数据中的OtsRecordType
和OtsRecordTimestamp
两个字段。字段说明请参见下表。
字段名 | Flink映射名 | 描述 |
OtsRecordType | type | 数据操作类型。 |
OtsRecordTimestamp | timestamp | 数据操作时间,单位为微秒。 说明 全量读取数据时,取值为0。 |
参数 | 适用表 | 是否必填 | 描述 |
connector | 通用参数 | 是 | 源表的连接器类型。固定取值为ots。 |
endPoint | 通用参数 | 是 | 表格存储实例的服务地址,必须使用VPC地址。更多信息,请参见服务地址。 |
instanceName | 通用参数 | 是 | 表格存储的实例名称。 |
tableName | 通用参数 | 是 | 表格存储的源表名称。 |
tunnelName | 通用参数 | 是 | 表格存储源表的通道名称。关于创建通道的具体操作,请参见创建数据通道。 |
accessId | 通用参数 | 是 | 阿里云账号或者RAM用户的AccessKey(包括AccessKey ID和AccessKey Secret)。 重要 为了避免您的AK信息泄露,建议您通过密钥管理的方式填写AccessKey ID和AccessKey Secret,详情请参见变量管理。 |
accessKey | 通用参数 | 是 | |
connectTimeout | 通用参数 | 否 | 连接器连接Tablestore的超时时间,单位为毫秒。默认值为30000。 |
socketTimeout | 通用参数 | 否 | 连接器连接Tablestore的Socket超时时间,单位为毫秒。默认值为30000。 |
ioThreadCount | 通用参数 | 否 | IO线程数量。默认值为4。 |
callbackThreadPoolSize | 通用参数 | 否 | 回调线程池大小。默认值为4。 |
ignoreDelete | 数据表 | 否 | 是否忽略DELETE操作类型的实时数据。默认值为false,表示不忽略DELETE操作类型的实时数据。 |
skipInvalidData | 通用参数 | 否 | 是否忽略脏数据。默认值为false,表示不忽略脏数据。如果不忽略脏数据,则处理脏数据时会报错。 重要 仅实时计算引擎VVR 8.0.4及以上版本支持该参数。 |
retryStrategy | 通用参数 | 否 | 重试策略。参数取值如下:
|
retryCount | 通用参数 | 否 | 重试次数。当retryStrategy设置为COUNT时,可以设置重试次数。默认值为3。 |
retryTimeoutMs | 通用参数 | 否 | 重试的超时时间,单位为毫秒。当retryStrategy设置为TIME时,可以设置重试的超时时间。默认值为180000。 |
streamOriginColumnMapping | 通用参数 | 否 | 原始列名到真实列名的映射。 说明 原始列名与真实列名之间,请使用半角冒号(:)隔开;多组映射之间,请使用半角逗号(,)隔开。例如 |
outputSpecificRowType | 通用参数 | 否 | 是否透传具体的RowType。参数取值如下:
|
Tablestore字段类型 | Flink字段类型 |
INTEGER | BIGINT |
STRING | STRING |
BOOLEAN | BOOLEAN |
DOUBLE | DOUBLE |
BINARY | BINARY |
数据表作为结果表的DDL定义示例如下:
-- 创建结果表(数据表)的临时表 tablestore_sink
CREATE TEMPORARY TABLE tablestore_sink(
`order` VARCHAR,
orderid VARCHAR,
customerid VARCHAR,
customername VARCHAR,
PRIMARY KEY (`order`,orderid) NOT ENFORCED
) WITH (
'connector' = 'ots',
'endPoint'='https://xxx.cn-hangzhou.vpc.tablestore.aliyuncs.com',
'instanceName' = 'xxx',
'tableName' = 'flink_sink_table',
'accessId' = 'xxxxxxxxxxx',
'accessKey' = 'xxxxxxxxxxxxxxxxxxxxxxxxxxxx',
'valueColumns'='customerid,customername'
);
Tablestore结果表必须定义主键(Primary Key)和至少一个属性列,输出数据以Update方式追加到Tablestore表。
时序模型结果表需要指定_m_name
、_data_source
、_tags
、_time
四个主键,其余配置与数据表的结果表配置相同。目前支持WITH参数,SINK表主键和Map格式主键三种方式指定时序表主键。三种方式_tags列的转换优先级为WITH参数方式的优先级最高,Map格式主键与SINK表主键方式次之。
使用WITH参数方式定义DDL的示例如下。
-- 创建结果表(时序表)的临时表 tablestore_sink
CREATE TEMPORARY TABLE tablestore_sink(
measurement STRING,
datasource STRING,
tag_a STRING,
`time` BIGINT,
binary_value BINARY,
bool_value BOOLEAN,
double_value DOUBLE,
long_value BIGINT,
string_value STRING,
tag_b STRING,
tag_c STRING,
tag_d STRING,
tag_e STRING,
tag_f STRING,
PRIMARY KEY(measurement, datasource, tag_a, `time`) NOT ENFORCED
)
WITH (
'connector' = 'ots',
'endPoint' = 'https://xxx.cn-hangzhou.vpc.tablestore.aliyuncs.com',
'instanceName' = 'xxx',
'tableName' = 'flink_sink_table',
'accessId' ='xxxxxxxxxxx',
'accessKey' ='xxxxxxxxxxxxxxxxxxxxxxxxxxxx',
'storageType' = 'TIMESERIES',
'timeseriesSchema' = '{"measurement":"_m_name", "datasource":"_data_source", "tag_a":"_tags", "tag_b":"_tags", "tag_c":"_tags", "tag_d":"_tags", "tag_e":"_tags", "tag_f":"_tags", "time":"_time"}'
);
-- 将源表数据插入到结果表
INSERT INTO tablestore_sink
select
measurement,
datasource,
tag_a,
`time`,
binary_value,
bool_value,
double_value,
long_value,
string_value,
tag_b,
tag_c,
tag_d,
tag_e,
tag_f
from tablestore_stream;
使用Map格式主键方式定义DDL的示例如下。
Tablestore引入了Flink的Map类型,以便于生成时序模型中时序表的_tags列,Map类型可以支持列的改名、简单函数等映射操作。使用Map时必须保证其中的_tags主键声明位置在第三位。
-- 创建结果表(时序表)的临时表 tablestore_sink
CREATE TEMPORARY TABLE tablestore_sink(
measurement STRING,
datasource STRING,
tags Map<String, String>,
`time` BIGINT,
binary_value BINARY,
bool_value BOOLEAN,
double_value DOUBLE,
long_value BIGINT,
string_value STRING,
PRIMARY KEY(measurement, datasource, tags, `time`) NOT ENFORCED
)
WITH (
'connector' = 'ots',
'endPoint' = 'https://xxx.cn-hangzhou.vpc.tablestore.aliyuncs.com',
'instanceName' = 'xxx',
'tableName' = 'flink_sink_table',
'accessId' ='xxxxxxxxxxx',
'accessKey' ='xxxxxxxxxxxxxxxxxxxxxxxxxxxx',
'storageType' = 'TIMESERIES'
);
-- 将源表数据插入到结果表
INSERT INTO tablestore_sink
select
measurement,
datasource,
MAP[`tag_a`, `tag_b`, `tag_c`, `tag_d`, `tag_e`, `tag_f`] AS tags,
`time`,
binary_value,
bool_value,
double_value,
long_value,
string_value
from timeseries_source;
使用SINK表主键方式定义DDL的示例如下。主键定义中的第一位measurement为_m_name列,第二位datasource为_data_source列,最后一位time为time列,中间的多列为tag列。
-- 创建结果表(时序表)的临时表 tablestore_sink
CREATE TEMPORARY TABLE tablestore_sink(
measurement STRING,
datasource STRING,
tag_a STRING,
`time` BIGINT,
binary_value BINARY,
bool_value BOOLEAN,
double_value DOUBLE,
long_value BIGINT,
string_value STRING,
tag_b STRING,
tag_c STRING,
tag_d STRING,
tag_e STRING,
tag_f STRING
PRIMARY KEY(measurement, datasource, tag_a, tag_b, tag_c, tag_d, tag_e, tag_f, `time`) NOT ENFORCED
) WITH (
'connector' = 'ots',
'endPoint' = 'https://xxx.cn-hangzhou.vpc.tablestore.aliyuncs.com',
'instanceName' = 'xxx',
'tableName' = 'flink_sink_table',
'accessId' ='xxxxxxxxxxx',
'accessKey' ='xxxxxxxxxxxxxxxxxxxxxxxxxxxx',
'storageType' = 'TIMESERIES'
);
-- 将源表数据插入到结果表
INSERT INTO tablestore_sink
select
measurement,
datasource,
tag_a,
tag_b,
tag_c,
tag_d,
tag_e,
tag_f,
`time`,
binary_value,
bool_value,
double_value,
long_value,
string_value
from timeseries_source;
参数 | 适用表 | 是否必填 | 说明 |
connector | 通用参数 | 是 | 结果表的连接器类型。固定取值为ots。 |
endPoint | 通用参数 | 是 | 表格存储实例的服务地址,必须使用VPC地址。更多信息,请参见服务地址。 |
instanceName | 通用参数 | 是 | 表格存储的实例名称。 |
tableName | 通用参数 | 是 | 表格存储的时序表名称。 |
accessId | 通用参数 | 是 | 阿里云账号或者RAM用户的AccessKey(包括AccessKey ID和AccessKey Secret)。 重要 为了避免您的AK信息泄露,建议您通过密钥管理的方式填写AccessKey ID和AccessKey Secret,详情请参见变量管理。 |
accessKey | 通用参数 | 是 | |
valueColumns | 数据表 | 是 | 插入字段的列名。多个字段以半角逗号(,)分割,例如 |
storageType | 通用参数 | 否 | 重要 当时序表作为结果表时,必须配置为TIMESERIES。 数据存储类型。取值范围如下:
|
timeseriesSchema | 时序表 | 否 | 重要 当时序表作为结果表时,如果使用WITH参数的方式指定时序表主键,则必须配置该参数。 需要指定为时序表主键的列。
|
connectTimeout | 通用参数 | 否 | 连接器连接Tablestore的超时时间,单位为毫秒。默认值为30000。 |
socketTimeout | 通用参数 | 否 | 连接器连接Tablestore的Socket超时时间,单位为毫秒。默认值为30000。 |
ioThreadCount | 通用参数 | 否 | IO线程数量。默认值为4。 |
callbackThreadPoolSize | 通用参数 | 否 | 回调线程池大小。默认值为4。 |
retryIntervalMs | 通用参数 | 否 | 重试间隔时间,单位为毫秒。默认值为1000。 |
maxRetryTimes | 通用参数 | 否 | 最大重试次数。默认值为10。 |
bufferSize | 通用参数 | 否 | 流入多少条数据后开始输出。默认值为5000,表示输入的数据达到5000条就开始输出。 |
batchWriteTimeoutMs | 通用参数 | 否 | 写入超时的时间,单位为毫秒。默认值为5000,表示如果缓存中的数据在等待5秒后,依然没有达到输出条件,系统会自动输出缓存中的所有数据。 |
batchSize | 通用参数 | 否 | 一次批量写入的条数。默认值为100,最大值为200。 |
ignoreDelete | 通用参数 | 否 | 是否忽略DELETE操作类型的实时数据。默认值为false,表示不忽略DELETE操作类型的实时数据。 重要 仅数据表作为源表时可以根据需要配置。 |
autoIncrementKey | 数据表 | 否 | 当结果表中包含主键自增列时,通过该参数指定主键自增列的列名称。当结果表没有主键自增列时,请不要设置此参数。 重要 仅实时计算引擎VVR 8.0.4及以上版本支持该参数。 |
overwriteMode | 通用参数 | 否 | 数据覆盖模式。参数取值如下:
说明 动态列模式下只支持UPDATE模式。 |
defaultTimestampInMillisecond | 通用参数 | 否 | 设定写入Tablestore数据的默认时间戳。如果不指定,则使用系统当前的毫秒时间戳。 |
dynamicColumnSink | 通用参数 | 否 | 是否开启动态列模式。默认值为false,表示不开启动态列模式。 说明
|
checkSinkTableMeta | 通用参数 | 否 | 是否检查结果表元数据。默认值为true,表示检查Tablestore表的主键列和此处的建表语句中指定的主键是否一致。 |
enableRequestCompression | 通用参数 | 否 | 数据写入过程中是否开启数据压缩。默认值为false,表示不开启数据压缩。 |
Flink字段类型 | Tablestore字段类型 |
BINARY | BINARY |
VARBINARY | BINARY |
CHAR | STRING |
VARCHAR | STRING |
TINYINT | INTEGER |
SMALLINT | INTEGER |
INTEGER | INTEGER |
BIGINT | INTEGER |
FLOAT | DOUBLE |
DOUBLE | DOUBLE |
BOOLEAN | BOOLEAN |
从源表(数据表)flink_source_table中读取数据,并将结果写入结果表(时序表)flink_sink_table。
SQL示例如下:
-- 创建源表(数据表)的临时表 tablestore_stream
CREATE TEMPORARY TABLE tablestore_stream(
measurement STRING,
datasource STRING,
tag_a STRING,
`time` BIGINT,
binary_value BINARY,
bool_value BOOLEAN,
double_value DOUBLE,
long_value BIGINT,
string_value STRING,
tag_b STRING,
tag_c STRING,
tag_d STRING,
tag_e STRING,
tag_f STRING
)
WITH (
'connector' = 'ots',
'endPoint' = 'https://xxx.cn-hangzhou.vpc.tablestore.aliyuncs.com',
'instanceName' = 'xxx',
'tableName' = 'flink_source_table',
'tunnelName' = 'flink_source_tunnel',
'accessId' = 'xxxxxxxxxxx',
'accessKey' = 'xxxxxxxxxxxxxxxxxxxxxxxxxxxx',
'ignoreDelete' = 'false'
);
-- 使用With参数方式创建结果表(时序表)的临时表 tablestore_sink
CREATE TEMPORARY TABLE tablestore_sink(
measurement STRING,
datasource STRING,
tag_a STRING,
`time` BIGINT,
binary_value BINARY,
bool_value BOOLEAN,
double_value DOUBLE,
long_value BIGINT,
string_value STRING,
tag_b STRING,
tag_c STRING,
tag_d STRING,
tag_e STRING,
tag_f STRING,
PRIMARY KEY(measurement, datasource, tag_a, `time`) NOT ENFORCED
) WITH (
'connector' = 'ots',
'endPoint' = 'https://xxx.cn-hangzhou.vpc.tablestore.aliyuncs.com',
'instanceName' = 'xxx',
'tableName' = 'flink_sink_table',
'accessId' = 'xxxxxxxxxxx',
'accessKey' = 'xxxxxxxxxxxxxxxxxxxxxxxxxxxx',
'storageType' = 'TIMESERIES',
'timeseriesSchema' = '{"measurement":"_m_name","datasource":"_data_source","tag_a":"_tags","tag_b":"_tags","tag_c":"_tags","tag_d":"_tags","tag_e":"_tags","tag_f":"_tags","time":"_time"}'
);
--将源表数据插入到结果表
INSERT INTO tablestore_sink
select
measurement,
datasource,
tag_a,
`time`,
binary_value,
bool_value,
double_value,
long_value,
string_value,
tag_b,
tag_c,
tag_d,
tag_e,
tag_f
from tablestore_stream;
从源表(时序表)flink_source_table中读取数据,并将结果写入结果表(数据表)flink_sink_table。
SQL示例如下:
-- 创建源表(时序表)的临时表 tablestore_stream
CREATE TEMPORARY TABLE tablestore_stream(
_m_name STRING,
_data_source STRING,
_tags STRING,
_time BIGINT,
binary_value BINARY,
bool_value BOOLEAN,
double_value DOUBLE,
long_value BIGINT,
string_value STRING
) WITH (
'connector' = 'ots',
'endPoint' = 'https://xxx.cn-hangzhou.vpc.tablestore.aliyuncs.com',
'instanceName' = 'xxx',
'tableName' = 'flink_source_table',
'tunnelName' = 'flink_source_tunnel',
'accessId' = 'xxxxxxxxxxx',
'accessKey' = 'xxxxxxxxxxxxxxxxxxxxxxxxxxxx'
);
-- 创建结果表(数据表)的临时表 print_table。
CREATE TEMPORARY TABLE tablestore_target(
measurement STRING,
datasource STRING,
tags STRING,
`time` BIGINT,
binary_value BINARY,
bool_value BOOLEAN,
double_value DOUBLE,
long_value BIGINT,
string_value STRING,
PRIMARY KEY (measurement,datasource, tags, `time`) NOT ENFORCED
) WITH (
'connector' = 'ots',
'endPoint' = 'https://xxx.cn-hangzhou.vpc.tablestore.aliyuncs.com',
'instanceName' = 'xxx',
'tableName' = 'flink_sink_table',
'accessId' = 'xxxxxxxxxxx',
'accessKey' = 'xxxxxxxxxxxxxxxxxxxxxxxxxxxx',
'valueColumns'='binary_value,bool_value,double_value,long_value,string_value'
);
--将源表数据插入到结果表
INSERT INTO tablestore_target
SELECT
_m_name,
_data_source,
_tags,
_time,
binary_value,
bool_value,
double_value,
long_value,
string_value
from tablestore_stream;
批量从源表flink_source_table中读取数据,您可以使用作业调试功能模拟作业运行,调试结果将显示在SQL编辑器下方。
SQL示例如下:
-- 创建源表(数据表)的临时表 tablestore_stream
CREATE TEMPORARY TABLE tablestore_stream(
`order` VARCHAR,
orderid VARCHAR,
customerid VARCHAR,
customername VARCHAR
) WITH (
'connector' = 'ots',
'endPoint' = 'https://xxx.cn-hangzhou.vpc.tablestore.aliyuncs.com',
'instanceName' = 'xxx',
'tableName' = 'flink_source_table',
'tunnelName' = 'flink_source_tunnel',
'accessId' = 'xxxxxxxxxxx',
'accessKey' = 'xxxxxxxxxxxxxxxxxxxxxxxxxxxx',
'ignoreDelete' = 'false'
);
-- 从源表读取数据
SELECT * FROM tablestore_stream LIMIT 100;
从源表flink_source_table中读取数据,并通过Print连接器将结果打印到TaskManager日志中。
SQL示例如下:
-- 创建源表(数据表)的临时表 tablestore_stream
CREATE TEMPORARY TABLE tablestore_stream(
`order` VARCHAR,
orderid VARCHAR,
customerid VARCHAR,
customername VARCHAR
) WITH (
'connector' = 'ots',
'endPoint' = 'https://xxx.cn-hangzhou.vpc.tablestore.aliyuncs.com',
'instanceName' = 'xxx',
'tableName' = 'flink_source_table',
'tunnelName' = 'flink_source_tunnel',
'accessId' = 'xxxxxxxxxxx',
'accessKey' = 'xxxxxxxxxxxxxxxxxxxxxxxxxxxx',
'ignoreDelete' = 'false'
);
-- 创建结果表的临时表 print_table。
CREATE TEMPORARY TABLE print_table(
`order` VARCHAR,
orderid VARCHAR,
customerid VARCHAR,
customername VARCHAR
) WITH (
'connector' = 'print', -- print连接器
'logger' = 'true' -- 控制台显示计算结果
);
-- 打印源表的字段
INSERT INTO print_table
SELECT `order`,orderid,customerid,customername from tablestore_stream;