全部產品
Search
文件中心

AnalyticDB:通過Flink CDC即時訂閱全量和增量資料(邀測中)

更新時間:Sep 12, 2025

AnalyticDB for PostgreSQL提供自研的CDC連接器,基於PostgreSQL的邏輯複製功能實現訂閱全量和增量資料,可與Flink無縫整合。該連接器能夠高效捕獲源表的即時變更資料,支援即時資料同步、串流等,助力企業快速響應動態資料需求。本文介紹如何通過阿里雲Realtime ComputeFlink版CDC即時訂閱AnalyticDB for PostgreSQL的全量和增量資料。

使用限制

  • 核心版本為7.2.1.4及以上的AnalyticDB for PostgreSQL7.0版執行個體。

    說明

    您可以在控制台執行個體的基本資料頁查看核心小版本。如不滿足上述版本要求,需要您升級核心小版本

  • 暫不支援AnalyticDB for PostgreSQL的Serverless模式。

前提條件

  • AnalyticDB for PostgreSQL執行個體和Flink全託管工作空間位於同一VPC。

  • 調整AnalyticDB for PostgreSQL執行個體參數配置

    • 開啟邏輯解析,即設定wal_level參數的值為logical。

    • 若您的AnalyticDB for PostgreSQL執行個體系列為高可用版,為保障遷移任務的順利進行,避免因主備切換而導致邏輯訂閱中斷,還需將hot_standbyhot_standby_feedbacksync_replication_slots參數的值均配置為on。

  • AnalyticDB for PostgreSQL帳號是初始帳號高許可權使用者RDS_SUPERUSER,且需要賦權REPLICATION,ALTER USER <username> WITH REPLICATION;

  • 已將Flink工作空間所屬的網段添加至AnalyticDB for PostgreSQL白名單

  • 下載flink-sql-connector-adbpg-cdc-3.3.jar,並在Flink工作空間內上傳CDC連接器

操作步驟

步驟一:準備測試表和測試資料

  1. 登入雲原生資料倉儲AnalyticDB PostgreSQL版控制台,找到目標執行個體,單擊執行個體ID。

  2. 基本資料頁面的右側下,單擊登入資料庫

  3. 建立測試資料庫和源表adbpg_source_table,並向源表寫入50條資料。

    -- 建立測試資料庫
    CREATE DATABASE testdb;
    -- 切換至testdb資料庫並建立schema
    CREATE SCHEMA testschema;
    -- 建立源表adbpg_source_table
    CREATE TABLE testschema.adbpg_source_table(
      id int,
      username text,
      PRIMARY KEY(id)
    );
    -- 向adbpg_source_table的表中寫入50條資料
    INSERT INTO testschema.adbpg_source_table(id, username)
    SELECT i, 'username'||i::text
    FROM generate_series(1, 50) AS t(i);
  4. 建立結果表adbpg_sink_table,用於Flink寫入結果資料。

    CREATE TABLE testschema.adbpg_sink_table(
      id int,
      username text,
      score int
    );

步驟二:建立Flink作業

  1. 登入Realtime Compute控制台,在Flink全託管頁簽,單擊目標工作空間操作列下的控制台

  2. 在左側導覽列,單擊資料開發 > ETL

  3. 單擊頂部功能表列建立,選擇空白的流作業草稿,單擊下一步,在建立檔案草稿對話方塊,填寫作業配置資訊。

    作業參數

    說明

    樣本

    檔案名稱

    作業的名稱。

    說明

    作業名稱在當前專案中必須保持唯一。

    adbpg-test

    儲存位置

    指定該作業的代碼檔案所屬的檔案夾。

    您還可以在現有檔案夾右側,單擊建立檔案夾表徵圖,建立子檔案夾。

    作業草稿

    引擎版本

    當前作業使用的Flink的引擎版本。引擎版本號碼含義、版本對應關係和生命週期重要時間點詳情請參見引擎版本介紹

    vvr-6.0.7-flink-1.15

  4. 單擊建立

