全部產品
Search
文件中心

Realtime Compute for Apache Flink:基於Flink+Hologres搭建即時數倉

更新時間:Jul 27, 2024

使用Flink+Hologres搭建即時數倉可以充分利用Flink強大的即時處理能力和Hologres提供的Binlog、行列共存和資源強隔離等能力,實現高效、可擴充的即時資料處理和分析,協助您更好地應對不斷增長的資料量和即時業務需求。本文介紹如何通過Realtime ComputeFlink版和即時數倉Hologres搭建即時數倉。

背景資訊

隨著社會數字化發展,企業對資料時效性的需求越來越強烈。除傳統的面向海量資料加工情境設計的離線情境外,大量業務需要解決面向即時加工、即時儲存、即時分析的即時情境問題。傳統離線數倉搭建的方法論比較明確,通過定時調度實現數倉分層(ODS->DWD->DWS->ADS);但對於即時數倉的搭建,目前缺乏明確的方法體系。基於Streaming Warehouse理念,實現數倉分層之間即時資料的高效流動,可以解決即時數倉分層問題。

方案架構

Realtime ComputeFlink版是強大的流式計算引擎,支援對海量即時資料高效處理。Hologres是一站式即時數倉,支援資料即時寫入與更新,即時資料寫入即可查。Hologres與Flink深度整合,能夠提供一體化的即時數倉聯合解決方案。本文基於Flink+Hologres搭建即時數倉的方案架構如下:

  1. Flink將資料來源寫入Hologres,形成ODS層。

  2. Flink訂閱ODS層的Binlog進行加工,形成DWD層再次寫入Hologres。

  3. Flink訂閱DWD層的Binlog,通過計算形成DWS層,再次寫入Hologres。

  4. 最後由Hologres對外提供應用查詢。

image.png

該方案有如下優勢:

  • Hologres的每一層資料都支援高效更新與修正、寫入即可查,解決了傳統即時數倉解決方案的中介層資料不易查、不易更新、不易修正的問題。

  • Hologres的每一層資料都可單獨對外提供服務,資料的高效複用,真正實現數倉分層複用的目標。

  • 模型統一,架構簡化。即時ETL鏈路的邏輯是基於Flink SQL實現的;ODS層、DWD層和DWS層的資料統一儲存在Hologres中,可以降低架構複雜度,提高資料處理效率。

該方案依賴於Hologres的3個核心能力,詳情如下表所示。

Hologres核心能力

詳情

Binlog

Hologres提供Binlog能力,用於驅動Flink進行Realtime Compute,以此作為流式計算的上遊。Hologres的Binlog能力詳情請參見訂閱Hologres Binlog

行列共存

Hologres支援行列共存的儲存格式。一張表同時儲存行存資料和列存資料,並且兩份資料強一致。該特性保證中介層表不僅可以作為Flink的源表,也可以作為Flink的維表進行主鍵點查與維表Join,還可以供其他應用(OLAP、線上服務等)查詢。Hologres的行列共存能力詳情請參見表格儲存體格式:列存、行存、行列共存

資源強隔離

Hologres執行個體的負載較高時,可能影響中介層的點查效能。Hologres支援通過主從執行個體讀寫分離部署(共用儲存)實現資源強隔離,從而保證Flink對Hologres Binlog的資料拉取不影響線上服務。

實踐情境

本文以某個電商平台為例,通過搭建一套即時數倉,實現資料的即時加工清洗和對接上層應用資料查詢,形成即時資料的分層和複用,支撐各個業務方的報表查詢(交易大屏、行為資料分析、使用者畫像標籤)以及個人化推薦等多個業務情境。

image.png

  1. 構建ODS層:業務資料庫即時入倉

    MySQL有orders(訂單表),orders_pay(訂單支付表),product_catalog(商品類別字典表)3張業務表,這3張表通過Flink即時同步到Hologres中作為ODS層。

  2. 構建DWD層:即時主題寬表

    將訂單表、商品類別字典表、訂單支付表進行即時打寬,產生DWD層寬表。

  3. 構建DWS層:即時指標計算

    即時消費寬表的binlog,事件驅動地彙總出相應的DWS層指標表。

