全部產品
Search
文件中心

Realtime Compute for Apache Flink:基於Flink+Paimon搭建流式湖倉

更新時間:Sep 13, 2024

本文為您介紹如何通過Realtime ComputeFlink版和流式資料湖倉Paimon搭建流式湖倉。

背景資訊

隨著社會數字化發展,企業對資料時效性的需求越來越強烈。傳統的離線數倉搭建方法論比較明確,通過定時調度離線作業的方式,將上一時段產生的新鮮變更併入分層的數倉中(ODS->DWD->DWS->ADS),但是存在延時間長度和成本高兩大問題。離線作業的調度通常每小時甚至每天才進行一次,資料的消費者僅能看到上一小時甚至昨天的資料。同時,資料的更新多以覆寫(overwrite)分區的方式進行,需要重新讀取分區中原有的資料,才能與新鮮變更合并,產生新的結果資料。

基於Realtime ComputeFlink版和流式資料湖倉Paimon搭建流式湖倉可以解決上述傳統離線數倉的問題。利用Flink的Realtime Compute能力,資料可以在數倉分層之間即時資料流動。同時,利用Paimon高效的更新能力,資料變更可以在分鐘級的延時內傳遞給下遊消費者。因此,流式湖倉在延時和成本上具有雙重優勢。

關於流式資料湖倉Paimon的更多特性,請參見特色功能Apache Paimon官方網站

方案架構和優勢

架構

Realtime ComputeFlink版是強大的流式計算引擎,支援對海量即時資料高效處理。流式資料湖倉Paimon是流批統一的湖儲存格式,支援高吞吐的更新和低延後查詢。Paimon與Flink深度整合,能夠提供一體化的流式湖倉聯合解決方案。本文基於Flink+Paimon搭建流式湖倉的方案架構如下:

  1. Flink將資料來源寫入Paimon,形成ODS層。

  2. Flink訂閱ODS層的變更資料(Changelog)進行加工,形成DWD層再次寫入Paimon。

  3. Flink訂閱DWD層的Changelog進行加工,形成DWS層再次寫入Paimon。

  4. 最後由雲原生MaxCompute或開源巨量資料平台E-MapReduce讀取Paimon外部表格,對外提供應用查詢。

image.png

優勢

該方案有如下優勢:

  • Paimon的每一層資料都可以在分鐘級的延時內將變更傳遞給下遊,將傳統離線數倉的延時從小時級甚至天級降低至分鐘級。

  • Paimon的每一層資料都可以直接接受變更資料,無需覆寫分區,極大地降低了傳統離線數倉資料更新與訂正的成本,解決了中介層資料不易查、不易更新、不易修正的問題。

  • 模型統一,架構簡化。ETL鏈路的邏輯是基於Flink SQL實現的;ODS層、DWD層和DWS層的資料統一儲存在Paimon中,可以降低架構複雜度,提高資料處理效率。

該方案依賴於Paimon的三個核心能力,詳情如下表所示。

Paimon核心能力

詳情

主鍵表更新

Paimon底層使用LSM Tree資料結構,可以實現高效的資料更新。

關於Paimon主鍵表、Paimon底層資料結構的介紹請參見Primary Key TableFile Layouts

增量資料產生機制(Changelog Producer)

Paimon可以為任意輸入資料流產生完整的增量資料(所有的update_after資料都有對應的update_before資料),保證資料變更可以完整地傳遞給下遊。詳情請參見增量資料產生機制

資料合併機制(Merge Engine)

當Paimon主鍵表收到多條具有相同主鍵的資料時,為了保持主鍵的唯一性,Paimon結果表會將這些資料合併成一條資料。Paimon支援去重、部分更新、預彙總等豐富多樣的資料合併行為,詳情請參見資料合併機制

實踐情境

本文以某個電商平台為例,通過搭建一套流式湖倉,實現資料的加工清洗,並支援上層應用對資料的查詢。流式湖倉實現了資料的分層和複用,並支撐各個業務方的報表查詢(交易大屏、行為資料分析、使用者畫像標籤)以及個人化推薦等多個業務情境。

