全部產品
Search
文件中心

Realtime Compute for Apache Flink:基於Paimon的資料庫即時入湖快速入門

更新時間:Dec 21, 2024

Apache Paimon是一種流批統一的湖儲存格式,支援高吞吐的寫入和低延後查詢。本文通過Paimon Catalog和MySQL連接器,將雲資料庫RDS中的訂單資料和表結構變更匯入Paimon表中,並使用Flink對Paimon表進行簡單分析。

背景資訊

Apache Paimon是一種流批統一的湖儲存格式,支援高吞吐的寫入和低延後查詢。目前阿里雲Realtime ComputeFlink版,以及開源巨量資料平台E-MapReduce上常見的計算引擎(例如Spark、Hive或Trino)都與Paimon有著較為完善的整合度。您可以藉助Apache Paimon快速地在HDFS或者OSS上構建自己的資料湖儲存服務,並接入計算引擎實現資料湖的分析。

前提條件

步驟一:準備資料來源

  1. 第一步:快捷建立RDS MySQL執行個體與設定資料庫

    說明

    RDS MySQL版執行個體需要與Flink工作空間處於同一VPC。不在同一VPC下時請參見網路連通性

    建立名稱為orders的資料庫,並建立高許可權帳號或具有資料庫orders讀寫權限的普通帳號。

  2. 第二步:串連RDS MySQL執行個體,在orders資料庫中建立表orders_1和orders_2。

    CREATE TABLE `orders_1` (
        orderkey BIGINT NOT NULL,
        custkey BIGINT,
        order_status VARCHAR(100),
        total_price DOUBLE,
        order_date DATE,
        order_priority VARCHAR(100),
        clerk VARCHAR(100),
        ship_priority INT,
        comment VARCHAR(100),
        PRIMARY KEY (orderkey)
    );
    
    CREATE TABLE `orders_2` (
        orderkey BIGINT NOT NULL,
        custkey BIGINT,
        order_status VARCHAR(100),
        total_price DOUBLE,
        order_date DATE,
        order_priority VARCHAR(100),
        clerk VARCHAR(100),
        ship_priority INT,
        comment VARCHAR(100),
        PRIMARY KEY (orderkey)
    );
  3. 插入如下測試資料。

    INSERT INTO `orders_1` VALUES (1, 1, 'O', 131251.81, '1996-01-02', '5-LOW', 'Clerk#000000951', 0, 'nstructions sleep furiously among ');
    INSERT INTO `orders_1` VALUES (2, 3, 'O', 40183.29, '1996-12-01', '1-URGENT', 'Clerk#000000880', 0, ' foxes. pending accounts at the pending, silent asymptot');
    INSERT INTO `orders_1` VALUES (3, 6, 'F', 160882.76, '1993-10-14', '5-LOW', 'Clerk#000000955', 0, 'sly final accounts boost. carefully regular ideas cajole carefully. depos');
    INSERT INTO `orders_1` VALUES (4, 6, 'O', 31084.79, '1995-10-11', '5-LOW', 'Clerk#000000124', 0, 'sits. slyly regular warthogs cajole. regular, regular theodolites acro');
    INSERT INTO `orders_1` VALUES (5, 2, 'F', 86615.25, '1994-07-30', '5-LOW', 'Clerk#000000925', 0, 'quickly. bold deposits sleep slyly. packages use slyly');
    INSERT INTO `orders_1` VALUES (6, 2, 'F', 36468.55, '1992-02-21', '4-NOT SPECIFIED', 'Clerk#000000058', 0, 'ggle. special, final requests are against the furiously specia');
    INSERT INTO `orders_1` VALUES (7, 2, 'O', 171488.73, '1996-01-10', '2-HIGH', 'Clerk#000000470', 0, 'ly special requests ');
    INSERT INTO `orders_1` VALUES (8, 6, 'O', 116923.00, '1995-07-16', '2-HIGH', 'Clerk#000000616', 0, 'ise blithely bold, regular requests. quickly unusual dep');
    INSERT INTO `orders_1` VALUES (9, 3, 'F', 99798.76, '1993-10-27', '3-MEDIUM', 'Clerk#000000409', 0, 'uriously. furiously final request');
    INSERT INTO `orders_1` VALUES (10, 3, 'O', 41670.02, '1998-07-21', '3-MEDIUM', 'Clerk#000000223', 0, 'ly final packages. fluffily final deposits wake blithely ideas. spe');
    INSERT INTO `orders_2` VALUES (11, 6, 'O', 148789.52, '1995-10-23', '4-NOT SPECIFIED', 'Clerk#000000259', 0, 'zzle. carefully enticing deposits nag furio');
    INSERT INTO `orders_2` VALUES (12, 5, 'O', 38988.98, '1995-11-03', '1-URGENT', 'Clerk#000000358', 0, ' quick packages are blithely. slyly silent accounts wake qu');
    INSERT INTO `orders_2` VALUES (13, 4, 'F', 113701.89, '1992-06-03', '3-MEDIUM', 'Clerk#000000456', 0, 'kly regular pinto beans. carefully unusual waters cajole never');
    INSERT INTO `orders_2` VALUES (14, 6, 'O', 46366.56, '1996-08-21', '4-NOT SPECIFIED', 'Clerk#000000604', 0, 'haggle blithely. furiously express ideas haggle blithely furiously regular re');
    INSERT INTO `orders_2` VALUES (15, 4, 'O', 219707.84, '1996-09-20', '3-MEDIUM', 'Clerk#000000659', 0, 'ole express, ironic requests: ir');
    INSERT INTO `orders_2` VALUES (16, 1, 'F', 20065.73, '1994-07-16', '3-MEDIUM', 'Clerk#000000661', 0, 'wake fluffily. sometimes ironic pinto beans about the dolphin');
    INSERT INTO `orders_2` VALUES (17, 0, 'P', 65883.92, '1995-03-18', '1-URGENT', 'Clerk#000000632', 0, 'ular requests are blithely pending orbits-- even requests against the deposit');
    INSERT INTO `orders_2` VALUES (18, 6, 'F', 79258.24, '1994-01-20', '5-LOW', 'Clerk#000000743', 0, 'y pending requests integrate');
    INSERT INTO `orders_2` VALUES (19, 2, 'O', 116227.05, '1996-12-19', '4-NOT SPECIFIED', 'Clerk#000000547', 0, 'symptotes haggle slyly around the furiously iron');
    INSERT INTO `orders_2` VALUES (20, 1, 'O', 215135.72, '1998-04-18', '3-MEDIUM', 'Clerk#000000440', 0, ' pinto beans sleep carefully. blithely ironic deposits haggle furiously acro');

