本文为您介绍如何使用OceanBase连接器。
背景信息
OceanBase数据库是一款原生分布式的HTAP数据库管理系统,详情请参见OceanBase官网。为了降低您从MySQL数据库或Oracle数据库迁移到OceanBase数据库时引发的业务系统改造成本,OceanBase数据库支持Oracle和MySQL两种兼容模式,两种模式下的数据类型、SQL功能、内部视图等与MySQL数据库或Oracle数据库保持一致。两种模式下建议使用的连接器如下:
Oracle模式:只能使用OceanBase连接器。
MySQL模式:与原生MySQL保持高度兼容,支持使用OceanBase和MySQL两种连接器,MySQL连接器详情请参见MySQL。
在无需高级特性的情况下,维表和结果表建议优先考虑MySQL连接器,配置更简单。
使用OceanBase 3.2.4.4及以上版本时,源表建议优先使用MySQL连接器(因为OceanBase 3.2.4.4及以上版本MySQL模式开始支持开启Binlog服务,输出格式与原生MySQL Binlog一致),业务架构改造成本低。
OceanBase连接器支持的信息如下。
类别 | 详情 |
支持类型 | 源表、维表和结果表 |
运行模式 | 流模式和批模式 |
数据格式 | 暂不适用 |
特有监控指标 | 暂无 |
API种类 | SQL |
是否支持更新或删除结果表数据 | 是 |
前提条件
连接的数据库和表都已被创建。具体操作可参考以下文档:
MySQL模式
Oracle模式
使用限制
维表和结果表
Flink计算引擎VVR 8.0.1及以上版本支持OceanBase连接器。
语义上可以保证At-Least-Once,在结果表有主键的情况下,幂等可以保证数据的正确性。
结果表:OceanBase数据库没有部署OceanBase数据库代理(OBProxy)时,连接器会使用OCJ(OceanBase Connector Java)连接OceanBase数据库,该模式需要用到config url,要求OceanBase数据库已部署OceanBase云平台。该工作方式只能用于OceanBase数据库的MySQL兼容模式,不支持Oracle兼容模式。
说明OBProxy与OCJ实现了相同的路由功能,区别在于OCJ驱动集成于Java应用程序,而OBProxy是一个独立的代理服务。目前,OceanBase团队推荐通过OBProxy来连接OceanBase集群,OCJ驱动主要用于兼容一些历史集群和应用程序。
语法结构
CREATE TABLE oceanabse_source (
order_id INT,
order_date TIMESTAMP(0),
customer_name STRING,
price DECIMAL(10, 5),
product_id INT,
order_status BOOLEAN,
PRIMARY KEY(order_id) NOT ENFORCED
) WITH (
'connector' = 'oceanbase',
'url' = '<yourJdbcUrl>',
'tableName' = '<yourTableName>',
'userName' = '<yourUserName>',
'password' = '<yourPassword>'
);
连接器写入结果表原理:写入结果表时,会将接收到的每条数据拼接成一条SQL去执行。具体执行的SQL情况如下:
对于没有主键的结果表,会拼接成INSERT INTO语句。
对于包含主键的结果表,会根据数据库的兼容模式拼接成UPSERT语句。
WITH参数
通用
参数
说明
是否必填
数据类型
默认值
备注
connector
表类型。
是
STRING
无
作为CDC源表或维表时,固定值为
oceanbase
。作为结果表时,参数取值如下:
如果使用了OceanBase数据库代理OBProxy,则表类型取值为
oceanbase
,如果直连OceanBase集群,则表类型取值为
oceanbase-ocj
。
userName
用户名。
是
STRING
无
无。
password
密码。
是
STRING
无
无。
源表独有
说明连接器支持通过数据库名称(database-name)和表名(table-name)的正则匹配和表列表(table-list)的精确匹配两种模式来指定需要监听的表。当同时使用两种方式时,将会监听两种方式匹配的所有表。
参数
说明
是否必填
数据类型
默认值
备注
logproxy.host
OceanBase日志代理服务器的IP地址或主机名。
是
String
无
无。
logproxy.port
OceanBase日志代理服务器的端口号。
是
Integer
无
无。
scan.startup.mode
OceanBase CDC的启动模式。
是
String
无
参数取值如下:
initial:从初始位点开始拉取全部数据。
latest-offset:从当前位点开始拉取变更数据。
timestamp:从指定的时间戳开始拉取变更数据。
tenant-name
OceanBase数据库的租户名。
是
String
无
无。
database-name
OceanBase数据库名称。
否
String
无
支持使用正则表达式指定数据库名称。
说明仅支持在启动模式为initial时,使用该参数。
table-name
OceanBase数据库的表名称。
否
String
无
支持使用正则表达式指定表名称。
说明仅支持在启动模式为initial时,使用该参数。
table-list
OceanBase数据库的全路径的表名列表。
否
String
无
可以使用英文逗号(,)分隔,例如
db1.table1, db2.table2
。hostname
OceanBase数据库或 OceanBase 代理OBProxy的IP地址或主机名。
否
String
无
无。
port
OceanBase数据库服务器的端口号。
否
Integer
无
可以是OceanBase服务器的SQL端口号(默认值为2881)
或OceanBase代理服务器的端口号(默认值为2883)。
connect.timeout
连接到OceanBase数据库服务器之前的最长超时时间。
否
Duration
30s
无。
server-time-zone
数据库服务器中的会话时区。
否
String
+00:00
会话时区值的合法格式为
±hh:mm
,表示与协调世界时(UTC)的时区偏移量。说明会话时区的设置会影响到时间类型的显示和存储方式。因此,如果您需要控制OceanBase的时间类型如何转换为字符串,则需要设置合理的会话时区信息,以确保显示正确的本地时间。
如果您在MySQL数据库中已存在一个用于存储时区信息的表,则在设置时区时,可以使用这个表中已经创建的时区作为合法的值。
logproxy.client.id
OceanBase日志代理服务器的客户端连接ID。
否
String
规则生成
如果您没有指定,则Flink会默认按照
{flink_ip}_{process_id}_{timestamp}_{thread_id}_{tenant}
规则生成。rootserver-list
OceanBase根服务器列表。
否
String
无
服务器列表格式为
ip:rpc_port:sql_port
。您可以执行SHOW PARAMETERS LIKE 'rootservice_list';
SQL语句获取服务器列表信息。说明OceanBase社区版本必填。
多个服务器地址使用英文分号(;)分隔。
config-url
从配置服务器获取服务器信息的url。
否
String
无
OceanBase企业版本必填。
working-mode
日志代理中libobcdc的工作模式。
否
String
storage
参数取值如下:
storage:表示数据将被存储在磁盘或其他持久性存储介质中。
memory:表示数据将被存储在内存中。
compatible-mode
OceanBase的兼容模式。
否
String
mysql
参数取值如下:
mysql
oracle
jdbc.driver
全量读取源表数据时使用的jdbc驱动类名。
否
String
com.mysql.jdbc.Driver
无。
jdbc.properties.*
传递自定义的JDBC URL属性。
否
String
无
例如
'jdbc.properties.useSSL' = 'false'
表示不使用SSL加密。obcdc.properties.*
将自定义的 OBCDC参数传递给libobcdc。
否
String
无
例如
'obcdc.properties.sort_trans_participants' = '1'
。更多参数信息见obcdc parameters。
维表独有
参数
说明
是否必填
数据类型
默认值
备注
url
JDBC url或config url。
是
STRING
无
当连接器类型为
oceanbase
时使用JDBC url,连接器类型为oceanbase-ocj
时,使用config url。url中需要包含MySQL database名或Oracle service名。
cache
缓存策略。
否
STRING
ALL
支持以下三种缓存策略:
ALL:缓存维表里的所有数据。在Job运行前,系统会将维表中所有数据加载到Cache中,之后所有的维表查找数据都会通过Cache进行。如果在Cache中无法找到数据,则KEY不存在,并在Cache过期后重新加载一遍全量Cache。
适用于远程表数据量小且MISS KEY(源表数据和维表JOIN时,ON条件无法关联)特别多的场景。
LRU:缓存维表里的部分数据。源表的每条数据都会触发系统先在Cache中查找数据。如果没有找到,则去物理维表中查找。使用该缓存策略时,必须配置cacheSize参数。
None:无缓存。
重要使用ALL缓存策略时,请注意节点内存大小,防止出现OOM。
因为系统会异步加载维表数据,所以在使用ALL缓存策略时,需要增加维表JOIN节点的内存,增加的内存大小为远程表数据量的两倍。
cacheSize
最大缓存条数。
否
INTEGER
100000
当选择LRU缓存策略后,必须设置缓存大小。
当选择ALL缓存策略后,可以不设置缓存大小。
cacheTTLMs
缓存超时时间。
否
LONG
Long.MAX_VALUE
cacheTTLMs的配置和cache有关,详情如下:
如果cache配置为None,则cacheTTLMs可以不配置,表示缓存不超时。
如果cache配置为LRU,则cacheTTLMs为缓存超时时间。默认不过期。
如果cache配置为ALL,则cacheTTLMs为缓存加载时间。默认不重新加载。
maxRetryTimeout
最大重试时间。
否
DURATION
60s
无。
结果表独有
参数
说明
是否必填
数据类型
默认值
备注
compatibleMode
OceanBase的兼容模式。
否
STRING
mysql
参数取值如下:
mysql
oracle
说明oceanabse独有参数。
databaseName
数据库名。
是
STRING
无
应当与config url中保持一致。
说明oceanbase-ocj独有参数。
passwordEncrypted
是否使用加密过的密码。
否
Boolean
false
oceanbase-ocj独有参数。
slowQueryThresholdMs
慢查询等待阈值。
否
INTEGER
60000
单位毫秒。
说明oceanbase-ocj独有参数。
url
JDBC url或config url。
是
STRING
无
当连接器类型为
oceanbase
时使用JDBC url,连接器类型为oceanbase-ocj
时,使用config url。url中需要包含MySQL database名或Oracle service名。
tableName
表名。
是
STRING
无
无。
maxRetryTimes
最大重试次数。
否
INTEGER
3
无。
poolInitialSize
数据库连接池初始大小。
否
INTEGER
1
无。
poolMaxActive
数据库连接池最大连接数。
否
INTEGER
8
无。
poolMaxWait
从数据库连接池中获取连接的最大等待时间。
否
INTEGER
2000
单位毫秒。
poolMinIdle
数据库连接池中最小空闲连接数。
否
INTEGER
1
无。
connectionProperties
jdbc的连接属性。
否
STRING
无
格式为"k1=v1;k2=v2;k3=v3"。
ignoreDelete
是否忽略数据Delete操作。
否
Boolean
false
无。
excludeUpdateColumns
指定要排除的列名。在执行更新操作时,这些列将不会被更新。
否
STRING
无
如果忽略指定的字段为多个时,则需要使用英文逗号(,)分隔。例如
excludeUpdateColumns=column1,column2
。说明该值始终会包含主键列,也就是实际生效的列名为您指定的列名和主键列。
partitionKey
分区键。
否
STRING
无
当设置分区键时,连接器会先将数据按照分区键进行分组,各个分组将分别写入数据库。这里的分组处理早于modRule的处理。
modRule
分组规则。
否
STRING
无
分组规则格式需要为"列名mod数字",列类型必须为数字类型。当设置分组规则时,数据会根据计算所得结果进行分组,各个分组将分别写入数据库。这里的分组处理晚于partitionKey的处理。
bufferSize
数据缓冲区大小。
否
INTEGER
1000
无。
flushIntervalMs
清空缓存的时间间隔。表示如果缓存中的数据在等待指定时间后,依然没有达到输出条件,系统会自动输出缓存中的所有数据。
否
LONG
1000
无。
retryIntervalMs
最大重试时间。
否
INTEGER
5000
单位毫秒。
类型映射
MySQL兼容模式
OceanBase字段类型
Flink字段类型
TINYINT
TINYINT
SMALLINT
SMALLINT
TINYINT UNSIGNED
INT
INT
MEDIUMINT
SMALLINT UNSIGNED
BIGINT
BIGINT
INT UNSIGNED
BIGINT UNSIGNED
DECIMAL(20, 0)
REAL
FLOAT
FLOAT
DOUBLE
DOUBLE
NUMERIC(p, s)
DECIMAL(p, s)
说明其中p <= 38。
DECIMAL(p, s)
BOOLEAN
BOOLEAN
TINYINT(1)
DATE
DATE
TIME [(p)]
TIME [(p)] [WITHOUT TIME ZONE]
DATETIME [(p)]
TIMESTAMP [(p)] [WITHOUT TIME ZONE]
TIMESTAMP [(p)]
CHAR(n)
CHAR(n)
VARCHAR(n)
VARCHAR(n)
BIT(n)
BINARY(⌈n/8⌉)
BINARY(n)
BINARY(n)
VARBINARY(N)
VARBINARY(N)
TINYTEXT
STRING
TEXT
MEDIUMTEXT
LONGTEXT
TINYBLOB
BYTES
重要Flink仅支持小于等于2,147,483,647(2^31 - 1)的BLOB类型的记录。
BLOB
MEDIUMBLOB
LONGBLOB
Oracle兼容模式
OceanBase字段类型
Flink字段类型
NUMBER(p, s <= 0), p - s < 3
TINYINT
NUMBER(p, s <= 0), p - s < 5
SMALLINT
NUMBER(p, s <= 0), p - s < 10
INT
NUMBER(p, s <= 0), p - s < 19
BIGINT
NUMBER(p, s <= 0), 19 <= p - s <= 38
DECIMAL(p - s, 0)
NUMBER(p, s > 0)
DECIMAL(p, s)
NUMBER(p, s <= 0), p - s > 38
STRING
FLOAT
FLOAT
BINARY_FLOAT
BINARY_DOUBLE
DOUBLE
NUMBER(1)
BOOLEAN
DATE
TIMESTAMP [(p)] [WITHOUT TIMEZONE]
TIMESTAMP [(p)]
CHAR(n)
STRING
NCHAR(n)
NVARCHAR2(n)
VARCHAR(n)
VARCHAR2(n)
CLOB
BLOB
BYTES
ROWID
使用示例
源表&结果表
CREATE TEMPORARY TABLE oceanbase_source ( a INT, b VARCHAR, c VARCHAR ) WITH ( 'connector' = 'oceanbase', 'scan.startup.mode' = 'initial', 'username' = 'user', 'password' = 'password', 'tenant-name' = 'tenant', 'database-name' = '^test_db$', 'table-name' = '^orders$', 'hostname' = '11.22.33.44', 'port' = '2883', 'config-url' = 'http://11.22.33.44:55/services?Action=ObRootServiceInfo&User_ID=xxx&UID=xxx&ObRegion=xxx', 'logproxy.host' = '11.22.33.44', 'logproxy.port' = '2983', 'working-mode' = 'memory' ); -- oceanbase结果表 CREATE TEMPORARY TABLE oceanbase_sink ( a INT, b VARCHAR, c VARCHAR ) WITH ( 'connector' = 'oceanbase', 'url' = '<yourJdbcUrl>', 'userName' = '<yourUserName>', 'password' = '<yourPassword>', 'tableName' = '<yourTableName>' ); --oceanbase-ocj结果表 CREATE TEMPORARY TABLE oceanbase_ocj_sink ( a INT, b VARCHAR, c VARCHAR ) WITH ( 'connector' = 'oceanbase-ocj', 'url' = '<yourConfigUrl>', 'userName' = '<yourUserName>', 'password' = '${secret_values.password}', 'databaseName' = '<yourDatabaseName>', 'tableName' = '<yourTableName>' ); BEGIN STATEMENT SET; INSERT INTO oceanbase_sink SELECT * FROM oceanbase_source; INSERT INTO oceanbase_ocj_sink SELECT * FROM oceanbase_source; END;
维表
CREATE TEMPORARY TABLE datagen_source( a INT, b BIGINT, c STRING, `proctime` AS PROCTIME() ) WITH ( 'connector' = 'datagen' ); CREATE TEMPORARY TABLE oceanbase_dim ( a INT, b VARCHAR, c VARCHAR ) WITH ( 'connector' = 'oceanbase', 'url' = '<yourJdbcUrl>', 'userName' = '<yourUserName>', 'password' = '${secret_values.password}', 'tableName' = '<yourTableName>' ); CREATE TEMPORARY TABLE blackhole_sink( a INT, b STRING ) WITH ( 'connector' = 'blackhole' ); INSERT INTO blackhole_sink SELECT T.a, H.b FROM datagen_source AS T JOIN oceanbase_dim FOR SYSTEM_TIME AS OF T.`proctime` AS H ON T.a = H.a;
相关文档
Flink支持的连接器,请参见支持的连接器。