全部產品
Search
文件中心

Realtime Compute for Apache Flink:通過Flink讀寫雲原生資料倉儲AnalyticDB PostgreSQL版(ADB PG)資料

更新時間:Sep 13, 2024

本文介紹如何通過阿里雲Realtime ComputeFlink版即時讀寫雲原生資料倉儲AnalyticDB PostgreSQL版資料。

背景資訊

雲原生資料倉儲AnalyticDB PostgreSQL版是一種大規模平行處理(MPP)資料倉儲服務,可提供海量資料線上分析服務。Realtime ComputeFlink版是基於Apache Flink構建的⼀站式即時巨量資料分析平台,內建豐富上下遊連接器,滿足不同業務情境的需求,提供高效、靈活的Realtime Compute服務。通過Realtime ComputeFlink版讀取AnalyticDB PostgreSQL版資料,可以充分發揮雲原生資料倉儲的優勢,提高資料分析的效率和精度。

使用限制

  • 該功能暫不支援AnalyticDB PostgreSQL版Serverless模式

  • 僅FlinkRealtime Compute引擎VVR 6.0.0及以上版本支援雲原生資料倉儲AnalyticDB PostgreSQL版連接器。

  • 僅FlinkRealtime Compute引擎VVR 8.0.1及以上版本支援雲原生資料倉儲AnalyticDB PostgreSQL版7.0版本。

    說明

    如果您使用了自訂連接器,具體操作請參見管理自訂連接器

前提條件

步驟一:配置白名單並準備資料

  1. 登入雲原生資料倉儲AnalyticDB PostgreSQL版控制台
  2. 將目標Flink工作空間所屬的網段加入AnalyticDB PostgreSQL版的白名單。

    1. 查看目標Flink工作空間的虛擬交換器所屬網段,詳情請參見如何設定白名單?

    2. 添加至目標AnalyticDB PostgreSQL版執行個體的白名單中,請參見操作步驟

      說明

      如果您通過公網訪問,請添加公網IP至白名單。

  3. 單擊頁面右上方的登入資料庫,並填寫帳號和密碼。串連資料庫的更多方式,請參見用戶端串連

  4. 在對應執行個體的目標資料庫中建立一張名為adbpg_dim_table的表並插入50條測試資料。

    建表SQL和插入資料SQL的樣本如下:

    --建立名稱為adbpg_dim_table的表。
    CREATE TABLE adbpg_dim_table(
    id int,
    username text,
    PRIMARY KEY(id)
    );
    
    --向adbpg_dim_table的表中插入50行資料,其中id欄位的值為從1到50的整數,而username欄位的值為username字串後面跟隨當前行數的文本表示。
    INSERT INTO adbpg_dim_table(id, username)
    SELECT i, 'username'||i::text
    FROM generate_series(1, 50) AS t(i);

    您可以使用select * from adbpg_dim_table order by id;語句查看插入後的資料。

  5. 建立一張名為adbpg_sink_table的表,用於Flink寫入結果資料。

    CREATE TABLE adbpg_sink_table(
      id int,
      username text,
      score int
    );

步驟二:建立Flink作業

  1. 登入Realtime Compute控制台,單擊目標工作空間操作列下的控制台

  2. 在左側導覽列,單擊資料開發 > ETL,單擊建立,選擇空白的流作業草稿,單擊下一步

  3. 新增作業草稿對話方塊,填寫作業配置資訊。

    作業參數

    說明

    樣本

    檔案名稱

    作業的名稱。

    說明

    作業名稱在當前專案中必須保持唯一。

    adbpg-test

    儲存位置

    指定該作業的代碼檔案所屬的檔案夾。

    您還可以在現有檔案夾右側,單擊建立檔案夾表徵圖,建立子檔案夾。

    作業草稿

    引擎版本

    當前作業使用的Flink的引擎版本。引擎版本號碼含義、版本對應關係和生命週期重要時間點詳情請參見引擎版本介紹

    vvr-8.0.1-flink-1.17

  4. 單擊建立

