本文為您介紹如何使用JDBC連接器。
背景資訊
此連接器為開源Flink的JDBC連接器,JDBC連接器提供了對MySQL、PostgreSQL和Oracle等常見的資料庫讀寫支援。JDBC連接器支援的資訊如下。
類別 | 詳情 |
支援類型 | 源表、維表和結果表 |
運行模式 | 流模式和批模式 |
資料格式 | 暫不適用 |
特有監控指標 | 暫無 |
API種類 | SQL |
是否支援更新或刪除結果表資料 | 是 |
前提條件
串連的資料庫和表都已被建立。
使用限制
僅Realtime Compute引擎VVR 6.0.1及以上版本支援JDBC連接器。
JDBC源表為Bounded Source,表中資料讀取完,對應的Task就會結束。如果需要捕獲即時變更資料,則請使用CDC連接器,詳情請參見MySQL的CDC源表和Postgres的CDC源表(公測中)。
使用JDBC結果表串連PostgreSQL資料庫時,需要資料庫版本為PostgreSQL 9.5及以上。因為DDL中定義主鍵的情況下,PostgreSQL採用ON CONFLICT文法進行插入或更新,此文法需要PostgreSQL 9.5及以上版本才支援。
Flink中只提供了開源JDBC連接器的實現,不包含具體的資料庫的Driver。在使用JDBC連接器時,需要手動上傳目標資料庫Driver的JAR包作為附加依賴檔案,具體操作請參見步驟三:進行更多配置。目前支援的Driver如下表所示。
Driver
Group Id
Artifact Id
MySQL
mysql
Oracle
com.oracle.database.jdbc
PostgreSQL
org.postgresql
如果您採用非列表中的JDBC Driver,則其正確性和可用性需要您自行充分測試並保證。
JDBC連接器在向MySQL結果表寫入資料時,會將接收到的每條資料拼接成一條SQL去執行。對於包含主鍵的MySQL結果表,會拼接執行
INSERT INTO table_name (column1, column2, ...) VALUES (value1, value2, ...) ON DUPLICATE KEY UPDATE column1 = VALUES(column1), column2 = VALUES(column2), ...;
語句。需要注意的是,如果物理表存在除主鍵外的唯一索引約束,當插入兩條主鍵不同但唯一索引相同的記錄時,下遊資料會因為唯一索引衝突導致資料覆蓋引發資料丟失。
文法結構
CREATE TABLE jdbc_table (
`id` BIGINT,
`name` VARCHAR,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:xxx',
'table-name' = '<yourTable>',
'username' = '<yourUsername>',
'password' = '<yourPassword>'
);
WITH參數
通用
參數
說明
資料類型
是否必填
預設值
備忘
connector
表類型。
String
是
無
固定值為jdbc。
url
資料庫的URL。
String
是
無
無。
table-name
JDBC表的名稱。
String
是
無
無。
username
JDBC使用者名稱稱。
String
否
無
如果指定了username和password中的任一參數,則兩者必須都被指定。
password
JDBC使用者密碼。
String
否
無
源表專屬
參數
說明
資料類型
是否必填
預設值
備忘
scan.partition.column
對輸入進行分區的列名。
String
否
無
該列必須是數實值型別或時間戳記類型,且該類型在資料庫中需要支援與數實值型別進行比較。關於分區掃描的詳情請參見Partitioned Scan。
scan.partition.num
分區數。
Integer
否
無
無。
scan.partition.lower-bound
第一個分區的最小值。
Long
否
無
無。
scan.partition.upper-bound
最後一個分區的最大值。
Long
否
無
無。
scan.fetch-size
每次迴圈讀取時,從資料庫中擷取的行數。
Integer
否
0
如果指定的值為0,則該配置項會被忽略。
scan.auto-commit
是否開啟auto-commit。
Boolean
否
true
無。
結果表專屬
參數
說明
資料類型
是否必填
預設值
備忘
sink.buffer-flush.max-rows
flush資料前,緩衝記錄的最大值。
Integer
否
100
您可以設定為0來禁用它,即不再緩衝記錄,直接flush資料。
sink.buffer-flush.interval
flush資料的時間間隔。資料在Flink中緩衝的時間超過該參數指定的時間後,非同步線程將flush資料到資料庫中。
Duration
否
1 s
您可以設定為0來禁用它,即不再緩衝記錄,直接flush資料。
說明如果您需要完全非同步地處理緩衝的flush事件,則可以將sink.buffer-flush.max-rows設定為0,並配置適當的flush時間間隔。
sink.max-retries
寫入記錄到資料庫失敗後的最大重試次數。
Integer
否
3
無。
維表專屬
參數
說明
資料類型
是否必填
預設值
備忘
lookup.cache.max-rows
指定緩衝的最大行數。如果超過該值,則最老的行記錄將會到期,會被新的記錄替換掉。
Integer
否
無
預設情況下,維表Cache是未開啟的。您可以設定lookup.cache.max-rows和lookup.cache.ttl參數來啟用維表Cache。啟用緩衝時,採用的是LRU策略緩衝。
lookup.cache.ttl
指定緩衝中每行記錄的最大存活時間。如果某行記錄超過該時間,則該行記錄將會到期。
Duration
否
無
lookup.cache.caching-missing-key
是否緩衝空的查詢結果。
Boolean
否
true
參數取值如下:
true(預設值):緩衝空的查詢結果。
false:不緩衝空的查詢結果。
lookup.max-retries
查詢資料庫失敗的最大重試次數。
Integer
否
3
無。
PostgreSQL專屬
參數
說明
資料類型
是否必填
預設值
備忘
source.extend-type.enabled
作為源表和維表時,是否允許讀取JSONB和UUID拓展類型,並映射到Flink支援的類型。
Boolean
否
false
參數取值如下:
true:支援讀取和映射拓展類型。
false(預設值):不支援讀取和映射拓展類型。
類型映射
MySQL類型 | Oracle類型 | PostgreSQL類型 | FlinkSQL類型 |
TINYINT | 無 | 無 | TINYINT |
| 無 |
| SMALLINT |
| 無 |
| INT |
| 無 |
| BIGINT |
BIGINT UNSIGNED | 無 | 無 | DECIMAL(20, 0) |
BIGINT | 無 | BIGINT | BIGINT |
FLOAT | BINARY_FLOAT |
| FLOAT |
| BINARY_DOUBLE |
| DOUBLE |
|
|
| DECIMAL(p, s) |
| 無 | BOOLEANcan | BOOLEAN |
DATE | DATE | DATE | DATE |
TIME [(p)] | DATE | TIME [(p)] [WITHOUT TIMEZONE] | TIME [(p)] [WITHOUT TIMEZONE] |
DATETIME [(p)] | TIMESTAMP [(p)] [WITHOUT TIMEZONE] | TIMESTAMP [(p)] [WITHOUT TIMEZONE] | TIMESTAMP [(p)] [WITHOUT TIMEZONE] |
|
|
| STRING |
|
| BYTEA | BYTES |
無 | 無 | ARRAY | ARRAY |
使用樣本
源表
CREATE TEMPORARY TABLE jdbc_source ( `id` INT, `name` VARCHAR ) WITH ( 'connector' = 'jdbc', 'url' = 'jdbc:xxx', 'table-name' = '<yourTable>', 'username' = '<yourUsername>', 'password' = '<yourPassword>' ); CREATE TEMPORARY TABLE blackhole_sink( `id` INT, `name` VARCHAR ) WITH ( 'connector' = 'blackhole' ); INSERT INTO blackhole_sink SELECT * FROM jdbc_source ;
結果表
CREATE TEMPORARY TABLE datagen_source ( `name` VARCHAR, `age` INT ) WITH ( 'connector' = 'datagen' ); CREATE TEMPORARY TABLE jdbc_sink ( `name` VARCHAR, `age` INT ) WITH ( 'connector' = 'jdbc', 'url' = 'jdbc:xxxx', 'table-name' = '<yourTable>', 'username' = '<yourUsername>', 'password' = '<yourPassword>' ); INSERT INTO jdbc_sink SELECT * FROM datagen_source;
維表
CREATE TEMPORARY TABLE datagen_source( `id` INT, `data` BIGINT, `proctime` AS PROCTIME() ) WITH ( 'connector' = 'datagen' ); CREATE TEMPORARY TABLE jdbc_dim ( `id` INT, `name` VARCHAR ) WITH ( 'connector' = 'jdbc', 'url' = 'jdbc:xxx', 'table-name' = '<yourTable>', 'username' = '<yourUsername>', 'password' = '<yourPassword>' ); CREATE TEMPORARY TABLE blackhole_sink( `id` INT, `data` BIGINT, `name` VARCHAR ) WITH ( 'connector' = 'blackhole' ); INSERT INTO blackhole_sink SELECT T.`id`,T.`data`, H.`name` FROM datagen_source AS T JOIN jdbc_dim FOR SYSTEM_TIME AS OF T.proctime AS H ON T.id = H.id;