前提條件

  • Realtime ComputeFlink版、RDS MySQL和Hologres需要在同一VPC。如果不在同一VPC,需要先打通跨VPC的網路或者使用公網的形式訪問,詳情請參見如何訪問跨VPC的其他服務?Flink全託管如何訪問公網?

  • 通過RAM使用者或RAM角色等身份訪問Hologres和RDS MySQL資源時,需要其具備對應資源的許可權。

  • 已開通Realtime ComputeFlink版,詳情請參見開通Realtime ComputeFlink版

  • 已購買獨享通用型Hologres執行個體,詳情請參見購買Hologres

    購買執行個體後,需要建立order_dw資料庫和使用者(為使用者賦予admin許可權),推薦使用簡單許可權模型建立資料庫,詳情請參見簡單許可權模型的使用DB管理

    如果在被授權帳號的下拉式清單找不到對應的帳號,則說明該帳號並未添加至當前執行個體,您需要前往使用者管理頁面添加使用者為SuperUser。

    說明
    • Hologres1.3版本在建立完資料庫後,需要執行create extension hg_binlog命令才能開啟binlog擴充。

    • Hologres2.0之後版本預設開啟binlog擴充,無需手動執行。

  • 已建立RDS MySQL執行個體,並準備MySQL CDC資料來源(order_dw資料庫中的三張業務表的建表DDL以及插入的資料如下),詳情請參見建立RDS MySQL執行個體管理資料庫

    CREATE TABLE `orders` (
      order_id bigint not null primary key,
      user_id varchar(50) not null,
      shop_id bigint not null,
      product_id bigint not null,
      buy_fee numeric(20,2) not null,   
      create_time timestamp not null,
      update_time timestamp not null default now(),
      state int not null 
    );
    
    
    CREATE TABLE `orders_pay` (
      pay_id bigint not null primary key,
      order_id bigint not null,
      pay_platform int not null, 
      create_time timestamp not null
    );
    
    
    CREATE TABLE `product_catalog` (
      product_id bigint not null primary key,
      catalog_name varchar(50) not null
    );
    
    -- 準備資料
    INSERT INTO product_catalog VALUES(1, 'phone_aaa'),(2, 'phone_bbb'),(3, 'phone_ccc'),(4, 'phone_ddd'),(5, 'phone_eee');
    
    INSERT INTO orders VALUES
    (100001, 'user_001', 12345, 1, 5000.05, '2023-02-15 16:40:56', '2023-02-15 18:42:56', 1),
    (100002, 'user_002', 12346, 2, 4000.04, '2023-02-15 15:40:56', '2023-02-15 18:42:56', 1),
    (100003, 'user_003', 12347, 3, 3000.03, '2023-02-15 14:40:56', '2023-02-15 18:42:56', 1),
    (100004, 'user_001', 12347, 4, 2000.02, '2023-02-15 13:40:56', '2023-02-15 18:42:56', 1),
    (100005, 'user_002', 12348, 5, 1000.01, '2023-02-15 12:40:56', '2023-02-15 18:42:56', 1),
    (100006, 'user_001', 12348, 1, 1000.01, '2023-02-15 11:40:56', '2023-02-15 18:42:56', 1),
    (100007, 'user_003', 12347, 4, 2000.02, '2023-02-15 10:40:56', '2023-02-15 18:42:56', 1);
    
    INSERT INTO orders_pay VALUES
    (2001, 100001, 1, '2023-02-15 17:40:56'),
    (2002, 100002, 1, '2023-02-15 17:40:56'),
    (2003, 100003, 0, '2023-02-15 17:40:56'),
    (2004, 100004, 0, '2023-02-15 17:40:56'),
    (2005, 100005, 0, '2023-02-15 18:40:56'),
    (2006, 100006, 0, '2023-02-15 18:40:56'),
    (2007, 100007, 0, '2023-02-15 18:40:56');

使用限制

  • 僅Realtime Compute引擎VVR 6.0.7及以上版本支援該即時數倉方案。

  • 僅1.3及以上版本的Hologres支援該即時數倉方案。

構建即時數倉

