本文介紹如何通過阿里雲Realtime ComputeFlink版即時讀寫雲原生資料倉儲AnalyticDB PostgreSQL版資料。
背景資訊
雲原生資料倉儲AnalyticDB PostgreSQL版是一種大規模平行處理(MPP)資料倉儲服務,可提供海量資料線上分析服務。Realtime ComputeFlink版是基於Apache Flink構建的⼀站式即時巨量資料分析平台,內建豐富上下遊連接器,滿足不同業務情境的需求,提供高效、靈活的Realtime Compute服務。通過Realtime ComputeFlink版讀取AnalyticDB PostgreSQL版資料,可以充分發揮雲原生資料倉儲的優勢,提高資料分析的效率和精度。
使用限制
該功能暫不支援AnalyticDB PostgreSQL版Serverless模式。
僅FlinkRealtime Compute引擎VVR 6.0.0及以上版本支援雲原生資料倉儲AnalyticDB PostgreSQL版連接器。
僅FlinkRealtime Compute引擎VVR 8.0.1及以上版本支援雲原生資料倉儲AnalyticDB PostgreSQL版7.0版本。
說明如果您使用了自訂連接器,具體操作請參見管理自訂連接器。
前提條件
AnalyticDB PostgreSQL版執行個體和Flink全託管工作空間需要位於同一VPC下。
說明不在同一VPC下時請參見網路連通性。
已建立Flink全託管工作空間。具體操作請參見開通Realtime ComputeFlink版。
步驟一:配置白名單並準備資料
- 登入雲原生資料倉儲AnalyticDB PostgreSQL版控制台。
將目標Flink工作空間所屬的網段加入AnalyticDB PostgreSQL版的白名單。
單擊頁面右上方的登入資料庫,並填寫帳號和密碼。串連資料庫的更多方式,請參見用戶端串連。
在對應執行個體的目標資料庫中建立一張名為adbpg_dim_table的表並插入50條測試資料。
建表SQL和插入資料SQL的樣本如下:
--建立名稱為adbpg_dim_table的表。 CREATE TABLE adbpg_dim_table( id int, username text, PRIMARY KEY(id) ); --向adbpg_dim_table的表中插入50行資料,其中id欄位的值為從1到50的整數,而username欄位的值為username字串後面跟隨當前行數的文本表示。 INSERT INTO adbpg_dim_table(id, username) SELECT i, 'username'||i::text FROM generate_series(1, 50) AS t(i);
您可以使用
select * from adbpg_dim_table order by id;
語句查看插入後的資料。建立一張名為adbpg_sink_table的表,用於Flink寫入結果資料。
CREATE TABLE adbpg_sink_table( id int, username text, score int );
步驟二:建立Flink作業
登入Realtime Compute控制台,單擊目標工作空間操作列下的控制台。
在左側導覽列,單擊
,單擊建立,選擇空白的流作業草稿,單擊下一步。在新增作業草稿對話方塊,填寫作業配置資訊。
作業參數
說明
樣本
檔案名稱
作業的名稱。
說明作業名稱在當前專案中必須保持唯一。
adbpg-test
儲存位置
指定該作業的代碼檔案所屬的檔案夾。
您還可以在現有檔案夾右側,單擊表徵圖,建立子檔案夾。
作業草稿
引擎版本
當前作業使用的Flink的引擎版本。引擎版本號碼含義、版本對應關係和生命週期重要時間點詳情請參見引擎版本介紹。
vvr-8.0.1-flink-1.17
單擊建立。
步驟三:編寫作業代碼並部署作業
將以下作業代碼拷貝到作業文本編輯區。
---建立一個datagen源表。本樣本中無需修改WITH參數。 CREATE TEMPORARY TABLE datagen_source ( id INT, score INT ) WITH ( 'connector' = 'datagen', 'fields.id.kind'='sequence', 'fields.id.start'='1', 'fields.id.end'='50', 'fields.score.kind'='random', 'fields.score.min'='70', 'fields.score.max'='100' ); --建立adbpg維表。需根據您的實際情況修改WITH參數。 CREATE TEMPORARY TABLE dim_adbpg( id int, username varchar, PRIMARY KEY(id) not ENFORCED ) WITH( 'connector' = 'adbpg', 'url' = 'jdbc:postgresql://gp-2ze****3tysk255b5-master.gpdb.rds.aliyuncs.com:5432/flinktest', 'tablename' = 'adbpg_dim_table', 'username' = 'flinktest', 'password' = '${secret_values.adb_password}', 'maxRetryTimes'='2', --寫入資料失敗後,重試寫入的最大次數。 'cache'='lru', --緩衝策略, 'cacheSize'='100' --緩衝大小 ); --建立adbpg結果表。需根據您的實際情況修改WITH參數。 CREATE TEMPORARY TABLE sink_adbpg ( id int, username varchar, score int ) WITH ( 'connector' = 'adbpg', 'url' = 'jdbc:postgresql://gp-2ze****3tysk255b5-master.gpdb.rds.aliyuncs.com:5432/flinktest', 'tablename' = 'adbpg_sink_table', 'username' = 'flinktest', 'password' = '${secret_values.adb_password}', 'maxRetryTimes' = '2', 'conflictMode' = 'ignore',--當Insert寫入出現主鍵衝突或者唯一索引衝突時的處理策略。 'retryWaitTime' = '200' --重試的時間間隔。 ); --維表和源表join後的結果插入adbpg結果表。 INSERT INTO sink_adbpg SELECT ts.id,ts.username,ds.score FROM datagen_source AS ds JOIN dim_adbpg FOR SYSTEM_TIME AS OF PROCTIME() AS ts on ds.id = ts.id;
根據實際情況修改參數。
本樣本中無需修改datagen源表。您需要根據實際情況修改adbpg維表和結果表參數,具體說明如下。涉及的連接器更多相關參數和類型映射請參見相關文檔。
參數
是否必填
說明
url
是
AnalyticDB PostgreSQL版的JDBC串連地址。格式為
jdbc:postgresql://<地址>:<連接埠>/<串連的資料庫名稱>
。您可在雲原生資料倉儲 AnalyticDB PostgreSQL版控制台對應執行個體的資料庫連接頁面查看。tablename
是
AnalyticDB PostgreSQL版的表名。
username
是
AnalyticDB PostgreSQL版的資料庫帳號。
password
是
AnalyticDB PostgreSQL版的資料庫帳號密碼。
targetSchema
否
Schema名稱。預設為public。如果您使用了對應資料庫下其他Schema,請填寫此參數。
在作業開發頁面頂部,單擊深度檢查,進行語法檢查。
單擊部署。
在
頁面,單擊目標作業操作列下的啟動。
步驟四:查看寫入資料結果
- 登入雲原生資料倉儲AnalyticDB PostgreSQL版控制台。
單擊登入資料庫,串連資料庫的更多方式,請參見用戶端串連。
執行如下查詢語句,查看Flink寫入資料。
SELECT * FROM adbpg_sink_table ORDER BY id;
結果如下圖所示。