全部產品
Search
文件中心

Realtime Compute for Apache Flink:Flink批處理快速入門

更新時間:Sep 13, 2024

作為流批一體的計算架構,Flink不僅能夠提供低延遲的流式資料處理(Streaming Data Processsing),也能進行高吞吐的批處理(Batch Data Processing)。Realtime ComputeFlink版對批處理能力進行了專門的支援,提供了包括作業開發、作業營運、作業編排、資源隊列管理、資料結果探查等能力,可以利用Flink批處理能力更好地解決業務需求。本文通過具體的樣本為您介紹如何利用Realtime ComputeFlink版關鍵功能進行資料批處理。

功能介紹

Realtime ComputeFlink版提供了以下關鍵功能來支援Flink批處理:

  • SQL作業開發:在SQL開發頁面的作業草稿頁簽,可以建立批作業草稿,批作業草稿會以批作業的形式被部署和執行。

  • 作業管理:在作業營運頁面,可以直接部署JAR或Python類型的批作業。在頂部下拉框中選擇批作業,查看已部署的批作業。展開目標批作業,可查看其工作執行個體列表。通常,一個批作業的不同工作執行個體具有相同的處理邏輯,但是採用不同的參數,例如處理的資料所屬日期。

  • 資料查詢:在SQL開發頁面的查詢指令碼頁簽,可以執行一些DDL或短查詢,快速地進行資料管理和資料探查。這些短查詢執行在預建立的Flink Session中,通過資源複用,實現低延遲的簡單查詢。

  • 管理中繼資料:在中繼資料管理頁面,可以建立和查看Catalog,包括其中的資料庫和表的資訊。您也可以在SQL開發頁面的中繼資料頁簽進行查看,提高開發效率。

  • 任務編排(公測):在任務編排頁面,可以定義工作流程,通過可視化的操作方式,編排一系列批作業的執行依賴。工作流程會作為一個整體,根據定義好的依賴關係執行包含的批作業。支援通過手動觸發或定時調度方式來執行建立好的工作流程。

  • 管理資源隊列:在隊列管理頁面,可以對工作空間中的資源進行劃分,從而避免流作業和批作業、以及不同優先順序的作業間發生資源爭搶。

注意事項

  • 已建立Flink工作空間,詳情請參見開通Realtime ComputeFlink版

  • 已開通Object Storage Service,詳情請參見控制台快速入門。OSS Bucket的儲存類型需要為標準儲存,詳情請參見儲存類型概述

  • 由於本文樣本使用Apache Paimon儲存資料,僅Realtime Compute引擎VVR 8.0.5及以上版本支援本文樣本。

樣本情境

本文以一個電子商務平台的業務情境為例,使用Apache Paimon的湖倉格式對資料進行儲存。類比了一個資料倉儲結構,包括ODS(操作資料儲存)、DWD(資料倉儲細節級)、DWS(資料倉儲匯總級)的儲存層級。通過Flink的批處理能力,對資料進行加工清洗後寫入Paimon表,從而實現資料分層結構的搭建。

image

準備工作

  1. 建立資料查詢

    通過查詢指令碼頁簽,您可以建立Catalog以及其中的資料庫和表,並且向表中插入一些類比的資料。

  2. 建立Paimon Catalog。

    1. 查詢指令碼的文本編輯地區,輸入如下SQL語句。

      CREATE CATALOG `my_catalog` WITH (
        'type' = 'paimon',
        'metastore' = 'filesystem',
        'warehouse' = '<warehouse>',
        'fs.oss.endpoint' = '<fs.oss.endpoint>',
        'fs.oss.accessKeyId' = '<fs.oss.accessKeyId>',
        'fs.oss.accessKeySecret' = '<fs.oss.accessKeySecret>'
      );

      參數配置項如下。

      配置項

      說明

      是否必填

      備忘

      type

      Catalog類型。

      固定值為Paimon。

      metastore

      中繼資料存放區類型。

      本文樣本填寫filesystem,其他類型詳情請參見管理Paimon Catalog

      warehouse

      OSS服務中所指定的數倉目錄。

      格式為oss://<bucket>/<object>。其中:

      • bucket:表示您建立的OSS Bucket名稱。

      • object:表示您存放資料的路徑。

      請在OSS管理主控台上查看您的Bucket和Object名稱。

      fs.oss.endpoint

      OSS服務的串連地址。

      當warehouse指定的OSS Bucket與Flink工作空間不在同一地區,或使用其它帳號下的OSS bucket時需要填寫。

      請參見訪問網域名稱和資料中心

      fs.oss.accessKeyId

      擁有讀寫OSS許可權的阿里雲帳號或RAM帳號的AccessKey。

      當warehouse指定的OSS Bucket與Flink工作空間不在同一地區,或使用其它帳號下的OSS Bucket時需要填寫。擷取方法請參見建立AccessKey

      fs.oss.accessKeySecret

      擁有讀寫OSS許可權的阿里雲帳號或RAM帳號的AccessKey Secret。

    2. 選中上述代碼,單擊左側的運行

      返回The following statement has been executed successfully!資訊表示Catalog建立成功。此時可以在中繼資料管理頁面(或是SQL開發頁面的中繼資料子頁面),查看新建立的Catalog。

      image.png

