本文為您介紹序列化方式和資料庫傳輸到文本協議的資料格式。
序列化方式的格式說明
您在使用資料轉送同步源端資料至 Kafka、DataHub(BLOB 類型)和 RocketMQ 時,支援序列化方式控制資料同步至目標端的訊息格式。序列化方式包括 Default、Canal、 DataWorks(支援 2.0 版本)、SharePlex、DefaultExtendColumnType、Debezium、DebeziumFlatten、DebeziumSmt 和 Avro。
目前僅 OceanBase 資料庫 MySQL 租戶支援序列化方式 Debezium、DebeziumFlatten 和 DebeziumSmt。
目前僅同步 OceanBase 資料庫 MySQL 租戶的資料至 Kafka 時,支援序列化方式 Avro。
Default JSON 訊息格式
資料同步至 Kafka、DataHub(BLOB 類型)和 RocketMQ 時,序列化方式 Default 使用如下 JSON 訊息格式。
{
"prevStruct": { // 變更前鏡像
"col1": "val1" // 索引值對,包含全量索引值
},
"postStruct": { // 變更後鏡像
"col1": "val1" // 索引值對,包含全量索引值
},
"allMetaData"{
"checkpoint": "STRING", // 當前同步位點,增量階段表示同步到的時間位點(秒級時間戳記),全量階段使用主鍵索引值對錶示
"record_primary_key": "STRING", // 主鍵列的名稱。如果存在多列使用 \u0001 分割
"record_primary_value": "STRING", // 主索引值。如果存在多列使用 \u0001 分割
"source_identity": "STRING", // 源端標識,如果是增量則是 subtopic,如果是全量則沒有意義的序號
"dbType": "STRING", // 資料庫的類型。包括 MYSQL/ORACLE/OCEANBASE(老模式,相容使用)/OB_IN_ORACLE_MODE(老模式,相容使用)/DB2(老模式,相容使用)/OB_MYSQL/OB_ORACLE/DB2_LUW
"storeDataSequence": "LONG", // 該欄位只有在增量情境下 source.json 配置中包含 sequenceEnabled=true 才存在,預設是 true。用於排序,建置規則是一個同步進程中,時間戳記 + 不超過五位序號遞增。{時間戳記}{遞增序號}。
"table_name": "STRING", // 使用 SQL 陳述式進行變更的表的名稱
"db": "STRING", // 使用 SQL 陳述式進行變更的資料庫的名稱。如果是 OceanBase 資料庫,則包含租戶,格式為 {tenant}.{database}
"timestamp": "STRING", // 資料變更秒級時間戳記,僅增量存在
"uniqueId": "STRING", // 增量中表示 STORE 傳遞下來的事務序號標識,
"ddlType": "STRING", // DDL 具體類型
},
"recordType": "INSERT/UPDATE/DELETE/HEARTBEAT/DDL" // 變更類型
}
DDL 的 Record 中,僅存在 "ddl" 為列名的鍵,值為 DDL 語句。
前鏡像和後鏡像:
prevStruct
:表示增量資料的前鏡像資訊,即 SQL 執行前的資料。postStruct
:表示增量資料的後鏡像資訊,即 SQL 執行後的資料。
DELETE
僅存在prevStruct
,INSERT
和DDL
僅存在postStruct
,UPDATE
同時存在prevStruct
和postStruct
,HEARTBEAT
(定期心跳訊息)不存在postStruct
和postStruct。
資料樣本如下:
INSERT(插入)資料的樣本
{ "allMetaData": { "checkpoint": null, "record_primary_key": "id1\u0001id2", "source_identity": null, "record_primary_value": "3\u0001129", "dbType": "OB_MYSQL", "table_name": "table_name", "db": "tenant.database", "timestamp": "1609344671" }, "prevStruct": null, "recordType": "INSERT", "postStruct": { "col1": 3, "col2": 129, "col3": 2147483646, "col4": 9223372036854775806, "col5": 10223372036854775806, "col6": 1.2222, "col7": 9.999999, "col8": "hello world", "col9": "aGVsbG8gd29ybGQ=", "col10": 9.99999999999, "col11": "2020-11-25", "col12": "00:01:02", "col13": "2020-11-25 00:01:02", "col14": "1606233662.012345" } }
UPDATE(更新)資料的樣本
{ "allMetaData": { "checkpoint": null, "record_primary_key": "id1\u0001id2", "source_identity": null, "record_primary_value": "3\u0001129", "dbType": "OB_MYSQL", "table_name": "table_name", "db": "tenant.database", "timestamp": "1609344671" }, "prevStruct": { "col1": 3, "col2": 129, "col3": 2147483646, "col4": 9223372036854775806, "col5": 10223372036854775806, "col6": 1.2222, "col7": 9.999999999999, "col8": "hello world", "col9": "aGVsbG8gd29ybGQ=", "col10": 9.999999999999, "col11": "2020-11-25", "col12": "00:01:02", "col13": "2020-11-25 00:01:02", "col14": "1606233662.012345", }, "recordType": "UPDATE", "postStruct": { "col1": 3, "col2": 129, "col3": 2147483646, "col4": 9223372036854775806, "col5": 10223372036854775806, "col6": 1.2222, "col7": 9.999999999999, "col8": "hello world 2020", "col9": "aGVsbG8gd29ybGQ=", "col10": 9.999999999999, "col11": "2020-11-25", "col12": "00:01:02", "col13": "2020-11-25 00:01:02", "col14": "1606233662.012345" } }
DELETE(刪除)資料的樣本
{ "allMetaData": { "checkpoint": null, "record_primary_key": "id1\u0001id2", "source_identity": null, "record_primary_value": "3\u0001129", "dbType": "OB_MYSQL", "table_name": "table_name", "db": "tenant.database", "timestamp": "1609344671" }, "prevStruct": { "col1": 3, "col2": 129, "col3": 2147483646, "col4": 9223372036854775806, "col5": 10223372036854775806, "col16": 1.2222, "col7": 9.99999999, "col8": "hello world", "col9": "aGVsbG8gd29ybGQ=", "col10": 9.999999999, "col11": "2020-11-25", "col12": "00:01:02", "col13": "2020-11-25 00:01:02", "col14": "1606233662.012345" }, "recordType": "DELETE", "postStruct": null }
DDL 樣本
alter table connector_test.all_mysql_type_test add column c90 varchar(30) default "test" comment 'test';
{ "prevStruct": null, "postStruct": { "ddl": "alter table connector_test.all_mysql_type_test add column c90 varchar(30) default \"test\" comment 'test'" }, "allMetaData": { "checkpoint": "1671177057", "dbType": "OB_MYSQL", "storeDataSequence": null, "db": "connector_test", "timestamp": "1671177057", "uniqueId": null, "ddlType": "ALTER_TABLE", "record_primary_key": null, "source_identity": null, "record_primary_value": null, "table_name": "all_mysql_type_test" }, "recordType": "DDL" }
Canal JSON 訊息格式
資料同步至 Kafka、DataHub(BLOB 類型)和 RocketMQ 時,序列化方式 Canal 使用如下 JSON 訊息格式。
{
"database": "STRING", // 使用 SQL 陳述式進行變更的資料庫的名稱。如果是 OceanBase 資料庫,僅存在資料庫名稱,無需租戶名稱。
"sqlType": {
"col1": "INTEGER" // 變更列類型,數字參考 java.sql.Types
},
"data": [ // 變更後資料索引值對,目前只會存在一條訊息
{
"col1": "val1"
}
],
"pkNames": [ // 主鍵列名
"col1"
],
"old": [ // 僅 UPDATE 訊息存在。表示 UPDATE 語句變更的列,即變更前的列值
{
"col1": "val1"
}
],
"mysqlType": { // 列類型描述
"col": "STRING"
},
"type": "STRING", // 變更類型
"table": "STRING", // 使用 SQL 陳述式進行變更的表的名稱
"es": "LONG", // 變更時間,毫秒級時間戳記
"isDdl": "BOOLEAN", // 是否是 DDL
"ts": "LONG", // 寫入目的端時間戳記
"sql": "STRING" // 當前是空
}
資料樣本如下:
INSERT
(插入)資料的樣本{ "database": "database", "sqlType": { "col1": 93, "col2": 12, "col3": 6, "col4": 8, "col5": 5, "col6": 92, "col7": 4, "col8": -5, "col9": 2004, "col10": -6, "col11": 91, "col12": 3, "col13": -5, "col14": 93 }, "data": [ { "col1": "2020-11-25 00:01:02", "col2": "hello world", "col3": 1.2222, "col4": 9.99999999999999909326625337248461995470488734032045693707225049331647881341002217023668530611028595157578301758491822824378438792553200763769833775473829862512856683413461939989729065436937279228852476622948659167943435544622149348072943613294167216662821737555414480159115639791276054897201420389770580351533960771506199055664889770260291710977826725024401716523031627390652604144008597950935492433262042405635563993262949691698930975461134804791235994697938405200089317860731205010159117711704697471514344499487123311264707354172378099538737850219826145102366279591379660471881259976727356521602405329789906247763521525981391443887618575275588619928089116905061711975308467857756405810961619074331866883961080943542712559830853980002984826569445431232452392578125E-308, "col5": 129, "col6": "00:01:02", "col7": 2147483646, "col8": 9223372036854775806, "col9": "aGVsbG8gd29ybGQ=", "col10": 3, "col11": "2020-11-25", "col12": 9.99999999999999909326625337248461995470488734032045693707225049331647881341002217023668530611028595157578301758491822824378438792553200763769833775473829862512856683413461939989729065436937279228852476622948659167943435544622149348072943613294167216662821737555414480159115639791276054897201420389770580351533960771506199055664889770260291710977826725024401716523031627390652604144008597950935492433262042405635563993262949691698930975461134804791235994697938405200089317860731205010159117711704697471514344499487123311264707354172378099538737850219826145102366279591379660471881259976727356521602405329789906247763521525981391443887618575275588619928089116905061711975308467857756405810961619074331866883961080943542712559830853980002984826569445431232452392578125E-308, "col13": 10223372036854775806, "col14": "1606233662.012345" } ], "pkNames": [ "col1", "col2" ], "old": null, "mysqlType": { "col1": "datetime", "col2": "varchar", "col3": "float", "col4": "double", "col5": "smallint", "col6": "time", "col7": "int", "col8": "int64", "col9": "blob", "col10": "tinyint", "col11": "date", "col12": "decimal", "col13": "bigint", "col14": "timestamp" }, "type": "INSERT", "table": "table", "es": 1609344671000, "isDdl": false, "ts": 1618323429026, "sql": "" }
UPDATE
(更新)資料的樣本{ "database": "database", "sqlType": { "col1": 93, "col2": 12, "col3": 6, "col4": 8, "col5": 5, "col6": 92, "col7": 4, "col8": -5, "col9": 2004, "col10": -6, "col11": 91, "col12": 3, "col13": -5, "col14": 93 }, "data": [ { "col1": "2020-11-25 00:01:02", "col2": "hello world 2020", "col3": 1.2222, "col4": 9.99999999999999909326625337248461995470488734032045693707225049331647881341002217023668530611028595157578301758491822824378438792553200763769833775473829862512856683413461939989729065436937279228852476622948659167943435544622149348072943613294167216662821737555414480159115639791276054897201420389770580351533960771506199055664889770260291710977826725024401716523031627390652604144008597950935492433262042405635563993262949691698930975461134804791235994697938405200089317860731205010159117711704697471514344499487123311264707354172378099538737850219826145102366279591379660471881259976727356521602405329789906247763521525981391443887618575275588619928089116905061711975308467857756405810961619074331866883961080943542712559830853980002984826569445431232452392578125E-308, "col5": 129, "col6": "00:01:02", "col7": 2147483646, "col8": 9223372036854775806, "col9": "aGVsbG8gd29ybGQ=", "col10": 3, "col11": "2020-11-25", "col12": 9.99999999999999909326625337248461995470488734032045693707225049331647881341002217023668530611028595157578301758491822824378438792553200763769833775473829862512856683413461939989729065436937279228852476622948659167943435544622149348072943613294167216662821737555414480159115639791276054897201420389770580351533960771506199055664889770260291710977826725024401716523031627390652604144008597950935492433262042405635563993262949691698930975461134804791235994697938405200089317860731205010159117711704697471514344499487123311264707354172378099538737850219826145102366279591379660471881259976727356521602405329789906247763521525981391443887618575275588619928089116905061711975308467857756405810961619074331866883961080943542712559830853980002984826569445431232452392578125E-308, "col13": 10223372036854775806, "col14": "1606233662.012345" } ], "pkNames": [ "col1", "col2" ], "old": [ { "string": "hello world" } ], "mysqlType": { "col1": "datetime", "col2": "varchar", "col3": "float", "col4": "double", "col5": "smallint", "col6": "time", "col7": "int", "col8": "int64", "col9": "blob", "col10": "tinyint", "col11": "date", "col12": "decimal", "col13": "bigint", "col14": "timestamp" }, "type": "UPDATE", "table": "table", "es": 1609344671000, "isDdl": false, "ts": 1618364572908, "sql": "" }
DELETE
(刪除)資料的樣本{ "database": "database", "sqlType": { "col1": 93, "col2": 12, "col3": 6, "col4": 8, "col5": 5, "col6": 92, "col7": 4, "col8": -5, "col9": 2004, "col10": -6, "col11": 91, "col12": 3, "col13": -5, "col14": 93 }, "data": [ { "col1": "2020-11-25 00:01:02", "col2": "hello world", "col3": 1.2222, "col4": 9.99999999999999909326625337248461995470488734032045693707225049331647881341002217023668530611028595157578301758491822824378438792553200763769833775473829862512856683413461939989729065436937279228852476622948659167943435544622149348072943613294167216662821737555414480159115639791276054897201420389770580351533960771506199055664889770260291710977826725024401716523031627390652604144008597950935492433262042405635563993262949691698930975461134804791235994697938405200089317860731205010159117711704697471514344499487123311264707354172378099538737850219826145102366279591379660471881259976727356521602405329789906247763521525981391443887618575275588619928089116905061711975308467857756405810961619074331866883961080943542712559830853980002984826569445431232452392578125E-308, "col5": 129, "col6": "00:01:02", "col7": 2147483646, "col8": 9223372036854775806, "col9": "aGVsbG8gd29ybGQ=", "col10": 3, "col11": "2020-11-25", "col12": 9.99999999999999909326625337248461995470488734032045693707225049331647881341002217023668530611028595157578301758491822824378438792553200763769833775473829862512856683413461939989729065436937279228852476622948659167943435544622149348072943613294167216662821737555414480159115639791276054897201420389770580351533960771506199055664889770260291710977826725024401716523031627390652604144008597950935492433262042405635563993262949691698930975461134804791235994697938405200089317860731205010159117711704697471514344499487123311264707354172378099538737850219826145102366279591379660471881259976727356521602405329789906247763521525981391443887618575275588619928089116905061711975308467857756405810961619074331866883961080943542712559830853980002984826569445431232452392578125E-308, "col13": 10223372036854775806, "col14": "1606233662.012345" } ], "pkNames": [ "int8", "int16" ], "old": null, "mysqlType": { "col1": "datetime", "col2": "varchar", "col3": "float", "col4": "double", "col5": "smallint", "col6": "time", "col7": "int", "col8": "int64", "col9": "blob", "col10": "tinyint", "col11": "date", "col12": "decimal", "col13": "bigint", "col14": "timestamp" }, "type": "DELETE", "table": "table", "es": 1609344671000, "isDdl": false, "ts": 1618364660278, "sql": "" }
DDL 樣本
alter table connector_test.all_mysql_type_test add column c90 varchar(30) default "test" comment 'test';
{ "database": "connector_test", "sqlType": null, "data": null, "pkNames": null, "old": null, "mysqlType": null, "type": "ALTER", "table": "all_mysql_type_test", "es": 1671177209000, "isDdl": true, "ts": 1671177291475, "sql": "alter table connector_test.all_mysql_type_test add column c90 varchar(30) default \"test\" comment 'test'" }
DataWorks JSON 訊息格式
資料同步至 Kafka、DataHub(BLOB 類型)和 RocketMQ 時,序列化方式 DataWorks 使用如下 JSON 訊息格式。
{
"version": "2.0", //協議版本,目前僅支援 DataWorks 2.0 版本
"schema": { //變更的中繼資料資訊,僅指定列名與列類型資訊
"source": { //變更來源資訊
"dbType": "mysql", //資料來源類型
"dbVersion": "5.7.35", //資料庫版本
"dbName": "myDatabase", //資料庫名稱
"schema": "mySchema", //Schema 名稱,存在 Schema 的系統必填
"table": "tableName" //表名
},
"column": [ //變更的資料列資訊,更新目標表記錄內容
{
"name": "id",
"type": "bigint"
},
{
"name": "name",
"type": "varchar(20)"
},
{
"name": "mydata",
"type": "binary"
},
{
"name": "ts",
"type": "datetime"
}
],
"pk": [ //有主鍵或唯⼀鍵必填,否則可以不填
"pkName1",
"pkName2"
]
},
"payload": {
"before": {
"data": {
"id": 111,
"name": "scooter",
"mydata": "[base64 string]", //如果是二進位類型,需要進行 Base 64 編碼
"ts": 1590315269000.123456789 //時間戳記,其整數部分 13 位,小數部分 9 位
}
},
"after": {
"data": {
"id": 222,
"name": "donald",
"mydata": "[base64 string]",
"ts": 1590315269000
}
},
"op": "INSERT/UPDATE/DELETE/HEARTBEAT/TRANSACTION_BEGIN/TRANSACTION_END/CREATE/ALTER/ERASE/QUERY/TRUNCATE/RENAME/CINDEX/DINDEX/GTID/XACOMMIT/XAROLLBACK/...", //大小寫敏感
"timestamp": {
"eventTime": 1620457659000 // 變更在源端庫發生時間,毫秒精度的 13 位時間戳記
},
"ddl": {
"text": "ADD COLUMN ..."
},
"scn": "⾃增 ID"
},
"extend": { //extend 擴充欄位,用於後續擴充需求。如果沒有,可以不填
"load_fm": "CIBS" //記錄來源系統。例如,"CIBS"
}
}
同步任務心跳訊息:
{
"version": "2.0", //協議版本
"payload": {
"timestamp": {
"eventTime": 1620457659000 //⼼跳包時間
},
"op": "HEARTBEAT" //標識是⼼跳包
}
}
資料樣本如下:
INSERT
(插入)資料的樣本{ "version": "2.0", "schema": { "source": { "dbType": "ob_mysql", "dbVersion": null, "dbName": "db", "schema": null, "table": "tab" }, "column": [ { "name": "int8", "type": "TINYINT" }, { "name": "int16", "type": "SMALLINT" }, { "name": "int32", "type": "INT" }, { "name": "int64", "type": "INT64" }, { "name": "float32", "type": "FLOAT" }, { "name": "float64", "type": "DOUBLE" }, { "name": "bigInt", "type": "BIGINT" }, { "name": "boolean", "type": "BOOLEAN" }, { "name": "string", "type": "VARCHAR" }, { "name": "bytes", "type": "BLOB" }, { "name": "decimal", "type": "DECIMAL" }, { "name": "localDate", "type": "DATE" }, { "name": "localTime", "type": "TIME" }, { "name": "localDateTime", "type": "DATETIME" }, { "name": "timestamp", "type": "TIMESTAMP" }, { "name": "zonedDateTime", "type": "ZONED_DATETIME" }, { "name": "intervalDayToSecond", "type": "INTERVAL_DAY_TO_SECOND" }, { "name": "intervalYearToMonth", "type": "INTERVAL_YEAR_TO_MONTH" } ], "pk": [ "pkName1", "pkName12" ] }, "payload": { "before": null, "after": { "data": { "col1": 3, "col2": 129, "col3": 2147483646, "col4": 9223372036854775806, "col5": 1.2222, "col6": 0.0000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000999999999999999909326625337248461995470488734032045693707225049331647881341002217023668530611028595157578301758491822824378438792553200763769833775473829862512856683413461939989729065436937279228852476622948659167943435544622149348072943613294167216662821737555414480159115639791276054897201420389770580351533960771506199055664889770260291710977826725024401716523031627390652604144008597950935492433262042405635563993262949691698930975461134804791235994697938405200089317860731205010159117711704697471514344499487123311264707354172378099538737850219826145102366279591379660471881259976727356521602405329789906247763521525981391443887618575275588619928089116905061711975308467857756405810961619074331866883961080943542712559830853980002984826569445431232452392578125, "col7": 10223372036854775806, "col8": 1, "col9": "hello world", "col10": "aGVsbG8gd29ybGQ=", "col11": 0.0000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000999999999999999909326625337248461995470488734032045693707225049331647881341002217023668530611028595157578301758491822824378438792553200763769833775473829862512856683413461939989729065436937279228852476622948659167943435544622149348072943613294167216662821737555414480159115639791276054897201420389770580351533960771506199055664889770260291710977826725024401716523031627390652604144008597950935492433262042405635563993262949691698930975461134804791235994697938405200089317860731205010159117711704697471514344499487123311264707354172378099538737850219826145102366279591379660471881259976727356521602405329789906247763521525981391443887618575275588619928089116905061711975308467857756405810961619074331866883961080943542712559830853980002984826569445431232452392578125, "col12": "2020-11-25", "col13": "00:01:02", "col14": "2020-11-25 00:01:02", "col15": "1606233662.012345", "col16": "2020-11-25 00:01:02.012345 Asia/Shanghai", "col17": "INTERVAL '3' DAY", "col18": "INTERVAL '4' YEAR" } }, "op": "INSERT", "timestamp": { "eventTime": 1647581000000, "systemTime": 1647581000795, "checkpointTime": 1647581000 }, "ddl": null, "scn": "null" }, "extend": { "load_fm": "test" } }
UPDATE
(更新)資料的樣本{ "version": "2.0", "schema": { "source": { "dbType": "ob_mysql", "dbVersion": null, "dbName": "db", "schema": null, "table": "tab" }, "column": [ { "name": "int8", "type": "TINYINT" }, { "name": "int16", "type": "SMALLINT" }, { "name": "int32", "type": "INT" }, { "name": "int64", "type": "INT64" }, { "name": "float32", "type": "FLOAT" }, { "name": "float64", "type": "DOUBLE" }, { "name": "bigInt", "type": "BIGINT" }, { "name": "boolean", "type": "BOOLEAN" }, { "name": "string", "type": "VARCHAR" }, { "name": "bytes", "type": "BLOB" }, { "name": "decimal", "type": "DECIMAL" }, { "name": "localDate", "type": "DATE" }, { "name": "localTime", "type": "TIME" }, { "name": "localDateTime", "type": "DATETIME" }, { "name": "timestamp", "type": "TIMESTAMP" }, { "name": "zonedDateTime", "type": "ZONED_DATETIME" }, { "name": "intervalDayToSecond", "type": "INTERVAL_DAY_TO_SECOND" }, { "name": "intervalYearToMonth", "type": "INTERVAL_YEAR_TO_MONTH" } ], "pk": [ "pkName1", "pkName2" ] }, "payload": { "before": { "data": { "col1": 3, "col2": 129, "col3": 2147483646, "col4": 9223372036854775806, "col5": 1.2222, "col6": 0.0000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000999999999999999909326625337248461995470488734032045693707225049331647881341002217023668530611028595157578301758491822824378438792553200763769833775473829862512856683413461939989729065436937279228852476622948659167943435544622149348072943613294167216662821737555414480159115639791276054897201420389770580351533960771506199055664889770260291710977826725024401716523031627390652604144008597950935492433262042405635563993262949691698930975461134804791235994697938405200089317860731205010159117711704697471514344499487123311264707354172378099538737850219826145102366279591379660471881259976727356521602405329789906247763521525981391443887618575275588619928089116905061711975308467857756405810961619074331866883961080943542712559830853980002984826569445431232452392578125, "col7": 10223372036854775806, "col8": 1, "col9": "hello world", "col10": "aGVsbG8gd29ybGQ=", "col11": 0.0000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000999999999999999909326625337248461995470488734032045693707225049331647881341002217023668530611028595157578301758491822824378438792553200763769833775473829862512856683413461939989729065436937279228852476622948659167943435544622149348072943613294167216662821737555414480159115639791276054897201420389770580351533960771506199055664889770260291710977826725024401716523031627390652604144008597950935492433262042405635563993262949691698930975461134804791235994697938405200089317860731205010159117711704697471514344499487123311264707354172378099538737850219826145102366279591379660471881259976727356521602405329789906247763521525981391443887618575275588619928089116905061711975308467857756405810961619074331866883961080943542712559830853980002984826569445431232452392578125, "col12": "2020-11-25", "col13": "00:01:02", "col14": "2020-11-25 00:01:02", "col15": "1606233662.012345", "col16": "2020-11-25 00:01:02.012345 Asia/Shanghai", "col17": "INTERVAL '3' DAY", "col18": "INTERVAL '4' YEAR" } }, "after": { "data": { "col1": 3, "col2": 129, "col3": 2147483646, "col4": 9223372036854775806, "col5": 1.2222, "col6": 0.0000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000999999999999999909326625337248461995470488734032045693707225049331647881341002217023668530611028595157578301758491822824378438792553200763769833775473829862512856683413461939989729065436937279228852476622948659167943435544622149348072943613294167216662821737555414480159115639791276054897201420389770580351533960771506199055664889770260291710977826725024401716523031627390652604144008597950935492433262042405635563993262949691698930975461134804791235994697938405200089317860731205010159117711704697471514344499487123311264707354172378099538737850219826145102366279591379660471881259976727356521602405329789906247763521525981391443887618575275588619928089116905061711975308467857756405810961619074331866883961080943542712559830853980002984826569445431232452392578125, "col7": 10223372036854775806, "col8": 1, "col9": "hello world 2020", "col10": "aGVsbG8gd29ybGQ=", "col11": 0.0000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000999999999999999909326625337248461995470488734032045693707225049331647881341002217023668530611028595157578301758491822824378438792553200763769833775473829862512856683413461939989729065436937279228852476622948659167943435544622149348072943613294167216662821737555414480159115639791276054897201420389770580351533960771506199055664889770260291710977826725024401716523031627390652604144008597950935492433262042405635563993262949691698930975461134804791235994697938405200089317860731205010159117711704697471514344499487123311264707354172378099538737850219826145102366279591379660471881259976727356521602405329789906247763521525981391443887618575275588619928089116905061711975308467857756405810961619074331866883961080943542712559830853980002984826569445431232452392578125, "col12": "2020-11-25", "col13": "00:01:02", "col14": "2020-11-25 00:01:02", "col15": "1606233662.012345", "col16": "2020-11-25 00:01:02.012345 Asia/Shanghai", "col17": "INTERVAL '3' DAY", "col18": "INTERVAL '4' YEAR" } }, "op": "UPDATE", "timestamp": { "eventTime": 1647581038000, "systemTime": 1647581038674, "checkpointTime": 1647581038 }, "ddl": null, "scn": "null" }, "extend": { "load_fm": "test" } }
DELETE
(刪除)資料的樣本{ "version": "2.0", "schema": { "source": { "dbType": "ob_mysql", "dbVersion": null, "dbName": "db", "schema": null, "table": "tab" }, "column": [ { "name": "int8", "type": "TINYINT" }, { "name": "int16", "type": "SMALLINT" }, { "name": "int32", "type": "INT" }, { "name": "int64", "type": "INT64" }, { "name": "float32", "type": "FLOAT" }, { "name": "float64", "type": "DOUBLE" }, { "name": "bigInt", "type": "BIGINT" }, { "name": "boolean", "type": "BOOLEAN" }, { "name": "string", "type": "VARCHAR" }, { "name": "bytes", "type": "BLOB" }, { "name": "decimal", "type": "DECIMAL" }, { "name": "localDate", "type": "DATE" }, { "name": "localTime", "type": "TIME" }, { "name": "localDateTime", "type": "DATETIME" }, { "name": "timestamp", "type": "TIMESTAMP" }, { "name": "zonedDateTime", "type": "ZONED_DATETIME" }, { "name": "intervalDayToSecond", "type": "INTERVAL_DAY_TO_SECOND" }, { "name": "intervalYearToMonth", "type": "INTERVAL_YEAR_TO_MONTH" } ], "pk": [ "pkName1", "pkName2" ] }, "payload": { "before": { "data": { "col1": 3, "col2": 129, "col3": 2147483646, "col4": 9223372036854775806, "col5": 1.2222, "col6": 0.0000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000999999999999999909326625337248461995470488734032045693707225049331647881341002217023668530611028595157578301758491822824378438792553200763769833775473829862512856683413461939989729065436937279228852476622948659167943435544622149348072943613294167216662821737555414480159115639791276054897201420389770580351533960771506199055664889770260291710977826725024401716523031627390652604144008597950935492433262042405635563993262949691698930975461134804791235994697938405200089317860731205010159117711704697471514344499487123311264707354172378099538737850219826145102366279591379660471881259976727356521602405329789906247763521525981391443887618575275588619928089116905061711975308467857756405810961619074331866883961080943542712559830853980002984826569445431232452392578125, "col7": 10223372036854775806, "col8": 1, "col9": "hello world", "col10": "aGVsbG8gd29ybGQ=", "col11": 0.0000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000999999999999999909326625337248461995470488734032045693707225049331647881341002217023668530611028595157578301758491822824378438792553200763769833775473829862512856683413461939989729065436937279228852476622948659167943435544622149348072943613294167216662821737555414480159115639791276054897201420389770580351533960771506199055664889770260291710977826725024401716523031627390652604144008597950935492433262042405635563993262949691698930975461134804791235994697938405200089317860731205010159117711704697471514344499487123311264707354172378099538737850219826145102366279591379660471881259976727356521602405329789906247763521525981391443887618575275588619928089116905061711975308467857756405810961619074331866883961080943542712559830853980002984826569445431232452392578125, "col12": "2020-11-25", "col13": "00:01:02", "col14": "2020-11-25 00:01:02", "col15": "1606233662.012345", "col16": "2020-11-25 00:01:02.012345 Asia/Shanghai", "col17": "INTERVAL '3' DAY", "col18": "INTERVAL '4' YEAR" } }, "after": null, "op": "DELETE", "timestamp": { "eventTime": 1647581072000, "systemTime": 1647581072976, "checkpointTime": 1647581072 }, "ddl": null, "scn": "null" }, "extend": { "load_fm": "test" } }
DDL 樣本
alter table connector_test.all_mysql_type_test add column c90 varchar(30) default "test" comment 'test';
{ "version": "2.0", "schema": { "source": { "dbType": "ob_mysql", "dbVersion": null, "dbName": "connector_test", "schema": null, "table": "all_mysql_type_test" }, "column": null, "pk": null }, "payload": { "before": null, "after": null, "op": "ALTER", "timestamp": { "eventTime": 1671177209000, "systemTime": 1671177291485, "checkpointTime": 1671177200 }, "ddl": { "text": "alter table connector_test.all_mysql_type_test add column c90 varchar(30) default \"test\" comment 'test'" }, "scn": "null" }, "extend": {} }
SharePlex JSON 訊息格式
資料同步至 Kafka、DataHub(BLOB 類型)和 RocketMQ 時,序列化方式 SharePlex 使用如下 JSON 訊息格式。
{
"data": { // 變更資料索引值對,如果是 INSERT / DELETE 是全量值,如果是 UPDETE 只有變更值
"col1": "val1"
},
"meta": {
"time": "YYYY-MM-DDTHH:mm:ss", // 變更時間
"op": "", // 變更類型,包括 ins/upd/del/ddl
"posttime": "YYYY-MM-DDTHH:mm:ss", // 寫入目標端的時間
"idx": "STRING", //訊息在事務中的索引/索引的訊息數量。該參數已廢棄。
"size": NUMBER, //事務內訊息數量。該參數已廢棄。
"seq": "STRING", // 排序序號,需要配合源端開啟 transactionEnabled 才能存在
"table": "STRING", // SQL 變更庫表名 {database}.{table}
"rowid": "STRING", // {變更庫表名}-{主索引值使用\u0001} 分割
"trans": "STRING", // 事務 ID
"scn": "STRING" // 該欄位只有在增量情境下 source.json 配置中包含 sequenceEnabled=true 才存在,預設是 true。用於排序,建置規則是一個同步進程中,時間戳記 + 不超過五位序號遞增。
},
"key": { // 僅 UPDATE 存在,表示變更前的值
}
}
資料樣本如下:
INSERT
(插入)資料的樣本{ "data": { "col1": "2020-11-25 00:01:02", "col2": "hello world", "col3": "INTERVAL '3' DAY", "col4": 1.2222, "col5": 9.99999999999999909326625337248461995470488734032045693707225049331647881341002217023668530611028595157578301758491822824378438792553200763769833775473829862512856683413461939989729065436937279228852476622948659167943435544622149348072943613294167216662821737555414480159115639791276054897201420389770580351533960771506199055664889770260291710977826725024401716523031627390652604144008597950935492433262042405635563993262949691698930975461134804791235994697938405200089317860731205010159117711704697471514344499487123311264707354172378099538737850219826145102366279591379660471881259976727356521602405329789906247763521525981391443887618575275588619928089116905061711975308467857756405810961619074331866883961080943542712559830853980002984826569445431232452392578125E-308, "col6": 129, "col7": "00:01:02", "col8": 1, "col9": "2020-11-25 00:01:02.012345 Asia/Shanghai", "col10": 2147483646, "col11": 9223372036854775806, "col12": "aGVsbG8gd29ybGQ=", "col13": "INTERVAL '4' YEAR", "col14": 3, "col15": "2020-11-25", "col16": 9.99999999999999909326625337248461995470488734032045693707225049331647881341002217023668530611028595157578301758491822824378438792553200763769833775473829862512856683413461939989729065436937279228852476622948659167943435544622149348072943613294167216662821737555414480159115639791276054897201420389770580351533960771506199055664889770260291710977826725024401716523031627390652604144008597950935492433262042405635563993262949691698930975461134804791235994697938405200089317860731205010159117711704697471514344499487123311264707354172378099538737850219826145102366279591379660471881259976727356521602405329789906247763521525981391443887618575275588619928089116905061711975308467857756405810961619074331866883961080943542712559830853980002984826569445431232452392578125E-308, "col17": 10223372036854775806, "col18": "1606233662.012345" }, "meta": { "posttime": "2020-12-07T13:22:00", "op": "ins", "size": 10, "time": "2020-11-25T00:01:02", "idx": "1/10", "seq": 1, "table": "mock_database.mock_table", "rowid":"mock_database.mock_table-3129", "trans": "shareplex_transaction_id", "scn": "123456789" } }
UPDATE
(更新)資料的樣本{ "data": { "string": "hello world 2020" }, "meta": { "posttime": "2020-12-07T13:59:09", "op": "upd", "size": 10, "time": "2020-11-25T00:01:02", "idx": "1/10", "seq": 1, "table": "mock_database.mock_table", "rowid": "mock_database.mock_table-3\u0001129", "trans": "shareplex_transaction_id", "scn": "123456789" }, "key": { "col1": "2020-11-25 00:01:02", "col2": "hello world", "col3": "INTERVAL '3' DAY", "col4": 1.2222, "col5": 9.99999999999999909326625337248461995470488734032045693707225049331647881341002217023668530611028595157578301758491822824378438792553200763769833775473829862512856683413461939989729065436937279228852476622948659167943435544622149348072943613294167216662821737555414480159115639791276054897201420389770580351533960771506199055664889770260291710977826725024401716523031627390652604144008597950935492433262042405635563993262949691698930975461134804791235994697938405200089317860731205010159117711704697471514344499487123311264707354172378099538737850219826145102366279591379660471881259976727356521602405329789906247763521525981391443887618575275588619928089116905061711975308467857756405810961619074331866883961080943542712559830853980002984826569445431232452392578125E-308, "col6": 129, "col7": "00:01:02", "col8": 1, "col9": "2020-11-25 00:01:02.012345 Asia/Shanghai", "col10": 2147483646, "col11": 9223372036854775806, "col12": "aGVsbG8gd29ybGQ=", "col13": "INTERVAL '4' YEAR", "col14": 3, "col15": "2020-11-25", "col16": 9.99999999999999909326625337248461995470488734032045693707225049331647881341002217023668530611028595157578301758491822824378438792553200763769833775473829862512856683413461939989729065436937279228852476622948659167943435544622149348072943613294167216662821737555414480159115639791276054897201420389770580351533960771506199055664889770260291710977826725024401716523031627390652604144008597950935492433262042405635563993262949691698930975461134804791235994697938405200089317860731205010159117711704697471514344499487123311264707354172378099538737850219826145102366279591379660471881259976727356521602405329789906247763521525981391443887618575275588619928089116905061711975308467857756405810961619074331866883961080943542712559830853980002984826569445431232452392578125E-308, "col17": 10223372036854775806, "col18": "1606233662.012345" } }
DELETE
(刪除)資料的樣本{ "data": { "col1": "2020-11-25 00:01:02", "col2": "hello world", "col3": "INTERVAL '3' DAY", "col4": 1.2222, "col5": 9.99999999999999909326625337248461995470488734032045693707225049331647881341002217023668530611028595157578301758491822824378438792553200763769833775473829862512856683413461939989729065436937279228852476622948659167943435544622149348072943613294167216662821737555414480159115639791276054897201420389770580351533960771506199055664889770260291710977826725024401716523031627390652604144008597950935492433262042405635563993262949691698930975461134804791235994697938405200089317860731205010159117711704697471514344499487123311264707354172378099538737850219826145102366279591379660471881259976727356521602405329789906247763521525981391443887618575275588619928089116905061711975308467857756405810961619074331866883961080943542712559830853980002984826569445431232452392578125E-308, "col6": 129, "col7": "00:01:02", "col8": 1, "col9": "2020-11-25 00:01:02.012345 Asia/Shanghai", "col10": 2147483646, "col11": 9223372036854775806, "col12": "aGVsbG8gd29ybGQ=", "col13": "INTERVAL '4' YEAR", "col14": 3, "col15": "2020-11-25", "col16": 9.99999999999999909326625337248461995470488734032045693707225049331647881341002217023668530611028595157578301758491822824378438792553200763769833775473829862512856683413461939989729065436937279228852476622948659167943435544622149348072943613294167216662821737555414480159115639791276054897201420389770580351533960771506199055664889770260291710977826725024401716523031627390652604144008597950935492433262042405635563993262949691698930975461134804791235994697938405200089317860731205010159117711704697471514344499487123311264707354172378099538737850219826145102366279591379660471881259976727356521602405329789906247763521525981391443887618575275588619928089116905061711975308467857756405810961619074331866883961080943542712559830853980002984826569445431232452392578125E-308, "col17": 10223372036854775806, "col18": "1606233662.012345" }, "meta": { "posttime": "2020-12-07T13:34:10", "op": "del", "size": 10, "time": "2020-11-25T00:01:02", "idx": "1/10", "seq": 1, "table": "mock_database.mock_table", "rowid": "mock_database.mock_table-3\u0001129", "trans": "shareplex_transaction_id", "scn": "123456789" } }
DDL 樣本
alter table connector_test.all_mysql_type_test add column c90 varchar(30) default "test" comment 'test';
{ "data": {}, "meta": { "posttime": "2022-12-16T15:54:51", "op": "ddl", "size": 0, "time": "2022-12-16T15:53:29", "idx": "0/0", "seq": 0, "table": "connector_test.all_mysql_type_test", "rowid": "connector_test.all_mysql_type_test-", "trans": null, "scn": "null" }, "sql": { "ddl": "alter table connector_test.all_mysql_type_test add column c90 varchar(30) default \"test\" comment 'test'" } }
DefaultExtendColumnType JSON 訊息格式
資料同步至 Kafka、DataHub(BLOB 類型)和 RocketMQ 時,序列化方式 DefaultExtendColumnType 使用如下 JSON 訊息格式。
DefaultExtendColumnType JSON 訊息格式會在 DEFAULT
的基礎上,在鏡像內增加一個欄位 __light_type
,用於表示欄位的資料類型。
{
"prevStruct": { // 變更前鏡像
},
"postStruct": { // 變更後鏡像
"__light_type": {
"col": { // 欄位的名稱
"schemaType": "type" // 值的類型
}
}
},
"allMetaData": {
}
}
資料樣本如下:
INSERT
(插入)資料的樣本{ "allMetaData": { "checkpoint": null, "record_primary_key": "id1\u0001id2", "source_identity": null, "record_primary_value": "3\u0001129", "dbType": "OB_MYSQL", "table_name": "table", "db": "database", "timestamp": "1609344671" }, "prevStruct": null, "recordType": "INSERT", "postStruct": { "col1": 3, "col2": 129, "col3": 2147483646, "col4": 9223372036854775806, "col5": 10223372036854775806, "col6": 1.2222, "col7": 9.99999999999999909326625337248461995470488734032045693707225049331647881341002217023668530611028595157578301758491822824378438792553200763769833775473829862512856683413461939989729065436937279228852476622948659167943435544622149348072943613294167216662821737555414480159115639791276054897201420389770580351533960771506199055664889770260291710977826725024401716523031627390652604144008597950935492433262042405635563993262949691698930975461134804791235994697938405200089317860731205010159117711704697471514344499487123311264707354172378099538737850219826145102366279591379660471881259976727356521602405329789906247763521525981391443887618575275588619928089116905061711975308467857756405810961619074331866883961080943542712559830853980002984826569445431232452392578125E-308, "col8": "hello world", "col9": "aGVsbG8gd29ybGQ=", "col10": 9.99999999999999909326625337248461995470488734032045693707225049331647881341002217023668530611028595157578301758491822824378438792553200763769833775473829862512856683413461939989729065436937279228852476622948659167943435544622149348072943613294167216662821737555414480159115639791276054897201420389770580351533960771506199055664889770260291710977826725024401716523031627390652604144008597950935492433262042405635563993262949691698930975461134804791235994697938405200089317860731205010159117711704697471514344499487123311264707354172378099538737850219826145102366279591379660471881259976727356521602405329789906247763521525981391443887618575275588619928089116905061711975308467857756405810961619074331866883961080943542712559830853980002984826569445431232452392578125E-308, "col11": "2020-11-25", "col12": "00:01:02", "col13": "2020-11-25 00:01:02", "col14": "1606233662.012345", "__light_type": { "int8": { "schemaType": "TINYINT" }, "int16": { "schemaType": "SMALLINT" }, "int32": { "schemaType": "INT" }, "int64": { "schemaType": "INT64" }, "bigInt": { "schemaType": "BIGINT" }, "float32": { "schemaType": "FLOAT" }, "float64": { "schemaType": "DOUBLE" }, "string": { "schemaType": "VARCHAR" }, "bytes": { "schemaType": "BLOB" }, "decimal": { "schemaType": "DECIMAL" }, "localDate": { "schemaType": "DATE" }, "localTime": { "schemaType": "TIME" }, "localDateTime": { "schemaType": "DATETIME" }, "timestamp_in_long": { "schemaType": "TIMESTAMP" } } } }
UPDATE
(更新)資料的樣本{ "allMetaData": { "checkpoint": null, "record_primary_key": "id1\u0001id2", "source_identity": null, "record_primary_value": "3\u0001129", "dbType": "OB_MYSQL", "table_name": "table", "db": "database", "timestamp": "1609344671" }, "prevStruct": { "col1": 3, "col2": 129, "col3": 2147483646, "col4": 9223372036854775806, "col5": 10223372036854775806, "col6": 1.2222, "col7": 9.99999999999999909326625337248461995470488734032045693707225049331647881341002217023668530611028595157578301758491822824378438792553200763769833775473829862512856683413461939989729065436937279228852476622948659167943435544622149348072943613294167216662821737555414480159115639791276054897201420389770580351533960771506199055664889770260291710977826725024401716523031627390652604144008597950935492433262042405635563993262949691698930975461134804791235994697938405200089317860731205010159117711704697471514344499487123311264707354172378099538737850219826145102366279591379660471881259976727356521602405329789906247763521525981391443887618575275588619928089116905061711975308467857756405810961619074331866883961080943542712559830853980002984826569445431232452392578125E-308, "col8": "hello world", "col9": "aGVsbG8gd29ybGQ=", "col10": 9.99999999999999909326625337248461995470488734032045693707225049331647881341002217023668530611028595157578301758491822824378438792553200763769833775473829862512856683413461939989729065436937279228852476622948659167943435544622149348072943613294167216662821737555414480159115639791276054897201420389770580351533960771506199055664889770260291710977826725024401716523031627390652604144008597950935492433262042405635563993262949691698930975461134804791235994697938405200089317860731205010159117711704697471514344499487123311264707354172378099538737850219826145102366279591379660471881259976727356521602405329789906247763521525981391443887618575275588619928089116905061711975308467857756405810961619074331866883961080943542712559830853980002984826569445431232452392578125E-308, "col11": "2020-11-25", "col12": "00:01:02", "col13": "2020-11-25 00:01:02", "col14": "1606233662.012345", "__light_type": { "int8": { "schemaType": "TINYINT" }, "int16": { "schemaType": "SMALLINT" }, "int32": { "schemaType": "INT" }, "int64": { "schemaType": "INT64" }, "bigInt": { "schemaType": "BIGINT" }, "float32": { "schemaType": "FLOAT" }, "float64": { "schemaType": "DOUBLE" }, "string": { "schemaType": "VARCHAR" }, "bytes": { "schemaType": "BLOB" }, "decimal": { "schemaType": "DECIMAL" }, "localDate": { "schemaType": "DATE" }, "localTime": { "schemaType": "TIME" }, "localDateTime": { "schemaType": "DATETIME" }, "timestamp_in_long": { "schemaType": "TIMESTAMP" } } }, "recordType": "UPDATE", "postStruct": { "col1": 3, "col2": 129, "col3": 2147483646, "col4": 9223372036854775806, "col5": 10223372036854775806, "col6": 1.2222, "col7": 9.99999999999999909326625337248461995470488734032045693707225049331647881341002217023668530611028595157578301758491822824378438792553200763769833775473829862512856683413461939989729065436937279228852476622948659167943435544622149348072943613294167216662821737555414480159115639791276054897201420389770580351533960771506199055664889770260291710977826725024401716523031627390652604144008597950935492433262042405635563993262949691698930975461134804791235994697938405200089317860731205010159117711704697471514344499487123311264707354172378099538737850219826145102366279591379660471881259976727356521602405329789906247763521525981391443887618575275588619928089116905061711975308467857756405810961619074331866883961080943542712559830853980002984826569445431232452392578125E-308, "col8": "hello world 2020", "col9": "aGVsbG8gd29ybGQ=", "col10": 9.99999999999999909326625337248461995470488734032045693707225049331647881341002217023668530611028595157578301758491822824378438792553200763769833775473829862512856683413461939989729065436937279228852476622948659167943435544622149348072943613294167216662821737555414480159115639791276054897201420389770580351533960771506199055664889770260291710977826725024401716523031627390652604144008597950935492433262042405635563993262949691698930975461134804791235994697938405200089317860731205010159117711704697471514344499487123311264707354172378099538737850219826145102366279591379660471881259976727356521602405329789906247763521525981391443887618575275588619928089116905061711975308467857756405810961619074331866883961080943542712559830853980002984826569445431232452392578125E-308, "col11": "2020-11-25", "col12": "00:01:02", "col13": "2020-11-25 00:01:02", "col14": "1606233662.012345", "__light_type": { "int8": { "schemaType": "TINYINT" }, "int16": { "schemaType": "SMALLINT" }, "int32": { "schemaType": "INT" }, "int64": { "schemaType": "INT64" }, "bigInt": { "schemaType": "BIGINT" }, "float32": { "schemaType": "FLOAT" }, "float64": { "schemaType": "DOUBLE" }, "string": { "schemaType": "VARCHAR" }, "bytes": { "schemaType": "BLOB" }, "decimal": { "schemaType": "DECIMAL" }, "localDate": { "schemaType": "DATE" }, "localTime": { "schemaType": "TIME" }, "localDateTime": { "schemaType": "DATETIME" }, "timestamp_in_long": { "schemaType": "TIMESTAMP" } } } }
DELETE
(刪除)資料的樣本{ "allMetaData": { "checkpoint": null, "record_primary_key": "id1\u0001id2", "source_identity": null, "record_primary_value": "3\u0001129", "dbType": "OB_MYSQL", "table_name": "table", "db": "database", "timestamp": "1609344671" }, "prevStruct": { "col1": 3, "col2": 129, "col3": 2147483646, "col4": 9223372036854775806, "col5": 10223372036854775806, "col6": 1.2222, "col7": 9.99999999999999909326625337248461995470488734032045693707225049331647881341002217023668530611028595157578301758491822824378438792553200763769833775473829862512856683413461939989729065436937279228852476622948659167943435544622149348072943613294167216662821737555414480159115639791276054897201420389770580351533960771506199055664889770260291710977826725024401716523031627390652604144008597950935492433262042405635563993262949691698930975461134804791235994697938405200089317860731205010159117711704697471514344499487123311264707354172378099538737850219826145102366279591379660471881259976727356521602405329789906247763521525981391443887618575275588619928089116905061711975308467857756405810961619074331866883961080943542712559830853980002984826569445431232452392578125E-308, "col8": "hello world", "col9": "aGVsbG8gd29ybGQ=", "col10": 9.99999999999999909326625337248461995470488734032045693707225049331647881341002217023668530611028595157578301758491822824378438792553200763769833775473829862512856683413461939989729065436937279228852476622948659167943435544622149348072943613294167216662821737555414480159115639791276054897201420389770580351533960771506199055664889770260291710977826725024401716523031627390652604144008597950935492433262042405635563993262949691698930975461134804791235994697938405200089317860731205010159117711704697471514344499487123311264707354172378099538737850219826145102366279591379660471881259976727356521602405329789906247763521525981391443887618575275588619928089116905061711975308467857756405810961619074331866883961080943542712559830853980002984826569445431232452392578125E-308, "col11": "2020-11-25", "col12": "00:01:02", "col13": "2020-11-25 00:01:02", "col14": "1606233662.012345", "__light_type": { "int8": { "schemaType": "TINYINT" }, "int16": { "schemaType": "SMALLINT" }, "int32": { "schemaType": "INT" }, "int64": { "schemaType": "INT64" }, "bigInt": { "schemaType": "BIGINT" }, "float32": { "schemaType": "FLOAT" }, "float64": { "schemaType": "DOUBLE" }, "string": { "schemaType": "VARCHAR" }, "bytes": { "schemaType": "BLOB" }, "decimal": { "schemaType": "DECIMAL" }, "localDate": { "schemaType": "DATE" }, "localTime": { "schemaType": "TIME" }, "localDateTime": { "schemaType": "DATETIME" }, "timestamp_in_long": { "schemaType": "TIMESTAMP" } } }, "recordType": "DELETE", "postStruct": null }
DDL 樣本
alter table connector_test.all_mysql_type_test add column c90 varchar(30) default "test" comment 'test';
{ "prevStruct": null, "postStruct": { "ddl": "alter table connector_test.all_mysql_type_test add column c90 varchar(30) default \"test\" comment 'test'", "__light_type": { "ddl": { "schemaType": "VAR_STRING" } } }, "allMetaData": { "checkpoint": "1671177200", "dbType": "OB_MYSQL", "storeDataSequence": null, "db": "connector_test", "timestamp": "1671177209", "uniqueId": null, "ddlType": "ALTER_TABLE", "record_primary_key": null, "source_identity": null, "record_primary_value": null, "table_name": "all_mysql_type_test" }, "recordType": "DDL" }
Debezium JSON 訊息格式
同步 OceanBase 資料庫 MySQL 租戶的資料至 Kafka、DataHub(BLOB 類型)和 RocketMQ 時,序列化方式 Debezium 使用如下 JSON 訊息格式,共包含兩種,通常預設僅顯示 payload
中的結構。
存在
schema
和payload
{ "schema": { //描述 payload 欄位資訊的結構體,預設沒有該結構體 "type": "struct", //struct 表示該欄位內部還有結構 "optional": false, //是否必須包含該欄位 "fields": [ { "type": "int64", //欄位的類型 "optional": false, //是否必須包含該欄位 "field": "ts_ms" //欄位的名稱 } ] }, "payload": { "op": "c", //資料修改類型,包括 c(全量、插入)、u(更新)、d(刪除)和 HEARTBEAT(心跳訊息) "source": { "version": "", //OMS 的版本 "connector": "OB_MYSQL", //資料來源的類型 "name": "OMS", //固定值 OMS "ts_ms": 0, //資料變更秒級時間戳記,僅增量存在 "db": "test", //使用 SQL 陳述式進行變更的資料庫的名稱。如果是 OceanBase 資料庫,僅存在資料庫名稱,無需租戶名稱 "table": "testTab", //使用 SQL 陳述式進行變更的表的名稱 "pos": "553132@1668496109" //在 binlog 檔案中的位置 [binlog 檔案名稱]@[binlog 檔案名稱 offset] }, "before": { //變更前鏡像 "column": "value" //索引值對,包含全量索引值 }, "after": { //變更後鏡像 "column": "value" // 索引值對,包含全量索引值 }, "ts_ms": 1668497367188 //資料處理時間戳記 } }
僅存在
payload
{ "payload": { "op": "c", //資料修改類型,包括 c(全量、插入)、u(更新)、d(刪除)和 HEARTBEAT(心跳訊息) "source": { "version": "", //OMS 的版本 "connector": "OB_MYSQL", //資料來源的類型 "name": "OMS", //固定值 OMS "ts_ms": 0, //資料變更秒級時間戳記,僅增量存在 "db": "test", //使用 SQL 陳述式進行變更的資料庫的名稱。如果是 OceanBase 資料庫,僅存在資料庫名稱,無需租戶名稱 "table": "testTab", //使用 SQL 陳述式進行變更的表的名稱 "pos": "553132@16684****" //在 binlog 檔案中的位置 [binlog 檔案名稱]@[binlog 檔案名稱 offset] }, "before": { //變更前鏡像 "column": "value" //索引值對,包含全量索引值 }, "after": { //變更後鏡像 "column": "value" // 索引值對,包含全量索引值 }, "ts_ms": 1668497367188 //資料處理時間戳記 } }
資料樣本如下:
INSERT
(插入)資料的樣本{ "schema": { "optional": false, "type": "STRUCT", "fields": [ { "field": "before", "optional": false, "type": "struct", "fields": [ { "field": "c01", "optional": false, "type": "int32" }, { "field": "c02", "optional": false, "type": "string" }, { "field": "c03", "optional": false, "type": "string" }, { "field": "c04", "optional": false, "type": "bytes" }, { "field": "c05", "optional": false, "type": "int16" }, { "field": "c06", "optional": false, "type": "int16" }, { "field": "c07", "optional": false, "type": "int32" }, { "field": "c08", "optional": false, "type": "int64" }, { "field": "c09", "optional": false, "type": "float64" }, { "field": "c10", "optional": false, "type": "float64" }, { "field": "c11", "optional": false, "type": "string" }, { "field": "c12", "optional": false, "type": "string" }, { "field": "c13", "optional": false, "type": "string" }, { "field": "c14", "optional": false, "type": "string" }, { "field": "c15", "optional": false, "type": "bytes" }, { "field": "c16", "optional": false, "type": "string" }, { "field": "c17", "optional": false, "type": "bytes" }, { "field": "c18", "optional": false, "type": "bytes" }, { "field": "c19", "optional": false, "type": "bytes" }, { "field": "c20", "optional": false, "type": "bytes" }, { "field": "c21", "optional": false, "type": "string" }, { "field": "c22", "optional": false, "type": "int32" }, { "field": "c23", "optional": false, "type": "int64" }, { "field": "c24", "optional": false, "type": "string" }, { "field": "c25", "optional": false, "type": "int32" }, { "field": "c26", "optional": false, "type": "bytes" } ] }, { "field": "after", "optional": false, "type": "struct", "fields": [ { "field": "c01", "optional": false, "type": "int32" }, { "field": "c02", "optional": false, "type": "string" }, { "field": "c03", "optional": false, "type": "string" }, { "field": "c04", "optional": false, "type": "bytes" }, { "field": "c05", "optional": false, "type": "int16" }, { "field": "c06", "optional": false, "type": "int16" }, { "field": "c07", "optional": false, "type": "int32" }, { "field": "c08", "optional": false, "type": "int64" }, { "field": "c09", "optional": false, "type": "float64" }, { "field": "c10", "optional": false, "type": "float64" }, { "field": "c11", "optional": false, "type": "string" }, { "field": "c12", "optional": false, "type": "string" }, { "field": "c13", "optional": false, "type": "string" }, { "field": "c14", "optional": false, "type": "string" }, { "field": "c15", "optional": false, "type": "bytes" }, { "field": "c16", "optional": false, "type": "string" }, { "field": "c17", "optional": false, "type": "bytes" }, { "field": "c18", "optional": false, "type": "bytes" }, { "field": "c19", "optional": false, "type": "bytes" }, { "field": "c20", "optional": false, "type": "bytes" }, { "field": "c21", "optional": false, "type": "string" }, { "field": "c22", "optional": false, "type": "int32" }, { "field": "c23", "optional": false, "type": "int64" }, { "field": "c24", "optional": false, "type": "string" }, { "field": "c25", "optional": false, "type": "int32" }, { "field": "c26", "optional": false, "type": "bytes" } ] }, { "field": "source", "optional": false, "type": "struct", "fields": [ { "field": "version", "optional": false, "type": "string" }, { "field": "connector", "optional": false, "type": "string" }, { "field": "name", "optional": false, "type": "string" }, { "field": "ts_ms", "optional": false, "type": "int64" }, { "field": "db", "optional": false, "type": "string" }, { "field": "table", "optional": false, "type": "string" }, { "field": "server_id", "optional": false, "type": "int64" }, { "field": "pos", "optional": false, "type": "string" } ] }, { "field": "op", "optional": false, "type": "string" }, { "field": "ts_ms", "optional": false, "type": "int64" } ] }, "payload": { "op": "c", "source": { "connector": "OB_MYSQL", "pos": "703223@166849****", "name": "OMS", "version": "", "ts_ms": 1668491621000, "db": "test", "table": "table_name" }, "after": { "c11": "a", "c10": 2.4212412, "c13": "c", "c12": "b", "c15": "65", "c14": "d", "c17": "67", "c16": "f", "c19": "690000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000", "c18": "68", "c20": "6A", "c22": 19311, "c21": "2022-11-15T05:12:11Z", "c02": "12312", "c24": 1668489131000, "c01": 2, "c23": 36060000000, "c04": "61", "c26": "6B", "c03": "1241.41000", "c25": 2022, "c06": 141, "c05": 11, "c08": 412124124, "c07": 4241, "c09": 2.11111 }, "ts_ms": 1668495423594 } }
UPDATE
(更新)資料的樣本{ "schema":{ "optional":false, "type":"STRUCT", "fields":[ { "field":"before", "optional":false, "type":"struct", "fields":[ { "field":"c01", "optional":false, "type":"int32" }, { "field":"c02", "optional":false, "type":"string" }, { "field":"c03", "optional":false, "type":"string" }, { "field":"c04", "optional":false, "type":"bytes" }, { "field":"c05", "optional":false, "type":"int16" }, { "field":"c06", "optional":false, "type":"int16" }, { "field":"c07", "optional":false, "type":"int32" }, { "field":"c08", "optional":false, "type":"int64" }, { "field":"c09", "optional":false, "type":"float64" }, { "field":"c10", "optional":false, "type":"float64" }, { "field":"c11", "optional":false, "type":"string" }, { "field":"c12", "optional":false, "type":"string" }, { "field":"c13", "optional":false, "type":"string" }, { "field":"c14", "optional":false, "type":"string" }, { "field":"c15", "optional":false, "type":"bytes" }, { "field":"c16", "optional":false, "type":"string" }, { "field":"c17", "optional":false, "type":"bytes" }, { "field":"c18", "optional":false, "type":"bytes" }, { "field":"c19", "optional":false, "type":"bytes" }, { "field":"c20", "optional":false, "type":"bytes" }, { "field":"c21", "optional":false, "type":"string" }, { "field":"c22", "optional":false, "type":"int32" }, { "field":"c23", "optional":false, "type":"int64" }, { "field":"c24", "optional":false, "type":"string" }, { "field":"c25", "optional":false, "type":"int32" }, { "field":"c26", "optional":false, "type":"bytes" } ] }, { "field":"after", "optional":false, "type":"struct", "fields":[ { "field":"c01", "optional":false, "type":"int32" }, { "field":"c02", "optional":false, "type":"string" }, { "field":"c03", "optional":false, "type":"string" }, { "field":"c04", "optional":false, "type":"bytes" }, { "field":"c05", "optional":false, "type":"int16" }, { "field":"c06", "optional":false, "type":"int16" }, { "field":"c07", "optional":false, "type":"int32" }, { "field":"c08", "optional":false, "type":"int64" }, { "field":"c09", "optional":false, "type":"float64" }, { "field":"c10", "optional":false, "type":"float64" }, { "field":"c11", "optional":false, "type":"string" }, { "field":"c12", "optional":false, "type":"string" }, { "field":"c13", "optional":false, "type":"string" }, { "field":"c14", "optional":false, "type":"string" }, { "field":"c15", "optional":false, "type":"bytes" }, { "field":"c16", "optional":false, "type":"string" }, { "field":"c17", "optional":false, "type":"bytes" }, { "field":"c18", "optional":false, "type":"bytes" }, { "field":"c19", "optional":false, "type":"bytes" }, { "field":"c20", "optional":false, "type":"bytes" }, { "field":"c21", "optional":false, "type":"string" }, { "field":"c22", "optional":false, "type":"int32" }, { "field":"c23", "optional":false, "type":"int64" }, { "field":"c24", "optional":false, "type":"string" }, { "field":"c25", "optional":false, "type":"int32" }, { "field":"c26", "optional":false, "type":"bytes" } ] }, { "field":"source", "optional":false, "type":"struct", "fields":[ { "field":"version", "optional":false, "type":"string" }, { "field":"connector", "optional":false, "type":"string" }, { "field":"name", "optional":false, "type":"string" }, { "field":"ts_ms", "optional":false, "type":"int64" }, { "field":"db", "optional":false, "type":"string" }, { "field":"table", "optional":false, "type":"string" }, { "field":"server_id", "optional":false, "type":"int64" }, { "field":"pos", "optional":false, "type":"string" } ] }, { "field":"op", "optional":false, "type":"string" }, { "field":"ts_ms", "optional":false, "type":"int64" } ] }, "payload":{ "op":"u", "before":{ "c11":"a", "c10":2.4212412, "c13":"c", "c12":"b", "c15":"65", "c14":"d", "c17":"67", "c16":"f", "c19":"690000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000", "c18":"68", "c20":"6A", "c22":19311, "c21":"2022-11-15T05:12:11Z", "c02":"12312", "c24":1668489131000, "c01":1, "c23":36060000000, "c04":"61", "c26":"6B", "c03":"1241.41000", "c25":2022, "c06":141, "c05":11, "c08":412124124, "c07":4241, "c09":2.11111 }, "source":{ "connector":"OB_MYSQL", "pos":"436999@166849****", "name":"OMS", "version":"", "ts_ms":1668495861000, "db":"test", "table":"table_name" }, "after":{ "c11":"aa", "c10":2.4212412, "c13":"c", "c12":"b", "c15":"65", "c14":"d", "c17":"67", "c16":"f", "c19":"690000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000", "c18":"68", "c20":"6A", "c22":19311, "c21":"2022-11-15T05:12:11Z", "c02":"12312", "c24":1668489131000, "c01":1, "c23":36060000000, "c04":"61", "c26":"6B", "c03":"1241.41000", "c25":2022, "c06":141, "c05":11, "c08":412124124, "c07":4241, "c09":2.11111 }, "ts_ms":1668495906356 } }
DELETE
(刪除)資料的樣本{ "schema":{ "optional":false, "type":"STRUCT", "fields":[ { "field":"before", "optional":false, "type":"struct", "fields":[ { "field":"c01", "optional":false, "type":"int32" }, { "field":"c02", "optional":false, "type":"string" }, { "field":"c03", "optional":false, "type":"string" }, { "field":"c04", "optional":false, "type":"bytes" }, { "field":"c05", "optional":false, "type":"int16" }, { "field":"c06", "optional":false, "type":"int16" }, { "field":"c07", "optional":false, "type":"int32" }, { "field":"c08", "optional":false, "type":"int64" }, { "field":"c09", "optional":false, "type":"float64" }, { "field":"c10", "optional":false, "type":"float64" }, { "field":"c11", "optional":false, "type":"string" }, { "field":"c12", "optional":false, "type":"string" }, { "field":"c13", "optional":false, "type":"string" }, { "field":"c14", "optional":false, "type":"string" }, { "field":"c15", "optional":false, "type":"bytes" }, { "field":"c16", "optional":false, "type":"string" }, { "field":"c17", "optional":false, "type":"bytes" }, { "field":"c18", "optional":false, "type":"bytes" }, { "field":"c19", "optional":false, "type":"bytes" }, { "field":"c20", "optional":false, "type":"bytes" }, { "field":"c21", "optional":false, "type":"string" }, { "field":"c22", "optional":false, "type":"int32" }, { "field":"c23", "optional":false, "type":"int64" }, { "field":"c24", "optional":false, "type":"string" }, { "field":"c25", "optional":false, "type":"int32" }, { "field":"c26", "optional":false, "type":"bytes" } ] }, { "field":"after", "optional":false, "type":"struct", "fields":[ { "field":"c01", "optional":false, "type":"int32" }, { "field":"c02", "optional":false, "type":"string" }, { "field":"c03", "optional":false, "type":"string" }, { "field":"c04", "optional":false, "type":"bytes" }, { "field":"c05", "optional":false, "type":"int16" }, { "field":"c06", "optional":false, "type":"int16" }, { "field":"c07", "optional":false, "type":"int32" }, { "field":"c08", "optional":false, "type":"int64" }, { "field":"c09", "optional":false, "type":"float64" }, { "field":"c10", "optional":false, "type":"float64" }, { "field":"c11", "optional":false, "type":"string" }, { "field":"c12", "optional":false, "type":"string" }, { "field":"c13", "optional":false, "type":"string" }, { "field":"c14", "optional":false, "type":"string" }, { "field":"c15", "optional":false, "type":"bytes" }, { "field":"c16", "optional":false, "type":"string" }, { "field":"c17", "optional":false, "type":"bytes" }, { "field":"c18", "optional":false, "type":"bytes" }, { "field":"c19", "optional":false, "type":"bytes" }, { "field":"c20", "optional":false, "type":"bytes" }, { "field":"c21", "optional":false, "type":"string" }, { "field":"c22", "optional":false, "type":"int32" }, { "field":"c23", "optional":false, "type":"int64" }, { "field":"c24", "optional":false, "type":"string" }, { "field":"c25", "optional":false, "type":"int32" }, { "field":"c26", "optional":false, "type":"bytes" } ] }, { "field":"source", "optional":false, "type":"struct", "fields":[ { "field":"version", "optional":false, "type":"string" }, { "field":"connector", "optional":false, "type":"string" }, { "field":"name", "optional":false, "type":"string" }, { "field":"ts_ms", "optional":false, "type":"int64" }, { "field":"db", "optional":false, "type":"string" }, { "field":"table", "optional":false, "type":"string" }, { "field":"server_id", "optional":false, "type":"int64" }, { "field":"pos", "optional":false, "type":"string" } ] }, { "field":"op", "optional":false, "type":"string" }, { "field":"ts_ms", "optional":false, "type":"int64" } ] }, "payload":{ "op":"d", "before":{ "c11":"aa", "c10":2.4212412, "c13":"c", "c12":"b", "c15":"65", "c14":"d", "c17":"67", "c16":"f", "c19":"690000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000", "c18":"68", "c20":"6A", "c22":19311, "c21":"2022-11-15T05:12:11Z", "c02":"12312", "c24":1668489131000, "c01":1, "c23":36060000000, "c04":"61", "c26":"6B", "c03":"1241.41000", "c25":2022, "c06":141, "c05":11, "c08":412124124, "c07":4241, "c09":2.11111 }, "source":{ "connector":"OB_MYSQL", "pos":"553132@1668****", "name":"OMS", "version":"", "ts_ms":1668496109000, "db":"test", "table":"table_name" }, "ts_ms":1668496119717 } }
DebeziumFlatten JSON 訊息格式
同步 OceanBase 資料庫 MySQL 租戶的資料至 Kafka、DataHub(BLOB 類型)和 RocketMQ 時,序列化方式 DebeziumFlatten 的 JSON 訊息格式如下所示,和序列化方式 Debezium 相比,不再填充 schema
和 payload
。
{
"op": "c", //資料修改類型,包括 c(全量、插入)、u(更新)、d(刪除)和 HEARTBEAT(心跳訊息)
"source": {
"version": "", //OMS 的版本
"connector": "OB_MYSQL", //資料來源的類型
"name": "OMS", //固定值 OMS
"ts_ms": 0, //資料變更秒級時間戳記,僅增量存在
"db": "test", //使用 SQL 陳述式進行變更的資料庫的名稱。如果是 OceanBase 資料庫,僅存在資料庫名稱,無需租戶名稱
"table": "testTab", //使用 SQL 陳述式進行變更的表的名稱
"pos": "553132@16684****" //在 binlog 檔案中的位置 [binlog 檔案名稱]@[binlog 檔案名稱 offset]
},
"before": { //變更前鏡像
"column": "value" //索引值對,包含全量索引值
},
"after": { //變更後鏡像
"column": "value" // 索引值對,包含全量索引值
},
"ts_ms": 1668497367188 //資料處理時間戳記
}
資料樣本如下:
INSERT
(插入)資料的樣本{ "op":"c", "source":{ "connector":"OB_MYSQL", "pos":"703223@166849****", "name":"OMS", "version":"", "ts_ms":1668491621000, "db":"test", "table":"table_name" }, "after":{ "c11":"a", "c10":2.4212412, "c13":"c", "c12":"b", "c15":"65", "c14":"d", "c17":"67", "c16":"f", "c19":"690000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000", "c18":"68", "c20":"6A", "c22":19311, "c21":"2022-11-15T05:12:11Z", "c02":"12312", "c24":1668489131000, "c01":2, "c23":36060000000, "c04":"61", "c26":"6B", "c03":"1241.41000", "c25":2022, "c06":141, "c05":11, "c08":412124124, "c07":4241, "c09":2.11111 }, "ts_ms":1668495423594 }
UPDATE
(更新)資料的樣本{ "op":"u", "before":{ "c11":"a", "c10":2.4212412, "c13":"c", "c12":"b", "c15":"65", "c14":"d", "c17":"67", "c16":"f", "c19":"690000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000", "c18":"68", "c20":"6A", "c22":19311, "c21":"2022-11-15T05:12:11Z", "c02":"12312", "c24":1668489131000, "c01":1, "c23":36060000000, "c04":"61", "c26":"6B", "c03":"1241.41000", "c25":2022, "c06":141, "c05":11, "c08":412124124, "c07":4241, "c09":2.11111 }, "source":{ "connector":"OB_MYSQL", "pos":"436999@166849****", "name":"OMS", "version":"", "ts_ms":1668495861000, "db":"test", "table":"table_name" }, "after":{ "c11":"aa", "c10":2.4212412, "c13":"c", "c12":"b", "c15":"65", "c14":"d", "c17":"67", "c16":"f", "c19":"690000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000", "c18":"68", "c20":"6A", "c22":19311, "c21":"2022-11-15T05:12:11Z", "c02":"12312", "c24":1668489131000, "c01":1, "c23":36060000000, "c04":"61", "c26":"6B", "c03":"1241.41000", "c25":2022, "c06":141, "c05":11, "c08":412124124, "c07":4241, "c09":2.11111 }, "ts_ms":1668495906356 }
DELETE
(刪除)資料的樣本{ "op":"d", "before":{ "c11":"aa", "c10":2.4212412, "c13":"c", "c12":"b", "c15":"65", "c14":"d", "c17":"67", "c16":"f", "c19":"690000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000", "c18":"68", "c20":"6A", "c22":19311, "c21":"2022-11-15T05:12:11Z", "c02":"12312", "c24":1668489131000, "c01":1, "c23":36060000000, "c04":"61", "c26":"6B", "c03":"1241.41000", "c25":2022, "c06":141, "c05":11, "c08":412124124, "c07":4241, "c09":2.11111 }, "source":{ "connector":"OB_MYSQL", "pos":"553132@1668****", "name":"OMS", "version":"", "ts_ms":1668496109000, "db":"test", "table":"table_name" }, "ts_ms":1668496119717 }
DebeziumSmt JSON 訊息格式
DebeziumSmt 是 Debezium 提供的一種配置方式,使用事件扁平化單訊息轉換(Single Message Transform,SMT)對單條資訊進行轉換和處理。同步 OceanBase 資料庫 MySQL 租戶的資料至 Kafka、DataHub(BLOB 類型)和 RocketMQ 時,序列化方式 DebeziumSmt 的 JSON 訊息格式僅顯示 after
中的 key:value
。
例如,使用序列化方式 Debezium 更新資料:
{
"op": "u",
"source": {
"connector": "OB_MYSQL",
"name": "OMS"
},
"ts_ms": 1668496119717,
"before": {
"field1": "before_value1",
"field2": "before_value2"
},
"after": {
"field1": "after_value1",
"field2": "after_value2"
}
}
SMT 對上述樣本的訊息進行處理後,簡化了訊息格式。即使用序列化方式 DebeziumSmt,JSON 訊息格式如下所示。
{
"field1": "after_value1",
"field2": "after_value2"
}
資料樣本如下:
INSERT
(插入)資料的樣本{ "field1": "after_value1", "field2": "after_value2", "__deleted": "false" }
UPDATE
(更新)資料的樣本{ "field1": "after_value1", "field2": "after_value2", "__deleted": "false" }
DELETE
(刪除)資料的樣本{ "field1": "after_value1", "field2": "after_value2", "__deleted": "true" }
Avro JSON 訊息格式
同步 OceanBase 資料庫 MySQL 租戶的資料至 Kafka 時,序列化方式 Avro 使用如下 JSON 訊息格式。
全量遷移
{ "version": 1, "id": 0, "sourceTimestamp": 1702371565, // 時間戳記安全位元點。 "sourcePosition": "", // 全量遷移無 position 等資訊。 "safeSourcePosition": "", "sourceTxid": "", "source": { "sourceType": "MySQL", // 固定值 MySQL。 "version": "OBMySQL" // 固定值 OBMySQL。 }, "operation": "INIT", // 全量類型為 INIT。 "objectName": "test***", "processTimestamps": [ 1702371565238 ], // 只有投遞時間。 "tags": { "pk_uk_info": "{\"PRIMARY\":[\"id\"]}" // 只有主鍵類型。 }, "fields": [ { "name": "id", "dataTypeNumber": 246 }, // 每個列的類型。 { "name": "bid", "dataTypeNumber": 3 }, { "name": "name", "dataTypeNumber": 15 }, { "name": "address", "dataTypeNumber": 254 } ], "beforeImages": null, // 全量遷移前鏡像為空白。 "afterImages": [ // 後鏡像。INTEGER 類型的 precision 衡為 8,FLOAT 類型的 precision 衡為 8、scale 衡為 64。 { "value": "1", "precision": 1, "scale": 0 }, { "precision": 8, "value": "11" }, { "charset": "utf8mb4", "value": { "bytes": "yyy" } }, null ] }
增量同步處理 DML
INSERT
(插入)資料的樣本{ "version": 1, "id": 170236922143600000, "sourceTimestamp": 1702369092, "sourcePosition": "1702369080", // OceanBase 資料庫 MySQL 租戶的 checkpoint。 "safeSourcePosition": "1702369080", // OceanBase 資料庫 MySQL 租戶的 checkpoint。 "sourceTxid": "", "source": { "sourceType": "MySQL", "version": "OBMySQL" }, "operation": "INSERT", "objectName": "test***", "processTimestamps": [1702369221480], "tags": { "pk_uk_info": "{\"PRIMARY\":[\"id\"]}" }, "fields": [ {"name": "id", "dataTypeNumber": 8}, {"name": "bid", "dataTypeNumber": 3}, {"name": "name", "dataTypeNumber": 15} ], "beforeImages": null, // INSERT 前鏡像為空白。 "afterImages": [ {"precision": 8, "value": "2"}, {"precision": 8, "value": "12"}, {"charset": "utf8mb4", "value": {"bytes": "xxx"} } ] }
UPDATE
(更新)資料的樣本{ "version": 1, "id": 170236975822100001, "sourceTimestamp": 1702369757, "sourcePosition": "1702369756", "safeSourcePosition": "1702369756", "sourceTxid": "", "source": { "sourceType": "MySQL", "version": "OBMySQL" }, "operation": "UPDATE", "objectName": "test***", "processTimestamps": [1702369758237], "tags": { "pk_uk_info": "{\"PRIMARY\":[\"id\"]}" }, "fields": [ {"name": "id", "dataTypeNumber": 8}, {"name": "bid", "dataTypeNumber": 3}, {"name": "name", "dataTypeNumber": 15} ], "beforeImages": [ // UPDATE 存在前鏡像和後鏡像。 {"precision": 8, "value": "3"}, {"precision": 8, "value": "22"}, {"charset": "utf8mb4", "value": {"bytes": "xxx"}} ], "afterImages": [ {"precision": 8, "value": "3"}, {"precision": 8, "value": "44"}, {"charset": "utf8mb4", "value": {"bytes": "xxx"}} ] }
DELETE
(刪除)資料的樣本{ "version": 1, "id": 170236976527500000, "sourceTimestamp": 1702369764, "sourcePosition": "1702369763", "safeSourcePosition": "1702369763", "sourceTxid": "", "source": { "sourceType": "MySQL", "version": "OBMySQL" }, "operation": "DELETE", "objectName": "test***", "processTimestamps": [1702369765287], "tags": { "pk_uk_info": "{\"PRIMARY\":[\"id\"]}" }, "fields": [ {"name": "id", "dataTypeNumber": 8}, {"name": "bid", "dataTypeNumber": 3}, {"name": "name", "dataTypeNumber": 15} ], "beforeImages": [ {"precision": 8, "value": "3"}, {"precision": 8, "value": "44"}, {"charset": "utf8mb4", "value": {"bytes": "xxx"}} ], "afterImages": null // DELETE 後鏡像為空白。 }
增量同步處理 DDL
{ "version": 1, "id": 170236979372400000, "sourceTimestamp": 1702369793, "sourcePosition": "1702369792", "safeSourcePosition": "1702369792", "sourceTxid": "", "source": { "sourceType": "MySQL", "version": "OBMySQL" }, "operation": "DDL", "objectName": "test***", "processTimestamps": [ 1702369794543 ], "tags": {}, "fields": null, // 增量同步處理 DDL 無 fields 和 beforeImages。 "beforeImages": null, "afterImages": "alter table multi_db_multi_tbl add column address char(20) default null" // STRING 類型的 afterImages 為 DDL 語句。 }
資料庫傳輸到文本協議的格式說明
同步 OceanBase 資料庫的資料至 Kafka、DataHub(BLOB 類型)和 RocketMQ 時:
如果序列化方式為 Default、Canal、DataWorks(支援 2.0 版本)、SharePlex 或 DefaultExtendColumnType,OceanBase 資料庫兩種租戶對應的映射說明如下。
OceanBase 資料庫 MySQL 租戶
資料類型
映射類型
描述
TINYINT
SMALLINT
MEDIUMINT
INT
INTEGER
YEAR
BOOL
BOOLEAN
Long
64 位元以下的整型。
正常數值,例如 1000,不使用科學計數法。
對於 BOOL/BOOLEAN,則 true = 1,false = 0。
DECIMAL
NUMERIC
BigDecimal
精確小數數實值型別以及超過 64 位元的整型。
對於整型數值不會展示小數點及小數。
對於存在小數的數值,會根據資料庫傳入的資料進行位元展示,不會去除末尾的 0,使用科學計數法。
FLOAT
DOUBLE
Double
浮點數
根據源端是 FLOAT 或 DOUBLE 類型決定有效位元。FLOAT 是 7 位有效位元,DOUBLE 是 16 位有效位元。
CHAR
VARCHAR
TINYTEXT
TEXT
MEDIUMTEXT
LONGTEXT
ENUM
SET
String
字串。
TINYBLOB
BLOB
MEDIUMBLOB
LONGBLOB
BINARY
VARBINARY
BIT
Bytes
位元組數組,預設以 BASE64 編碼展示。
說明對於 BIT 定長的類型,增量接收到位元組數組之後會將高位 0 去除,但是全量不會,所以看到的 BASE64 編碼可能會不一致。但是實際結果一致,解碼之後結果一致。
DATE
Date
日期類型,格式為
YYYY-MM-DD
。 如果是非法時間,會顯示原有字串。TIME
Time
時間類型,格式為
HH:mm:ss[.nnnnnnnnn]
。低於秒級的時間最多展示 9 位。如果是低於秒級的時間,會顯示出所有非 0 的數值。如果是非法時間,會顯示原有字串。
DATETIME
DateTime
日期時間類型,包括時區。格式為
YYYY-MM-DD HH:mm:ss[.nnnnnnnnn] [zoneId]
。低於秒級的時間最多展示 9 位。如果是低於秒級的時間,會顯示出所有非 0 的數值。如果是非法時間,會顯示原有字串。
TIMESTAMP
Timestamp
時間戳記類型,格式為
[秒級時間戳記][.nnnnnnnnn]
。低於秒級的時間最多展示 9 位。如果是低於秒級的時間,會顯示出所有非 0 的數值。如果是非法時間,會以 0000-00-00 00:00:00 格式顯示。
OceanBase 資料庫 Oracle 租戶
資料類型
映射類型
描述
INTEGER
Long
64 位元以下的整型。
正常數值,例如 1000,不使用科學計數法。
NUMBER
FLOAT
BigDecimal
精確小數數實值型別以及超過 64 位元的整型。
BINARY_FLOAT BINARY_DOUBLE
Double
浮點數
根據源端是 FLOAT 或 DOUBLE 類型決定有效位元。FLOAT 是 7 位有效位元,DOUBLE 是 16 位有效位元。
VARCHAR2
NVARCHAR2
INTERVAL YEAR TO MOTH
INTERVAL DAY TO SECOND
CLOB
NCLOB
ROWID
UROWID
String
字串
BLOB
BFILE
RAW
Bytes
位元組數組
預設以 BASE64 編碼展示。
DATE
TIMESTAMP
TIMESTAMP WITH TIME ZONE
TIMESTAMP WITH LOCAL TIME ZONE
DateTime
日期時間類型,包括時區。格式為
YYYY-MM-DD HH:mm:ss[.nnnnnnnnn] [zoneId]
。低於秒級的時間最多展示 9 位。如果是低於秒級的時間,會顯示出所有非 0 的數值。如果是非法時間,會顯示原有字串。
如果序列化方式為 Debezium,OceanBase 資料庫 MySQL 租戶對應的映射說明如下。
重要同步 OceanBase 資料庫 Oracle 租戶的資料至 Kafka、DataHub(BLOB 類型)和 RocketMQ 時,不支援選擇序列化方式 Debezium。
資料類型
映射類型
描述
BOOLEAN
BOOL
BOOLEAN
取值包括 true 和 false。
TINYINT
SMALLINT
MEDIUMINT
INT/INTEGER
BIGINT
YEAR
LONG
-263 ~ 263範圍的整型。
BIGINT
STRING
使用字串完整展示資料。
FLOAT
DOUBLE
DOUBLE
浮點數。
DECIMAL
NUMERIC
STRING
使用字串完整展示資料。對於存在小數的數值,會根據資料庫傳入的資料進行位元展示,不會去除末尾的 0,使用科學計數法。
BIT
BINARY
VARBINARY
TINYBLOB
BLOB
MEDIUMBLOB
LONGBLOB
BYTES
位元組數組,base16 編碼。
CHAR
VARCHAR
TINYTEXT
TEXT
MEDIUMTEXT
LONGTEXT
ENUM
SET
STRING
字串。
TIMESTAMP
STRING
格式為 YYYY-MM-DDTHH:mm:ss[.nnnnnnnnn]Z,時區為 0 時區。
DATE
LONG
表示自 1970-01-01 以來的天數。
TIME
LONG
表示自 00:00:00 以來的時間值(以微秒為單位),不包括時區資訊。
DATETIME
LONG
表示自 1970-01-01 00:00:00 以來的毫秒數,不包括時區資訊。
如果序列化方式為 Avro,OceanBase 資料庫 MySQL 租戶對應的映射說明如下。
重要僅同步 OceanBase 資料庫 MySQL 租戶的資料至 Kafka 時,支援選擇序列化方式 Avro。
類型名稱
映射類型
TINYINT
BOOLEAN
SMALLINT
MEDIUMINT
INT
BIGINT
BIT
INTEGER
FLOAT
DOUBLE
FLOAT
DECIMAL
NUMERIC
DECIMAL
VARCHAR
CHAR
TINYTEXT
MEDIUMTEXT
LONGTEXT
TEXT
CHARACTER
BINARY
VARBINARY
TINYBLOB
MEDIUMBLOB
LONGBLOB
BLOB
BinaryObject
TIMESTAMP
TimestampObject
說明對於 TIMESTAMP 類型,全量和增量均會轉換至時間戳記,非法時間為
-9223372022400L
。除非法時間外,您可以使用 Java 的
Instant.ofEpochSecond(ts, nanos)
方法擷取正確的時間。DATE
TIME
DATETIME
YEAR
DATETIME
JSON
ENUM
SET
TextObject
GEOMETRY
TextGeometry
說明目前資料轉送使用 EWKT 格式透傳,所以映射為 TextGeometry 類型。