image.png

  1. 構建ODS層:業務資料庫即時入倉 ​MySQL有orders(訂單表),orders_pay(訂單支付表)和product_catalog(商品類別字典表)三張業務表,這三張表通過Flink即時寫入OSS,並以Paimon格式進行儲存,作為ODS層。

  2. 構建DWD層:主題寬表 ​將訂單表、商品類別字典表、訂單支付表利用Paimon的部分更新(partial-update)合并機制進行打寬,以分鐘級延時產生DWD層寬表併產出變更資料(Changelog)。

  3. 構建DWS層:指標計算 ​Flink即時消費寬表的變更資料,利用Paimon的預彙總(aggregation)合并機制產出DWM層dwm_users_shops(使用者-商戶彙總中間表),並最終產出DWS層dws_users(使用者彙總指標表)以及dws_shops(商戶彙總指標表)。

前提條件

說明

MaxCompute專案、OSS Bucket需要與Flink工作空間處於相同地區。

使用限制

僅Realtime Compute引擎VVR 8.0.1及以上版本支援該流式湖倉方案。

構建流式湖倉

準備MySQL CDC資料來源

本文以雲資料庫RDS MySQL版為例,建立資料庫名稱為order_dw,並建立三張業務表及資料。

  1. 快速建立RDS MySQL執行個體

    重要

    RDS MySQL版執行個體需要與Flink工作空間處於同一VPC。不在同一VPC下時請參見網路連通性

  2. 建立資料庫和帳號

    建立名稱為order_dw的資料庫,並建立高許可權帳號或具有資料庫order_dw讀寫權限的普通帳號。

    CREATE TABLE `orders` (
      order_id bigint not null primary key,
      user_id varchar(50) not null,
      shop_id bigint not null,
      product_id bigint not null,
      buy_fee bigint not null,   
      create_time timestamp not null,
      update_time timestamp not null default now(),
      state int not null
    );
    
    
    CREATE TABLE `orders_pay` (
      pay_id bigint not null primary key,
      order_id bigint not null,
      pay_platform int not null, 
      create_time timestamp not null
    );
    
    
    CREATE TABLE `product_catalog` (
      product_id bigint not null primary key,
      catalog_name varchar(50) not null
    );
    
    -- 準備資料
    INSERT INTO product_catalog VALUES(1, 'phone_aaa'),(2, 'phone_bbb'),(3, 'phone_ccc'),(4, 'phone_ddd'),(5, 'phone_eee');
    
    INSERT INTO orders VALUES
    (100001, 'user_001', 12345, 1, 5000, '2023-02-15 16:40:56', '2023-02-15 18:42:56', 1),
    (100002, 'user_002', 12346, 2, 4000, '2023-02-15 15:40:56', '2023-02-15 18:42:56', 1),
    (100003, 'user_003', 12347, 3, 3000, '2023-02-15 14:40:56', '2023-02-15 18:42:56', 1),
    (100004, 'user_001', 12347, 4, 2000, '2023-02-15 13:40:56', '2023-02-15 18:42:56', 1),
    (100005, 'user_002', 12348, 5, 1000, '2023-02-15 12:40:56', '2023-02-15 18:42:56', 1),
    (100006, 'user_001', 12348, 1, 1000, '2023-02-15 11:40:56', '2023-02-15 18:42:56', 1),
    (100007, 'user_003', 12347, 4, 2000, '2023-02-15 10:40:56', '2023-02-15 18:42:56', 1);
    
    INSERT INTO orders_pay VALUES
    (2001, 100001, 1, '2023-02-15 17:40:56'),
    (2002, 100002, 1, '2023-02-15 17:40:56'),
    (2003, 100003, 0, '2023-02-15 17:40:56'),
    (2004, 100004, 0, '2023-02-15 17:40:56'),
    (2005, 100005, 0, '2023-02-15 18:40:56'),
    (2006, 100006, 0, '2023-02-15 18:40:56'),
    (2007, 100007, 0, '2023-02-15 18:40:56');

