全部產品
Search
文件中心

Realtime Compute for Apache Flink:Postgres CDC(公測中)

更新時間:Sep 13, 2024

Postgres CDC可用於依次讀取PostgreSQL資料庫全量快照資料和變更資料,保證不多讀一條也不少讀一條資料。即使發生故障,也能採用Exactly Once方式處理。本文為您介紹如何使用Postgres CDC連接器。

背景資訊

Postgres CDC連接器支援的資訊如下。

類別

詳情

支援類型

源表

說明

您可以使用JDBC作為結果表和維表連接器。

運行模式

僅支援流模式

資料格式

暫不適用

特有監控指標

  • currentFetchEventTimeLag:資料產生到拉取到Source Operator的間隔。

  • currentEmitEventTimeLag:資料產生到離開Source Operator的間隔。

  • sourceIdleTime:source至今有多久不產生新資料。

說明
  • currentFetchEventTimeLag與currentEmitEventTimeLag指標僅在增量階段有效,全量階段該值恒為0。

  • 指標含義詳情,請參見監控指標說明

API種類

SQL

是否支援更新或刪除結果表資料

不涉及

特色功能

Postgres CDC連接器接入CDC增量快照框架(Realtime Compute引擎VVR 8.0.6及以上版本)。Postgres CDC讀取歷史全量資料後,自動切換到WAL變更日誌讀取,保證不多讀也不少讀資料。即使發生故障,也能保證Exactly Once語義處理資料。Postgres CDC源表提供了並發讀取全量資料,無鎖讀取和斷點續傳的能力。

作為源表,功能與優勢詳情如下:

  • 流批一體,支援讀取全量和增量資料,無需維護兩套流程。

  • 支援並發讀取全量資料,效能水平擴充。

  • 全量讀取無縫切換增量讀取,自動縮容,節省計算資源。

  • 全量階段讀取支援斷點續傳,更穩定。

  • 無鎖讀取全量資料,不影響線上業務。

前提條件

Postgres CDC連接器通過PostgreSQL資料庫的邏輯複製讀取CDC變更流資料,支援阿里雲RDS PostgreSQLAmazon RDS PostgreSQL自建PostgreSQL

重要

阿里雲RDS PostgreSQLAmazon RDS PostgreSQL或者自建PostgreSQL上相應的配置可能有差異,請您在使用之前詳細閱讀配置Postgres文檔進行相關配置

完成配置後確保有下列的條件:

  • wal_level參數的值需設定為logical,即在預寫式日誌WAL(Write-ahead logging)中增加支援邏輯編碼所需的資訊。

  • 訂閱表的REPLICA IDENTITY為FULL(發出的插入和更新操作事件包含表中所有列的舊值),以保障該表資料同步的一致性。

    說明

    REPLICA IDENTITY是PostgreSQL特有的表級設定,它決定了邏輯解碼外掛程式在發生(INSERT)和更新(UPDATE)事件時,是否包含涉及的表列的舊值。REPLICA IDENTITY取值含義詳情請參見REPLICA IDENTITY

  • 需要確保max_wal_senders和max_replication_slots的參數值均大於當前資料庫複寫槽已使用數與Flink作業所需要的slot數量。

  • 確保賬戶系統許可權為SUPERUSER或者同時擁有LOGIN和REPLICATION許可權,並且具有訂閱表的SELECT許可權用於全量資料查詢。

