全部產品
Search
文件中心

Realtime Compute for Apache Flink:JDBC

更新時間:Jul 13, 2024

本文為您介紹如何使用JDBC連接器。

背景資訊

此連接器為開源Flink的JDBC連接器,JDBC連接器提供了對MySQL、PostgreSQL和Oracle等常見的資料庫讀寫支援。JDBC連接器支援的資訊如下。

類別

詳情

支援類型

源表、維表和結果表

運行模式

流模式和批模式

資料格式

暫不適用

特有監控指標

暫無

API種類

SQL

是否支援更新或刪除結果表資料

前提條件

串連的資料庫和表都已被建立。

使用限制

  • 僅Realtime Compute引擎VVR 6.0.1及以上版本支援JDBC連接器。

  • JDBC源表為Bounded Source,表中資料讀取完,對應的Task就會結束。如果需要捕獲即時變更資料,則請使用CDC連接器,詳情請參見MySQL的CDC源表Postgres的CDC源表(公測中)

  • 使用JDBC結果表串連PostgreSQL資料庫時,需要資料庫版本為PostgreSQL 9.5及以上。因為DDL中定義主鍵的情況下,PostgreSQL採用ON CONFLICT文法進行插入或更新,此文法需要PostgreSQL 9.5及以上版本才支援。

  • Flink中只提供了開源JDBC連接器的實現,不包含具體的資料庫的Driver。在使用JDBC連接器時,需要手動上傳目標資料庫Driver的JAR包作為附加依賴檔案,具體操作請參見步驟三:進行更多配置。目前支援的Driver如下表所示。

    Driver

    Group Id

    Artifact Id

    MySQL

    mysql

    mysql-connector-java

    Oracle

    com.oracle.database.jdbc

    ojdbc8

    PostgreSQL

    org.postgresql

    postgresql

    • 如果您採用非列表中的JDBC Driver,則其正確性和可用性需要您自行充分測試並保證。

    • JDBC連接器在向MySQL結果表寫入資料時,會將接收到的每條資料拼接成一條SQL去執行。對於包含主鍵的MySQL結果表,會拼接執行INSERT INTO table_name (column1, column2, ...) VALUES (value1, value2, ...) ON DUPLICATE KEY UPDATE column1 = VALUES(column1), column2 = VALUES(column2), ...;語句。需要注意的是,如果物理表存在除主鍵外的唯一索引約束,當插入兩條主鍵不同但唯一索引相同的記錄時,下遊資料會因為唯一索引衝突導致資料覆蓋引發資料丟失。

文法結構

CREATE TABLE jdbc_table (
  `id` BIGINT,
  `name` VARCHAR,
   PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:xxx',
  'table-name' = '<yourTable>',
  'username' = '<yourUsername>',
  'password' = '<yourPassword>'
);

WITH參數

  • 通用

    參數

    說明

    資料類型

    是否必填

    預設值

    備忘

    connector

    表類型。

    String

    固定值為jdbc。

    url

    資料庫的URL。

    String

    無。

    table-name

    JDBC表的名稱。

    String

    無。

    username

    JDBC使用者名稱稱。

    String

    如果指定了username和password中的任一參數,則兩者必須都被指定。

    password

    JDBC使用者密碼。

    String

  • 源表專屬

    參數

    說明

    資料類型

    是否必填

    預設值

    備忘

    scan.partition.column

    對輸入進行分區的列名。

    String

    該列必須是數實值型別或時間戳記類型,且該類型在資料庫中需要支援與數實值型別進行比較。關於分區掃描的詳情請參見Partitioned Scan

    scan.partition.num

    分區數。

    Integer

    無。

    scan.partition.lower-bound

    第一個分區的最小值。

    Long

    無。

    scan.partition.upper-bound

    最後一個分區的最大值。

    Long

    無。

    scan.fetch-size

    每次迴圈讀取時,從資料庫中擷取的行數。

    Integer

    0

    如果指定的值為0,則該配置項會被忽略。

    scan.auto-commit

    是否開啟auto-commit

    Boolean

    true

    無。

  • 結果表專屬

    參數

    說明

    資料類型

    是否必填

    預設值

    備忘

    sink.buffer-flush.max-rows

    flush資料前,緩衝記錄的最大值。

    Integer

    100

    您可以設定為0來禁用它,即不再緩衝記錄,直接flush資料。

    sink.buffer-flush.interval

    flush資料的時間間隔。資料在Flink中緩衝的時間超過該參數指定的時間後,非同步線程將flush資料到資料庫中。

    Duration

    1 s

    您可以設定為0來禁用它,即不再緩衝記錄,直接flush資料。

    說明

    如果您需要完全非同步地處理緩衝的flush事件,則可以將sink.buffer-flush.max-rows設定為0,並配置適當的flush時間間隔。

    sink.max-retries

    寫入記錄到資料庫失敗後的最大重試次數。

    Integer

    3

    無。

  • 維表專屬

    參數

    說明

    資料類型

    是否必填

    預設值

    備忘

    lookup.cache.max-rows

    指定緩衝的最大行數。如果超過該值,則最老的行記錄將會到期,會被新的記錄替換掉。

    Integer

    預設情況下,維表Cache是未開啟的。您可以設定lookup.cache.max-rowslookup.cache.ttl參數來啟用維表Cache。啟用緩衝時,採用的是LRU策略緩衝。

    lookup.cache.ttl

    指定緩衝中每行記錄的最大存活時間。如果某行記錄超過該時間,則該行記錄將會到期。

    Duration

    lookup.cache.caching-missing-key

    是否緩衝空的查詢結果。

    Boolean

    true

    參數取值如下:

    • true(預設值):緩衝空的查詢結果。

    • false:不緩衝空的查詢結果。

    lookup.max-retries

    查詢資料庫失敗的最大重試次數。

    Integer

    3

    無。

  • PostgreSQL專屬

    參數

    說明

    資料類型

    是否必填

    預設值

    備忘

    source.extend-type.enabled

    作為源表和維表時,是否允許讀取JSONB和UUID拓展類型,並映射到Flink支援的類型。

    Boolean

    false

    參數取值如下:

    • true:支援讀取和映射拓展類型。

    • false(預設值):不支援讀取和映射拓展類型。

