作為流批一體的計算架構,Flink不僅能夠提供低延遲的流式資料處理(Streaming Data Processsing),也能進行高吞吐的批處理(Batch Data Processing)。Realtime ComputeFlink版對批處理能力進行了專門的支援,提供了包括作業開發、作業營運、作業編排、資源隊列管理、資料結果探查等能力,可以利用Flink批處理能力更好地解決業務需求。本文通過具體的樣本為您介紹如何利用Realtime ComputeFlink版關鍵功能進行資料批處理。
功能介紹
Realtime ComputeFlink版提供了以下關鍵功能來支援Flink批處理:
SQL作業開發:在SQL開發頁面的作業草稿頁簽,可以建立批作業草稿,批作業草稿會以批作業的形式被部署和執行。
作業管理:在作業營運頁面,可以直接部署JAR或Python類型的批作業。在頂部下拉框中選擇批作業,查看已部署的批作業。展開目標批作業,可查看其工作執行個體列表。通常,一個批作業的不同工作執行個體具有相同的處理邏輯,但是採用不同的參數,例如處理的資料所屬日期。
資料查詢:在SQL開發頁面的查詢指令碼頁簽,可以執行一些DDL或短查詢,快速地進行資料管理和資料探查。這些短查詢執行在預建立的Flink Session中,通過資源複用,實現低延遲的簡單查詢。
管理中繼資料:在中繼資料管理頁面,可以建立和查看Catalog,包括其中的資料庫和表的資訊。您也可以在SQL開發頁面的中繼資料頁簽進行查看,提高開發效率。
任務編排(公測):在任務編排頁面,可以定義工作流程,通過可視化的操作方式,編排一系列批作業的執行依賴。工作流程會作為一個整體,根據定義好的依賴關係執行包含的批作業。支援通過手動觸發或定時調度方式來執行建立好的工作流程。
管理資源隊列:在隊列管理頁面,可以對工作空間中的資源進行劃分,從而避免流作業和批作業、以及不同優先順序的作業間發生資源爭搶。
注意事項
已建立Flink工作空間,詳情請參見開通Realtime ComputeFlink版。
已開通Object Storage Service,詳情請參見控制台快速入門。OSS Bucket的儲存類型需要為標準儲存,詳情請參見儲存類型概述。
由於本文樣本使用Apache Paimon儲存資料,僅Realtime Compute引擎VVR 8.0.5及以上版本支援本文樣本。
樣本情境
本文以一個電子商務平台的業務情境為例,使用Apache Paimon的湖倉格式對資料進行儲存。類比了一個資料倉儲結構,包括ODS(操作資料儲存)、DWD(資料倉儲細節級)、DWS(資料倉儲匯總級)的儲存層級。通過Flink的批處理能力,對資料進行加工清洗後寫入Paimon表,從而實現資料分層結構的搭建。
準備工作
建立資料查詢。
通過查詢指令碼頁簽,您可以建立Catalog以及其中的資料庫和表,並且向表中插入一些類比的資料。
建立Paimon Catalog。
在查詢指令碼的文本編輯地區,輸入如下SQL語句。
CREATE CATALOG `my_catalog` WITH ( 'type' = 'paimon', 'metastore' = 'filesystem', 'warehouse' = '<warehouse>', 'fs.oss.endpoint' = '<fs.oss.endpoint>', 'fs.oss.accessKeyId' = '<fs.oss.accessKeyId>', 'fs.oss.accessKeySecret' = '<fs.oss.accessKeySecret>' );
參數配置項如下。
配置項
說明
是否必填
備忘
type
Catalog類型。
是
固定值為Paimon。
metastore
中繼資料存放區類型。
是
本文樣本填寫filesystem,其他類型詳情請參見管理Paimon Catalog。
warehouse
OSS服務中所指定的數倉目錄。
是
格式為oss://<bucket>/<object>。其中:
bucket:表示您建立的OSS Bucket名稱。
object:表示您存放資料的路徑。
請在OSS管理主控台上查看您的Bucket和Object名稱。
fs.oss.endpoint
OSS服務的串連地址。
否
當warehouse指定的OSS Bucket與Flink工作空間不在同一地區,或使用其它帳號下的OSS bucket時需要填寫。
請參見訪問網域名稱和資料中心。
fs.oss.accessKeyId
擁有讀寫OSS許可權的阿里雲帳號或RAM帳號的AccessKey。
否
當warehouse指定的OSS Bucket與Flink工作空間不在同一地區,或使用其它帳號下的OSS Bucket時需要填寫。擷取方法請參見建立AccessKey。
fs.oss.accessKeySecret
擁有讀寫OSS許可權的阿里雲帳號或RAM帳號的AccessKey Secret。
否
選中上述代碼,單擊左側的運行。
返回
The following statement has been executed successfully!
資訊表示Catalog建立成功。此時可以在中繼資料管理頁面(或是SQL開發頁面的中繼資料子頁面),查看新建立的Catalog。
操作流程
步驟一:建立ODS表並插入測試資料
為了簡化本樣本,我們直接向ODS表中插入了一些測試資料,用於後續的DWD/DWS表的資料產生。在實際生產中,一般會使用Flink流處理從外部資料源讀取資料並寫入到湖中作為ODS層,具體可以參見 Paimon快速開始:準系統。
在查詢指令碼文本編輯地區,輸入如下SQL語句並單擊左側的運行。
CREATE DATABASE `my_catalog`.`order_dw`; USE `my_catalog`.`order_dw`; CREATE TABLE orders ( order_id BIGINT, user_id STRING, shop_id BIGINT, product_id BIGINT, buy_fee BIGINT, create_time TIMESTAMP, update_time TIMESTAMP, state INT ); CREATE TABLE orders_pay ( pay_id BIGINT, order_id BIGINT, pay_platform INT, create_time TIMESTAMP ); CREATE TABLE product_catalog ( product_id BIGINT, catalog_name STRING ); -- 插入測試資料 INSERT INTO orders VALUES (100001, 'user_001', 12345, 1, 5000, TO_TIMESTAMP('2023-02-15 16:40:56'), TO_TIMESTAMP('2023-02-15 18:42:56'), 1), (100002, 'user_002', 12346, 2, 4000, TO_TIMESTAMP('2023-02-15 15:40:56'), TO_TIMESTAMP('2023-02-15 18:42:56'), 1), (100003, 'user_003', 12347, 3, 3000, TO_TIMESTAMP('2023-02-15 14:40:56'), TO_TIMESTAMP('2023-02-15 18:42:56'), 1), (100004, 'user_001', 12347, 4, 2000, TO_TIMESTAMP('2023-02-15 13:40:56'), TO_TIMESTAMP('2023-02-15 18:42:56'), 1), (100005, 'user_002', 12348, 5, 1000, TO_TIMESTAMP('2023-02-15 12:40:56'), TO_TIMESTAMP('2023-02-15 18:42:56'), 1), (100006, 'user_001', 12348, 1, 1000, TO_TIMESTAMP('2023-02-15 11:40:56'), TO_TIMESTAMP('2023-02-15 18:42:56'), 1), (100007, 'user_003', 12347, 4, 2000, TO_TIMESTAMP('2023-02-15 10:40:56'), TO_TIMESTAMP('2023-02-15 18:42:56'), 1); INSERT INTO orders_pay VALUES (2001, 100001, 1, TO_TIMESTAMP('2023-02-15 17:40:56')), (2002, 100002, 1, TO_TIMESTAMP('2023-02-15 17:40:56')), (2003, 100003, 0, TO_TIMESTAMP('2023-02-15 17:40:56')), (2004, 100004, 0, TO_TIMESTAMP('2023-02-15 17:40:56')), (2005, 100005, 0, TO_TIMESTAMP('2023-02-15 18:40:56')), (2006, 100006, 0, TO_TIMESTAMP('2023-02-15 18:40:56')), (2007, 100007, 0, TO_TIMESTAMP('2023-02-15 18:40:56')); INSERT INTO product_catalog VALUES (1, 'phone_aaa'), (2, 'phone_bbb'), (3, 'phone_ccc'), (4, 'phone_ddd'), (5, 'phone_eee');
說明本文建立的是不帶主鍵的Paimon Append Only表,其相比於Paimon主鍵表具有更好的批量寫入效能,但不支援基於主鍵的更新操作。
執行結果會包含多個子標籤頁,返回
The following statement has been executed successfully!
資訊表示對應的DDL語句執行成功。INSERT等DML語句則會返回一個JobId,表明產生了Flink作業並在Flink Session中執行,單擊結果欄左側的在Flink UI中查看,可觀察到這幾條SQL語句的執行情況,等待數秒至其執行完成。
探查ODS表資料。
在查詢指令碼文本編輯地區,輸入如下SQL語句並單擊左側的運行。
SELECT count(*) as order_count FROM `my_catalog`.`order_dw`.`orders`; SELECT count(*) as pay_count FROM `my_catalog`.`order_dw`.`orders_pay`; SELECT * FROM `my_catalog`.`order_dw`.`product_catalog`;
這些SQL語句也會在Flink Session中執行,最終可以在3個查詢的結果頁面中查看返回結果。
步驟二:建立DWD和DWS表
在查詢指令碼文本編輯地區,輸入如下SQL語句並單擊左側的運行。
USE `my_catalog`.`order_dw`;
CREATE TABLE dwd_orders (
order_id BIGINT,
order_user_id STRING,
order_shop_id BIGINT,
order_product_id BIGINT,
order_product_catalog_name STRING,
order_fee BIGINT,
order_create_time TIMESTAMP,
order_update_time TIMESTAMP,
order_state INT,
pay_id BIGINT,
pay_platform INT COMMENT 'platform 0: phone, 1: pc',
pay_create_time TIMESTAMP
) WITH (
'sink.parallelism' = '2'
);
CREATE TABLE dws_users (
user_id STRING,
ds STRING,
total_fee BIGINT COMMENT '當日完成支付的總金額'
) WITH (
'sink.parallelism' = '2'
);
CREATE TABLE dws_shops (
shop_id BIGINT,
ds STRING,
total_fee BIGINT COMMENT '當日完成支付總金額'
) WITH (
'sink.parallelism' = '2'
);
此處建立的仍然是Paimon Append Only表。Paimon表作為Flink Sink不支援自動並發推導,需要顯式設定其並發度,否則可能會報錯。
步驟三:建立與部署DWD和DWS作業
建立和部署DWD作業。
建立DWD表更新作業。
在
頁面建立空白的批作業草稿,命名為dwd_orders,將如下SQL語句複製到文本編輯地區中。由於DWD表是Paimon Append Only表,因此此處使用INSERT OVERWRITE語句進行整體的覆寫。INSERT OVERWRITE my_catalog.order_dw.dwd_orders SELECT o.order_id, o.user_id, o.shop_id, o.product_id, c.catalog_name, o.buy_fee, o.create_time, o.update_time, o.state, p.pay_id, p.pay_platform, p.create_time FROM my_catalog.order_dw.orders as o, my_catalog.order_dw.product_catalog as c, my_catalog.order_dw.orders_pay as p WHERE o.product_id = c.product_id AND o.order_id = p.order_id
單擊頁面右上方的部署,單擊確定,部署dwd_orders作業。
建立和部署DWS作業。
建立DWS表更新作業。
在
頁面建立兩個空白的批作業草稿,分別命名為dws_shops和dws_users,將下列SQL語句分別複製到對應草稿的文本編輯地區中。INSERT OVERWRITE my_catalog.order_dw.dws_shops SELECT order_shop_id, DATE_FORMAT(pay_create_time, 'yyyyMMdd') as ds, SUM(order_fee) as total_fee FROM my_catalog.order_dw.dwd_orders WHERE pay_id IS NOT NULL AND order_fee IS NOT NULL GROUP BY order_shop_id, DATE_FORMAT(pay_create_time, 'yyyyMMdd');
INSERT OVERWRITE my_catalog.order_dw.dws_users SELECT order_user_id, DATE_FORMAT(pay_create_time, 'yyyyMMdd') as ds, SUM(order_fee) as total_fee FROM my_catalog.order_dw.dwd_orders WHERE pay_id IS NOT NULL AND order_fee IS NOT NULL GROUP BY order_user_id, DATE_FORMAT(pay_create_time, 'yyyyMMdd');
單擊頁面右上方的部署,單擊確定,部署dws_shops和dws_users作業。
步驟四:啟動與查看DWD和DWS作業
啟動與查看DWD作業資料。
在
頁面,在下拉框中選擇批作業,單擊dwd_orders作業操作列下的啟動。對應批工作執行個體列表中,產生了一個啟動中的批工作執行個體,如下圖所示。
當該工作執行個體的狀態變為已完成時,表示資料處理完畢。
探查資料結果。
在查詢指令碼文本編輯地區,輸入如下SQL語句並單擊左側的運行,查詢DWD表的資料。
SELECT * FROM `my_catalog`.`order_dw`.`dwd_orders`;
結果如下所示。
啟動與查看DWS作業資料。
在
頁面,在下拉框中選擇批作業,單擊dws_shops和dws_users作業操作列下的啟動。在查詢指令碼文本編輯地區,輸入如下SQL語句並單擊左側的運行,查詢DWS表的資料。
SELECT * FROM `my_catalog`.`order_dw`.`dws_shops`; SELECT * FROM `my_catalog`.`order_dw`.`dws_users`;
結果如下所示。
步驟五:通過作業編排構建批處理鏈路
本部分將把前面建立的作業編排成一個工作流程,使得它們可以被統一的觸發並有序的執行。
建立工作流程。
單擊左側的
,單擊建立工作流程。在彈出的面板中,填入名稱wf_orders,調度類型保持不變(預設為手動觸發),資源隊列選擇default-queue後,單擊建立,進入工作流程編輯頁面。
編輯工作流程。
單擊初始的節點,命名為v_dwd_orders,選取其作業為dwd_orders。
單擊添加節點,建立節點v_dws_shops,選取其作業為dws_shops,上遊節點為v_dwd_orders。
再次單擊添加節點,建立節點v_dws_users,選取其作業為dws_users,上遊節點為v_dwd_orders。
單擊右上方的儲存並確定。
建立的工作流程如下所示。
手動觸發工作流程運行
說明工作流程也可以被修改為定時調度的工作流程,只需要在任務編排頁面,單擊工作流程右側的編輯工作流程,將調度模式修改為周期調度即可,詳情請參見任務編排(公測)。
在觸發工作流程運行前,先給ODS表插入一些新資料,用於驗證工作流程的執行結果。
在查詢指令碼文本編輯地區,輸入如下SQL語句並單擊左側的運行。
USE `my_catalog`.`order_dw`; INSERT INTO orders VALUES (100008, 'user_001', 12346, 1, 10000, TO_TIMESTAMP('2023-02-15 17:40:56'), TO_TIMESTAMP('2023-02-15 18:42:56'), 1), (100009, 'user_002', 12347, 2, 20000, TO_TIMESTAMP('2023-02-15 18:40:56'), TO_TIMESTAMP('2023-02-15 18:42:56'), 1), (100010, 'user_003', 12348, 3, 30000, TO_TIMESTAMP('2023-02-15 19:40:56'), TO_TIMESTAMP('2023-02-15 18:42:56'), 1); INSERT INTO orders_pay VALUES (2008, 100008, 1, TO_TIMESTAMP('2023-02-15 20:40:56')), (2009, 100009, 1, TO_TIMESTAMP('2023-02-15 20:40:56')), (2010, 100010, 1, TO_TIMESTAMP('2023-02-15 20:40:56'));
單擊結果欄左側的在Flink UI,觀察作業狀態。
在
頁面,單擊上一部分建立的工作流程操作列下的觸發運行,單擊確定,觸發工作流程運行。單擊工作流程名稱,進入工作流程執行個體列表與詳情頁面,可以看到工作流程執行個體列表。
單擊運行中的工作流程執行個體運行ID,即可進入工作流程執行個體的執行詳情頁面,觀察到各個節點的執行狀態。等待整個工作流程執行完成。
查看工作流程執行結果
在查詢指令碼文本編輯地區,輸入如下SQL語句並單擊左側的運行。
SELECT * FROM `my_catalog`.`order_dw`.`dws_shops`; SELECT * FROM `my_catalog`.`order_dw`.`dws_users`;
查看工作流程的執行結果。
可以看到,ODS層新增資料經過處理已經寫入DWS表中。
相關文檔
如果您想要對Flink批處理的原理和配置調優有更多瞭解,請參見 Flink批處理調優指南
如果您想要使用Flink+Paimon搭建即時數倉,操作步驟詳情請參見基於Flink+Paimon+StarRocks搭建流式湖倉。
除了在Realtime Compute開發控制台進行Flink作業開發等操作,您同樣可以在本地進行,具體操作請參見VS Code本地開發外掛程式。