全部产品
Search
文档中心

实时计算Flink版:物化表快速入门(构建流批一体湖仓)

更新时间:Dec 24, 2024

本文将基于物化表,带您快速体验如何构建流批一体的湖仓分析处理链路,以及通过修改物化表新鲜度,完成由批到流的切换,实现数据的实时更新。

物化表简介

物化表(Materialized Table)是Flink SQL引入的一种新的表类型,旨在简化批处理和流处理数据管道,提供流批一体化的开发体验。创建物化表时无需声明字段与类型,通过指定数据新鲜度和查询语句,Flink引擎将从查询语句自动推导物化表的Schema,并创建相应的数据刷新管道,以达到指定的数据新鲜度。详情请参见物化表管理

实时湖仓链路实践图

  1. Flink将数据源写入Paimon,形成ODS层。

  2. Flink将ODS层数据关联打宽进行加工,写入物化表,形成DWD层。

  3. 通过设定不同的数据新鲜度,构建多个物化表,进行多维度的业务统计,形成DWS层,以对外提供应用查询。

image

前提条件

  • 已创建Flink工作空间,详情请参见开通实时计算Flink版

  • 如果您使用RAM用户或RAM角色等身份访问,需要确认已具有Flink控制台相关权限,详情请参见权限管理

步骤一:准备测试数据

  1. (可选)创建Paimon Catalog。

    基于Apache Paimon提供了物化表能力,需要创建元数据存储类型为Filesystem的Paimon Catalog,如果已经创建,可以跳过此步骤。更多详情请参见创建Paimon Catalog

    创建Paimon Catalog

    1. 登录实时计算管理控制台

    2. 单击目标工作空间操作列下的控制台

    3. 在左侧导航栏选择数据管理,单击创建Catalog。选择Apache Paimon,单击下一步

      image

      参数说明:

      配置项

      说明

      备注

      metastore

      元数据存储类型。

      本示例采用filesystem为元数据存储类型。

      catalog name

      Paimon Catalog名称。

      请填写为自定义的英文名。本示例为paimon。

      warehouse

      OSS服务中所指定的数仓目录。

      格式为oss://<bucket>/<object>。其中:

      • <bucket>:表示您创建的OSS Bucket名称。

      • <object>:表示您存放数据的路径。

      请在OSS管理控制台上查看您的bucket和object名称。

      fs.oss.endpoint

      OSS服务的连接地址。

      如果Flink与OSS位于同一地域,则使用内网Endpoint,否则使用外网Endpoint。详情请参见OSS地域和访问域名

      fs.oss.accessKeyId

      拥有读写OSS权限的阿里云账号或RAM账号的Accesskey ID。

      获取方法请参见创建AccessKey。为了防止明文密钥泄露,建议使用变量写入,详情请参见变量管理

      fs.oss.accessKeySecret

      拥有读写OSS权限的阿里云账号或RAM账号的Accesskey secret。

  2. 创建用户行为日志表ods_user_log和商品信息表ods_dim_product。

    1. 登录实时计算管理控制台

    2. 单击目标工作空间操作列下的控制台

    3. 在左侧导航栏选择数据开发 > 数据查询

      本样例已经创建名为paimon的Paimon Catalog,使用默认数据库default。
      CREATE TABLE `paimon`.`default`.`ods_user_log` (
        item_id INT NOT NULL,
        user_id INT NOT NULL,
        vtime TIMESTAMP(6),
        ds VARCHAR(10)
      ) 
      PARTITIONED BY(ds)
      WITH (
        'bucket' = '4',            --指定分桶数为4
        'bucket-key' = 'item_id'   --指定确定数据分桶的键。相同的item_id会被放到一个桶里。
      );
      
      CREATE TABLE `paimon`.`default`.`ods_dim_product` (
        item_id INT NOT NULL,
        title VARCHAR(255),
        pict_url VARCHAR(255), 
        brand_id INT,
        seller_id INT,
        PRIMARY KEY(item_id) NOT ENFORCED
      ) WITH (
        'bucket' = '4',
        'bucket-key' = 'item_id'
      );
    4. 单击右上角运行,创建相应的数据表。

    5. 在左侧导航栏选择数据管理,单击对应的Paimon Catalog后,单击刷新查看新增表。

  3. 使用模拟数据生成Faker连接器生成用户行为数据,并写入Paimon表中。

    1. 在左侧导航栏选择数据开发 > ETL

    2. 单击新建,选择空白的流作业草稿,单击下一步,单击创建

    3. 将如下SQL语句复制到SQL编辑器。

      CREATE TEMPORARY TABLE `user_log` (
        item_id INT,  //商品ID
        user_id INT,  //用户ID
        vtime TIMESTAMP,  
        ds AS DATE_FORMAT(CURRENT_DATE,'yyyyMMdd')
      ) WITH (
        'connector' = 'faker',    --faker连接器
        'fields.item_id.expression'='#{number.numberBetween ''0'',''1000''}',    --生成0-1000其中一个随机数
        'fields.user_id.expression'='#{number.numberBetween ''0'',''100''}',
        'fields.vtime.expression'='#{date.past ''5'',''HOURS''}',           --基于当前日期时间前5小时生成数据
        'rows-per-second' = '3'   --每秒生成3条数据
       );
        
       CREATE TEMPORARY TABLE `dim_product` (
        item_id INT NOT NULL,
        title VARCHAR(255),
        pict_url VARCHAR(255), 
        brand_id INT,
        seller_id INT,
        PRIMARY KEY(item_id) NOT ENFORCED
       ) WITH (
        'connector' = 'faker',    --faker连接器
        'fields.item_id.expression'='#{number.numberBetween ''0'',''1000''}',
        'fields.title.expression'='#{book.title}',
        'fields.pict_url.expression'='#{internet.domainName}',
        'fields.brand_id.expression'='#{number.numberBetween ''1000'',''10000''}',   
        'fields.seller_id.expression'='#{number.numberBetween ''1000'',''10000''}',
        'rows-per-second' = '3'        --每秒生成3条数据
       );
      
      BEGIN STATEMENT SET; 
      
      INSERT INTO `paimon`.`default`.`ods_user_log` 
        SELECT 
        item_id,
        user_id,
        vtime,
        CAST(ds AS VARCHAR(10)) AS ds
      FROM `user_log`;
      INSERT INTO `paimon`.`default`.`ods_dim_product`
        SELECT 
        item_id,
        title,
        pict_url,
        brand_id,
        seller_id
      FROM `dim_product`;
      
      END; 
    4. 单击右上方的部署,进行作业部署。

    5. 单击左侧导航栏的运维中心 > 作业运维,单击目标作业操作列的启动,选择无状态启动后单击启动

  4. 查询模拟数据。

    在左侧导航栏选择数据开发 > 数据查询。将如下SQL语句复制到SQL编辑器后,单击右上角的运行

    SELECT * FROM `paimon`.`default`.ods_dim_product LIMIT 10;
    
    SELECT * FROM `paimon`.`default`.ods_user_log LIMIT 10;

    image

