全部產品
Search
文件中心

Lindorm:資料消費格式

更新時間:Sep 11, 2024

本文介紹即時資料訂閱功能的資料消費格式定義說明和樣本,預設格式為Debezium Format V2.0。

資料消費定義說明

資料消費格式如下代碼,欄位說明如下表所示。

{ 
  "payload": { 
    "op": "u", 
    "ts_ms": 1465491411815, 
    "before": { 
      "id": 1004,
      "name": "Jane"
    },
    "after": { 
      "id": 1004,
      "name": "Anne"
    },
    "source": { 
      "version": "v1.0",
      "db": "ld-xxxx",
      "namespace": "default",
      "table": "customers",
      "ts_ms": 1465491411807
    }
  },
  "schema": { 
  "type": "struct",
  "fields": [
     {
        "type": "string",
        "optional": false,
        "field": "op"
   }, {
        "type": "int64",
        "optional": false,
        "field": "ts_ms"
   }, {
        "type": "struct",
        "fields": [
          {
            "type": "int32",
            "optional": false,
            "field": "id"
     }, {
            "type": "string",
            "optional": false,
            "field": "name"
          }
        ],
        "optional": true,
        "field": "before"
   }, {
        "type": "struct",
        "fields": [
          {
            "type": "int32",
            "optional": false,
            "field": "id"
     }, {
            "type": "string",
            "optional": false,
            "field": "name"
     }
        ],
        "optional": true,
        "field": "after"
   }, {
        "type": "struct",
        "fields": [
          {
            "type": "string",
            "optional": false,
            "field": "version"
         }, {
            "type": "string",
            "optional": false,
            "field": "db"
     }, {
            "type": "string",
            "optional": false,
            "field": "namespace"
     }, {
            "type": "string",
            "optional": false,
            "field": "table"
     }, {
            "type": "int64",
            "optional": false,
            "field": "ts_ms"
     }
        ],
        "optional": false,
        "field": "source"
   }
    ],
  "optional": false
 }
}

Field name

描述

payload.op

  • c表示對資料進行新增操作。

  • u表示對資料進行更新操作。

  • d表示對資料進行刪除操作。

  • r表示對資料進行全量匯出操作,暫時不會涉及到。

payload.ts_ms

表示寫入Kafka的Unix時間戳記。

payload.before

表示匯出整行資料更新前的值。

payload.after

表示匯出整行資料的最新值。

payload.source

表示操作的額外資訊,支援額外添加。

  • version:訊息對應Lindorm資料庫的版本號碼。

  • db:源叢集。

  • namespace:表的namespace。

  • table:表名。

  • ts_ms:記錄更新Lindorm的時間。

schema

根據payload內容自動產生,對整個JSON的結構及所有欄位類型進行說明。預設都包括schema資訊。整個schema的結構是遞迴的。

  • field:欄位名稱。

  • type:field欄位名對應的欄位類型。

  • name:schema欄位所屬模式名稱。

  • fields:遞迴說明當前欄位包含的子欄位內容。

  • optional:該欄位是否可選。

說明

HBase表的訂閱格式跟SQL表一致,但在結構上存在以下兩點不同:

  • HBase表在資料庫中儲存的是原始的位元據,通過資料訂閱消費到的資料是對位元據進行Base64編碼後的字串。

  • HBase表存在列族的概念,因此非主鍵列的列名格式為列族_列名,主鍵列的列名固定為ROW

資料消費樣本

SQL表

在Lindorm資料庫中建立如下Schema。

CREATE TABLE customers (id VARCHAR,first_name VARCHAR,last_name VARCHAR, PRIMARY KEY(id));
  • 插入資料的資料消費樣本。

    {
      "schema": {}, 
      "payload": { 
        "op": "c", 
        "ts_ms": 1465491411815, 
        "before": null, 
        "after": { 
          "id": "1004",
          "first_name": "Anne",
          "last_name": "Kretchmar"
        },
        "source": { 
          "version": "v1.0",
          "db": "ld-xxxx",
          "namespace": "default",
          "table": "customers",
          "ts_ms": 1465491411807
        }
      }
    }
  • 更新資料的資料消費樣本。

    {
      "schema": {},
      "payload": { 
        "op": "u", 
        "ts_ms": 1465491411815,
        "before": {
          "id": "1004",
          "first_name": "Anne Marie",
          "last_name": "Kretchmar"
        }, 
        "after": {
          "id": "1004",
          "first_name": "Anne",
          "last_name": "Kretchmar"
        },
        "source": {
          "version": "v1.0",
          "db": "ld-xxxx",
          "namespace": "default",
          "table": "customers",
          "ts_ms": 1465491411807
        }
      }
    }
  • 刪除一行資料的資料消費樣本。

    {
      "schema": {},
      "payload": { 
        "op": "d", 
        "ts_ms": 1465491411815,
        "before": { 
          "id": "1004",
          "first_name": "Anne Marie",
          "last_name": "Kretchmar"
        }, 
        "after": null,
        "source": {
          "version": "v1.0",
          "db": "ld-xxxx",
          "namespace": "default",
          "table": "customers",
          "ts_ms": 1465491411807
        }
      }
    }
  • 刪除列的資料消費樣本。

    {
      "schema": {},
      "payload": { 
        "op": "u", 
        "ts_ms": 1465491411815,
        "before": { 
          "id": "1004",
          "first_name": "Anne Marie",
          "last_name": "Kretchmar"
        }, 
        "after": { 
          "id": "1004",
          "first_name": "Anne Marie"
        }, 
        "source": {
          "version": "v1.0",
          "db": "ld-xxxx",
          "namespace": "default",
          "table": "customers",
          "ts_ms": 1465491411807
        }
      }
    }

HBase表

向HBase表中插入一條資料:

Put put = new Put(Bytes.toBytes("user1"));
put.addColumn(Bytes.toBytes("f"), Bytes.toBytes("name"), Bytes.toBytes("lucky"));
table.put(put);

對應的變更訊息為:

{
    "schema": {},
    "payload": {
        "op": "c",
        "ts_ms": 1725258859839,
        "before": null,
        "after": {
            "ROW": "dXNlcjE=",
            "f_name": "bHVja3k="
        },
        "source": {
            "version": "v2.0",
            "db": "ld-xxxx",
            "namespace": "default",
            "table": "customers",
            "ts_ms": 1725258833727
        }
    }
}