步驟三:編寫作業代碼並部署

  1. 建立資料來源datagen_source(用於產生類比資料)和source_adbpg(用於從 AnalyticDB for PostgreSQL資料庫中捕獲即時的資料變更),將兩個資料來源關聯,並將結果寫入結果表sink_adbpg(將處理後的資料寫入AnalyticDB for PostgreSQL)。

    將以下作業代碼拷貝到作業文本編輯區。

    ---建立datagen源表,使用datagen連接器產生流式資料
    CREATE TEMPORARY TABLE datagen_source (
     id INT,
     score INT
    ) WITH (
     'connector' = 'datagen',
     'fields.id.kind'='sequence',
     'fields.id.start'='1',
     'fields.id.end'='100',
     'fields.score.kind'='random',
     'fields.score.min'='70',
     'fields.score.max'='100'
    );
    
    --建立adbpg源表,使用adbpg-cdc連接器,通過slot.name和邏pgoutput捕獲adbpg_source_table的資料變更
    CREATE TEMPORARY TABLE source_adbpg(
     id int,
     username varchar,
     PRIMARY KEY(id) NOT ENFORCED
    ) WITH(
      'connector' = 'adbpg-cdc', 
      'hostname' = 'gp-bp16v8cgx46ns****-master.gpdb.rds.aliyuncs.com',
      'port' = '5432',
      'username' = 'account****',
      'password' = 'password****',
      'database-name' = 'testdb',
      'schema-name' = 'testschema',
      'table-name' = 'adbpg_source_table',
      'slot.name' = 'flink',
      'decoding.plugin.name' = 'pgoutput'
    );
    
    --建立adbpg結果表,將處理後的結果寫入資料庫的目標表adbpg_sink_table。
    CREATE TEMPORARY TABLE sink_adbpg (
      id int,
      username varchar,
      score int
    ) WITH (
      'connector' = 'adbpg', 
      'url' = 'jdbc:postgresql://gp-bp16v8cgx46ns****-master.gpdb.rds.aliyuncs.com:5432/testdb',
      'tablename' = 'testschema.adbpg_sink_table',  
      'username' = 'account****',
      'password' = 'password****',
      'maxRetryTimes' = '2',
      'batchsize' = '5000',
      'conflictMode' = 'ignore',
      'writeMode' = 'insert',
      'retryWaitTime' = '200'
    );
    
    -- datagen_source表和source_adbpg源表關聯後的結果寫入adbpg結果表
    INSERT INTO sink_adbpg
    SELECT ts.id,ts.username,ds.score FROM datagen_source AS ds
    JOIN source_adbpg AS ts ON ds.id = ts.id;

    參數說明

    參數名稱

    是否必填

    資料類型

    描述

    connector

    STRING

    連線類型。源表固定值為adbpg-cdc,結果表固定值為adbpg

    hostname

    STRING

    AnalyticDB for PostgreSQL執行個體的內網地址。您可在目標執行個體的基本資料頁簽下擷取內網地址

    username

    STRING

    AnalyticDB for PostgreSQL資料庫的帳號和密碼。

    password

    STRING

    database-name

    STRING

    資料庫名稱。

    schema-name

    STRING

    Schema名稱。該參數支援Regex,可以一次訂閱多個Schema。

    table-name

    STRING

    表名稱。該參數支援Regex,可以一次訂閱多個表。

    port

    INTEGER

    AnalyticDB for PostgreSQL連接埠,固定值為5432。

    decoding.plugin.name

    STRING

    Postgres Logical Decoding外掛程式名稱。固定值為pgoutput。

    slot.name

    STRING

    邏輯解碼槽的名字。

    • 對於同一個Flink作業涉及的源表,建議使用相同的slot.name

    • 如果不同的Flink作業涉及同一張表,則建議為每個作業分別設定獨立的slot.name參數,以避免出現以下錯誤:PSQLException: ERROR: replication slot "debezium" is active for PID 974

    debezium.*

    STRING

    該參數可以更細粒度地控制Debezium用戶端的行為。例如,設定 'debezium.snapshot.mode' = 'never' 可以禁用快照功能。您可以通過配置屬性擷取更多配置詳情。

    scan.incremental.snapshot.enabled

    BOOLEAN

    是否開啟增量快照,取值如下。

    • false(預設值):不開啟增量快照。

    • true:開啟增量快照。

    scan.startup.mode

    STRING

    消費資料時的啟動模式,取值如下。

    • initial(預設值):在初次開機時,先掃描歷史全量資料,然後讀取最新的WAL日誌資料,實現全量與增量資料的無縫銜接。

    • latest-offset:在初次開機時,不掃描歷史全量資料,直接從 WAL 日誌的末尾(即最新的日誌位置)開始讀取,僅捕獲連接器啟動後的最新變更資料。

    • snapshot:先掃描歷史全量資料,同時讀取全量階段新產生的 WAL 日誌,最終作業會在完成全量掃描後停止運行。

    changelog-mode

    STRING

    用於編碼流更改的變更日誌(Changelog)模式,取值如下。

    • ALL(預設值):支援所有操作類型,包括 INSERTDELETEUPDATE_BEFORE 和 UPDATE_AFTER

    • UPSERT:僅支援 UPSERT 類型的操作,包括 INSERTDELETE 和 UPDATE_AFTER

    heartbeat.interval.ms

    DURATION

    發送心跳包的時間間隔,預設值為30秒(單位:毫秒)。

    AnalyticDB for PostgreSQLCDC連接器通過主動向資料庫發送心跳包,確保Slot的位移量能夠持續推進。在表資料變更不頻繁的情況下,合理設定該參數可以及時清理WAL日誌,避免浪費磁碟空間。

    scan.incremental.snapshot.chunk.key-column

    STRING

    指定某一列作為快照階段分區的切分列,預設情況下會從主鍵中選擇第一列。

    url

    STRING

    格式為jdbc:postgresql://<Address>:<PortId>/<DatabaseName>

  2. 在作業開發頁面頂部,單擊深度檢查,進行語法檢查。

  3. 單擊部署,單擊確定

  4. 單擊右上方前往營運,在作業營運頁面,單擊啟動

