本文将基于物化表,带您快速体验如何构建流批一体的湖仓分析处理链路,以及通过修改物化表新鲜度,完成由批到流的切换,实现数据的实时更新。
物化表简介
物化表(Materialized Table)是Flink SQL引入的一种新的表类型,旨在简化批处理和流处理数据管道,提供流批一体化的开发体验。创建物化表时无需声明字段与类型,通过指定数据新鲜度和查询语句,Flink引擎将从查询语句自动推导物化表的Schema,并创建相应的数据刷新管道,以达到指定的数据新鲜度。详情请参见物化表管理。
实时湖仓链路实践图
Flink将数据源写入Paimon,形成ODS层。
Flink将ODS层数据关联打宽进行加工,写入物化表,形成DWD层。
通过设定不同的数据新鲜度,构建多个物化表,进行多维度的业务统计,形成DWS层,以对外提供应用查询。
前提条件
已创建Flink工作空间,详情请参见开通实时计算Flink版。
如果您使用RAM用户或RAM角色等身份访问,需要确认已具有Flink控制台相关权限,详情请参见权限管理。
步骤一:准备测试数据
(可选)创建Paimon Catalog。
基于Apache Paimon提供了物化表能力,需要创建元数据存储类型为Filesystem的Paimon Catalog,如果已经创建,可以跳过此步骤。更多详情请参见创建Paimon Catalog。
创建用户行为日志表ods_user_log和商品信息表ods_dim_product。
登录实时计算管理控制台。
单击目标工作空间操作列下的控制台。
在左侧导航栏选择
。本样例已经创建名为paimon的Paimon Catalog,使用默认数据库default。
CREATE TABLE `paimon`.`default`.`ods_user_log` ( item_id INT NOT NULL, user_id INT NOT NULL, vtime TIMESTAMP(6), ds VARCHAR(10) ) PARTITIONED BY(ds) WITH ( 'bucket' = '4', --指定分桶数为4 'bucket-key' = 'item_id' --指定确定数据分桶的键。相同的item_id会被放到一个桶里。 ); CREATE TABLE `paimon`.`default`.`ods_dim_product` ( item_id INT NOT NULL, title VARCHAR(255), pict_url VARCHAR(255), brand_id INT, seller_id INT, PRIMARY KEY(item_id) NOT ENFORCED ) WITH ( 'bucket' = '4', 'bucket-key' = 'item_id' );
单击右上角运行,创建相应的数据表。
在左侧导航栏选择数据管理,单击对应的Paimon Catalog后,单击刷新查看新增表。
使用模拟数据生成Faker连接器生成用户行为数据,并写入Paimon表中。
在左侧导航栏选择
。单击新建,选择空白的流作业草稿,单击下一步,单击创建。
将如下SQL语句复制到SQL编辑器。
CREATE TEMPORARY TABLE `user_log` ( item_id INT, //商品ID user_id INT, //用户ID vtime TIMESTAMP, ds AS DATE_FORMAT(CURRENT_DATE,'yyyyMMdd') ) WITH ( 'connector' = 'faker', --faker连接器 'fields.item_id.expression'='#{number.numberBetween ''0'',''1000''}', --生成0-1000其中一个随机数 'fields.user_id.expression'='#{number.numberBetween ''0'',''100''}', 'fields.vtime.expression'='#{date.past ''5'',''HOURS''}', --基于当前日期时间前5小时生成数据 'rows-per-second' = '3' --每秒生成3条数据 ); CREATE TEMPORARY TABLE `dim_product` ( item_id INT NOT NULL, title VARCHAR(255), pict_url VARCHAR(255), brand_id INT, seller_id INT, PRIMARY KEY(item_id) NOT ENFORCED ) WITH ( 'connector' = 'faker', --faker连接器 'fields.item_id.expression'='#{number.numberBetween ''0'',''1000''}', 'fields.title.expression'='#{book.title}', 'fields.pict_url.expression'='#{internet.domainName}', 'fields.brand_id.expression'='#{number.numberBetween ''1000'',''10000''}', 'fields.seller_id.expression'='#{number.numberBetween ''1000'',''10000''}', 'rows-per-second' = '3' --每秒生成3条数据 ); BEGIN STATEMENT SET; INSERT INTO `paimon`.`default`.`ods_user_log` SELECT item_id, user_id, vtime, CAST(ds AS VARCHAR(10)) AS ds FROM `user_log`; INSERT INTO `paimon`.`default`.`ods_dim_product` SELECT item_id, title, pict_url, brand_id, seller_id FROM `dim_product`; END;
单击右上方的部署,进行作业部署。
单击左侧导航栏的
,单击目标作业操作列的启动,选择无状态启动后单击启动。
查询模拟数据。
在左侧导航栏选择
。将如下SQL语句复制到SQL编辑器后,单击右上角的运行。SELECT * FROM `paimon`.`default`.ods_dim_product LIMIT 10; SELECT * FROM `paimon`.`default`.ods_user_log LIMIT 10;
步骤二:创建物化表
本部分通过将源表进行打宽,构建了DWD层的物化表dwd_user_log_product,并基于该物化表进一步构建下游物化表以进行业务统计,完成了DWS层的构建。
构建数据仓库的DWD层,创建dwd_user_log_product物化表。
在左侧导航栏选择数据管理,单击目标Paimon Catalog。
单击目标数据库(本示例为default)后,单击创建物化表。将如下SQL语句复制到SQL编辑器,单击创建。
-- DWD 层打宽逻辑 CREATE MATERIALIZED TABLE dwd_user_log_product( PRIMARY KEY (item_id) NOT ENFORCED ) PARTITIONED BY(ds) WITH ( 'partition.fields.ds.date-formatter' = 'yyyyMMdd' ) FRESHNESS = INTERVAL '1' HOUR --1小时刷新 AS SELECT l.ds, l.item_id, l.user_id, l.vtime, r.brand_id, r.seller_id FROM `paimon`.`default`.`ods_user_log` l INNER JOIN `paimon`.`default`.`ods_dim_product` r ON l.item_id = r.item_id;
构建数据仓库DWS层,基于dwd_user_log_product物化表进行多维度的业务统计。
本文以按天统计每小时PV/UV数创建dws_overall物化表为例,参考上一步创建dws_overall物化表。
//按天维度统计 PV/UV CREATE MATERIALIZED TABLE dws_overall( PRIMARY KEY(ds, hh) NOT ENFORCED ) PARTITIONED BY(ds) WITH ( 'partition.fields.ds.date-formatter' = 'yyyyMMdd' ) FRESHNESS = INTERVAL '1' HOUR --1小时刷新 AS SELECT ds, COALESCE(hh, 'day') AS hh, count(*) AS pv, count(distinct user_id) AS uv FROM (SELECT ds, date_format(vtime, 'HH') AS hh, user_id FROM `paimon`.`default`.`dwd_user_log_product`) tmp GROUP BY GROUPING SETS(ds, (ds, hh));
步骤三:更新物化表
开始更新
本示例的数据新鲜度为1小时,单击开始更新后,数据更新相对于基础表更新至少会滞后1小时。
在左侧导航栏选择数据血缘,搜索目标物化表。
单击对应物化表视图,在页面右下角单击开始更新。
数据回刷
数据回刷可以将历史数据重新写入相应的分区或者整张表,以修正一些流处理结果,或者对于未到调度时间的批作业,也可以进行数据回刷,立即进行数据更新写入。
选中物化表dwd_user_log_product视图,单击页面右下角手动更新,分区名称填写运行时间当天的日期,如20241216,勾选级联更新下游关联物化表,单击确认,弹框确认立即覆盖相应数据,即可马上更新。
更多数据回刷的使用详情请参见历史数据回刷。
修改数据新鲜度
您可以根据业务需要,将数据新鲜度修改为按天级、小时级、分钟级或秒级更新物化表。
依次修改物化表dwd_user_log_product和物化表dws_overall的数据新鲜度。单击对应物化表视图,单击页面右下角修改数据新鲜度,将数据新鲜度调整为分钟级,进行实时更新。
更多修改数据新鲜度的使用详情请参见修改数据新鲜度。
步骤四:查询物化表
数据预览
可以预览物化表最新的100条数据。
在左侧导航栏选择数据血缘,搜索目标物化表。
单击目标物化表视图,在页面右下角单击详情。
在物化表数据预览页签,单击查询图标。
数据查询
在左侧导航栏选择
,将如下SQL语句复制到SQL编辑器后,选中代码片段,单击运行,可以查询dws_overall物化表。SELECT * FROM `paimon`.`default`.dws_overall ORDER BY hh;