本文介紹即時資料訂閱功能的資料消費格式定義說明和樣本,預設格式為Debezium Format V2.0。
資料消費定義說明
資料消費格式如下代碼,欄位說明如下表所示。
{
"payload": {
"op": "u",
"ts_ms": 1465491411815,
"before": {
"id": 1004,
"name": "Jane"
},
"after": {
"id": 1004,
"name": "Anne"
},
"source": {
"version": "v1.0",
"db": "ld-xxxx",
"namespace": "default",
"table": "customers",
"ts_ms": 1465491411807
}
},
"schema": {
"type": "struct",
"fields": [
{
"type": "string",
"optional": false,
"field": "op"
}, {
"type": "int64",
"optional": false,
"field": "ts_ms"
}, {
"type": "struct",
"fields": [
{
"type": "int32",
"optional": false,
"field": "id"
}, {
"type": "string",
"optional": false,
"field": "name"
}
],
"optional": true,
"field": "before"
}, {
"type": "struct",
"fields": [
{
"type": "int32",
"optional": false,
"field": "id"
}, {
"type": "string",
"optional": false,
"field": "name"
}
],
"optional": true,
"field": "after"
}, {
"type": "struct",
"fields": [
{
"type": "string",
"optional": false,
"field": "version"
}, {
"type": "string",
"optional": false,
"field": "db"
}, {
"type": "string",
"optional": false,
"field": "namespace"
}, {
"type": "string",
"optional": false,
"field": "table"
}, {
"type": "int64",
"optional": false,
"field": "ts_ms"
}
],
"optional": false,
"field": "source"
}
],
"optional": false
}
}
Field name | 描述 |
payload.op |
|
payload.ts_ms | 表示寫入Kafka的Unix時間戳記。 |
payload.before | 表示匯出整行資料更新前的值。 |
payload.after | 表示匯出整行資料的最新值。 |
payload.source | 表示操作的額外資訊,支援額外添加。
|
schema | 根據payload內容自動產生,對整個JSON的結構及所有欄位類型進行說明。預設都包括schema資訊。整個schema的結構是遞迴的。
|
HBase表的訂閱格式跟SQL表一致,但在結構上存在以下兩點不同:
HBase表在資料庫中儲存的是原始的位元據,通過資料訂閱消費到的資料是對位元據進行Base64編碼後的字串。
HBase表存在列族的概念,因此非主鍵列的列名格式為
列族_列名
,主鍵列的列名固定為ROW
。
資料消費樣本
SQL表
在Lindorm資料庫中建立如下Schema。
CREATE TABLE customers (id VARCHAR,first_name VARCHAR,last_name VARCHAR, PRIMARY KEY(id));
插入資料的資料消費樣本。
{ "schema": {}, "payload": { "op": "c", "ts_ms": 1465491411815, "before": null, "after": { "id": "1004", "first_name": "Anne", "last_name": "Kretchmar" }, "source": { "version": "v1.0", "db": "ld-xxxx", "namespace": "default", "table": "customers", "ts_ms": 1465491411807 } } }
更新資料的資料消費樣本。
{ "schema": {}, "payload": { "op": "u", "ts_ms": 1465491411815, "before": { "id": "1004", "first_name": "Anne Marie", "last_name": "Kretchmar" }, "after": { "id": "1004", "first_name": "Anne", "last_name": "Kretchmar" }, "source": { "version": "v1.0", "db": "ld-xxxx", "namespace": "default", "table": "customers", "ts_ms": 1465491411807 } } }
刪除一行資料的資料消費樣本。
{ "schema": {}, "payload": { "op": "d", "ts_ms": 1465491411815, "before": { "id": "1004", "first_name": "Anne Marie", "last_name": "Kretchmar" }, "after": null, "source": { "version": "v1.0", "db": "ld-xxxx", "namespace": "default", "table": "customers", "ts_ms": 1465491411807 } } }
刪除列的資料消費樣本。
{ "schema": {}, "payload": { "op": "u", "ts_ms": 1465491411815, "before": { "id": "1004", "first_name": "Anne Marie", "last_name": "Kretchmar" }, "after": { "id": "1004", "first_name": "Anne Marie" }, "source": { "version": "v1.0", "db": "ld-xxxx", "namespace": "default", "table": "customers", "ts_ms": 1465491411807 } } }
HBase表
向HBase表中插入一條資料:
Put put = new Put(Bytes.toBytes("user1"));
put.addColumn(Bytes.toBytes("f"), Bytes.toBytes("name"), Bytes.toBytes("lucky"));
table.put(put);
對應的變更訊息為:
{
"schema": {},
"payload": {
"op": "c",
"ts_ms": 1725258859839,
"before": null,
"after": {
"ROW": "dXNlcjE=",
"f_name": "bHVja3k="
},
"source": {
"version": "v2.0",
"db": "ld-xxxx",
"namespace": "default",
"table": "customers",
"ts_ms": 1725258833727
}
}
}