建立Catalog

  1. 建立Hologres Catalog。

    Realtime Compute控制台SQL開發頁面的查詢指令碼頁簽,將如下代碼拷貝到查詢指令碼,並修改目標參數取值,選中目標片段後單擊左側程式碼上的運行

    CREATE CATALOG dw WITH (
      'type' = 'hologres',
      'endpoint' = '<ENDPOINT>', 
      'username' = '<USERNAME>',
      'password' = '${secret_values.ak_holo}',
      'dbname' = 'order_dw',
      'binlog' = 'true', -- 建立catalog時可以設定源表、維表和結果表支援的with參數,之後在使用此catalog下的表時會預設添加這些預設參數。
      'sdkMode' = 'jdbc', -- 推薦使用jdbc模式。
      'cdcmode' = 'true',
      'connectionpoolname' = 'the_conn_pool',
      'ignoredelete' = 'true',  -- 寬表merge需要開啟,防止回撤。
      'partial-insert.enabled' = 'true', -- 寬表merge需要開啟此參數,實現部分列更新。
      'mutateType' = 'insertOrUpdate', -- 寬表merge需要開啟此參數,實現部分列更新。
      'table_property.binlog.level' = 'replica', --也可以在建立catalog時傳入持久化的hologres表屬性,之後建立表時,預設都開啟binlog。
      'table_property.binlog.ttl' = '259200'
    );
    

    您需要修改以下參數取值為您實際Hologres服務資訊。

    參數

    說明

    備忘

    endpoint

    Hologres的Endpoint地址。

    詳情請參見執行個體配置

    username

    阿里雲帳號的AccessKey ID。

    當前配置的AccessKey對應的使用者需要能夠訪問所有的Hologres資料庫,Hologres資料庫許可權請參見Hologres許可權模型概述

    為了避免您的AK資訊泄露,本樣本通過使用名為ak_holo密鑰的方式填寫AccessKey Secret取值,詳情請參見變數和密鑰管理

    password

    阿里雲帳號的AccessKey Secret。

    說明

    建立Catalog時可以設定預設的源表、維表和結果表的WITH參數,也可以設定建立Hologres物理表的預設屬性,例如上方table_property開頭的參數。詳情請參見管理Hologres Catalog即時數倉Hologres WITH參數

  2. 建立MySQL Catalog。

    將如下代碼拷貝到查詢指令碼,並修改目標參數取值,選中目標片段後單擊左側程式碼上的運行

    CREATE CATALOG mysqlcatalog WITH(
      'type' = 'mysql',
      'hostname' = '<hostname>',
      'port' = '<port>',
      'username' = '<username>',
      'password' = '${secret_values.mysql_pw}',
      'default-database' = 'order_dw'
    );

    您需要修改以下參數取值為您實際的MySQL服務資訊。

    參數

    說明

    hostname

    MySQL資料庫的IP地址或者Hostname。

    port

    MySQL資料庫服務的連接埠號碼,預設值為3306。

    username

    MySQL資料庫服務的使用者名稱。

    password

    MySQL資料庫服務的密碼。

    本樣本通過使用名為mysql_pw密鑰的方式填寫密碼取值,避免資訊泄露,詳情請參見變數和密鑰管理

構建ODS層:業務資料庫即時入倉

基於Catalog的CREATE DATABASE AS(CDAS)語句功能,可以一次性把ODS層建出來。ODS層一般不直接做OLAP或SERVING(KV點查),主要作為流式作業的事件驅動,開啟binlog即可滿足需求。Binlog是Hologres的核心能力之一,Hologres連接器也支援先全量讀取再增量消費Binlog的全增量模式。

  1. 建立CDAS同步作業ODS。

    1. Realtime Compute控制台上,建立名為ODS的SQL流作業,並將如下代碼拷貝到SQL編輯器。

      CREATE DATABASE IF NOT EXISTS dw.order_dw   -- 建立catalog時設定了table_property.binlog.level參數,因此通過CDAS建立的所有表都開啟了binlog。
      AS DATABASE mysqlcatalog.order_dw INCLUDING all tables -- 可以根據需要選擇上遊資料庫需要入倉的表。
      /*+ OPTIONS('server-id'='8001-8004') */ ;   -- 指定mysql-cdc執行個體server-id範圍。
      說明

      本樣本預設將資料同步到資料庫order_dw的Public Schema下。您也可以將資料同步到Hologres目標庫的指定Schema中,詳情請參見作為CDAS的目標端Catalog,指定後使用Catalog時的表名格式也會發生變化,詳情請參見使用Hologres Catalog

    2. 單擊右上方的部署,進行作業部署。

    3. 單擊左側導覽列的作業營運,單擊剛剛部署的ODS作業操作列的啟動,選擇無狀態啟動後單擊啟動

  2. 查看MySQL同步到Hologres的3張表資料。

    HoloWeb開發頁面串連Hologres執行個體並登入目標資料庫後,在SQL編輯器上執行如下命令。

    ---查orders中的資料。
    SELECT * FROM orders;
    
    ---查orders_pay中的資料。
    SELECT * FROM orders_pay;
    
    ---查product_catalog中的資料。
    SELECT * FROM product_catalog;

    image.png

構建DWD層:即時主題寬表

