全部產品
Search
文件中心

Realtime Compute for Apache Flink:OceanBase(公測中)

更新時間:Dec 12, 2024

本文為您介紹如何使用OceanBase連接器。

背景資訊

OceanBase資料庫是一款原生分布式的HTAP資料庫管理系統,詳情請參見OceanBase官網。為了降低您從MySQL資料庫或Oracle資料庫遷移到OceanBase資料庫時引發的業務系統改造成本,OceanBase資料庫支援Oracle和MySQL兩種相容模式,兩種模式下的資料類型、SQL功能、內部視圖等與MySQL資料庫或Oracle資料庫保持一致。兩種模式下建議使用的連接器如下:

  • Oracle模式:只能使用OceanBase連接器。

  • MySQL模式:與原生MySQL保持高度相容,支援使用OceanBase和MySQL兩種連接器,MySQL連接器詳情請參見MySQL

    • 在無需進階特性的情況下,維表和結果表建議優先考慮MySQL連接器,配置更簡單。

    • 使用OceanBase 3.2.4.4及以上版本時,源表建議優先使用MySQL連接器。這是因為OceanBase 3.2.4.4及以上版本MySQL模式開始支援開啟Binlog服務,輸出格式與原生MySQL Binlog一致(有關OceanBase Binlog的更多資訊,請參見概述Binlog 相關操作)。

      重要
      • 使用MySQL連接器讀取OceanBase Binlog目前處於公測階段,請在使用前充分評估並謹慎使用。

      • 使用MySQL連接器讀取OceanBase時,請確保OceanBase Binlog已開啟且被正確設定,詳情請參見Binlog 相關操作

OceanBase連接器支援的資訊如下。

類別

詳情

支援類型

源表、維表和結果表

運行模式

流模式和批模式

資料格式

暫不適用

特有監控指標

暫無

API種類

SQL

是否支援更新或刪除結果表資料

前提條件

串連的資料庫和表都已被建立。具體操作可參考以下文檔:

使用限制

  • 維表和結果表

    • Flink計算引擎VVR 8.0.1及以上版本支援OceanBase連接器。

    • 語義上可以保證At-Least-Once,在結果表有主鍵的情況下,等冪可以保證資料的正確性。

  • 結果表:OceanBase資料庫沒有部署資料庫代理服務時,連接器會使用OCJ(OceanBase Connector Java)串連OceanBase資料庫,該模式需要用到config url,要求OceanBase資料庫已部署OceanBase雲平台。該工作方式只能用於OceanBase資料庫的MySQL相容模式,不支援Oracle相容模式。

    說明

    資料庫代理服務與OCJ實現了相同的路由功能,區別在於OCJ驅動整合於Java應用程式,而資料庫代理是一個獨立的代理服務。目前,OceanBase團隊推薦通過資料庫代理來串連OceanBase叢集,OCJ驅動主要用於相容一些歷史叢集和應用程式。

文法結構

CREATE TABLE oceanabse_source (
   order_id INT,
   order_date TIMESTAMP(0),
   customer_name STRING,
   price DECIMAL(10, 5),
   product_id INT,
   order_status BOOLEAN,
   PRIMARY KEY(order_id) NOT ENFORCED
) WITH (
  'connector' = 'oceanbase',
  'url' = '<yourJdbcUrl>',
  'tableName' = '<yourTableName>',
  'userName' = '<yourUserName>',
  'password' = '<yourPassword>'
);

說明

連接器寫入結果表原理:寫入結果表時,會將接收到的每條資料拼接成一條SQL去執行。具體執行的SQL情況如下:

  • 對於沒有主鍵的結果表,會拼接成INSERT INTO語句。

  • 對於包含主鍵的結果表,會根據資料庫的相容模式拼接成UPSERT語句。

