阿里云助力您在中国加快取得成功
一站式安全合规咨询服务
MLPS 2.0 一站式合规解决方案
依托我们的网络进军中国市场
提升面向互联网应用的性能和安全性
保障您的中国业务安全无忧
通过强大的数据安全框架保护您的数据资产
申请 ICP 备案的流程解读和咨询服务
面向大数据建设、管理及应用的全域解决方案
企业内大数据建设、管理和应用的一站式解决方案
将您的采购和销售置于同一企业级全渠道数字平台上
全渠道内置 AI 驱动、拟人化、多语言对话的聊天机器人
快速搭建在线教育平台
提供域名注册、分析和保护服务
云原生 Kubernetes 容器化应用运行环境
以 Kubernetes 为使用界面的容器服务产品,提供符合容器规范的算力资源
安全的镜像托管服务,支持全生命周期管理
多集群环境下微服务应用流量统一管理
提供任意基础设施上容器集群的统一管控,助您轻松管控分布式云场景
高弹性、高可靠的企业级无服务器 Kubernetes 容器产品
敏捷安全的 Serverless 容器运行服务
为虚拟机和容器提供高可靠性、高性能、低时延的块存储服务
一款海量、安全、低成本、高可靠的云存储服务
可靠、弹性、高性能、多共享的文件存储服务
全托管、可扩展的并行文件系统服务。
全托管的 NoSQL 结构化数据实时存储服务
可抵扣多种存储产品的容量包,兼具灵活性和长期成本优化
让您的应用跨不同可用区资源自动分配访问量
随时绑定和解绑 VPC ECS
云网络公网、跨域流量统一计费
高性价比,可抵扣按流量计费的流量费用
创建云上隔离的网络,在专有环境中运行资源
在 VPC 环境下构建公网流量的出入口
具备网络状态可视化、故障智能诊断能力的自助式网络运维服务。
安全便捷的云上服务专属连接
基于阿里云专有网络的私有 DNS 解析服务
保障在线业务不受大流量 DDoS 攻击影响
系统运维和安全审计管控平台
业务上云的第一个网络安全基础设施
集零信任内网访问、办公数据保护、终端管理等多功能于一体的办公安全管控平台
提供7X24小时安全运维平台
防御常见 Web 攻击,缓解 HTTP 泛洪攻击
实现全站 HTTPS,呈现可信的 WEB 访问
为云上应用提供符合行业标准和密码算法等级的数据加解密、签名验签和数据认证能力
一款发现、分类和保护敏感数据的安全服务
创建、控制和管理您的加密密钥
快速提高应用高可用能力服务
围绕应用和微服务的 PaaS 平台
兼容主流开源微服务生态的一站式平台
多集群环境下微服务应用流量统一管理
企业级全托管实时数据流平台
全托管,开箱即用的Apache Kafka全托管服务
提供物联网移动端和云交互的消息队列
开箱即用的全托管 RabbitMQ 服务
提供基于消息的可靠异步通信机制
应用之间的消息队列和通知
无服务器事件总线服务
Super MySQL 和 PostgreSQL,高度兼容 Oracle 语法
全托管 MySQL、PostgreSQL、SQL Server、MariaDB
兼容 Redis® 的缓存和KV数据库
兼容Apache Cassandra、Apache HBase、Elasticsearch、OpenTSDB 等多种开源接口
文档型数据库,支持副本集和分片架构
100%兼容 Apache HBase 并深度扩展,稳定、易用、低成本的NoSQL数据库。
低成本、高可用、可弹性伸缩的在线时序数据库服务
专为搜索和分析而设计,成本效益达到开源的两倍,采用最新的企业级AI搜索和AI助手功能。
一款兼容PostgreSQL协议的实时交互式分析产品
一种快速、完全托管的 TB/PB 级数据仓库
基于 Flink 为大数据行业提供解决方案
基于Qwen和其他热门模型的一站式生成式AI平台,可构建了解您业务的智能应用程
一站式机器学习平台,满足数据挖掘分析需求
高性能向量检索服务,提供低代码API和高成本效益
帮助您的应用快速构建高质量的个性化推荐服务能力
提供定制化的高品质机器翻译服务
全面的AI计算平台,满足大模型训练等高性能AI计算的算力和性能需求
具备智能会话能力的会话机器人
基于机器学习的智能图像搜索产品
基于阿里云深度学习技术,为用户提供图像分割、视频分割、文字识别等离线SDK能力,支持Android、iOS不同的适用终端。
语音识别、语音合成服务以及自学习平台
一站式智能搜索业务开发平台
助力金融企业快速搭建超低时延、高质量、稳定的行情数据服务
帮助企业快速测算和分析企业的碳排放和产品碳足迹
企业工作流程自动化,全面提高效率
金融级云原生分布式架构的一站式高可用应用研发、运维平台
eKYC 数字远程在线解决方案
可智能检测、大数据驱动的综合性反洗钱 (AML) 解决方案
阿里云APM类监控产品
实时云监控服务,确保应用及服务器平稳运行
为系统运维人员管理云基础架构提供全方位服务的云上自动化运维平台
面向您的云资源的风险检测服务
提升分布式环境下的诊断效率
日志类数据一站式服务,无需开发就能部署
ECS 预留实例
让弹性计算产品的成本和灵活性达到最佳平衡的付费方式。云原生 AI 套件
加速AI平台构建,提高资源效率和交付速度FinOps
实时分析您的云消耗并实现节约SecOps
实施细粒度安全控制DevOps
快速、安全地最大限度提高您的DevOps优势自带IP上云
自带公网 IP 地址上云全球网络互联
端到端的软件定义网络解决方案,可推动跨国企业的业务发展全球应用加速
提升面向互联网应用的性能和安全性全球互联网接入
将IDC网关迁移到云端云原生 AI 套件
加速AI平台构建,提高资源效率和交付速度FinOps
实时分析您的云消耗并实现节约SecOps
实施细粒度安全控制DevOps
快速、安全地最大限度提高您的DevOps优势金融科技云数据库解决方案
利用专为金融科技而设的云原生数据库解决方案游戏行业云数据库解决方案
提供多种成熟架构,解决所有数据问题Oracle 数据库迁移
将 Oracle 数据库顺利迁移到云原生数据库数据库迁移
加速迁移您的数据到阿里云阿里云上的数据湖
实时存储、管理和分析各种规模和类型的数据数码信贷
利用大数据和 AI 降低信贷和黑灰产风险面向企业数据技术的大数据咨询服务
帮助企业实现数据现代化并规划其数字化未来人工智能对话服务
全渠道内置 AI 驱动、拟人化、多语言对话的聊天机器人EasyDispatch 现场服务管理
为现场服务调度提供实时AI决策支持在线教育
快速搭建在线教育平台窄带高清 (HD) 转码
带宽成本降低高达 30%广电级大型赛事直播
为全球观众实时直播大型赛事,视频播放流畅不卡顿直播电商
快速轻松地搭建一站式直播购物平台用于供应链规划的Alibaba Dchain
构建和管理敏捷、智能且经济高效的供应链云胸牌
针对赛事运营的创新型凭证数字服务数字门店中的云 POS 解决方案
将所有操作整合到一个云 POS 系统中元宇宙
元宇宙是下一代互联网人工智能 (AI) 加速
利用阿里云 GPU 技术,为 AI 驱动型业务以及 AI 模型训练和推理加速DevOps
快速、安全地最大限度提高您的DevOps优势数据迁移解决方案
加速迁移您的数据到阿里云企业 IT 治理
在阿里云上构建高效可控的云环境基于日志管理的AIOps
登录到带有智能化日志管理解决方案的 AIOps 环境备份与存档
数据备份、数据存档和灾难恢复用阿里云金融服务加快创新
在云端开展业务,提升客户满意度
为全球资本市场提供安全、准确和数字化的客户体验
利用专为金融科技而设的云原生数据库解决方案
利用大数据和 AI 降低信贷和黑灰产风险
建立快速、安全的全球外汇交易平台
新零售时代下,实现传统零售业转型
利用云服务处理流量波动问题,扩展业务运营、降低成本
快速轻松地搭建一站式直播购物平台
面向大数据建设、管理及应用的全域解决方案
全渠道内置 AI 驱动、拟人化、多语言对话的聊天机器人
以数字化媒体旅程为当今的媒体市场准备就绪您的内容
带宽成本降低高达 30%
快速轻松地搭建一站式直播购物平台
为全球观众实时直播大型赛事,视频播放流畅不卡顿
使用阿里云弹性高性能计算 E-HPC 将本地渲染农场连接到云端
构建发现服务,帮助客户找到最合适的内容
保护您的媒体存档安全
通过统一的数据驱动平台提供一致的全生命周期客户服务
在钉钉上打造一个多功能的电信和数字生活平台
在线存储、共享和管理照片与文件
提供全渠道的无缝客户体验
面向中小型企业,为独立软件供应商提供可靠的IT服务
打造最快途径,助力您的新云业务扬帆起航
先进的SD-WAN平台,可实现WAN连接、实时优化并降低WAN成本
通过自动化和流程标准化实现快速事件响应
针对关键网络安全威胁提供集中可见性并进行智能安全分析
提供大容量、可靠且高度安全的企业文件传输
用智能技术数字化体育赛事
基于人工智能的低成本体育广播服务
专业的广播转码及信号分配管理服务
基于云的音视频内容引入、编辑和分发服务
在虚拟场馆中模拟关键运营任务
针对赛事运营的创新型凭证数字服务
智能和交互式赛事指南
轻松管理云端背包单元的绑定直播流
通过数据加强您的营销工作
元宇宙是下一代互联网
利用生成式 AI 加速创新,创造新的业务佳绩
阿里云高性能开源大模型
借助AI轻松解锁和提炼文档中的知识
通过AI驱动的语音转文本服务获取洞察
探索阿里云人工智能和数据智能的所有功能、新优惠和最新产品
该体验中心提供广泛的用例和产品帮助文档,助您开始使用阿里云 AI 产品和浏览您的业务数据。
利用阿里云 GPU 技术,为 AI 驱动型业务以及 AI 模型训练和推理加速
元宇宙是下一代互联网
构建发现服务,帮助客户找到最合适的内容
全渠道内置 AI 驱动、拟人化、多语言对话的聊天机器人
加速迁移您的数据到阿里云
在阿里云上建立一个安全且易扩容的环境,助力高效率且高成本效益的上云旅程
迁移到完全托管的云数据库
将 Oracle 数据库顺利迁移到云原生数据库
自带公网 IP 地址上云
利用阿里云强大的安全工具集,保障业务安全、应用程序安全、数据安全、基础设施安全和帐户安全
保护、备份和还原您的云端数据资产
MLPS 2.0 一站式合规解决方案
快速高效地将您的业务扩展到中国,同时遵守适用的当地法规
实现对 CloudOps、DevOps、SecOps、AIOps 和 FinOps 的高效、安全和透明的管理
构建您的原生云环境并高效管理集群
快速、安全地最大限度提高您的DevOps优势
实施细粒度安全控制
提供运维效率和总体系统安全性
实时分析您的云消耗并实现节约
实时存储、管理和分析各种规模和类型的数据
登录到带有智能化日志管理解决方案的 AIOps 环境
帮助企业实现数据现代化并规划其数字化未来
帮助零售商快速规划数字化之旅
将全球知名的 CRM 平台引入中国
在线存储、共享和管理照片与文件
构建、部署和管理高可用、高可靠、高弹性的应用程序
快速、安全地最大限度提高您的DevOps优势
将您的采购和销售置于同一企业级全渠道数字平台上
企业内大数据建设、管理和应用的一站式解决方案
帮助企业简化 IT 架构、实现商业价值、加速数字化转型的步伐
快速高效地将您的业务扩展到中国,同时遵守适用的当地法规
快速搜集、处理、分析联网设备产生的数据
0.0.201
本文为您介绍如何通过实时计算Flink版和流式数据湖仓Paimon搭建流式湖仓。
随着社会数字化发展,企业对数据时效性的需求越来越强烈。传统的离线数仓搭建方法论比较明确,通过定时调度离线作业的方式,将上一时段产生的新鲜变更并入分层的数仓中(ODS->DWD->DWS->ADS),但是存在延时长和成本高两大问题。离线作业的调度通常每小时甚至每天才进行一次,数据的消费者仅能看到上一小时甚至昨天的数据。同时,数据的更新多以覆写(overwrite)分区的方式进行,需要重新读取分区中原有的数据,才能与新鲜变更合并,产生新的结果数据。
基于实时计算Flink版和流式数据湖仓Paimon搭建流式湖仓可以解决上述传统离线数仓的问题。利用Flink的实时计算能力,数据可以在数仓分层之间实时流动。同时,利用Paimon高效的更新能力,数据变更可以在分钟级的延时内传递给下游消费者。因此,流式湖仓在延时和成本上具有双重优势。
关于流式数据湖仓Paimon的更多特性,请参见特色功能和Apache Paimon官方网站。
实时计算Flink版是强大的流式计算引擎,支持对海量实时数据高效处理。流式数据湖仓Paimon是流批统一的湖存储格式,支持高吞吐的更新和低延迟的查询。Paimon与Flink深度集成,能够提供一体化的流式湖仓联合解决方案。本文基于Flink+Paimon搭建流式湖仓的方案架构如下:
Flink将数据源写入Paimon,形成ODS层。
Flink订阅ODS层的变更数据(Changelog)进行加工,形成DWD层再次写入Paimon。
Flink订阅DWD层的Changelog进行加工,形成DWS层再次写入Paimon。
最后由云原生大数据计算服务MaxCompute或开源大数据平台E-MapReduce读取Paimon外部表,对外提供应用查询。
该方案有如下优势:
Paimon的每一层数据都可以在分钟级的延时内将变更传递给下游,将传统离线数仓的延时从小时级甚至天级降低至分钟级。
Paimon的每一层数据都可以直接接受变更数据,无需覆写分区,极大地降低了传统离线数仓数据更新与订正的成本,解决了中间层数据不易查、不易更新、不易修正的问题。
模型统一,架构简化。ETL链路的逻辑是基于Flink SQL实现的;ODS层、DWD层和DWS层的数据统一存储在Paimon中,可以降低架构复杂度,提高数据处理效率。
该方案依赖于Paimon的三个核心能力,详情如下表所示。
Paimon核心能力 | 详情 |
Paimon核心能力 | 详情 |
主键表更新 | Paimon底层使用LSM Tree数据结构,可以实现高效的数据更新。 关于Paimon主键表、Paimon底层数据结构的介绍请参见Primary Key Table和File Layouts。 |
增量数据产生机制(Changelog Producer) | Paimon可以为任意输入数据流产生完整的增量数据(所有的update_after数据都有对应的update_before数据),保证数据变更可以完整地传递给下游。详情请参见增量数据产生机制。 |
数据合并机制(Merge Engine) | 当Paimon主键表收到多条具有相同主键的数据时,为了保持主键的唯一性,Paimon结果表会将这些数据合并成一条数据。Paimon支持去重、部分更新、预聚合等丰富多样的数据合并行为,详情请参见数据合并机制。 |
本文以某个电商平台为例,通过搭建一套流式湖仓,实现数据的加工清洗,并支持上层应用对数据的查询。流式湖仓实现了数据的分层和复用,并支撑各个业务方的报表查询(交易大屏、行为数据分析、用户画像标签)以及个性化推荐等多个业务场景。
构建ODS层:业务数据库实时入仓 MySQL有orders(订单表),orders_pay(订单支付表)和product_catalog(商品类别字典表)三张业务表,这三张表通过Flink实时写入OSS,并以Paimon格式进行存储,作为ODS层。
构建DWD层:主题宽表 将订单表、商品类别字典表、订单支付表利用Paimon的部分更新(partial-update)合并机制进行打宽,以分钟级延时生成DWD层宽表并产出变更数据(Changelog)。
构建DWS层:指标计算 Flink实时消费宽表的变更数据,利用Paimon的预聚合(aggregation)合并机制产出DWM层dwm_users_shops(用户-商户聚合中间表),并最终产出DWS层dws_users(用户聚合指标表)以及dws_shops(商户聚合指标表)。
已开通对象存储OSS,详情请参见OSS控制台快速入门。OSS Bucket的存储类型需要为标准存储,详情请参见存储类型概述。
已开通Flink全托管,详情请参见开通实时计算Flink版。
已开通云原生大数据计算服务MaxCompute,并已在MaxCompute项目中上传Paimon插件(paimon_maxcompute_connector.jar)。详情请参见开通MaxCompute和DataWorks以及Paimon外部表。
MaxCompute项目、OSS Bucket需要与Flink工作空间处于相同地域。
仅实时计算引擎VVR 8.0.1及以上版本支持该流式湖仓方案。
本文以云数据库RDS MySQL版为例,创建数据库名称为order_dw,并创建三张业务表及数据。
RDS MySQL版实例需要与Flink工作空间处于同一VPC。不在同一VPC下时请参见网络连通性。
创建名称为order_dw的数据库,并创建高权限账号或具有数据库order_dw读写权限的普通账号。
CREATE TABLE `orders` (
order_id bigint not null primary key,
user_id varchar(50) not null,
shop_id bigint not null,
product_id bigint not null,
buy_fee bigint not null,
create_time timestamp not null,
update_time timestamp not null default now(),
state int not null
);
CREATE TABLE `orders_pay` (
pay_id bigint not null primary key,
order_id bigint not null,
pay_platform int not null,
create_time timestamp not null
);
CREATE TABLE `product_catalog` (
product_id bigint not null primary key,
catalog_name varchar(50) not null
);
-- 准备数据
INSERT INTO product_catalog VALUES(1, 'phone_aaa'),(2, 'phone_bbb'),(3, 'phone_ccc'),(4, 'phone_ddd'),(5, 'phone_eee');
INSERT INTO orders VALUES
(100001, 'user_001', 12345, 1, 5000, '2023-02-15 16:40:56', '2023-02-15 18:42:56', 1),
(100002, 'user_002', 12346, 2, 4000, '2023-02-15 15:40:56', '2023-02-15 18:42:56', 1),
(100003, 'user_003', 12347, 3, 3000, '2023-02-15 14:40:56', '2023-02-15 18:42:56', 1),
(100004, 'user_001', 12347, 4, 2000, '2023-02-15 13:40:56', '2023-02-15 18:42:56', 1),
(100005, 'user_002', 12348, 5, 1000, '2023-02-15 12:40:56', '2023-02-15 18:42:56', 1),
(100006, 'user_001', 12348, 1, 1000, '2023-02-15 11:40:56', '2023-02-15 18:42:56', 1),
(100007, 'user_003', 12347, 4, 2000, '2023-02-15 10:40:56', '2023-02-15 18:42:56', 1);
INSERT INTO orders_pay VALUES
(2001, 100001, 1, '2023-02-15 17:40:56'),
(2002, 100002, 1, '2023-02-15 17:40:56'),
(2003, 100003, 0, '2023-02-15 17:40:56'),
(2004, 100004, 0, '2023-02-15 17:40:56'),
(2005, 100005, 0, '2023-02-15 18:40:56'),
(2006, 100006, 0, '2023-02-15 18:40:56'),
(2007, 100007, 0, '2023-02-15 18:40:56');
创建Paimon Catalog。
进入元数据管理页面。
登录实时计算控制台。
单击目标工作空间操作列下的控制台。
单击左侧的元数据管理。
创建自定义Catalog类型。
单击创建Catalog,选择自定义Catalog页签,单击创建自定义Catalog类型。
在创建自定义Catalog类型窗口中,上传Paimon自定义catalog插件(paimon-ali-vvr-8.0-vvp-0.6-ali-1-SNAPSHOT.jar)。
若出现文件尺寸过大的提示信息,单击继续即可。
单击下一步(需等待一定时间),单击确定。
在自定义Catalog页签,单击paimon-06-1后,单击下一步。
在创建Catalog窗口中,输入以下SQL语句并填入配置项后,单击确定,新建名为dw的Paimon Catalog。
CREATE CATALOG `dw` WITH (
'type' = 'paimon-06-1',
'metastore' = 'maxcompute',
'warehouse' = '<warehouse>',
'fs.oss.endpoint' = '<oss endpoint>',
'fs.oss.accessKeyId' = '<oss access key id>',
'fs.oss.accessKeySecret' = '<access key secret>',
'maxcompute.endpoint' = '<maxcompute endpoint>',
'maxcompute.accessid' = '<maxcompute access id>',
'maxcompute.accesskey' = '<access key secret>',
'maxcompute.project' = '<maxcompute project>',
'maxcompute.oss.endpoint' = '<maxcompute oss endpoint>'
);
WITH参数说明
配置项 | 说明 | 是否必填 | 备注 |
type | Catalog类型。 | 是 | 固定值为 |
metastore | 元数据存储类型。 | 是 | 为了后续能在MaxCompute中分析Paimon表的数据,此处填写 |
warehouse | OSS服务中所指定的数仓目录。 | 是 | 格式为oss://<bucket>/<object>。其中:
您可以在OSS管理控制台上查看您的bucket和object名称。 重要
|
fs.oss.endpoint | 从Flink访问OSS服务的访问域名。 | 是 | 参见访问域名和数据中心。 |
fs.oss.accessKeyId | 拥有读写OSS权限的阿里云账号的AccessKey ID。 | 是 | 获取方法请参见查看RAM用户的AccessKey信息。 |
fs.oss.accessKeySecret | 拥有读写OSS权限的阿里云账号的AccessKey Secret。 | 是 | 获取方法请参见查看RAM用户的AccessKey信息。 |
maxcompute.endpoint | MaxCompute服务的访问域名。 | 是 | 参见Endpoint。 |
maxcompute.accessid | 拥有MaxCompute权限的阿里云账号的AccessKey。 | 是 | 获取方法请参见查看RAM用户的AccessKey信息。 |
maxcompute.accesskey | 拥有MaxCompute权限的阿里云账号的AccessKey Secret。 | 是 | 获取方法请参见查看RAM用户的AccessKey信息。 |
maxcompute.project | 需要操作的MaxCompute项目名称。 | 是 | 当前暂时不支持开启了Schema操作的MaxCompute项目。 |
maxcompute.oss.endpoint | 从MaxCompute访问OSS服务的访问域名。 | 否 | 若不填写,则默认使用fs.oss.endpoint。参见访问域名和数据中心。 重要 由于OSS Bucket与MaxCompute项目处于同一地域,此处应填写内网Endpoint。 |
创建MySQL Catalog。
在元数据管理页面,单击创建Catalog。
在内置Catalog页签,单击MySQL,单击下一步。
填写以下参数,单击确定,新建名为mysqlcatalog的MySQL Catalog。
配置项 | 说明 | 是否必填 | 备注 |
配置项 | 说明 | 是否必填 | 备注 |
catalog name | Catalog名称。 | 是 | 填写为自定义的英文名。 本文以mysqlcatalog为例。 |
hostname | MySQL数据库的IP地址或者Hostname。 | 是 | 详情请参见查看和管理实例连接地址和端口。由于RDS MySQL版实例和Flink全托管处于相同VPC,此处应填写内网地址。 |
port | MySQL数据库服务的端口号,默认值为3306。 | 否 | 详情请参见查看和管理实例连接地址和端口。 |
default-database | 默认的MySQL数据库名称。 | 是 | 本文填写需要同步的数据库名order_dw。 |
username | MySQL数据库服务的用户名。 | 是 | 本文为准备MySQL CDC数据源中创建的账号。 |
password | MySQL数据库服务的密码。 | 是 | 本文为准备MySQL CDC数据源中创建的密码。 |
基于CREATE DATABASE AS(CDAS)语句功能,可以一次性将ODS层建出来。SQL作业中通过SET语句指定的配置项也可以在作业运维页面的作业部署详情页签指定,详见控制台操作。Paimon写入性能优化请参见Apache Paimon官方文档。
创建CDAS同步作业。
在数据开发 > ETL页面,新建名为ods的SQL流作业,将如下代码复制到SQL编辑器。
SET 'execution.checkpointing.max-concurrent-checkpoints' = '3'; -- 减轻检查点长尾的影响。
SET 'table.exec.sink.upsert-materialize' = 'NONE'; -- 消除无用的SinkMaterialize算子。
-- Paimon结果表在每次检查点完成之后才会正式提交数据。
-- 此处将检查点间隔缩短为10s,是为了更快地看到结果。
-- 在生产环境下,系统检查点的间隔与两次系统检查点之间的最短时间间隔根据业务对延时要求的不同,一般设置为1分钟到10分钟。
SET 'execution.checkpointing.interval' = '10s';
SET 'execution.checkpointing.min-pause' = '10s';
CREATE DATABASE IF NOT EXISTS dw.order_dw
WITH (
'changelog-producer' = 'input' -- 因为输入数据就是MySQL产生的binlog,已经是完整的变更数据,所以可以直接把输入数据作为变更数据。
) AS DATABASE mysqlcatalog.order_dw INCLUDING all tables; -- 也可以根据需要选择上游数据库需要入仓的表。
单击右上方的部署,进行作业部署。
在作业运维页面,单击刚刚部署的ods作业操作列的启动,选择无状态启动启动作业。
查看MySQL同步到Paimon的三张表的数据。
在数据开发 > 数据查询页面的查询脚本页签,将如下代码拷贝到查询脚本后,选中目标片段后单击左侧代码行上的运行。
SELECT * FROM dw.order_dw.orders ORDER BY order_id;
创建DWD层Paimon宽表dwd_orders 在数据开发 > 数据查询页面的查询脚本页签,将如下代码拷贝到查询脚本后,选中目标片段后单击左侧代码行上的运行。
CREATE TABLE dw.order_dw.dwd_orders (
order_id BIGINT,
order_user_id STRING,
order_shop_id BIGINT,
order_product_id BIGINT,
order_product_catalog_name STRING,
order_fee BIGINT,
order_create_time TIMESTAMP,
order_update_time TIMESTAMP,
order_state INT,
pay_id BIGINT,
pay_platform INT COMMENT 'platform 0: phone, 1: pc',
pay_create_time TIMESTAMP,
PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
'merge-engine' = 'partial-update', -- 使用部分更新数据合并机制产生宽表
'changelog-producer' = 'lookup' -- 使用lookup增量数据产生机制以低延时产出变更数据
);
返回Query has been executed
表示创建成功。
实时消费ODS层orders、orders_pay表的变更数据。
在数据开发 > ETL页面,新建名为dwd的SQL流作业,并将如下代码复制到SQL编辑器后,部署作业并无状态启动作业。
通过该SQL作业,orders表会与product_catalog表进行维表关联,关联后的结果将与orders_pay一起写入dwd_orders表中,利用Paimon表的部分更新数据合并机制,将orders表和orders_pay表中order_id相同的数据进行打宽。
SET 'execution.checkpointing.max-concurrent-checkpoints' = '3';
SET 'table.exec.sink.upsert-materialize' = 'NONE';
SET 'execution.checkpointing.interval' = '10s';
SET 'execution.checkpointing.min-pause' = '10s';
-- Paimon目前暂不支持在同一个作业里通过多条INSERT语句写入同一张表,因此这里使用UNION ALL。
INSERT INTO dw.order_dw.dwd_orders
SELECT
o.order_id,
o.user_id,
o.shop_id,
o.product_id,
dim.catalog_name,
o.buy_fee,
o.create_time,
o.update_time,
o.state,
NULL,
NULL,
NULL
FROM
dw.order_dw.orders o
LEFT JOIN dw.order_dw.product_catalog FOR SYSTEM_TIME AS OF proctime() AS dim
ON o.product_id = dim.product_id
UNION ALL
SELECT
order_id,
NULL,
NULL,
NULL,
NULL,
NULL,
NULL,
NULL,
NULL,
pay_id,
pay_platform,
create_time
FROM
dw.order_dw.orders_pay;
查看宽表dwd_orders的数据。 在数据开发 > 数据查询页面的查询脚本页签,将如下代码拷贝到查询脚本后,选中目标片段后单击左侧代码行上的运行。
SELECT * FROM dw.order_dw.dwd_orders ORDER BY order_id;
创建DWS层的聚合表dws_users以及dws_shops。 在数据开发 > 数据查询页面的查询脚本页签,将如下代码拷贝到查询脚本后,选中目标片段后单击左侧代码行上的运行。
-- 用户维度聚合指标表。
CREATE TABLE dw.order_dw.dws_users (
user_id STRING,
ds STRING,
payed_buy_fee_sum BIGINT COMMENT '当日完成支付的总金额',
PRIMARY KEY (user_id, ds) NOT ENFORCED
) WITH (
'merge-engine' = 'aggregation', -- 使用预聚合数据合并机制产生聚合表
'fields.payed_buy_fee_sum.aggregate-function' = 'sum' -- 对 payed_buy_fee_sum 的数据求和产生聚合结果
-- 由于dws_users表不再被下游流式消费,因此无需指定增量数据产生机制
);
-- 商户维度聚合指标表。
CREATE TABLE dw.order_dw.dws_shops (
shop_id BIGINT,
ds STRING,
payed_buy_fee_sum BIGINT COMMENT '当日完成支付总金额',
uv BIGINT COMMENT '当日不同购买用户总人数',
pv BIGINT COMMENT '当日购买用户总人次',
PRIMARY KEY (shop_id, ds) NOT ENFORCED
) WITH (
'merge-engine' = 'aggregation', -- 使用预聚合数据合并机制产生聚合表
'fields.payed_buy_fee_sum.aggregate-function' = 'sum', -- 对 payed_buy_fee_sum 的数据求和产生聚合结果
'fields.uv.aggregate-function' = 'sum', -- 对 uv 的数据求和产生聚合结果
'fields.pv.aggregate-function' = 'sum' -- 对 pv 的数据求和产生聚合结果
-- 由于dws_shops表不再被下游流式消费,因此无需指定增量数据产生机制
);
-- 为了同时计算用户视角的聚合表以及商户视角的聚合表,另外创建一个以用户 + 商户为主键的中间表。
CREATE TABLE dw.order_dw.dwm_users_shops (
user_id STRING,
shop_id BIGINT,
ds STRING,
payed_buy_fee_sum BIGINT COMMENT '当日用户在商户完成支付的总金额',
pv BIGINT COMMENT '当日用户在商户购买的次数',
PRIMARY KEY (user_id, shop_id, ds) NOT ENFORCED
) WITH (
'merge-engine' = 'aggregation', -- 使用预聚合数据合并机制产生聚合表
'fields.payed_buy_fee_sum.aggregate-function' = 'sum', -- 对 payed_buy_fee_sum 的数据求和产生聚合结果
'fields.pv.aggregate-function' = 'sum', -- 对 pv 的数据求和产生聚合结果
'changelog-producer' = 'lookup', -- 使用lookup增量数据产生机制以低延时产出变更数据
-- dwm层的中间表一般不直接提供上层应用查询,因此可以针对写入性能进行优化。
'file.format' = 'avro', -- 使用avro行存格式的写入性能更加高效。
'metadata.stats-mode' = 'none' -- 放弃统计信息会增加OLAP查询代价(对持续的流处理无影响),但会让写入性能更加高效。
);
返回Query has been executed
表示创建成功。
实时消费DWD层dwd_orders表的变更数据。 在数据开发 > ETL页面的作业草稿页签,新建名为dwm的SQL流作业,并将如下代码复制到SQL编辑器后,部署作业并无状态启动作业。
通过该SQL作业,dwd_orders表的数据会写入dwm_users_shops表中,利用Paimon表的预聚合数据合并机制,自动对order_fee求和,算出用户在商户的消费总额。同时,自动对1求和,也能算出用户在商户的消费次数。
SET 'execution.checkpointing.max-concurrent-checkpoints' = '3';
SET 'table.exec.sink.upsert-materialize' = 'NONE';
SET 'execution.checkpointing.interval' = '10s';
SET 'execution.checkpointing.min-pause' = '10s';
INSERT INTO dw.order_dw.dwm_users_shops
SELECT
order_user_id,
order_shop_id,
DATE_FORMAT (pay_create_time, 'yyyyMMdd') as ds,
order_fee,
1 -- 一条输入记录代表一次消费
FROM dw.order_dw.dwd_orders
WHERE pay_id IS NOT NULL AND order_fee IS NOT NULL;
实时消费DWM层dwm_users_shops表的变更数据。 在数据开发 > ETL页面,新建名为dws的SQL流作业,并将如下代码复制到SQL编辑器后,部署作业并无状态启动作业。
通过该SQL作业,dwm_users_shops表的数据会写入dws_users表中,利用Paimon表的预聚合数据合并机制,自动对payed_buy_fee_sum求和,算出用户在所有商户的消费总额。另外,该SQL作业也会将dwm_users_shops表的数据写入dws_shops表中,利用Paimon表的预聚合数据合并机制,自动对payed_buy_fee_sum求和,算出商户的总流水。另外,自动对1求和,也能算出有几个不同的用户在该商户消费过。最后,自动对pv求和,算出在该商户消费的总人次。
SET 'execution.checkpointing.max-concurrent-checkpoints' = '3';
SET 'table.exec.sink.upsert-materialize' = 'NONE';
SET 'execution.checkpointing.interval' = '10s';
SET 'execution.checkpointing.min-pause' = '10s';
-- 与dwd不同,此处每一条INSERT语句写入的是不同的Paimon表,可以放在同一个作业中。
BEGIN STATEMENT SET;
INSERT INTO dw.order_dw.dws_users
SELECT
user_id,
ds,
payed_buy_fee_sum
FROM dw.order_dw.dwm_users_shops;
-- 以商户为主键,部分热门商户的数据量可能远高于其他商户。
-- 因此使用local merge在写入Paimon之前先在内存中进行预聚合,缓解数据倾斜问题。
INSERT INTO dw.order_dw.dws_shops /*+ OPTIONS('local-merge-buffer-size' = '64mb') */
SELECT
shop_id,
ds,
payed_buy_fee_sum,
1, -- 一条输入记录代表一名用户在该商户的所有消费
pv
FROM dw.order_dw.dwm_users_shops;
END;
查看dws_users表和dws_shops表的数据。 在数据开发 > 数据查询页面的查询脚本页签,将如下代码拷贝到查询脚本后,选中目标片段后单击左侧代码行上的运行。
--查看dws_users表数据
SELECT * FROM dw.order_dw.dws_users ORDER BY user_id;
--查看dws_shops表数据
SELECT * FROM dw.order_dw.dws_shops ORDER BY shop_id;
前面已完成了流式湖仓的构建,下面将测试流式湖仓捕捉业务数据库变化的能力。
向MySQL的order_dw数据库中插入如下数据。
INSERT INTO orders VALUES
(100008, 'user_001', 12345, 3, 3000, '2023-02-15 17:40:56', '2023-02-15 18:42:56', 1),
(100009, 'user_002', 12348, 4, 1000, '2023-02-15 18:40:56', '2023-02-15 19:42:56', 1),
(100010, 'user_003', 12348, 2, 2000, '2023-02-15 19:40:56', '2023-02-15 20:42:56', 1);
INSERT INTO orders_pay VALUES
(2008, 100008, 1, '2023-02-15 18:40:56'),
(2009, 100009, 1, '2023-02-15 19:40:56'),
(2010, 100010, 0, '2023-02-15 20:40:56');
查看dws_users表和dws_shops表的数据。 在数据开发 > 数据查询页面的查询脚本页签,将如下代码拷贝到查询脚本后,选中目标片段后单击左侧代码行上的运行。
dws_users表
SELECT * FROM dw.order_dw.dws_users ORDER BY user_id;
dws_shops表
SELECT * FROM dw.order_dw.dws_shops ORDER BY shop_id;
上一小节展示了在Flink中进行Paimon Catalog的创建与Paimon表的写入。由于Catalog中指定了元数据存储为MaxCompute,因此在Catalog中创建表时,MaxCompute中将会自动创建Paimon外表。本节展示流式湖仓搭建完成后,利用MaxCompute进行数据分析的一些简单应用场景。关于MaxCompute如何查询Paimon外表,详见Paimon外部表。
对DWS层聚合表进行分析。本文使用DataWorks查询23年2月15日交易额前三高的商户的代码示例如下。
SET odps.sql.common.table.planner.ext.hive.bridge = true;
SET odps.sql.hive.compatible = true;
SELECT ROW_NUMBER() OVER (ORDER BY payed_buy_fee_sum DESC) AS rn, shop_id, payed_buy_fee_sum FROM dws_shops
WHERE ds = '20230215'
ORDER BY rn LIMIT 3;
对DWD层宽表进行分析。本文使用DataWorks查询某个客户23年2月特定支付平台支付的订单明细的代码示例如下。
SET odps.sql.common.table.planner.ext.hive.bridge = true;
SET odps.sql.hive.compatible = true;
SELECT * FROM dwd_orders
WHERE order_create_time >= '2023-02-01 00:00:00' AND order_create_time < '2023-03-01 00:00:00'
AND order_user_id = 'user_001'
AND pay_platform = 0
ORDER BY order_create_time;
对DWD层宽表进行分析。本文使用DataWorks查询23年2月内每个品类的订单总量和订单总金额的代码示例如下。
SET odps.sql.common.table.planner.ext.hive.bridge = true;
SET odps.sql.hive.compatible = true;
SELECT
TO_CHAR(order_create_time, 'YYYYMMDD') AS order_create_date,
order_product_catalog_name,
COUNT(*),
SUM(order_fee)
FROM
dwd_orders
WHERE
order_create_time >= '2023-02-01 00:00:00' and order_create_time < '2023-03-01 00:00:00'
GROUP BY
order_create_date, order_product_catalog_name
ORDER BY
order_create_date, order_product_catalog_name;
报错You have NO privilege 'odps:CreateInstance'的解决方法:ODPS-0420095
通过Flink批处理能力搭建Paimon离线湖仓,详情请参见Flink批处理快速入门。