構建DWD層用到了Hologres連接器特有的部分列更新能力,可以使用INSERT DML方便地表達部分列更新的語義。作業中需要對不同的維表進行查詢,是基於Hologres行存以及行列共存表提供的高效能的點查能力。同時,Hologres資源強隔離的架構,可以保證寫入、讀取、分析等作業之間互不干擾。

  1. 通過Flink Catalog功能在Hologres中建DWD層的寬表dwd_orders。

    Realtime Compute控制台SQL開發頁面的查詢指令碼頁簽,將如下代碼拷貝到查詢指令碼後,選中目標片段後單擊左側程式碼上的運行

    -- 寬表欄位要nullable,因為不同的流寫入到同一張結果表,每一列都可能出現null的情況。
    CREATE TABLE dw.order_dw.dwd_orders (
      order_id bigint not null,
      order_user_id string,
      order_shop_id bigint,
      order_product_id bigint,
      order_product_catalog_name string,
      order_fee numeric(20,2),
      order_create_time timestamp,
      order_update_time timestamp,
      order_state int,
      pay_id bigint,
      pay_platform int comment 'platform 0: phone, 1: pc', 
      pay_create_time timestamp,
      PRIMARY KEY(order_id) NOT ENFORCED
    );
    
    -- 支援通過catalog修改Hologres物理表屬性。
    ALTER TABLE dw.order_dw.dwd_orders SET (
      'table_property.binlog.ttl' = '604800' --修改binlog的逾時時間為一周。
    );
  2. 實現即時消費ODS層orders、orders_pay表的binlog。

    Realtime Compute控制台上,建立名為DWD的SQL流作業,並將如下代碼拷貝到SQL編輯器後,部署啟動作業。通過如下SQL作業,orders表會與product_catalog表進行維表關聯,將最終結果寫入dwd_orders表中,實現資料的即時打寬。

    BEGIN STATEMENT SET;
    
    INSERT INTO dw.order_dw.dwd_orders 
     (
       order_id,
       order_user_id,
       order_shop_id,
       order_product_id,
       order_fee,
       order_create_time,
       order_update_time,
       order_state,
       order_product_catalog_name
     ) SELECT o.*, dim.catalog_name 
       FROM dw.order_dw.orders as o
       LEFT JOIN dw.order_dw.product_catalog FOR SYSTEM_TIME AS OF proctime() AS dim
       ON o.product_id = dim.product_id;
    
    INSERT INTO dw.order_dw.dwd_orders 
      (pay_id, order_id, pay_platform, pay_create_time)
       SELECT * FROM dw.order_dw.orders_pay;
    
    END;
  3. 查看寬表dwd_orders資料。

    HoloWeb開發頁面串連Hologres執行個體並登入目標資料庫後,在SQL編輯器上執行如下命令。

    SELECT * FROM dwd_orders;

    image.png

構建DWS層:即時指標計算

  1. 通過Flink Catalog功能,在Hologres中建立dws層的彙總dws_users以及dws_shops。

    Realtime Compute控制台SQL開發頁面的查詢指令碼頁簽,將如下代碼拷貝到查詢指令碼後,選中目標片段後單擊左側程式碼上的運行

    -- 使用者維度彙總指標表。
    CREATE TABLE dw.order_dw.dws_users (
      user_id string not null,
      ds string not null,
      paied_buy_fee_sum numeric(20,2) not null comment '當日完成支付的總金額',
      primary key(user_id,ds) NOT ENFORCED
    );
    
    -- 商戶維度彙總指標表。
    CREATE TABLE dw.order_dw.dws_shops (
      shop_id bigint not null,
      ds string not null,
      paied_buy_fee_sum numeric(20,2) not null comment '當日完成支付總金額',
      primary key(shop_id,ds) NOT ENFORCED
    );
  2. 即時消費DWD層的寬表dw.order_dw.dwd_orders,在Flink中做彙總計算,最終寫入Hologres中的DWS表。

    Realtime Compute控制台上,建立名為DWS的SQL流作業,並將如下代碼拷貝到SQL編輯器後,部署啟動作業。

    BEGIN STATEMENT SET;
    
    INSERT INTO dw.order_dw.dws_users
      SELECT 
        order_user_id,
        DATE_FORMAT (pay_create_time, 'yyyyMMdd') as ds,
        SUM (order_fee)
        FROM dw.order_dw.dwd_orders c
        WHERE pay_id IS NOT NULL AND order_fee IS NOT NULL -- 訂單流和支付流資料都已寫入寬表。
        GROUP BY order_user_id, DATE_FORMAT (pay_create_time, 'yyyyMMdd');
    
    INSERT INTO dw.order_dw.dws_shops
      SELECT 
        order_shop_id,
        DATE_FORMAT (pay_create_time, 'yyyyMMdd') as ds,
        SUM (order_fee)
       FROM dw.order_dw.dwd_orders c
       WHERE pay_id IS NOT NULL AND order_fee IS NOT NULL -- 訂單流和支付流資料都已寫入寬表。
       GROUP BY order_shop_id, DATE_FORMAT (pay_create_time, 'yyyyMMdd');
    END;
  3. 查看DWS層的彙總結果,其結果會根據上遊資料的變更即時更新。

    HoloWeb開發頁面串連Hologres執行個體並登入目標資料庫後,在SQL編輯器上執行如下命令。

    • 查詢dws_users表結果。

      SELECT * FROM dws_users;

      image.png

    • 查詢dws_shops表結果。

      SELECT * FROM dws_shops;

      image.png