步驟四:查看Flink寫入資料

  1. 在測試資料庫中執行如下語句,查看Flink寫入的資料。

    SELECT * FROM testschema.adbpg_sink_table;
    SELECT COUNT(*) FROM testschema.adbpg_sink_table; 
  2. 在源表中新增50條資料,並在結果表中查看Flink寫入的增量資料的總數。

    -- 源表寫入50條增量資料
    INSERT INTO testschema.adbpg_source_table(id, username)
    SELECT i, 'username'||i::text
    FROM generate_series(51, 100) AS t(i);
    
    -- 檢查目標表新增資料
    SELECT COUNT(*) FROM testschema.adbpg_sink_table where id > 50;

    結果如下。

     count 
    -------
        50
    (1 row)

注意事項

  • 請及時管理Replication Slot,以避免磁碟空間的浪費。

    為了防止Flink作業重啟過程中因Checkpoint對應的WAL(Write-Ahead Log)被清理而導致資料丟失,Flink不會自動刪除Replication Slot。因此,如果確認某個Flink作業不再需要重啟,請手動刪除其對應的Replication Slot,以釋放其佔用的資源。此外,如果Replication Slot的確認位點長時間未推進,AnalyticDB for PostgreSQL將無法清理該位點之後的WAL條目,可能會導致未使用的WAL資料持續積累,佔用大量磁碟空間。

  • AnalyticDB for PostgreSQL執行個體正常運行期間,能保證Exactly Once資料處理語義。但在故障情境下,僅能提供至少一次(At-Least Once)的語義保障。

  • 訂閱表的REPLICA IDENTITY參數會被CDC連接器修改為FULL,以保障該表資料同步的一致性,影響如下:

    • 磁碟空間佔用增加。在頻繁更新或刪除操作的情境下,該設定將增加WAL日誌的大小,導致磁碟空間佔用增加。

    • 寫操作效能下降。在高並發寫入情境下,效能可能會受到明顯影響。

    • 檢查點壓力增大。更大的WAL日誌意味著檢查點(Checkpoint)需要處理更多的資料,可能導致檢查點時間延長。

最佳實務

Flink CDC支援通過Flink SQL API或DataStream API進行作業開發,可實現對源庫單表或多表的全量與增量一體化的資料同步;此外,它還支援對異構資料來源進行雙流Join等計算。在整個資料處理過程中,Flink架構能夠確保精確一次(Exactly-once)的事件處理語義。然而,它不適合對PostgreSQL係數據庫進行全庫同步,一方面是因為它不支援DDL的同步,另一方面需要在Flink SQL中定義每張表的結構,後期維護相對複雜。