管理中繼資料

  1. 建立Paimon Catalog。

    1. 進入中繼資料管理頁面。

      1. 登入Realtime Compute控制台

      2. 單擊目標工作空間操作列下的控制台

      3. 單擊左側的中繼資料管理

    2. 建立自訂Catalog類型。

      1. 單擊建立Catalog,選擇自訂Catalog頁簽,單擊建立自訂Catalog類型

      2. 建立自訂Catalog類型視窗中,上傳Paimon自訂catalog外掛程式(paimon-ali-vvr-8.0-vvp-0.6-ali-1-SNAPSHOT.jar)。

        若出現檔案尺寸過大的提示資訊,單擊繼續即可。

      3. 單擊下一步(需等待一定時間),單擊確定

    3. 自訂Catalog頁簽,單擊paimon-06-1後,單擊下一步

    4. 建立Catalog視窗中,輸入以下SQL語句並填入配置項後,單擊確定,建立名為dw的Paimon Catalog。

      CREATE CATALOG `dw` WITH (
          'type' = 'paimon-06-1',
          'metastore' = 'maxcompute',
          'warehouse' = '<warehouse>',
          'fs.oss.endpoint' = '<oss endpoint>',
          'fs.oss.accessKeyId' = '<oss access key id>',
          'fs.oss.accessKeySecret' = '<access key secret>',
          'maxcompute.endpoint' = '<maxcompute endpoint>',
          'maxcompute.accessid' = '<maxcompute access id>',
          'maxcompute.accesskey' = '<access key secret>',
          'maxcompute.project' = '<maxcompute project>',
          'maxcompute.oss.endpoint' = '<maxcompute oss endpoint>'
      );

      WITH參數說明

      配置項

      說明

      是否必填

      備忘

      type

      Catalog類型。

      固定值為paimon-06-1

      metastore

      中繼資料存放區類型。

      為了後續能在MaxCompute中分析Paimon表的資料,此處填寫maxcompute

      warehouse

      OSS服務中所指定的數倉目錄。

      格式為oss://<bucket>/<object>。其中:

      • bucket:表示您建立的OSS Bucket名稱。

      • object:表示您存放資料的路徑。

      您可以在OSS管理主控台上查看您的bucket和object名稱。

      重要
      • OSS Bucket需要與MaxCompute專案處於同一地區。

      • OSS Bucket的儲存類型需要為標準儲存,詳情請參見儲存類型概述

      fs.oss.endpoint

      從Flink訪問OSS服務的訪問網域名稱。

      參見訪問網域名稱和資料中心

      fs.oss.accessKeyId

      擁有讀寫OSS許可權的阿里雲帳號的AccessKey ID。

      擷取方法請參見查看RAM使用者的AccessKey資訊

      fs.oss.accessKeySecret

      擁有讀寫OSS許可權的阿里雲帳號的AccessKey Secret。

      擷取方法請參見查看RAM使用者的AccessKey資訊

      maxcompute.endpoint

      MaxCompute服務的訪問網域名稱。

      參見Endpoint

      maxcompute.accessid

      擁有MaxCompute許可權的阿里雲帳號的AccessKey。

      擷取方法請參見查看RAM使用者的AccessKey資訊

      maxcompute.accesskey

      擁有MaxCompute許可權的阿里雲帳號的AccessKey Secret。

      擷取方法請參見查看RAM使用者的AccessKey資訊

      maxcompute.project

      需要操作的MaxCompute專案名稱。

      當前暫時不支援開啟了Schema操作的MaxCompute專案。

      maxcompute.oss.endpoint

      從MaxCompute訪問OSS服務的訪問網域名稱。

      若不填寫,則預設使用fs.oss.endpoint。參見訪問網域名稱和資料中心

      重要

      由於OSS Bucket與MaxCompute專案處於同一地區,此處應填寫內網Endpoint。

  2. 建立MySQL Catalog。

    1. 中繼資料管理頁面,單擊建立Catalog

    2. 內建Catalog頁簽,單擊MySQL,單擊下一步

    3. 填寫以下參數,單擊確定,建立名為mysqlcatalog的MySQL Catalog。

      配置項

      說明

      是否必填

      備忘

      catalog name

      Catalog名稱。

      填寫為自訂的英文名。

      本文以mysqlcatalog為例。

      hostname

      MySQL資料庫的IP地址或者Hostname。

      詳情請參見查看和管理執行個體串連地址和連接埠。由於RDS MySQL版執行個體和Flink全託管處於相同VPC,此處應填寫內網地址。

      port

      MySQL資料庫服務的連接埠號碼,預設值為3306。

      詳情請參見查看和管理執行個體串連地址和連接埠

      default-database

      預設的MySQL資料庫名稱。

      本文填寫需要同步的資料庫名order_dw。

      username

      MySQL資料庫服務的使用者名稱。

      本文為準備MySQL CDC資料來源中建立的帳號。

      password

      MySQL資料庫服務的密碼。

      本文為準備MySQL CDC資料來源中建立的密碼。

