全部產品
Search
文件中心

DataWorks:附錄:訊息格式

更新時間:Jun 19, 2024

本文介紹寫入Kafka訊息的訊息結構及各欄位含義。

背景資訊

同步整庫資料至kafka任務,是將從上遊資料來源讀取的資料,按照下面描述的JSON格式寫入到Kafka的topic。訊息總體格式包括變更記錄的列資訊、以及資料變更前後的狀態資訊等。為確保消費Kafka中資料時能夠準確判斷同步任務進度,同步任務還將定時產生op欄位作為MHEARTBEAT的同步任務心跳記錄寫入Kafka的topic中。以下為您介紹寫入Kafka的訊息總體格式同步任務心跳訊息格式源端更改資料對應的訊息格式,關於欄位類型及參數說明等資訊,詳情請參見欄位類型參數說明

訊息總體格式

寫入Kafka訊息的總體格式如下所示:

{
    "schema": { //變更的中繼資料資訊,僅指定列名與列類型資訊
        "dataColumn": [//變更的資料列資訊,更新目標表記錄內容
            {
                "name": "id",
                "type": "LONG"
            },
            {
                "name": "name",
                "type": "STRING"
            },
            {
                "name": "binData",
                "type": "BYTES"
            },
            {
                "name": "ts",
                "type": "DATE"
            },
            {
              "name":"rowid",// 資料來源為Oracle時,rowid會放在資料列中
              "type":"STRING"
            }
        ],
        "primaryKey": [
            "pkName1",
            "pkName2"
        ],
        "source": {
            "dbType": "mysql",
            "dbVersion": "1.0.0",
            "dbName": "myDatabase",
            "schemaName": "mySchema",
            "tableName": "tableName"
        }
    },
    "payload": {
        "before": {
            "dataColumn":{
                "id": 111,
                "name":"scooter",
                "binData": "[base64 string]",
                "ts": 1590315269000,
                "rowid": "AAIUMPAAFAACxExAAE"//字串類型,Oracle的rowid資訊
            }
        },
        "after": {
            "dataColumn":{
                "id": 222,
                "name":"donald",
                "binData": "[base64 string]",
                "ts": 1590315269000,
                "rowid": "AAIUMPAAFAACxExAAE"//字串類型,Oracle的rowid資訊
            }
        },
        "sequenceId":"XXX",//字串類型,用於增全量資料合併的資料排序,
        "scn":"xxxx",//字串類型,Oracle的scn資訊
        "op": "INSERT/UPDATE_BEFOR/UPDATE_AFTER/DELETE/TRANSACTION_BEGIN/TRANSACTION_END/CREATE/ALTER/ERASE/QUERY/TRUNCATE/RENAME/CINDEX/DINDEX/GTID/XACOMMIT/XAROLLBACK/MHEARTBEAT...",//大小寫敏感,
        "timestamp": {
            "eventTime": 1,//必選,記錄源端庫發生變更的時間,毫秒精度的13位時間戳記
            "systemTime": 2,//可選,同步任務處理該條變更訊息的時間,毫秒精度的13位時間戳記
            "checkpointTime": 3//可選,重設同步位點時的設定時間,毫秒精度的13位時間戳記,一般等於eventTime
        },
        "ddl": {
            "text": "ADD COLUMN ...",
            "ddlMeta": "[SQLStatement serialized binary, expressed in base64 string]"
        }
    },
    "version":"1.0.0"
}
說明

關於欄位類型及參數說明等資訊,詳情請參見欄位類型參數說明

同步任務心跳訊息格式

{
    "schema": {
        "dataColumn": null,
        "primaryKey": null,
        "source": null
    },
    "payload": {
        "before": null,
        "after": null,
        "sequenceId": null,
        "timestamp": {
            "eventTime": 1620457659000,
            "checkpointTime": 1620457659000
        },
        "op": "MHEARTBEAT",
        "ddl": null
    },
    "version": "0.0.1"
}
說明

關於欄位類型及參數說明等資訊,詳情請參見欄位類型參數說明

