本文为您介绍如何通过实时计算Flink版和流式数据湖仓Paimon搭建流式湖仓。
背景信息
随着社会数字化发展,企业对数据时效性的需求越来越强烈。传统的离线数仓搭建方法论比较明确,通过定时调度离线作业的方式,将上一时段产生的新鲜变更并入分层的数仓中(ODS->DWD->DWS->ADS),但是存在延时长和成本高两大问题。离线作业的调度通常每小时甚至每天才进行一次,数据的消费者仅能看到上一小时甚至昨天的数据。同时,数据的更新多以覆写(overwrite)分区的方式进行,需要重新读取分区中原有的数据,才能与新鲜变更合并,产生新的结果数据。
基于实时计算Flink版和流式数据湖仓Paimon搭建流式湖仓可以解决上述传统离线数仓的问题。利用Flink的实时计算能力,数据可以在数仓分层之间实时流动。同时,利用Paimon高效的更新能力,数据变更可以在分钟级的延时内传递给下游消费者。因此,流式湖仓在延时和成本上具有双重优势。
关于流式数据湖仓Paimon的更多特性,请参见特色功能和Apache Paimon官方网站。
方案架构和优势
架构
实时计算Flink版是强大的流式计算引擎,支持对海量实时数据高效处理。流式数据湖仓Paimon是流批统一的湖存储格式,支持高吞吐的更新和低延迟的查询。Paimon与Flink深度集成,能够提供一体化的流式湖仓联合解决方案。本文基于Flink+Paimon搭建流式湖仓的方案架构如下:
Flink将数据源写入Paimon,形成ODS层。
Flink订阅ODS层的变更数据(Changelog)进行加工,形成DWD层再次写入Paimon。
Flink订阅DWD层的Changelog进行加工,形成DWS层再次写入Paimon。
最后由云原生大数据计算服务MaxCompute或开源大数据平台E-MapReduce读取Paimon外部表,对外提供应用查询。
优势
该方案有如下优势:
Paimon的每一层数据都可以在分钟级的延时内将变更传递给下游,将传统离线数仓的延时从小时级甚至天级降低至分钟级。
Paimon的每一层数据都可以直接接受变更数据,无需覆写分区,极大地降低了传统离线数仓数据更新与订正的成本,解决了中间层数据不易查、不易更新、不易修正的问题。
模型统一,架构简化。ETL链路的逻辑是基于Flink SQL实现的;ODS层、DWD层和DWS层的数据统一存储在Paimon中,可以降低架构复杂度,提高数据处理效率。
该方案依赖于Paimon的三个核心能力,详情如下表所示。
Paimon核心能力 | 详情 |
主键表更新 | Paimon底层使用LSM Tree数据结构,可以实现高效的数据更新。 关于Paimon主键表、Paimon底层数据结构的介绍请参见Primary Key Table和File Layouts。 |
增量数据产生机制(Changelog Producer) | Paimon可以为任意输入数据流产生完整的增量数据(所有的update_after数据都有对应的update_before数据),保证数据变更可以完整地传递给下游。详情请参见增量数据产生机制。 |
数据合并机制(Merge Engine) | 当Paimon主键表收到多条具有相同主键的数据时,为了保持主键的唯一性,Paimon结果表会将这些数据合并成一条数据。Paimon支持去重、部分更新、预聚合等丰富多样的数据合并行为,详情请参见数据合并机制。 |
实践场景
本文以某个电商平台为例,通过搭建一套流式湖仓,实现数据的加工清洗,并支持上层应用对数据的查询。流式湖仓实现了数据的分层和复用,并支撑各个业务方的报表查询(交易大屏、行为数据分析、用户画像标签)以及个性化推荐等多个业务场景。
构建ODS层:业务数据库实时入仓 MySQL有orders(订单表),orders_pay(订单支付表)和product_catalog(商品类别字典表)三张业务表,这三张表通过Flink实时写入OSS,并以Paimon格式进行存储,作为ODS层。
构建DWD层:主题宽表 将订单表、商品类别字典表、订单支付表利用Paimon的部分更新(partial-update)合并机制进行打宽,以分钟级延时生成DWD层宽表并产出变更数据(Changelog)。
构建DWS层:指标计算 Flink实时消费宽表的变更数据,利用Paimon的预聚合(aggregation)合并机制产出DWM层dwm_users_shops(用户-商户聚合中间表),并最终产出DWS层dws_users(用户聚合指标表)以及dws_shops(商户聚合指标表)。
前提条件
已开通对象存储OSS,详情请参见OSS控制台快速入门。OSS Bucket的存储类型需要为标准存储,详情请参见存储类型概述。
已开通Flink全托管,详情请参见开通实时计算Flink版。
已开通云原生大数据计算服务MaxCompute,并已在MaxCompute项目中上传Paimon插件(paimon_maxcompute_connector.jar)。详情请参见开通MaxCompute和DataWorks以及Paimon外部表。
MaxCompute项目、OSS Bucket需要与Flink工作空间处于相同地域。
使用限制
仅实时计算引擎VVR 8.0.1及以上版本支持该流式湖仓方案。
构建流式湖仓
准备MySQL CDC数据源
本文以云数据库RDS MySQL版为例,创建数据库名称为order_dw,并创建三张业务表及数据。
- 重要
RDS MySQL版实例需要与Flink工作空间处于同一VPC。不在同一VPC下时请参见网络连通性。
创建名称为order_dw的数据库,并创建高权限账号或具有数据库order_dw读写权限的普通账号。
CREATE TABLE `orders` ( order_id bigint not null primary key, user_id varchar(50) not null, shop_id bigint not null, product_id bigint not null, buy_fee bigint not null, create_time timestamp not null, update_time timestamp not null default now(), state int not null ); CREATE TABLE `orders_pay` ( pay_id bigint not null primary key, order_id bigint not null, pay_platform int not null, create_time timestamp not null ); CREATE TABLE `product_catalog` ( product_id bigint not null primary key, catalog_name varchar(50) not null ); -- 准备数据 INSERT INTO product_catalog VALUES(1, 'phone_aaa'),(2, 'phone_bbb'),(3, 'phone_ccc'),(4, 'phone_ddd'),(5, 'phone_eee'); INSERT INTO orders VALUES (100001, 'user_001', 12345, 1, 5000, '2023-02-15 16:40:56', '2023-02-15 18:42:56', 1), (100002, 'user_002', 12346, 2, 4000, '2023-02-15 15:40:56', '2023-02-15 18:42:56', 1), (100003, 'user_003', 12347, 3, 3000, '2023-02-15 14:40:56', '2023-02-15 18:42:56', 1), (100004, 'user_001', 12347, 4, 2000, '2023-02-15 13:40:56', '2023-02-15 18:42:56', 1), (100005, 'user_002', 12348, 5, 1000, '2023-02-15 12:40:56', '2023-02-15 18:42:56', 1), (100006, 'user_001', 12348, 1, 1000, '2023-02-15 11:40:56', '2023-02-15 18:42:56', 1), (100007, 'user_003', 12347, 4, 2000, '2023-02-15 10:40:56', '2023-02-15 18:42:56', 1); INSERT INTO orders_pay VALUES (2001, 100001, 1, '2023-02-15 17:40:56'), (2002, 100002, 1, '2023-02-15 17:40:56'), (2003, 100003, 0, '2023-02-15 17:40:56'), (2004, 100004, 0, '2023-02-15 17:40:56'), (2005, 100005, 0, '2023-02-15 18:40:56'), (2006, 100006, 0, '2023-02-15 18:40:56'), (2007, 100007, 0, '2023-02-15 18:40:56');
管理元数据
创建Paimon Catalog。
进入元数据管理页面。
登录实时计算控制台。
单击目标工作空间操作列下的控制台。
单击左侧的元数据管理。
创建自定义Catalog类型。
单击创建Catalog,选择自定义Catalog页签,单击创建自定义Catalog类型。
在创建自定义Catalog类型窗口中,上传Paimon自定义catalog插件(paimon-ali-vvr-8.0-vvp-0.6-ali-1-SNAPSHOT.jar)。
若出现文件尺寸过大的提示信息,单击继续即可。
单击下一步(需等待一定时间),单击确定。
在自定义Catalog页签,单击paimon-06-1后,单击下一步。
在创建Catalog窗口中,输入以下SQL语句并填入配置项后,单击确定,新建名为dw的Paimon Catalog。
CREATE CATALOG `dw` WITH ( 'type' = 'paimon-06-1', 'metastore' = 'maxcompute', 'warehouse' = '<warehouse>', 'fs.oss.endpoint' = '<oss endpoint>', 'fs.oss.accessKeyId' = '<oss access key id>', 'fs.oss.accessKeySecret' = '<access key secret>', 'maxcompute.endpoint' = '<maxcompute endpoint>', 'maxcompute.accessid' = '<maxcompute access id>', 'maxcompute.accesskey' = '<access key secret>', 'maxcompute.project' = '<maxcompute project>', 'maxcompute.oss.endpoint' = '<maxcompute oss endpoint>' );
创建MySQL Catalog。
在元数据管理页面,单击创建Catalog。
在内置Catalog页签,单击MySQL,单击下一步。
填写以下参数,单击确定,新建名为mysqlcatalog的MySQL Catalog。
配置项
说明
是否必填
备注
catalog name
Catalog名称。
是
填写为自定义的英文名。
本文以mysqlcatalog为例。
hostname
MySQL数据库的IP地址或者Hostname。
是
详情请参见查看和管理实例连接地址和端口。由于RDS MySQL版实例和Flink全托管处于相同VPC,此处应填写内网地址。
port
MySQL数据库服务的端口号,默认值为3306。
否
详情请参见查看和管理实例连接地址和端口。
default-database
默认的MySQL数据库名称。
是
本文填写需要同步的数据库名order_dw。
username
MySQL数据库服务的用户名。
是
本文为准备MySQL CDC数据源中创建的账号。
password
MySQL数据库服务的密码。
是
本文为准备MySQL CDC数据源中创建的密码。
构建ODS层:业务数据库实时入仓
基于CREATE DATABASE AS(CDAS)语句功能,可以一次性将ODS层建出来。SQL作业中通过SET语句指定的配置项也可以在作业运维页面的作业部署详情页签指定,详见控制台操作。Paimon写入性能优化请参见Apache Paimon官方文档。
创建CDAS同步作业。
在
页面,新建名为ods的SQL流作业,将如下代码复制到SQL编辑器。SET 'execution.checkpointing.max-concurrent-checkpoints' = '3'; -- 减轻检查点长尾的影响。 SET 'table.exec.sink.upsert-materialize' = 'NONE'; -- 消除无用的SinkMaterialize算子。 -- Paimon结果表在每次检查点完成之后才会正式提交数据。 -- 此处将检查点间隔缩短为10s,是为了更快地看到结果。 -- 在生产环境下,系统检查点的间隔与两次系统检查点之间的最短时间间隔根据业务对延时要求的不同,一般设置为1分钟到10分钟。 SET 'execution.checkpointing.interval' = '10s'; SET 'execution.checkpointing.min-pause' = '10s'; CREATE DATABASE IF NOT EXISTS dw.order_dw WITH ( 'changelog-producer' = 'input' -- 因为输入数据就是MySQL产生的binlog,已经是完整的变更数据,所以可以直接把输入数据作为变更数据。 ) AS DATABASE mysqlcatalog.order_dw INCLUDING all tables; -- 也可以根据需要选择上游数据库需要入仓的表。
单击右上方的部署,进行作业部署。
在作业运维页面,单击刚刚部署的ods作业操作列的启动,选择无状态启动启动作业。
查看MySQL同步到Paimon的三张表的数据。
在
页面的查询脚本页签,将如下代码拷贝到查询脚本后,选中目标片段后单击左侧代码行上的运行。SELECT * FROM dw.order_dw.orders ORDER BY order_id;
构建DWD层:主题宽表
创建DWD层Paimon宽表dwd_orders 在
页面的查询脚本页签,将如下代码拷贝到查询脚本后,选中目标片段后单击左侧代码行上的运行。CREATE TABLE dw.order_dw.dwd_orders ( order_id BIGINT, order_user_id STRING, order_shop_id BIGINT, order_product_id BIGINT, order_product_catalog_name STRING, order_fee BIGINT, order_create_time TIMESTAMP, order_update_time TIMESTAMP, order_state INT, pay_id BIGINT, pay_platform INT COMMENT 'platform 0: phone, 1: pc', pay_create_time TIMESTAMP, PRIMARY KEY (order_id) NOT ENFORCED ) WITH ( 'merge-engine' = 'partial-update', -- 使用部分更新数据合并机制产生宽表 'changelog-producer' = 'lookup' -- 使用lookup增量数据产生机制以低延时产出变更数据 );
返回
Query has been executed
表示创建成功。实时消费ODS层orders、orders_pay表的变更数据。
在
页面,新建名为dwd的SQL流作业,并将如下代码复制到SQL编辑器后,部署作业并无状态启动作业。通过该SQL作业,orders表会与product_catalog表进行维表关联,关联后的结果将与orders_pay一起写入dwd_orders表中,利用Paimon表的部分更新数据合并机制,将orders表和orders_pay表中order_id相同的数据进行打宽。
SET 'execution.checkpointing.max-concurrent-checkpoints' = '3'; SET 'table.exec.sink.upsert-materialize' = 'NONE'; SET 'execution.checkpointing.interval' = '10s'; SET 'execution.checkpointing.min-pause' = '10s'; -- Paimon目前暂不支持在同一个作业里通过多条INSERT语句写入同一张表,因此这里使用UNION ALL。 INSERT INTO dw.order_dw.dwd_orders SELECT o.order_id, o.user_id, o.shop_id, o.product_id, dim.catalog_name, o.buy_fee, o.create_time, o.update_time, o.state, NULL, NULL, NULL FROM dw.order_dw.orders o LEFT JOIN dw.order_dw.product_catalog FOR SYSTEM_TIME AS OF proctime() AS dim ON o.product_id = dim.product_id UNION ALL SELECT order_id, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, pay_id, pay_platform, create_time FROM dw.order_dw.orders_pay;
查看宽表dwd_orders的数据。 在
页面的查询脚本页签,将如下代码拷贝到查询脚本后,选中目标片段后单击左侧代码行上的运行。SELECT * FROM dw.order_dw.dwd_orders ORDER BY order_id;
构建DWS层:指标计算
创建DWS层的聚合表dws_users以及dws_shops。 在
页面的查询脚本页签,将如下代码拷贝到查询脚本后,选中目标片段后单击左侧代码行上的运行。-- 用户维度聚合指标表。 CREATE TABLE dw.order_dw.dws_users ( user_id STRING, ds STRING, payed_buy_fee_sum BIGINT COMMENT '当日完成支付的总金额', PRIMARY KEY (user_id, ds) NOT ENFORCED ) WITH ( 'merge-engine' = 'aggregation', -- 使用预聚合数据合并机制产生聚合表 'fields.payed_buy_fee_sum.aggregate-function' = 'sum' -- 对 payed_buy_fee_sum 的数据求和产生聚合结果 -- 由于dws_users表不再被下游流式消费,因此无需指定增量数据产生机制 ); -- 商户维度聚合指标表。 CREATE TABLE dw.order_dw.dws_shops ( shop_id BIGINT, ds STRING, payed_buy_fee_sum BIGINT COMMENT '当日完成支付总金额', uv BIGINT COMMENT '当日不同购买用户总人数', pv BIGINT COMMENT '当日购买用户总人次', PRIMARY KEY (shop_id, ds) NOT ENFORCED ) WITH ( 'merge-engine' = 'aggregation', -- 使用预聚合数据合并机制产生聚合表 'fields.payed_buy_fee_sum.aggregate-function' = 'sum', -- 对 payed_buy_fee_sum 的数据求和产生聚合结果 'fields.uv.aggregate-function' = 'sum', -- 对 uv 的数据求和产生聚合结果 'fields.pv.aggregate-function' = 'sum' -- 对 pv 的数据求和产生聚合结果 -- 由于dws_shops表不再被下游流式消费,因此无需指定增量数据产生机制 ); -- 为了同时计算用户视角的聚合表以及商户视角的聚合表,另外创建一个以用户 + 商户为主键的中间表。 CREATE TABLE dw.order_dw.dwm_users_shops ( user_id STRING, shop_id BIGINT, ds STRING, payed_buy_fee_sum BIGINT COMMENT '当日用户在商户完成支付的总金额', pv BIGINT COMMENT '当日用户在商户购买的次数', PRIMARY KEY (user_id, shop_id, ds) NOT ENFORCED ) WITH ( 'merge-engine' = 'aggregation', -- 使用预聚合数据合并机制产生聚合表 'fields.payed_buy_fee_sum.aggregate-function' = 'sum', -- 对 payed_buy_fee_sum 的数据求和产生聚合结果 'fields.pv.aggregate-function' = 'sum', -- 对 pv 的数据求和产生聚合结果 'changelog-producer' = 'lookup', -- 使用lookup增量数据产生机制以低延时产出变更数据 -- dwm层的中间表一般不直接提供上层应用查询,因此可以针对写入性能进行优化。 'file.format' = 'avro', -- 使用avro行存格式的写入性能更加高效。 'metadata.stats-mode' = 'none' -- 放弃统计信息会增加OLAP查询代价(对持续的流处理无影响),但会让写入性能更加高效。 );
返回
Query has been executed
表示创建成功。实时消费DWD层dwd_orders表的变更数据。 在
页面的作业草稿页签,新建名为dwm的SQL流作业,并将如下代码复制到SQL编辑器后,部署作业并无状态启动作业。通过该SQL作业,dwd_orders表的数据会写入dwm_users_shops表中,利用Paimon表的预聚合数据合并机制,自动对order_fee求和,算出用户在商户的消费总额。同时,自动对1求和,也能算出用户在商户的消费次数。
SET 'execution.checkpointing.max-concurrent-checkpoints' = '3'; SET 'table.exec.sink.upsert-materialize' = 'NONE'; SET 'execution.checkpointing.interval' = '10s'; SET 'execution.checkpointing.min-pause' = '10s'; INSERT INTO dw.order_dw.dwm_users_shops SELECT order_user_id, order_shop_id, DATE_FORMAT (pay_create_time, 'yyyyMMdd') as ds, order_fee, 1 -- 一条输入记录代表一次消费 FROM dw.order_dw.dwd_orders WHERE pay_id IS NOT NULL AND order_fee IS NOT NULL;
实时消费DWM层dwm_users_shops表的变更数据。 在
页面,新建名为dws的SQL流作业,并将如下代码复制到SQL编辑器后,部署作业并无状态启动作业。通过该SQL作业,dwm_users_shops表的数据会写入dws_users表中,利用Paimon表的预聚合数据合并机制,自动对payed_buy_fee_sum求和,算出用户在所有商户的消费总额。另外,该SQL作业也会将dwm_users_shops表的数据写入dws_shops表中,利用Paimon表的预聚合数据合并机制,自动对payed_buy_fee_sum求和,算出商户的总流水。另外,自动对1求和,也能算出有几个不同的用户在该商户消费过。最后,自动对pv求和,算出在该商户消费的总人次。
SET 'execution.checkpointing.max-concurrent-checkpoints' = '3'; SET 'table.exec.sink.upsert-materialize' = 'NONE'; SET 'execution.checkpointing.interval' = '10s'; SET 'execution.checkpointing.min-pause' = '10s'; -- 与dwd不同,此处每一条INSERT语句写入的是不同的Paimon表,可以放在同一个作业中。 BEGIN STATEMENT SET; INSERT INTO dw.order_dw.dws_users SELECT user_id, ds, payed_buy_fee_sum FROM dw.order_dw.dwm_users_shops; -- 以商户为主键,部分热门商户的数据量可能远高于其他商户。 -- 因此使用local merge在写入Paimon之前先在内存中进行预聚合,缓解数据倾斜问题。 INSERT INTO dw.order_dw.dws_shops /*+ OPTIONS('local-merge-buffer-size' = '64mb') */ SELECT shop_id, ds, payed_buy_fee_sum, 1, -- 一条输入记录代表一名用户在该商户的所有消费 pv FROM dw.order_dw.dwm_users_shops; END;
查看dws_users表和dws_shops表的数据。 在
页面的查询脚本页签,将如下代码拷贝到查询脚本后,选中目标片段后单击左侧代码行上的运行。--查看dws_users表数据 SELECT * FROM dw.order_dw.dws_users ORDER BY user_id; --查看dws_shops表数据 SELECT * FROM dw.order_dw.dws_shops ORDER BY shop_id;
捕捉业务数据库的变化
前面已完成了流式湖仓的构建,下面将测试流式湖仓捕捉业务数据库变化的能力。
向MySQL的order_dw数据库中插入如下数据。
INSERT INTO orders VALUES (100008, 'user_001', 12345, 3, 3000, '2023-02-15 17:40:56', '2023-02-15 18:42:56', 1), (100009, 'user_002', 12348, 4, 1000, '2023-02-15 18:40:56', '2023-02-15 19:42:56', 1), (100010, 'user_003', 12348, 2, 2000, '2023-02-15 19:40:56', '2023-02-15 20:42:56', 1); INSERT INTO orders_pay VALUES (2008, 100008, 1, '2023-02-15 18:40:56'), (2009, 100009, 1, '2023-02-15 19:40:56'), (2010, 100010, 0, '2023-02-15 20:40:56');
查看dws_users表和dws_shops表的数据。 在
页面的查询脚本页签,将如下代码拷贝到查询脚本后,选中目标片段后单击左侧代码行上的运行。dws_users表
SELECT * FROM dw.order_dw.dws_users ORDER BY user_id;
dws_shops表
SELECT * FROM dw.order_dw.dws_shops ORDER BY shop_id;
使用流式湖仓
上一小节展示了在Flink中进行Paimon Catalog的创建与Paimon表的写入。由于Catalog中指定了元数据存储为MaxCompute,因此在Catalog中创建表时,MaxCompute中将会自动创建Paimon外表。本节展示流式湖仓搭建完成后,利用MaxCompute进行数据分析的一些简单应用场景。关于MaxCompute如何查询Paimon外表,详见Paimon外部表。
排名查询
对DWS层聚合表进行分析。本文使用DataWorks查询23年2月15日交易额前三高的商户的代码示例如下。
SET odps.sql.common.table.planner.ext.hive.bridge = true;
SET odps.sql.hive.compatible = true;
SELECT ROW_NUMBER() OVER (ORDER BY payed_buy_fee_sum DESC) AS rn, shop_id, payed_buy_fee_sum FROM dws_shops
WHERE ds = '20230215'
ORDER BY rn LIMIT 3;
明细查询
对DWD层宽表进行分析。本文使用DataWorks查询某个客户23年2月特定支付平台支付的订单明细的代码示例如下。
SET odps.sql.common.table.planner.ext.hive.bridge = true;
SET odps.sql.hive.compatible = true;
SELECT * FROM dwd_orders
WHERE order_create_time >= '2023-02-01 00:00:00' AND order_create_time < '2023-03-01 00:00:00'
AND order_user_id = 'user_001'
AND pay_platform = 0
ORDER BY order_create_time;
数据报表
对DWD层宽表进行分析。本文使用DataWorks查询23年2月内每个品类的订单总量和订单总金额的代码示例如下。
SET odps.sql.common.table.planner.ext.hive.bridge = true;
SET odps.sql.hive.compatible = true;
SELECT
TO_CHAR(order_create_time, 'YYYYMMDD') AS order_create_date,
order_product_catalog_name,
COUNT(*),
SUM(order_fee)
FROM
dwd_orders
WHERE
order_create_time >= '2023-02-01 00:00:00' and order_create_time < '2023-03-01 00:00:00'
GROUP BY
order_create_date, order_product_catalog_name
ORDER BY
order_create_date, order_product_catalog_name;
相关文档
报错You have NO privilege 'odps:CreateInstance'的解决方法:ODPS-0420095
通过Flink批处理能力搭建Paimon离线湖仓,详情请参见Flink批处理快速入门。