DTS支援選擇遷移或同步到Kafka叢集的資料存放區格式,本文為您介紹資料格式的定義說明,方便您根據定義解析資料。
資料存放區格式
DTS支援將寫入至Kafka叢集的資料存放區為如下三種格式:
DTS Avro:一種資料序列化格式,可以將資料結構或對象轉化成便於儲存或傳輸的格式。
Shareplex Json:資料複製軟體Shareplex讀取源庫中的資料,將資料寫入至Kafka叢集時,資料存放區格式為Shareplex Json。
Canal Json:Canal解析資料庫增量日誌,並將增量資料轉送至Kafka叢集,資料存放區格式為Canal Json。
DTS Avro
DTS Avro為預設儲存格式。您需要根據DTS Avro的schema定義進行資料解析,schema定義詳情請參見遷移或同步至Kafka叢集中的資料均DTS Avro的schema定義。
DTS Avro格式中的DDL語句為String類型。
Shareplex Json
表 1. 參數說明
參數 | 說明 |
| 資料庫中事務的提交時間,格式為yyyy-MM-ddTHH:mm:ssZ(UTC時間)。 |
| 提交事務的使用者ID。 |
| 資料操作類型,包括INSERT, UPDATE, DELETE, TRUNCATE, DROP COLUMN, UPDATE BEFORE, UPDATE AFTER。 |
| 系統變化編號SCN(System Change Number),用以標識資料庫在某個確切時刻提交事務的版本。每個已提交的事務分配一個唯一的SCN。 |
| 用於定位元據庫中一條記錄的一個相對唯一地址值。 |
| 事務ID。 |
| 事務內部的操作序號,從1開始記錄。 |
| 事務內部的操作總數。 |
| 表名。 |
| 事務內部操作的索引,格式為 |
| 事務提交至目標庫的時間。 |
樣本如下:
插入資料:
{ "meta": { "time": "2017-06-16T14:24:34", "userid": 84, "op": "ins", "scn": "14589063118712", "rowid": "AAATGpAAIAAItcIAAA", "trans": "7.0.411499", "seq": 1, "size": 11, "table": "CL_BIZ1.MIO_LOG", "idx": "1/11", "posttime": "2017-06-16T14:33:52" }, "data": { "MIO_LOG_ID": "32539737" } }
更新資料:
{ "meta": { "time": "2017-06-16T15:38:13", "userid": 84, "op": "upd", "table": "CL_BIZ1.MIO_LOG" …. }, "data": { "CNTR_NO": "1171201606" }, "key": { "MIO_LOG_ID": "32537893", "PLNMIO_REC_ID": "31557806", "POL_CODE": null, "CNTR_TYPE": null, "CNTR_NO": "1171201606syui26" } }
刪除資料:
{ "meta": { "time": "2017-06-16T15:51:35", "userid": 84, "op": "del", }, "data": { "MIO_LOG_ID": "32539739", "PLNMIO_REC_ID": "31557806", "POL_CODE": null, "CNTR_TYPE": null, "CG_NO": null } }
Canal Json
表 2. 參數說明
參數 | 說明 |
| 資料庫名稱。 |
| 操作在源庫的執行時間,13位Unix時間戳記,單位為毫秒。 說明 Unix時間戳記轉換工具可通過搜尋引擎擷取。 |
| 操作的序號。 |
| 是否是DDL操作。
|
| 欄位的資料類型。 說明 不支援精度等資料類型的參數資訊。 |
| 變更前或變更後的資料。 說明 2022年3月20日之前建立的DTS訂閱執行個體, |
| 主鍵名稱。 |
| SQL語句。 |
| 以Canal Json格式記錄的列的資料類型。更多資訊,請參見SQL Type field。 |
| 表名。 |
| 操作開始寫入到目標庫的時間,13位Unix時間戳記,單位為毫秒。 說明 Unix時間戳記轉換工具可通過搜尋引擎擷取。 |
| 操作的類型,比如DELETE、UPDATE、INSERT。 說明 全量任務階段固定為INIT。 |
| 全域事務標識GTID(Global Transaction IDentifier),具有全域唯一性,一個事務對應一個GTID。 |
更新資料的樣本如下:
2022年3月20日之前建立的DTS訂閱執行個體,源表的DELETE
語句同步到kafka,其中old
的值是資料,data
的值是null。為了和開源社區保持一致,2022年3月20日起建立或重啟的DTS訂閱執行個體,data
的值是資料,old
的值是null。
2022年3月20日之前建立的DTS訂閱執行個體
{
"old": [
{
"shipping_type": "aaa"
}
],
"database": "dbname",
"es": 1600161894000,
"id": 58,
"isDdl": false,
"mysqlType": {
"id": "bigint",
"shipping_type": "varchar"
},
"pkNames": [
"id"
],
"sql": "",
"sqlType": {
"id": -5,
"shipping_type": 12
},
"table": "tablename",
"ts": 1600161894771,
"type": "DELETE"
}
2022年3月20日起建立或重啟的DTS訂閱執行個體
{
"data": [
{
"id": "500000287",
"shipping_type": null
}
],
"database": "dbname",
"es": 1600161894000,
"id": 58,
"isDdl": false,
"mysqlType": {
"id": "bigint",
"shipping_type": "varchar"
},
"pkNames": [
"id"
],
"sql": "",
"sqlType": {
"id": -5,
"shipping_type": 12
},
"table": "tablename",
"ts": 1600161894771,
"type": "DELETE"
}
DDL操作樣本如下:
{
"database":"dbname",表示同步的資料庫名稱
"es":1600161894000,表示源庫資料寫入到binlog的時間
"id":58,DTS緩衝的位移量
"isDdl":true,是否同步DDL
"sql":"eg:createxxx",Binlog的DDL語句
"table":"tablename",同步的表名
"ts":1600161894771,DTS將資料寫入目標的時間
"type":"DDL"
}