本文为您介绍如何使用JDBC连接器。
背景信息
此连接器为开源Flink的JDBC连接器,JDBC连接器提供了对MySQL、PostgreSQL和Oracle等常见的数据库读写支持。JDBC连接器支持的信息如下。
类别 | 详情 |
支持类型 | 源表、维表和结果表 |
运行模式 | 流模式和批模式 |
数据格式 | 暂不适用 |
特有监控指标 | 暂无 |
API种类 | SQL |
是否支持更新或删除结果表数据 | 是 |
前提条件
连接的数据库和表都已被创建。
使用限制
仅实时计算引擎VVR 6.0.1及以上版本支持JDBC连接器。
JDBC源表为Bounded Source,表中数据读取完,对应的Task就会结束。如果需要捕获实时变更数据,则请使用CDC连接器,详情请参见MySQL的CDC源表和Postgres的CDC源表(公测中)。
使用JDBC结果表连接PostgreSQL数据库时,需要数据库版本为PostgreSQL 9.5及以上。因为DDL中定义主键的情况下,PostgreSQL采用ON CONFLICT语法进行插入或更新,此语法需要PostgreSQL 9.5及以上版本才支持。
Flink中只提供了开源JDBC连接器的实现,不包含具体的数据库的Driver。在使用JDBC连接器时,需要手动上传目标数据库Driver的JAR包作为附加依赖文件,具体操作请参见步骤三:进行更多配置。目前支持的Driver如下表所示。
Driver
Group Id
Artifact Id
MySQL
mysql
Oracle
com.oracle.database.jdbc
PostgreSQL
org.postgresql
如果您采用非列表中的JDBC Driver,则其正确性和可用性需要您自行充分测试并保证。
JDBC连接器在向MySQL结果表写入数据时,会将接收到的每条数据拼接成一条SQL去执行。对于包含主键的MySQL结果表,会拼接执行
INSERT INTO table_name (column1, column2, ...) VALUES (value1, value2, ...) ON DUPLICATE KEY UPDATE column1 = VALUES(column1), column2 = VALUES(column2), ...;
语句。需要注意的是,如果物理表存在除主键外的唯一索引约束,当插入两条主键不同但唯一索引相同的记录时,下游数据会因为唯一索引冲突导致数据覆盖引发数据丢失。
语法结构
CREATE TABLE jdbc_table (
`id` BIGINT,
`name` VARCHAR,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:xxx',
'table-name' = '<yourTable>',
'username' = '<yourUsername>',
'password' = '<yourPassword>'
);
WITH参数
通用
参数
说明
数据类型
是否必填
默认值
备注
connector
表类型。
String
是
无
固定值为jdbc。
url
数据库的URL。
String
是
无
无。
table-name
JDBC表的名称。
String
是
无
无。
username
JDBC用户名称。
String
否
无
如果指定了username和password中的任一参数,则两者必须都被指定。
password
JDBC用户密码。
String
否
无
源表独有
参数
说明
数据类型
是否必填
默认值
备注
scan.partition.column
对输入进行分区的列名。
String
否
无
该列必须是数值类型或时间戳类型,且该类型在数据库中需要支持与数值类型进行比较。关于分区扫描的详情请参见Partitioned Scan。
scan.partition.num
分区数。
Integer
否
无
无。
scan.partition.lower-bound
第一个分区的最小值。
Long
否
无
无。
scan.partition.upper-bound
最后一个分区的最大值。
Long
否
无
无。
scan.fetch-size
每次循环读取时,从数据库中获取的行数。
Integer
否
0
如果指定的值为0,则该配置项会被忽略。
scan.auto-commit
是否开启auto-commit。
Boolean
否
true
无。
结果表独有
参数
说明
数据类型
是否必填
默认值
备注
sink.buffer-flush.max-rows
flush数据前,缓存记录的最大值。
Integer
否
100
您可以设置为0来禁用它,即不再缓存记录,直接flush数据。
sink.buffer-flush.interval
flush数据的时间间隔。数据在Flink中缓存的时间超过该参数指定的时间后,异步线程将flush数据到数据库中。
Duration
否
1 s
您可以设置为0来禁用它,即不再缓存记录,直接flush数据。
说明如果您需要完全异步地处理缓存的flush事件,则可以将sink.buffer-flush.max-rows设置为0,并配置适当的flush时间间隔。
sink.max-retries
写入记录到数据库失败后的最大重试次数。
Integer
否
3
无。
维表独有
参数
说明
数据类型
是否必填
默认值
备注
lookup.cache.max-rows
指定缓存的最大行数。如果超过该值,则最老的行记录将会过期,会被新的记录替换掉。
Integer
否
无
默认情况下,维表Cache是未开启的。您可以设置lookup.cache.max-rows和lookup.cache.ttl参数来启用维表Cache。启用缓存时,采用的是LRU策略缓存。
lookup.cache.ttl
指定缓存中每行记录的最大存活时间。如果某行记录超过该时间,则该行记录将会过期。
Duration
否
无
lookup.cache.caching-missing-key
是否缓存空的查询结果。
Boolean
否
true
参数取值如下:
true(默认值):缓存空的查询结果。
false:不缓存空的查询结果。
lookup.max-retries
查询数据库失败的最大重试次数。
Integer
否
3
无。
PostgreSQL独有
参数
说明
数据类型
是否必填
默认值
备注
source.extend-type.enabled
作为源表和维表时,是否允许读取JSONB和UUID拓展类型,并映射到Flink支持的类型。
Boolean
否
false
参数取值如下:
true:支持读取和映射拓展类型。
false(默认值):不支持读取和映射拓展类型。
类型映射
MySQL类型 | Oracle类型 | PostgreSQL类型 | FlinkSQL类型 |
TINYINT | 无 | 无 | TINYINT |
| 无 |
| SMALLINT |
| 无 |
| INT |
| 无 |
| BIGINT |
BIGINT UNSIGNED | 无 | 无 | DECIMAL(20, 0) |
BIGINT | 无 | BIGINT | BIGINT |
FLOAT | BINARY_FLOAT |
| FLOAT |
| BINARY_DOUBLE |
| DOUBLE |
|
|
| DECIMAL(p, s) |
| 无 | BOOLEANcan | BOOLEAN |
DATE | DATE | DATE | DATE |
TIME [(p)] | DATE | TIME [(p)] [WITHOUT TIMEZONE] | TIME [(p)] [WITHOUT TIMEZONE] |
DATETIME [(p)] | TIMESTAMP [(p)] [WITHOUT TIMEZONE] | TIMESTAMP [(p)] [WITHOUT TIMEZONE] | TIMESTAMP [(p)] [WITHOUT TIMEZONE] |
|
|
| STRING |
|
| BYTEA | BYTES |
无 | 无 | ARRAY | ARRAY |
使用示例
源表
CREATE TEMPORARY TABLE jdbc_source ( `id` INT, `name` VARCHAR ) WITH ( 'connector' = 'jdbc', 'url' = 'jdbc:xxx', 'table-name' = '<yourTable>', 'username' = '<yourUsername>', 'password' = '<yourPassword>' ); CREATE TEMPORARY TABLE blackhole_sink( `id` INT, `name` VARCHAR ) WITH ( 'connector' = 'blackhole' ); INSERT INTO blackhole_sink SELECT * FROM jdbc_source ;
结果表
CREATE TEMPORARY TABLE datagen_source ( `name` VARCHAR, `age` INT ) WITH ( 'connector' = 'datagen' ); CREATE TEMPORARY TABLE jdbc_sink ( `name` VARCHAR, `age` INT ) WITH ( 'connector' = 'jdbc', 'url' = 'jdbc:xxxx', 'table-name' = '<yourTable>', 'username' = '<yourUsername>', 'password' = '<yourPassword>' ); INSERT INTO jdbc_sink SELECT * FROM datagen_source;
维表
CREATE TEMPORARY TABLE datagen_source( `id` INT, `data` BIGINT, `proctime` AS PROCTIME() ) WITH ( 'connector' = 'datagen' ); CREATE TEMPORARY TABLE jdbc_dim ( `id` INT, `name` VARCHAR ) WITH ( 'connector' = 'jdbc', 'url' = 'jdbc:xxx', 'table-name' = '<yourTable>', 'username' = '<yourUsername>', 'password' = '<yourPassword>' ); CREATE TEMPORARY TABLE blackhole_sink( `id` INT, `data` BIGINT, `name` VARCHAR ) WITH ( 'connector' = 'blackhole' ); INSERT INTO blackhole_sink SELECT T.`id`,T.`data`, H.`name` FROM datagen_source AS T JOIN jdbc_dim FOR SYSTEM_TIME AS OF T.proctime AS H ON T.id = H.id;