注意事項

  • 僅Realtime Compute引擎8.0.6及以上版本支援Postgres CDC增量快照功能。

  • 請及時管理Replication Slot,以免出現磁碟空間浪費的問題。

    為了防止在Flink作業重啟過程中由於Checkpoint對應的WAL(Write-Ahead Log)段被清除而引發資料丟失,Flink作業不會自動移除Replication Slot。因此,如果確認特定的Flink作業不會再次啟動,應當手動刪除相關的Replication Slot,以釋放其佔用的資源。另外,如果PostgreSQL的Replication Slot的確認位點長時間不向前推進,PostgreSQL不會清理該槽位點之後的WAL條目,這可能會導致未使用的WAL積累而佔用過多的磁碟空間。

  • 開啟增量快照時,Postgres CDC連接器必須開啟Checkpoint,並且Source表必須聲明主鍵。Source多並發讀取全量資料時會建立多個臨時的Replication Slot。

    不開啟增量快照讀取的PostgreSQL CDC Source僅支援單一併發,因此只需要一個全域Slot。當開啟增量快照時,PostgreSQL CDC Source在全量階段所需的最大Slot數量為Source數量 * 並發數 + 1。進入增量階段後,系統自動回收在全量階段建立的Slot,僅保留一個全域Slot。如果Slot數量有限,需要控制全量階段的並發數量,這樣做的缺點是會降低讀取速度。如果下遊運算元或儲存支援等冪性,可以啟用scan.incremental.snapshot.backfill.skip = true以跳過全量階段的日誌讀取,這樣做的缺點是僅能提供至少一次(At-Least Once)的語義保證。如果SQL要做彙總、關聯等操作,不建議跳過全量階段日誌的讀取。

  • 不開啟增量快照時,Postgres CDC連接器不支援在全表掃描階段執行Checkpoint。

    不開啟增量快照時,如果您的作業在全表掃描階段觸發Checkpoint,則可能由於Checkpoint逾時導致作業Failover。因此,建議您在其他配置中配置如下參數,具體操作請參見如何配置作業運行參數?。避免在全量同步階段由於Checkpoint逾時導致Failover。

    execution.checkpointing.interval: 10min
    execution.checkpointing.tolerable-failed-checkpoints: 100
    restart-strategy: fixed-delay
    restart-strategy.fixed-delay.attempts: 2147483647

    相關的參數說明詳情如下表所示。

    參數

    說明

    備忘

    execution.checkpointing.interval

    Checkpoint的時間間隔。

    單位是Duration類型,例如10min或30s。

    execution.checkpointing.tolerable-failed-checkpoints

    容忍Checkpoint失敗的次數。

    該參數的取值與Checkpoint調度間隔時間的乘積就是允許的快照讀取時間。

    說明

    如果表特別大,建議將該參數值配置得大一些。

    restart-strategy

    重啟策略。

    參數取值如下:

    • fixed-delay:固定延遲重啟策略。

    • failure-rate:故障率重啟策略。

    • exponential-delay:指數延遲重啟策略。

    詳情請參見Restart Strategies

    restart-strategy.fixed-delay.attempts

    固定延遲重啟策略下,嘗試重啟的最大次數。

    無。

文法結構

CREATE TABLE postgrescdc_source (
  shipment_id INT,
  order_id INT,
  origin STRING,
  destination STRING,
  is_arrived BOOLEAN
) WITH (
  'connector' = 'postgres-cdc',
  'hostname' = '<yourHostname>',
  'port' = '5432',
  'username' = '<yourUserName>',
  'password' = '<yourPassWord>',
  'database-name' = '<yourDatabaseName>',
  'schema-name' = '<yourSchemaName>',
  'table-name' = '<yourTableName>'
);

WITH參數

參數

說明

資料類型

是否必填

預設值

備忘

connector

connector類型。

STRING

固定值為postgres-cdc

hostname

Postgres資料庫的IP地址或者Hostname。

STRING

無。

username

Postgres資料庫服務的使用者名稱。

STRING

無。

password

Postgres資料庫服務的密碼。

STRING

無。

database-name

資料庫名稱。

STRING

資料庫名稱。

schema-name

Postgres Schema名稱。

STRING

Schema名稱支援Regex以讀取多個Schema的資料。

table-name

Postgres表名。

STRING

表名支援Regex以讀取多個表的資料。

port

Postgres資料庫服務的連接埠號碼。

INTEGER

5432

無。

decoding.plugin.name

Postgres Logical Decoding外掛程式名稱。

STRING

decoderbufs

根據Postgres服務上安裝的外掛程式確定。支援的外掛程式列表如下:

  • decoderbufs(預設值)

  • pgoutput

  • wal2json

  • wal2json_rds

  • wal2json_streaming

  • wal2json_rds_streaming

slot.name

邏輯解碼槽的名字。

STRING

8.0.1版本之前為非必填,從8.0.1版本開始為必填

8.0.1版本之前預設值為flink,從8.0.1版本開始無預設值

建議每個表都設定slot.name參數,以避免出現PSQLException: ERROR: replication slot "debezium" is active for PID 974報錯。