WITH參數

  • 通用

    參數

    說明

    是否必填

    資料類型

    預設值

    備忘

    connector

    表類型。

    STRING

    • 作為CDC源表或維表時,固定值為oceanbase

    • 作為結果表時,參數取值如下:

      • 如果使用了OceanBase資料庫代理,則表類型取值為oceanbase

      • 如果直連OceanBase叢集,則表類型取值為oceanbase-ocj

    userName

    使用者名稱。

    STRING

    無。

    password

    密碼。

    STRING

    無。

  • 源表專屬

    說明

    連接器支援通過資料庫名稱(database-name)和表名(table-name)的正則匹配和表列表(table-list)的精確匹配兩種模式來指定需要監聽的表。當同時使用兩種方式時,將會監聽兩種方式匹配的所有表。

    參數

    說明

    是否必填

    資料類型

    預設值

    備忘

    logproxy.host

    OceanBase日誌Proxy 伺服器的IP地址或主機名稱。

    String

    無。

    logproxy.port

    OceanBase日誌Proxy 伺服器的連接埠號碼。

    Integer

    無。

    scan.startup.mode

    OceanBase CDC的啟動模式。

    String

    參數取值如下:

    • initial:從初始位點開始拉取全部資料。

    • latest-offset:從當前位點開始拉取變更資料。

    • timestamp:從指定的時間戳記開始拉取變更資料。

    tenant-name

    OceanBase資料庫的租戶名。

    String

    無。

    database-name

    OceanBase資料庫名稱。

    String

    支援使用Regex指定資料庫名稱。

    說明

    僅支援在啟動模式為initial時,使用該參數。

    table-name

    OceanBase資料庫的表名稱。

    String

    支援使用Regex指定表名稱。

    說明

    僅支援在啟動模式為initial時,使用該參數。

    table-list

    OceanBase資料庫的全路徑的表名列表。

    String

    可以使用英文逗號(,)分隔,例如db1.table1, db2.table2

    hostname

    OceanBase資料庫或 OceanBase代理的IP地址或主機名稱。

    String

    無。

    port

    OceanBase資料庫伺服器的連接埠號碼。

    Integer

    可以是OceanBase伺服器的SQL連接埠號碼(預設值為2881)

    或OceanBaseProxy 伺服器的連接埠號碼(預設值為2883)。

    connect.timeout

    串連到OceanBase資料庫伺服器之前的最長逾時時間。

    Duration

    30s

    無。

    server-time-zone

    資料庫伺服器中的會話時區。

    String

    +00:00

    會話時區值的合法格式為±hh:mm,表示與國際標準時間(UTC)的時區位移量。

    說明
    • 會話時區的設定會影響到時間類型的顯示和儲存方式。因此,如果您需要控制OceanBase的時間類型如何轉換為字串,則需要設定合理的會話時區資訊,以確保顯示正確的本地時間。

    • 如果您在MySQL資料庫中已存在一個用於儲存時區資訊的表,則在設定時區時,可以使用這個表中已經建立的時區作為合法的值。

    logproxy.client.id

    OceanBase日誌Proxy 伺服器的用戶端串連ID。

    String

    規則產生

    如果您沒有指定,則Flink會預設按照{flink_ip}_{process_id}_{timestamp}_{thread_id}_{tenant}規則產生。

    rootserver-list

    OceanBase根伺服器列表。

    String

    伺服器列表格式為ip:rpc_port:sql_port。您可以執行SHOW PARAMETERS LIKE 'rootservice_list';SQL語句擷取伺服器列表資訊。

    說明
    • OceanBase社區版本必填。

    • 多個伺服器位址使用英文分號(;)分隔。

    config-url

    從設定管理員擷取伺服器資訊的url。

    String

    OceanBase企業版本必填。

    working-mode

    日誌代理中libobcdc的工作模式。

    String

    storage

    參數取值如下:

    • storage:表示資料將被儲存在磁碟或其他持久性儲存介質中。

    • memory:表示資料將被儲存在記憶體中。

    compatible-mode

    OceanBase的相容模式。

    String

    mysql

    參數取值如下:

    • mysql

    • oracle

    jdbc.driver

    全量讀取源表資料時使用的jdbc驅動類名。

    String

    com.mysql.jdbc.Driver

    無。

    jdbc.properties.*

    傳遞自訂的JDBC URL屬性。

    String

    例如 'jdbc.properties.useSSL' = 'false'表示不使用SSL加密。

    obcdc.properties.*

    將自訂的 OBCDC參數傳遞給libobcdc。

    String

    例如'obcdc.properties.sort_trans_participants' = '1'

    更多參數資訊見obcdc parameters

  • 維表專屬

    參數

    說明

    是否必填

    資料類型

    預設值

    備忘

    url

    JDBC url或config url。

    STRING

    • 當連接器類型為oceanbase時使用JDBC url,連接器類型為oceanbase-ocj時,使用config url。

    • url中需要包含MySQL database名或Oracle service名。

    cache

    緩衝策略。

    STRING

    ALL

    支援以下三種緩衝策略:

    • ALL:緩衝維表裡的所有資料。在Job運行前,系統會將維表中所有資料載入到Cache中,之後所有的維表尋找資料都會通過Cache進行。如果在Cache中無法找到資料,則KEY不存在,並在Cache到期後重新載入一遍全量Cache。

      適用於遠端資料表資料量小且MISS KEY(源表資料和維表JOIN時,ON條件無法關聯)特別多的情境。

    • LRU:緩衝維表裡的部分資料。源表的每條資料都會觸發系統先在Cache中尋找資料。如果沒有找到,則去物理維表中尋找。使用該緩衝策略時,必須配置cacheSize參數。

    • None:無緩衝。

    重要
    • 使用ALL緩衝策略時,請注意節點記憶體大小,防止出現OOM。

    • 因為系統會非同步載入維表資料,所以在使用ALL緩衝策略時,需要增加維表JOIN節點的記憶體,增加的記憶體大小為遠端資料表資料量的兩倍。

    cacheSize

    最大緩衝條數。

    INTEGER

    100000

    • 當選擇LRU緩衝策略後,必須設定緩衝大小。

    • 當選擇ALL緩衝策略後,可以不設定緩衝大小。

    cacheTTLMs

    緩衝逾時時間。

    LONG

    Long.MAX_VALUE

    cacheTTLMs的配置和cache有關,詳情如下:

    • 如果cache配置為None,則cacheTTLMs可以不配置,表示緩衝不逾時。

    • 如果cache配置為LRU,則cacheTTLMs為緩衝逾時時間。預設不到期。

    • 如果cache配置為ALL,則cacheTTLMs為緩衝載入時間。預設不重新載入。

    maxRetryTimeout

    最大重試時間。

    DURATION

    60s

    無。

  • 結果表專屬

    參數

    說明

    是否必填

    資料類型

    預設值

    備忘

    compatibleMode

    OceanBase的相容模式。

    STRING

    mysql

    參數取值如下:

    • mysql

    • oracle

    說明

    oceanabse專屬參數。

    databaseName

    資料庫名。

    STRING

    應當與config url中保持一致。

    說明

    oceanbase-ocj專屬參數。

    passwordEncrypted

    是否使用加密過的密碼。

    Boolean

    false

    oceanbase-ocj專屬參數。

    slowQueryThresholdMs

    慢查詢等待閾值。

    INTEGER

    60000

    單位毫秒。

    說明

    oceanbase-ocj專屬參數。

    url

    JDBC url或config url。

    STRING

    • 當連接器類型為oceanbase時使用JDBC url,連接器類型為oceanbase-ocj時,使用config url。

    • url中需要包含MySQL database名或Oracle service名。

    tableName

    表名。

    STRING

    無。

    maxRetryTimes

    最大重試次數。

    INTEGER

    3

    無。

    poolInitialSize

    資料庫連接池初始大小。

    INTEGER

    1

    無。

    poolMaxActive

    資料庫連接池最大串連數。

    INTEGER

    8

    無。

    poolMaxWait

    從資料庫連接池中擷取串連的最大等待時間。

    INTEGER

    2000

    單位毫秒。

    poolMinIdle

    資料庫連接池中最小空閑串連數。

    INTEGER

    1

    無。

    connectionProperties

    jdbc的串連屬性。

    STRING

    格式為"k1=v1;k2=v2;k3=v3"。

    ignoreDelete

    是否忽略資料Delete操作。

    Boolean

    false

    無。

    excludeUpdateColumns

    指定要排除的列名。在執行更新操作時,這些列將不會被更新。

    STRING

    如果忽略指定的欄位為多個時,則需要使用英文逗號(,)分隔。例如excludeUpdateColumns=column1,column2

    說明

    該值始終會包含主鍵列,也就是實際生效的列名為您指定的列名和主鍵列。

    partitionKey

    分區鍵。

    STRING

    當設定分區鍵時,連接器會先將資料按照分區鍵進行分組,各個分組將分別寫入資料庫。這裡的分組處理早於modRule的處理。

    modRule

    分組規則。

    STRING

    分組規則格式需要為"列名mod數字",列類型必須為數字類型。當設定分組規則時,資料會根據計算所得結果進行分組,各個分組將分別寫入資料庫。這裡的分組處理晚於partitionKey的處理。

    bufferSize

    資料緩衝區大小。

    INTEGER

    1000

    無。

    flushIntervalMs

    清空緩衝的時間間隔。表示如果緩衝中的資料在等待指定時間後,依然沒有達到輸出條件,系統會自動輸出緩衝中的所有資料。

    LONG

    1000

    無。

    retryIntervalMs

    最大重試時間。

    INTEGER

    5000

    單位毫秒。