資料探查

因為開啟了Binlog,所以可直接探查到資料的變化情況。如果對中間結果需要即席(Ad-hoc)性質的業務資料探查,或者對最終計算結果進行資料正確性排查,此方案的每一層資料都實現了持久化,可以便捷地探查中間過程。

  • 流模式探查

    1. 建立並啟動資料探查流作業。

      Realtime Compute控制台上,建立名為Data-exploration的SQL流作業,並將如下代碼拷貝到SQL編輯器後,部署啟動作業。

      -- 流模式探查,列印到print可以看到資料的變化情況。
      CREATE TEMPORARY TABLE print_sink(
        order_id bigint not null,
        order_user_id string,
        order_shop_id bigint,
        order_product_id bigint,
        order_product_catalog_name string,
        order_fee numeric(20,2),
        order_create_time timestamp,
        order_update_time timestamp,
        order_state int,
        pay_id bigint,
        pay_platform int,
        pay_create_time timestamp,
        PRIMARY KEY(order_id) NOT ENFORCED
      ) WITH (
        'connector' = 'print'
      );
      
      INSERT INTO print_sink SELECT *
      FROM dw.order_dw.dwd_orders /*+ OPTIONS('startTime'='2023-02-15 12:00:00') */ --這裡的startTime是binlog產生的時間
      WHERE order_user_id = 'user_001';
    2. 查看資料探查結果。

      作業營運詳情頁面,單擊目標作業名稱,在作業探查頁簽下左側作業記錄頁簽,單擊運行Task Managers頁簽下的Path, ID。在Stdout頁面搜尋user_001相關的日誌資訊。

      image.png

  • 批模式探查

    Realtime Compute控制台上,建立SQL流作業,並將如下代碼拷貝到SQL編輯器後,單擊調試。詳情請參見作業調試

    批模式探查是擷取當前時刻的終態資料,在Flink作業開發介面調試結果如下圖所示。

    SELECT *
    FROM dw.order_dw.dwd_orders /*+ OPTIONS('binlog'='false') */ 
    WHERE order_user_id = 'user_001' and order_create_time > '2023-02-15 12:00:00'; --批量模式支援filter下推,提升批作業執行效率。

    image.png

使用即時數倉

上一小節展示了通過Flink Catalog,可以僅在Flink側搭建一個基於Flink和Hologres的Streaming Warehouse即時分層數倉。本節則展示數倉搭建完成之後的一些簡單應用情境。

Key-Value服務

根據主鍵查詢DWS層的彙總指標表,支援百萬級RPS。

HoloWeb開發頁面查詢指定使用者指定日期的消費額的程式碼範例如下。

-- holo sql
SELECT * FROM dws_users WHERE user_id ='user_001' AND ds = '20230215';

image.png

明細查詢

對DWD層寬表進行OLAP分析。

HoloWeb開發頁面查詢某個客戶23年2月特定支付平台支付的訂單明細的程式碼範例如下。

-- holo sql
SELECT * FROM dwd_orders
WHERE order_create_time >= '2023-02-01 00:00:00'  and order_create_time < '2023-03-01 00:00:00'
AND order_user_id = 'user_001'
AND pay_platform = 0
ORDER BY order_create_time LIMIT 100;

image.png

即時報表

基於DWD層寬表資料展示即時報表,Hologres的行列共存以及列存表有非常優秀的OLAP分析能力,支援秒級響應。

HoloWeb開發頁面查詢23年2月內每個品類的訂單總量和訂單總金額的程式碼範例如下。

-- holo sql
SELECT
  TO_CHAR(order_create_time, 'YYYYMMDD') AS order_create_date,
  order_product_catalog_name,
  COUNT(*),
  SUM(order_fee)
FROM
  dwd_orders
WHERE
  order_create_time >= '2023-02-01 00:00:00'  and order_create_time < '2023-03-01 00:00:00'
GROUP BY
  order_create_date, order_product_catalog_name
ORDER BY
  order_create_date, order_product_catalog_name;

image.png

相關文檔