本文為您介紹如何使用AnalyticDB PostgreSQL連接器。
背景資訊
雲原生資料倉儲AnalyticDB PostgreSQL版是一種大規模平行處理(MPP)資料倉儲服務,可提供海量資料線上分析服務。
AnalyticDB PostgreSQL版支援的資訊如下。
類別 | 詳情 |
支援類型 | 維表和結果表 |
運行模式 | 流模式和批模式 |
資料格式 | 暫不適用 |
特有監控指標 |
說明 指標含義詳情,請參見監控指標說明。 |
API種類 | SQL |
是否支援更新或刪除結果表資料 | 是 |
前提條件
已建立AnalyticDB PostgreSQL執行個體並建立表,詳情請參見建立執行個體和CREATE TABLE。
已設定白名單,詳情請參見設定白名單。
使用限制
僅FlinkRealtime Compute引擎VVR 6.0.0及以上版本支援雲原生資料倉儲AnalyticDB PostgreSQL版連接器。
僅FlinkRealtime Compute引擎VVR 8.0.1及以上版本支援雲原生資料倉儲AnalyticDB PostgreSQL版7.0版本。
暫不支援自建的Postgres SQL。
文法結構
CREATE TABLE adbpg_table (
id INT,
len INT,
content VARCHAR,
PRIMARY KEY(id)
) WITH (
'connector'='adbpg',
'url'='jdbc:postgresql://<Address>:<PortId>/<DatabaseName>',
'tableName'='<yourDatabaseTableName>',
'userName'='<yourDatabaseUserName>',
'password'='<yourDatabasePassword>'
);
WITH參數
類型 | 參數 | 說明 | 資料類型 | 是否必填 | 預設值 | 備忘 |
通用 | connector | 表類型。 | String | 是 | 無 | 固定值為adbpg。 |
url | JDBC串連地址。 | String | 是 | 無 | 格式為 | |
tableName | 表名。 | String | 是 | 無 | 無。 | |
userName | 使用者名稱。 | String | 是 | 無 | 無。 | |
password | 密碼。 | String | 是 | 無 | 無。 | |
maxRetryTimes | 寫入資料失敗後,重試寫入的最大次數。 | Integer | 否 | 3 | 無。 | |
targetSchema | Schema名稱。 | String | 否 | public | 無。 | |
caseSensitive | 大小寫是否敏感。 | String | 否 | false | 參數取值如下:
| |
connectionMaxActive | 串連池的最大串連數。 | Integer | 否 | 5 | 系統會自動釋放與資料庫服務的空閑串連。 重要 此參數設定過大可能會導致服務端串連數出現異常。 | |
結果表專屬 | retryWaitTime | 重試的時間間隔。 | Integer | 否 | 100 | 單位毫秒。 |
batchSize | 一次批量寫入的資料條數。 | Integer | 否 | 500 | 無。 | |
flushIntervalMs | 清空緩衝的時間間隔。 | Integer | 否 | 無 | 如果緩衝中的資料在等待指定時間後,依然沒有達到輸出條件,系統會自動輸出緩衝中的所有資料。單位毫秒。 | |
writeMode | 第一次嘗試寫入時的寫入方式。 | String | 否 | insert | 參數取值如下:
| |
conflictMode | 當Insert寫入出現主鍵衝突或者唯一索引衝突時的處理策略。 | String | 否 | strict | 參數取值如下:
| |
維表專屬 | maxJoinRows | 單行資料Join的最多行數。 | Integer | 否 | 1024 | 無。 |
cache | 緩衝策略。 | String | 否 | ALL | 支援以下三種緩衝策略:
| |
cacheSize | 緩衝大小,即緩衝多少行資料。 | Long | 否 | 100000 | 僅當選擇LRU緩衝策略時,cacheSize參數生效。 | |
cacheTTLMs | 緩衝失效的逾時時間。 | Long | 否 | Long.MAX_VALUE | cacheTTLMs配置和cache配置有關:
單位為毫秒。 |
類型映射
雲原生資料倉儲AnalyticDB PostgreSQL版欄位類型 | Flink欄位類型 |
boolean | boolean |
smallint | int |
int | int |
bigint | bigint |
float | double |
varchar | varchar |
text | varchar |
timestamp | timestamp |
date | date |
使用樣本
結果表
CREATE TEMPORARY TABLE datagen_source ( `name` VARCHAR, `age` INT ) COMMENT 'datagen source table' WITH ( 'connector' = 'datagen' ); CREATE TABLE adbpg_sink ( name VARCHAR, age INT ) WITH ( 'connector'='adbpg', 'url'='jdbc:postgresql://<Address>:<PortId>/<DatabaseName>', 'tableName'='<yourDatabaseTableName>', 'userName'='<yourDatabaseUserName>', 'password'='<yourDatabasePassword>' ); INSERT INTO adbpg_sink SELECT * FROM datagen_source;
維表
CREATE TEMPORARY TABLE datagen_source( a INT, b BIGINT, c STRING, `proctime` AS PROCTIME() ) COMMENT 'datagen source table' WITH ( 'connector' = 'datagen' }; CREATE TABLE adbpg_dim ( a INT, b VARCHAR, c VARCHAR ) WITH ( 'connector'='adbpg', 'url'='jdbc:postgresql://<Address>:<PortId>/<DatabaseName>', 'tableName'='<yourDatabaseTableName>', 'userName'='<yourDatabaseUserName>', 'password'='<yourDatabasePassword>' ); CREATE TEMPORARY TABLE blackhole_sink( a INT, b STRING ) COMMENT 'blackhole sink table' WITH ( 'connector' = 'blackhole' ); INSERT INTO blackhole_sink SELECT T.a,H.b FROM datagen_source AS T JOIN adb_dim FOR SYSTEM_TIME AS OF T.proctime AS H ON T.a = H.a;