類型映射

  • MySQL相容模式

    OceanBase欄位類型

    Flink欄位類型

    TINYINT

    TINYINT

    SMALLINT

    SMALLINT

    TINYINT UNSIGNED

    INT

    INT

    MEDIUMINT

    SMALLINT UNSIGNED

    BIGINT

    BIGINT

    INT UNSIGNED

    BIGINT UNSIGNED

    DECIMAL(20, 0)

    REAL

    FLOAT

    FLOAT

    DOUBLE

    DOUBLE

    NUMERIC(p, s)

    DECIMAL(p, s)

    說明

    其中p <= 38。

    DECIMAL(p, s)

    BOOLEAN

    BOOLEAN

    TINYINT(1)

    DATE

    DATE

    TIME [(p)]

    TIME [(p)] [WITHOUT TIME ZONE]

    DATETIME [(p)]

    TIMESTAMP [(p)] [WITHOUT TIME ZONE]

    TIMESTAMP [(p)]

    CHAR(n)

    CHAR(n)

    VARCHAR(n)

    VARCHAR(n)

    BIT(n)

    BINARY(⌈n/8⌉)

    BINARY(n)

    BINARY(n)

    VARBINARY(N)

    VARBINARY(N)

    TINYTEXT

    STRING

    TEXT

    MEDIUMTEXT

    LONGTEXT

    TINYBLOB

    BYTES

    重要

    Flink僅支援小於等於2,147,483,647(2^31 - 1)的BLOB類型的記錄。

    BLOB

    MEDIUMBLOB

    LONGBLOB

  • Oracle相容模式

    OceanBase欄位類型

    Flink欄位類型

    NUMBER(p, s <= 0), p - s < 3

    TINYINT

    NUMBER(p, s <= 0), p - s < 5

    SMALLINT

    NUMBER(p, s <= 0), p - s < 10

    INT

    NUMBER(p, s <= 0), p - s < 19

    BIGINT

    NUMBER(p, s <= 0), 19 <= p - s <= 38

    DECIMAL(p - s, 0)

    NUMBER(p, s > 0)

    DECIMAL(p, s)

    NUMBER(p, s <= 0), p - s > 38

    STRING

    FLOAT

    FLOAT

    BINARY_FLOAT

    BINARY_DOUBLE

    DOUBLE

    NUMBER(1)

    BOOLEAN

    DATE

    TIMESTAMP [(p)] [WITHOUT TIMEZONE]

    TIMESTAMP [(p)]

    CHAR(n)

    STRING

    NCHAR(n)

    NVARCHAR2(n)

    VARCHAR(n)

    VARCHAR2(n)

    CLOB

    BLOB

    BYTES

    ROWID

