This topic describes the data format of operation records that Lindorm generates after data in subscribed tables is inserted, updated, and deleted. This topic also provides sample data for each type of operation. The default data format is Debezium Format V2.0. You can configure your consumer client to consume data based on the data format.
Data format description
The following sample code provides an example of the data format. The following table describes the fields in the sample code.
{
"payload": {
"op": "u",
"ts_ms": 1465491411815,
"before": {
"id": 1004,
"name": "Jane"
},
"after": {
"id": 1004,
"name": "Anne"
},
"source": {
"version": "v1.0",
"db": "ld-xxxx",
"namespace": "default",
"table": "customers",
"ts_ms": 1465491411807
}
},
"schema": {
"type": "struct",
"fields": [
{
"type": "string",
"optional": false,
"field": "op"
}, {
"type": "int64",
"optional": false,
"field": "ts_ms"
}, {
"type": "struct",
"fields": [
{
"type": "int32",
"optional": false,
"field": "id"
}, {
"type": "string",
"optional": false,
"field": "name"
}
],
"optional": true,
"field": "before"
}, {
"type": "struct",
"fields": [
{
"type": "int32",
"optional": false,
"field": "id"
}, {
"type": "string",
"optional": false,
"field": "name"
}
],
"optional": true,
"field": "after"
}, {
"type": "struct",
"fields": [
{
"type": "string",
"optional": false,
"field": "version"
}, {
"type": "string",
"optional": false,
"field": "db"
}, {
"type": "string",
"optional": false,
"field": "namespace"
}, {
"type": "string",
"optional": false,
"field": "table"
}, {
"type": "int64",
"optional": false,
"field": "ts_ms"
}
],
"optional": false,
"field": "source"
}
],
"optional": false
}
}
Field name | Description |
payload.op |
|
payload.ts_ms | The UNIX timestamp when data is written to Kafka. |
payload.before | The data that is stored in the corresponding row before an insert, update, or delete operation is performed. |
payload.after | The latest data that is stored in the corresponding row after an insert, update, or delete operation is performed. |
payload.source | The additional information about the operation.
|
schema | The schema field is automatically generated based on the payload of the data that is to be consumed. The value of the schema field is in the JSON format. The schema field is used to describe the fields of the data that is to be consumed, including the data type of each field. By default, each operation record includes the schema field. The content of the schema field is in a recursive structure.
|
The data format of operation records in an HBase table is the same as that of an SQL table. However, the HBase table and SQL table have the following structural differences:
An HBase table stores raw binary data in the database. The data consumed by using the data change tracking feature is strings encoded in Base64.
An HBase table uses the column family. Therefore, the column name of a non-primary key column is in the format of
column family_column name
. The column name of the primary key column is fixed toROW
.
Sample data
SQL
Create a table that uses the following schema in your Lindorm database:
CREATE TABLE customers (id VARCHAR,first_name VARCHAR,last_name VARCHAR, PRIMARY KEY(id));
Sample data for data insert records
{ "schema": {}, "payload": { "op": "c", "ts_ms": 1465491411815, "before": null, "after": { "id": "1004", "first_name": "Anne", "last_name": "Kretchmar" }, "source": { "version": "v1.0", "db": "ld-xxxx", "namespace": "default", "table": "customers", "ts_ms": 1465491411807 } } }
Sample data for data update records
{ "schema": {}, "payload": { "op": "u", "ts_ms": 1465491411815, "before": { "id": "1004", "first_name": "Anne Marie", "last_name": "Kretchmar" }, "after": { "id": "1004", "first_name": "Anne", "last_name": "Kretchmar" }, "source": { "version": "v1.0", "db": "ld-xxxx", "namespace": "default", "table": "customers", "ts_ms": 1465491411807 } } }
Sample data for row deletion records
{ "schema": {}, "payload": { "op": "d", "ts_ms": 1465491411815, "before": { "id": "1004", "first_name": "Anne Marie", "last_name": "Kretchmar" }, "after": null, "source": { "version": "v1.0", "db": "ld-xxxx", "namespace": "default", "table": "customers", "ts_ms": 1465491411807 } } }
Sample data for column deletion records
{ "schema": {}, "payload": { "op": "u", "ts_ms": 1465491411815, "before": { "id": "1004", "first_name": "Anne Marie", "last_name": "Kretchmar" }, "after": { "id": "1004", "first_name": "Anne Marie" }, "source": { "version": "v1.0", "db": "ld-xxxx", "namespace": "default", "table": "customers", "ts_ms": 1465491411807 } } }
HBase
Sample data for data insert records
Put put = new Put(Bytes.toBytes("user1"));
put.addColumn(Bytes.toBytes("f"), Bytes.toBytes("name"), Bytes.toBytes("lucky"));
table.put(put);
Sample data for data update records
{
"schema": {},
"payload": {
"op": "c",
"ts_ms": 1725258859839,
"before": null,
"after": {
"ROW": "dXNlcjE=",
"f_name": "bHVja3k="
},
"source": {
"version": "v2.0",
"db": "ld-xxxx",
"namespace": "default",
"table": "customers",
"ts_ms": 1725258833727
}
}
}