操作流程

步驟一:建立ODS表並插入測試資料

說明

為了簡化本樣本,我們直接向ODS表中插入了一些測試資料,用於後續的DWD/DWS表的資料產生。在實際生產中,一般會使用Flink流處理從外部資料源讀取資料並寫入到湖中作為ODS層,具體可以參見 Paimon快速開始:準系統

  1. 查詢指令碼文本編輯地區,輸入如下SQL語句並單擊左側的運行

    CREATE DATABASE `my_catalog`.`order_dw`;
    
    USE `my_catalog`.`order_dw`;
    
    CREATE TABLE orders (
      order_id BIGINT,
      user_id STRING,
      shop_id BIGINT,
      product_id BIGINT,
      buy_fee BIGINT,   
      create_time TIMESTAMP,
      update_time TIMESTAMP,
      state INT
    );
    
    CREATE TABLE orders_pay (
      pay_id BIGINT,
      order_id BIGINT,
      pay_platform INT, 
      create_time TIMESTAMP
    );
    
    CREATE TABLE product_catalog (
      product_id BIGINT,
      catalog_name STRING
    );
    
    -- 插入測試資料
    
    INSERT INTO orders VALUES
    (100001, 'user_001', 12345, 1, 5000, TO_TIMESTAMP('2023-02-15 16:40:56'), TO_TIMESTAMP('2023-02-15 18:42:56'), 1),
    (100002, 'user_002', 12346, 2, 4000, TO_TIMESTAMP('2023-02-15 15:40:56'), TO_TIMESTAMP('2023-02-15 18:42:56'), 1),
    (100003, 'user_003', 12347, 3, 3000, TO_TIMESTAMP('2023-02-15 14:40:56'), TO_TIMESTAMP('2023-02-15 18:42:56'), 1),
    (100004, 'user_001', 12347, 4, 2000, TO_TIMESTAMP('2023-02-15 13:40:56'), TO_TIMESTAMP('2023-02-15 18:42:56'), 1),
    (100005, 'user_002', 12348, 5, 1000, TO_TIMESTAMP('2023-02-15 12:40:56'), TO_TIMESTAMP('2023-02-15 18:42:56'), 1),
    (100006, 'user_001', 12348, 1, 1000, TO_TIMESTAMP('2023-02-15 11:40:56'), TO_TIMESTAMP('2023-02-15 18:42:56'), 1),
    (100007, 'user_003', 12347, 4, 2000, TO_TIMESTAMP('2023-02-15 10:40:56'), TO_TIMESTAMP('2023-02-15 18:42:56'), 1);
    
    INSERT INTO orders_pay VALUES
    (2001, 100001, 1, TO_TIMESTAMP('2023-02-15 17:40:56')),
    (2002, 100002, 1, TO_TIMESTAMP('2023-02-15 17:40:56')),
    (2003, 100003, 0, TO_TIMESTAMP('2023-02-15 17:40:56')),
    (2004, 100004, 0, TO_TIMESTAMP('2023-02-15 17:40:56')),
    (2005, 100005, 0, TO_TIMESTAMP('2023-02-15 18:40:56')),
    (2006, 100006, 0, TO_TIMESTAMP('2023-02-15 18:40:56')),
    (2007, 100007, 0, TO_TIMESTAMP('2023-02-15 18:40:56'));
    
    INSERT INTO product_catalog VALUES
      (1, 'phone_aaa'),
      (2, 'phone_bbb'),
      (3, 'phone_ccc'),
      (4, 'phone_ddd'),
      (5, 'phone_eee');
    說明

    本文建立的是不帶主鍵的Paimon Append Only表,其相比於Paimon主鍵表具有更好的批量寫入效能,但不支援基於主鍵的更新操作。

    執行結果會包含多個子標籤頁,返回The following statement has been executed successfully!資訊表示對應的DDL語句執行成功。

    INSERT等DML語句則會返回一個JobId,表明產生了Flink作業並在Flink Session中執行,單擊結果欄左側的在Flink UI中查看,可觀察到這幾條SQL語句的執行情況,等待數秒至其執行完成。

  2. 探查ODS表資料。

    查詢指令碼文本編輯地區,輸入如下SQL語句並單擊左側的運行

    SELECT count(*) as order_count FROM `my_catalog`.`order_dw`.`orders`;
    SELECT count(*) as pay_count FROM `my_catalog`.`order_dw`.`orders_pay`;
    SELECT * FROM `my_catalog`.`order_dw`.`product_catalog`;

    這些SQL語句也會在Flink Session中執行,最終可以在3個查詢的結果頁面中查看返回結果。

    image.png image.png image.png

