DTS支持选择迁移或同步到Kafka集群的数据存储格式,本文为您介绍数据格式的定义说明,方便您根据定义解析数据。
数据存储格式
DTS支持将写入至Kafka集群的数据存储为如下三种格式:
DTS Avro:一种数据序列化格式,可以将数据结构或对象转化成便于存储或传输的格式。
Shareplex Json:数据复制软件Shareplex读取源库中的数据,将数据写入至Kafka集群时,数据存储格式为Shareplex Json。
Canal Json:Canal解析数据库增量日志,并将增量数据传输至Kafka集群,数据存储格式为Canal Json。
DTS Avro
DTS Avro为默认存储格式。您需要根据DTS Avro的schema定义进行数据解析,schema定义详情请参见迁移或同步至Kafka集群中的数据均DTS Avro的schema定义。
DTS Avro格式中的DDL语句为String类型。
Shareplex Json
表 1. 参数说明
参数 | 说明 |
| 数据库中事务的提交时间,格式为yyyy-MM-ddTHH:mm:ssZ(UTC时间)。 |
| 提交事务的用户ID。 |
| 数据操作类型,包括INSERT, UPDATE, DELETE, TRUNCATE, DROP COLUMN, UPDATE BEFORE, UPDATE AFTER。 |
| 系统变化编号SCN(System Change Number),用以标识数据库在某个确切时刻提交事务的版本。每个已提交的事务分配一个唯一的SCN。 |
| 用于定位数据库中一条记录的一个相对唯一地址值。 |
| 事务ID。 |
| 事务内部的操作序号,从1开始记录。 |
| 事务内部的操作总数。 |
| 表名。 |
| 事务内部操作的索引,格式为 |
| 事务提交至目标库的时间。 |
示例如下:
插入数据:
{ "meta": { "time": "2017-06-16T14:24:34", "userid": 84, "op": "ins", "scn": "14589063118712", "rowid": "AAATGpAAIAAItcIAAA", "trans": "7.0.411499", "seq": 1, "size": 11, "table": "CL_BIZ1.MIO_LOG", "idx": "1/11", "posttime": "2017-06-16T14:33:52" }, "data": { "MIO_LOG_ID": "32539737" } }
更新数据:
{ "meta": { "time": "2017-06-16T15:38:13", "userid": 84, "op": "upd", "table": "CL_BIZ1.MIO_LOG" …. }, "data": { "CNTR_NO": "1171201606" }, "key": { "MIO_LOG_ID": "32537893", "PLNMIO_REC_ID": "31557806", "POL_CODE": null, "CNTR_TYPE": null, "CNTR_NO": "1171201606syui26" } }
删除数据:
{ "meta": { "time": "2017-06-16T15:51:35", "userid": 84, "op": "del", }, "data": { "MIO_LOG_ID": "32539739", "PLNMIO_REC_ID": "31557806", "POL_CODE": null, "CNTR_TYPE": null, "CG_NO": null } }
Canal Json
表 2. 参数说明
参数 | 说明 |
| 数据库名称。 |
| 操作在源库的执行时间,13位Unix时间戳,单位为毫秒。 说明 Unix时间戳转换工具可通过搜索引擎获取。 |
| 操作的序列号。 |
| 是否是DDL操作。
|
| 字段的数据类型。 说明 不支持精度等数据类型的参数信息。 |
| 变更前或变更后的数据。 说明 2022年3月20日之前创建的DTS订阅实例, |
| 主键名称。 |
| SQL语句。 |
| 以Canal Json格式记录的列的数据类型。更多信息,请参见SQL Type field。 |
| 表名。 |
| 操作开始写入到目标库的时间,13位Unix时间戳,单位为毫秒。 说明 Unix时间戳转换工具可通过搜索引擎获取。 |
| 操作的类型,比如DELETE、UPDATE、INSERT。 说明 全量任务阶段固定为INIT。 |
| 全局事务标识GTID(Global Transaction IDentifier),具有全局唯一性,一个事务对应一个GTID。 |
更新数据的示例如下:
2022年3月20日之前创建的DTS订阅实例,源表的DELETE
语句同步到kafka,其中old
的值是数据,data
的值是null。为了和开源社区保持一致,2022年3月20日起创建或重启的DTS订阅实例,data
的值是数据,old
的值是null。
2022年3月20日之前创建的DTS订阅实例
{
"old": [
{
"shipping_type": "aaa"
}
],
"database": "dbname",
"es": 1600161894000,
"id": 58,
"isDdl": false,
"mysqlType": {
"id": "bigint",
"shipping_type": "varchar"
},
"pkNames": [
"id"
],
"sql": "",
"sqlType": {
"id": -5,
"shipping_type": 12
},
"table": "tablename",
"ts": 1600161894771,
"type": "DELETE"
}
2022年3月20日起创建或重启的DTS订阅实例
{
"data": [
{
"id": "500000287",
"shipping_type": null
}
],
"database": "dbname",
"es": 1600161894000,
"id": 58,
"isDdl": false,
"mysqlType": {
"id": "bigint",
"shipping_type": "varchar"
},
"pkNames": [
"id"
],
"sql": "",
"sqlType": {
"id": -5,
"shipping_type": 12
},
"table": "tablename",
"ts": 1600161894771,
"type": "DELETE"
}
DDL操作示例如下:
{
"database":"dbname",表示同步的数据库名称
"es":1600161894000,表示源库数据写入到binlog的时间
"id":58,DTS缓存的偏移量
"isDdl":true,是否同步DDL
"sql":"eg:createxxx",Binlog的DDL语句
"table":"tablename",同步的表名
"ts":1600161894771,DTS将数据写入目标的时间
"type":"DDL"
}