構建ODS層:業務資料庫即時入倉

基於CREATE DATABASE AS(CDAS)語句功能,可以一次性將ODS層建出來。SQL作業中通過SET語句指定的配置項也可以在作業營運頁面的作業部署詳情頁簽指定,詳見如何配置作業運行參數?。Paimon寫入效能最佳化請參見Apache Paimon官方文檔

  1. 建立CDAS同步作業。

    1. 資料開發 > ETL頁面,建立名為ods的SQL流作業,將如下代碼複製到SQL編輯器。

      SET 'execution.checkpointing.max-concurrent-checkpoints' = '3'; -- 減輕檢查點長尾的影響。
      SET 'table.exec.sink.upsert-materialize' = 'NONE'; -- 消除無用的SinkMaterialize運算元。
      
      -- Paimon結果表在每次檢查點完成之後才會正式提交資料。
      -- 此處將檢查點間隔縮短為10s,是為了更快地看到結果。
      -- 在生產環境下,系統檢查點的間隔與兩次系統檢查點之間的最短時間間隔根據業務對延時要求的不同,一般設定為1分鐘到10分鐘。
      SET 'execution.checkpointing.interval' = '10s';
      SET 'execution.checkpointing.min-pause' = '10s';
      
      CREATE DATABASE IF NOT EXISTS dw.order_dw
      WITH (
        'changelog-producer' = 'input' -- 因為輸入資料就是MySQL產生的binlog,已經是完整的變更資料,所以可以直接把輸入資料作為變更資料。
      ) AS DATABASE mysqlcatalog.order_dw INCLUDING all tables; -- 也可以根據需要選擇上遊資料庫需要入倉的表。
    2. 單擊右上方的部署,進行作業部署。

    3. 作業營運頁面,單擊剛剛部署的ods作業操作列的啟動,選擇無狀態啟動啟動作業。

  2. 查看MySQL同步到Paimon的三張表的資料。

    資料開發 > 資料查詢頁面的查詢指令碼頁簽,將如下代碼拷貝到查詢指令碼後,選中目標片段後單擊左側程式碼上的運行。​

    SELECT * FROM dw.order_dw.orders ORDER BY order_id;

    image.png

