This topic describes how to use the AnalyticDB for PostgreSQL connector.
Background information
AnalyticDB for PostgreSQL is a massively parallel processing (MPP) data warehouse that provides online analysis of large volumes of data.
The following table describes the capabilities supported by the AnalyticDB for PostgreSQL connector.
Item | Description |
Table type | Dimension table and sink table |
Running mode | Streaming mode and batch mode. |
Data format | N/A |
Metric | Note: For more information about the metrics, see Metrics. |
API type | SQL |
Data update or deletion in a sink table | Supported |
Prerequisites
An AnalyticDB for PostgreSQL instance and an AnalyticDB for PostgreSQL table are created. For more information, see Create an instance and CREATE TABLE.
An IP address whitelist is configured for the AnalyticDB for PostgreSQL instance. For more information, see Configure an IP address whitelist.
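As a minimal sketch, a target table that matches the columns in the syntax example of this topic could be created in AnalyticDB for PostgreSQL as follows. The table name adbpg_table, the public schema, and the choice of id as the distribution key are assumptions for illustration; adjust them to your own data model.

```sql
-- Run in AnalyticDB for PostgreSQL, not in Flink.
-- Hypothetical target table whose columns match the Flink syntax example.
CREATE TABLE public.adbpg_table (
  id INT,
  len INT,
  content VARCHAR,
  PRIMARY KEY (id)
) DISTRIBUTED BY (id);  -- the distribution key is part of the primary key
```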
Limits
Only Realtime Compute for Apache Flink that uses Ververica Runtime (VVR) 6.0.0 or later supports the AnalyticDB for PostgreSQL connector.
Only Realtime Compute for Apache Flink that uses VVR 8.0.1 or later supports AnalyticDB for PostgreSQL V7.0.
Self-managed PostgreSQL databases are not supported.
Syntax
CREATE TABLE adbpg_table (
  id INT,
  len INT,
  content VARCHAR,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector'='adbpg',
  'url'='jdbc:postgresql://<Address>:<PortId>/<DatabaseName>',
  'tableName'='<yourDatabaseTableName>',
  'userName'='<yourDatabaseUserName>',
  'password'='<yourDatabasePassword>'
);
Parameters in the WITH clause
Category | Parameter | Description | Data type | Required | Default value | Remarks |
Common parameters | connector | The type of the table. | STRING | Yes | No default value | Set the value to adbpg. |
Common parameters | url | The Java Database Connectivity (JDBC) URL of the database. | STRING | Yes | No default value | The URL is in the jdbc:postgresql://<Address>:<PortId>/<DatabaseName> format. |
Common parameters | tableName | The name of the table in the database. | STRING | Yes | No default value | N/A. |
Common parameters | userName | The username that is used to access the AnalyticDB for PostgreSQL database. | STRING | Yes | No default value | N/A. |
Common parameters | password | The password that is used to access the AnalyticDB for PostgreSQL database. | STRING | Yes | No default value | N/A. |
Common parameters | maxRetryTimes | The maximum number of retries that are allowed if an attempt to write data to the table fails. | INTEGER | No | 3 | N/A. |
Common parameters | targetSchema | The name of the schema. | STRING | No | public | N/A. |
Common parameters | caseSensitive | Specifies whether to enable case sensitivity. | STRING | No | false | Valid values: true and false. |
Common parameters | connectionMaxActive | The maximum number of connections in the connection pool. | INTEGER | No | 5 | The system automatically releases idle connections to the database service. Important: If this parameter is set to an excessively large value, the number of server connections may become abnormally high. |
Parameters only for sink tables | retryWaitTime | The interval between retries. | INTEGER | No | 100 | Unit: milliseconds. |
Parameters only for sink tables | batchSize | The maximum number of data records that are written to the table in a single batch. | INTEGER | No | 500 | N/A. |
Parameters only for sink tables | flushIntervalMs | The interval at which cached data is flushed to the table. | INTEGER | No | N/A. | If the number of cached data records does not reach the upper limit specified by batchSize within this interval, all cached data is written to the sink table. Unit: milliseconds. |
Parameters only for sink tables | writeMode | The write mode that the system uses for the first attempt to write data to the table. | STRING | No | insert | Valid values: |
Parameters only for sink tables | conflictMode | The policy for handling a primary key conflict or an index conflict when data is inserted into the table. | STRING | No | strict | Valid values: |
Parameters only for dimension tables | maxJoinRows | The maximum number of dimension table rows that a single input row can be joined with. | INTEGER | No | 1024 | N/A. |
Parameters only for dimension tables | cache | The cache policy. | STRING | No | ALL | Valid values include ALL and LRU. |
Parameters only for dimension tables | cacheSize | The maximum number of rows that can be cached. | LONG | No | 100000 | This parameter takes effect only when the cache parameter is set to LRU. |
Parameters only for dimension tables | cacheTTLMs | The cache timeout period. | LONG | No | Long.MAX_VALUE | How the cacheTTLMs parameter is applied depends on the value of the cache parameter. Unit: milliseconds. |
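The sink parameters above are all set in the WITH clause. The following is a minimal sketch, assuming a target table in the public schema; the table name adbpg_sink_tuned and the batchSize and flushIntervalMs values are illustrative, and the remaining options are set to their documented defaults only to show where they go.

```sql
-- Hypothetical sink table with explicit write tuning options.
CREATE TEMPORARY TABLE adbpg_sink_tuned (
  id INT,
  len INT,
  content VARCHAR,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'adbpg',
  'url' = 'jdbc:postgresql://<Address>:<PortId>/<DatabaseName>',
  'tableName' = '<yourDatabaseTableName>',
  'userName' = '<yourDatabaseUserName>',
  'password' = '<yourDatabasePassword>',
  'targetSchema' = 'public',     -- default schema
  'batchSize' = '1000',          -- illustrative: up to 1000 records per batch
  'flushIntervalMs' = '5000',    -- illustrative: flush cached records every 5 seconds at most
  'maxRetryTimes' = '3',         -- default
  'retryWaitTime' = '100',       -- default, in milliseconds
  'connectionMaxActive' = '5',   -- default connection pool size
  'writeMode' = 'insert',        -- default
  'conflictMode' = 'strict'      -- default
);
```

A larger batchSize reduces the number of write requests, while flushIntervalMs bounds how long records can wait in the cache when the batch does not fill up.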
Data type mappings
Data type of AnalyticDB for PostgreSQL | Data type of Realtime Compute for Apache Flink |
BOOLEAN | BOOLEAN |
SMALLINT | INT |
INT | INT |
BIGINT | BIGINT |
FLOAT | DOUBLE |
VARCHAR | VARCHAR |
TEXT | VARCHAR |
TIMESTAMP | TIMESTAMP |
DATE | DATE |
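To illustrate the mappings, the following sketch declares one column of each listed type in AnalyticDB for PostgreSQL and the matching column types in Flink. The table names type_demo and adbpg_type_demo, all column names, and the distribution key are hypothetical.

```sql
-- In AnalyticDB for PostgreSQL (hypothetical table):
CREATE TABLE public.type_demo (
  flag       BOOLEAN,
  small_val  SMALLINT,
  int_val    INT,
  big_val    BIGINT,
  float_val  FLOAT,
  title      VARCHAR(64),
  note       TEXT,
  created_at TIMESTAMP,
  created_on DATE
) DISTRIBUTED BY (big_val);

-- In Realtime Compute for Apache Flink (hypothetical table):
CREATE TEMPORARY TABLE adbpg_type_demo (
  flag       BOOLEAN,
  small_val  INT,        -- SMALLINT maps to INT
  int_val    INT,
  big_val    BIGINT,
  float_val  DOUBLE,     -- FLOAT maps to DOUBLE
  title      VARCHAR,
  note       VARCHAR,    -- TEXT maps to VARCHAR
  created_at TIMESTAMP,
  created_on DATE
) WITH (
  'connector' = 'adbpg',
  'url' = 'jdbc:postgresql://<Address>:<PortId>/<DatabaseName>',
  'tableName' = 'type_demo',
  'userName' = '<yourDatabaseUserName>',
  'password' = '<yourDatabasePassword>'
);
```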
Sample code
Sample code for a sink table
CREATE TEMPORARY TABLE datagen_source (
  `name` VARCHAR,
  `age` INT
) COMMENT 'datagen source table' WITH (
  'connector' = 'datagen'
);

CREATE TABLE adbpg_sink (
  name VARCHAR,
  age INT
) WITH (
  'connector'='adbpg',
  'url'='jdbc:postgresql://<Address>:<PortId>/<DatabaseName>',
  'tableName'='<yourDatabaseTableName>',
  'userName'='<yourDatabaseUserName>',
  'password'='<yourDatabasePassword>'
);

INSERT INTO adbpg_sink SELECT * FROM datagen_source;
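Because the sink table supports data updates and deletions, it can also receive the updating result of an aggregation. The following is a minimal sketch, assuming that the target AnalyticDB for PostgreSQL table has the columns age and cnt with a primary key on age; the table names datagen_ages and adbpg_age_count are hypothetical. The primary key declared on the Flink table identifies the row that each update applies to.

```sql
-- Hypothetical source that generates random ages between 1 and 100.
CREATE TEMPORARY TABLE datagen_ages (
  age INT
) WITH (
  'connector' = 'datagen',
  'rows-per-second' = '10',
  'fields.age.min' = '1',
  'fields.age.max' = '100'
);

-- Hypothetical sink that keeps one row per age.
CREATE TEMPORARY TABLE adbpg_age_count (
  age INT,
  cnt BIGINT,
  PRIMARY KEY (age) NOT ENFORCED
) WITH (
  'connector' = 'adbpg',
  'url' = 'jdbc:postgresql://<Address>:<PortId>/<DatabaseName>',
  'tableName' = '<yourDatabaseTableName>',
  'userName' = '<yourDatabaseUserName>',
  'password' = '<yourDatabasePassword>'
);

-- GROUP BY produces an updating stream: each new record for an age
-- changes the count that is stored for that age.
INSERT INTO adbpg_age_count
SELECT age, COUNT(*) AS cnt
FROM datagen_ages
GROUP BY age;
```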
Sample code for a dimension table
CREATE TEMPORARY TABLE datagen_source (
  a INT,
  b BIGINT,
  c STRING,
  `proctime` AS PROCTIME()
) COMMENT 'datagen source table' WITH (
  'connector' = 'datagen'
);

CREATE TABLE adbpg_dim (
  a INT,
  b VARCHAR,
  c VARCHAR
) WITH (
  'connector'='adbpg',
  'url'='jdbc:postgresql://<Address>:<PortId>/<DatabaseName>',
  'tableName'='<yourDatabaseTableName>',
  'userName'='<yourDatabaseUserName>',
  'password'='<yourDatabasePassword>'
);

CREATE TEMPORARY TABLE blackhole_sink (
  a INT,
  b STRING
) COMMENT 'blackhole sink table' WITH (
  'connector' = 'blackhole'
);

-- Lookup join: each source row is enriched with the matching dimension table row.
INSERT INTO blackhole_sink
SELECT T.a, H.b
FROM datagen_source AS T
JOIN adbpg_dim FOR SYSTEM_TIME AS OF T.proctime AS H
ON T.a = H.a;
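The dimension table in this sample uses the default cache policy (ALL). If only recently used rows should be cached, the cache parameters can be set explicitly. The following is a minimal sketch that reuses datagen_source and blackhole_sink from the sample; the table name adbpg_dim_lru and the cacheSize and cacheTTLMs values are illustrative assumptions, not recommendations.

```sql
-- Hypothetical variant of adbpg_dim that uses an LRU cache.
CREATE TEMPORARY TABLE adbpg_dim_lru (
  a INT,
  b VARCHAR,
  c VARCHAR
) WITH (
  'connector' = 'adbpg',
  'url' = 'jdbc:postgresql://<Address>:<PortId>/<DatabaseName>',
  'tableName' = '<yourDatabaseTableName>',
  'userName' = '<yourDatabaseUserName>',
  'password' = '<yourDatabasePassword>',
  'cache' = 'LRU',          -- cache recently used rows
  'cacheSize' = '10000',    -- takes effect only because cache is LRU
  'cacheTTLMs' = '60000',   -- illustrative timeout: 60 seconds
  'maxJoinRows' = '1024'    -- default, shown explicitly
);

INSERT INTO blackhole_sink
SELECT T.a, H.b
FROM datagen_source AS T
JOIN adbpg_dim_lru FOR SYSTEM_TIME AS OF T.proctime AS H
ON T.a = H.a;
```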