全部產品
Search
文件中心

Realtime Compute for Apache Flink:雲原生資料倉儲AnalyticDB PostgreSQL版(ADB PG)

更新時間:Jul 30, 2025

本文為您介紹如何使用AnalyticDB PostgreSQL連接器。

背景資訊

雲原生資料倉儲AnalyticDB PostgreSQL版是一種大規模平行處理(MPP)資料倉儲服務,可提供海量資料線上分析服務。

AnalyticDB PostgreSQL版支援的資訊如下。

類別

詳情

支援類型

源表(公測中)、維表和結果表

說明

源表目前尚未內建,需通過自訂連接器的進行讀取,具體的使用方法請參見Flink CDC即時訂閱全量和增量資料

運行模式

流模式和批模式

資料格式

暫不適用

特有監控指標

  • 結果表:

    • numRecordsOut

    • numRecordsOutPerSecond

    • numBytesOut

    • numBytesOutPerSecond

    • currentSendTime

  • 維表:無

說明

指標含義詳情,請參見監控指標說明

API種類

SQL

是否支援更新或刪除結果表資料

前提條件

使用限制

  • 僅FlinkRealtime Compute引擎VVR 8.0.1及以上版本支援雲原生資料倉儲AnalyticDB PostgreSQL7.0版本。

  • 暫不支援自建的PostgreSQL。

文法結構

CREATE TEMPORARY TABLE adbpg_table (
 id INT,
 len INT,
 content VARCHAR,
 PRIMARY KEY(id)
) WITH (
 'connector'='adbpg',
 'url'='jdbc:postgresql://yourAddress:yourPortId/yourDatabaseName',
 'tableName'='yourDatabaseTableName',
 'userName'='yourDatabaseUserName',
 'password'='yourDatabasePassword'
);

WITH參數

通用

參數

說明

資料類型

是否必填

預設值

備忘

connector

表類型。

String

  • 源表固定值為adbpg-cdc。

  • 結果表和維表固定值為adbpg。

url

JDBC串連地址。

String

格式為jdbc:postgresql://<Address>:<PortId>/<DatabaseName>

tableName

表名。

String

無。

userName

使用者名稱。

String

無。

password

密碼。

String

無。

maxRetryTimes

寫入資料失敗後,重試寫入的最大次數。

Integer

3

無。

targetSchema

Schema名稱。

String

public

無。

caseSensitive

大小寫是否敏感。

String

false

參數取值如下:

  • true:大小寫敏感。

  • false(預設值):大小寫不敏感。

connectionMaxActive

串連池的最大串連數。

Integer

5

系統會自動釋放與資料庫服務的空閑串連。

重要

此參數設定過大可能會導致服務端串連數出現異常。

源表專屬(公測中)

參數

說明

資料類型

是否必填

備忘

schema-name

Schema名稱。

STRING

該參數支援Regex,可以一次訂閱多個Schema。

port

AnalyticDB for PostgreSQL連接埠。

INTEGER

固定值為5432。

decoding.plugin.name

Postgres Logical Decoding外掛程式名稱。

STRING

固定值為pgoutput。

slot.name

邏輯解碼槽的名字。

STRING

  • 對於同一個Flink作業涉及的源表,建議使用相同的slot.name

  • 如果不同的Flink作業涉及同一張表,則建議為每個作業分別設定獨立的slot.name參數,以避免出現以下錯誤:PSQLException: ERROR: replication slot "debezium" is active for PID 974

debezium.*

細粒度地控制Debezium用戶端的行為。

STRING

例如,設定 'debezium.snapshot.mode' = 'never' 可以禁用快照功能。您可以通過配置屬性擷取更多配置詳情。

scan.incremental.snapshot.enabled

是否開啟增量快照。

BOOLEAN

取值如下:

  • false(預設值):不開啟增量快照。

  • true:開啟增量快照。

scan.startup.mode

消費資料時的啟動模式。

STRING

取值如下:

  • initial(預設值):在初次開機時,先掃描歷史全量資料,然後讀取最新的WAL日誌資料,實現全量與增量資料的無縫銜接。

  • latest-offset:在初次開機時,不掃描歷史全量資料,直接從 WAL 日誌的末尾(即最新的日誌位置)開始讀取,僅捕獲連接器啟動後的最新變更資料。

  • snapshot:先掃描歷史全量資料,同時讀取全量階段新產生的 WAL 日誌,最終作業會在完成全量掃描後停止運行。

changelog-mode

用於編碼流更改的變更日誌(Changelog)模式。

STRING

取值如下:

  • ALL(預設值):支援所有操作類型,包括 INSERTDELETEUPDATE_BEFORE 和 UPDATE_AFTER

  • UPSERT:僅支援 UPSERT 類型的操作,包括 INSERTDELETE 和 UPDATE_AFTER

heartbeat.interval.ms

發送心跳包的時間間隔。

DURATION

預設值為30秒(單位:毫秒)。

