AnalyticDB for PostgreSQL提供自研的CDC連接器,基於PostgreSQL的邏輯複製功能實現訂閱全量和增量資料,可與Flink無縫整合。該連接器能夠高效捕獲源表的即時變更資料,支援即時資料同步、串流等,助力企業快速響應動態資料需求。本文介紹如何通過阿里雲Realtime ComputeFlink版CDC即時訂閱AnalyticDB for PostgreSQL的全量和增量資料。
使用限制
核心版本為7.2.1.4及以上的AnalyticDB for PostgreSQL7.0版執行個體。
暫不支援AnalyticDB for PostgreSQL的Serverless模式。
前提條件
AnalyticDB for PostgreSQL執行個體和Flink全託管工作空間位於同一VPC。
調整AnalyticDB for PostgreSQL執行個體參數配置:
開啟邏輯解析,即設定
wal_level參數的值為logical。若您的AnalyticDB for PostgreSQL執行個體系列為高可用版,為保障遷移任務的順利進行,避免因主備切換而導致邏輯訂閱中斷,還需將
hot_standby、hot_standby_feedback和sync_replication_slots參數的值均配置為on。
AnalyticDB for PostgreSQL帳號是初始帳號或高許可權使用者RDS_SUPERUSER,且需要賦權REPLICATION,
ALTER USER <username> WITH REPLICATION;。已將Flink工作空間所屬的網段添加至AnalyticDB for PostgreSQL的白名單。
下載flink-sql-connector-adbpg-cdc-3.3.jar,並在Flink工作空間內上傳CDC連接器。
操作步驟
步驟一:準備測試表和測試資料
登入雲原生資料倉儲AnalyticDB PostgreSQL版控制台,找到目標執行個體,單擊執行個體ID。
在基本資料頁面的右側下,單擊登入資料庫。
建立測試資料庫和源表adbpg_source_table,並向源表寫入50條資料。
-- 建立測試資料庫 CREATE DATABASE testdb; -- 切換至testdb資料庫並建立schema CREATE SCHEMA testschema; -- 建立源表adbpg_source_table CREATE TABLE testschema.adbpg_source_table( id int, username text, PRIMARY KEY(id) ); -- 向adbpg_source_table的表中寫入50條資料 INSERT INTO testschema.adbpg_source_table(id, username) SELECT i, 'username'||i::text FROM generate_series(1, 50) AS t(i);建立結果表adbpg_sink_table,用於Flink寫入結果資料。
CREATE TABLE testschema.adbpg_sink_table( id int, username text, score int );
步驟二:建立Flink作業
登入Realtime Compute控制台,在Flink全託管頁簽,單擊目標工作空間操作列下的控制台。
在左側導覽列,單擊。
單擊頂部功能表列建立,選擇空白的流作業草稿,單擊下一步,在建立檔案草稿對話方塊,填寫作業配置資訊。
作業參數
說明
樣本
檔案名稱
作業的名稱。
說明作業名稱在當前專案中必須保持唯一。
adbpg-test
儲存位置
指定該作業的代碼檔案所屬的檔案夾。
您還可以在現有檔案夾右側,單擊
表徵圖,建立子檔案夾。作業草稿
引擎版本
當前作業使用的Flink的引擎版本。引擎版本號碼含義、版本對應關係和生命週期重要時間點詳情請參見引擎版本介紹。
vvr-6.0.7-flink-1.15
單擊建立。
步驟三:編寫作業代碼並部署
建立資料來源datagen_source(用於產生類比資料)和source_adbpg(用於從 AnalyticDB for PostgreSQL資料庫中捕獲即時的資料變更),將兩個資料來源關聯,並將結果寫入結果表sink_adbpg(將處理後的資料寫入AnalyticDB for PostgreSQL)。
將以下作業代碼拷貝到作業文本編輯區。
---建立datagen源表,使用datagen連接器產生流式資料 CREATE TEMPORARY TABLE datagen_source ( id INT, score INT ) WITH ( 'connector' = 'datagen', 'fields.id.kind'='sequence', 'fields.id.start'='1', 'fields.id.end'='100', 'fields.score.kind'='random', 'fields.score.min'='70', 'fields.score.max'='100' ); --建立adbpg源表,使用adbpg-cdc連接器,通過slot.name和邏pgoutput捕獲adbpg_source_table的資料變更 CREATE TEMPORARY TABLE source_adbpg( id int, username varchar, PRIMARY KEY(id) NOT ENFORCED ) WITH( 'connector' = 'adbpg-cdc', 'hostname' = 'gp-bp16v8cgx46ns****-master.gpdb.rds.aliyuncs.com', 'port' = '5432', 'username' = 'account****', 'password' = 'password****', 'database-name' = 'testdb', 'schema-name' = 'testschema', 'table-name' = 'adbpg_source_table', 'slot.name' = 'flink', 'decoding.plugin.name' = 'pgoutput' ); --建立adbpg結果表,將處理後的結果寫入資料庫的目標表adbpg_sink_table。 CREATE TEMPORARY TABLE sink_adbpg ( id int, username varchar, score int ) WITH ( 'connector' = 'adbpg', 'url' = 'jdbc:postgresql://gp-bp16v8cgx46ns****-master.gpdb.rds.aliyuncs.com:5432/testdb', 'tablename' = 'testschema.adbpg_sink_table', 'username' = 'account****', 'password' = 'password****', 'maxRetryTimes' = '2', 'batchsize' = '5000', 'conflictMode' = 'ignore', 'writeMode' = 'insert', 'retryWaitTime' = '200' ); -- datagen_source表和source_adbpg源表關聯後的結果寫入adbpg結果表 INSERT INTO sink_adbpg SELECT ts.id,ts.username,ds.score FROM datagen_source AS ds JOIN source_adbpg AS ts ON ds.id = ts.id;參數說明
參數名稱
是否必填
資料類型
描述
connector
是
STRING
連線類型。源表固定值為
adbpg-cdc,結果表固定值為adbpg。hostname
是
STRING
AnalyticDB for PostgreSQL執行個體的內網地址。您可在目標執行個體的基本資料頁簽下擷取內網地址。
username
是
STRING
AnalyticDB for PostgreSQL資料庫的帳號和密碼。
password
是
STRING
database-name
是
STRING
資料庫名稱。
schema-name
是
STRING
Schema名稱。該參數支援Regex,可以一次訂閱多個Schema。
table-name
是
STRING
表名稱。該參數支援Regex,可以一次訂閱多個表。
port
是
INTEGER
AnalyticDB for PostgreSQL連接埠,固定值為5432。
decoding.plugin.name
是
STRING
Postgres Logical Decoding外掛程式名稱。固定值為pgoutput。
slot.name
是
STRING
邏輯解碼槽的名字。
對於同一個Flink作業涉及的源表,建議使用相同的
slot.name。如果不同的Flink作業涉及同一張表,則建議為每個作業分別設定獨立的
slot.name參數,以避免出現以下錯誤:PSQLException: ERROR: replication slot "debezium" is active for PID 974。
debezium.*
否
STRING
該參數可以更細粒度地控制Debezium用戶端的行為。例如,設定
'debezium.snapshot.mode' = 'never'可以禁用快照功能。您可以通過配置屬性擷取更多配置詳情。scan.incremental.snapshot.enabled
否
BOOLEAN
是否開啟增量快照,取值如下。
false(預設值):不開啟增量快照。
true:開啟增量快照。
scan.startup.mode
否
STRING
消費資料時的啟動模式,取值如下。
initial(預設值):在初次開機時,先掃描歷史全量資料,然後讀取最新的WAL日誌資料,實現全量與增量資料的無縫銜接。
latest-offset:在初次開機時,不掃描歷史全量資料,直接從 WAL 日誌的末尾(即最新的日誌位置)開始讀取,僅捕獲連接器啟動後的最新變更資料。
snapshot:先掃描歷史全量資料,同時讀取全量階段新產生的 WAL 日誌,最終作業會在完成全量掃描後停止運行。
changelog-mode
否
STRING
用於編碼流更改的變更日誌(Changelog)模式,取值如下。
ALL(預設值):支援所有操作類型,包括
INSERT、DELETE、UPDATE_BEFORE和UPDATE_AFTER。UPSERT:僅支援
UPSERT類型的操作,包括INSERT、DELETE和UPDATE_AFTER。
heartbeat.interval.ms
否
DURATION
發送心跳包的時間間隔,預設值為30秒(單位:毫秒)。
AnalyticDB for PostgreSQLCDC連接器通過主動向資料庫發送心跳包,確保Slot的位移量能夠持續推進。在表資料變更不頻繁的情況下,合理設定該參數可以及時清理WAL日誌,避免浪費磁碟空間。
scan.incremental.snapshot.chunk.key-column
否
STRING
指定某一列作為快照階段分區的切分列,預設情況下會從主鍵中選擇第一列。
url
是
STRING
格式為
jdbc:postgresql://<Address>:<PortId>/<DatabaseName>。在作業開發頁面頂部,單擊深度檢查,進行語法檢查。
單擊部署,單擊確定。
單擊右上方前往營運,在作業營運頁面,單擊啟動。
步驟四:查看Flink寫入資料
在測試資料庫中執行如下語句,查看Flink寫入的資料。
SELECT * FROM testschema.adbpg_sink_table; SELECT COUNT(*) FROM testschema.adbpg_sink_table;在源表中新增50條資料,並在結果表中查看Flink寫入的增量資料的總數。
-- 源表寫入50條增量資料 INSERT INTO testschema.adbpg_source_table(id, username) SELECT i, 'username'||i::text FROM generate_series(51, 100) AS t(i); -- 檢查目標表新增資料 SELECT COUNT(*) FROM testschema.adbpg_sink_table where id > 50;結果如下。
count ------- 50 (1 row)
注意事項
請及時管理Replication Slot,以避免磁碟空間的浪費。
為了防止Flink作業重啟過程中因Checkpoint對應的WAL(Write-Ahead Log)被清理而導致資料丟失,Flink不會自動刪除Replication Slot。因此,如果確認某個Flink作業不再需要重啟,請手動刪除其對應的Replication Slot,以釋放其佔用的資源。此外,如果Replication Slot的確認位點長時間未推進,AnalyticDB for PostgreSQL將無法清理該位點之後的WAL條目,可能會導致未使用的WAL資料持續積累,佔用大量磁碟空間。
在AnalyticDB for PostgreSQL執行個體正常運行期間,能保證Exactly Once資料處理語義。但在故障情境下,僅能提供至少一次(At-Least Once)的語義保障。
訂閱表的REPLICA IDENTITY參數會被CDC連接器修改為
FULL,以保障該表資料同步的一致性,影響如下:磁碟空間佔用增加。在頻繁更新或刪除操作的情境下,該設定將增加WAL日誌的大小,導致磁碟空間佔用增加。
寫操作效能下降。在高並發寫入情境下,效能可能會受到明顯影響。
檢查點壓力增大。更大的WAL日誌意味著檢查點(Checkpoint)需要處理更多的資料,可能導致檢查點時間延長。
最佳實務
Flink CDC支援通過Flink SQL API或DataStream API進行作業開發,可實現對源庫單表或多表的全量與增量一體化的資料同步;此外,它還支援對異構資料來源進行雙流Join等計算。在整個資料處理過程中,Flink架構能夠確保精確一次(Exactly-once)的事件處理語義。然而,它不適合對PostgreSQL係數據庫進行全庫同步,一方面是因為它不支援DDL的同步,另一方面需要在Flink SQL中定義每張表的結構,後期維護相對複雜。
本章節將從AnalyticDB for PostgreSQL同步資料到Kafka為例,介紹Flink CDC SQL作業開發的最佳實務。在開發Flink CDC作業前,請確保已完成前提條件中的資源準備與配置。
步驟一:準備測試表
在AnalyticDB for PostgreSQL執行個體中建立兩張源表。
CREATE TABLE products (
product_id SERIAL PRIMARY KEY,
product_name VARCHAR(200) NOT NULL,
sku CHAR(12) NOT NULL,
description TEXT,
price NUMERIC(10,2) NOT NULL,
discount_price DECIMAL(10,2),
stock_quantity INTEGER DEFAULT 0,
weight REAL,
volume DOUBLE PRECISION,
dimensions BOX,
release_date DATE,
is_featured BOOLEAN DEFAULT FALSE,
rating FLOAT,
warranty_period INTERVAL,
metadata JSON,
tags TEXT[]
);
CREATE TABLE documents (
document_id UUID PRIMARY KEY,
title VARCHAR(200) NOT NULL,
content TEXT,
summary TEXT,
publication_date TIMESTAMP WITHOUT TIME ZONE,
last_updated TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
author_id BIGINT,
file_data BYTEA,
xml_content XML,
json_metadata JSON,
reading_time INTERVAL,
is_public BOOLEAN DEFAULT TRUE,
views_count INTEGER DEFAULT 0,
category VARCHAR(50),
tags TEXT[]
);步驟二:準備Kafka資源
將Flink工作空間所屬的網段添加至Kafka的白名單中。
步驟三:建立Flink作業
登入Realtime Compute控制台,在Flink全託管頁簽,單擊目標工作空間操作列下的控制台。
在左側導覽列,單擊。
單擊頂部功能表列建立,選擇空白的流作業草稿,單擊下一步,在建立檔案草稿對話方塊,填寫作業配置資訊。
作業參數
說明
樣本
檔案名稱
作業的名稱。
說明作業名稱在當前專案中必須保持唯一。
adbpg-test
儲存位置
指定該作業的代碼檔案所屬的檔案夾。
您還可以在現有檔案夾右側,單擊
表徵圖,建立子檔案夾。作業草稿
引擎版本
當前作業使用的Flink的引擎版本。引擎版本號碼含義、版本對應關係和生命週期重要時間點詳情請參見引擎版本介紹。
vvr-6.0.7-flink-1.15
單擊建立。
步驟四:編寫作業代碼並部署
在Flink工作空間內編寫SQL作業。將以下作業代碼拷貝到作業文本編輯區,並將相關配置替換為實際值。
-- 使用一個source實現多表採集 CREATE TEMPORARY TABLE ADBPGSource( table_name STRING METADATA FROM 'table_name' VIRTUAL, row_kind STRING METADATA FROM 'row_kind' VIRTUAL, product_id BIGINT, product_name STRING, sku STRING, description STRING, price STRING, discount_price STRING, stock_quantity INT, weight STRING, volume STRING, dimensions STRING, release_date STRING, is_featured BOOLEAN, rating FLOAT, warranty_period STRING, metadata STRING, tags STRING, document_id STRING, title STRING, content STRING, summary STRING, publication_date STRING, last_updated STRING, author_id BIGINT, file_data STRING, xml_content STRING, json_metadata STRING, reading_time STRING, is_public BOOLEAN, views_count INT, category STRING ) WITH ( 'connector' = 'adbpg-cdc', 'hostname' = 'gp-2zev887z58390***-master.gpdb.rds.aliyuncs.com', 'port' = '5432', 'username' = 'account****', 'password' = 'password****', 'database-name' = 'testdb', 'schema-name' = 'public', 'table-name' = '(products|documents)', 'slot.name' = 'flink', 'decoding.plugin.name' = 'pgoutput', 'debezium.snapshot.mode' = 'never' ); CREATE TEMPORARY TABLE KafkaProducts ( product_id BIGINT, product_name STRING, sku STRING, description STRING, price STRING, discount_price STRING, stock_quantity INT, weight STRING, volume STRING, dimensions STRING, release_date STRING, is_featured BOOLEAN, rating FLOAT, warranty_period STRING, metadata STRING, tags STRING, PRIMARY KEY(product_id) NOT ENFORCED ) WITH ( 'connector' = 'upsert-kafka', 'topic' = '****', 'properties.bootstrap.servers' = 'alikafka-post-cn-****-1-vpc.alikafka.aliyuncs.com:9092', 'key.format'='avro', 'value.format'='avro' ); CREATE TEMPORARY TABLE KafkaDocuments ( document_id STRING, title STRING, content STRING, summary STRING, publication_date STRING, last_updated STRING, author_id BIGINT, file_data STRING, xml_content STRING, json_metadata STRING, reading_time STRING, is_public BOOLEAN, views_count INT, category STRING, tags STRING, PRIMARY KEY(document_id) NOT ENFORCED ) WITH ( 'connector' = 'upsert-kafka', 'topic' = '****', 'properties.bootstrap.servers' = 'alikafka-post-cn-****-1-vpc.alikafka.aliyuncs.com:9092', 'key.format'='avro', 'value.format'='avro' ); -- 使用 STATEMENT SET 封裝多條語句 BEGIN STATEMENT SET; -- 使用中繼資料 table_name 路由到目標表 INSERT INTO KafkaProducts SELECT product_id,product_name,sku,description,price,discount_price,stock_quantity,weight,volume,dimensions,release_date,is_featured,rating,warranty_period,metadata,tags FROM ADBPGSource WHERE table_name = 'products'; INSERT INTO KafkaDocuments SELECT document_id,title,content,summary,publication_date,last_updated,author_id,file_data,xml_content,json_metadata,reading_time,is_public,views_count,category,tags FROM ADBPGSource WHERE table_name = 'documents'; END;對於此SQL作業需要關注以下幾點:
對於多表同步的任務,建議參考此SQL使用一個source表實現多表採集,需要在此source表中定義全部源表的列(重名列保留一個);向目標表寫入時通過
METADATAtable_name 來路由到指定表。這種做法的優勢在於,僅需要在AnalyticDB for PostgreSQL中建立一個Replication Slot,能降低對源庫資源的佔用,提升同步效能,便於後期維護。使用
table-name參數來指定多張源表,表名放在括弧中,使用|符號分割, 例如(table1|table2|table3)。debezium.snapshot.mode配置為never表示只同步源表增量資料;若需要同步全量及增量資料,修改配置為initial。
在作業開發頁面頂部,單擊深度檢查,進行語法檢查。
單擊部署,單擊確定。
單擊右上方前往營運,在作業營運頁面,單擊啟動。
步驟五:插入測試資料
在AnalyticDB for PostgreSQL執行個體中分別對兩張源表進行資料更新,並觀察Kafka Topic中的訊息變化。
您可以參考以下SQL插入測試資料:
INSERT INTO products (
product_name, sku, description, price, discount_price, stock_quantity, weight, volume, dimensions, release_date, is_featured, rating, warranty_period, metadata, tags
) VALUES (
'測試商品', 'Test-2025', '一條測試商品資料', 299.99, 279.99, 150, 50.5, 120.75, '(10,20),(30,40)', '2023-05-01', TRUE, 4.8, INTERVAL '1 year', '{"brand": "TechCo", "model": "X1"}', '{"測試1", "測試2"}'
);相關文檔
Flink即時讀寫全量資料,訂閱AnalyticDB for PostgreSQL的全量資料。