本文為您介紹如何使用AnalyticDB PostgreSQL連接器。
背景資訊
雲原生資料倉儲AnalyticDB PostgreSQL版是一種大規模平行處理(MPP)資料倉儲服務,可提供海量資料線上分析服務。
AnalyticDB PostgreSQL版支援的資訊如下。
類別 | 詳情 |
支援類型 | 源表(公測中)、維表和結果表 說明 源表目前尚未內建,需通過自訂連接器的進行讀取,具體的使用方法請參見Flink CDC即時訂閱全量和增量資料。 |
運行模式 | 流模式和批模式 |
資料格式 | 暫不適用 |
特有監控指標 |
說明 指標含義詳情,請參見監控指標說明。 |
API種類 | SQL |
是否支援更新或刪除結果表資料 | 是 |
前提條件
已建立AnalyticDB PostgreSQL執行個體並建立表,詳情請參見建立執行個體和CREATE TABLE。
已設定白名單,詳情請參見設定白名單。
使用限制
僅FlinkRealtime Compute引擎VVR 8.0.1及以上版本支援雲原生資料倉儲AnalyticDB PostgreSQL7.0版本。
暫不支援自建的PostgreSQL。
文法結構
CREATE TEMPORARY TABLE adbpg_table (
id INT,
len INT,
content VARCHAR,
PRIMARY KEY(id)
) WITH (
'connector'='adbpg',
'url'='jdbc:postgresql://yourAddress:yourPortId/yourDatabaseName',
'tableName'='yourDatabaseTableName',
'userName'='yourDatabaseUserName',
'password'='yourDatabasePassword'
);WITH參數
通用
參數 | 說明 | 資料類型 | 是否必填 | 預設值 | 備忘 |
connector | 表類型。 | String | 是 | 無 |
|
url | JDBC串連地址。 | String | 是 | 無 | 格式為 |
tableName | 表名。 | String | 是 | 無 | 無。 |
userName | 使用者名稱。 | String | 是 | 無 | 無。 |
password | 密碼。 | String | 是 | 無 | 無。 |
maxRetryTimes | 寫入資料失敗後,重試寫入的最大次數。 | Integer | 否 | 3 | 無。 |
targetSchema | Schema名稱。 | String | 否 | public | 無。 |
caseSensitive | 大小寫是否敏感。 | String | 否 | false | 參數取值如下:
|
connectionMaxActive | 串連池的最大串連數。 | Integer | 否 | 5 | 系統會自動釋放與資料庫服務的空閑串連。 重要 此參數設定過大可能會導致服務端串連數出現異常。 |
源表專屬(公測中)
參數 | 說明 | 資料類型 | 是否必填 | 備忘 |
schema-name | Schema名稱。 | STRING | 是 | 該參數支援Regex,可以一次訂閱多個Schema。 |
port | AnalyticDB for PostgreSQL連接埠。 | INTEGER | 是 | 固定值為5432。 |
decoding.plugin.name | Postgres Logical Decoding外掛程式名稱。 | STRING | 是 | 固定值為pgoutput。 |
slot.name | 邏輯解碼槽的名字。 | STRING | 是 |
|
debezium.* | 細粒度地控制Debezium用戶端的行為。 | STRING | 否 | 例如,設定 |
scan.incremental.snapshot.enabled | 是否開啟增量快照。 | BOOLEAN | 否 | 取值如下:
|
scan.startup.mode | 消費資料時的啟動模式。 | STRING | 否 | 取值如下:
|
changelog-mode | 用於編碼流更改的變更日誌(Changelog)模式。 | STRING | 否 | 取值如下:
|
heartbeat.interval.ms | 發送心跳包的時間間隔。 | DURATION | 否 | 預設值為30秒(單位:毫秒)。 AnalyticDB for PostgreSQLCDC連接器通過主動向資料庫發送心跳包,確保Slot的位移量能夠持續推進。在表資料變更不頻繁的情況下,合理設定該參數可以及時清理WAL日誌,避免浪費磁碟空間。 |
scan.incremental.snapshot.chunk.key-column | 指定某一列作為快照階段分區的切分列。 | STRING | 否 | 預設情況下會從主鍵中選擇第一列。 |
結果表專屬
參數 | 說明 | 資料類型 | 是否必填 | 預設值 | 備忘 |
retryWaitTime | 重試的時間間隔。 | Integer | 否 | 100 | 單位毫秒。 |
batchSize | 一次批量寫入的資料條數。 | Integer | 否 | 500 | 無。 |
flushIntervalMs | 清空緩衝的時間間隔。 | Integer | 否 | 無 | 如果緩衝中的資料在等待指定時間後,依然沒有達到輸出條件,系統會自動輸出緩衝中的所有資料。單位毫秒。 |
writeMode | 第一次嘗試寫入時的寫入方式。 | String | 否 | insert | 參數取值如下:
|
conflictMode | 當Insert寫入出現主鍵衝突或者唯一索引衝突時的處理策略。 | String | 否 | strict | 參數取值如下:
|
維表專屬
參數 | 說明 | 資料類型 | 是否必填 | 預設值 | 備忘 |
maxJoinRows | 單行資料Join的最多行數。 | Integer | 否 | 1024 | 無。 |
cache | 緩衝策略。 | String | 否 | ALL | 支援以下三種緩衝策略:
|
cacheSize | 緩衝大小,即緩衝多少行資料。 | Long | 否 | 100000 | 僅當選擇LRU緩衝策略時,cacheSize參數生效。 |
cacheTTLMs | 緩衝失效的逾時時間。 | Long | 否 | Long.MAX_VALUE | cacheTTLMs配置和cache配置有關:
單位為毫秒。 |
類型映射
雲原生資料倉儲AnalyticDB PostgreSQL版欄位類型 | Flink欄位類型 |
boolean | boolean |
smallint | int |
int | int |
bigint | bigint |
float | double |
varchar | varchar |
text | varchar |
timestamp | timestamp |
date | date |
使用樣本
源表(公測中)
使用樣本詳情請參見Flink CDC即時訂閱全量和增量資料。
結果表
CREATE TEMPORARY TABLE datagen_source ( `name` VARCHAR, `age` INT ) COMMENT 'datagen source table' WITH ( 'connector' = 'datagen' ); CREATE TEMPORARY TABLE adbpg_sink ( name VARCHAR, age INT ) WITH ( 'connector'='adbpg', 'url'='jdbc:postgresql://yourAddress:yourPortId/yourDatabaseName', 'tableName'='yourDatabaseTableName', 'userName'='yourDatabaseUserName', 'password'='yourDatabasePassword' ); INSERT INTO adbpg_sink SELECT * FROM datagen_source;維表
CREATE TEMPORARY TABLE datagen_source( a INT, b BIGINT, c STRING, `proctime` AS PROCTIME() ) COMMENT 'datagen source table' WITH ( 'connector' = 'datagen' }; CREATE TEMPORARY TABLE adbpg_dim ( a INT, b VARCHAR, c VARCHAR ) WITH ( 'connector'='adbpg', 'url'='jdbc:postgresql://yourAddress:yourPortId/yourDatabaseName', 'tableName'='yourDatabaseTableName', 'userName'='yourDatabaseUserName', 'password'='yourDatabasePassword' ); CREATE TEMPORARY TABLE blackhole_sink( a INT, b STRING ) COMMENT 'blackhole sink table' WITH ( 'connector' = 'blackhole' ); INSERT INTO blackhole_sink SELECT T.a,H.b FROM datagen_source AS T JOIN adb_dim FOR SYSTEM_TIME AS OF T.proctime AS H ON T.a = H.a;