全部产品
Search
文档中心

数据传输服务 DTS:Kafka集群的数据存储格式

更新时间:Dec 12, 2024

DTS支持选择迁移或同步到Kafka集群的数据存储格式,本文为您介绍数据格式的定义说明,方便您根据定义解析数据。

数据存储格式

DTS支持将写入至Kafka集群的数据存储为如下三种格式:

  • DTS Avro:一种数据序列化格式,可以将数据结构或对象转化成便于存储或传输的格式。

  • Shareplex Json:数据复制软件Shareplex读取源库中的数据,将数据写入至Kafka集群时,数据存储格式为Shareplex Json。

  • Canal JsonCanal解析数据库增量日志,并将增量数据传输至Kafka集群,数据存储格式为Canal Json。

DTS Avro

DTS Avro为默认存储格式。您需要根据DTS Avro的schema定义进行数据解析,schema定义详情请参见迁移或同步至Kafka集群中的数据均DTS Avro的schema定义

说明

DTS Avro格式中的DDL语句为String类型。

Shareplex Json

表 1. 参数说明

参数

说明

time

数据库中事务的提交时间,格式为yyyy-MM-ddTHH:mm:ssZ(UTC时间)。

userid

提交事务的用户ID。

op

数据操作类型,包括INSERT, UPDATE, DELETE, TRUNCATE, DROP COLUMN, UPDATE BEFORE, UPDATE AFTER。

scn

系统变化编号SCN(System Change Number),用以标识数据库在某个确切时刻提交事务的版本。每个已提交的事务分配一个唯一的SCN。

rowid

用于定位数据库中一条记录的一个相对唯一地址值。

trans

事务ID。

seq

事务内部的操作序号,从1开始记录。

size

事务内部的操作总数。

table

表名。

idx

事务内部操作的索引,格式为seq/size,例如1/11表示,在操作总数为11的事务内部,该操作的序号为1。

posttime

事务提交至目标库的时间。

示例如下:

  • 插入数据:

    {
        "meta": {
            "time": "2017-06-16T14:24:34", 
            "userid": 84,                                    
            "op": "ins",                                   
              "scn": "14589063118712",                  
              "rowid": "AAATGpAAIAAItcIAAA",      
            "trans": "7.0.411499",                 
            "seq": 1,                                          
            "size": 11,                                         
            "table": "CL_BIZ1.MIO_LOG",       
              "idx": "1/11",                                       
            "posttime": "2017-06-16T14:33:52"
        },
        "data": {
            "MIO_LOG_ID": "32539737"
         }
    }
  • 更新数据:

    {
        "meta": {
            "time": "2017-06-16T15:38:13",
            "userid": 84,
            "op": "upd",                             
            "table": "CL_BIZ1.MIO_LOG"
            ….
        },
        "data": {                                          
            "CNTR_NO": "1171201606"
        },
        "key": {                                            
            "MIO_LOG_ID": "32537893",
            "PLNMIO_REC_ID": "31557806",
            "POL_CODE": null,
            "CNTR_TYPE": null,
            "CNTR_NO": "1171201606syui26"
        }
    }
  • 删除数据:

    {
        "meta": {
            "time": "2017-06-16T15:51:35",
            "userid": 84,
            "op": "del",                      
         },
        "data": {                                    
            "MIO_LOG_ID": "32539739",
            "PLNMIO_REC_ID": "31557806",
            "POL_CODE": null,
            "CNTR_TYPE": null,
            "CG_NO": null
         }
    }

Canal Json

表 2. 参数说明

参数

说明

database

数据库名称。

es

操作在源库的执行时间,13位Unix时间戳,单位为毫秒。

说明

Unix时间戳转换工具可用搜索引擎获取。

id

操作的序列号。

isDdl

是否是DDL操作。

  • true:是。

  • false:否。

mysqlType

字段的数据类型。

说明

不支持精度等数据类型的参数信息。

olddata

变更前或变更后的数据。

说明

2022年3月20日之前创建的DTS订阅实例,old的值是变更后的数据(默认包含所有列的数据,而不是只包含被修改列的数据),data的值是变更前的数据。为了和开源社区保持一致,2022年3月20日起创建或重启的DTS订阅实例,data的值是变更后的数据,old的值是变更前的数据。

pkNames

主键名称。

sql

SQL语句。

sqlType

经转换处理后的字段类型,如unsigned int会被转化为Long,unsigned long会被转换为BigDecimal。

table

表名。

ts

操作开始写入到目标库的时间,13位Unix时间戳,单位为毫秒。

说明

Unix时间戳转换工具可用搜索引擎获取。

type

操作的类型,比如DELETE、UPDATE、INSERT。

gtid

全局事务标识GTID(Global Transaction IDentifier),具有全局唯一性,一个事务对应一个GTID。

更新数据的示例如下:

说明

2022年3月20日之前创建的DTS订阅实例,源表的DELETE语句同步到kafka,其中old的值是数据,data的值是null。为了和开源社区保持一致,2022年3月20日起创建或重启的DTS订阅实例,data的值是数据,old的值是null。

2022年3月20日之前创建的DTS订阅实例

{
    "old": [
        {
            "shipping_type": "aaa"
        }
    ], 
    "database": "dbname", 
    "es": 1600161894000, 
    "id": 58, 
    "isDdl": false, 
    "mysqlType": {
        "id": "bigint", 
        "shipping_type": "varchar"
    }, 
    "pkNames": [
        "id"
    ], 
    "sql": "", 
    "sqlType": {
        "id": -5, 
        "shipping_type": 12
    }, 
    "table": "tablename", 
    "ts": 1600161894771, 
    "type": "DELETE"
}

2022年3月20日起创建或重启的DTS订阅实例

{
    "data": [
        {
            "id": "500000287", 
            "shipping_type": null
        }
    ], 
    "database": "dbname", 
    "es": 1600161894000, 
    "id": 58, 
    "isDdl": false, 
    "mysqlType": {
        "id": "bigint", 
        "shipping_type": "varchar"
    }, 
    "pkNames": [
        "id"
    ], 
    "sql": "", 
    "sqlType": {
        "id": -5, 
        "shipping_type": 12
    }, 
    "table": "tablename", 
    "ts": 1600161894771, 
    "type": "DELETE"
}
            

DDL操作示例如下:

{
    "database":"dbname",表示同步的数据库名称
    "es":1600161894000,表示源库数据写入到binlog的时间
    "id":58,DTS缓存的偏移量
    "isDdl":true,是否同步DDL
    "sql":"eg:createxxx",Binlog的DDL语句
    "table":"tablename",同步的表名
    "ts":1600161894771,DTS将数据写入目标的时间
    "type":"DDL"
}