本文介绍实时数据订阅功能的数据消费格式定义说明和示例,默认格式为Debezium Format V2.0。
数据消费定义说明
数据消费格式如下代码,字段说明如下表所示。
{
"payload": {
"op": "u",
"ts_ms": 1465491411815,
"before": {
"id": 1004,
"name": "Jane"
},
"after": {
"id": 1004,
"name": "Anne"
},
"source": {
"version": "v1.0",
"db": "ld-xxxx",
"namespace": "default",
"table": "customers",
"ts_ms": 1465491411807
}
},
"schema": {
"type": "struct",
"fields": [
{
"type": "string",
"optional": false,
"field": "op"
}, {
"type": "int64",
"optional": false,
"field": "ts_ms"
}, {
"type": "struct",
"fields": [
{
"type": "int32",
"optional": false,
"field": "id"
}, {
"type": "string",
"optional": false,
"field": "name"
}
],
"optional": true,
"field": "before"
}, {
"type": "struct",
"fields": [
{
"type": "int32",
"optional": false,
"field": "id"
}, {
"type": "string",
"optional": false,
"field": "name"
}
],
"optional": true,
"field": "after"
}, {
"type": "struct",
"fields": [
{
"type": "string",
"optional": false,
"field": "version"
}, {
"type": "string",
"optional": false,
"field": "db"
}, {
"type": "string",
"optional": false,
"field": "namespace"
}, {
"type": "string",
"optional": false,
"field": "table"
}, {
"type": "int64",
"optional": false,
"field": "ts_ms"
}
],
"optional": false,
"field": "source"
}
],
"optional": false
}
}
Field name | 描述 |
payload.op |
|
payload.ts_ms | 表示写入Kafka的Unix时间戳。 |
payload.before | 表示导出整行数据更新前的值。 |
payload.after | 表示导出整行数据的最新值。 |
payload.source | 表示操作的额外信息,支持额外添加。
|
schema | 根据payload内容自动生成,对整个JSON的结构及所有字段类型进行说明。默认都包括schema信息。整个schema的结构是递归的。
|
HBase表的订阅格式跟SQL表一致,但在结构上存在以下两点不同:
HBase表在数据库中存储的是原始的二进制数据,通过数据订阅消费到的数据是对二进制数据进行Base64编码后的字符串。
HBase表存在列族的概念,因此非主键列的列名格式为
列族_列名
,主键列的列名固定为ROW
。
数据消费示例
SQL表
在Lindorm数据库中创建如下Schema。
CREATE TABLE customers (id VARCHAR,first_name VARCHAR,last_name VARCHAR, PRIMARY KEY(id));
插入数据的数据消费示例。
{ "schema": {}, "payload": { "op": "c", "ts_ms": 1465491411815, "before": null, "after": { "id": "1004", "first_name": "Anne", "last_name": "Kretchmar" }, "source": { "version": "v1.0", "db": "ld-xxxx", "namespace": "default", "table": "customers", "ts_ms": 1465491411807 } } }
更新数据的数据消费示例。
{ "schema": {}, "payload": { "op": "u", "ts_ms": 1465491411815, "before": { "id": "1004", "first_name": "Anne Marie", "last_name": "Kretchmar" }, "after": { "id": "1004", "first_name": "Anne", "last_name": "Kretchmar" }, "source": { "version": "v1.0", "db": "ld-xxxx", "namespace": "default", "table": "customers", "ts_ms": 1465491411807 } } }
删除一行数据的数据消费示例。
{ "schema": {}, "payload": { "op": "d", "ts_ms": 1465491411815, "before": { "id": "1004", "first_name": "Anne Marie", "last_name": "Kretchmar" }, "after": null, "source": { "version": "v1.0", "db": "ld-xxxx", "namespace": "default", "table": "customers", "ts_ms": 1465491411807 } } }
删除列的数据消费示例。
{ "schema": {}, "payload": { "op": "u", "ts_ms": 1465491411815, "before": { "id": "1004", "first_name": "Anne Marie", "last_name": "Kretchmar" }, "after": { "id": "1004", "first_name": "Anne Marie" }, "source": { "version": "v1.0", "db": "ld-xxxx", "namespace": "default", "table": "customers", "ts_ms": 1465491411807 } } }
HBase表
向HBase表中插入一条数据:
Put put = new Put(Bytes.toBytes("user1"));
put.addColumn(Bytes.toBytes("f"), Bytes.toBytes("name"), Bytes.toBytes("lucky"));
table.put(put);
对应的变更消息为:
{
"schema": {},
"payload": {
"op": "c",
"ts_ms": 1725258859839,
"before": null,
"after": {
"ROW": "dXNlcjE=",
"f_name": "bHVja3k="
},
"source": {
"version": "v2.0",
"db": "ld-xxxx",
"namespace": "default",
"table": "customers",
"ts_ms": 1725258833727
}
}
}