Data Transmission Service (DTS) enables you to select a storage format when you synchronize or migrate data to a message queue, such as Kafka or RocketMQ. This topic describes the data formats to help you parse the data.
Data storage formats
DTS supports the following three storage formats for data written to a message queue:
- DTS Avro: A data serialization format that converts data structures or objects into a format that is easy to store or transmit.
- Shareplex Json: When the data replication software SharePlex reads data from a source database and writes the data to a message queue, the data is stored in Shareplex Json format.
- Canal Json: Canal parses incremental logs from a database and transmits the incremental data to a message queue. The data is stored in Canal Json format.
DTS Avro
You must parse the data based on the DTS Avro schema definition. For more information, see DTS Avro schema definition and DTS Avro deserialization example.
In DTS Avro format, DDL statements are of the String type.
Shareplex Json
| Parameter | Description |
| --- | --- |
| time | The time when the transaction was committed in the database. The format is yyyy-MM-ddTHH:mm:ssZ (UTC). |
| userid | The ID of the user who committed the transaction. |
| op | The data operation type. Valid values include INSERT, UPDATE, DELETE, TRUNCATE, DROP COLUMN, UPDATE BEFORE, and UPDATE AFTER. |
| scn | The system change number (SCN), which identifies the version of a transaction committed at a specific time in the database. Each committed transaction is assigned a unique SCN. |
| rowid | A relatively unique address value used to locate a record in the database. |
| trans | The transaction ID. |
| seq | The ordinal number of the operation within the transaction. The value starts from 1. |
| size | The total number of operations in the transaction. |
| table | The table name. |
| idx | The index of the operation within the transaction, in the format of seq/size. For example, 1/11 indicates the first of 11 operations. |
| posttime | The time when the transaction was committed to the destination database. |
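A Shareplex Json message can be read with any standard JSON library. The following Python sketch parses the insert record from the examples in this topic and reads the meta and data sections; the field handling is illustrative, not part of the SharePlex specification:

```python
import json

# A Shareplex Json insert message (taken from the example in this topic).
message = '''
{
  "meta": {
    "time": "2017-06-16T14:24:34",
    "userid": 84,
    "op": "ins",
    "scn": "14589063118712",
    "rowid": "AAATGpAAIAAItcIAAA",
    "trans": "7.0.411499",
    "seq": 1,
    "size": 11,
    "table": "CL_BIZ1.MIO_LOG",
    "idx": "1/11",
    "posttime": "2017-06-16T14:33:52"
  },
  "data": {
    "MIO_LOG_ID": "32539737"
  }
}
'''

record = json.loads(message)
meta = record["meta"]

# idx has the format seq/size; "1/11" is the first of 11 operations.
seq, size = map(int, meta["idx"].split("/"))

print(meta["op"])      # operation type, e.g. "ins"
print(meta["table"])   # qualified table name
print(seq, size)
```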
Examples
Insert data
```json
{
    "meta": {
        "time": "2017-06-16T14:24:34",
        "userid": 84,
        "op": "ins",
        "scn": "14589063118712",
        "rowid": "AAATGpAAIAAItcIAAA",
        "trans": "7.0.411499",
        "seq": 1,
        "size": 11,
        "table": "CL_BIZ1.MIO_LOG",
        "idx": "1/11",
        "posttime": "2017-06-16T14:33:52"
    },
    "data": {
        "MIO_LOG_ID": "32539737"
    }
}
```

Update data
```json
{
    "meta": {
        "time": "2017-06-16T15:38:13",
        "userid": 84,
        "op": "upd",
        "table": "CL_BIZ1.MIO_LOG"
        ....
    },
    "data": {
        "CNTR_NO": "1171201606"
    },
    "key": {
        "MIO_LOG_ID": "32537893",
        "PLNMIO_REC_ID": "31557806",
        "POL_CODE": null,
        "CNTR_TYPE": null,
        "CNTR_NO": "1171201606syui26"
    }
}
```

Delete data
```json
{
    "meta": {
        "time": "2017-06-16T15:51:35",
        "userid": 84,
        "op": "del"
    },
    "data": {
        "MIO_LOG_ID": "32539739",
        "PLNMIO_REC_ID": "31557806",
        "POL_CODE": null,
        "CNTR_TYPE": null,
        "CG_NO": null
    }
}
```

Canal Json
If you enable Split message delivery after partition key update, DTS delivers a DELETE message and an INSERT message to Kafka when a partition key is modified. Kafka selects the partition for each message based on the message's partition key value.

For example, assume that the partition key id has a value of 1 and the message is delivered to partition-1. The following list describes the differences before and after you enable Split message delivery after partition key update:

- Enabled: When you run the `UPDATE SET id = 2 WHERE id = 1` statement in the source database, a DELETE message with `id=1` is delivered to partition-1, and an INSERT message with `id=2` is delivered to partition-2.
- Disabled: Only one UPDATE message is delivered to partition-1, because the UPDATE operation selects the partition based on the value before the change.
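The behavior above follows from key-based partitioning: messages with the same key always map to the same partition, but a changed key may map elsewhere. The following Python sketch uses a simplified hash-mod partitioner to illustrate this; Kafka's actual default partitioner uses murmur2 hashing, and the partition count here is only illustrative:

```python
NUM_PARTITIONS = 3  # illustrative partition count

def pick_partition(partition_key: int, num_partitions: int = NUM_PARTITIONS) -> int:
    # Simplified stand-in for Kafka's key-based partitioner:
    # the same key always maps to the same partition.
    return hash(partition_key) % num_partitions

# UPDATE SET id = 2 WHERE id = 1 with split delivery enabled:
delete_partition = pick_partition(1)  # DELETE message keeps the old key
insert_partition = pick_partition(2)  # INSERT message carries the new key
print(delete_partition, insert_partition)
```

Because the two messages carry different keys, they can land in different partitions, which is why ordering guarantees only hold per key.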
Parameter descriptions
| Parameter | Description |
| --- | --- |
| database | The database name. |
| es | The time when the operation was executed in the source database. This is a 13-digit UNIX timestamp in milliseconds. |
| id | The serial number of the operation. Note: This is generated from the timestamp and an internal DTS offset. It can help you determine the order of records. |
| isDdl | Indicates whether the operation is a DDL operation. Valid values: true and false. |
| mysqlType | The data type of the field. Note: Parameters for data types, such as precision, are not supported. |
| old / data | The data before or after the change. Note: For synchronization or migration instances created before March 20, 2022, the value of old is the data after the change, and the value of data is the data before the change. |
| pkNames | The primary key name. |
| sql | The SQL statement. |
| sqlType | The transformed field type. The value is the same as the value of dataTypeNumber. For more information, see Mapping between field types and dataTypeNumber values. |
| table | The table name. |
| ts | The time when the operation started to write data to the destination database. This is a 13-digit UNIX timestamp in milliseconds. Note: You can use a search engine to find a UNIX timestamp conversion tool. |
| type | The operation type, such as DELETE, UPDATE, or INSERT. Note: For full data synchronization or migration tasks, the value is fixed at INIT. |
| gtid | Global Transaction Identifier (GTID). A GTID is globally unique. Each transaction corresponds to one GTID. |
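A Canal Json message can be consumed with a standard JSON parser, branching on isDdl and using pkNames to identify each affected row. The following Python sketch, based on the DELETE example in this topic, is illustrative rather than a complete consumer:

```python
import json

# A Canal Json DELETE message, based on the example in this topic.
message = '''
{
  "data": [{"id": "500000287", "shipping_type": null}],
  "database": "dbname",
  "es": 1600161894000,
  "id": 58,
  "isDdl": false,
  "pkNames": ["id"],
  "table": "tablename",
  "ts": 1600161894771,
  "type": "DELETE"
}
'''

record = json.loads(message)

if record["isDdl"]:
    # DDL messages carry the statement in the sql field.
    statement = record["sql"]
else:
    # DML messages carry one dict per affected row in data.
    for row in record["data"]:
        # Build the primary key of the affected row from pkNames.
        pk = {name: row[name] for name in record["pkNames"]}
        print(record["type"], record["database"], record["table"], pk)
```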
Examples
Delete data
For synchronization or migration instances created before March 20, 2022, when a DELETE statement from a source table is synchronized or migrated to Kafka, the old field contains the data and the data field is null. To align with the open source community, for instances created or restarted on or after March 20, 2022, the data field contains the data and the old field is null.
Synchronization or migration instances created before March 20, 2022
```json
{
    "old": [
        {
            "shipping_type": "aaa"
        }
    ],
    "database": "dbname",
    "es": 1600161894000,
    "id": 58,
    "isDdl": false,
    "mysqlType": {
        "id": "bigint",
        "shipping_type": "varchar"
    },
    "pkNames": [
        "id"
    ],
    "sql": "",
    "sqlType": {
        "id": -5,
        "shipping_type": 12
    },
    "table": "tablename",
    "ts": 1600161894771,
    "type": "DELETE"
}
```

Synchronization or migration instances created or restarted on or after March 20, 2022
```json
{
    "data": [
        {
            "id": "500000287",
            "shipping_type": null
        }
    ],
    "database": "dbname",
    "es": 1600161894000,
    "id": 58,
    "isDdl": false,
    "mysqlType": {
        "id": "bigint",
        "shipping_type": "varchar"
    },
    "pkNames": [
        "id"
    ],
    "sql": "",
    "sqlType": {
        "id": -5,
        "shipping_type": 12
    },
    "table": "tablename",
    "ts": 1600161894771,
    "type": "DELETE"
}
```
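A consumer that receives messages from instances on both sides of the March 20, 2022 cutover can handle the two formats uniformly by reading whichever of data and old is populated. The following Python sketch shows one defensive approach; normalize_delete is an illustrative helper, not part of the Canal format:

```python
def normalize_delete(record: dict) -> list:
    """Return the deleted rows for a Canal Json DELETE message,
    regardless of whether the instance predates March 20, 2022."""
    if record.get("type") != "DELETE":
        raise ValueError("not a DELETE message")
    # Newer instances put the rows in data; older ones put them in old.
    return record.get("data") or record.get("old") or []

# Minimal messages in the pre- and post-cutover shapes.
old_style = {"type": "DELETE", "old": [{"shipping_type": "aaa"}], "data": None}
new_style = {"type": "DELETE", "data": [{"shipping_type": "aaa"}], "old": None}

print(normalize_delete(old_style) == normalize_delete(new_style))  # True
```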
DDL operation
```json
{
    "database":"dbname",   // The name of the database for synchronization or migration.
    "es":1600161894000,    // The time when the source data was written to the binary log.
    "id":58,               // The offset in the DTS cache.
    "isDdl":true,          // Indicates whether the operation is a DDL operation.
    "sql":"eg:createxxx",  // The DDL statement from the binary log.
    "table":"tablename",   // The name of the table for synchronization or migration.
    "ts":1600161894771,    // The time when DTS wrote the data to the destination database.
    "type":"DDL"
}
```
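A consumer typically routes each message by isDdl and type before processing it. The following Python dispatch sketch is illustrative; the route function and its category names are not part of the Canal format:

```python
def route(record: dict) -> str:
    # DDL messages are identified by isDdl and carry the statement in sql.
    if record.get("isDdl"):
        return "ddl"
    # INIT marks rows from full data synchronization or migration.
    if record.get("type") == "INIT":
        return "full-load"
    # Remaining messages are incremental DML (INSERT, UPDATE, DELETE).
    return "dml"

print(route({"isDdl": True, "type": "DDL"}))     # ddl
print(route({"isDdl": False, "type": "INIT"}))   # full-load
print(route({"isDdl": False, "type": "DELETE"})) # dml
```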