全部產品
Search
文件中心

Realtime Compute for Apache Flink:基於Paimon的資料庫即時入湖快速入門

更新時間:Nov 13, 2024

Apache Paimon是一種流批統一的湖儲存格式,支援高吞吐的寫入和低延後查詢。本文通過Paimon Catalog和MySQL連接器,將雲資料庫RDS中的訂單資料和表結構變更匯入Paimon表中,並使用Flink對Paimon表進行簡單分析。

背景資訊

Apache Paimon是一種流批統一的湖儲存格式,支援高吞吐的寫入和低延後查詢。目前阿里雲Realtime ComputeFlink版,以及開源巨量資料平台E-MapReduce上常見的計算引擎(例如Spark、Hive或Trino)都與Paimon有著較為完善的整合度。您可以藉助Apache Paimon快速地在HDFS或者OSS上構建自己的資料湖儲存服務,並接入計算引擎實現資料湖的分析。

前提條件

步驟一:準備資料來源

  1. 第一步:快捷建立RDS MySQL執行個體與設定資料庫

    說明

    RDS MySQL版執行個體需要與Flink工作空間處於同一VPC。不在同一VPC下時請參見網路連通性

    建立名稱為orders的資料庫,並建立高許可權帳號或具有資料庫orders讀寫權限的普通帳號。

  2. 第二步:串連RDS MySQL執行個體,在orders資料庫中建立表orders_1和orders_2。

    CREATE TABLE `orders_1` (
        orderkey BIGINT NOT NULL,
        custkey BIGINT,
        order_status VARCHAR(100),
        total_price DOUBLE,
        order_date DATE,
        order_priority VARCHAR(100),
        clerk VARCHAR(100),
        ship_priority INT,
        comment VARCHAR(100),
        PRIMARY KEY (orderkey)
    );
    
    CREATE TABLE `orders_2` (
        orderkey BIGINT NOT NULL,
        custkey BIGINT,
        order_status VARCHAR(100),
        total_price DOUBLE,
        order_date DATE,
        order_priority VARCHAR(100),
        clerk VARCHAR(100),
        ship_priority INT,
        comment VARCHAR(100),
        PRIMARY KEY (orderkey)
    );
  3. 插入如下測試資料。

    INSERT INTO `orders_1` VALUES (1, 1, 'O', 131251.81, '1996-01-02', '5-LOW', 'Clerk#000000951', 0, 'nstructions sleep furiously among ');
    INSERT INTO `orders_1` VALUES (2, 3, 'O', 40183.29, '1996-12-01', '1-URGENT', 'Clerk#000000880', 0, ' foxes. pending accounts at the pending, silent asymptot');
    INSERT INTO `orders_1` VALUES (3, 6, 'F', 160882.76, '1993-10-14', '5-LOW', 'Clerk#000000955', 0, 'sly final accounts boost. carefully regular ideas cajole carefully. depos');
    INSERT INTO `orders_1` VALUES (4, 6, 'O', 31084.79, '1995-10-11', '5-LOW', 'Clerk#000000124', 0, 'sits. slyly regular warthogs cajole. regular, regular theodolites acro');
    INSERT INTO `orders_1` VALUES (5, 2, 'F', 86615.25, '1994-07-30', '5-LOW', 'Clerk#000000925', 0, 'quickly. bold deposits sleep slyly. packages use slyly');
    INSERT INTO `orders_1` VALUES (6, 2, 'F', 36468.55, '1992-02-21', '4-NOT SPECIFIED', 'Clerk#000000058', 0, 'ggle. special, final requests are against the furiously specia');
    INSERT INTO `orders_1` VALUES (7, 2, 'O', 171488.73, '1996-01-10', '2-HIGH', 'Clerk#000000470', 0, 'ly special requests ');
    INSERT INTO `orders_1` VALUES (8, 6, 'O', 116923.00, '1995-07-16', '2-HIGH', 'Clerk#000000616', 0, 'ise blithely bold, regular requests. quickly unusual dep');
    INSERT INTO `orders_1` VALUES (9, 3, 'F', 99798.76, '1993-10-27', '3-MEDIUM', 'Clerk#000000409', 0, 'uriously. furiously final request');
    INSERT INTO `orders_1` VALUES (10, 3, 'O', 41670.02, '1998-07-21', '3-MEDIUM', 'Clerk#000000223', 0, 'ly final packages. fluffily final deposits wake blithely ideas. spe');
    INSERT INTO `orders_2` VALUES (11, 6, 'O', 148789.52, '1995-10-23', '4-NOT SPECIFIED', 'Clerk#000000259', 0, 'zzle. carefully enticing deposits nag furio');
    INSERT INTO `orders_2` VALUES (12, 5, 'O', 38988.98, '1995-11-03', '1-URGENT', 'Clerk#000000358', 0, ' quick packages are blithely. slyly silent accounts wake qu');
    INSERT INTO `orders_2` VALUES (13, 4, 'F', 113701.89, '1992-06-03', '3-MEDIUM', 'Clerk#000000456', 0, 'kly regular pinto beans. carefully unusual waters cajole never');
    INSERT INTO `orders_2` VALUES (14, 6, 'O', 46366.56, '1996-08-21', '4-NOT SPECIFIED', 'Clerk#000000604', 0, 'haggle blithely. furiously express ideas haggle blithely furiously regular re');
    INSERT INTO `orders_2` VALUES (15, 4, 'O', 219707.84, '1996-09-20', '3-MEDIUM', 'Clerk#000000659', 0, 'ole express, ironic requests: ir');
    INSERT INTO `orders_2` VALUES (16, 1, 'F', 20065.73, '1994-07-16', '3-MEDIUM', 'Clerk#000000661', 0, 'wake fluffily. sometimes ironic pinto beans about the dolphin');
    INSERT INTO `orders_2` VALUES (17, 0, 'P', 65883.92, '1995-03-18', '1-URGENT', 'Clerk#000000632', 0, 'ular requests are blithely pending orbits-- even requests against the deposit');
    INSERT INTO `orders_2` VALUES (18, 6, 'F', 79258.24, '1994-01-20', '5-LOW', 'Clerk#000000743', 0, 'y pending requests integrate');
    INSERT INTO `orders_2` VALUES (19, 2, 'O', 116227.05, '1996-12-19', '4-NOT SPECIFIED', 'Clerk#000000547', 0, 'symptotes haggle slyly around the furiously iron');
    INSERT INTO `orders_2` VALUES (20, 1, 'O', 215135.72, '1998-04-18', '3-MEDIUM', 'Clerk#000000440', 0, ' pinto beans sleep carefully. blithely ironic deposits haggle furiously acro');