步驟二:建立DWD和DWS表

查詢指令碼文本編輯地區,輸入如下SQL語句並單擊左側的運行

USE `my_catalog`.`order_dw`;

CREATE TABLE dwd_orders (
    order_id BIGINT,
    order_user_id STRING,
    order_shop_id BIGINT,
    order_product_id BIGINT,
    order_product_catalog_name STRING,
    order_fee BIGINT,
    order_create_time TIMESTAMP,
    order_update_time TIMESTAMP,
    order_state INT,
    pay_id BIGINT,
    pay_platform INT COMMENT 'platform 0: phone, 1: pc',
    pay_create_time TIMESTAMP
) WITH (
    'sink.parallelism' = '2'
);

CREATE TABLE dws_users (
    user_id STRING,
    ds STRING,
    total_fee BIGINT COMMENT '當日完成支付的總金額'
) WITH (
    'sink.parallelism' = '2'
);

CREATE TABLE dws_shops (
    shop_id BIGINT,
    ds STRING,
    total_fee BIGINT COMMENT '當日完成支付總金額'
) WITH (
    'sink.parallelism' = '2'
);
說明

此處建立的仍然是Paimon Append Only表。Paimon表作為Flink Sink不支援自動並發推導,需要顯式設定其並發度,否則可能會報錯。

步驟三:建立與部署DWD和DWS作業

  1. 建立和部署DWD作業。

    1. 建立DWD表更新作業。

      資料開發 > ETL頁面建立空白的批作業草稿,命名為dwd_orders,將如下SQL語句複製到文本編輯地區中。由於DWD表是Paimon Append Only表,因此此處使用INSERT OVERWRITE語句進行整體的覆寫。

      INSERT OVERWRITE my_catalog.order_dw.dwd_orders
      SELECT 
          o.order_id,
          o.user_id,
          o.shop_id,
          o.product_id,
          c.catalog_name,
          o.buy_fee,
          o.create_time,
          o.update_time,
          o.state,
          p.pay_id,
          p.pay_platform,
          p.create_time
      FROM 
          my_catalog.order_dw.orders as o, 
          my_catalog.order_dw.product_catalog as c, 
          my_catalog.order_dw.orders_pay as p
      WHERE o.product_id = c.product_id AND o.order_id = p.order_id
    2. 單擊頁面右上方的部署,單擊確定,部署dwd_orders作業。

  2. 建立和部署DWS作業。

    1. 建立DWS表更新作業。

      資料開發 > ETL頁面建立兩個空白的批作業草稿,分別命名為dws_shops和dws_users,將下列SQL語句分別複製到對應草稿的文本編輯地區中。

      INSERT OVERWRITE my_catalog.order_dw.dws_shops
      SELECT 
          order_shop_id,
          DATE_FORMAT(pay_create_time, 'yyyyMMdd') as ds,
          SUM(order_fee) as total_fee
      FROM my_catalog.order_dw.dwd_orders
      WHERE pay_id IS NOT NULL AND order_fee IS NOT NULL
      GROUP BY order_shop_id, DATE_FORMAT(pay_create_time, 'yyyyMMdd');
      INSERT OVERWRITE my_catalog.order_dw.dws_users
      SELECT 
          order_user_id,
          DATE_FORMAT(pay_create_time, 'yyyyMMdd') as ds,
          SUM(order_fee) as total_fee
      FROM my_catalog.order_dw.dwd_orders
      WHERE pay_id IS NOT NULL AND order_fee IS NOT NULL
      GROUP BY order_user_id, DATE_FORMAT(pay_create_time, 'yyyyMMdd');
    2. 單擊頁面右上方的部署,單擊確定,部署dws_shops和dws_users作業。