構建DWD層:主題寬表

  1. 建立DWD層Paimon寬表dwd_orders ​在資料開發 > 資料查詢頁面的查詢指令碼頁簽,將如下代碼拷貝到查詢指令碼後,選中目標片段後單擊左側程式碼上的運行

    CREATE TABLE dw.order_dw.dwd_orders (
        order_id BIGINT,
        order_user_id STRING,
        order_shop_id BIGINT,
        order_product_id BIGINT,
        order_product_catalog_name STRING,
        order_fee BIGINT,
        order_create_time TIMESTAMP,
        order_update_time TIMESTAMP,
        order_state INT,
        pay_id BIGINT,
        pay_platform INT COMMENT 'platform 0: phone, 1: pc',
        pay_create_time TIMESTAMP,
        PRIMARY KEY (order_id) NOT ENFORCED
    ) WITH (
        'merge-engine' = 'partial-update', -- 使用部分更新資料合併機制產生寬表
        'changelog-producer' = 'lookup' -- 使用lookup增量資料產生機制以低延時產出變更資料
    );

    返回Query has been executed表示建立成功。

  2. 即時消費ODS層orders、orders_pay表的變更資料。

    資料開發 > ETL頁面,建立名為dwd的SQL流作業,並將如下代碼複製到SQL編輯器後,部署作業並無狀態啟動作業。​

    通過該SQL作業,orders表會與product_catalog表進行維表關聯,關聯後的結果將與orders_pay一起寫入dwd_orders表中,利用Paimon表的部分更新資料合併機制,將orders表和orders_pay表中order_id相同的資料進行打寬。

    SET 'execution.checkpointing.max-concurrent-checkpoints' = '3';
    SET 'table.exec.sink.upsert-materialize' = 'NONE';
    
    SET 'execution.checkpointing.interval' = '10s';
    SET 'execution.checkpointing.min-pause' = '10s';
    
    -- Paimon目前暫不支援在同一個作業裡通過多條INSERT語句寫入同一張表,因此這裡使用UNION ALL。
    INSERT INTO dw.order_dw.dwd_orders 
    SELECT 
        o.order_id,
        o.user_id,
        o.shop_id,
        o.product_id,
        dim.catalog_name,
        o.buy_fee,
        o.create_time,
        o.update_time,
        o.state,
        NULL,
        NULL,
        NULL
    FROM
        dw.order_dw.orders o 
        LEFT JOIN dw.order_dw.product_catalog FOR SYSTEM_TIME AS OF proctime() AS dim
        ON o.product_id = dim.product_id
    UNION ALL
    SELECT
        order_id,
        NULL,
        NULL,
        NULL,
        NULL,
        NULL,
        NULL,
        NULL,
        NULL,
        pay_id,
        pay_platform,
        create_time
    FROM
        dw.order_dw.orders_pay;
  3. 查看寬表dwd_orders的資料。 ​在資料開發 > 資料查詢頁面的查詢指令碼頁簽,將如下代碼拷貝到查詢指令碼後,選中目標片段後單擊左側程式碼上的運行

    SELECT * FROM dw.order_dw.dwd_orders ORDER BY order_id;

    Image 22.png

