本文介紹寫入Kafka訊息的訊息結構及各欄位含義。
背景資訊
同步整庫資料至kafka任務,是將從上遊資料來源讀取的資料,按照下面描述的JSON格式寫入到Kafka的topic。訊息總體格式包括變更記錄的列資訊、以及資料變更前後的狀態資訊等。為確保消費Kafka中資料時能夠準確判斷同步任務進度,同步任務還將定時產生op欄位作為MHEARTBEAT的同步任務心跳記錄寫入Kafka的topic中。以下為您介紹寫入Kafka的訊息總體格式、同步任務心跳訊息格式及源端更改資料對應的訊息格式,關於欄位類型及參數說明等資訊,詳情請參見欄位類型和參數說明。
訊息總體格式
寫入Kafka訊息的總體格式如下所示:
{
"schema": { //變更的中繼資料資訊,僅指定列名與列類型資訊
"dataColumn": [//變更的資料列資訊,更新目標表記錄內容
{
"name": "id",
"type": "LONG"
},
{
"name": "name",
"type": "STRING"
},
{
"name": "binData",
"type": "BYTES"
},
{
"name": "ts",
"type": "DATE"
},
{
"name":"rowid",// 資料來源為Oracle時,rowid會放在資料列中
"type":"STRING"
}
],
"primaryKey": [
"pkName1",
"pkName2"
],
"source": {
"dbType": "mysql",
"dbVersion": "1.0.0",
"dbName": "myDatabase",
"schemaName": "mySchema",
"tableName": "tableName"
}
},
"payload": {
"before": {
"dataColumn":{
"id": 111,
"name":"scooter",
"binData": "[base64 string]",
"ts": 1590315269000,
"rowid": "AAIUMPAAFAACxExAAE"//字串類型,Oracle的rowid資訊
}
},
"after": {
"dataColumn":{
"id": 222,
"name":"donald",
"binData": "[base64 string]",
"ts": 1590315269000,
"rowid": "AAIUMPAAFAACxExAAE"//字串類型,Oracle的rowid資訊
}
},
"sequenceId":"XXX",//字串類型,用於增全量資料合併的資料排序,
"scn":"xxxx",//字串類型,Oracle的scn資訊
"op": "INSERT/UPDATE_BEFOR/UPDATE_AFTER/DELETE/TRANSACTION_BEGIN/TRANSACTION_END/CREATE/ALTER/ERASE/QUERY/TRUNCATE/RENAME/CINDEX/DINDEX/GTID/XACOMMIT/XAROLLBACK/MHEARTBEAT...",//大小寫敏感,
"timestamp": {
"eventTime": 1,//必選,記錄源端庫發生變更的時間,毫秒精度的13位時間戳記
"systemTime": 2,//可選,同步任務處理該條變更訊息的時間,毫秒精度的13位時間戳記
"checkpointTime": 3//可選,重設同步位點時的設定時間,毫秒精度的13位時間戳記,一般等於eventTime
},
"ddl": {
"text": "ADD COLUMN ...",
"ddlMeta": "[SQLStatement serialized binary, expressed in base64 string]"
}
},
"version":"1.0.0"
}
同步任務心跳訊息格式
{
"schema": {
"dataColumn": null,
"primaryKey": null,
"source": null
},
"payload": {
"before": null,
"after": null,
"sequenceId": null,
"timestamp": {
"eventTime": 1620457659000,
"checkpointTime": 1620457659000
},
"op": "MHEARTBEAT",
"ddl": null
},
"version": "0.0.1"
}
源端更改資料對應的訊息格式
源端插入資料對應的Kafka訊息格式:
{ "schema": { "dataColumn": [ { "name": "name", "type": "STRING" }, { "name": "job", "type": "STRING" }, { "name": "sex", "type": "STRING" }, { "name": "#alibaba_rds_row_id#", "type": "LONG" } ], "primaryKey": null, "source": { "dbType": "MySQL", "dbName": "pkset_test", "tableName": "pkset_test_no_pk" } }, "payload": { "before": null, "after": { "dataColumn": { "name": "name11", "job": "job11", "sex": "man", "#alibaba_rds_row_id#": 15 } }, "sequenceId": "1620457642589000000", "timestamp": { "eventTime": 1620457896000, "systemTime": 1620457896977, "checkpointTime": 1620457896000 }, "op": "INSERT", "ddl": null }, "version": "0.0.1" }
源端更新資料對應的Kafka訊息格式:
當未勾選源端update變更對應一條Kafka記錄時,源端更新資料對應的Kafka訊息格式包含兩條Kafka訊息,分別描述更新前的資料狀態和更新後的資料狀態。具體訊息格式如下:
更新前的資料狀態訊息格式:
{ "schema": { "dataColumn": [ { "name": "name", "type": "STRING" }, { "name": "job", "type": "STRING" }, { "name": "sex", "type": "STRING" }, { "name": "#alibaba_rds_row_id#", "type": "LONG" } ], "primaryKey": null, "source": { "dbType": "MySQL", "dbName": "pkset_test", "tableName": "pkset_test_no_pk" } }, "payload": { "before": { "dataColumn": { "name": "name11", "job": "job11", "sex": "man", "#alibaba_rds_row_id#": 15 } }, "after": null, "sequenceId": "1620457642589000001", "timestamp": { "eventTime": 1620458077000, "systemTime": 1620458077779, "checkpointTime": 1620458077000 }, "op": "UPDATE_BEFOR", "ddl": null }, "version": "0.0.1" }
更新後的資料狀態訊息格式:
{ "schema": { "dataColumn": [ { "name": "name", "type": "STRING" }, { "name": "job", "type": "STRING" }, { "name": "sex", "type": "STRING" }, { "name": "#alibaba_rds_row_id#", "type": "LONG" } ], "primaryKey": null, "source": { "dbType": "MySQL", "dbName": "pkset_test", "tableName": "pkset_test_no_pk" } }, "payload": { "before": null, "after": { "dataColumn": { "name": "name11", "job": "job11", "sex": "woman", "#alibaba_rds_row_id#": 15 } }, "sequenceId": "1620457642589000001", "timestamp": { "eventTime": 1620458077000, "systemTime": 1620458077779, "checkpointTime": 1620458077000 }, "op": "UPDATE_AFTER", "ddl": null }, "version": "0.0.1" }
當勾選了源端update變更對應一條Kafka記錄時,源端更新資料對應的Kafka訊息格式包含一條Kafka訊息,描述更新前的資料狀態和更新後的資料狀態。具體訊息格式如下:
{ "schema": { "dataColumn": [ { "name": "name", "type": "STRING" }, { "name": "job", "type": "STRING" }, { "name": "sex", "type": "STRING" }, { "name": "#alibaba_rds_row_id#", "type": "LONG" } ], "primaryKey": null, "source": { "dbType": "MySQL", "dbName": "pkset_test", "tableName": "pkset_test_no_pk" } }, "payload": { "before": { "dataColumn": { "name": "name11", "job": "job11", "sex": "man", "#alibaba_rds_row_id#": 15 } }, "after": { "dataColumn": { "name": "name11", "job": "job11", "sex": "woman", "#alibaba_rds_row_id#": 15 } }, "sequenceId": "1620457642589000001", "timestamp": { "eventTime": 1620458077000, "systemTime": 1620458077779, "checkpointTime": 1620458077000 }, "op": "UPDATE_AFTER", "ddl": null }, "version": "0.0.1" }
源端刪除資料對應的Kafka訊息格式:
{ "schema": { "dataColumn": [ { "name": "name", "type": "STRING" }, { "name": "job", "type": "STRING" }, { "name": "sex", "type": "STRING" }, { "name": "#alibaba_rds_row_id#", "type": "LONG" } ], "primaryKey": null, "source": { "dbType": "MySQL", "dbName": "pkset_test", "tableName": "pkset_test_no_pk" } }, "payload": { "before": { "dataColumn": { "name": "name11", "job": "job11", "sex": "woman", "#alibaba_rds_row_id#": 15 } }, "after": null, "sequenceId": "1620457642589000002", "timestamp": { "eventTime": 1620458266000, "systemTime": 1620458266101, "checkpointTime": 1620458266000 }, "op": "DELETE", "ddl": null }, "version": "0.0.1" }
欄位類型
寫入Kafka topic中的訊息將從源端讀取資料對應為BOOLEAN、DOUBLE、DATE、BYTES、LONG,STRING六種類型,再以不同的JSON格式寫入kafka topic中。
類型 | 說明 |
BOOLEAN | 對應JSON中的布爾類型,取值為true,false |
DATE | 對應JSON中的數實值型別,取值為13位元字時間戳記,精確到毫秒(ms)級。 |
BYTES | 對應JSON中的字串類型,寫入Kafka前會先對位元組數組進行base64編碼轉換為字串,消費時需要進行base64解碼(編碼Base64.getEncoder().encodeToString(text.getBytes("UTF-8"));解碼Base64.getDecoder().decode(encodedText))。 |
STRING | 對應JSON中的字串類型 |
LONG | 對應JSON中的數實值型別 |
DOUBLE | 對應JSON中的數實值型別 |
參數說明
以下為您介紹寫入Kafka的訊息中的各個欄位的含義及說明。
一級元素 | 二級元素 | 說明 |
schema | dataColumn | JSONArray類型,資料列的類型資訊。dataColumn記錄上遊資料變更記錄的所有列和對應的列類型資訊。變更操作包括資料庫對資料的更改(新增、刪除及修改)和資料庫表結構等變更。
|
primaryKey | List類型,主鍵資訊。 pk:主鍵名。 | |
source | Object 類型,源端資料庫或表資訊。
| |
payload | before | JSONObject類型,修改前的資料。例如:資料來源端為mysql,做了一次記錄的update操作,before欄位儲存記錄被update之前的資料內容。
|
after | 修改後的資料。格式同before相同。 | |
sequenceId | 字串類型,Streamx產生,用於增量資料和全量資料合併的資料排序,每個streamx record都是唯一的。 說明 對於從源端讀取的更新操作訊息,會產生兩條寫入記錄,一條update before記錄和一條update after記錄,這兩條記錄的sequenceId相同。 | |
scn | 當源端為Oracle資料庫時有效,對應Oracle的scn資訊。 | |
op | 對應源端讀取到的訊息類型,取值如下:
| |
timestamp | JSONObject 類型,本條資料的相關時間戳記。
| |
ddl | 該欄位只在更改資料庫的表結構時才會填充資料,更改資料(包括新增、刪除和修改)時對應的ddl直接填充為null。
| |
version | 無 | 格式的版本號碼。 |