This topic describes how to use the PolarDB for Oracle 1.0 connector.
Background information
PolarDB for PostgreSQL (Compatible with Oracle) is a new-generation PolarDB database developed by Alibaba Cloud. It decouples computing from storage and uses integrated software and hardware to provide a secure and reliable database service with auto scaling, high performance, and mass storage. The service is highly compatible with Oracle.
The following table describes the capabilities supported by the PolarDB for Oracle 1.0 connector.
Item | Description |
Table type | Sink table |
Running mode | Streaming mode and batch mode |
Data format | N/A |
Metric | Note: For more information about the metrics, see Metrics. |
API type | SQL API |
Data update or deletion in a sink table | Supported |
Prerequisites
A PolarDB for Oracle 1.0 cluster is created and a table is created. For more information, see Create a PolarDB for PostgreSQL (Compatible with Oracle) cluster and Create a table.
A whitelist is configured for the PolarDB for Oracle 1.0 cluster. For more information, see Configure a whitelist for a cluster.
Limits
Only Realtime Compute for Apache Flink that uses Ververica Runtime (VVR) 8.0.5 or later supports the PolarDB for Oracle 1.0 connector.
Syntax
CREATE TABLE polardbo_table (
  id INT,
  len INT,
  content VARCHAR,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'polardbo',
  'url' = 'jdbc:postgresql://<Address>:<PortId>/<DatabaseName>',
  'tableName' = '<yourDatabaseTableName>',
  'userName' = '<yourDatabaseUserName>',
  'password' = '<yourDatabasePassword>'
);
Parameters in the WITH clause
Parameter | Description | Data type | Required | Default value | Remarks |
connector | The type of the table. | STRING | Yes | No default value | Set the value to polardbo. |
url | The Java Database Connectivity (JDBC) URL of the database. | STRING | Yes | No default value | The URL is in the format shown in the Syntax section: jdbc:postgresql://<Address>:<PortId>/<DatabaseName>. |
tableName | The name of the table in the database. | STRING | Yes | No default value | N/A. |
userName | The username that is used to access the database. | STRING | Yes | No default value | N/A. |
password | The password that is used to access the database. | STRING | Yes | No default value | To prevent password leaks, we recommend that you use the key management method to specify your password. For more information, see Manage variables and keys. |
maxRetryTimes | The maximum number of retries that are allowed to write data to the table if a data writing attempt fails. | INTEGER | No | 3 | N/A. |
targetSchema | The name of the schema. | STRING | No | public | N/A. |
caseSensitive | Specifies whether to enable case sensitivity. | STRING | No | false | Valid values: true and false. |
connectionMaxActive | The maximum number of connections in the connection pool. | INTEGER | No | 5 | The system automatically releases idle connections to the database service. Important: If this parameter is set to an excessively large value, the number of server connections may be abnormal. |
retryWaitTime | The interval between retries. | INTEGER | No | 100 | Unit: milliseconds. |
batchSize | The number of data records that can be written to the table at a time. | INTEGER | No | 500 | N/A. |
flushIntervalMs | The interval at which the cache is cleared. | INTEGER | No | No default value | Unit: milliseconds. If the number of cached data records does not reach the upper limit within the specified period of time, all cached data is written to the sink table. |
writeMode | The write mode in which the system attempts to write data to the table for the first time. | STRING | No | insert | Valid values:
|
conflictMode | The policy based on which a primary key conflict or index conflict is handled when data is inserted into a table. | STRING | No | strict | Valid values:
|
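The optional parameters are set in the same WITH clause as the required ones. The following is a minimal sketch rather than a recommended configuration: the table name polardbo_tuned_sink and every value marked as illustrative are assumptions, and only parameters whose meaning is fully described in the preceding table are used. Option values are written as quoted strings, as in the Syntax section.

```sql
-- Hypothetical sink table that sets several optional parameters.
CREATE TEMPORARY TABLE polardbo_tuned_sink (
  id INT,
  len INT,
  content VARCHAR,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'polardbo',
  'url' = 'jdbc:postgresql://<Address>:<PortId>/<DatabaseName>',
  'tableName' = '<yourDatabaseTableName>',
  'userName' = '<yourDatabaseUserName>',
  'password' = '<yourDatabasePassword>',
  'targetSchema' = 'public',        -- schema of the target table (documented default)
  'batchSize' = '1000',             -- illustrative: records written per batch (default 500)
  'flushIntervalMs' = '2000',       -- illustrative: flush cached records every 2000 ms
  'maxRetryTimes' = '5',            -- illustrative: retries after a failed write (default 3)
  'retryWaitTime' = '200',          -- illustrative: 200 ms between retries (default 100)
  'connectionMaxActive' = '5'       -- connection pool size (documented default)
);
```

With these values, the connector would cache up to 1000 records per write, and if fewer records arrive within 2000 ms, the cached records are written anyway, as described for flushIntervalMs.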
Data type mappings
The following table provides the data type mappings from Flink fields to fields of PolarDB for Oracle 1.0 when the PolarDB for Oracle 1.0 connector is used for a sink table.
Data type of PolarDB for Oracle 1.0 | Data type of Flink |
BOOLEAN | BOOLEAN |
INT | INT |
NUMBER | BIGINT |
NUMBER | DOUBLE |
VARCHAR | VARCHAR |
TIMESTAMP | TIMESTAMP |
VARCHAR | DATE |
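To make the mappings concrete, the following sketch declares a Flink sink table that uses each Flink type from the table once. The table and column names are hypothetical, and the comment on each column repeats the PolarDB for Oracle 1.0 type listed above; the column types of the actual database table are assumed to match those comments.

```sql
-- Hypothetical Flink-side schema; each comment gives the PolarDB for Oracle 1.0
-- column type that the mapping table pairs with the Flink type.
CREATE TEMPORARY TABLE polardbo_types_sink (
  is_active  BOOLEAN,    -- BOOLEAN
  item_count INT,        -- INT
  total_cnt  BIGINT,     -- NUMBER
  price      DOUBLE,     -- NUMBER
  title      VARCHAR,    -- VARCHAR
  updated_at TIMESTAMP,  -- TIMESTAMP
  created_on DATE        -- VARCHAR
) WITH (
  'connector' = 'polardbo',
  'url' = 'jdbc:postgresql://<Address>:<PortId>/<DatabaseName>',
  'tableName' = '<yourDatabaseTableName>',
  'userName' = '<yourDatabaseUserName>',
  'password' = '<yourDatabasePassword>'
);
```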
Sample code
Sample code for a sink table
CREATE TEMPORARY TABLE datagen_source (
  `name` VARCHAR,
  `age` INT
) COMMENT 'datagen source table'
WITH (
  'connector' = 'datagen'
);

CREATE TABLE polardbo_sink (
  name VARCHAR,
  age INT
) WITH (
  'connector' = 'polardbo',
  'url' = 'jdbc:postgresql://<Address>:<PortId>/<DatabaseName>',
  'tableName' = '<yourDatabaseTableName>',
  'userName' = '<yourDatabaseUserName>',
  'password' = '<yourDatabasePassword>'
);

INSERT INTO polardbo_sink
SELECT * FROM datagen_source;
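For a quick connectivity test, a bounded variant of the sample may be easier to work with, because the default datagen source produces rows indefinitely. The following sketch assumes the standard options of the Apache Flink datagen connector (number-of-rows, rows-per-second, and the fields.* options); the table names and all values are illustrative, not part of the connector documentation.

```sql
-- Bounded test source: emits 100 rows at 10 rows per second, then finishes.
CREATE TEMPORARY TABLE datagen_bounded_source (
  `name` VARCHAR,
  `age` INT
) WITH (
  'connector' = 'datagen',
  'number-of-rows' = '100',
  'rows-per-second' = '10',
  'fields.name.length' = '8',   -- random 8-character strings
  'fields.age.min' = '1',
  'fields.age.max' = '100'
);

-- Hypothetical sink with a short flush interval so that test rows appear quickly.
CREATE TEMPORARY TABLE polardbo_test_sink (
  `name` VARCHAR,
  `age` INT
) WITH (
  'connector' = 'polardbo',
  'url' = 'jdbc:postgresql://<Address>:<PortId>/<DatabaseName>',
  'tableName' = '<yourDatabaseTableName>',
  'userName' = '<yourDatabaseUserName>',
  'password' = '<yourDatabasePassword>',
  'flushIntervalMs' = '1000'    -- illustrative value
);

INSERT INTO polardbo_test_sink
SELECT `name`, `age` FROM datagen_bounded_source;
```

Listing the columns explicitly in the SELECT clause, instead of using SELECT *, keeps the statement valid if columns are later added to the source table.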