使用樣本

  • 源表&結果表

    CREATE TEMPORARY TABLE oceanbase_source (
      a INT,
      b VARCHAR,
      c VARCHAR
    ) WITH (
      'connector' = 'oceanbase',
      'scan.startup.mode' = 'initial',
      'username' = 'user',
      'password' = 'password',
      'tenant-name' = 'tenant',
      'database-name' = '^test_db$',
      'table-name' = '^orders$',
      'hostname' = '11.22.xx.xx',
      'port' = '2883',
      'config-url' = 'http://11.22.xx.xx:55/services?Action=ObRootServiceInfo&User_ID=xxx&UID=xxx&ObRegion=xxx',
      'logproxy.host' = '11.22.xx.xx',
      'logproxy.port' = '2983',
      'working-mode' = 'memory'
    );
    
    -- oceanbase結果表
    CREATE TEMPORARY TABLE oceanbase_sink (
      a INT,
      b VARCHAR,
      c VARCHAR
    ) WITH (
      'connector' = 'oceanbase',
      'url' = '<yourJdbcUrl>',
      'userName' = '<yourUserName>',
      'password' = '<yourPassword>',
      'tableName' = '<yourTableName>'
    );
    
    --oceanbase-ocj結果表
    CREATE TEMPORARY TABLE oceanbase_ocj_sink (
      a INT,
      b VARCHAR,
      c VARCHAR
    ) WITH (
      'connector' = 'oceanbase-ocj',
      'url' = '<yourConfigUrl>',
      'userName' = '<yourUserName>',
      'password' = '${secret_values.password}',
      'databaseName' = '<yourDatabaseName>',
      'tableName' = '<yourTableName>'
    );
    
    BEGIN STATEMENT SET;  
    INSERT INTO oceanbase_sink
    SELECT * FROM oceanbase_source;
    INSERT INTO oceanbase_ocj_sink
    SELECT * FROM oceanbase_source;
    END; 

  • 維表

    CREATE TEMPORARY TABLE datagen_source(
      a INT,
      b BIGINT,
      c STRING,
      `proctime` AS PROCTIME()
    ) WITH (
      'connector' = 'datagen'
    );
    
    CREATE TEMPORARY TABLE oceanbase_dim (
      a INT,
      b VARCHAR,
      c VARCHAR
    ) WITH (
      'connector' = 'oceanbase',
      'url' = '<yourJdbcUrl>',
      'userName' = '<yourUserName>',
      'password' = '${secret_values.password}',
      'tableName' = '<yourTableName>'
    );
    
    CREATE TEMPORARY TABLE blackhole_sink(
      a INT,
      b STRING
    ) WITH (
      'connector' = 'blackhole'
    );
    
    INSERT INTO blackhole_sink
    SELECT T.a, H.b
    FROM datagen_source AS T 
    JOIN oceanbase_dim FOR SYSTEM_TIME AS OF T.`proctime` AS H 
    ON T.a = H.a;

相關文檔

Flink支援的連接器,請參見支援的連接器