本章節將從AnalyticDB for PostgreSQL同步資料到Kafka為例,介紹Flink CDC SQL作業開發的最佳實務。在開發Flink CDC作業前,請確保已完成前提條件中的資源準備與配置。

步驟一:準備測試表

AnalyticDB for PostgreSQL執行個體中建立兩張源表。

CREATE TABLE products (
    product_id SERIAL PRIMARY KEY,
    product_name VARCHAR(200) NOT NULL,
    sku CHAR(12) NOT NULL,
    description TEXT,
    price NUMERIC(10,2) NOT NULL,
    discount_price DECIMAL(10,2),
    stock_quantity INTEGER DEFAULT 0,
    weight REAL,
    volume DOUBLE PRECISION,
    dimensions BOX,
    release_date DATE,
    is_featured BOOLEAN DEFAULT FALSE,
    rating FLOAT,
    warranty_period INTERVAL,
    metadata JSON,
    tags TEXT[]
);

CREATE TABLE documents (
    document_id UUID PRIMARY KEY,
    title VARCHAR(200) NOT NULL,
    content TEXT,
    summary TEXT,
    publication_date TIMESTAMP WITHOUT TIME ZONE,
    last_updated TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
    author_id BIGINT,
    file_data BYTEA,
    xml_content XML,
    json_metadata JSON,
    reading_time INTERVAL,
    is_public BOOLEAN DEFAULT TRUE,
    views_count INTEGER DEFAULT 0,
    category VARCHAR(50),
    tags TEXT[]
);

步驟二:準備Kafka資源

  1. 購買和部署Kafka執行個體

  2. 將Flink工作空間所屬的網段添加至Kafka的白名單中。

  3. 在Kafka執行個體中建立資源

步驟三:建立Flink作業

  1. 登入Realtime Compute控制台,在Flink全託管頁簽,單擊目標工作空間操作列下的控制台

  2. 在左側導覽列,單擊資料開發 > ETL

  3. 單擊頂部功能表列建立,選擇空白的流作業草稿,單擊下一步,在建立檔案草稿對話方塊,填寫作業配置資訊。

    作業參數

    說明

    樣本

    檔案名稱

    作業的名稱。

    說明

    作業名稱在當前專案中必須保持唯一。

    adbpg-test

    儲存位置

    指定該作業的代碼檔案所屬的檔案夾。

    您還可以在現有檔案夾右側,單擊建立檔案夾表徵圖,建立子檔案夾。

    作業草稿

    引擎版本

    當前作業使用的Flink的引擎版本。引擎版本號碼含義、版本對應關係和生命週期重要時間點詳情請參見引擎版本介紹

    vvr-6.0.7-flink-1.15

  4. 單擊建立

