This topic describes how to use the OceanBase connector.
Background information
OceanBase is a native distributed hybrid transactional/analytical processing (HTAP) database management system. For more information, visit the OceanBase official website. To reduce the business system transformation costs incurred when you migrate data from a MySQL database or an Oracle database to an OceanBase database, OceanBase supports MySQL-compatible and Oracle-compatible modes. This helps ensure the consistency of data types, SQL features, and internal views between MySQL or Oracle databases and OceanBase databases. The connector that can be used for the OceanBase database varies based on the mode of the OceanBase database.
OceanBase database in Oracle-compatible mode: Only the OceanBase connector can be used.
OceanBase database in MySQL-compatible mode: The OceanBase connector and MySQL connector can be used. The OceanBase database in MySQL-compatible mode is highly compatible with open source MySQL databases. For more information about the MySQL connector, see MySQL connector.
If your business does not require advanced features, we recommend that you use the MySQL connector for dimension tables and result tables. This simplifies the parameter configuration for the tables.
For source tables, we recommend that you use the MySQL connector if your OceanBase database is V3.2.4.4 or later. In OceanBase V3.2.4.4 or later in MySQL-compatible mode, binary logging is enabled and the output format of binary logs is the same as that of open source MySQL. This reduces the cost of changing your technical architecture.
The following table describes the capabilities supported by the OceanBase connector.
Item | Description |
Table type | Source table, dimension table, and result table |
Running mode | Streaming mode and batch mode |
Data format | None |
Metric | None |
API type | SQL API |
Data update or deletion in a result table | Yes |
Prerequisites
The database and table to which you want to connect are created. For more information about related operations, see the following topics:
OceanBase database in MySQL-compatible mode
OceanBase database in Oracle-compatible mode
Limits
Dimension table and result table
Only Realtime Compute for Apache Flink that uses Ververica Runtime (VVR) 8.0.1 or later supports the OceanBase connector.
The connector provides at-least-once semantics. If a result table has a primary key, writes are idempotent, which ensures data correctness.
Result table: If OceanBase Database Proxy (OBProxy) is not deployed in the OceanBase database, the OceanBase connector uses OceanBase Connector Java (OCJ) to connect to the OceanBase database. In this case, Config URL is required. The OceanBase cloud platform must be deployed for the OceanBase database. The OCJ connection mode can be used only when the OceanBase database uses the MySQL-compatible mode.
Note: OBProxy and OCJ provide the same routing feature. The OCJ driver is integrated into the Java application, whereas OBProxy is an independent proxy service. The OceanBase team recommends that you use OBProxy to connect to OceanBase clusters. In most cases, the OCJ driver is used to achieve compatibility with specific historical clusters and applications.
Syntax
CREATE TABLE oceanbase_source (
  order_id INT,
  order_date TIMESTAMP(0),
  customer_name STRING,
  price DECIMAL(10, 5),
  product_id INT,
  order_status BOOLEAN,
  PRIMARY KEY(order_id) NOT ENFORCED
) WITH (
  'connector' = 'oceanbase',
  'url' = '<yourJdbcUrl>',
  'tableName' = '<yourTableName>',
  'userName' = '<yourUserName>',
  'password' = '<yourPassword>'
);
When the connector writes data to a result table, the connector concatenates each received data record into an SQL statement and executes the SQL statement based on the following rules:
If the result table does not have a primary key, the received data records are concatenated into an INSERT INTO statement.
If the result table has a primary key, the received data records are concatenated into an UPSERT statement based on the compatibility mode of the database.
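This topic does not list the exact statements that the connector generates. As a rough sketch, assuming the connector uses the usual upsert syntax of each compatibility mode (INSERT ... ON DUPLICATE KEY UPDATE in MySQL-compatible mode and MERGE INTO in Oracle-compatible mode), writes to a hypothetical orders table might take the following shapes. The table name, column names, and values are illustrative only.

```sql
-- Assumption: hypothetical table orders(order_id, customer_name, price).

-- Result table without a primary key: plain insert.
INSERT INTO orders (order_id, customer_name, price)
VALUES (1001, 'alice', 19.9);

-- Result table with a primary key, MySQL-compatible mode (assumed upsert form).
INSERT INTO orders (order_id, customer_name, price)
VALUES (1001, 'alice', 19.9)
ON DUPLICATE KEY UPDATE
  customer_name = VALUES(customer_name),
  price = VALUES(price);

-- Result table with a primary key, Oracle-compatible mode (assumed upsert form).
MERGE INTO orders t
USING (
  SELECT 1001 AS order_id, 'alice' AS customer_name, 19.9 AS price FROM dual
) s
ON (t.order_id = s.order_id)
WHEN MATCHED THEN
  UPDATE SET t.customer_name = s.customer_name, t.price = s.price
WHEN NOT MATCHED THEN
  INSERT (order_id, customer_name, price)
  VALUES (s.order_id, s.customer_name, s.price);
```

With either upsert form, replaying the same record rewrites the row with identical values, which is why a primary key keeps repeated writes harmless.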
Parameters in the WITH clause
Common parameters
Parameter | Description | Required | Data type | Default value | Remarks |
connector | The type of the table. | Yes | STRING | No default value | If you use the OceanBase connector for a source table or dimension table, set the value to oceanbase. If you use the OceanBase connector for a result table, set the value to oceanbase if you use OBProxy, or to oceanbase-ocj if you want to set up a direct connection to the OceanBase cluster. |
userName | The username that is used to access the OceanBase database. | Yes | STRING | No default value | N/A |
password | The password that is used to access the OceanBase database. | Yes | STRING | No default value | N/A |
Parameters only for source tables
Note: The OceanBase connector provides the database-name and table-name parameters, which select the tables to monitor by regular expression matching, and the table-list parameter, which selects them by exact matching. If you use both methods together, the connector monitors every table that is matched by either method.
Parameter | Description | Required | Data type | Default value | Remarks |
logproxy.host | The IP address or hostname of the OceanBase log proxy (OBLogProxy). | Yes | STRING | No default value | N/A |
logproxy.port | The port number of OBLogProxy. | Yes | INTEGER | No default value | N/A |
scan.startup.mode | The mode in which the OceanBase CDC connector starts to read data from an OceanBase database. | Yes | STRING | No default value | Valid values: initial (all data is pulled from the initial offset), latest-offset (change data is pulled from the current offset), and timestamp (change data is pulled from a specific timestamp). |
tenant-name | The name of the tenant to which the OceanBase database belongs. | Yes | STRING | No default value | N/A |
database-name | The name of the OceanBase database. | No | STRING | No default value | You can use a regular expression to specify a database name. Note: This parameter is required only when the scan.startup.mode parameter is set to initial. |
table-name | The name of the table in the OceanBase database. | No | STRING | No default value | You can use a regular expression to specify a table name. Note: This parameter is required only when the scan.startup.mode parameter is set to initial. |
table-list | The fully qualified names (database.table) of tables in the OceanBase database. | No | STRING | No default value | Separate multiple table names with commas (,). Example: db1.table1, db2.table2. |
hostname | The IP address or hostname of the OceanBase database or OBProxy. | No | STRING | No default value | N/A |
port | The port number of the OceanBase database server. | No | INTEGER | No default value | The value can be the SQL port number of the OceanBase server (2881 by default) or the port number of OBProxy (2883 by default). |
connect.timeout | The maximum period of time that Realtime Compute for Apache Flink waits to connect to the OceanBase database server before the connection times out. | No | DURATION | 30s | N/A |
server-time-zone | The session time zone in the database server. | No | STRING | +00:00 | The session time zone is in the ±hh:mm format. The value represents the time zone offset from Coordinated Universal Time (UTC). Note: The session time zone affects how data of time types is displayed and stored. If you want to control how data of a time type in the OceanBase database is converted into strings, specify a proper session time zone so that the correct local time is displayed. If a table that stores time zone data already exists in the MySQL database, you can also use a time zone that is defined in that table as the value of this parameter. |
logproxy.client.id | The client ID of OBLogProxy. | No | STRING | Generated by rules | If you do not specify this parameter, Realtime Compute for Apache Flink automatically generates the client ID based on the {flink_ip}_{process_id}_{timestamp}_{thread_id}_{tenant} rule. |
rootserver-list | The OceanBase root servers. | No | STRING | No default value | The value is in the ip:rpc_port:sql_port format. Separate multiple root servers with semicolons (;). You can execute the SHOW PARAMETERS LIKE 'rootservice_list'; statement to obtain the root servers. Note: This parameter is required in OceanBase Community Edition. |
config-url | The URL that is used to obtain server information from the Config server. | No | STRING | No default value | This parameter is required in OceanBase Enterprise Edition. |
working-mode | The working mode of libobcdc in OBLogProxy. | No | STRING | storage | Valid values: storage (data is stored on a disk or in other persistent storage media) and memory (data is stored in the memory). |
compatible-mode | The compatibility mode of the OceanBase database. | No | STRING | mysql | Valid values: mysql and oracle. |
jdbc.driver | The class name of the JDBC driver that is used to read all data from the source table. | No | STRING | com.mysql.jdbc.Driver | N/A |
jdbc.properties.* | Custom JDBC URL parameters. | No | STRING | No default value | For example, if you set the jdbc.properties.useSSL parameter to false, SSL encryption is not enabled. |
obcdc.properties.* | Custom OBCDC parameters that are passed to libobcdc. | No | STRING | No default value | Example: 'obcdc.properties.sort_trans_participants' = '1'. For more information about the parameters, see obcdc parameters. |
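The source table parameters can be combined as in the following sketch. It assumes an OceanBase Community Edition cluster reached directly on the SQL port, and it uses regular expression matching together with exact matching. The database names, table names, time zone, and all placeholder values are hypothetical, the matched tables are assumed to share one schema, and the credential keys follow the spelling used in the source table sample code in this topic.

```sql
-- Sketch only: every name and placeholder below is illustrative.
CREATE TEMPORARY TABLE oceanbase_orders_source (
  order_id INT,
  customer_name STRING,
  price DECIMAL(10, 5),
  PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
  'connector' = 'oceanbase',
  -- Read only new changes; 'initial' would pull all data from the initial offset.
  'scan.startup.mode' = 'latest-offset',
  'username' = '<yourUserName>',
  'password' = '<yourPassword>',
  'tenant-name' = '<yourTenantName>',
  -- Regular expression matching: tables in sales_db whose names start with orders_.
  'database-name' = '^sales_db$',
  'table-name' = '^orders_.*$',
  -- Exact matching: this table is monitored in addition to the regex matches.
  'table-list' = 'archive_db.orders_2023',
  'hostname' = '<yourObServerHost>',
  -- 2881 is the default SQL port of the OceanBase server; OBProxy defaults to 2883.
  'port' = '2881',
  -- Required in Community Edition; format is ip:rpc_port:sql_port.
  'rootserver-list' = '<ip>:<rpc_port>:<sql_port>',
  'logproxy.host' = '<yourLogProxyHost>',
  'logproxy.port' = '<yourLogProxyPort>',
  -- Session time zone as an offset from UTC.
  'server-time-zone' = '+08:00',
  -- Passed through as a JDBC URL parameter; disables SSL encryption.
  'jdbc.properties.useSSL' = 'false'
);
```

For an Enterprise Edition cluster, the rootserver-list entry would be replaced by config-url, as the parameter table describes.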
Parameters only for dimension tables
Parameter | Description | Required | Data type | Default value | Remarks |
url | The Java Database Connectivity (JDBC) URL or Config URL. | Yes | STRING | No default value | If the connector parameter is set to oceanbase, the JDBC URL is used. If the connector parameter is set to oceanbase-ocj, the Config URL is used. The URL must contain the MySQL database name or Oracle service name. |
cache | The cache policy. | No | STRING | ALL | Valid values: (1) ALL: All data in the dimension table is cached. Before a deployment runs, the system loads all data in the dimension table to the cache, and all subsequent lookups are served from the cache. If a data record is not found in the cache, the join key does not exist. The system reloads all data in the cache after cache entries expire. We recommend ALL if the amount of data in the remote table is small and a large number of missing keys exist, that is, keys for which the source table and the dimension table cannot be associated based on the ON clause. (2) LRU: Only specific data in the dimension table is cached. The system searches the cache each time a data record is read from the source table. If the data is not found, the system searches the physical dimension table. If you set the cache parameter to LRU, you must configure the cacheSize parameter. (3) None: No data is cached. Important: If you set the cache parameter to ALL, you must monitor the memory usage of the operator to prevent out of memory (OOM) errors. You must also increase the memory of the operator for joining tables because the system asynchronously loads data from the dimension table. The increased memory size is twice that of the remote table. |
cacheSize | The maximum number of data records that can be cached. | No | INTEGER | 100000 | You must configure this parameter if the cache parameter is set to LRU. You do not need to configure this parameter if the cache parameter is set to ALL. |
cacheTTLMs | The cache timeout period. | No | LONG | Long.MAX_VALUE | The meaning of this parameter varies based on the value of the cache parameter. None: the cacheTTLMs parameter can be left empty, which indicates that cache entries do not expire. LRU: the cacheTTLMs parameter specifies the cache timeout period. By default, cache entries do not expire. ALL: the cacheTTLMs parameter specifies the interval at which the system reloads the cache. By default, the cache is not refreshed. |
maxRetryTimeout | The maximum timeout period for a retry. | No | DURATION | 60s | N/A |
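As an illustration of the cache parameters, the following sketch declares a dimension table that uses the LRU policy. The cache size and the 10-minute TTL are arbitrary example values, not recommendations, and the table name is hypothetical. The tableName key follows the dimension table sample code in this topic.

```sql
-- Sketch only: sizes and timeouts are example values.
CREATE TEMPORARY TABLE oceanbase_dim_lru (
  a INT,
  b VARCHAR,
  c VARCHAR
) WITH (
  'connector' = 'oceanbase',
  'url' = '<yourJdbcUrl>',
  'userName' = '<yourUserName>',
  'password' = '<yourPassword>',
  'tableName' = '<yourTableName>',
  -- Cache only recently used rows instead of the whole table.
  'cache' = 'LRU',
  -- Must be configured when cache is LRU: at most 50,000 cached records.
  'cacheSize' = '50000',
  -- With LRU, entries expire after 600,000 ms (10 minutes).
  'cacheTTLMs' = '600000',
  'maxRetryTimeout' = '60s'
);
```

Switching 'cache' to 'ALL' would change the meaning of cacheTTLMs from an entry timeout to a full reload interval, and cacheSize would no longer be needed.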
Parameters only for result tables
Parameter | Description | Required | Data type | Default value | Remarks |
compatibleMode | The compatibility mode of the OceanBase database. | No | STRING | mysql | Valid values: mysql and oracle. Note: This parameter takes effect only when the connector parameter is set to oceanbase. |
databaseName | The name of the database. | Yes | STRING | No default value | The value must be the same as the value that is used in the Config URL. Note: This parameter is required only when the connector parameter is set to oceanbase-ocj. |
passwordEncrypted | Specifies whether to use an encrypted password. | No | BOOLEAN | false | This parameter takes effect only when the connector parameter is set to oceanbase-ocj. |
slowQueryThresholdMs | The wait threshold for slow queries. | No | INTEGER | 60000 | Unit: milliseconds. Note: This parameter takes effect only when the connector parameter is set to oceanbase-ocj. |
url | The JDBC URL or Config URL. | Yes | STRING | No default value | If the connector parameter is set to oceanbase, the JDBC URL is used. If the connector parameter is set to oceanbase-ocj, the Config URL is used. The URL must contain the MySQL database name or Oracle service name. |
tableName | The name of the table in the database. | Yes | STRING | No default value | N/A |
maxRetryTimes | The maximum number of retries for writing data to the table. | No | INTEGER | 3 | N/A |
poolInitialSize | The initial size of the database connection pool. | No | INTEGER | 1 | N/A |
poolMaxActive | The maximum number of connections in the database connection pool. | No | INTEGER | 8 | N/A |
poolMaxWait | The maximum wait time for obtaining a connection from the database connection pool. | No | INTEGER | 2000 | Unit: milliseconds. |
poolMinIdle | The minimum number of idle connections in the database connection pool. | No | INTEGER | 1 | N/A |
connectionProperties | The JDBC connection properties. | No | STRING | No default value | The value is in the k1=v1;k2=v2;k3=v3 format. |
ignoreDelete | Specifies whether to ignore delete operations. | No | BOOLEAN | false | N/A |
excludeUpdateColumns | The names of the columns that you want to exclude. If you configure this parameter, the excluded columns are not updated when you perform update operations. | No | STRING | No default value | Separate multiple column names with commas (,). Example: excludeUpdateColumns=column1,column2. Note: The columns that are not updated when you perform update operations include the primary key column and the columns specified by this parameter. |
partitionKey | The partition key column. | No | STRING | No default value | If you specify a partition key, the connector groups data based on the partition key and writes the data of each group to the database separately. Grouping by partition key is performed before grouping by the rule specified by the modRule parameter. |
modRule | The grouping rule. | No | STRING | No default value | The grouping rule must be in the format of column name mod number. Data in the specified column must be of the NUMERIC type. If you configure a grouping rule, data is grouped based on the calculated results, and the data of each group is written to the database separately. Grouping by this rule is performed after grouping by the partition key specified by the partitionKey parameter. |
bufferSize | The buffer size. | No | INTEGER | 1000 | N/A |
flushIntervalMs | The interval at which the cache is flushed. If the number of input data records does not reach the value specified by the batchSize parameter within this interval, all cached data is written to the result table. | No | LONG | 1000 | N/A |
retryIntervalMs | The maximum timeout period for a retry. | No | INTEGER | 5000 | Unit: milliseconds. |
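The write-related parameters can be combined as in the following sketch of a result table in MySQL-compatible mode. The column names, the choice of region as the partition key column, the modulus of 4, and the tuning values are all hypothetical. The sketch also assumes that an INT column is acceptable for modRule, which the table above describes only as "NUMERIC type".

```sql
-- Sketch only: names and tuning values are illustrative.
CREATE TEMPORARY TABLE oceanbase_orders_sink (
  order_id INT,
  region STRING,
  customer_name STRING,
  price DECIMAL(10, 5),
  created_at TIMESTAMP(0),
  PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
  'connector' = 'oceanbase',
  'url' = '<yourJdbcUrl>',
  'userName' = '<yourUserName>',
  'password' = '<yourPassword>',
  'tableName' = '<yourTableName>',
  'compatibleMode' = 'mysql',
  -- Keep the original creation time when an existing row is updated.
  'excludeUpdateColumns' = 'created_at',
  -- Drop delete operations instead of applying them to the result table.
  'ignoreDelete' = 'true',
  -- First level of grouping: by partition key column.
  'partitionKey' = 'region',
  -- Second level of grouping: "column name mod number".
  'modRule' = 'order_id mod 4',
  'bufferSize' = '2000',
  'flushIntervalMs' = '1000',
  'maxRetryTimes' = '5',
  'retryIntervalMs' = '5000'
);
```

Because the table declares a primary key, the connector writes upserts, and the primary key column is never rewritten by updates regardless of excludeUpdateColumns.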
Data type mappings
OceanBase database in MySQL-compatible mode
Data type of OceanBase | Data type of Realtime Compute for Apache Flink |
TINYINT | TINYINT |
SMALLINT, TINYINT UNSIGNED | SMALLINT |
INT, MEDIUMINT, SMALLINT UNSIGNED | INT |
BIGINT, INT UNSIGNED | BIGINT |
BIGINT UNSIGNED | DECIMAL(20, 0) |
REAL, FLOAT | FLOAT |
DOUBLE | DOUBLE |
NUMERIC(p, s), DECIMAL(p, s) | DECIMAL(p, s). Note: The value of p is less than or equal to 38. |
BOOLEAN, TINYINT(1) | BOOLEAN |
DATE | DATE |
TIME [(p)] | TIME [(p)] [WITHOUT TIME ZONE] |
DATETIME [(p)], TIMESTAMP [(p)] | TIMESTAMP [(p)] [WITHOUT TIME ZONE] |
CHAR(n) | CHAR(n) |
VARCHAR(n) | VARCHAR(n) |
BIT(n) | BINARY(⌈n/8⌉) |
BINARY(n) | BINARY(n) |
VARBINARY(N) | VARBINARY(N) |
TINYTEXT, TEXT, MEDIUMTEXT, LONGTEXT | STRING |
TINYBLOB, BLOB, MEDIUMBLOB, LONGBLOB | BYTES. Important: Flink supports only data records of the BLOB type whose size is less than or equal to 2,147,483,647 (2^31 - 1) bytes. |
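To show how the mapping is applied, the following sketch pairs a hypothetical OceanBase table in MySQL-compatible mode (in comments) with a Flink table declaration whose column types are taken from the table above. The devices table and its columns are invented for illustration.

```sql
-- Hypothetical OceanBase table (MySQL-compatible mode), for reference only:
--   CREATE TABLE devices (
--     id         BIGINT UNSIGNED PRIMARY KEY,
--     slot       TINYINT UNSIGNED,
--     enabled    TINYINT(1),
--     flags      BIT(12),
--     reading    NUMERIC(12, 4),
--     updated_at DATETIME(3),
--     notes      TEXT,
--     firmware   BLOB
--   );

CREATE TEMPORARY TABLE devices_flink (
  id DECIMAL(20, 0),        -- BIGINT UNSIGNED
  slot SMALLINT,            -- TINYINT UNSIGNED
  enabled BOOLEAN,          -- TINYINT(1)
  flags BINARY(2),          -- BIT(12): ceil(12 / 8) = 2 bytes
  reading DECIMAL(12, 4),   -- NUMERIC(12, 4), p <= 38
  updated_at TIMESTAMP(3),  -- DATETIME(3)
  notes STRING,             -- TEXT
  firmware BYTES,           -- BLOB
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'oceanbase',
  'url' = '<yourJdbcUrl>',
  'userName' = '<yourUserName>',
  'password' = '<yourPassword>',
  'tableName' = 'devices'
);
```

The unsigned types are the ones most easily mistyped: each one maps to the next wider signed type, and BIGINT UNSIGNED has to fall back to DECIMAL(20, 0).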
OceanBase database in Oracle-compatible mode
Data type of OceanBase | Data type of Realtime Compute for Apache Flink |
NUMBER(p, s <= 0), p - s < 3 | TINYINT |
NUMBER(p, s <= 0), p - s < 5 | SMALLINT |
NUMBER(p, s <= 0), p - s < 10 | INT |
NUMBER(p, s <= 0), p - s < 19 | BIGINT |
NUMBER(p, s <= 0), 19 <= p - s <= 38 | DECIMAL(p - s, 0) |
NUMBER(p, s > 0) | DECIMAL(p, s) |
NUMBER(p, s <= 0), p - s > 38 | STRING |
FLOAT, BINARY_FLOAT | FLOAT |
BINARY_DOUBLE | DOUBLE |
NUMBER(1) | BOOLEAN |
DATE, TIMESTAMP [(p)] | TIMESTAMP [(p)] [WITHOUT TIME ZONE] |
CHAR(n), NCHAR(n), NVARCHAR2(n), VARCHAR(n), VARCHAR2(n), CLOB | STRING |
BLOB, ROWID | BYTES |
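The NUMBER rules depend on the value of p - s, so a worked example may help. The following sketch applies the rules above to a hypothetical accounts table in Oracle-compatible mode and declares a matching result table. The table, its columns, and the choice of TIMESTAMP without an explicit precision for the DATE column are assumptions made for illustration.

```sql
-- Hypothetical OceanBase table (Oracle-compatible mode), for reference only:
--   CREATE TABLE accounts (
--     id        NUMBER(18, 0),  -- p - s = 18, less than 19      -> BIGINT
--     grade     NUMBER(2, 0),   -- p - s = 2, less than 3        -> TINYINT
--     branch    NUMBER(5, -2),  -- p - s = 7, less than 10       -> INT
--     serial_no NUMBER(25, 0),  -- p - s = 25, between 19 and 38 -> DECIMAL(25, 0)
--     balance   NUMBER(16, 2),  -- s > 0                         -> DECIMAL(16, 2)
--     opened_on DATE,           --                               -> TIMESTAMP
--     owner     VARCHAR2(64),   --                               -> STRING
--     photo     BLOB            --                               -> BYTES
--   );

CREATE TEMPORARY TABLE accounts_flink (
  id BIGINT,
  grade TINYINT,
  branch INT,
  serial_no DECIMAL(25, 0),
  balance DECIMAL(16, 2),
  opened_on TIMESTAMP,
  owner STRING,
  photo BYTES,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'oceanbase',
  -- The JDBC URL must contain the Oracle service name.
  'url' = '<yourJdbcUrl>',
  'userName' = '<yourUserName>',
  'password' = '<yourPassword>',
  'tableName' = 'accounts',
  'compatibleMode' = 'oracle'
);
```

Note that a negative scale widens the integer range: NUMBER(5, -2) has p - s = 7, so it needs INT rather than SMALLINT.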
Sample code
Sample code for a source table or result table
CREATE TEMPORARY TABLE oceanbase_source (
  a INT,
  b VARCHAR,
  c VARCHAR
) WITH (
  'connector' = 'oceanbase',
  'scan.startup.mode' = 'initial',
  'username' = 'user',
  'password' = 'password',
  'tenant-name' = 'tenant',
  'database-name' = '^test_db$',
  'table-name' = '^orders$',
  'hostname' = '11.22.33.44',
  'port' = '2883',
  'config-url' = 'http://11.22.33.44:55/services?Action=ObRootServiceInfo&User_ID=xxx&UID=xxx&ObRegion=xxx',
  'logproxy.host' = '11.22.33.44',
  'logproxy.port' = '2983',
  'working-mode' = 'memory'
);

-- Create a result table for which the connector parameter is set to oceanbase.
CREATE TEMPORARY TABLE oceanbase_sink (
  a INT,
  b VARCHAR,
  c VARCHAR
) WITH (
  'connector' = 'oceanbase',
  'url' = '<yourJdbcUrl>',
  'userName' = '<yourUserName>',
  'password' = '<yourPassword>',
  'tableName' = '<yourTableName>'
);

-- Create a result table for which the connector parameter is set to oceanbase-ocj.
CREATE TEMPORARY TABLE oceanbase_ocj_sink (
  a INT,
  b VARCHAR,
  c VARCHAR
) WITH (
  'connector' = 'oceanbase-ocj',
  'url' = '<yourConfigUrl>',
  'userName' = '<yourUserName>',
  'password' = '${secret_values.password}',
  'databaseName' = '<yourDatabaseName>',
  'tableName' = '<yourTableName>'
);

BEGIN STATEMENT SET;
INSERT INTO oceanbase_sink SELECT * FROM oceanbase_source;
INSERT INTO oceanbase_ocj_sink SELECT * FROM oceanbase_source;
END;
Sample code for an OceanBase dimension table
CREATE TEMPORARY TABLE datagen_source (
  a INT,
  b BIGINT,
  c STRING,
  `proctime` AS PROCTIME()
) WITH (
  'connector' = 'datagen'
);

CREATE TEMPORARY TABLE oceanbase_dim (
  a INT,
  b VARCHAR,
  c VARCHAR
) WITH (
  'connector' = 'oceanbase',
  'url' = '<yourJdbcUrl>',
  'userName' = '<yourUserName>',
  'password' = '${secret_values.password}',
  'tableName' = '<yourTableName>'
);

CREATE TEMPORARY TABLE blackhole_sink (
  a INT,
  b STRING
) WITH (
  'connector' = 'blackhole'
);

INSERT INTO blackhole_sink
SELECT T.a, H.b
FROM datagen_source AS T
JOIN oceanbase_dim FOR SYSTEM_TIME AS OF T.`proctime` AS H
ON T.a = H.a;
References
For more information about the connectors that are supported by Realtime Compute for Apache Flink, see Supported connectors.