Realtime Compute for Apache Flink:PolarDB for Oracle 1.0 connector

Last Updated: Jul 17, 2024

This topic describes how to use the PolarDB for Oracle 1.0 connector.

Background information

PolarDB for PostgreSQL (Compatible with Oracle) is a next-generation PolarDB edition developed by Alibaba Cloud. It decouples computing from storage and uses integrated software and hardware. The service is secure and reliable, provides auto scaling, high performance, and massive storage, and is highly compatible with Oracle.

The following table describes the capabilities supported by the PolarDB for Oracle 1.0 connector.

| Item | Description |
| --- | --- |
| Table type | Sink table |
| Running mode | Streaming mode and batch mode |
| Data format | N/A |
| Metric | Metrics for sink tables: numRecordsOut, numRecordsOutPerSecond, numBytesOut, numBytesOutPerSecond, currentSendTime. Note: For more information about the metrics, see Metrics. |
| API type | SQL API |
| Data update or deletion in a sink table | Supported |

Prerequisites

Limits

Only Realtime Compute for Apache Flink that uses Ververica Runtime (VVR) 8.0.5 or later supports the PolarDB for Oracle 1.0 connector.

Syntax

CREATE TABLE polardbo_table (
 id INT,
 len INT,
 content VARCHAR,
 PRIMARY KEY (id) NOT ENFORCED
) WITH (
 'connector'='polardbo',
 'url'='jdbc:postgresql://<Address>:<PortId>/<DatabaseName>',
 'tableName'='<yourDatabaseTableName>',
 'userName'='<yourDatabaseUserName>',
 'password'='<yourDatabasePassword>'
);

Parameters in the WITH clause

| Parameter | Description | Data type | Required | Default value | Remarks |
| --- | --- | --- | --- | --- | --- |
| connector | The type of the table. | STRING | Yes | No default value | Set the value to polardbo. |
| url | The Java Database Connectivity (JDBC) URL of the database. | STRING | Yes | No default value | The URL is in the jdbc:postgresql://<Address>:<PortId>/<DatabaseName> format. |
| tableName | The name of the table in the database. | STRING | Yes | No default value | N/A |
| userName | The username that is used to access the database. | STRING | Yes | No default value | N/A |
| password | The password that is used to access the database. | STRING | Yes | No default value | To prevent password leaks, we recommend that you use the key management method to specify your password. For more information, see Manage variables and keys. |
| maxRetryTimes | The maximum number of retries that are allowed to write data to the table if a data writing attempt fails. | INTEGER | No | 3 | N/A |
| targetSchema | The name of the schema. | STRING | No | public | N/A |
| caseSensitive | Specifies whether to enable case sensitivity. | STRING | No | false | Valid values: true (case sensitivity is enabled); false (case sensitivity is disabled; this is the default value). |
| connectionMaxActive | The maximum number of connections in the connection pool. | INTEGER | No | 5 | The system automatically releases idle connections to the database service. Important: If this parameter is set to an excessively large value, the number of server connections may be abnormal. |
| retryWaitTime | The interval between retries. | INTEGER | No | 100 | Unit: milliseconds. |
| batchSize | The number of data records that can be written to the table at a time. | INTEGER | No | 500 | N/A |
| flushIntervalMs | The interval at which the cache is cleared. | INTEGER | No | No default value | Unit: milliseconds. If the number of cached data records does not reach the upper limit within the specified period of time, all cached data is written to the sink table. |
| writeMode | The write mode in which the system attempts to write data to the table for the first time. | STRING | No | insert | Valid values: insert (data is directly inserted into the sink table; if a conflict occurs, the handling policy is determined by the conflictMode parameter; this is the default value); upsert (data in the table is automatically updated when a conflict occurs; this value is suitable only for tables that have a primary key). |
| conflictMode | The policy based on which a primary key conflict or index conflict is handled when data is inserted into a table. | STRING | No | strict | Valid values: strict (if a conflict occurs, the system reports an error; this is the default value); ignore (if a conflict occurs, the system ignores the conflict); update (if a conflict occurs, the system automatically updates data in the table; this value is suitable for tables that do not have a primary key; this policy reduces the data processing efficiency). |
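The optional parameters described above can be combined in a single WITH clause. The following is a minimal sketch, not a recommended configuration: the table name polardbo_upsert_sink and all numeric values are illustrative assumptions, and the placeholders in angle brackets must be replaced with your own values. It declares a primary key and sets writeMode to upsert so that conflicting rows are updated instead of being handled by conflictMode.

```sql
-- Sketch only: table name and tuning values are illustrative assumptions.
CREATE TEMPORARY TABLE polardbo_upsert_sink (
  id INT,
  len INT,
  content VARCHAR,
  PRIMARY KEY (id) NOT ENFORCED        -- upsert is suitable only for tables that have a primary key
) WITH (
  'connector' = 'polardbo',
  'url' = 'jdbc:postgresql://<Address>:<PortId>/<DatabaseName>',
  'tableName' = '<yourDatabaseTableName>',
  'userName' = '<yourDatabaseUserName>',
  'password' = '<yourDatabasePassword>',
  'targetSchema' = 'public',           -- default schema, shown explicitly
  'writeMode' = 'upsert',              -- update existing rows when a conflict occurs
  'batchSize' = '1000',                -- records written per batch (default: 500)
  'flushIntervalMs' = '2000',          -- write cached records at least every 2 seconds
  'maxRetryTimes' = '5',               -- retries after a failed write (default: 3)
  'retryWaitTime' = '200',             -- milliseconds between retries (default: 100)
  'connectionMaxActive' = '5'          -- connection pool size (default: 5)
);
```

A larger batchSize together with a flushIntervalMs value generally trades write latency for throughput, so suitable values depend on your workload.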

Data type mappings

The following table provides the data type mappings from Flink fields to fields of PolarDB for Oracle 1.0 when the PolarDB for Oracle 1.0 connector is used for a sink table.

| Data type of PolarDB for Oracle 1.0 | Data type of Flink |
| --- | --- |
| BOOLEAN | BOOLEAN |
| INT | INT |
| NUMBER | BIGINT |
| NUMBER | DOUBLE |
| VARCHAR | VARCHAR |
| TIMESTAMP | TIMESTAMP |
| VARCHAR | DATE |
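To make the mappings concrete, the following sketch declares a Flink sink table that uses each Flink type listed above, with the corresponding PolarDB for Oracle 1.0 column type noted in a comment. The table name polardbo_typed_sink and all column names are hypothetical and chosen only for illustration. The sketch assumes that the target table already exists in the database with matching columns.

```sql
-- Sketch only: table and column names are hypothetical.
CREATE TEMPORARY TABLE polardbo_typed_sink (
  is_active  BOOLEAN,    -- PolarDB column type: BOOLEAN
  item_count INT,        -- PolarDB column type: INT
  total_rows BIGINT,     -- PolarDB column type: NUMBER
  score      DOUBLE,     -- PolarDB column type: NUMBER
  label      VARCHAR,    -- PolarDB column type: VARCHAR
  updated_at TIMESTAMP,  -- PolarDB column type: TIMESTAMP
  birthday   DATE        -- PolarDB column type: VARCHAR
) WITH (
  'connector' = 'polardbo',
  'url' = 'jdbc:postgresql://<Address>:<PortId>/<DatabaseName>',
  'tableName' = '<yourDatabaseTableName>',
  'userName' = '<yourDatabaseUserName>',
  'password' = '<yourDatabasePassword>'
);
```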

Sample code

  • Sample code for a sink table

    -- Source table: the datagen connector generates random test records.
    CREATE TEMPORARY TABLE datagen_source (
     `name` VARCHAR,
     `age` INT
    )
    COMMENT 'datagen source table'
    WITH (
     'connector' = 'datagen'
    );
    
    -- Sink table: maps to an existing table in PolarDB for Oracle 1.0.
    CREATE TABLE polardbo_sink (
     name VARCHAR,
     age INT
    ) WITH (
     'connector'='polardbo',
     'url'='jdbc:postgresql://<Address>:<PortId>/<DatabaseName>',
     'tableName'='<yourDatabaseTableName>',
     'userName'='<yourDatabaseUserName>',
     'password'='<yourDatabasePassword>'
    );
    
    -- Write the generated records to the sink table.
    INSERT INTO polardbo_sink
    SELECT * FROM datagen_source;