步驟二:建立Catalog

  1. 進入中繼資料管理頁面。

    1. 登入Realtime Compute控制台

    2. 單擊目標工作空間操作列下的控制台

    3. 單擊左側的中繼資料管理

  2. 建立Paimon Catalog。

    1. 單擊建立Catalog內建Catalog頁簽,選擇Apache Paimon後,單擊下一步。

    2. 填寫配置資訊。

      image.png

      配置項

      說明

      備忘

      catalog name

      您自訂的Paimon Catalog名稱。

      填寫為自訂的英文名。

      metastore

      Paimon表的中繼資料存放區類型:

      • filesystem:僅將中繼資料存放區在OSS中。

      • dlf:除了將中繼資料存放區在OSS上外,還會將中繼資料同步到阿里雲資料湖構建服務DLF中。

      本文選擇filesystem。

      warehouse

      Paimon Catalog的儲存根目錄,是一個OSS目錄。可以選擇建立Realtime ComputeFlink版時使用的OSS Bucket,也可以使用同一帳號同一地區下的其他OSS Bucket。

      格式為oss://<bucket>/<object>。其中:

      • bucket:表示您建立的OSS Bucket名稱。

      • object:表示您存放資料的路徑。

      您可以在OSS管理主控台上查看您的bucket和object名稱。

      fs.oss.endpoint

      OSS服務的串連地址。

      • 如果Flink與DLF位於同一地區,則使用VPC網路Endpoint,否則使用公網Endpoint。

      • 當warehouse指定的OSS Bucket與Flink工作空間不在同一地區,或使用其它帳號下的OSS Bucket時需要填寫。

      擷取方法請參見OSS地區和訪問網域名稱建立AccessKey

      fs.oss.accessKeyId

      擁有讀寫OSS許可權的阿里雲帳號或RAM帳號的Accesskey ID。

      fs.oss.accessKeySecret

      擁有讀寫OSS許可權的阿里雲帳號或RAM帳號的Accesskey secret。

    3. 單擊確定

