全部產品
Search
文件中心

Data Transmission Service:Kafka叢集的資料存放區格式

更新時間:Jul 06, 2024

DTS支援選擇遷移或同步到Kafka叢集的資料存放區格式,本文為您介紹資料格式的定義說明,方便您根據定義解析資料。

資料存放區格式

DTS支援將寫入至Kafka叢集的資料存放區為如下三種格式:

  • DTS Avro:一種資料序列化格式,可以將資料結構或對象轉化成便於儲存或傳輸的格式。

  • Shareplex Json:資料複製軟體Shareplex讀取源庫中的資料,將資料寫入至Kafka叢集時,資料存放區格式為Shareplex Json。

  • Canal JsonCanal解析資料庫增量日誌,並將增量資料轉送至Kafka叢集,資料存放區格式為Canal Json。

DTS Avro

DTS Avro為預設儲存格式。您需要根據DTS Avro的schema定義進行資料解析,schema定義詳情請參見遷移或同步至Kafka叢集中的資料均DTS Avro的schema定義

說明

DTS Avro格式中的DDL語句為String類型。

Shareplex Json

表 1. 參數說明

參數

說明

time

資料庫中事務的提交時間,格式為yyyy-MM-ddTHH:mm:ssZ(UTC時間)。

userid

提交事務的使用者ID。

op

資料操作類型,包括INSERT, UPDATE, DELETE, TRUNCATE, DROP COLUMN, UPDATE BEFORE, UPDATE AFTER。

scn

系統變化編號SCN(System Change Number),用以標識資料庫在某個確切時刻提交事務的版本。每個已提交的事務分配一個唯一的SCN。

rowid

用於定位元據庫中一條記錄的一個相對唯一地址值。

trans

事務ID。

seq

事務內部的操作序號,從1開始記錄。

size

事務內部的操作總數。

table

表名。

idx

事務內部操作的索引,格式為seq/size,例如1/11表示,在操作總數為11的事務內部,該操作的序號為1。

posttime

事務提交至目標庫的時間。

樣本如下:

  • 插入資料:

    {
        "meta": {
            "time": "2017-06-16T14:24:34", 
            "userid": 84,                                    
            "op": "ins",                                   
              "scn": "14589063118712",                  
              "rowid": "AAATGpAAIAAItcIAAA",      
            "trans": "7.0.411499",                 
            "seq": 1,                                          
            "size": 11,                                         
            "table": "CL_BIZ1.MIO_LOG",       
              "idx": "1/11",                                       
            "posttime": "2017-06-16T14:33:52"
        },
        "data": {
            "MIO_LOG_ID": "32539737"
         }
    }
  • 更新資料:

    {
        "meta": {
            "time": "2017-06-16T15:38:13",
            "userid": 84,
            "op": "upd",                             
            "table": "CL_BIZ1.MIO_LOG"
            ….
        },
        "data": {                                          
            "CNTR_NO": "1171201606"
        },
        "key": {                                            
            "MIO_LOG_ID": "32537893",
            "PLNMIO_REC_ID": "31557806",
            "POL_CODE": null,
            "CNTR_TYPE": null,
            "CNTR_NO": "1171201606syui26"
        }
    }
  • 刪除資料:

    {
        "meta": {
            "time": "2017-06-16T15:51:35",
            "userid": 84,
            "op": "del",                      
         },
        "data": {                                    
            "MIO_LOG_ID": "32539739",
            "PLNMIO_REC_ID": "31557806",
            "POL_CODE": null,
            "CNTR_TYPE": null,
            "CG_NO": null
         }
    }

Canal Json

表 2. 參數說明

參數

說明

database

資料庫名稱。

es

操作在源庫的執行時間,13位Unix時間戳記,單位為毫秒。

說明

Unix時間戳記轉換工具可用搜尋引擎擷取。

id

操作的序號。

isDdl

是否是DDL操作。

  • true:是。

  • false:否。

mysqlType

欄位的資料類型。

說明

不支援精度等資料類型的參數資訊。

olddata

變更前或變更後的資料。

說明

2022年3月20日之前建立的DTS訂閱執行個體,old的值是變更後的資料(預設包含所有列的資料,而不是只包含被修改列的資料),data的值是變更前的資料。為了和開源社區保持一致,2022年3月20日起建立或重啟的DTS訂閱執行個體,data的值是變更後的資料,old的值是變更前的資料。

pkNames

主鍵名稱。

sql

SQL語句。

sqlType

經轉換處理後的欄位類型,如unsigned int會被轉化為Long,unsigned long會被轉換為BigDecimal。

table

表名。

ts

操作開始寫入到目標庫的時間,13位Unix時間戳記,單位為毫秒。

說明

Unix時間戳記轉換工具可用搜尋引擎擷取。

type

操作的類型,比如DELETE、UPDATE、INSERT。

gtid

全域事務標識GTID(Global Transaction IDentifier),具有全域唯一性,一個事務對應一個GTID。

更新資料的樣本如下:

說明

2022年3月20日之前建立的DTS訂閱執行個體,源表的DELETE語句同步到kafka,其中old的值是資料,data的值是null。為了和開源社區保持一致,2022年3月20日起建立或重啟的DTS訂閱執行個體,data的值是資料,old的值是null。

2022年3月20日之前建立的DTS訂閱執行個體

{
    "old": [
        {
            "shipping_type": "aaa"
        }
    ], 
    "database": "dbname", 
    "es": 1600161894000, 
    "id": 58, 
    "isDdl": false, 
    "mysqlType": {
        "id": "bigint", 
        "shipping_type": "varchar"
    }, 
    "pkNames": [
        "id"
    ], 
    "sql": "", 
    "sqlType": {
        "id": -5, 
        "shipping_type": 12
    }, 
    "table": "tablename", 
    "ts": 1600161894771, 
    "type": "DELETE"
}

2022年3月20日起建立或重啟的DTS訂閱執行個體

{
    "data": [
        {
            "id": "500000287", 
            "shipping_type": null
        }
    ], 
    "database": "dbname", 
    "es": 1600161894000, 
    "id": 58, 
    "isDdl": false, 
    "mysqlType": {
        "id": "bigint", 
        "shipping_type": "varchar"
    }, 
    "pkNames": [
        "id"
    ], 
    "sql": "", 
    "sqlType": {
        "id": -5, 
        "shipping_type": 12
    }, 
    "table": "tablename", 
    "ts": 1600161894771, 
    "type": "DELETE"
}
            

DDL操作樣本如下:

{
    "database":"dbname",表示同步的資料庫名稱
    "es":1600161894000,表示源庫資料寫入到binlog的時間
    "id":58,DTS緩衝的位移量
    "isDdl":true,是否同步DDL
    "sql":"eg:createxxx",Binlog的DDL語句
    "table":"tablename",同步的表名
    "ts":1600161894771,DTS將資料寫入目標的時間
    "type":"DDL"
}