構建DWS層:指標計算

  1. 建立DWS層的彙總表dws_users以及dws_shops。 在資料開發 > 資料查詢頁面的查詢指令碼頁簽,將如下代碼拷貝到查詢指令碼後,選中目標片段後單擊左側程式碼上的運行

    -- 使用者維度彙總指標表。
    CREATE TABLE dw.order_dw.dws_users (
        user_id STRING,
        ds STRING,
        payed_buy_fee_sum BIGINT COMMENT '當日完成支付的總金額',
        PRIMARY KEY (user_id, ds) NOT ENFORCED
    ) WITH (
        'merge-engine' = 'aggregation', -- 使用預彙總資料合併機制產生彙總表
        'fields.payed_buy_fee_sum.aggregate-function' = 'sum' -- 對 payed_buy_fee_sum 的資料求和產生彙總結果
        -- 由於dws_users表不再被下遊流式消費,因此無需指定增量資料產生機制
    );
    
    -- 商戶維度彙總指標表。
    CREATE TABLE dw.order_dw.dws_shops (
        shop_id BIGINT,
        ds STRING,
        payed_buy_fee_sum BIGINT COMMENT '當日完成支付總金額',
        uv BIGINT COMMENT '當日不同購買使用者總人數',
        pv BIGINT COMMENT '當日購買使用者總人次',
        PRIMARY KEY (shop_id, ds) NOT ENFORCED
    ) WITH (
        'merge-engine' = 'aggregation', -- 使用預彙總資料合併機制產生彙總表
        'fields.payed_buy_fee_sum.aggregate-function' = 'sum', -- 對 payed_buy_fee_sum 的資料求和產生彙總結果
        'fields.uv.aggregate-function' = 'sum', -- 對 uv 的資料求和產生彙總結果
        'fields.pv.aggregate-function' = 'sum' -- 對 pv 的資料求和產生彙總結果
        -- 由於dws_shops表不再被下遊流式消費,因此無需指定增量資料產生機制
    );
    
    -- 為了同時計算使用者視角的彙總表以及商戶視角的彙總表,另外建立一個以使用者 + 商戶為主鍵的中間表。
    CREATE TABLE dw.order_dw.dwm_users_shops (
        user_id STRING,
        shop_id BIGINT,
        ds STRING,
        payed_buy_fee_sum BIGINT COMMENT '當日使用者在商戶完成支付的總金額',
        pv BIGINT COMMENT '當日使用者在商戶購買的次數',
        PRIMARY KEY (user_id, shop_id, ds) NOT ENFORCED
    ) WITH (
        'merge-engine' = 'aggregation', -- 使用預彙總資料合併機制產生彙總表
        'fields.payed_buy_fee_sum.aggregate-function' = 'sum', -- 對 payed_buy_fee_sum 的資料求和產生彙總結果
        'fields.pv.aggregate-function' = 'sum', -- 對 pv 的資料求和產生彙總結果
        'changelog-producer' = 'lookup', -- 使用lookup增量資料產生機制以低延時產出變更資料
        -- dwm層的中間表一般不直接提供上層應用查詢,因此可以針對寫入效能進行最佳化。
        'file.format' = 'avro', -- 使用avro行存格式的寫入效能更加高效。
        'metadata.stats-mode' = 'none' -- 放棄統計資訊會增加OLAP查詢代價(對持續的流處理無影響),但會讓寫入效能更加高效。
    );

    返回Query has been executed表示建立成功。

  2. 即時消費DWD層dwd_orders表的變更資料。 在資料開發 > ETL頁面的作業草稿頁簽,建立名為dwm的SQL流作業,並將如下代碼複製到SQL編輯器後,部署作業並無狀態啟動作業。

    通過該SQL作業,dwd_orders表的資料會寫入dwm_users_shops表中,利用Paimon表的預彙總資料合併機制,自動對order_fee求和,算出使用者在商戶的消費總額。同時,自動對1求和,也能算出使用者在商戶的消費次數。​​​

    SET 'execution.checkpointing.max-concurrent-checkpoints' = '3';
    SET 'table.exec.sink.upsert-materialize' = 'NONE';
    
    SET 'execution.checkpointing.interval' = '10s';
    SET 'execution.checkpointing.min-pause' = '10s';
    
    INSERT INTO dw.order_dw.dwm_users_shops
    SELECT
        order_user_id,
        order_shop_id,
        DATE_FORMAT (pay_create_time, 'yyyyMMdd') as ds,
        order_fee,
        1 -- 一條輸入記錄代表一次消費
    FROM dw.order_dw.dwd_orders
    WHERE pay_id IS NOT NULL AND order_fee IS NOT NULL;
  3. 即時消費DWM層dwm_users_shops表的變更資料。 在資料開發 > ETL頁面,建立名為dws的SQL流作業,並將如下代碼複製到SQL編輯器後,部署作業並無狀態啟動作業。

    通過該SQL作業,dwm_users_shops表的資料會寫入dws_users表中,利用Paimon表的預彙總資料合併機制,自動對payed_buy_fee_sum求和,算出使用者在所有商戶的消費總額。另外,該SQL作業也會將dwm_users_shops表的資料寫入dws_shops表中,利用Paimon表的預彙總資料合併機制,自動對payed_buy_fee_sum求和,算出商戶的總流水。另外,自動對1求和,也能算出有幾個不同的使用者在該商戶消費過。最後,自動對pv求和,算出在該商戶消費的總人次。​​

    SET 'execution.checkpointing.max-concurrent-checkpoints' = '3';
    SET 'table.exec.sink.upsert-materialize' = 'NONE';
    
    SET 'execution.checkpointing.interval' = '10s';
    SET 'execution.checkpointing.min-pause' = '10s';
    
    -- 與dwd不同,此處每一條INSERT語句寫入的是不同的Paimon表,可以放在同一個作業中。
    BEGIN STATEMENT SET;
    
    INSERT INTO dw.order_dw.dws_users
    SELECT 
        user_id,
        ds,
        payed_buy_fee_sum
    FROM dw.order_dw.dwm_users_shops;
    
    -- 以商戶為主鍵,部分熱門商戶的資料量可能遠高於其他商戶。
    -- 因此使用local merge在寫入Paimon之前先在記憶體中進行預彙總,緩解資料扭曲問題。
    INSERT INTO dw.order_dw.dws_shops /*+ OPTIONS('local-merge-buffer-size' = '64mb') */
    SELECT
        shop_id,
        ds,
        payed_buy_fee_sum,
        1, -- 一條輸入記錄代表一名使用者在該商戶的所有消費
        pv
    FROM dw.order_dw.dwm_users_shops;
    
    END;
  4. 查看dws_users表和dws_shops表的資料。 ​在資料開發 > 資料查詢頁面的查詢指令碼頁簽,將如下代碼拷貝到查詢指令碼後,選中目標片段後單擊左側程式碼上的運行

    --查看dws_users表資料
    SELECT * FROM dw.order_dw.dws_users ORDER BY user_id;
    
    --查看dws_shops表資料
    SELECT * FROM dw.order_dw.dws_shops ORDER BY shop_id;

    image.png