步驟三:建立Flink作業

  1. 資料開發 > ETL頁面,單擊建立

  2. 選擇空白的流作業草稿,單擊下一步

  3. 新增作業草稿對話方塊,填寫作業配置資訊。

    作業參數

    說明

    檔案名稱

    作業的名稱。

    說明

    作業名稱在當前專案中必須保持唯一。

    儲存位置

    指定該作業的儲存位置。

    您還可以在現有檔案夾右側,單擊建立檔案夾表徵圖,建立子檔案夾。

    引擎版本

    當前作業使用的Flink的引擎版本。引擎版本號碼含義、版本對應關係和生命週期重要時間點詳情請參見引擎版本介紹

  4. 單擊建立

  5. 輸入以下語句,即時捕獲orders資料庫中相關表的變化,並同步到Paimon表中。

    -- 使用剛剛建立的Paimon Catalog
    USE CATALOG `test`;
    
    -- 建立一張MySQL暫存資料表,捕獲表名符合Regexorders_\d+的MySQL表的變化
    CREATE TEMPORARY TABLE mysql_orders (
        orderkey BIGINT,
        custkey BIGINT,
        order_status VARCHAR(100),
        total_price DOUBLE,
        order_date DATE,
        order_priority VARCHAR(100),
        clerk VARCHAR(100),
        ship_priority INT,
        `comment` VARCHAR(100),
        PRIMARY KEY (orderkey) NOT ENFORCED
    ) WITH (
        'connector' = 'mysql',
        'hostname' = 'rm-bp1s1xgll21ey****.mysql.rds.aliyuncs.com',
        'port' = '3306',
        'username' = 'your_username',
        'password' = '${secret_values.mysql_pw}',
        'database-name' = 'orders',
        'table-name' = 'orders_\d+',
        'server-time-zone' = 'Asia/Shanghai'
    );
    
    -- 將MySQL表的變化同步到Paimon表中
    CREATE TABLE IF NOT EXISTS orders AS TABLE mysql_orders;

    ​參數說明如下,您可以根據實際情況進行修改。MySQL連接器更多參數詳情請參見MySQL

    參數

    說明

    備忘

    connector

    連接器類型。

    本樣本固定值為mysql

    hostname

    MySQL資料庫的IP地址或者Hostname。

    本文填寫為RDS執行個體的內網地址。

    username

    MySQL資料庫服務的使用者名稱。

    無。

    password

    MySQL資料庫服務的密碼。

    本樣本通過使用名為mysql_pw密鑰的方式填寫密碼值,避免資訊泄露,詳情請參見變數管理

    database-name

    MySQL資料庫名稱。

    本樣本填寫為步驟一:準備資料來源中建立的資料庫。

    table-name

    MySQL表名。

    作為源表時,表名支援Regex以讀取多個表的資料。

    port

    MySQL資料庫服務的連接埠號碼。

    無。

  6. (可選)單擊右上方的深度檢查,確認作業Flink SQL語句中是否存在語法錯誤。

  7. 單擊右上方的部署,單擊確定

  8. 在左側導覽列,單擊營運中心 > 作業營運,單擊目標作業名稱,進入作業部署詳情頁面。

  9. 單擊運行參數配置地區右側的編輯

    本文為了更快觀察到任務啟動並執行結果,將系統檢查點間隔兩次系統檢查點之間的最短時間間隔均改為10s,單擊儲存

    image

  10. 在目標作業部署詳情頁頂部,單擊啟動,選擇無狀態啟動

    image.png

  11. 查詢Paimon資料。

    1. 資料開發 > 資料查詢頁面的查詢指令碼頁簽,將如下代碼拷貝到查詢指令碼後,選中目標片段後單擊左側程式碼上的運行

      select custkey, sum(total_price) from `test`.`default`.`orders` group by custkey;
    2. 結果瀏覽完成後,單擊左側的image.png停止調試。

      image.png

步驟四:更新MySQL表結構

本部分將示範MySQL表結構變更同步到Paimon表的功能。

  1. 登入雲資料庫RDS控制台

  2. 在orders資料庫,輸入如下SQL語句,然後單擊執行,為兩張資料表添加一列,並填充一些資料。

    ALTER TABLE `orders_1` ADD COLUMN quantity BIGINT; 
    ALTER TABLE `orders_2` ADD COLUMN quantity BIGINT; 
    UPDATE `orders_1` SET `quantity` = 100 WHERE `orderkey` < 5;
    UPDATE `orders_2` SET `quantity` = 100 WHERE `orderkey` > 15;
  3. Realtime Compute控制台資料開發 > 資料查詢頁面的查詢指令碼頁簽,將如下代碼拷貝到查詢指令碼後,選中目標片段後單擊左側程式碼上的運行

    select * from `test`.`default`.`orders` where `quantity` is not null;

    結果如下,瀏覽完成後,可單擊左側的image.png停止調試。

    Image 32

相關文檔