AnalyticDB for PostgreSQLCDC連接器通過主動向資料庫發送心跳包,確保Slot的位移量能夠持續推進。在表資料變更不頻繁的情況下,合理設定該參數可以及時清理WAL日誌,避免浪費磁碟空間。

scan.incremental.snapshot.chunk.key-column

指定某一列作為快照階段分區的切分列。

STRING

預設情況下會從主鍵中選擇第一列。

結果表專屬

參數

說明

資料類型

是否必填

預設值

備忘

retryWaitTime

重試的時間間隔。

Integer

100

單位毫秒。

batchSize

一次批量寫入的資料條數。

Integer

500

無。

flushIntervalMs

清空緩衝的時間間隔。

Integer

如果緩衝中的資料在等待指定時間後,依然沒有達到輸出條件,系統會自動輸出緩衝中的所有資料。單位毫秒。

writeMode

第一次嘗試寫入時的寫入方式。

String

insert

參數取值如下:

  • insert(預設值):直接插入,衝突時參考conflictMode

  • upsert:衝突時自動update,只能用於有主鍵的表。

  • copy:使用COPY語句進行寫入。

    說明

    僅Realtime Compute引擎VVR 11.1及以上版本支援copy模式。

conflictMode

當Insert寫入出現主鍵衝突或者唯一索引衝突時的處理策略。

String

strict

參數取值如下:

  • strict(預設值):衝突時報錯。

  • ignore:衝突時忽略。

  • update:衝突時自動更新,可用於無主鍵表,執行效率較低。

  • upsert:衝突時自動更新,只能用於有主鍵表。

維表專屬

參數

說明

資料類型

是否必填

預設值

備忘

maxJoinRows

單行資料Join的最多行數。

Integer

1024

無。

cache

緩衝策略。

String

ALL

支援以下三種緩衝策略:

  • ALL(預設值):緩衝維表裡的所有資料。在Job運行前,系統會將維表中所有資料載入到Cache中,之後所有的維表尋找資料都會通過Cache進行。如果在Cache中無法找到資料,則KEY不存在,並在Cache到期後重新載入一遍全量Cache。

  • LRU:緩衝維表裡的部分資料。源表的每條資料都會觸發系統先在Cache中尋找資料。如果沒有找到,則去物理維表中尋找。

  • None:無緩衝。

cacheSize

緩衝大小,即緩衝多少行資料。

Long

100000

僅當選擇LRU緩衝策略時,cacheSize參數生效。

cacheTTLMs

緩衝失效的逾時時間。

Long

Long.MAX_VALUE

cacheTTLMs配置和cache配置有關:

  • 如果cache配置為LRU,則cacheTTLMs為緩衝失效的逾時時間。預設不到期。

  • 如果cache配置為ALL,則cacheTTLMs為設定緩衝重新載入的間隔時間,預設不重新載入。

單位為毫秒。

類型映射

雲原生資料倉儲AnalyticDB PostgreSQL版欄位類型

Flink欄位類型

boolean

boolean

smallint

int

int

int

bigint

bigint

float

double

varchar

varchar

text

varchar

timestamp

timestamp

date

date

使用樣本

  • 源表(公測中)

    使用樣本詳情請參見Flink CDC即時訂閱全量和增量資料

  • 結果表

    CREATE TEMPORARY TABLE datagen_source (
     `name` VARCHAR,
     `age` INT
    )
    COMMENT 'datagen source table'
    WITH (
     'connector' = 'datagen'
    );
    
    CREATE TEMPORARY TABLE adbpg_sink (
     name VARCHAR,
     age INT
    ) WITH (
     'connector'='adbpg',
     'url'='jdbc:postgresql://yourAddress:yourPortId/yourDatabaseName',
     'tableName'='yourDatabaseTableName',
     'userName'='yourDatabaseUserName',
     'password'='yourDatabasePassword'
    );
    
    INSERT INTO adbpg_sink
    SELECT * FROM datagen_source;
  • 維表

    CREATE TEMPORARY TABLE datagen_source(
     a INT,
     b BIGINT,
     c STRING,
     `proctime` AS PROCTIME()
    ) 
    COMMENT 'datagen source table'
    WITH (
     'connector' = 'datagen'
    };
    
    CREATE TEMPORARY TABLE adbpg_dim (
     a INT, 
     b VARCHAR, 
     c VARCHAR
    ) WITH (
     'connector'='adbpg',
     'url'='jdbc:postgresql://yourAddress:yourPortId/yourDatabaseName',
     'tableName'='yourDatabaseTableName',
     'userName'='yourDatabaseUserName',
     'password'='yourDatabasePassword'
    );
    
    CREATE TEMPORARY TABLE blackhole_sink(
     a INT,
     b STRING
    )
    COMMENT 'blackhole sink table'
    WITH (
     'connector' = 'blackhole'
    );
    
    INSERT INTO blackhole_sink SELECT T.a,H.b
    FROM datagen_source AS T JOIN adb_dim FOR SYSTEM_TIME AS OF T.proctime AS H ON T.a = H.a;

相關文檔