類型映射

MySQL類型

Oracle類型

PostgreSQL類型

FlinkSQL類型

TINYINT

TINYINT

  • SMALLINT

  • TINYINT UNSIGNED

  • SMALLINT

  • INT2

  • SMALLSERIAL

  • SERIAL2

SMALLINT

  • INT

  • MEDIUMINT

  • SMALLINT UNSIGNED

  • INTEGER

  • SERIAL

INT

  • BIGINT

  • INT UNSIGNED

  • BIGINT

  • BIGSERIAL

BIGINT

BIGINT UNSIGNED

DECIMAL(20, 0)

BIGINT

BIGINT

BIGINT

FLOAT

BINARY_FLOAT

  • REAL

  • FLOAT4

FLOAT

  • DOUBLE

  • DOUBLE PRECISION

BINARY_DOUBLE

  • FLOAT8

  • DOUBLE PRECISION

DOUBLE

  • NUMERIC(p, s)

  • DECIMAL(p, s)

  • SMALLINT

  • FLOAT(s)

  • DOUBLE PRECISION

  • REAL

  • NUMBER(p, s)

  • NUMERIC(p, s)

  • DECIMAL(p, s)

DECIMAL(p, s)

  • BOOLEAN

  • TINYINT(1)

BOOLEANcan

BOOLEAN

DATE

DATE

DATE

DATE

TIME [(p)]

DATE

TIME [(p)] [WITHOUT TIMEZONE]

TIME [(p)] [WITHOUT TIMEZONE]

DATETIME [(p)]

TIMESTAMP [(p)] [WITHOUT TIMEZONE]

TIMESTAMP [(p)] [WITHOUT TIMEZONE]

TIMESTAMP [(p)] [WITHOUT TIMEZONE]

  • CHAR(n)

  • VARCHAR(n)

  • TEXT

  • CHAR(n)

  • VARCHAR(n)

  • CLOB

  • CHAR(n)

  • CHARACTER(n)

  • VARCHAR(n)

  • CHARACTER VARYING(n)

  • TEXT

  • JSONB

  • UUID

STRING

  • BINARY

  • VARBINARY

  • BLOB

  • RAW(s)

  • BLOB

BYTEA

BYTES

ARRAY

ARRAY

使用樣本

  • 源表

    CREATE TEMPORARY TABLE jdbc_source (
      `id` INT,
      `name` VARCHAR
    ) WITH (
      'connector' = 'jdbc',
      'url' = 'jdbc:xxx',
      'table-name' = '<yourTable>',
      'username' = '<yourUsername>',
      'password' = '<yourPassword>'
    );
    
    CREATE TEMPORARY TABLE blackhole_sink(
      `id` INT,
      `name` VARCHAR
    ) WITH (
      'connector' = 'blackhole'
    );
    
    INSERT INTO blackhole_sink SELECT * FROM jdbc_source ;
  • 結果表

    CREATE TEMPORARY TABLE datagen_source (
      `name` VARCHAR,
      `age` INT
    ) WITH (
      'connector' = 'datagen'
    );
    
    CREATE TEMPORARY TABLE jdbc_sink (
      `name` VARCHAR,
      `age` INT
    ) WITH (
      'connector' = 'jdbc',
      'url' = 'jdbc:xxxx',
      'table-name' = '<yourTable>',
      'username' = '<yourUsername>',
      'password' = '<yourPassword>'
    );
    
    INSERT INTO jdbc_sink
    SELECT * FROM datagen_source;
  • 維表

    CREATE TEMPORARY TABLE datagen_source(
     `id` INT,
     `data` BIGINT,
     `proctime` AS PROCTIME()
    ) WITH (
      'connector' = 'datagen'
    );
    
    CREATE TEMPORARY TABLE jdbc_dim (
      `id` INT,
      `name` VARCHAR
    ) WITH (
      'connector' = 'jdbc',
      'url' = 'jdbc:xxx',
      'table-name' = '<yourTable>',
      'username' = '<yourUsername>',
      'password' = '<yourPassword>'
    );
    
    CREATE TEMPORARY TABLE blackhole_sink(
      `id` INT,
      `data` BIGINT,
      `name` VARCHAR
    ) WITH (
      'connector' = 'blackhole'
    );
    
    INSERT INTO blackhole_sink
    SELECT T.`id`,T.`data`, H.`name`
    FROM datagen_source AS T
    JOIN jdbc_dim FOR SYSTEM_TIME AS OF T.proctime AS H ON T.id = H.id;