步骤二:创建物化表

本部分通过将源表进行打宽,构建了DWD层的物化表dwd_user_log_product,并基于该物化表进一步构建下游物化表以进行业务统计,完成了DWS层的构建。

  1. 构建数据仓库的DWD层,创建dwd_user_log_product物化表。

    1. 在左侧导航栏选择数据管理,单击目标Paimon Catalog。

    2. 单击目标数据库(本示例为default)后,单击创建物化表。将如下SQL语句复制到SQL编辑器,单击创建

      -- DWD 层打宽逻辑
      CREATE MATERIALIZED TABLE dwd_user_log_product(
          PRIMARY KEY (item_id) NOT ENFORCED
      )
      PARTITIONED BY(ds)
      WITH (
        'partition.fields.ds.date-formatter' = 'yyyyMMdd'
      )
      FRESHNESS = INTERVAL '1' HOUR      --1小时刷新
      AS SELECT
        l.ds,
        l.item_id,
        l.user_id,
        l.vtime,
        r.brand_id,
        r.seller_id
      FROM `paimon`.`default`.`ods_user_log` l INNER JOIN `paimon`.`default`.`ods_dim_product` r
      ON l.item_id = r.item_id;
  2. 构建数据仓库DWS层,基于dwd_user_log_product物化表进行多维度的业务统计。

    本文以按天统计每小时PV/UV数创建dws_overall物化表为例,参考上一步创建dws_overall物化表。

    //按天维度统计 PV/UV
    CREATE MATERIALIZED TABLE dws_overall(
        PRIMARY KEY(ds, hh) NOT ENFORCED
    )
    PARTITIONED BY(ds)
    WITH (
      'partition.fields.ds.date-formatter' = 'yyyyMMdd'
    )
    FRESHNESS = INTERVAL '1' HOUR   --1小时刷新
    AS SELECT 
        ds,
        COALESCE(hh, 'day') AS hh,
        count(*) AS pv,
        count(distinct user_id) AS uv
        FROM (SELECT ds, date_format(vtime, 'HH') AS hh, user_id 
    FROM `paimon`.`default`.`dwd_user_log_product`) tmp
    GROUP BY GROUPING SETS(ds, (ds, hh));

步骤三:更新物化表

开始更新

本示例的数据新鲜度为1小时,单击开始更新后,数据更新相对于基础表更新至少会滞后1小时。

  1. 在左侧导航栏选择数据血缘,搜索目标物化表。

    image

  2. 单击对应物化表视图,在页面右下角单击开始更新

数据回刷

数据回刷可以将历史数据重新写入相应的分区或者整张表,以修正一些流处理结果,或者对于未到调度时间的批作业,也可以进行数据回刷,立即进行数据更新写入。

选中物化表dwd_user_log_product视图,单击页面右下角手动更新,分区名称填写运行时间当天的日期,如20241216,勾选级联更新下游关联物化表,单击确认,弹框确认立即覆盖相应数据,即可马上更新。

image

更多数据回刷的使用详情请参见历史数据回刷

修改数据新鲜度

您可以根据业务需要,将数据新鲜度修改为按天级、小时级、分钟级或秒级更新物化表。

依次修改物化表dwd_user_log_product和物化表dws_overall的数据新鲜度。单击对应物化表视图,单击页面右下角修改数据新鲜度,将数据新鲜度调整为分钟级,进行实时更新。

image

更多修改数据新鲜度的使用详情请参见修改数据新鲜度

步骤四:查询物化表

数据预览

可以预览物化表最新的100条数据。

  1. 在左侧导航栏选择数据血缘,搜索目标物化表。

  2. 单击目标物化表视图,在页面右下角单击详情

  3. 在物化表数据预览页签,单击查询图标。

    image

数据查询

在左侧导航栏选择数据开发 > 数据查询,将如下SQL语句复制到SQL编辑器后,选中代码片段,单击运行,可以查询dws_overall物化表。

SELECT * FROM `paimon`.`default`.dws_overall ORDER BY hh;

image

相关文档