步驟二:建立Catalog

  1. 進入中繼資料管理頁面。

    1. 登入Realtime Compute控制台

    2. 單擊目標工作空間操作列下的控制台

    3. 單擊中繼資料管理

  2. 建立Paimon Catalog。

    1. 單擊建立Catalog內建Catalog頁簽,選擇Apache Paimon後,單擊下一步。

    2. 填寫配置資訊。

      image.png

      配置項

      說明

      備忘

      catalog name

      填寫自訂的Paimon Catalog名稱。

      在本例中為paimon-catalog。

      metastore

      Paimon表的中繼資料存放區類型:

      • filesystem:僅將中繼資料存放區在OSS中。

      • dlf:除了將中繼資料存放區在OSS上外,還會將中繼資料同步到阿里雲資料湖構建服務DLF中。

      本文選擇filesystem。

      warehouse

      Paimon Catalog的儲存根目錄,是一個OSS目錄。可以選擇建立Realtime ComputeFlink版時使用的OSS Bucket,也可以使用同一帳號同一地區下的其他OSS Bucket。

      格式為oss://<bucket>/<object>。其中:

      • bucket:表示您建立的OSS Bucket名稱。

      • object:表示您存放資料的路徑。

      您可以在OSS管理主控台上查看您的bucket和object名稱。

      fs.oss.endpoint

      OSS服務的串連地址。

      如果Flink與DLF位於同一地區,則使用VPC網路Endpoint,否則使用公網Endpoint。擷取方法請參見OSS地區和訪問網域名稱

      fs.oss.accessKeyId

      擁有讀寫OSS許可權的阿里雲帳號或RAM帳號的Accesskey ID。

      如果您沒有Accesskey ID,詳情請參見建立AccessKey

      fs.oss.accessKeySecret

      擁有讀寫OSS許可權的阿里雲帳號或RAM帳號的Accesskey secret。

      本文使用變數的方式填寫AccessKey Secret取值,避免明文泄露的風險,詳情請參見專案變數

    3. 單擊確定

  3. 建立MySQL Catalog。

    1. 單擊建立Catalog內建Catalog頁簽,選擇MySQL後,單擊下一步。

    2. 填寫配置資訊。

      mysql-catalog.png

      參數

      說明

      備忘

      catalogname

      MySQL Catalog名稱。

      本樣本填寫為mysql-catalog。

      hostname

      MySQL資料庫的IP地址或者Hostname。

      本樣本填寫為RDS執行個體的內網地址。

      port

      MySQL資料庫服務的連接埠號碼。

      預設值為3306。

      default-database

      預設的MySQL資料庫名稱。

      本樣本填寫步驟一:準備資料來源中建立的orders資料庫。

      username

      MySQL資料庫服務的使用者名稱。

      填寫您的資料庫使用者名稱。

      password

      MySQL資料庫服務的密碼。

      本文使用變數的方式填寫AccessKey Secret取值,避免明文泄露的風險,詳情請參見專案變數

    3. 單擊確定

