When you use Data Transmission Service (DTS) to migrate or synchronize data to a Kafka cluster, you can specify the format in which data is stored in the Kafka cluster. This topic describes the formats that you can use to store data in a Kafka cluster. You can parse data based on the definition of these data formats.
Data formats
DTS allows you to store data into a Kafka cluster in one of the following formats:
DTS Avro: a data serialization format into which data structures or objects can be converted to facilitate storage and transmission.
Shareplex Json: the format in which the data read from a source database by using the data replication software SharePlex is stored.
Canal Json: the format in which data is stored in a Kafka cluster after Canal parses the logs about the incremental data of the source database and transmits the incremental data to the Kafka cluster.
DTS Avro
DTS Avro is the default data format. By default, data that is migrated or synchronized to a Kafka cluster by using DTS is stored in the DTS Avro format. You must parse the data based on the schema definition of DTS Avro. For more information, see subscribe_example at GitHub.
In the DTS Avro format, the DDL statements are of the STRING type.
Shareplex Json
Parameter | Description |
| The UTC time when the transaction in the database is committed. The value of this parameter is in the yyyy-MM-ddTHH:mm:ssZ format. |
| The ID of the user who commits the transaction. |
| The operation type. Valid values: INSERT, UPDATE, DELETE, TRUNCATE, DROP COLUMN, UPDATE BEFORE, and UPDATE AFTER. |
| The system change number (SCN) that identifies the version of the transaction that the database commits at a specific point in time. Each committed transaction is assigned a unique SCN. |
| A relatively unique address value that is used to identify a record in the database. |
| The ID of the transaction. |
| The sequence number of the operation in the transaction. The number starts from 1. |
| The total number of operations in the transaction. |
| The name of the table. |
| The index of the operation in the transaction, in the |
| The time when the transaction is committed to the destination database. |
Examples:
Data inserted
{ "meta": { "time": "2017-06-16T14:24:34", "userid": 84, "op": "ins", "scn": "14589063118712", "rowid": "AAATGpAAIAAItcIAAA", "trans": "7.0.411499", "seq": 1, "size": 11, "table": "CL_BIZ1.MIO_LOG", "idx": "1/11", "posttime": "2017-06-16T14:33:52" }, "data": { "MIO_LOG_ID": "32539737" } }
Data updated
{ "meta": { "time": "2017-06-16T15:38:13", "userid": 84, "op": "upd", "table": "CL_BIZ1.MIO_LOG" ... }, "data": { "CNTR_NO": "1171201606" }, "key": { "MIO_LOG_ID": "32537893", "PLNMIO_REC_ID": "31557806", "POL_CODE": null, "CNTR_TYPE": null, "CNTR_NO": "1171201606syui26" } }
Data deleted
{ "meta": { "time": "2017-06-16T15:51:35", "userid": 84, "op": "del", }, "data": { "MIO_LOG_ID": "32539739", "PLNMIO_REC_ID": "31557806", "POL_CODE": null, "CNTR_TYPE": null, "CG_NO": null } }
Canal Json
Parameter | Description |
| The name of the database. |
| The time when the operation is performed in the database. The value is a 13-bit UNIX timestamp. Unit: millisecond. Note You can use a search engine to obtain a UNIX timestamp converter. |
| The serial number of the operation. |
| Indicates whether the operation is a DDL operation.
|
| The data type of the field. |
| The data before and after update. Note For change tracking instances that are created before March 20, 2022, the value of |
| The name of the primary key. |
| The SQL statement. |
| The converted field type. For example, the type LONG is converted from UNSIGNED INTEGER and BIGDECIMAL from UNSIGNED LONG. |
| The name of the table. |
| The time when the operation starts to be performed in the destination database. The value is a 13-bit UNIX timestamp. Unit: millisecond. Note You can use a search engine to obtain a UNIX timestamp converter. |
| The operation type. Valid values include DELETE, UPDATE, and INSERT. |
| The global transaction identifier(GTID) that identifies a transaction. Each transaction is assigned a globally unique GTID. |
Examples of data updated
For change tracking instances that are created before March 20, 2022, after the DELETE
statements of the source table are synchronized, the value of old
is data and the value of data
is NULL. To keep consistent with the open source community, the value of data
is data and the value of old
is NULL for change tracking instances that are created or restarted from March 20, 2022.
Change tracking instances that are created before March 20, 2022
{
"old": [
{
"shipping_type": "aaa"
}
],
"database": "dbname",
"es": 1600161894000,
"id": 58,
"isDdl": false,
"mysqlType": {
"id": "bigint(20)",
"shipping_type": "varchar(50)"
},
"pkNames": [
"id"
],
"sql": "",
"sqlType": {
"id": -5,
"shipping_type": 12
},
"table": "tablename",
"ts": 1600161894771,
"type": "DELETE"
}
Change tracking instances that are created or restarted from March 20, 2022
{
"data": [
{
"id": "500000287",
"shipping_type": null
}
],
"database": "dbname",
"es": 1600161894000,
"id": 58,
"isDdl": false,
"mysqlType": {
"id": "bigint(20)",
"shipping_type": "varchar(50)"
},
"pkNames": [
"id"
],
"sql": "",
"sqlType": {
"id": -5,
"shipping_type": 12
},
"table": "tablename",
"ts": 1600161894771,
"type": "DELETE"
}
Example of a DDL operation
{
"database":"dbname", the name of the source database.
"es":1600161894000, the time when the data in the source database is written to the binary logs.
"id":58, the offset of the DTS cache.
"isDdl":true, specifies whether to synchronize DDL operations.
"sql":"eg:createxxx", the DDL statements recorded in the binary logs.
"table":"tablename", the name of the source table.
"ts":1600161894771, the time when DTS writes data to the destination database.
"type":"DDL"
}