debezium.*

Debezium屬性參數。

STRING

更細粒度控制Debezium用戶端的行為。例如'debezium.snapshot.mode' = 'never',詳情請參見配置屬性

scan.incremental.snapshot.enabled

是否開啟增量快照。

BOOLEAN

false

參數取值如下:

  • false(預設值):不開啟增量快照。

  • true:開啟增量快照。

說明
  • 此功能為實驗性功能。僅Realtime Compute引擎8.0.6及以上版本支援該參數。

  • 增量快照的功能優勢,前提條件和使用限制詳情請參見特色功能前提條件注意事項

scan.startup.mode

消費資料時的啟動模式。

STRING

initial

參數取值如下:

  • initial(預設):在第一次啟動時,會先掃描歷史全量資料,然後讀取最新的WAL日誌資料。

  • latest-offset:在第一次啟動時,不會掃描歷史全量資料,直接從WAL日誌的末尾(最新的WAL日誌處)開始讀取,即唯讀取該連接器啟動以後的最新變更。

  • snapshot:先掃描歷史全量資料,再讀取全量階段新產生的WAL日誌,最終作業會停止。

changelog-mode

用於編碼流更改的變更日誌(Changelog)模式。

String

all

支援的Changelog模式包括:

  • ALL:支援所有類型,包括INSERT、DELETE、UPDATE_BEFORE和UPDATE_AFTER。

  • UPSERT:僅支援Upsert類型,包括INSERT、DELETE和UPDATE_AFTER。

heartbeat.interval.ms

發送心跳包的時間間隔。

Duration

30s

單位為毫秒。

Postgres CDC連接器主動向資料庫發送心跳包來保證推進Slot的位移量。當表變更不頻繁時,設定該值可以及時回收WAL日誌。

scan.incremental.snapshot.chunk.key-column

指定某一列作為快照階段切分分區的切分列。

STRING

預設從主鍵中選擇第一列。

scan.incremental.close-idle-reader.enabled

是否在快照結束後關閉閒置Reader。

Boolean

false

該配置生效需要設定execution.checkpointing.checkpoints-after-tasks-finish.enabled為true。

scan.incremental.snapshot.backfill.skip

是否跳過全量階段的日誌讀取。

Boolean

false

參數取值如下:

  • true:跳過。

    增量階段從低水位線開始讀取日誌。

    如果下遊運算元或儲存支援等冪性,建議跳過全量階段日誌的讀取,這樣做的優點是能夠減少WAL Slot數量,缺點是僅能提供至少一次(At-Least Once)的語義保證

  • false:不跳過。

    全量階段讀取分區時,會讀取低水位線和高水位線之間的日誌來保證一致性。

    如果SQL要做彙總、關聯等操作,不建議跳過全量階段日誌的讀取。

類型映射

PostgreSQL和Flink欄位類型對應關係如下。

PostgreSQL欄位類型

Flink欄位類型

SMALLINT

SMALLINT

INT2

SMALLSERIAL

SERIAL2

INTEGER

INT

SERIAL

BIGINT

BIGINT

BIGSERIAL

REAL

FLOAT

FLOAT4

FLOAT8

DOUBLE

DOUBLE PRECISION

NUMERIC(p, s)

DECIMAL(p, s)

DECIMAL(p, s)

BOOLEAN

BOOLEAN

DATE

DATE

TIME [(p)] [WITHOUT TIMEZONE]

TIME [(p)] [WITHOUT TIMEZONE]

TIMESTAMP [(p)] [WITHOUT TIMEZONE]

TIMESTAMP [(p)] [WITHOUT TIMEZONE]

CHAR(n)

STRING

CHARACTER(n)

VARCHAR(n)

CHARACTER VARYING(n)

TEXT

BYTEA

BYTES

使用樣本

CREATE TABLE source (
  id INT NOT NULL,
  name STRING,
  description STRING,
  weight DECIMAL(10,3)
) WITH (
  'connector' = 'postgres-cdc',
  'hostname' = '<host name>',
  'port' = '<port>',
  'username' = '<user name>',
  'password' = '<password>',
  'database-name' = '<database name>',
  'schema-name' = '<schema name>',
  'table-name' = '<table name>'
);

SELECT * FROM source;

相關文檔