步驟三:建立Flink作業

  1. 資料開發 > ETL頁面,單擊建立

  2. 選擇空白的流作業草稿,單擊下一步

  3. 新增作業草稿對話方塊,填寫作業配置資訊。

    作業參數

    說明

    檔案名稱

    作業的名稱。

    說明

    作業名稱在當前專案中必須保持唯一。

    儲存位置

    指定該作業的儲存位置。

    您還可以在現有檔案夾右側,單擊建立檔案夾表徵圖,建立子檔案夾。

    引擎版本

    當前作業使用的Flink的引擎版本。

    建議使用帶有推薦標籤的版本,這些版本具有更高的可靠性和效能表現,引擎版本詳情請參見功能發布記錄引擎版本介紹

  4. 單擊建立

  5. 輸入以下語句,即時捕獲orders資料庫中相關表的變化,並同步到Paimon表中。

    -- 捕獲表名符合Regexorders_\d+的MySQL表,將MySQL表的變化同步到Paimon的預設資料庫orders表中
    CREATE TABLE IF NOT EXISTS `paimon-catalog`.`default`.`orders` AS TABLE `mysql-catalog`.`orders`.`orders_\d+`;

    CREATE TABLE AS文法的使用,詳情請參見CREATE TABLE AS(CTAS)語句

  6. (可選)單擊右上方的深度檢查,確認作業Flink SQL語句中是否存在語法錯誤。

  7. 單擊右上方的部署,單擊確定

  8. 在左側導覽列,單擊營運中心 > 作業營運,單擊目標作業名稱,進入作業部署詳情頁面。

  9. 單擊運行參數配置地區右側的編輯

    本文為了更快觀察到任務啟動並執行結果,將系統檢查點間隔兩次系統檢查點之間的最短時間間隔均改為10s,單擊儲存

    image

  10. 在目標作業部署詳情頁頂部,單擊啟動,選擇無狀態啟動後,單擊啟動

    image.png

  11. 查詢Paimon資料。

    1. 資料開發 > 資料查詢頁面的查詢指令碼頁簽,將如下代碼拷貝到查詢指令碼。

      select custkey, sum(total_price) from `paimon-catalog`.`default`.`orders` group by custkey;
    2. 選中目標片段後,單擊左側程式碼上的運行

      image.png

步驟四:更新MySQL表結構

本部分將示範MySQL表結構變更同步到Paimon表的功能。

  1. 登入雲資料庫RDS控制台

  2. 在orders資料庫,輸入如下SQL語句,然後單擊執行,為兩張資料表添加一列,並填充一些資料。

    ALTER TABLE `orders_1` ADD COLUMN quantity BIGINT; 
    ALTER TABLE `orders_2` ADD COLUMN quantity BIGINT; 
    UPDATE `orders_1` SET `quantity` = 100 WHERE `orderkey` < 5;
    UPDATE `orders_2` SET `quantity` = 100 WHERE `orderkey` > 15;
  3. Realtime Compute控制台資料開發 > 資料查詢頁面的查詢指令碼頁簽,將如下代碼拷貝到查詢指令碼。

    select * from `test`.`default`.`orders` where `quantity` is not null;

    選中目標片段後單擊左側程式碼上的運行

    Image 32

相關文檔