This topic describes how to use the data transmission service to synchronize data from OceanBase Database to a Kafka instance.
Background
Kafka is a widely used high-throughput distributed event streaming platform. The data transmission service supports real-time data synchronization between an Oracle or MySQL tenant of OceanBase Database and a self-managed Kafka instance, which extends message processing capabilities. Data synchronization to Kafka is therefore widely used in business scenarios such as real-time data warehouse building, data queries, and report distribution.
Prerequisites
The data transmission service has the privilege to access cloud resources. For more information, see Grant privileges to roles for data transmission.
You have created a dedicated database user for data synchronization in the source OceanBase database and granted corresponding privileges to the user. For more information, see Create a database user.
Limitations
Only physical tables can be synchronized.
The data transmission service supports Kafka V0.9, V1.0, and V2.x.
During data synchronization, if you rename a source table to be synchronized and the new name is beyond the synchronization scope, the data of the source table will not be synchronized to the destination Kafka instance.
The name of a table to be synchronized, as well as the names of columns in the table, must not contain Chinese characters.
The data transmission service supports the synchronization of an object only when the following conditions are met: the database name, table name, and column names of the object are ASCII-encoded and contain no special characters. The special characters are line breaks, spaces, and the following characters: . | " ' ` ( ) = ; / & \.
The data transmission service does not support a standby OceanBase database as the source.
Considerations
In a data synchronization task where the source is an OceanBase database and DDL synchronization is enabled, if a RENAME operation is performed on a table in the source database, we recommend that you restart the task to avoid data loss during incremental synchronization.
During incremental synchronization from OceanBase Database V4.x, if a generated column is not marked with the STORED attribute, the value of this column is synchronized to the destination as NULL. As a result, the downstream system receives unexpected data for this column.
Take note of the following items when an updated row contains a LOB column. LOB columns store data of the JSON, GIS, XML, user-defined type (UDT), and TEXT (for example, LONGTEXT and MEDIUMTEXT) data types.
- If the LOB column is updated, do not use the value stored in the LOB column before the UPDATE or DELETE operation.
- If the LOB column is not updated, the value stored in the LOB column before the UPDATE or DELETE operation is NULL.
If the clocks between nodes, or between the client and the server, are out of synchronization, the latency displayed during incremental synchronization may be inaccurate. For example, if the clock is set earlier than the standard time, the latency can be negative; if the clock is set later than the standard time, the reported latency can be inflated.
When data transfer is resumed for a task, some data (within the last minute) may be duplicated in the Kafka instance. Therefore, deduplication is required in downstream systems.
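Because duplicates are confined to roughly the last minute before the resume point, a bounded cache of recently seen messages is usually enough for downstream deduplication. The following Java sketch illustrates one way to do this with the standard Kafka consumer. It is only a sketch: the broker address, topic name, and the use of the full message payload as the deduplication key are illustrative assumptions, not part of the product.

```java
import java.time.Duration;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class DedupConsumer {
    // Bounded, insertion-ordered cache of recently seen payloads. Duplicates
    // occur only within a short window after a resume, so a window of recent
    // messages is sufficient.
    private static final int WINDOW = 100_000;
    private static final Map<String, Boolean> recent =
        new LinkedHashMap<String, Boolean>(16, 0.75f, false) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<String, Boolean> eldest) {
                return size() > WINDOW;
            }
        };

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // hypothetical broker
        props.put("group.id", "ob-sync-dedup");
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("ob_sync_topic")); // hypothetical topic
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<String, String> record : records) {
                    // A resumed task re-sends byte-identical messages, so the
                    // full payload works as a deduplication key here; in
                    // practice a unique business key from the payload is safer.
                    if (recent.put(record.value(), Boolean.TRUE) != null) {
                        continue; // duplicate within the window; skip
                    }
                    process(record);
                }
            }
        }
    }

    private static void process(ConsumerRecord<String, String> record) {
        System.out.printf("partition=%d offset=%d%n", record.partition(), record.offset());
    }
}
```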
When you synchronize data from an OceanBase database to a Kafka instance, if a statement that creates a unique index fails to execute in the source database, the Kafka instance still consumes the DDL statements that create and drop the unique index. If the downstream execution of the DDL statement that creates the unique index fails, ignore this exception.
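For example, a downstream component that applies consumed DDL statements to a MySQL-compatible database over JDBC could tolerate this case as sketched below. The connection settings, the sample DDL text, and the decision to match on the statement text are illustrative assumptions only.

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;

public class DdlApplier {
    public static void main(String[] args) throws SQLException {
        // Hypothetical connection settings for the downstream database.
        try (Connection conn = DriverManager.getConnection(
                "jdbc:mysql://downstream-host:3306/db", "user", "pass");
             Statement stmt = conn.createStatement()) {
            // The DDL text would normally come from a consumed Kafka message.
            String ddl = "CREATE UNIQUE INDEX uk_order_no ON orders (order_no)";
            applyDdl(stmt, ddl);
        }
    }

    static void applyDdl(Statement stmt, String ddl) {
        try {
            stmt.execute(ddl);
        } catch (SQLException e) {
            // Per the consideration above: if a unique index creation
            // statement fails downstream, log and ignore the exception
            // instead of failing the whole consumer.
            if (ddl.toUpperCase().contains("CREATE UNIQUE INDEX")) {
                System.err.println("Ignoring failed unique index DDL: " + e.getMessage());
            } else {
                throw new RuntimeException(e);
            }
        }
    }
}
```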
If you select only Incremental Synchronization when you create a data synchronization task, the data transmission service requires that the local incremental logs in the source database be retained for at least 48 hours.
If you select Full Synchronization and Incremental Synchronization when you create a data synchronization task, the data transmission service requires that the local incremental logs in the source database be retained for at least seven days. Otherwise, the data synchronization task may fail or the data in the source and destination databases may be inconsistent because the data transmission service cannot obtain incremental logs.
Supported source and destination instance types
In the following table, OB_MySQL stands for a MySQL tenant of OceanBase Database, and OB_Oracle stands for an Oracle tenant of OceanBase Database.
Source | Destination |
OB_MySQL (OceanBase cluster instance) | Kafka instance (Kafka instance on Alibaba Cloud) |
OB_MySQL (OceanBase cluster instance) | Kafka instance (self-managed Kafka instance in a VPC) |
OB_MySQL (OceanBase cluster instance) | Kafka instance (Kafka instance in the public network) |
OB_MySQL (serverless instance) | Kafka instance (Kafka instance on Alibaba Cloud) |
OB_MySQL (serverless instance) | Kafka instance (self-managed Kafka instance in a VPC) |
OB_MySQL (serverless instance) | Kafka instance (Kafka instance in the public network) |
OB_Oracle (OceanBase cluster instance) | Kafka instance (Kafka instance on Alibaba Cloud) |
OB_Oracle (OceanBase cluster instance) | Kafka instance (self-managed Kafka instance in a VPC) |
OB_Oracle (OceanBase cluster instance) | Kafka instance (Kafka instance in the public network) |
Supported DDL
- CREATE TABLE
  Important: The created table must be a synchronization object. To execute the CREATE TABLE statement on a synchronized table, execute the DROP TABLE statement on this table first.
- ALTER TABLE
- DROP TABLE
- TRUNCATE TABLE
  Note: In delayed deletion, the same transaction contains two identical TRUNCATE TABLE DDL statements. In this case, idempotence is implemented for downstream consumption.
- ALTER TABLE…TRUNCATE PARTITION
- CREATE INDEX
- DROP INDEX
- COMMENT ON TABLE
- RENAME TABLE
  Important: The renamed table must be a synchronization object.
Procedure
Log on to the ApsaraDB for OceanBase console and purchase a data synchronization task.
For more information, see Purchase a data synchronization task.
Choose Data Transmission > Data Synchronization. On the page that appears, click Configuration for the data synchronization task.
If you want to reference the configurations of an existing task, click Reference Configuration. For more information, see Reference and clear the configuration of a data synchronization task.
On the Select Source and Destination page, configure the parameters.
Parameter | Description |
Task Name | We recommend that you set it to a combination of digits and letters. It must not contain any spaces and cannot exceed 64 characters in length. |
Tag (Optional) | Click the field and select a target tag from the drop-down list. You can also click Manage Tags to create, modify, and delete tags. For more information, see Use tags to manage data synchronization tasks. |
Source | If you have created an OceanBase data source, select it from the drop-down list. If not, click New Data Source in the drop-down list and create one in the dialog box that appears on the right. For more information about the parameters, see Create an OceanBase data source. Important: The source must not be an OceanBase Database tenant instance. |
Destination | If you have created a Kafka data source, select it from the drop-down list. If not, click New Data Source in the drop-down list and create one in the dialog box that appears on the right. For more information, see Create a Kafka data source. |
Click Next. On the Select Synchronization Type page, specify the synchronization types for the current data synchronization task.
The supported synchronization types are Full Synchronization and Incremental Synchronization. Options for Incremental Synchronization are DML Synchronization and DDL Synchronization. The supported DML operations are INSERT, DELETE, and UPDATE. You can select operations as needed. For more information, see Configure DDL/DML synchronization.
Click Next. On the Select Synchronization Objects page, select the objects to be synchronized in the current data synchronization task.
You can use the Specify Objects or Match Rules option to specify synchronization objects. The following procedure describes how to specify synchronization objects by using the Specify Objects option. For information about the procedure for specifying synchronization objects by using the Match Rules option, see the "Configure matching rules for data migration/synchronization from a database to a Message Queue instance" section in the Configure matching rules topic.
Note: If you select DDL Synchronization in the Select Synchronization Type step, we recommend that you select synchronization objects by using the Match Rules option. This ensures that all new objects that meet the matching rules are synchronized. If you select synchronization objects by using the Specify Objects option, new or renamed objects will not be synchronized.
When you synchronize data from an OceanBase database to a Kafka instance, you can synchronize data from multiple tables to multiple topics.
In the Select Synchronization Objects section, select Specify Objects.
In the left-side pane, select the objects to be synchronized.
Click >.
Click the Existing Topics drop-down list in the Map Object to Topic dialog box and select the target topic.
Click OK.
The data transmission service allows you to import objects by using text. It also allows you to change the topics of the objects, set row filters, and remove a single object or all objects. Objects in the destination database are listed in the structure of Topic > Database > Table.
Note: When you select Match Rules to specify synchronization objects, object renaming is implemented based on the syntax of the specified matching rules. In the operation area, you can only set filtering conditions and select sharding columns and the columns to be synchronized. For more information, see Configure matching rules.
Import objects
1. In the list on the right, click Import Objects in the upper-right corner.
2. In the dialog box that appears, click OK.
   Important: This operation overwrites previous selections. Proceed with caution.
3. In the Import Synchronization Objects dialog box, import the objects to be synchronized. You can import CSV files to set row filter conditions, filter columns, and sharding columns. For more information, see Download and import the settings of synchronization objects.
4. Click Validate.
5. After the validation succeeds, click OK.
Change topics
The data transmission service allows you to change topics for objects in the destination database. For more information, see Change topics.
Configure settings
You can use a WHERE clause to filter data by row, and select sharding columns and the columns to be synchronized. In the Settings dialog box, you can perform the following operations:
- In the Row Filters section, specify a standard SQL WHERE clause to filter data by row, for example, status = 'ACTIVE'. For more information, see Use SQL conditions to filter data.
- Select the sharding columns that you want to use from the Sharding Columns drop-down list. You can select multiple fields as sharding columns. This parameter is optional. Unless otherwise specified, select the primary keys as sharding columns. If the primary keys are not load-balanced, select load-balanced fields with unique identifiers as sharding columns to avoid potential performance issues. Sharding columns serve two purposes: load balancing, because threads used for sending messages can be assigned based on the sharding columns if the destination table supports concurrent writes; and orderliness, because the data transmission service ensures that messages with the same sharding-column values are received in order, which determines the sequence in which DML statements are applied.
- In the Select Columns section, select the columns to be synchronized. For more information, see Column filtering.
Remove one or all objects
The data transmission service allows you to remove a single object or all synchronization objects that are added to the right-side list during data mapping.
- To remove a single synchronization object, move the pointer over the object in the list on the right and click Remove.
- To remove all synchronization objects, click Remove All in the upper-right corner of the list on the right. In the dialog box that appears, click OK.
Click Next. On the Synchronization Options page, configure the parameters.
Full synchronization
The following table describes the parameters for full synchronization, which are displayed only if you have selected Full Synchronization on the Select Synchronization Type page.
Parameter | Description |
Read Concurrency Configuration | The concurrency for reading data from the source during full synchronization. The maximum value is 512. A high read concurrency may put excessive stress on the source and affect the business. |
Write Concurrency Configuration | The concurrency for writing data to the destination during full synchronization. The maximum value is 512. A high write concurrency may put excessive stress on the destination and affect the business. |
Full Synchronization Rate Limit | You can choose whether to limit the full synchronization rate. If you do, you must specify the records per second (RPS) and bytes per second (BPS). The RPS specifies the maximum number of data rows synchronized to the destination per second during full synchronization, and the BPS specifies the maximum amount of data in bytes synchronized to the destination per second. Note: The RPS and BPS values specified here are used only for throttling. The actual full synchronization performance is subject to factors such as the settings of the source and destination and the instance specifications. |
Incremental synchronization
The following table describes the incremental synchronization parameters, which are displayed only when you have selected Incremental Synchronization on the Select Synchronization Type page.
Parameter | Description |
Write Concurrency Configuration | The concurrency for writing data to the destination during incremental synchronization. The maximum value is 512. A high write concurrency may put excessive stress on the destination and affect the business. |
Incremental Synchronization Rate Limit | You can choose whether to limit the incremental synchronization rate. If you do, you must specify the RPS and BPS. The RPS specifies the maximum number of data rows synchronized to the destination per second during incremental synchronization, and the BPS specifies the maximum amount of data in bytes synchronized to the destination per second. Note: The RPS and BPS values specified here are used only for throttling. The actual incremental synchronization performance is subject to factors such as the settings of the source and destination and the instance specifications. |
Incremental Synchronization Start Timestamp | This parameter is not displayed if you have selected Full Synchronization. If you have selected Incremental Synchronization but not Full Synchronization, specify a point in time after which data is to be synchronized. The default value is the current system time. For more information, see Set an incremental synchronization timestamp. |
Advanced parameters
Parameter | Description |
Serialization Method | The message format for synchronizing data to the destination Kafka instance. Valid values are Default, Canal, DataWorks (version 2.0 supported), SharePlex, DefaultExtendColumnType, Debezium, DebeziumFlatten, DebeziumSmt, and Avro. For more information, see Data formats. Important: Only MySQL tenants of OceanBase Database support Debezium, DebeziumFlatten, DebeziumSmt, and Avro. If the message format is set to DataWorks, the COMMENT ON TABLE and ALTER TABLE…TRUNCATE PARTITION DDL operations cannot be synchronized. |
Partitioning Rules | The rule for synchronizing data from an OceanBase database to a Kafka topic. Valid values are Hash, Table, and One. Hash indicates that the data transmission service uses a hash algorithm to select the partition of a Kafka topic based on the value of the primary key or sharding column. Table indicates that the data transmission service delivers all data in a table to the same partition, with the table name as the hash key. One indicates that JSON messages are delivered to a single fixed partition of a topic to ensure ordering. For more information about the delivery of DDL statements in different scenarios and examples, see the description below. |
Business System Identification (Optional) | Identifies the source business system of the data. This parameter is displayed only when you select DataWorks for Serialization Method. The business system identifier consists of 1 to 20 characters. |
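To make the three partitioning rules concrete, the following Java sketch shows conceptually how a partition could be selected under each rule. This is an illustration only: the actual hash algorithm that the data transmission service uses is not documented here, and String.hashCode() merely stands in for it.

```java
public class PartitioningRules {
    // Hash: choose the partition from the primary key or sharding column value.
    static int hashRule(String shardingKeyValue, int numPartitions) {
        return Math.floorMod(shardingKeyValue.hashCode(), numPartitions);
    }

    // Table: use the table name as the hash key, so all rows of a table
    // land in the same partition.
    static int tableRule(String tableName, int numPartitions) {
        return Math.floorMod(tableName.hashCode(), numPartitions);
    }

    // One: deliver every message to a single fixed partition to preserve
    // global ordering within the topic.
    static int oneRule() {
        return 0;
    }

    public static void main(String[] args) {
        int partitions = 8;
        System.out.println(hashRule("order_id=42", partitions)); // some partition in [0, 8)
        System.out.println(tableRule("db1.orders", partitions)); // same for every row of db1.orders
        System.out.println(oneRule());                           // always 0
    }
}
```

Note that under the Hash rule, rows that share a sharding-column value always map to the same partition, which is what preserves their relative order.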
The following describes how a DDL statement is delivered in different scenarios under each partitioning rule.
Hash
- DDL statement involving multiple tables (example: RENAME TABLE): The statement is delivered to all partitions of the topics associated with the involved tables. For example, assume that the statement involves three tables A, B, and C, where A is associated with Topic 1, B is associated with Topic 2, and C is not involved in the current task. The statement is delivered to all partitions of Topic 1 and Topic 2.
- DDL statement involving unknown tables (example: DROP INDEX): The statement is delivered to all partitions of all topics of the current task. For example, if the data transmission service cannot identify the tables involved in the statement and the current task has three topics, the statement is delivered to all partitions of these three topics.
- DDL statement involving a single table: The statement is delivered to all partitions of the topic associated with the table.
Table
- DDL statement involving multiple tables (example: RENAME TABLE): The statement is delivered to specific partitions of the topics associated with the involved tables. The partitions correspond to the hash values of the involved table names. In the preceding example, the statement is delivered to the partitions corresponding to the hash values of the involved table names in Topic 1 and Topic 2.
- DDL statement involving unknown tables (example: DROP INDEX): The statement is delivered to all partitions of all topics of the current task.
- DDL statement involving a single table: The statement is delivered to a specific partition of the topic associated with the table.
One
- DDL statement involving multiple tables (example: RENAME TABLE): The statement is delivered to a fixed partition of the topics associated with the involved tables. In the preceding example, the statement is delivered to a fixed partition of Topic 1 and Topic 2.
- DDL statement involving unknown tables (example: DROP INDEX): The statement is delivered to a fixed partition of all topics of the current task.
- DDL statement involving a single table: The statement is delivered to a fixed partition of the topic associated with the table.
Click Precheck.
During the precheck, the data transmission service detects the connection with the destination Kafka instance. If an error is returned during the precheck, you can perform the following operations:
Identify and troubleshoot the problem and then perform the precheck again.
Click Skip in the Actions column of the failed precheck item. In the dialog box that prompts you about the consequences of the operation, click OK.
After the precheck succeeds, click Start Task.
If you do not need to start the task now, click Save. You can manually start the task on the Synchronization Tasks page or by performing batch operations later. For more information about batch operations, see Perform batch operations on data synchronization tasks.
The data transmission service allows you to modify the synchronization objects when a synchronization task is running. For more information, see View and modify synchronization objects and their filter conditions. After the data synchronization task is started, it will be executed based on the selected synchronization types. For more information, see View details of a data synchronization task.
If the data synchronization task encounters an execution exception due to a network failure or slow startup of processes, you can click Recover on the Synchronization Tasks or Details page of the synchronization task.