步驟四:啟動與查看DWD和DWS作業

  • 啟動與查看DWD作業資料。

    1. 營運中心 > 作業營運頁面,在下拉框中選擇批作業,單擊dwd_orders作業操作列下的啟動

      對應批工作執行個體列表中,產生了一個啟動中的批工作執行個體,如下圖所示。

      image.png

      當該工作執行個體的狀態變為已完成時,表示資料處理完畢。

    2. 探查資料結果。

      查詢指令碼文本編輯地區,輸入如下SQL語句並單擊左側的運行,查詢DWD表的資料。

      SELECT * FROM `my_catalog`.`order_dw`.`dwd_orders`;

      結果如下所示。

      image

  • 啟動與查看DWS作業資料。

    1. 營運中心 > 作業營運頁面,在下拉框中選擇批作業,單擊dws_shops和dws_users作業操作列下的啟動

    2. 查詢指令碼文本編輯地區,輸入如下SQL語句並單擊左側的運行,查詢DWS表的資料。

      SELECT * FROM `my_catalog`.`order_dw`.`dws_shops`;
      SELECT * FROM `my_catalog`.`order_dw`.`dws_users`;

      結果如下所示。

      image.png image.png

步驟五:通過作業編排構建批處理鏈路

本部分將把前面建立的作業編排成一個工作流程,使得它們可以被統一的觸發並有序的執行。

  1. 建立工作流程。

    1. 單擊左側的營運中心 > 任務編排,單擊建立工作流程

    2. 在彈出的面板中,填入名稱wf_orders,調度類型保持不變(預設為手動觸發),資源隊列選擇default-queue後,單擊建立,進入工作流程編輯頁面。

    3. 編輯工作流程。

      1. 單擊初始的節點,命名為v_dwd_orders,選取其作業為dwd_orders。

      2. 單擊添加節點,建立節點v_dws_shops,選取其作業為dws_shops,上遊節點為v_dwd_orders。

      3. 再次單擊添加節點,建立節點v_dws_users,選取其作業為dws_users,上遊節點為v_dwd_orders。

      4. 單擊右上方的儲存確定

        建立的工作流程如下所示。

        image.png

  2. 手動觸發工作流程運行

    說明

    工作流程也可以被修改為定時調度的工作流程,只需要在任務編排頁面,單擊工作流程右側的編輯工作流程,將調度模式修改為周期調度即可,詳情請參見任務編排(公測)

    1. 在觸發工作流程運行前,先給ODS表插入一些新資料,用於驗證工作流程的執行結果。

      查詢指令碼文本編輯地區,輸入如下SQL語句並單擊左側的運行

      USE `my_catalog`.`order_dw`;
      
      INSERT INTO orders VALUES
      (100008, 'user_001', 12346, 1, 10000, TO_TIMESTAMP('2023-02-15 17:40:56'), TO_TIMESTAMP('2023-02-15 18:42:56'), 1),
      (100009, 'user_002', 12347, 2, 20000, TO_TIMESTAMP('2023-02-15 18:40:56'), TO_TIMESTAMP('2023-02-15 18:42:56'), 1),
      (100010, 'user_003', 12348, 3, 30000, TO_TIMESTAMP('2023-02-15 19:40:56'), TO_TIMESTAMP('2023-02-15 18:42:56'), 1);
      
      INSERT INTO orders_pay VALUES
      (2008, 100008, 1, TO_TIMESTAMP('2023-02-15 20:40:56')),
      (2009, 100009, 1, TO_TIMESTAMP('2023-02-15 20:40:56')),
      (2010, 100010, 1, TO_TIMESTAMP('2023-02-15 20:40:56'));

      單擊結果欄左側的在Flink UI,觀察作業狀態。

    2. 營運中心 > 任務編排頁面,單擊上一部分建立的工作流程操作列下的觸發運行,單擊確定,觸發工作流程運行。

      image.png

      單擊工作流程名稱,進入工作流程執行個體列表與詳情頁面,可以看到工作流程執行個體列表。

      image.png

      單擊運行中的工作流程執行個體運行ID,即可進入工作流程執行個體的執行詳情頁面,觀察到各個節點的執行狀態。等待整個工作流程執行完成。

      image.png

  3. 查看工作流程執行結果

    1. 查詢指令碼文本編輯地區,輸入如下SQL語句並單擊左側的運行

      SELECT * FROM `my_catalog`.`order_dw`.`dws_shops`;
      SELECT * FROM `my_catalog`.`order_dw`.`dws_users`;
    2. 查看工作流程的執行結果。

      可以看到,ODS層新增資料經過處理已經寫入DWS表中。

      image.png image.png

相關文檔