全部產品
Search
文件中心

Realtime Compute for Apache Flink:PolarDB PostgreSQL版(Oracle文法相容1.0)

更新時間:Jul 13, 2024

本文為您介紹如何使用PolarDB PostgreSQL版(Oracle文法相容1.0)連接器。

背景資訊

PolarDB PostgreSQL版(相容Oracle)是阿里巴巴自研的新一代雲原生資料庫,在儲存計算分離架構下,利用了軟硬體結合的優勢,為您提供具備極致彈性、高效能、海量儲存、安全可靠的資料庫服務,高度相容Oracle。

PolarDB PostgreSQL版(Oracle文法相容1.0)連接器支援的資訊如下。

類別

詳情

支援類型

結果表

運行模式

流模式和批模式

資料格式

暫不適用

特有監控指標

  • 結果表:

    • numRecordsOut

    • numRecordsOutPerSecond

    • numBytesOut

    • numBytesOutPerSecond

    • currentSendTime

說明

指標含義詳情,請參見監控指標說明

API種類

SQL

是否支援更新或刪除結果表資料

前提條件

使用限制

僅FlinkRealtime Compute引擎VVR 8.0.5及以上版本支援PolarDB PostgreSQL版(Oracle文法相容1.0)連接器。

文法結構

CREATE TABLE polardbo_table (
 id INT,
 len INT,
 content VARCHAR,
 PRIMARY KEY(id)
) WITH (
 'connector'='polardbo',
 'url'='jdbc:postgresql://<Address>:<PortId>/<DatabaseName>',
 'tableName'='<yourDatabaseTableName>',
 'userName'='<yourDatabaseUserName>',
 'password'='<yourDatabasePassword>'
);

WITH參數

參數

說明

資料類型

是否必填

預設值

備忘

connector

表類型。

String

固定值為polardbo。

url

JDBC串連地址。

String

格式為jdbc:postgresql://<Address>:<PortId>/<DatabaseName>

tableName

表名。

String

無。

userName

使用者名稱。

String

無。

password

密碼。

String

為了避免您的密碼資訊泄露,建議您通過密鑰管理的方式填寫密碼取值,詳情請參見變數和密鑰管理

maxRetryTimes

寫入資料失敗後,重試寫入的最大次數。

Integer

3

無。

targetSchema

Schema名稱。

String

public

無。

caseSensitive

大小寫是否敏感。

String

false

參數取值如下:

  • true:大小寫敏感。

  • false(預設值):大小寫不敏感。

connectionMaxActive

串連池的最大串連數。

Integer

5

系統會自動釋放與資料庫服務的空閑串連。

重要

此參數設定過大可能會導致服務端串連數出現異常。

retryWaitTime

重試的時間間隔。

Integer

100

單位毫秒。

batchSize

一次批量寫入的資料條數。

Integer

500

無。

flushIntervalMs

清空緩衝的時間間隔。

Integer

單位毫秒。如果緩衝中的資料在等待指定時間後,依然沒有達到輸出條件,系統會自動輸出緩衝中的所有資料。

writeMode

第一次嘗試寫入時的寫入方式。

String

insert

參數取值如下:

  • insert(預設值):直接插入,衝突時參考conflictMode。

  • upsert:衝突時自動update,只能用於有主鍵的表。

conflictMode

當Insert寫入出現主鍵衝突或者唯一索引衝突時的處理策略。

String

strict

參數取值如下:

  • strict(預設值):衝突時報錯。

  • ignore:衝突時忽略。

  • update:衝突時自動更新,可用於無主鍵表,執行效率較低。

類型映射

下面是將connector用於結果表的情境下,Flink欄位到PolarDB PostgreSQL(Oracle相容1.0)欄位的映射關係。

PolarDB PostgreSQL版(Oracle文法相容1.0)欄位類型

Flink欄位類型

boolean

boolean

int

int

number

bigint

number

double

varchar

varchar

timestamp

timestamp

varchar

date

使用樣本

  • 結果表

    CREATE TEMPORARY TABLE datagen_source (
     `name` VARCHAR,
     `age` INT
    )
    COMMENT 'datagen source table'
    WITH (
     'connector' = 'datagen'
    );
    
    CREATE TABLE polardbo_sink (
     name VARCHAR,
     age INT
    ) WITH (
     'connector'='polardbo',
     'url'='jdbc:postgresql://<Address>:<PortId>/<DatabaseName>',
     'tableName'='<yourDatabaseTableName>',
     'userName'='<yourDatabaseUserName>',
     'password'='<yourDatabasePassword>'
    );
    
    INSERT INTO polardbo_sink
    SELECT * FROM datagen_source;