本文為您介紹如何使用OceanBase連接器。
背景資訊
OceanBase資料庫是一款原生分布式的HTAP資料庫管理系統,詳情請參見OceanBase官網。為了降低您從MySQL資料庫或Oracle資料庫遷移到OceanBase資料庫時引發的業務系統改造成本,OceanBase資料庫支援Oracle和MySQL兩種相容模式,兩種模式下的資料類型、SQL功能、內部視圖等與MySQL資料庫或Oracle資料庫保持一致。兩種模式下建議使用的連接器如下:
Oracle模式:只能使用OceanBase連接器。
MySQL模式:與原生MySQL保持高度相容,支援使用OceanBase和MySQL兩種連接器,MySQL連接器詳情請參見MySQL。
在無需進階特性的情況下,維表和結果表建議優先考慮MySQL連接器,配置更簡單。
使用OceanBase 3.2.4.4及以上版本時,源表建議優先使用MySQL連接器(因為OceanBase 3.2.4.4及以上版本MySQL模式開始支援開啟Binlog服務,輸出格式與原生MySQL Binlog一致),業務架構改造成本低。
OceanBase連接器支援的資訊如下。
類別 | 詳情 |
支援類型 | 源表、維表和結果表 |
運行模式 | 流模式和批模式 |
資料格式 | 暫不適用 |
特有監控指標 | 暫無 |
API種類 | SQL |
是否支援更新或刪除結果表資料 | 是 |
前提條件
串連的資料庫和表都已被建立。具體操作可參考以下文檔:
MySQL模式
Oracle模式
使用限制
維表和結果表
Flink計算引擎VVR 8.0.1及以上版本支援OceanBase連接器。
語義上可以保證At-Least-Once,在結果表有主鍵的情況下,等冪可以保證資料的正確性。
結果表:OceanBase資料庫沒有部署OceanBase資料庫代理(OBProxy)時,連接器會使用OCJ(OceanBase Connector Java)串連OceanBase資料庫,該模式需要用到config url,要求OceanBase資料庫已部署OceanBase雲平台。該工作方式只能用於OceanBase資料庫的MySQL相容模式,不支援Oracle相容模式。
說明OBProxy與OCJ實現了相同的路由功能,區別在於OCJ驅動整合於Java應用程式,而OBProxy是一個獨立的代理服務。目前,OceanBase團隊推薦通過OBProxy來串連OceanBase叢集,OCJ驅動主要用於相容一些歷史叢集和應用程式。
文法結構
CREATE TABLE oceanabse_source (
order_id INT,
order_date TIMESTAMP(0),
customer_name STRING,
price DECIMAL(10, 5),
product_id INT,
order_status BOOLEAN,
PRIMARY KEY(order_id) NOT ENFORCED
) WITH (
'connector' = 'oceanbase',
'url' = '<yourJdbcUrl>',
'tableName' = '<yourTableName>',
'userName' = '<yourUserName>',
'password' = '<yourPassword>'
);
連接器寫入結果表原理:寫入結果表時,會將接收到的每條資料拼接成一條SQL去執行。具體執行的SQL情況如下:
對於沒有主鍵的結果表,會拼接成INSERT INTO語句。
對於包含主鍵的結果表,會根據資料庫的相容模式拼接成UPSERT語句。
WITH參數
通用
參數
說明
是否必填
資料類型
預設值
備忘
connector
表類型。
是
STRING
無
作為CDC源表或維表時,固定值為
oceanbase
。作為結果表時,參數取值如下:
如果使用了OceanBase資料庫代理OBProxy,則表類型取值為
oceanbase
,如果直連OceanBase叢集,則表類型取值為
oceanbase-ocj
。
userName
使用者名稱。
是
STRING
無
無。
password
密碼。
是
STRING
無
無。
源表專屬
說明連接器支援通過資料庫名稱(database-name)和表名(table-name)的正則匹配和表列表(table-list)的精確匹配兩種模式來指定需要監聽的表。當同時使用兩種方式時,將會監聽兩種方式匹配的所有表。
參數
說明
是否必填
資料類型
預設值
備忘
logproxy.host
OceanBase日誌Proxy 伺服器的IP地址或主機名稱。
是
String
無
無。
logproxy.port
OceanBase日誌Proxy 伺服器的連接埠號碼。
是
Integer
無
無。
scan.startup.mode
OceanBase CDC的啟動模式。
是
String
無
參數取值如下:
initial:從初始位點開始拉取全部資料。
latest-offset:從當前位點開始拉取變更資料。
timestamp:從指定的時間戳記開始拉取變更資料。
tenant-name
OceanBase資料庫的租戶名。
是
String
無
無。
database-name
OceanBase資料庫名稱。
否
String
無
支援使用Regex指定資料庫名稱。
說明僅支援在啟動模式為initial時,使用該參數。
table-name
OceanBase資料庫的表名稱。
否
String
無
支援使用Regex指定表名稱。
說明僅支援在啟動模式為initial時,使用該參數。
table-list
OceanBase資料庫的全路徑的表名列表。
否
String
無
可以使用英文逗號(,)分隔,例如
db1.table1, db2.table2
。hostname
OceanBase資料庫或 OceanBase 代理OBProxy的IP地址或主機名稱。
否
String
無
無。
port
OceanBase資料庫伺服器的連接埠號碼。
否
Integer
無
可以是OceanBase伺服器的SQL連接埠號碼(預設值為2881)
或OceanBaseProxy 伺服器的連接埠號碼(預設值為2883)。
connect.timeout
串連到OceanBase資料庫伺服器之前的最長逾時時間。
否
Duration
30s
無。
server-time-zone
資料庫伺服器中的會話時區。
否
String
+00:00
會話時區值的合法格式為
±hh:mm
,表示與國際標準時間(UTC)的時區位移量。說明會話時區的設定會影響到時間類型的顯示和儲存方式。因此,如果您需要控制OceanBase的時間類型如何轉換為字串,則需要設定合理的會話時區資訊,以確保顯示正確的本地時間。
如果您在MySQL資料庫中已存在一個用於儲存時區資訊的表,則在設定時區時,可以使用這個表中已經建立的時區作為合法的值。
logproxy.client.id
OceanBase日誌Proxy 伺服器的用戶端串連ID。
否
String
規則產生
如果您沒有指定,則Flink會預設按照
{flink_ip}_{process_id}_{timestamp}_{thread_id}_{tenant}
規則產生。rootserver-list
OceanBase根伺服器列表。
否
String
無
伺服器列表格式為
ip:rpc_port:sql_port
。您可以執行SHOW PARAMETERS LIKE 'rootservice_list';
SQL語句擷取伺服器列表資訊。說明OceanBase社區版本必填。
多個伺服器位址使用英文分號(;)分隔。
config-url
從設定管理員擷取伺服器資訊的url。
否
String
無
OceanBase企業版本必填。
working-mode
日誌代理中libobcdc的工作模式。
否
String
storage
參數取值如下:
storage:表示資料將被儲存在磁碟或其他持久性儲存介質中。
memory:表示資料將被儲存在記憶體中。
compatible-mode
OceanBase的相容模式。
否
String
mysql
參數取值如下:
mysql
oracle
jdbc.driver
全量讀取源表資料時使用的jdbc驅動類名。
否
String
com.mysql.jdbc.Driver
無。
jdbc.properties.*
傳遞自訂的JDBC URL屬性。
否
String
無
例如
'jdbc.properties.useSSL' = 'false'
表示不使用SSL加密。obcdc.properties.*
將自訂的 OBCDC參數傳遞給libobcdc。
否
String
無
例如
'obcdc.properties.sort_trans_participants' = '1'
。更多參數資訊見obcdc parameters。
維表專屬
參數
說明
是否必填
資料類型
預設值
備忘
url
JDBC url或config url。
是
STRING
無
當連接器類型為
oceanbase
時使用JDBC url,連接器類型為oceanbase-ocj
時,使用config url。url中需要包含MySQL database名或Oracle service名。
cache
緩衝策略。
否
STRING
ALL
支援以下三種緩衝策略:
ALL:緩衝維表裡的所有資料。在Job運行前,系統會將維表中所有資料載入到Cache中,之後所有的維表尋找資料都會通過Cache進行。如果在Cache中無法找到資料,則KEY不存在,並在Cache到期後重新載入一遍全量Cache。
適用於遠端資料表資料量小且MISS KEY(源表資料和維表JOIN時,ON條件無法關聯)特別多的情境。
LRU:緩衝維表裡的部分資料。源表的每條資料都會觸發系統先在Cache中尋找資料。如果沒有找到,則去物理維表中尋找。使用該緩衝策略時,必須配置cacheSize參數。
None:無緩衝。
重要使用ALL緩衝策略時,請注意節點記憶體大小,防止出現OOM。
因為系統會非同步載入維表資料,所以在使用ALL緩衝策略時,需要增加維表JOIN節點的記憶體,增加的記憶體大小為遠端資料表資料量的兩倍。
cacheSize
最大緩衝條數。
否
INTEGER
100000
當選擇LRU緩衝策略後,必須設定緩衝大小。
當選擇ALL緩衝策略後,可以不設定緩衝大小。
cacheTTLMs
緩衝逾時時間。
否
LONG
Long.MAX_VALUE
cacheTTLMs的配置和cache有關,詳情如下:
如果cache配置為None,則cacheTTLMs可以不配置,表示緩衝不逾時。
如果cache配置為LRU,則cacheTTLMs為緩衝逾時時間。預設不到期。
如果cache配置為ALL,則cacheTTLMs為緩衝載入時間。預設不重新載入。
maxRetryTimeout
最大重試時間。
否
DURATION
60s
無。
結果表專屬
參數
說明
是否必填
資料類型
預設值
備忘
compatibleMode
OceanBase的相容模式。
否
STRING
mysql
參數取值如下:
mysql
oracle
說明oceanabse專屬參數。
databaseName
資料庫名。
是
STRING
無
應當與config url中保持一致。
說明oceanbase-ocj專屬參數。
passwordEncrypted
是否使用加密過的密碼。
否
Boolean
false
oceanbase-ocj專屬參數。
slowQueryThresholdMs
慢查詢等待閾值。
否
INTEGER
60000
單位毫秒。
說明oceanbase-ocj專屬參數。
url
JDBC url或config url。
是
STRING
無
當連接器類型為
oceanbase
時使用JDBC url,連接器類型為oceanbase-ocj
時,使用config url。url中需要包含MySQL database名或Oracle service名。
tableName
表名。
是
STRING
無
無。
maxRetryTimes
最大重試次數。
否
INTEGER
3
無。
poolInitialSize
資料庫連接池初始大小。
否
INTEGER
1
無。
poolMaxActive
資料庫連接池最大串連數。
否
INTEGER
8
無。
poolMaxWait
從資料庫連接池中擷取串連的最大等待時間。
否
INTEGER
2000
單位毫秒。
poolMinIdle
資料庫連接池中最小空閑串連數。
否
INTEGER
1
無。
connectionProperties
jdbc的串連屬性。
否
STRING
無
格式為"k1=v1;k2=v2;k3=v3"。
ignoreDelete
是否忽略資料Delete操作。
否
Boolean
false
無。
excludeUpdateColumns
指定要排除的列名。在執行更新操作時,這些列將不會被更新。
否
STRING
無
如果忽略指定的欄位為多個時,則需要使用英文逗號(,)分隔。例如
excludeUpdateColumns=column1,column2
。說明該值始終會包含主鍵列,也就是實際生效的列名為您指定的列名和主鍵列。
partitionKey
分區鍵。
否
STRING
無
當設定分區鍵時,連接器會先將資料按照分區鍵進行分組,各個分組將分別寫入資料庫。這裡的分組處理早於modRule的處理。
modRule
分組規則。
否
STRING
無
分組規則格式需要為"列名mod數字",列類型必須為數字類型。當設定分組規則時,資料會根據計算所得結果進行分組,各個分組將分別寫入資料庫。這裡的分組處理晚於partitionKey的處理。
bufferSize
資料緩衝區大小。
否
INTEGER
1000
無。
flushIntervalMs
清空緩衝的時間間隔。表示如果緩衝中的資料在等待指定時間後,依然沒有達到輸出條件,系統會自動輸出緩衝中的所有資料。
否
LONG
1000
無。
retryIntervalMs
最大重試時間。
否
INTEGER
5000
單位毫秒。
類型映射
MySQL相容模式
OceanBase欄位類型
Flink欄位類型
TINYINT
TINYINT
SMALLINT
SMALLINT
TINYINT UNSIGNED
INT
INT
MEDIUMINT
SMALLINT UNSIGNED
BIGINT
BIGINT
INT UNSIGNED
BIGINT UNSIGNED
DECIMAL(20, 0)
REAL
FLOAT
FLOAT
DOUBLE
DOUBLE
NUMERIC(p, s)
DECIMAL(p, s)
說明其中p <= 38。
DECIMAL(p, s)
BOOLEAN
BOOLEAN
TINYINT(1)
DATE
DATE
TIME [(p)]
TIME [(p)] [WITHOUT TIME ZONE]
DATETIME [(p)]
TIMESTAMP [(p)] [WITHOUT TIME ZONE]
TIMESTAMP [(p)]
CHAR(n)
CHAR(n)
VARCHAR(n)
VARCHAR(n)
BIT(n)
BINARY(⌈n/8⌉)
BINARY(n)
BINARY(n)
VARBINARY(N)
VARBINARY(N)
TINYTEXT
STRING
TEXT
MEDIUMTEXT
LONGTEXT
TINYBLOB
BYTES
重要Flink僅支援小於等於2,147,483,647(2^31 - 1)的BLOB類型的記錄。
BLOB
MEDIUMBLOB
LONGBLOB
Oracle相容模式
OceanBase欄位類型
Flink欄位類型
NUMBER(p, s <= 0), p - s < 3
TINYINT
NUMBER(p, s <= 0), p - s < 5
SMALLINT
NUMBER(p, s <= 0), p - s < 10
INT
NUMBER(p, s <= 0), p - s < 19
BIGINT
NUMBER(p, s <= 0), 19 <= p - s <= 38
DECIMAL(p - s, 0)
NUMBER(p, s > 0)
DECIMAL(p, s)
NUMBER(p, s <= 0), p - s > 38
STRING
FLOAT
FLOAT
BINARY_FLOAT
BINARY_DOUBLE
DOUBLE
NUMBER(1)
BOOLEAN
DATE
TIMESTAMP [(p)] [WITHOUT TIMEZONE]
TIMESTAMP [(p)]
CHAR(n)
STRING
NCHAR(n)
NVARCHAR2(n)
VARCHAR(n)
VARCHAR2(n)
CLOB
BLOB
BYTES
ROWID
使用樣本
源表&結果表
CREATE TEMPORARY TABLE oceanbase_source ( a INT, b VARCHAR, c VARCHAR ) WITH ( 'connector' = 'oceanbase', 'scan.startup.mode' = 'initial', 'username' = 'user', 'password' = 'password', 'tenant-name' = 'tenant', 'database-name' = '^test_db$', 'table-name' = '^orders$', 'hostname' = '11.22.33.44', 'port' = '2883', 'config-url' = 'http://11.22.33.44:55/services?Action=ObRootServiceInfo&User_ID=xxx&UID=xxx&ObRegion=xxx', 'logproxy.host' = '11.22.33.44', 'logproxy.port' = '2983', 'working-mode' = 'memory' ); -- oceanbase結果表 CREATE TEMPORARY TABLE oceanbase_sink ( a INT, b VARCHAR, c VARCHAR ) WITH ( 'connector' = 'oceanbase', 'url' = '<yourJdbcUrl>', 'userName' = '<yourUserName>', 'password' = '<yourPassword>', 'tableName' = '<yourTableName>' ); --oceanbase-ocj結果表 CREATE TEMPORARY TABLE oceanbase_ocj_sink ( a INT, b VARCHAR, c VARCHAR ) WITH ( 'connector' = 'oceanbase-ocj', 'url' = '<yourConfigUrl>', 'userName' = '<yourUserName>', 'password' = '${secret_values.password}', 'databaseName' = '<yourDatabaseName>', 'tableName' = '<yourTableName>' ); BEGIN STATEMENT SET; INSERT INTO oceanbase_sink SELECT * FROM oceanbase_source; INSERT INTO oceanbase_ocj_sink SELECT * FROM oceanbase_source; END;
維表
CREATE TEMPORARY TABLE datagen_source( a INT, b BIGINT, c STRING, `proctime` AS PROCTIME() ) WITH ( 'connector' = 'datagen' ); CREATE TEMPORARY TABLE oceanbase_dim ( a INT, b VARCHAR, c VARCHAR ) WITH ( 'connector' = 'oceanbase', 'url' = '<yourJdbcUrl>', 'userName' = '<yourUserName>', 'password' = '${secret_values.password}', 'tableName' = '<yourTableName>' ); CREATE TEMPORARY TABLE blackhole_sink( a INT, b STRING ) WITH ( 'connector' = 'blackhole' ); INSERT INTO blackhole_sink SELECT T.a, H.b FROM datagen_source AS T JOIN oceanbase_dim FOR SYSTEM_TIME AS OF T.`proctime` AS H ON T.a = H.a;
相關文檔
Flink支援的連接器,請參見支援的連接器。