This topic describes which change messages from a source table each DataHub topic type can synchronize, the sharding strategies for each topic type, the data formats, and sample messages.
Support of different topic types for synchronization of data changes caused by operations on a source table
A topic is the smallest unit for data subscription and publication in DataHub. You can use topics to distinguish different types of streaming data. Two types of topics are supported: TUPLE and binary large object (BLOB).

Topic type | Write DML messages to a topic | Write upstream heartbeat messages to a topic | Write DDL messages to a topic | Mapping mode for a source table and a topic | Data type |
---|---|---|---|---|---|
TUPLE | Supported | Not supported | Not supported | Single table to single topic | Data types supported by DataHub |
BLOB | Supported | Supported | Supported | Single database (multiple tables) to single topic | BLOB data |
- After you create a TUPLE topic, the fields that you define in the schema of the topic cannot be modified. Therefore, a TUPLE topic is suitable for scenarios in which the schema of the source table is fixed and no DDL operations that change the table schema, such as ADD COLUMN and DROP COLUMN, are performed on the source table. A TUPLE topic cannot synchronize DDL messages or heartbeat messages from a source table, so these messages cannot be passed through to a destination. Source tables are mapped to topics one to one. If you want to write data from a large number of source tables to DataHub, you must create the same number of topics in DataHub, which makes the data harder to consume at the destination.
- A BLOB topic does not have a schema and stores only BLOB data. A BLOB topic can synchronize DDL messages and heartbeat messages from a source table and pass them through to a destination for consumption. Multiple tables in a database are mapped to a single topic, so you need to create only one topic for all the source tables that you want to read, which makes the data easier to consume at the destination. We recommend that you use a BLOB topic when DataHub serves as an intermediate message queue for migrating all data in a database.
Sharding strategies for different topic types
Shards are concurrent channels that are used for data transmission in a topic. The rate at which messages can be written to a topic through a single shard is limited. You can add multiple shards to improve write performance. However, DataHub can ensure the order of message consumption only within a single shard. It cannot ensure the order of message consumption across shards. The following table describes the sharding strategies for different topic types. The strategies are designed so that you can add shards to improve write performance while related messages are still consumed in order and data skew is prevented.
Scenario | TUPLE | BLOB |
---|---|---|
A primary key exists (you can also specify a custom primary key): sharding strategy | Messages are sharded by the primary key. | Messages are sharded by the primary key. |
A primary key exists: order of message consumption | Messages that have the same primary key value are consumed in order. | Messages that have the same primary key value are consumed in order. |
No primary key exists: sharding strategy | Messages are sharded by a random column. | Messages are sharded by table name. |
No primary key exists: order of message consumption | The order of message consumption cannot be ensured. | Messages that belong to the same table are consumed in order. |
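
The following sketch illustrates why sharding by primary key, or by table name when no primary key exists, keeps related messages in order: every message with the same sharding key lands in the same shard. It is not the implementation used by DataHub or Data Integration. The class `ShardRouter`, its methods, the `|` separator, and the hash function are all hypothetical choices made for illustration.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical helper, not part of any DataHub SDK.
class ShardRouter {

    // Builds the sharding key: the joined primary key values if a primary key exists,
    // otherwise the table name (the BLOB strategy for tables without a primary key).
    static String shardingKey(String tableName, List<String> primaryKeyValues) {
        if (primaryKeyValues == null || primaryKeyValues.isEmpty()) {
            return tableName;
        }
        return String.join("|", primaryKeyValues);
    }

    // Maps a sharding key to a shard index in [0, shardCount).
    // Equal keys always map to the same shard, so their relative order is preserved.
    static int shardFor(String shardingKey, int shardCount) {
        if (shardCount <= 0) {
            throw new IllegalArgumentException("shardCount must be positive");
        }
        return Math.floorMod(shardingKey.hashCode(), shardCount);
    }

    public static void main(String[] args) {
        int shardCount = 4; // assumed number of shards in the topic
        String withPk = shardingKey("t_shiyu_pk", Arrays.asList("1", "joe"));
        String withoutPk = shardingKey("t_shiyu_nopk", Collections.<String>emptyList());
        System.out.println(withPk + " -> shard " + shardFor(withPk, shardCount));
        System.out.println(withoutPk + " -> shard " + shardFor(withoutPk, shardCount));
    }
}
```

A hash of the key spreads distinct primary key values across shards, which is one way to limit data skew. Sharding by table name cannot split a single large table across shards, which is the price paid for per-table ordering.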
Data formats
- TUPLE
When you create a topic in Data Integration, specific metadata columns are automatically added. _sequence_id_, _excute_time_, _source_table_, _before_image_, and _after_image_ are metadata columns.

Parameter | Description |
---|---|
_sequence_id_ | The unique ID of each message. The value of the parameter is of the STRING type and consists of digits. The ID of a message is the same for an update before operation and an update after operation. |
_excute_time_ | The time at which data is generated. |
_source_table_ | The name of the source table. |
_before_image_ | The pre-image. The value of the parameter is Y for a delete operation or an update before operation. The value of the parameter is N for an insert operation or an update after operation. |
_after_image_ | The post-image. The value of the parameter is N for a delete operation or an update before operation. The value of the parameter is Y for an insert operation or an update after operation. |

The following table shows sample metadata values for an insert operation, an update operation, and a delete operation.

_sequence_id_ | _operation_type_ | _excute_time_ | _before_image_ | _after_image_ |
---|---|---|---|---|
1649991610688000000 | I | 1649991726000 | N | Y |
1649991610688000001 | U | 1649991756000 | Y | N |
1649991610688000001 | U | 1649991756000 | N | Y |
1649991610688000002 | D | 1649991774000 | Y | N |

- BLOB
A message in a BLOB topic is binary data that is obtained by converting a JSON-formatted string. The following sample shows the format of the JSON string:
{
  "schema": { // The metadata to which changes are made. Only column names and data types of the columns are specified.
    "dataColumn": [ // The columns to which changes are made. The data in the destination table is automatically updated.
      { "name": "id", "type": "LONG" },
      { "name": "name", "type": "STRING" },
      { "name": "binData", "type": "BYTES" },
      { "name": "ts", "type": "DATE" }
    ],
    "primaryKey": [ "pkName1", "pkName2" ],
    "source": {
      "dbType": "mysql",
      "dbVersion": "1.0.0",
      "dbName": "myDatabase",
      "schemaName": "mySchema",
      "tableName": "tableName"
    }
  },
  "payload": {
    "before": {
      "dataColumn": { "id": 111, "name": "scooter", "binData": "[base64 string]", "ts": 1590315269000 }
    },
    "after": {
      "dataColumn": { "id": 222, "name": "donald", "binData": "[base64 string]", "ts": 1590315269000 }
    },
    "sequenceId": XXX, // The unique sequence ID of each message that is generated after the incremental data and full data are merged. The value of the parameter is of the STRING type.
    "op": "INSERT/UPDATE/DELETE/TRANSACTION_BEGIN/TRANSACTION_END/CREATE/ALTER/ERASE/QUERY/TRUNCATE/RENAME/CINDEX/DINDEX/GTID/XACOMMIT/XAROLLBACK/MHEARTBEAT...", // The operation that is performed. The value of the parameter is case-sensitive.
    "timestamp": {
      "eventTime": 1, // Required. The time at which the operation is performed. The value of the parameter is a 13-digit timestamp in milliseconds.
      "systemTime": 2, // Optional. The system time that you must specify for specific data sources such as Oracle.
      "checkpointTime": 3 // Optional. The checkpoint time that you must specify for specific data sources such as ApsaraDB for OceanBase.
    },
    "ddl": {
      "text": "ADD COLUMN ...",
      "ddlMeta": "[SQLStatement serialized binary, expressed in base64 string]"
    }
  },
  "version": "1.0.0"
}
- Field descriptions for a BLOB topic
Notice: The following data types are supported for the fields in a message: BOOLEAN, DOUBLE, DATE, BYTES, LONG, and STRING.
BOOLEAN: The valid values for a field of this data type are true and false.
DATE: The value of a field of this data type is a 13-digit timestamp in milliseconds.
BYTES: The value of a field of this data type is a Base64-encoded string. Base64 encoding and decoding are implemented by calling the API operations that are provided by java.util.Base64.
String text = "Test text123";
String encodedText = Base64.getEncoder().encodeToString(text.getBytes("UTF-8")); // Encode
byte[] decodedBytes = Base64.getDecoder().decode(encodedText); // Decode

Level-1 field | Level-2 field | Description |
---|---|---|
schema | dataColumn | The names and data types of columns. The value is of the JSONArray type. dataColumn records the names and data types of the columns that are changed in the source. A change can be a data addition, deletion, or modification, or a table schema change in the source. name: the name of the column. type: the data type of the column. |
schema | primaryKey | The information about the primary key. The value is of the List type. pk: the name of the primary key. |
schema | source | The information about the source or source table. The value is of the Object type. dbType: the type of the source. The value is of the STRING type. dbVersion: the version of the source. The value is of the STRING type. dbName: the name of the source. The value is of the STRING type. schemaName: the name of the schema. You must specify this field for sources such as PostgreSQL and SQL Server. The value is of the STRING type. tableName: the name of the source table. The value is of the STRING type. |
payload | before | The data before a change. The value is of the JSONObject type. For example, if a MySQL database is the source and data in this source is updated, the before field records the data before the update. After a data update or deletion message is read from the source, the before field is specified in a write record. dataColumn: the column information. The value is of the JSONObject type. The field value is in the Column name:Column value format. The column name is a string. If the data type that is specified for the column is BYTES, the column value is a Base64-encoded string. If the data type that is specified for the column is DATE, the column value is a 13-digit timestamp of the LONG type. |
payload | after | The data after a change. The after field records the data after a change in the same data format as that of the before field. Note: This field is required when an update or an insert operation is performed. |
payload | op | The type of the operation. Valid values: INSERT: inserts data. UPDATE_BEFOR: updates data (before). UPDATE_AFTER: updates data (after). DELETE: deletes data. TRANSACTION_BEGIN: starts a data source transaction. TRANSACTION_END: terminates a data source transaction. CREATE: creates a table in a data source. ALTER: modifies a table in a data source. QUERY: queries data changes in a database by executing the SQL statements that cause the data changes. TRUNCATE: removes all rows from a table in a data source. RENAME: renames a table in a data source. CINDEX: creates an index. DINDEX: deletes an index. MHEARTBEAT: a heartbeat message. The message indicates that a synchronization node runs as expected when no new data is generated in the source. |
payload | timestamp | The timestamp of a data record. The value is of the JSONObject type. eventTime: the time at which the data in the source changes. The value is a 13-digit timestamp in milliseconds and is of the LONG type. systemTime: the time at which the synchronization node reads the change message. The value is a 13-digit timestamp in milliseconds and is of the LONG type. checkpointTime: the specified time at which the synchronization offset is reset. The value is a 13-digit timestamp in milliseconds and is of the LONG type. In most cases, the value of this field is equal to the value of the eventTime field. |
payload | ddl | This field is specified only if the schema of a table in the source is changed. The value is null when a DML operation, such as data addition, deletion, or modification, is performed in the source. text: the text of a DDL statement in the source. The value is of the STRING type. ddlMeta: the binary data that is obtained by serializing the SQL statement objects that are generated after FastSQL parses DDL statements. The binary data is stored as a Base64-encoded string. The value of ddlMeta is of the STRING type. After you enable DDL support, a destination deserializes the serialized SQL statement objects to restore them to DDL statements that the destination can execute. |
version | None | The version of data in the JSON format. |

- Serialization description for BLOB data
In this topic, a message is mapped to a JSON object. A JSON object consists of key-value pairs. Depending on the message format, the value in each key-value pair can be a JSON object, a JSON array, or a value of a specific data type.
The storage data type for each field in a JSON object is based on the preceding field descriptions. To serialize a JSON object, first convert it into a string. You can use the toJSONString method that is provided by Fastjson for the conversion. Then, call the getBytes(Charsets.UTF_8) method of the string to encode the string in UTF-8 as a byte[].
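
The second serialization step described above can be sketched as follows. To stay free of third-party dependencies, the sketch assumes that the JSON string has already been produced (for example, by Fastjson as described above) and writes it by hand. It uses the JDK constant `StandardCharsets.UTF_8` in place of `Charsets.UTF_8`. The class `BlobMessageCodec` and its method names are hypothetical.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper that converts between a JSON string and the byte[] stored in a BLOB topic.
class BlobMessageCodec {

    // Encodes an already serialized JSON string as UTF-8 bytes.
    static byte[] toBlob(String json) {
        return json.getBytes(StandardCharsets.UTF_8);
    }

    // Decodes UTF-8 bytes read from a BLOB topic back into the JSON string.
    static String fromBlob(byte[] blob) {
        return new String(blob, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // Heartbeat sample message from this topic, written by hand for the sketch.
        String json = "{\"schema\":{},\"payload\":{\"op\":\"MHEARTBEAT\",\"timestamp\":"
                + "{\"eventTime\":1605339953629,\"checkpointTime\":1605339953629}},"
                + "\"version\":\"0.0.1\"}";
        byte[] blob = toBlob(json);
        System.out.println("bytes: " + blob.length);
        System.out.println("round trip ok: " + json.equals(fromBlob(blob)));
    }
}
```

Naming the charset explicitly on both sides matters: if the writer or the reader falls back to the platform default charset, non-ASCII column values can be corrupted.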
Sample JSON data for a related message
- INSERT:
{ "schema": { "dataColumn": [ { "name": "id", "type": "LONG" }, { "name": "name", "type": "STRING" }, { "name": "comment", "type": "STRING" } ], "source": { "dbName": "yunshi_db", "dbType": "MySQL", "tableName": "t_shiyu_pk" }, "primaryKey": [ "id", "name" ] }, "payload": { "op": "INSERT", "after": { "dataColumn": { "name": "joe", "comment": "comment", "id": 1 } }, "sequenceId": "1605339516000000004", "timestamp": { "eventTime": 1605339932000, "systemTime": 1605339932736, "checkpointTime": 1605339932000 } }, "version": "0.0.1" }
- UPDATE BEFORE:
{ "schema": { "dataColumn": [ { "name": "id", "type": "LONG" }, { "name": "name", "type": "STRING" }, { "name": "comment", "type": "STRING" } ], "source": { "dbName": "yunshi_db", "dbType": "MySQL", "tableName": "t_shiyu_pk" }, "primaryKey": [ "id", "name" ] }, "payload": { "op": "UPDATE_BEFOR", "before": { "dataColumn": { "name": "joe", "comment": "comment", "id": 1 } }, "sequenceId": "1605339516000000005", "timestamp": { "eventTime": 1605339934000, "systemTime": 1605339934951, "checkpointTime": 1605339934000 } }, "version": "0.0.1" }
- UPDATE AFTER:
{ "schema": { "dataColumn": [ { "name": "id", "type": "LONG" }, { "name": "name", "type": "STRING" }, { "name": "comment", "type": "STRING" } ], "source": { "dbName": "yunshi_db", "dbType": "MySQL", "tableName": "t_shiyu_pk" }, "primaryKey": [ "id", "name" ] }, "payload": { "op": "UPDATE_AFTER", "after": { "dataColumn": { "name": "joe", "comment": "com1", "id": 1 } }, "sequenceId": "1605339516000000005", "timestamp": { "eventTime": 1605339934000, "systemTime": 1605339934951, "checkpointTime": 1605339934000 } }, "version": "0.0.1" }
- DELETE:
{ "schema": { "dataColumn": [ { "name": "id", "type": "LONG" }, { "name": "name", "type": "STRING" }, { "name": "comment", "type": "STRING" } ], "source": { "dbName": "yunshi_db", "dbType": "MySQL", "tableName": "t_shiyu_pk" }, "primaryKey": [ "id", "name" ] }, "payload": { "op": "DELETE", "before": { "dataColumn": { "name": "joe", "comment": "com1", "id": 1 } }, "sequenceId": "1605339516000000006", "timestamp": { "eventTime": 1605339937000, "systemTime": 1605339937671, "checkpointTime": 1605339937000 } }, "version": "0.0.1" }
- HEARTBEAT:
{ "schema": {}, "payload": { "op": "MHEARTBEAT", "timestamp": { "eventTime": 1605339953629, "checkpointTime": 1605339953629 } }, "version": "0.0.1" }
- DDL:
{ "schema": { "source": { "dbName": "yunshi_db", "dbType": "MySQL", "tableName": "t_shiyu_nopk" } }, "payload": { "op": "ALTER", "sequenceId": "1605339516000000035", "ddl": { "text": "alter table t_shiyu_nopk add column holo text", "ddlMeta": "rO0ABXNyACljb20uYWxpYmFiYS5kaS5wbHVnaW4uY2VudGVyLm1ldGEuRERMTWV0YQLb5Cx/YWXtAgACTAAHZGRsVGV4dHQAEkxqYXZhL2xhbmcvU3RyaW5nO0wACXN0YXRlbWVudHQAKkxjb20vYWxpYmFiYS9mYXN0c3FsL3NxbC9hc3QvU1FMU3RhdGVtZW50O3hwdAAtYWx0ZXIgdGFibGUgdF9zaGl5dV9ub3BrIGFkZCBjb2x1bW4gaG9sbyB0ZXh0c3IAPGNvbS5hbGliYWJhLmZhc3RzcWwuc3FsLmFzdC5zdGF0ZW1lbnQuU1FMQWx0ZXJUYWJsZVN0YXRlbWVudBQPP3vMUl2cAgAPSQAHYnVja2V0c1oABmlnbm9yZVoAF2ludmFsaWRhdGVHbG9iYWxJbmRleGVzWgAPbWVyZ2VTbWFsbEZpbGVzWgAHb2ZmbGluZVoABm9ubGluZVoADnJlbW92ZVBhdGl0aW5nWgATdXBkYXRlR2xvYmFsSW5kZXhlc1oAD3VwZ3JhZGVQYXRpdGluZ0wAC2NsdXN0ZXJlZEJ5dAAQTGphdmEvdXRpbC9MaXN0O0wABWl0ZW1zcQB+AAZMAAlwYXJ0aXRpb250ACxMY29tL2FsaWJhYmEvZmFzdHNxbC9zcWwvYXN0L1NRTFBhcnRpdGlvbkJ5O0wACHNvcnRlZEJ5cQB+AAZMAAx0YWJsZU9wdGlvbnNxAH4ABkwAC3RhYmxlU291cmNldAA6TGNvbS9hbGliYWJhL2Zhc3RzcWwvc3FsL2FzdC9zdGF0ZW1lbnQvU1FMRXhwclRhYmxlU291cmNlO3hyACxjb20uYWxpYmFiYS5mYXN0c3FsLnNxbC5hc3QuU1FMU3RhdGVtZW50SW1wbEOxUUDVCJMGAgADWgAJYWZ0ZXJTZW1pTAAGZGJUeXBldAAcTGNvbS9hbGliYWJhL2Zhc3RzcWwvRGJUeXBlO0wACWhlYWRIaW50c3EAfgAGeHIAKWNvbS5hbGliYWJhLmZhc3RzcWwuc3FsLmFzdC5TUUxPYmplY3RJbXBs5LvqLFggFVECAAVJAAxzb3VyY2VDb2x1bW5JAApzb3VyY2VMaW5lTAAKYXR0cmlidXRlc3QAD0xqYXZhL3V0aWwvTWFwO0wABGhpbnR0ACxMY29tL2FsaWJhYmEvZmFzdHNxbC9zcWwvYXN0L1NRTENvbW1lbnRIaW50O0wABnBhcmVudHQAJ0xjb20vYWxpYmFiYS9mYXN0c3FsL3NxbC9hc3QvU1FMT2JqZWN0O3hwAAAAAAAAAABwcHAAfnIAGmNvbS5hbGliYWJhLmZhc3RzcWwuRGJUeXBlAAAAAAAAAAASAAB4cgAOamF2YS5sYW5nLkVudW0AAAAAAAAAABIAAHhwdAAFbXlzcWxwAAAAAAAAAAAAAAAAc3IAE2phdmEudXRpbC5BcnJheUxpc3R4gdIdmcdhnQMAAUkABHNpemV4cAAAAAB3BAAAAAB4c3EAfgAUAAAAAXcEAAAAAXNyADxjb20uYWxpYmFiYS5mYXN0c3FsLnNxbC5hc3Quc3RhdGVtZW50LlNRTEFsdGVyVGFibGVBZGRDb2x1bW4l5T6CFe//BAIABloAB2Nhc2NhZGVaAAVmaXJzdEwAC2FmdGVyQ29sdW1udAAlTGNvbS9hbGliYWJhL2Zhc3RzcWwvc3FsL2FzdC9TUUxOYW1lO0wAB2NvbHVtbnNxAH4ABkwA
C2ZpcnN0Q29sdW1ucQB+ABhMAAhyZXN0cmljdHQAE0xqYXZhL2xhbmcvQm9vbGVhbjt4cQB+AAsAAAAAAAAAAHBwcQB+AA8AAHBzcQB+ABQAAAABdwQAAAABc3IAOWNvbS5hbGliYWJhLmZhc3RzcWwuc3FsLmFzdC5zdGF0ZW1lbnQuU1FMQ29sdW1uRGVmaW5pdGlvbst0gLKZ0qAtAgAmWgANYXV0b0luY3JlbWVudFoADGRpc2FibGVJbmRleFoAB3ByZVNvcnRJAAxwcmVTb3J0T3JkZXJaAAZzdG9yZWRaAAd2aXJ0dWFsWgAHdmlzaWJsZUwACGFubkluZGV4dAApTGNvbS9hbGliYWJhL2Zhc3RzcWwvc3FsL2FzdC9TUUxBbm5JbmRleDtMAAZhc0V4cHJ0ACVMY29tL2FsaWJhYmEvZmFzdHNxbC9zcWwvYXN0L1NRTEV4cHI7TAALY2hhcnNldEV4cHJxAH4AHkwADWNvbFByb3BlcnRpZXNxAH4ABkwAC2NvbGxhdGVFeHBycQB+AB5MAAdjb21tZW50cQB+AB5MAAtjb21wcmVzc2lvbnQALkxjb20vYWxpYmFiYS9mYXN0c3FsL3NxbC9hc3QvZXhwci9TUUxDaGFyRXhwcjtMAAtjb25zdHJhaW50c3EAfgAGTAAIZGF0YVR5cGV0AClMY29tL2FsaWJhYmEvZmFzdHNxbC9zcWwvYXN0L1NRTERhdGFUeXBlO0wABmRiVHlwZXEAfgAKTAALZGVmYXVsdEV4cHJxAH4AHkwACWRlbGltaXRlcnEAfgAeTAASZGVsaW1pdGVyVG9rZW5pemVycQB+AB5MAAZlbmFibGVxAH4AGUwABmVuY29kZXEAfgAfTAAGZm9ybWF0cQB+AB5MABBnZW5lcmF0ZWRBbGF3c0FzcQB+AB5MAAhpZGVudGl0eXQARExjb20vYWxpYmFiYS9mYXN0c3FsL3NxbC9hc3Qvc3RhdGVtZW50L1NRTENvbHVtbkRlZmluaXRpb24kSWRlbnRpdHk7TAASanNvbkluZGV4QXR0cnNFeHBycQB+AB5MAAhtYXBwZWRCeXEAfgAGTAAEbmFtZXEAfgAYTAAMbmxwVG9rZW5pemVycQB+AB5MAAhvblVwZGF0ZXEAfgAeTAAEcmVseXEAfgAZTAAMc2VxdWVuY2VUeXBldAAvTGNvbS9hbGliYWJhL2Zhc3RzcWwvc3FsL2FzdC9BdXRvSW5jcmVtZW50VHlwZTtMAARzdGVwcQB+AB5MAAdzdG9yYWdlcQB+AB5MAAl1bml0Q291bnRxAH4AHkwACXVuaXRJbmRleHEAfgAeTAAIdmFsaWRhdGVxAH4AGUwACXZhbHVlVHlwZXEAfgAeeHEAfgALAAAAAAAAAABwcHEAfgAaAAAAAAAAAAAAAHBwcHBwcHBzcQB+ABQAAAAAdwQAAAAAeHNyADpjb20uYWxpYmFiYS5mYXN0c3FsLnNxbC5hc3Quc3RhdGVtZW50LlNRTENoYXJhY3RlckRhdGFUeXBlqtJac/d+04cCAAVaAAloYXNCaW5hcnlMAAtjaGFyU2V0TmFtZXEAfgABTAAIY2hhclR5cGVxAH4AAUwAB2NvbGxhdGVxAH4AAUwABWhpbnRzcQB+AAZ4cgArY29tLmFsaWJhYmEuZmFzdHNxbC5zcWwuYXN0LlNRTERhdGFUeXBlSW1wbEWL29pc1gZFAgAJSgAObmFtZUhhc2hDb2RlNjRaAAh1bnNpZ25lZFoAEXdpdGhMb2NhbFRpbWVab25lWgAIemVyb2ZpbGxMAAlhcmd1bWVudHNxAH4ABkwABmRiVHlwZXEAfgAKTAAHaW5kZXhCeXEAfgAeTAAEbmFtZXEAfgABTAAMd2l0aFRpbWVab25lcQB+ABl4cQB+AAsAAAAAAAAAAHBwcQB+ACP6BPTvGZVAfgAAAHNxAH4AFAAAAAB3BAAAAAB4cHB0AAR0
ZXh0cABwcHBwcQB+ABJwcHBwcHBwcHBwc3IAMmNvbS5hbGliYWJhLmZhc3RzcWwuc3FsLmFzdC5leHByLlNRTElkZW50aWZpZXJFeHBy3DXH1zvWbgkCAARKAApoYXNoQ29kZTY0TAAEbmFtZXEAfgABTAAOcmVzb2x2ZWRDb2x1bW5xAH4ADkwAE3Jlc29sdmVkT3duZXJPYmplY3RxAH4ADnhyACdjb20uYWxpYmFiYS5mYXN0c3FsLnNxbC5hc3QuU1FMRXhwckltcGxs2ypmFJxWrQIAAHhxAH4ACwAAAAAAAAAAcHBwQCnxzH5tIDl0AARob2xvcHBwcHBwcHBwcHBweHBweHBzcQB+ABQAAAAAdwQAAAAAeHNxAH4AFAAAAAB3BAAAAAB4c3IAOGNvbS5hbGliYWJhLmZhc3RzcWwuc3FsLmFzdC5zdGF0ZW1lbnQuU1FMRXhwclRhYmxlU291cmNlRHD7eYJ4eswCAAVMAAdjb2x1bW5zcQB+AAZMAARleHBycQB+AB5MAApwYXJ0aXRpb25zcQB+AAZMAAhzYW1wbGluZ3QAOExjb20vYWxpYmFiYS9mYXN0c3FsL3NxbC9hc3Qvc3RhdGVtZW50L1NRTFRhYmxlU2FtcGxpbmc7TAAMc2NoZW1hT2JqZWN0dAAxTGNvbS9hbGliYWJhL2Zhc3RzcWwvc3FsL3JlcG9zaXRvcnkvU2NoZW1hT2JqZWN0O3hyADhjb20uYWxpYmFiYS5mYXN0c3FsLnNxbC5hc3Quc3RhdGVtZW50LlNRTFRhYmxlU291cmNlSW1wbAqEMenTm5zUAgAESgAPYWxpYXNIYXNoQ29kZTY0TAAFYWxpYXNxAH4AAUwACWZsYXNoYmFja3EAfgAeTAAFaGludHNxAH4ABnhxAH4ACwAAAAAAAAAAcHBwAAAAAAAAAABwcHBwc3EAfgAqAAAAAAAAAABwcHEAfgA0NH7o4UvP9Dt0AAx0X3NoaXl1X25vcGtwcHBwcA==" }, "timestamp": { "eventTime": 1605342109000, "systemTime": 1605342109259, "checkpointTime": 1605342109000 } }, "version": "0.0.1" }
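
A consumer of a BLOB topic typically has to branch on the op field of each message, because DML, DDL, transaction, and heartbeat messages carry different fields, as the samples above show. The following sketch groups the op values that are described in this topic. The class `OpClassifier`, the `Kind` enum, and the grouping itself (for example, treating TRUNCATE as DDL and QUERY as OTHER) are assumptions made for illustration and are not defined by DataHub.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

// Hypothetical classifier for the op field of a BLOB topic message.
class OpClassifier {

    enum Kind { DML, DDL, TRANSACTION, HEARTBEAT, OTHER }

    // UPDATE_BEFOR is spelled exactly as it appears in the messages.
    private static final Set<String> DML_OPS = new HashSet<>(Arrays.asList(
            "INSERT", "UPDATE_BEFOR", "UPDATE_AFTER", "DELETE"));

    // Assumed grouping of schema-level operations.
    private static final Set<String> DDL_OPS = new HashSet<>(Arrays.asList(
            "CREATE", "ALTER", "TRUNCATE", "RENAME", "CINDEX", "DINDEX"));

    // The op value is case-sensitive, so no case normalization is applied.
    static Kind classify(String op) {
        if (op == null) {
            return Kind.OTHER;
        }
        if (DML_OPS.contains(op)) {
            return Kind.DML;
        }
        if (DDL_OPS.contains(op)) {
            return Kind.DDL;
        }
        if (op.equals("TRANSACTION_BEGIN") || op.equals("TRANSACTION_END")) {
            return Kind.TRANSACTION;
        }
        if (op.equals("MHEARTBEAT")) {
            return Kind.HEARTBEAT;
        }
        return Kind.OTHER;
    }

    public static void main(String[] args) {
        for (String op : new String[] {"INSERT", "UPDATE_BEFOR", "ALTER", "MHEARTBEAT", "QUERY"}) {
            System.out.println(op + " -> " + classify(op));
        }
    }
}
```

With such a grouping, a consumer could read payload.before or payload.after only for DML messages, read payload.ddl only for DDL messages, and use heartbeat messages just to advance its progress marker.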