源端更改資料對應的訊息格式

  • 源端插入資料對應的Kafka訊息格式:

    {
        "schema": {
            "dataColumn": [
                {
                    "name": "name",
                    "type": "STRING"
                },
                {
                    "name": "job",
                    "type": "STRING"
                },
                {
                    "name": "sex",
                    "type": "STRING"
                },
                {
                    "name": "#alibaba_rds_row_id#",
                    "type": "LONG"
                }
            ],
            "primaryKey": null,
            "source": {
                "dbType": "MySQL",
                "dbName": "pkset_test",
                "tableName": "pkset_test_no_pk"
            }
        },
        "payload": {
            "before": null,
            "after": {
                "dataColumn": {
                    "name": "name11",
                    "job": "job11",
                    "sex": "man",
                    "#alibaba_rds_row_id#": 15
                }
            },
            "sequenceId": "1620457642589000000",
            "timestamp": {
                "eventTime": 1620457896000,
                "systemTime": 1620457896977,
                "checkpointTime": 1620457896000
            },
            "op": "INSERT",
            "ddl": null
        },
        "version": "0.0.1"
    }
  • 源端更新資料對應的Kafka訊息格式:

    • 當未勾選源端update變更對應一條Kafka記錄時,源端更新資料對應的Kafka訊息格式包含兩條Kafka訊息,分別描述更新前的資料狀態和更新後的資料狀態。具體訊息格式如下:

      更新前的資料狀態訊息格式:

      {
          "schema": {
              "dataColumn": [
                  {
                      "name": "name",
                      "type": "STRING"
                  },
                  {
                      "name": "job",
                      "type": "STRING"
                  },
                  {
                      "name": "sex",
                      "type": "STRING"
                  },
                  {
                      "name": "#alibaba_rds_row_id#",
                      "type": "LONG"
                  }
              ],
              "primaryKey": null,
              "source": {
                  "dbType": "MySQL",
                  "dbName": "pkset_test",
                  "tableName": "pkset_test_no_pk"
              }
          },
          "payload": {
              "before": {
                  "dataColumn": {
                      "name": "name11",
                      "job": "job11",
                      "sex": "man",
                      "#alibaba_rds_row_id#": 15
                  }
              },
              "after": null,
              "sequenceId": "1620457642589000001",
              "timestamp": {
                  "eventTime": 1620458077000,
                  "systemTime": 1620458077779,
                  "checkpointTime": 1620458077000
              },
              "op": "UPDATE_BEFOR",
              "ddl": null
          },
          "version": "0.0.1"
      }

      更新後的資料狀態訊息格式:

      {
          "schema": {
              "dataColumn": [
                  {
                      "name": "name",
                      "type": "STRING"
                  },
                  {
                      "name": "job",
                      "type": "STRING"
                  },
                  {
                      "name": "sex",
                      "type": "STRING"
                  },
                  {
                      "name": "#alibaba_rds_row_id#",
                      "type": "LONG"
                  }
              ],
              "primaryKey": null,
              "source": {
                  "dbType": "MySQL",
                  "dbName": "pkset_test",
                  "tableName": "pkset_test_no_pk"
              }
          },
          "payload": {
              "before": null,
              "after": {
                  "dataColumn": {
                      "name": "name11",
                      "job": "job11",
                      "sex": "woman",
                      "#alibaba_rds_row_id#": 15
                  }
              },
              "sequenceId": "1620457642589000001",
              "timestamp": {
                  "eventTime": 1620458077000,
                  "systemTime": 1620458077779,
                  "checkpointTime": 1620458077000
              },
              "op": "UPDATE_AFTER",
              "ddl": null
          },
          "version": "0.0.1"
      }
    • 當勾選了源端update變更對應一條Kafka記錄時,源端更新資料對應的Kafka訊息格式包含一條Kafka訊息,描述更新前的資料狀態和更新後的資料狀態。具體訊息格式如下:

      {
          "schema": {
              "dataColumn": [
                  {
                      "name": "name",
                      "type": "STRING"
                  },
                  {
                      "name": "job",
                      "type": "STRING"
                  },
                  {
                      "name": "sex",
                      "type": "STRING"
                  },
                  {
                      "name": "#alibaba_rds_row_id#",
                      "type": "LONG"
                  }
              ],
              "primaryKey": null,
              "source": {
                  "dbType": "MySQL",
                  "dbName": "pkset_test",
                  "tableName": "pkset_test_no_pk"
              }
          },
          "payload": {
              "before": {
                  "dataColumn": {
                      "name": "name11",
                      "job": "job11",
                      "sex": "man",
                      "#alibaba_rds_row_id#": 15
                  }
              },
              "after": {
                  "dataColumn": {
                      "name": "name11",
                      "job": "job11",
                      "sex": "woman",
                      "#alibaba_rds_row_id#": 15
                  }
              },
              "sequenceId": "1620457642589000001",
              "timestamp": {
                  "eventTime": 1620458077000,
                  "systemTime": 1620458077779,
                  "checkpointTime": 1620458077000
              },
              "op": "UPDATE_AFTER",
              "ddl": null
          },
          "version": "0.0.1"
      }
  • 源端刪除資料對應的Kafka訊息格式:

    {
        "schema": {
            "dataColumn": [
                {
                    "name": "name",
                    "type": "STRING"
                },
                {
                    "name": "job",
                    "type": "STRING"
                },
                {
                    "name": "sex",
                    "type": "STRING"
                },
                {
                    "name": "#alibaba_rds_row_id#",
                    "type": "LONG"
                }
            ],
            "primaryKey": null,
            "source": {
                "dbType": "MySQL",
                "dbName": "pkset_test",
                "tableName": "pkset_test_no_pk"
            }
        },
        "payload": {
            "before": {
                "dataColumn": {
                    "name": "name11",
                    "job": "job11",
                    "sex": "woman",
                    "#alibaba_rds_row_id#": 15
                }
            },
            "after": null,
            "sequenceId": "1620457642589000002",
            "timestamp": {
                "eventTime": 1620458266000,
                "systemTime": 1620458266101,
                "checkpointTime": 1620458266000
            },
            "op": "DELETE",
            "ddl": null
        },
        "version": "0.0.1"
    }