捕捉業務資料庫的變化

前面已完成了流式湖倉的構建,下面將測試流式湖倉捕捉業務資料庫變化的能力。

  1. 向MySQL的order_dw資料庫中插入如下資料。​

    INSERT INTO orders VALUES
    (100008, 'user_001', 12345, 3, 3000, '2023-02-15 17:40:56', '2023-02-15 18:42:56', 1),
    (100009, 'user_002', 12348, 4, 1000, '2023-02-15 18:40:56', '2023-02-15 19:42:56', 1),
    (100010, 'user_003', 12348, 2, 2000, '2023-02-15 19:40:56', '2023-02-15 20:42:56', 1);
    
    INSERT INTO orders_pay VALUES
    (2008, 100008, 1, '2023-02-15 18:40:56'),
    (2009, 100009, 1, '2023-02-15 19:40:56'),
    (2010, 100010, 0, '2023-02-15 20:40:56');
  2. 查看dws_users表和dws_shops表的資料。 ​在資料開發 > 資料查詢頁面的查詢指令碼頁簽,將如下代碼拷貝到查詢指令碼後,選中目標片段後單擊左側程式碼上的運行

    • dws_users表

      SELECT * FROM dw.order_dw.dws_users ORDER BY user_id;

      image.png

    • dws_shops表

      SELECT * FROM dw.order_dw.dws_shops ORDER BY shop_id;

      image.png

使用流式湖倉

上一小節展示了在Flink中進行Paimon Catalog的建立與Paimon表的寫入。由於Catalog中指定了中繼資料存放區為MaxCompute,因此在Catalog中建立表時,MaxCompute中將會自動建立Paimon外表。本節展示流式湖倉搭建完成後,利用MaxCompute進行資料分析的一些簡單應用情境。關於MaxCompute如何查詢Paimon外表,詳見Paimon外部表格

排名查詢

對DWS層彙總表進行分析。本文使用DataWorks查詢23年2月15日交易額前三高的商戶的程式碼範例如下。

SET odps.sql.common.table.planner.ext.hive.bridge = true;
SET odps.sql.hive.compatible = true;

SELECT ROW_NUMBER() OVER (ORDER BY payed_buy_fee_sum DESC) AS rn, shop_id, payed_buy_fee_sum FROM dws_shops
WHERE ds = '20230215'
ORDER BY rn LIMIT 3;

image.png

明細查詢

對DWD層寬表進行分析。本文使用DataWorks查詢某個客戶23年2月特定支付平台支付的訂單明細的程式碼範例如下。

SET odps.sql.common.table.planner.ext.hive.bridge = true;
SET odps.sql.hive.compatible = true;

SELECT * FROM dwd_orders
WHERE order_create_time >= '2023-02-01 00:00:00' AND order_create_time < '2023-03-01 00:00:00'
AND order_user_id = 'user_001'
AND pay_platform = 0
ORDER BY order_create_time;

image.png

資料報表

對DWD層寬表進行分析。本文使用DataWorks查詢23年2月內每個品類的訂單總量和訂單總金額的程式碼範例如下。

SET odps.sql.common.table.planner.ext.hive.bridge = true;
SET odps.sql.hive.compatible = true;

SELECT
  TO_CHAR(order_create_time, 'YYYYMMDD') AS order_create_date,
  order_product_catalog_name,
  COUNT(*),
  SUM(order_fee)
FROM
  dwd_orders
WHERE
  order_create_time >= '2023-02-01 00:00:00'  and order_create_time < '2023-03-01 00:00:00'
GROUP BY
  order_create_date, order_product_catalog_name
ORDER BY
  order_create_date, order_product_catalog_name;

image.png

相關文檔