Postgres CDC可用於依次讀取PostgreSQL資料庫全量快照資料和變更資料,保證不多讀一條也不少讀一條資料。即使發生故障,也能採用Exactly Once方式處理。本文為您介紹如何使用Postgres CDC連接器。
背景資訊
Postgres CDC連接器支援的資訊如下。
類別 | 詳情 |
支援類型 | 源表 說明 您可以使用JDBC作為結果表和維表連接器。 |
運行模式 | 僅支援流模式 |
資料格式 | 暫不適用 |
特有監控指標 |
說明
|
API種類 | SQL |
是否支援更新或刪除結果表資料 | 不涉及 |
特色功能
Postgres CDC連接器接入CDC增量快照框架(Realtime Compute引擎VVR 8.0.6及以上版本)。Postgres CDC讀取歷史全量資料後,自動切換到WAL變更日誌讀取,保證不多讀也不少讀資料。即使發生故障,也能保證Exactly Once語義處理資料。Postgres CDC源表提供了並發讀取全量資料,無鎖讀取和斷點續傳的能力。
作為源表,功能與優勢詳情如下:
流批一體,支援讀取全量和增量資料,無需維護兩套流程。
支援並發讀取全量資料,效能水平擴充。
全量讀取無縫切換增量讀取,自動縮容,節省計算資源。
全量階段讀取支援斷點續傳,更穩定。
無鎖讀取全量資料,不影響線上業務。
前提條件
Postgres CDC連接器通過PostgreSQL資料庫的邏輯複製讀取CDC變更流資料,支援阿里雲RDS PostgreSQL、Amazon RDS PostgreSQL和自建PostgreSQL。
阿里雲RDS PostgreSQL、Amazon RDS PostgreSQL或者自建PostgreSQL上相應的配置可能有差異,請您在使用之前詳細閱讀配置Postgres文檔進行相關配置。
完成配置後確保有下列的條件:
wal_level參數的值需設定為logical,即在預寫式日誌WAL(Write-ahead logging)中增加支援邏輯編碼所需的資訊。
訂閱表的REPLICA IDENTITY為FULL(發出的插入和更新操作事件包含表中所有列的舊值),以保障該表資料同步的一致性。
說明REPLICA IDENTITY是PostgreSQL特有的表級設定,它決定了邏輯解碼外掛程式在發生(INSERT)和更新(UPDATE)事件時,是否包含涉及的表列的舊值。REPLICA IDENTITY取值含義詳情請參見REPLICA IDENTITY。
需要確保max_wal_senders和max_replication_slots的參數值均大於當前資料庫複寫槽已使用數與Flink作業所需要的slot數量。
確保賬戶系統許可權為SUPERUSER或者同時擁有LOGIN和REPLICATION許可權,並且具有訂閱表的SELECT許可權用於全量資料查詢。
注意事項
僅Realtime Compute引擎8.0.6及以上版本支援Postgres CDC增量快照功能。
請及時管理Replication Slot,以免出現磁碟空間浪費的問題。
為了防止在Flink作業重啟過程中由於Checkpoint對應的WAL(Write-Ahead Log)段被清除而引發資料丟失,Flink作業不會自動移除Replication Slot。因此,如果確認特定的Flink作業不會再次啟動,應當手動刪除相關的Replication Slot,以釋放其佔用的資源。另外,如果PostgreSQL的Replication Slot的確認位點長時間不向前推進,PostgreSQL不會清理該槽位點之後的WAL條目,這可能會導致未使用的WAL積累而佔用過多的磁碟空間。
開啟增量快照時,Postgres CDC連接器必須開啟Checkpoint,並且Source表必須聲明主鍵。Source多並發讀取全量資料時會建立多個臨時的Replication Slot。
不開啟增量快照讀取的PostgreSQL CDC Source僅支援單一併發,因此只需要一個全域Slot。當開啟增量快照時,PostgreSQL CDC Source在全量階段所需的最大Slot數量為
Source數量 * 並發數 + 1
。進入增量階段後,系統自動回收在全量階段建立的Slot,僅保留一個全域Slot。如果Slot數量有限,需要控制全量階段的並發數量,這樣做的缺點是會降低讀取速度。如果下遊運算元或儲存支援等冪性,可以啟用scan.incremental.snapshot.backfill.skip = true
以跳過全量階段的日誌讀取,這樣做的缺點是僅能提供至少一次(At-Least Once)的語義保證。如果SQL要做彙總、關聯等操作,不建議跳過全量階段日誌的讀取。
不開啟增量快照時,Postgres CDC連接器不支援在全表掃描階段執行Checkpoint。
不開啟增量快照時,如果您的作業在全表掃描階段觸發Checkpoint,則可能由於Checkpoint逾時導致作業Failover。因此,建議您在其他配置中配置如下參數,具體操作請參見空間管理與操作。避免在全量同步階段由於Checkpoint逾時導致Failover。
execution.checkpointing.interval: 10min execution.checkpointing.tolerable-failed-checkpoints: 100 restart-strategy: fixed-delay restart-strategy.fixed-delay.attempts: 2147483647
相關的參數說明詳情如下表所示。
參數
說明
備忘
execution.checkpointing.interval
Checkpoint的時間間隔。
單位是Duration類型,例如10min或30s。
execution.checkpointing.tolerable-failed-checkpoints
容忍Checkpoint失敗的次數。
該參數的取值與Checkpoint調度間隔時間的乘積就是允許的快照讀取時間。
說明如果表特別大,建議將該參數值配置得大一些。
restart-strategy
重啟策略。
參數取值如下:
fixed-delay:固定延遲重啟策略。
failure-rate:故障率重啟策略。
exponential-delay:指數延遲重啟策略。
詳情請參見Restart Strategies。
restart-strategy.fixed-delay.attempts
固定延遲重啟策略下,嘗試重啟的最大次數。
無。
文法結構
CREATE TABLE postgrescdc_source (
shipment_id INT,
order_id INT,
origin STRING,
destination STRING,
is_arrived BOOLEAN
) WITH (
'connector' = 'postgres-cdc',
'hostname' = '<yourHostname>',
'port' = '5432',
'username' = '<yourUserName>',
'password' = '<yourPassWord>',
'database-name' = '<yourDatabaseName>',
'schema-name' = '<yourSchemaName>',
'table-name' = '<yourTableName>'
);
WITH參數
參數 | 說明 | 資料類型 | 是否必填 | 預設值 | 備忘 |
connector | connector類型。 | STRING | 是 | 無 | 固定值為 |
hostname | Postgres資料庫的IP地址或者Hostname。 | STRING | 是 | 無 | 無。 |
username | Postgres資料庫服務的使用者名稱。 | STRING | 是 | 無 | 無。 |
password | Postgres資料庫服務的密碼。 | STRING | 是 | 無 | 無。 |
database-name | 資料庫名稱。 | STRING | 是 | 無 | 資料庫名稱。 |
schema-name | Postgres Schema名稱。 | STRING | 是 | 無 | Schema名稱支援Regex以讀取多個Schema的資料。 |
table-name | Postgres表名。 | STRING | 是 | 無 | 表名支援Regex以讀取多個表的資料。 |
port | Postgres資料庫服務的連接埠號碼。 | INTEGER | 否 | 5432 | 無。 |
decoding.plugin.name | Postgres Logical Decoding外掛程式名稱。 | STRING | 否 | decoderbufs | 根據Postgres服務上安裝的外掛程式確定。支援的外掛程式列表如下:
|
slot.name | 邏輯解碼槽的名字。 | STRING | 8.0.1版本之前為非必填,從8.0.1版本開始為必填 | 8.0.1版本之前預設值為flink,從8.0.1版本開始無預設值 | 建議每個表都設定 |
debezium.* | Debezium屬性參數。 | STRING | 否 | 無 | 更細粒度控制Debezium用戶端的行為。例如 |
scan.incremental.snapshot.enabled | 是否開啟增量快照。 | BOOLEAN | 否 | false | 參數取值如下:
|
scan.startup.mode | 消費資料時的啟動模式。 | STRING | 否 | initial | 參數取值如下:
|
changelog-mode | 用於編碼流更改的變更日誌(Changelog)模式。 | String | 否 | all | 支援的Changelog模式包括:
|
heartbeat.interval.ms | 發送心跳包的時間間隔。 | Duration | 否 | 30s | 單位為毫秒。 Postgres CDC連接器主動向資料庫發送心跳包來保證推進Slot的位移量。當表變更不頻繁時,設定該值可以及時回收WAL日誌。 |
scan.incremental.snapshot.chunk.key-column | 指定某一列作為快照階段切分分區的切分列。 | STRING | 否 | 無 | 預設從主鍵中選擇第一列。 |
scan.incremental.close-idle-reader.enabled | 是否在快照結束後關閉閒置Reader。 | Boolean | 否 | false | 該配置生效需要設定 |
scan.incremental.snapshot.backfill.skip | 是否跳過全量階段的日誌讀取。 | Boolean | 否 | false | 參數取值如下:
|
類型映射
PostgreSQL和Flink欄位類型對應關係如下。
PostgreSQL欄位類型 | Flink欄位類型 |
SMALLINT | SMALLINT |
INT2 | |
SMALLSERIAL | |
SERIAL2 | |
INTEGER | INT |
SERIAL | |
BIGINT | BIGINT |
BIGSERIAL | |
REAL | FLOAT |
FLOAT4 | |
FLOAT8 | DOUBLE |
DOUBLE PRECISION | |
NUMERIC(p, s) | DECIMAL(p, s) |
DECIMAL(p, s) | |
BOOLEAN | BOOLEAN |
DATE | DATE |
TIME [(p)] [WITHOUT TIMEZONE] | TIME [(p)] [WITHOUT TIMEZONE] |
TIMESTAMP [(p)] [WITHOUT TIMEZONE] | TIMESTAMP [(p)] [WITHOUT TIMEZONE] |
CHAR(n) | STRING |
CHARACTER(n) | |
VARCHAR(n) | |
CHARACTER VARYING(n) | |
TEXT | |
BYTEA | BYTES |
使用樣本
CREATE TABLE source (
id INT NOT NULL,
name STRING,
description STRING,
weight DECIMAL(10,3)
) WITH (
'connector' = 'postgres-cdc',
'hostname' = '<host name>',
'port' = '<port>',
'username' = '<user name>',
'password' = '<password>',
'database-name' = '<database name>',
'schema-name' = '<schema name>',
'table-name' = '<table name>'
);
SELECT * FROM source;
相關文檔
Realtime ComputeFlink版支援的連接器列表,請參見支援的連接器。
將資料寫入PolarDB PostgreSQL版(Oracle文法相容1.0)結果表,請參見PolarDB PostgreSQL版(Oracle文法相容1.0)。
如果您需要讀寫RDS MySQL、PolarDB for MySQL或者自建MySQL資料庫,請使用MySQL連接器。