Apache Paimon是一種流批統一的湖儲存格式,支援高吞吐的寫入和低延後查詢。本文通過Paimon Catalog和MySQL連接器,將雲資料庫RDS中的訂單資料和表結構變更匯入Paimon表中,並使用Flink對Paimon表進行簡單分析。
背景資訊
Apache Paimon是一種流批統一的湖儲存格式,支援高吞吐的寫入和低延後查詢。目前阿里雲Realtime ComputeFlink版,以及開源巨量資料平台E-MapReduce上常見的計算引擎(例如Spark、Hive或Trino)都與Paimon有著較為完善的整合度。您可以藉助Apache Paimon快速地在HDFS或者OSS上構建自己的資料湖儲存服務,並接入計算引擎實現資料湖的分析。
前提條件
如果您使用RAM使用者或RAM角色等身份訪問,需要確認已具有Flink控制台相關許可權,詳情請參見許可權管理。
已建立Flink工作空間,詳情請參見開通Realtime ComputeFlink版。
步驟一:準備資料來源
- 說明
RDS MySQL版執行個體需要與Flink工作空間處於同一VPC。不在同一VPC下時請參見網路連通性。
建立名稱為orders的資料庫,並建立高許可權帳號或具有資料庫orders讀寫權限的普通帳號。
第二步:串連RDS MySQL執行個體,在orders資料庫中建立表orders_1和orders_2。
CREATE TABLE `orders_1` ( orderkey BIGINT NOT NULL, custkey BIGINT, order_status VARCHAR(100), total_price DOUBLE, order_date DATE, order_priority VARCHAR(100), clerk VARCHAR(100), ship_priority INT, comment VARCHAR(100), PRIMARY KEY (orderkey) ); CREATE TABLE `orders_2` ( orderkey BIGINT NOT NULL, custkey BIGINT, order_status VARCHAR(100), total_price DOUBLE, order_date DATE, order_priority VARCHAR(100), clerk VARCHAR(100), ship_priority INT, comment VARCHAR(100), PRIMARY KEY (orderkey) );
插入如下測試資料。
INSERT INTO `orders_1` VALUES (1, 1, 'O', 131251.81, '1996-01-02', '5-LOW', 'Clerk#000000951', 0, 'nstructions sleep furiously among '); INSERT INTO `orders_1` VALUES (2, 3, 'O', 40183.29, '1996-12-01', '1-URGENT', 'Clerk#000000880', 0, ' foxes. pending accounts at the pending, silent asymptot'); INSERT INTO `orders_1` VALUES (3, 6, 'F', 160882.76, '1993-10-14', '5-LOW', 'Clerk#000000955', 0, 'sly final accounts boost. carefully regular ideas cajole carefully. depos'); INSERT INTO `orders_1` VALUES (4, 6, 'O', 31084.79, '1995-10-11', '5-LOW', 'Clerk#000000124', 0, 'sits. slyly regular warthogs cajole. regular, regular theodolites acro'); INSERT INTO `orders_1` VALUES (5, 2, 'F', 86615.25, '1994-07-30', '5-LOW', 'Clerk#000000925', 0, 'quickly. bold deposits sleep slyly. packages use slyly'); INSERT INTO `orders_1` VALUES (6, 2, 'F', 36468.55, '1992-02-21', '4-NOT SPECIFIED', 'Clerk#000000058', 0, 'ggle. special, final requests are against the furiously specia'); INSERT INTO `orders_1` VALUES (7, 2, 'O', 171488.73, '1996-01-10', '2-HIGH', 'Clerk#000000470', 0, 'ly special requests '); INSERT INTO `orders_1` VALUES (8, 6, 'O', 116923.00, '1995-07-16', '2-HIGH', 'Clerk#000000616', 0, 'ise blithely bold, regular requests. quickly unusual dep'); INSERT INTO `orders_1` VALUES (9, 3, 'F', 99798.76, '1993-10-27', '3-MEDIUM', 'Clerk#000000409', 0, 'uriously. furiously final request'); INSERT INTO `orders_1` VALUES (10, 3, 'O', 41670.02, '1998-07-21', '3-MEDIUM', 'Clerk#000000223', 0, 'ly final packages. fluffily final deposits wake blithely ideas. spe'); INSERT INTO `orders_2` VALUES (11, 6, 'O', 148789.52, '1995-10-23', '4-NOT SPECIFIED', 'Clerk#000000259', 0, 'zzle. carefully enticing deposits nag furio'); INSERT INTO `orders_2` VALUES (12, 5, 'O', 38988.98, '1995-11-03', '1-URGENT', 'Clerk#000000358', 0, ' quick packages are blithely. slyly silent accounts wake qu'); INSERT INTO `orders_2` VALUES (13, 4, 'F', 113701.89, '1992-06-03', '3-MEDIUM', 'Clerk#000000456', 0, 'kly regular pinto beans. carefully unusual waters cajole never'); INSERT INTO `orders_2` VALUES (14, 6, 'O', 46366.56, '1996-08-21', '4-NOT SPECIFIED', 'Clerk#000000604', 0, 'haggle blithely. furiously express ideas haggle blithely furiously regular re'); INSERT INTO `orders_2` VALUES (15, 4, 'O', 219707.84, '1996-09-20', '3-MEDIUM', 'Clerk#000000659', 0, 'ole express, ironic requests: ir'); INSERT INTO `orders_2` VALUES (16, 1, 'F', 20065.73, '1994-07-16', '3-MEDIUM', 'Clerk#000000661', 0, 'wake fluffily. sometimes ironic pinto beans about the dolphin'); INSERT INTO `orders_2` VALUES (17, 0, 'P', 65883.92, '1995-03-18', '1-URGENT', 'Clerk#000000632', 0, 'ular requests are blithely pending orbits-- even requests against the deposit'); INSERT INTO `orders_2` VALUES (18, 6, 'F', 79258.24, '1994-01-20', '5-LOW', 'Clerk#000000743', 0, 'y pending requests integrate'); INSERT INTO `orders_2` VALUES (19, 2, 'O', 116227.05, '1996-12-19', '4-NOT SPECIFIED', 'Clerk#000000547', 0, 'symptotes haggle slyly around the furiously iron'); INSERT INTO `orders_2` VALUES (20, 1, 'O', 215135.72, '1998-04-18', '3-MEDIUM', 'Clerk#000000440', 0, ' pinto beans sleep carefully. blithely ironic deposits haggle furiously acro');
步驟二:建立Catalog
進入中繼資料管理頁面。
單擊目標工作空間操作列下的控制台。
單擊左側的中繼資料管理。
建立Paimon Catalog。
單擊建立Catalog,在內建Catalog頁簽,選擇Apache Paimon後,單擊下一步。
填寫配置資訊。
配置項
說明
備忘
catalog name
您自訂的Paimon Catalog名稱。
填寫為自訂的英文名。
metastore
Paimon表的中繼資料存放區類型:
filesystem:僅將中繼資料存放區在OSS中。
dlf:除了將中繼資料存放區在OSS上外,還會將中繼資料同步到阿里雲資料湖構建服務DLF中。
本文選擇filesystem。
warehouse
Paimon Catalog的儲存根目錄,是一個OSS目錄。可以選擇建立Realtime ComputeFlink版時使用的OSS Bucket,也可以使用同一帳號同一地區下的其他OSS Bucket。
格式為oss://<bucket>/<object>。其中:
bucket:表示您建立的OSS Bucket名稱。
object:表示您存放資料的路徑。
您可以在OSS管理主控台上查看您的bucket和object名稱。
fs.oss.endpoint
OSS服務的串連地址。
如果Flink與DLF位於同一地區,則使用VPC網路Endpoint,否則使用公網Endpoint。
當warehouse指定的OSS Bucket與Flink工作空間不在同一地區,或使用其它帳號下的OSS Bucket時需要填寫。
擷取方法請參見OSS地區和訪問網域名稱和建立AccessKey。
fs.oss.accessKeyId
擁有讀寫OSS許可權的阿里雲帳號或RAM帳號的Accesskey ID。
fs.oss.accessKeySecret
擁有讀寫OSS許可權的阿里雲帳號或RAM帳號的Accesskey secret。
單擊確定。
步驟三:建立Flink作業
在
頁面,單擊建立。選擇空白的流作業草稿,單擊下一步。
在新增作業草稿對話方塊,填寫作業配置資訊。
作業參數
說明
檔案名稱
作業的名稱。
說明作業名稱在當前專案中必須保持唯一。
儲存位置
指定該作業的儲存位置。
您還可以在現有檔案夾右側,單擊表徵圖,建立子檔案夾。
引擎版本
當前作業使用的Flink的引擎版本。引擎版本號碼含義、版本對應關係和生命週期重要時間點詳情請參見引擎版本介紹。
單擊建立。
輸入以下語句,即時捕獲orders資料庫中相關表的變化,並同步到Paimon表中。
-- 使用剛剛建立的Paimon Catalog USE CATALOG `test`; -- 建立一張MySQL暫存資料表,捕獲表名符合Regexorders_\d+的MySQL表的變化 CREATE TEMPORARY TABLE mysql_orders ( orderkey BIGINT, custkey BIGINT, order_status VARCHAR(100), total_price DOUBLE, order_date DATE, order_priority VARCHAR(100), clerk VARCHAR(100), ship_priority INT, `comment` VARCHAR(100), PRIMARY KEY (orderkey) NOT ENFORCED ) WITH ( 'connector' = 'mysql', 'hostname' = 'rm-bp1s1xgll21ey****.mysql.rds.aliyuncs.com', 'port' = '3306', 'username' = 'your_username', 'password' = '${secret_values.mysql_pw}', 'database-name' = 'orders', 'table-name' = 'orders_\d+', 'server-time-zone' = 'Asia/Shanghai' ); -- 將MySQL表的變化同步到Paimon表中 CREATE TABLE IF NOT EXISTS orders AS TABLE mysql_orders;
參數說明如下,您可以根據實際情況進行修改。MySQL連接器更多參數詳情請參見MySQL。
參數
說明
備忘
connector
連接器類型。
本樣本固定值為
mysql
。hostname
MySQL資料庫的IP地址或者Hostname。
本文填寫為RDS執行個體的內網地址。
username
MySQL資料庫服務的使用者名稱。
無。
password
MySQL資料庫服務的密碼。
本樣本通過使用名為mysql_pw密鑰的方式填寫密碼值,避免資訊泄露,詳情請參見變數管理。
database-name
MySQL資料庫名稱。
本樣本填寫為步驟一:準備資料來源中建立的資料庫。
table-name
MySQL表名。
作為源表時,表名支援Regex以讀取多個表的資料。
port
MySQL資料庫服務的連接埠號碼。
無。
(可選)單擊右上方的深度檢查,確認作業Flink SQL語句中是否存在語法錯誤。
單擊右上方的部署,單擊確定。
在左側導覽列,單擊
,單擊目標作業名稱,進入作業部署詳情頁面。單擊運行參數配置地區右側的編輯。
本文為了更快觀察到任務啟動並執行結果,將系統檢查點間隔和兩次系統檢查點之間的最短時間間隔均改為10s,單擊儲存。
在目標作業部署詳情頁頂部,單擊啟動,選擇無狀態啟動。
查詢Paimon資料。
在
頁面的查詢指令碼頁簽,將如下代碼拷貝到查詢指令碼後,選中目標片段後單擊左側程式碼上的運行。select custkey, sum(total_price) from `test`.`default`.`orders` group by custkey;
結果瀏覽完成後,單擊左側的停止調試。
步驟四:更新MySQL表結構
本部分將示範MySQL表結構變更同步到Paimon表的功能。
登入雲資料庫RDS控制台。
在orders資料庫,輸入如下SQL語句,然後單擊執行,為兩張資料表添加一列,並填充一些資料。
ALTER TABLE `orders_1` ADD COLUMN quantity BIGINT; ALTER TABLE `orders_2` ADD COLUMN quantity BIGINT; UPDATE `orders_1` SET `quantity` = 100 WHERE `orderkey` < 5; UPDATE `orders_2` SET `quantity` = 100 WHERE `orderkey` > 15;
在Realtime Compute控制台 頁面的查詢指令碼頁簽,將如下代碼拷貝到查詢指令碼後,選中目標片段後單擊左側程式碼上的運行。
select * from `test`.`default`.`orders` where `quantity` is not null;
結果如下,瀏覽完成後,可單擊左側的停止調試。
相關文檔
流式資料湖倉Paimon連接器可以配合Paimon Catalog使用,使用方法、功能等詳情請參見流式資料湖倉Paimon。
Paimon Catalog的使用,詳情請參見管理Paimon Catalog。