本文為您介紹如何使用PolarDB PostgreSQL版(Oracle文法相容1.0)連接器。
背景資訊
PolarDB PostgreSQL版(相容Oracle)是阿里巴巴自研的新一代雲原生資料庫,在儲存計算分離架構下,利用了軟硬體結合的優勢,為您提供具備極致彈性、高效能、海量儲存、安全可靠的資料庫服務,高度相容Oracle。
PolarDB PostgreSQL版(Oracle文法相容1.0)連接器支援的資訊如下。
類別 | 詳情 |
支援類型 | 結果表 |
運行模式 | 流模式和批模式 |
資料格式 | 暫不適用 |
特有監控指標 |
說明 指標含義詳情,請參見監控指標說明。 |
API種類 | SQL |
是否支援更新或刪除結果表資料 | 是 |
前提條件
已建立PolarDB PostgreSQL版(Oracle文法相容1.0)執行個體並建立表,詳情請參見建立PolarDB PostgreSQL版(相容Oracle)叢集和建立表。
已設定白名單,詳情請參見設定叢集白名單。
使用限制
僅FlinkRealtime Compute引擎VVR 8.0.5及以上版本支援PolarDB PostgreSQL版(Oracle文法相容1.0)連接器。
文法結構
CREATE TABLE polardbo_table (
id INT,
len INT,
content VARCHAR,
PRIMARY KEY(id)
) WITH (
'connector'='polardbo',
'url'='jdbc:postgresql://<Address>:<PortId>/<DatabaseName>',
'tableName'='<yourDatabaseTableName>',
'userName'='<yourDatabaseUserName>',
'password'='<yourDatabasePassword>'
);
WITH參數
參數 | 說明 | 資料類型 | 是否必填 | 預設值 | 備忘 |
connector | 表類型。 | String | 是 | 無 | 固定值為polardbo。 |
url | JDBC串連地址。 | String | 是 | 無 | 格式為 |
tableName | 表名。 | String | 是 | 無 | 無。 |
userName | 使用者名稱。 | String | 是 | 無 | 無。 |
password | 密碼。 | String | 是 | 無 | 為了避免您的密碼資訊泄露,建議您通過密鑰管理的方式填寫密碼取值,詳情請參見變數和密鑰管理。 |
maxRetryTimes | 寫入資料失敗後,重試寫入的最大次數。 | Integer | 否 | 3 | 無。 |
targetSchema | Schema名稱。 | String | 否 | public | 無。 |
caseSensitive | 大小寫是否敏感。 | String | 否 | false | 參數取值如下:
|
connectionMaxActive | 串連池的最大串連數。 | Integer | 否 | 5 | 系統會自動釋放與資料庫服務的空閑串連。 重要 此參數設定過大可能會導致服務端串連數出現異常。 |
retryWaitTime | 重試的時間間隔。 | Integer | 否 | 100 | 單位毫秒。 |
batchSize | 一次批量寫入的資料條數。 | Integer | 否 | 500 | 無。 |
flushIntervalMs | 清空緩衝的時間間隔。 | Integer | 否 | 無 | 單位毫秒。如果緩衝中的資料在等待指定時間後,依然沒有達到輸出條件,系統會自動輸出緩衝中的所有資料。 |
writeMode | 第一次嘗試寫入時的寫入方式。 | String | 否 | insert | 參數取值如下:
|
conflictMode | 當Insert寫入出現主鍵衝突或者唯一索引衝突時的處理策略。 | String | 否 | strict | 參數取值如下:
|
類型映射
下面是將connector用於結果表的情境下,Flink欄位到PolarDB PostgreSQL(Oracle相容1.0)欄位的映射關係。
PolarDB PostgreSQL版(Oracle文法相容1.0)欄位類型 | Flink欄位類型 |
boolean | boolean |
int | int |
number | bigint |
number | double |
varchar | varchar |
timestamp | timestamp |
varchar | date |
使用樣本
結果表
CREATE TEMPORARY TABLE datagen_source ( `name` VARCHAR, `age` INT ) COMMENT 'datagen source table' WITH ( 'connector' = 'datagen' ); CREATE TABLE polardbo_sink ( name VARCHAR, age INT ) WITH ( 'connector'='polardbo', 'url'='jdbc:postgresql://<Address>:<PortId>/<DatabaseName>', 'tableName'='<yourDatabaseTableName>', 'userName'='<yourDatabaseUserName>', 'password'='<yourDatabasePassword>' ); INSERT INTO polardbo_sink SELECT * FROM datagen_source;