步驟三:編寫作業代碼並部署作業

  1. 將以下作業代碼拷貝到作業文本編輯區。

    ---建立一個datagen源表。本樣本中無需修改WITH參數。
    CREATE TEMPORARY TABLE datagen_source (
     id INT,
     score INT
    ) WITH (
     'connector' = 'datagen', 
     'fields.id.kind'='sequence',
     'fields.id.start'='1',
     'fields.id.end'='50',
     'fields.score.kind'='random',
     'fields.score.min'='70',
     'fields.score.max'='100'
    );
    
    --建立adbpg維表。需根據您的實際情況修改WITH參數。
    CREATE TEMPORARY TABLE dim_adbpg(
     id int,
     username varchar,
     PRIMARY KEY(id) not ENFORCED
    ) WITH(
     'connector' = 'adbpg', 
     'url' = 'jdbc:postgresql://gp-2ze****3tysk255b5-master.gpdb.rds.aliyuncs.com:5432/flinktest',
     'tablename' = 'adbpg_dim_table', 
     'username' = 'flinktest',
     'password' = '${secret_values.adb_password}',
     'maxRetryTimes'='2', --寫入資料失敗後,重試寫入的最大次數。
     'cache'='lru',  --緩衝策略,
     'cacheSize'='100'  --緩衝大小
    );
    
    --建立adbpg結果表。需根據您的實際情況修改WITH參數。
    CREATE TEMPORARY TABLE sink_adbpg (
      id int,
      username varchar,
      score int
    ) WITH (
      'connector' = 'adbpg', 
      'url' = 'jdbc:postgresql://gp-2ze****3tysk255b5-master.gpdb.rds.aliyuncs.com:5432/flinktest',
      'tablename' = 'adbpg_sink_table',  
      'username' = 'flinktest',
      'password' = '${secret_values.adb_password}',
      'maxRetryTimes' = '2',
      'conflictMode' = 'ignore',--當Insert寫入出現主鍵衝突或者唯一索引衝突時的處理策略。
      'retryWaitTime' = '200'  --重試的時間間隔。
    );
    
    --維表和源表join後的結果插入adbpg結果表。
    INSERT INTO sink_adbpg
    SELECT ts.id,ts.username,ds.score
    FROM datagen_source AS ds
    JOIN dim_adbpg FOR SYSTEM_TIME AS OF PROCTIME() AS ts
    on ds.id = ts.id;
  2. 根據實際情況修改參數。

    本樣本中無需修改datagen源表。您需要根據實際情況修改adbpg維表和結果表參數,具體說明如下。涉及的連接器更多相關參數和類型映射請參見相關文檔

    參數

    是否必填

    說明

    url

    AnalyticDB PostgreSQL版的JDBC串連地址。格式為jdbc:postgresql://<地址>:<連接埠>/<串連的資料庫名稱>。您可在雲原生資料倉儲 AnalyticDB PostgreSQL版控制台對應執行個體的資料庫連接頁面查看。

    tablename

    AnalyticDB PostgreSQL版的表名。

    username

    AnalyticDB PostgreSQL版的資料庫帳號。

    password

    AnalyticDB PostgreSQL版的資料庫帳號密碼。

    targetSchema

    Schema名稱。預設為public。如果您使用了對應資料庫下其他Schema,請填寫此參數。

  3. 在作業開發頁面頂部,單擊深度檢查,進行語法檢查。

  4. 單擊部署

  5. 營運中心 > 作業營運頁面,單擊目標作業操作列下的啟動

步驟四:查看寫入資料結果

  1. 登入雲原生資料倉儲AnalyticDB PostgreSQL版控制台
  2. 單擊登入資料庫,串連資料庫的更多方式,請參見用戶端串連

  3. 執行如下查詢語句,查看Flink寫入資料。

    SELECT * FROM adbpg_sink_table ORDER BY id;

    結果如下圖所示。

    image.png

相關文檔