說明

關於欄位類型及參數說明等資訊,詳情請參見欄位類型參數說明

欄位類型

寫入Kafka topic中的訊息將從源端讀取資料對應為BOOLEAN、DOUBLE、DATE、BYTES、LONG,STRING六種類型,再以不同的JSON格式寫入kafka topic中。

類型

說明

BOOLEAN

對應JSON中的布爾類型,取值為true,false

DATE

對應JSON中的數實值型別,取值為13位元字時間戳記,精確到毫秒(ms)級。

BYTES

對應JSON中的字串類型,寫入Kafka前會先對位元組數組進行base64編碼轉換為字串,消費時需要進行base64解碼(編碼Base64.getEncoder().encodeToString(text.getBytes("UTF-8"));解碼Base64.getDecoder().decode(encodedText))

STRING

對應JSON中的字串類型

LONG

對應JSON中的數實值型別

DOUBLE

對應JSON中的數實值型別

參數說明

以下為您介紹寫入Kafka的訊息中的各個欄位的含義及說明。

一級元素

二級元素

說明

schema

dataColumn

JSONArray類型,資料列的類型資訊。dataColumn記錄上遊資料變更記錄的所有列和對應的列類型資訊。變更操作包括資料庫對資料的更改(新增、刪除及修改)和資料庫表結構等變更。

  • name:列名

  • type:列類型

primaryKey

List類型,主鍵資訊。

pk:主鍵名。

source

Object 類型,源端資料庫或表資訊。

  • dbType:String類型,資料庫類型

  • dbVersion:String類型,資料庫版本

  • dbName:String類型,資料庫名

  • schemaName:String類型,Schema名(針對Postgres和SQL Server等)

  • tableName:String 類型,資料表名

payload

before

JSONObject類型,修改前的資料。例如:資料來源端為mysql,做了一次記錄的update操作,before欄位儲存記錄被update之前的資料內容。

  • 在從源端讀取到更新、刪除操作訊息時,在寫入記錄中填充該欄位。

  • dataColumn:JSONObject類型,表示資料資訊。格式為列名:列值, 列名為字串,列值BOOLEAN、DOUBLE、DATE、BYTES、LONG,STRING。

after

修改後的資料。格式同before相同。

sequenceId

字串類型,Streamx產生,用於增量資料和全量資料合併的資料排序,每個streamx record都是唯一的。

說明

對於從源端讀取的更新操作訊息,會產生兩條寫入記錄,一條update before記錄和一條update after記錄,這兩條記錄的sequenceId相同。

scn

當源端為Oracle資料庫時有效,對應Oracle的scn資訊。

op

對應源端讀取到的訊息類型,取值如下:

  • INSERT:資料插入

  • UPDATE_BEFOR:資料更新前

  • UPDATE_AFTER:資料更新後

  • DELETE:資料刪除

  • TRANSACTION_BEGIN:資料庫事務開始

  • TRANSACTION_END:資料庫事務結束

  • CREATE:資料庫建表

  • ALTER:資料庫表變更

  • QUERY:資料庫變更的原始SQL

  • TRUNCATE:資料庫表清空

  • RENAME:資料庫表重新命名

  • CINDEX:建立索引

  • DINDEX:刪除索引

  • MHEARTBEAT:用於在源端無新增資料時身分識別同步處理仍正常進行的心跳訊息

timestamp

JSONObject 類型,本條資料的相關時間戳記。

  • eventTime:Long類型,記錄源端庫發生變更的時間,毫秒精度的13位時間戳記。

  • systemTime:Long類型,同步任務處理該條變更訊息的時間,毫秒精度的13位時間戳記。

  • checkpointTime:Long類型,重設同步位點時的設定時間,毫秒精度的13位時間戳記,一般與eventTime值一致。

ddl

該欄位只在更改資料庫的表結構時才會填充資料,更改資料(包括新增、刪除和修改)時對應的ddl直接填充為null。

  • text:String類型,資料庫DDL語句文本。

  • ddlMeta:String類型,將資料庫ddl類型變更記錄到一個Java對象,使用對象序列化後再進行base64編碼得到的字串。

version

格式的版本號碼。