全部產品
Search
文件中心

Realtime Compute for Apache Flink:雲原生資料倉儲AnalyticDB PostgreSQL版(ADB PG)

更新時間:Jul 13, 2024

本文為您介紹如何使用AnalyticDB PostgreSQL連接器。

背景資訊

雲原生資料倉儲AnalyticDB PostgreSQL版是一種大規模平行處理(MPP)資料倉儲服務,可提供海量資料線上分析服務。

AnalyticDB PostgreSQL版支援的資訊如下。

類別

詳情

支援類型

維表和結果表

運行模式

流模式和批模式

資料格式

暫不適用

特有監控指標

  • 結果表:

    • numRecordsOut

    • numRecordsOutPerSecond

    • numBytesOut

    • numBytesOutPerSecond

    • currentSendTime

  • 維表:無

說明

指標含義詳情,請參見監控指標說明

API種類

SQL

是否支援更新或刪除結果表資料

前提條件

使用限制

  • 僅FlinkRealtime Compute引擎VVR 6.0.0及以上版本支援雲原生資料倉儲AnalyticDB PostgreSQL版連接器。

  • 僅FlinkRealtime Compute引擎VVR 8.0.1及以上版本支援雲原生資料倉儲AnalyticDB PostgreSQL版7.0版本。

  • 暫不支援自建的Postgres SQL。

文法結構

CREATE TABLE adbpg_table (
 id INT,
 len INT,
 content VARCHAR,
 PRIMARY KEY(id)
) WITH (
 'connector'='adbpg',
 'url'='jdbc:postgresql://<Address>:<PortId>/<DatabaseName>',
 'tableName'='<yourDatabaseTableName>',
 'userName'='<yourDatabaseUserName>',
 'password'='<yourDatabasePassword>'
);

WITH參數

類型

參數

說明

資料類型

是否必填

預設值

備忘

通用

connector

表類型。

String

固定值為adbpg。

url

JDBC串連地址。

String

格式為jdbc:postgresql://<Address>:<PortId>/<DatabaseName>

tableName

表名。

String

無。

userName

使用者名稱。

String

無。

password

密碼。

String

無。

maxRetryTimes

寫入資料失敗後,重試寫入的最大次數。

Integer

3

無。

targetSchema

Schema名稱。

String

public

無。

caseSensitive

大小寫是否敏感。

String

false

參數取值如下:

  • true:大小寫敏感。

  • false(預設值):大小寫不敏感。

connectionMaxActive

串連池的最大串連數。

Integer

5

系統會自動釋放與資料庫服務的空閑串連。

重要

此參數設定過大可能會導致服務端串連數出現異常。

結果表專屬

retryWaitTime

重試的時間間隔。

Integer

100

單位毫秒。

batchSize

一次批量寫入的資料條數。

Integer

500

無。

flushIntervalMs

清空緩衝的時間間隔。

Integer

如果緩衝中的資料在等待指定時間後,依然沒有達到輸出條件,系統會自動輸出緩衝中的所有資料。單位毫秒。

writeMode

第一次嘗試寫入時的寫入方式。

String

insert

參數取值如下:

  • insert(預設值):直接插入,衝突時參考conflictMode

  • upsert:衝突時自動update,只能用於有主鍵的表。

conflictMode

當Insert寫入出現主鍵衝突或者唯一索引衝突時的處理策略。

String

strict

參數取值如下:

  • strict(預設值):衝突時報錯。

  • ignore:衝突時忽略。

  • update:衝突時自動更新,可用於無主鍵表,執行效率較低。

  • upsert:衝突時自動更新,只能用於有主鍵表。

維表專屬

maxJoinRows

單行資料Join的最多行數。

Integer

1024

無。

cache

緩衝策略。

String

ALL

支援以下三種緩衝策略:

  • ALL(預設值):緩衝維表裡的所有資料。在Job運行前,系統會將維表中所有資料載入到Cache中,之後所有的維表尋找資料都會通過Cache進行。如果在Cache中無法找到資料,則KEY不存在,並在Cache到期後重新載入一遍全量Cache。

  • LRU:緩衝維表裡的部分資料。源表的每條資料都會觸發系統先在Cache中尋找資料。如果沒有找到,則去物理維表中尋找。

  • None:無緩衝。

cacheSize

緩衝大小,即緩衝多少行資料。

Long

100000

僅當選擇LRU緩衝策略時,cacheSize參數生效。

cacheTTLMs

緩衝失效的逾時時間。

Long

Long.MAX_VALUE

cacheTTLMs配置和cache配置有關:

  • 如果cache配置為LRU,則cacheTTLMs為緩衝失效的逾時時間。預設不到期。

  • 如果cache配置為ALL,則cacheTTLMs為設定緩衝重新載入的間隔時間,預設不重新載入。

單位為毫秒。

類型映射

雲原生資料倉儲AnalyticDB PostgreSQL版欄位類型

Flink欄位類型

boolean

boolean

smallint

int

int

int

bigint

bigint

float

double

varchar

varchar

text

varchar

timestamp

timestamp

date

date

使用樣本

  • 結果表

    CREATE TEMPORARY TABLE datagen_source (
     `name` VARCHAR,
     `age` INT
    )
    COMMENT 'datagen source table'
    WITH (
     'connector' = 'datagen'
    );
    
    CREATE TABLE adbpg_sink (
     name VARCHAR,
     age INT
    ) WITH (
     'connector'='adbpg',
     'url'='jdbc:postgresql://<Address>:<PortId>/<DatabaseName>',
     'tableName'='<yourDatabaseTableName>',
     'userName'='<yourDatabaseUserName>',
     'password'='<yourDatabasePassword>'
    );
    
    INSERT INTO adbpg_sink
    SELECT * FROM datagen_source;
  • 維表

    CREATE TEMPORARY TABLE datagen_source(
     a INT,
     b BIGINT,
     c STRING,
     `proctime` AS PROCTIME()
    ) 
    COMMENT 'datagen source table'
    WITH (
     'connector' = 'datagen'
    };
    
    CREATE TABLE adbpg_dim (
     a INT, 
     b VARCHAR, 
     c VARCHAR
    ) WITH (
     'connector'='adbpg',
     'url'='jdbc:postgresql://<Address>:<PortId>/<DatabaseName>',
     'tableName'='<yourDatabaseTableName>',
     'userName'='<yourDatabaseUserName>',
     'password'='<yourDatabasePassword>'
    );
    
    CREATE TEMPORARY TABLE blackhole_sink(
     a INT,
     b STRING
    )
    COMMENT 'blackhole sink table'
    WITH (
     'connector' = 'blackhole'
    );
    
    INSERT INTO blackhole_sink SELECT T.a,H.b
    FROM datagen_source AS T JOIN adb_dim FOR SYSTEM_TIME AS OF T.proctime AS H ON T.a = H.a;

相關文檔