步驟四:編寫作業代碼並部署

  1. 在Flink工作空間內編寫SQL作業。將以下作業代碼拷貝到作業文本編輯區,並將相關配置替換為實際值。

    -- 使用一個source實現多表採集
    CREATE TEMPORARY TABLE ADBPGSource(
        table_name STRING METADATA FROM 'table_name' VIRTUAL,
        row_kind STRING METADATA FROM 'row_kind' VIRTUAL,
        product_id BIGINT,
        product_name STRING,
        sku STRING,
        description STRING,
        price STRING,
        discount_price STRING,
        stock_quantity INT,
        weight STRING,
        volume STRING,
        dimensions STRING,
        release_date STRING,
        is_featured BOOLEAN,
        rating FLOAT,
        warranty_period STRING,
        metadata STRING,
        tags STRING,
        document_id STRING,
        title STRING,
        content STRING,
        summary STRING,
        publication_date STRING,
        last_updated STRING,
        author_id BIGINT,
        file_data STRING,
        xml_content STRING,
        json_metadata STRING,
        reading_time STRING,
        is_public BOOLEAN,
        views_count INT,
        category STRING
    ) WITH (
      'connector' = 'adbpg-cdc',
      'hostname' = 'gp-2zev887z58390***-master.gpdb.rds.aliyuncs.com',
      'port' = '5432',
      'username' = 'account****',
      'password' = 'password****',
      'database-name' = 'testdb',
      'schema-name' = 'public',
      'table-name' = '(products|documents)',
      'slot.name' = 'flink',
      'decoding.plugin.name' = 'pgoutput',
      'debezium.snapshot.mode' = 'never'
    );
    
    CREATE TEMPORARY TABLE KafkaProducts (
        product_id BIGINT,
        product_name STRING,
        sku STRING,
        description STRING,
        price STRING,
        discount_price STRING,
        stock_quantity INT,
        weight STRING,
        volume STRING,
        dimensions STRING,
        release_date STRING,
        is_featured BOOLEAN,
        rating FLOAT,
        warranty_period STRING,
        metadata STRING,
        tags STRING,
        PRIMARY KEY(product_id) NOT ENFORCED
    ) WITH (
      'connector' = 'upsert-kafka',
      'topic' = '****',
      'properties.bootstrap.servers' = 'alikafka-post-cn-****-1-vpc.alikafka.aliyuncs.com:9092',
      'key.format'='avro',
      'value.format'='avro'
    );
    
    CREATE TEMPORARY TABLE KafkaDocuments (
        document_id STRING,
        title STRING,
        content STRING,
        summary STRING,
        publication_date STRING,
        last_updated STRING,
        author_id BIGINT,
        file_data STRING,
        xml_content STRING,
        json_metadata STRING,
        reading_time STRING,
        is_public BOOLEAN,
        views_count INT,
        category STRING,
        tags STRING,
        PRIMARY KEY(document_id) NOT ENFORCED
    ) WITH (
      'connector' = 'upsert-kafka',
      'topic' = '****',
      'properties.bootstrap.servers' = 'alikafka-post-cn-****-1-vpc.alikafka.aliyuncs.com:9092',
      'key.format'='avro',
      'value.format'='avro'
    );
    
    -- 使用 STATEMENT SET 封裝多條語句
    BEGIN STATEMENT SET;
    -- 使用中繼資料 table_name 路由到目標表
    INSERT INTO KafkaProducts
    SELECT product_id,product_name,sku,description,price,discount_price,stock_quantity,weight,volume,dimensions,release_date,is_featured,rating,warranty_period,metadata,tags
    FROM ADBPGSource
    WHERE table_name = 'products';
    
    INSERT INTO KafkaDocuments
    SELECT document_id,title,content,summary,publication_date,last_updated,author_id,file_data,xml_content,json_metadata,reading_time,is_public,views_count,category,tags
    FROM ADBPGSource
    WHERE table_name = 'documents';
    
    END;

    對於此SQL作業需要關注以下幾點:

    • 對於多表同步的任務,建議參考此SQL使用一個source表實現多表採集,需要在此source表中定義全部源表的列(重名列保留一個);向目標表寫入時通過METADATAtable_name 來路由到指定表。這種做法的優勢在於,僅需要在AnalyticDB for PostgreSQL中建立一個Replication Slot,能降低對源庫資源的佔用,提升同步效能,便於後期維護。

    • 使用table-name參數來指定多張源表,表名放在括弧中,使用|符號分割, 例如(table1|table2|table3)

    • debezium.snapshot.mode配置為never表示只同步源表增量資料;若需要同步全量及增量資料,修改配置為initial

  2. 在作業開發頁面頂部,單擊深度檢查,進行語法檢查。

  3. 單擊部署,單擊確定

  4. 單擊右上方前往營運,在作業營運頁面,單擊啟動

步驟五:插入測試資料

AnalyticDB for PostgreSQL執行個體中分別對兩張源表進行資料更新,並觀察Kafka Topic中的訊息變化

您可以參考以下SQL插入測試資料:

INSERT INTO products (
    product_name, sku, description, price, discount_price, stock_quantity, weight, volume, dimensions, release_date, is_featured, rating, warranty_period, metadata, tags
) VALUES (
    '測試商品', 'Test-2025', '一條測試商品資料', 299.99, 279.99, 150, 50.5, 120.75, '(10,20),(30,40)', '2023-05-01', TRUE, 4.8, INTERVAL '1 year', '{"brand": "TechCo", "model": "X1"}', '{"測試1", "測試2"}'
);

相關文檔

Flink即時讀寫全量資料,訂閱AnalyticDB for PostgreSQL的全量資料。