本文为您介绍如何使用PolarDB PostgreSQL版(Oracle语法兼容1.0)连接器。
背景信息
PolarDB PostgreSQL版(兼容Oracle)是阿里巴巴自研的新一代云原生数据库,在存储计算分离架构下,利用了软硬件结合的优势,为您提供具备极致弹性、高性能、海量存储、安全可靠的数据库服务,高度兼容Oracle。
PolarDB PostgreSQL版(Oracle语法兼容1.0)连接器支持的信息如下。
类别 | 详情 |
支持类型 | 结果表 |
运行模式 | 流模式和批模式 |
数据格式 | 暂不适用 |
特有监控指标 |
说明 指标含义详情,请参见监控指标说明。 |
API种类 | SQL |
是否支持更新或删除结果表数据 | 是 |
前提条件
已创建PolarDB PostgreSQL版(Oracle语法兼容1.0)实例并创建表,详情请参见创建PolarDB PostgreSQL版(兼容Oracle)集群和创建表。
已设置白名单,详情请参见设置集群白名单。
使用限制
仅Flink实时计算引擎VVR 8.0.5及以上版本支持PolarDB PostgreSQL版(Oracle语法兼容1.0)连接器。
语法结构
CREATE TABLE polardbo_table (
id INT,
len INT,
content VARCHAR,
PRIMARY KEY(id)
) WITH (
'connector'='polardbo',
'url'='jdbc:postgresql://<Address>:<PortId>/<DatabaseName>',
'tableName'='<yourDatabaseTableName>',
'userName'='<yourDatabaseUserName>',
'password'='<yourDatabasePassword>'
);
WITH参数
参数 | 说明 | 数据类型 | 是否必填 | 默认值 | 备注 |
connector | 表类型。 | String | 是 | 无 | 固定值为polardbo。 |
url | JDBC连接地址。 | String | 是 | 无 | 格式为 |
tableName | 表名。 | String | 是 | 无 | 无。 |
userName | 用户名。 | String | 是 | 无 | 无。 |
password | 密码。 | String | 是 | 无 | 为了避免您的密码信息泄露,建议您通过密钥管理的方式填写密码取值,详情请参见密钥管理。 |
maxRetryTimes | 写入数据失败后,重试写入的最大次数。 | Integer | 否 | 3 | 无。 |
targetSchema | Schema名称。 | String | 否 | public | 无。 |
caseSensitive | 大小写是否敏感。 | String | 否 | false | 参数取值如下:
|
connectionMaxActive | 连接池的最大连接数。 | Integer | 否 | 5 | 系统会自动释放与数据库服务的空闲连接。 重要 此参数设置过大可能会导致服务端连接数出现异常。 |
retryWaitTime | 重试的时间间隔。 | Integer | 否 | 100 | 单位毫秒。 |
batchSize | 一次批量写入的数据条数。 | Integer | 否 | 500 | 无。 |
flushIntervalMs | 清空缓存的时间间隔。 | Integer | 否 | 无 | 单位毫秒。如果缓存中的数据在等待指定时间后,依然没有达到输出条件,系统会自动输出缓存中的所有数据。 |
writeMode | 第一次尝试写入时的写入方式。 | String | 否 | insert | 参数取值如下:
|
conflictMode | 当Insert写入出现主键冲突或者唯一索引冲突时的处理策略。 | String | 否 | strict | 参数取值如下:
|
类型映射
下面是将connector用于结果表的场景下,Flink字段到PolarDB PostgreSQL(Oracle兼容1.0)字段的映射关系。
PolarDB PostgreSQL版(Oracle语法兼容1.0)字段类型 | Flink字段类型 |
boolean | boolean |
int | int |
number | bigint |
number | double |
varchar | varchar |
timestamp | timestamp |
varchar | date |
使用示例
结果表
CREATE TEMPORARY TABLE datagen_source ( `name` VARCHAR, `age` INT ) COMMENT 'datagen source table' WITH ( 'connector' = 'datagen' ); CREATE TABLE polardbo_sink ( name VARCHAR, age INT ) WITH ( 'connector'='polardbo', 'url'='jdbc:postgresql://<Address>:<PortId>/<DatabaseName>', 'tableName'='<yourDatabaseTableName>', 'userName'='<yourDatabaseUserName>', 'password'='<yourDatabasePassword>' ); INSERT INTO polardbo_sink SELECT * FROM datagen_source;