This topic describes how to use the Java Database Connectivity (JDBC) connector.
Background information
The JDBC connector is provided by Apache Flink and can be used to read data from and write data to common databases, such as MySQL, PostgreSQL, and Oracle. The following table describes the capabilities supported by the JDBC connector.
Item | Description |
Table type | Source table, dimension table, and result table |
Running mode | Streaming mode and batch mode |
Data format | N/A |
Metric | N/A |
API type | SQL API |
Data update or deletion in a result table | Supported |
Prerequisites
The database and table to which you want to connect are created.
Limits
Only Realtime Compute for Apache Flink that uses Ververica Runtime (VVR) 6.0.1 or later supports the JDBC connector.
A JDBC source table is a bounded source. After the JDBC source connector reads all data from a table in an upstream database and writes the data to a source table, the task for the JDBC source table is complete. If you want to capture real-time change data, use a Change Data Capture (CDC) connector. For more information, see Create a MySQL CDC source table and Create a PostgreSQL CDC source table (public preview).
If you use the JDBC sink connector to connect to a PostgreSQL database, make sure that the database version is PostgreSQL 9.5 or later. PostgreSQL uses the ON CONFLICT syntax to insert or update data when a primary key is specified in a DDL statement. The ON CONFLICT syntax is supported only in PostgreSQL 9.5 or later.
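For reference, the following is a minimal sketch of the kind of upsert statement this implies. The table and column names are hypothetical, and the statement actually generated by the connector may differ in detail:

-- Hypothetical table with a primary key on id.
-- When a row with the same id already exists, PostgreSQL updates it instead of failing.
INSERT INTO my_table (id, name)
VALUES (1, 'Alice')
ON CONFLICT (id)
DO UPDATE SET name = EXCLUDED.name;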
Realtime Compute for Apache Flink supports only the open source JDBC connector that does not include a JDBC driver for a specific database. When you use the JDBC connector, you must manually upload the JAR package of the driver of the destination database as a dependency file. For more information, see Step 3: Configure parameters on the Configurations tab. The following table describes the JDBC drivers supported by Realtime Compute for Apache Flink.
Driver | Group ID | Artifact ID |
MySQL | mysql | mysql-connector-java |
Oracle | com.oracle.database.jdbc | ojdbc8 |
PostgreSQL | org.postgresql | postgresql |
If you use a JDBC driver that is not included in the table, you must test the validity and availability of the JDBC driver.
When the JDBC connector writes data to a MySQL result table, the connector concatenates each received data record into an SQL statement and executes the statement. If the MySQL result table contains a primary key, the following statement is concatenated and executed:
INSERT INTO table_name (column1, column2, ...) VALUES (value1, value2, ...) ON DUPLICATE KEY UPDATE column1 = VALUES(column1), column2 = VALUES(column2), ...;
If the physical table also contains a unique index constraint in addition to the primary key constraint, and two records that have different primary keys but the same unique index value are written, the unique index conflict causes the existing row to be overwritten instead of a new row being inserted. This causes data loss.
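The following sketch illustrates the scenario. The table and column names are hypothetical; the point is that the second upsert conflicts with the first row through the unique index rather than the primary key, so MySQL updates the existing row in place:

-- Hypothetical physical table: primary key on id, unique index on user_name.
CREATE TABLE orders (
    id BIGINT PRIMARY KEY,
    user_name VARCHAR(64) UNIQUE,
    amount BIGINT
);

-- First record: id = 1, user_name = 'alice'.
INSERT INTO orders (id, user_name, amount) VALUES (1, 'alice', 100)
ON DUPLICATE KEY UPDATE id = VALUES(id), user_name = VALUES(user_name), amount = VALUES(amount);

-- Second record: a different primary key (id = 2) but the same unique index value ('alice').
-- The statement conflicts with the existing row through the unique index, so that row is
-- updated in place instead of a new row being inserted. The original record for id = 1 is lost.
INSERT INTO orders (id, user_name, amount) VALUES (2, 'alice', 50)
ON DUPLICATE KEY UPDATE id = VALUES(id), user_name = VALUES(user_name), amount = VALUES(amount);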
Syntax
CREATE TABLE jdbc_table (
    `id` BIGINT,
    `name` VARCHAR,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:xxx',
    'table-name' = '<yourTable>',
    'username' = '<yourUsername>',
    'password' = '<yourPassword>'
);
Parameters in the WITH clause
Common parameters
Parameter | Description | Data type | Required | Default value | Remarks |
connector | The type of the table. | STRING | Yes | No default value | Set the value to jdbc. |
url | The URL of the database. | STRING | Yes | No default value | N/A. |
table-name | The name of the JDBC table. | STRING | Yes | No default value | N/A. |
username | The name of the JDBC user. | STRING | No | No default value | If you configure one of the username and password parameters, you must also configure the other parameter. |
password | The password of the JDBC user. | STRING | No | No default value | If you configure one of the username and password parameters, you must also configure the other parameter. |
Parameters only for source tables
Parameter | Description | Data type | Required | Default value | Remarks |
scan.partition.column | The name of the column that is used to partition the input data. | STRING | No | No default value | The values in the column must be of the NUMERIC or TIMESTAMP type and support comparison with values of the NUMERIC type in the database. For more information about partitioned scan, see Partitioned Scan. |
scan.partition.num | The number of partitions. | INTEGER | No | No default value | N/A. |
scan.partition.lower-bound | The smallest value of the first partition. | LONG | No | No default value | N/A. |
scan.partition.upper-bound | The largest value of the last partition. | LONG | No | No default value | N/A. |
scan.fetch-size | The number of rows of data that are obtained from the database each time data is read from a source table. | INTEGER | No | 0 | If you set this parameter to 0, this parameter is ignored. |
scan.auto-commit | Specifies whether to enable auto-commit. | BOOLEAN | No | true | N/A. |
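The following is a minimal sketch of a source table DDL that combines the scan.partition.* parameters to enable partitioned scan. The table name, partition column, and bounds are hypothetical placeholders:

CREATE TEMPORARY TABLE jdbc_parallel_source (
    `id` BIGINT,
    `name` VARCHAR
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:xxx',
    'table-name' = '<yourTable>',
    'username' = '<yourUsername>',
    'password' = '<yourPassword>',
    -- Hypothetical partitioned scan settings: split the read into 4 partitions
    -- on the numeric id column, covering the id range from 0 to 1000.
    'scan.partition.column' = 'id',
    'scan.partition.num' = '4',
    'scan.partition.lower-bound' = '0',
    'scan.partition.upper-bound' = '1000'
);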
Parameters only for result tables
Parameter | Description | Data type | Required | Default value | Remarks |
sink.buffer-flush.max-rows | The maximum number of data records that can be cached before the flush operation is performed. | INTEGER | No | 100 | If you set this parameter to 0, data records are not cached before the flush operation is performed. |
sink.buffer-flush.interval | The interval at which data is flushed. If data records are cached for a period that exceeds the duration specified by this parameter, the flush operation is performed in an asynchronous thread. | DURATION | No | 1 s | If you set this parameter to 0, data records are not cached before the flush operation is performed. Note: If you want to process cached flush events in asynchronous mode, you can set the sink.buffer-flush.max-rows parameter to 0 and configure the sink.buffer-flush.interval parameter based on your business requirements. |
sink.max-retries | The maximum number of retries that are allowed when data fails to be written to the database. | INTEGER | No | 3 | N/A. |
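The following is a minimal sketch of a result table DDL that tunes the buffering behavior. The table name and values are hypothetical; with these settings, the sink flushes a batch after 500 cached records or after 2 seconds, whichever is reached first:

CREATE TEMPORARY TABLE jdbc_buffered_sink (
    `id` BIGINT,
    `name` VARCHAR,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:xxx',
    'table-name' = '<yourTable>',
    'username' = '<yourUsername>',
    'password' = '<yourPassword>',
    -- Hypothetical buffering settings: flush after 500 cached records
    -- or after 2 seconds, whichever comes first; retry failed writes up to 5 times.
    'sink.buffer-flush.max-rows' = '500',
    'sink.buffer-flush.interval' = '2s',
    'sink.max-retries' = '5'
);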
Parameters only for dimension tables
Parameter | Description | Data type | Required | Default value | Remarks |
lookup.cache.max-rows | The maximum number of rows of data that can be cached. If the number of rows of data in the cache exceeds the value of this parameter, the earliest row of data expires and is replaced by a new row of data. | INTEGER | No | No default value | By default, caching for dimension tables is disabled. You can configure the lookup.cache.max-rows and lookup.cache.ttl parameters to enable caching for dimension tables. If caching for dimension tables is enabled, the LRU cache policy is used. |
lookup.cache.ttl | The maximum time to live (TTL) of each row of data in the cache. If the time period for which a row of data is cached exceeds the value of this parameter, the row of data expires. | DURATION | No | No default value | By default, caching for dimension tables is disabled. You can configure the lookup.cache.max-rows and lookup.cache.ttl parameters to enable caching for dimension tables. If caching for dimension tables is enabled, the LRU cache policy is used. |
lookup.cache.caching-missing-key | Specifies whether to cache empty query results. | BOOLEAN | No | true | Valid values: true: Empty query results are cached. This is the default value. false: Empty query results are not cached. |
lookup.max-retries | The maximum number of retries when the database fails to be queried. | INTEGER | No | 3 | N/A. |
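The following is a minimal sketch of a dimension table DDL with caching enabled. The table name and cache settings are hypothetical:

CREATE TEMPORARY TABLE jdbc_dim_cached (
    `id` INT,
    `name` VARCHAR
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:xxx',
    'table-name' = '<yourTable>',
    'username' = '<yourUsername>',
    'password' = '<yourPassword>',
    -- Hypothetical cache settings: keep at most 10000 rows in the LRU cache,
    -- expire each cached row after 10 minutes, and do not cache empty query results.
    'lookup.cache.max-rows' = '10000',
    'lookup.cache.ttl' = '10min',
    'lookup.cache.caching-missing-key' = 'false'
);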
Parameters only for PostgreSQL databases
Parameter | Description | Data type | Required | Default value | Remarks |
source.extend-type.enabled | Specifies whether data of the JSONB and UUID extension types can be read and mapped to the data types supported by Flink when a PostgreSQL table is used as a source table or a dimension table. | BOOLEAN | No | false | Valid values: true: Data of the JSONB and UUID extension types can be read and mapped to the data types supported by Flink. false: Data of the JSONB and UUID extension types cannot be read or mapped to the data types supported by Flink. This is the default value. |
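The following is a minimal sketch of a PostgreSQL source table DDL with this option enabled. The table, column names, and the assumption that the JSONB and UUID columns are declared as STRING on the Flink side are for illustration only; verify the exact type mapping for your VVR version:

CREATE TEMPORARY TABLE pg_source (
    `id` INT,
    -- Assumption for illustration: the PostgreSQL JSONB and UUID columns
    -- are declared as STRING on the Flink side.
    `payload` STRING,
    `trace_id` STRING
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:postgresql://<host>:<port>/<database>',
    'table-name' = '<yourTable>',
    'username' = '<yourUsername>',
    'password' = '<yourPassword>',
    -- Allow the JSONB and UUID extension types to be read and mapped.
    'source.extend-type.enabled' = 'true'
);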
Data type mappings
Data type of MySQL | Data type of Oracle | Data type of PostgreSQL | Data type of Flink SQL |
TINYINT | N/A | N/A | TINYINT |
SMALLINT, TINYINT UNSIGNED | N/A | SMALLINT, INT2, SMALLSERIAL, SERIAL2 | SMALLINT |
INT, MEDIUMINT, SMALLINT UNSIGNED | N/A | INTEGER, SERIAL | INT |
BIGINT, INT UNSIGNED | N/A | BIGINT, BIGSERIAL | BIGINT |
BIGINT UNSIGNED | N/A | N/A | DECIMAL(20, 0) |
BIGINT | N/A | BIGINT | BIGINT |
FLOAT | BINARY_FLOAT | REAL, FLOAT4 | FLOAT |
DOUBLE, DOUBLE PRECISION | BINARY_DOUBLE | FLOAT8, DOUBLE PRECISION | DOUBLE |
NUMERIC(p, s), DECIMAL(p, s) | SMALLINT, FLOAT(s), DOUBLE PRECISION, REAL, NUMBER(p, s) | NUMERIC(p, s), DECIMAL(p, s) | DECIMAL(p, s) |
BOOLEAN, TINYINT(1) | N/A | BOOLEAN | BOOLEAN |
DATE | DATE | DATE | DATE |
TIME [(p)] | DATE | TIME [(p)] [WITHOUT TIMEZONE] | TIME [(p)] [WITHOUT TIMEZONE] |
DATETIME [(p)] | TIMESTAMP [(p)] [WITHOUT TIMEZONE] | TIMESTAMP [(p)] [WITHOUT TIMEZONE] | TIMESTAMP [(p)] [WITHOUT TIMEZONE] |
CHAR(n), VARCHAR(n), TEXT | CHAR(n), VARCHAR(n), CLOB | CHAR(n), CHARACTER(n), VARCHAR(n), CHARACTER VARYING(n), TEXT | STRING |
BINARY, VARBINARY, BLOB | RAW(s), BLOB | BYTEA | BYTES |
N/A | N/A | ARRAY | ARRAY |
Sample code
Sample code for a source table
CREATE TEMPORARY TABLE jdbc_source (
    `id` INT,
    `name` VARCHAR
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:xxx',
    'table-name' = '<yourTable>',
    'username' = '<yourUsername>',
    'password' = '<yourPassword>'
);

CREATE TEMPORARY TABLE blackhole_sink (
    `id` INT,
    `name` VARCHAR
) WITH (
    'connector' = 'blackhole'
);

INSERT INTO blackhole_sink SELECT * FROM jdbc_source;
Sample code for a result table
CREATE TEMPORARY TABLE datagen_source (
    `name` VARCHAR,
    `age` INT
) WITH (
    'connector' = 'datagen'
);

CREATE TEMPORARY TABLE jdbc_sink (
    `name` VARCHAR,
    `age` INT
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:xxxx',
    'table-name' = '<yourTable>',
    'username' = '<yourUsername>',
    'password' = '<yourPassword>'
);

INSERT INTO jdbc_sink SELECT * FROM datagen_source;
Sample code for a dimension table
CREATE TEMPORARY TABLE datagen_source (
    `id` INT,
    `data` BIGINT,
    `proctime` AS PROCTIME()
) WITH (
    'connector' = 'datagen'
);

CREATE TEMPORARY TABLE jdbc_dim (
    `id` INT,
    `name` VARCHAR
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:xxx',
    'table-name' = '<yourTable>',
    'username' = '<yourUsername>',
    'password' = '<yourPassword>'
);

CREATE TEMPORARY TABLE blackhole_sink (
    `id` INT,
    `data` BIGINT,
    `name` VARCHAR
) WITH (
    'connector' = 'blackhole'
);

INSERT INTO blackhole_sink
SELECT T.`id`, T.`data`, H.`name`
FROM datagen_source AS T
JOIN jdbc_dim FOR SYSTEM_TIME AS OF T.proctime AS H
ON T.id = H.id;