This topic describes the data formats that the supported serialization methods produce when data is transmitted from OceanBase Database to a message queue system.
Data formats used in serialization methods
When you use the data transmission service to synchronize data from the source to a Kafka, DataHub (BLOB type), or RocketMQ instance, you can use a serialization method to control the format of the synchronized data. Supported serialization methods are Default, Canal, DataWorks (V2.0), SharePlex, DefaultExtendColumnType, Debezium, DebeziumFlatten, DebeziumSmt, and Avro.
At present, only MySQL tenants of OceanBase Database support the Debezium, DebeziumFlatten, and DebeziumSmt serialization methods.
At present, the Avro serialization method is supported only when you synchronize data from a MySQL tenant of OceanBase Database to a Kafka instance.
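The two restrictions above can be checked before a task is configured. The following is a minimal Python sketch, not part of the data transmission service: the function name `is_supported` and the labels `OB_MYSQL`, `Kafka`, `DataHub`, and `RocketMQ` are illustrative assumptions, and the sketch only encodes the two restrictions stated in this topic.

```python
# Serialization methods listed in this topic.
METHODS = {
    "Default", "Canal", "DataWorks", "SharePlex", "DefaultExtendColumnType",
    "Debezium", "DebeziumFlatten", "DebeziumSmt", "Avro",
}
DEBEZIUM_FAMILY = {"Debezium", "DebeziumFlatten", "DebeziumSmt"}
TARGETS = {"Kafka", "DataHub", "RocketMQ"}


def is_supported(method: str, source: str, target: str) -> bool:
    """Return True if the combination passes the restrictions stated above.

    `source` is a hypothetical label: "OB_MYSQL" stands for a MySQL tenant of
    OceanBase Database; any other value stands for a different source.
    """
    if method not in METHODS or target not in TARGETS:
        return False
    if method in DEBEZIUM_FAMILY:
        # Debezium, DebeziumFlatten, DebeziumSmt: MySQL tenants only.
        return source == "OB_MYSQL"
    if method == "Avro":
        # Avro: MySQL tenant to Kafka only.
        return source == "OB_MYSQL" and target == "Kafka"
    # Assumption: no further restriction is modeled for the other methods.
    return True
```

For example, `is_supported("Avro", "OB_MYSQL", "RocketMQ")` evaluates to `False` under these assumptions, because Avro is restricted to Kafka targets.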
Default JSON message format
When you synchronize data to a Kafka, DataHub (BLOB type), or RocketMQ instance, the following JSON message format is used for the Default serialization method:
{
"prevStruct": { // The image before modification.
"col1": "val1" // The key-value pair that contains the full key value.
},
"postStruct": { // The image after modification.
"col1": "val1" // The key-value pair that contains the full key value.
},
"allMetaData": {
"checkpoint": "STRING", // The current synchronization checkpoint, which specifies the target checkpoint (second-level timestamp) in the incremental synchronization phase and the primary key-value pair in the full synchronization phase.
"record_primary_key": "STRING", // The name of the primary key column. If the primary key has multiple columns, separate the column names with \u0001.
"record_primary_value": "STRING", // The primary key value. If the primary key has multiple columns, separate the values with \u0001.
"source_identity": "STRING", // The identifier of the source. The value is a subtopic in incremental migration and a meaningless sequence number in full migration.
"dbType": "STRING", // The database type. Valid values: MYSQL, ORACLE, OCEANBASE, OB_IN_ORACLE_MODE, DB2, OB_MYSQL, OB_ORACLE, and DB2_LUW. Note that OCEANBASE, OB_IN_ORACLE_MODE, and DB2 are used by previous versions of the data transmission service and are compatible with this version of the data transmission service.
"storeDataSequence": LONG, // A sequence used for sorting. This field is available only in incremental synchronization scenarios when sequenceEnabled is set to true (the default value) in the source.json configuration file. The sequence is generated during the synchronization process in the {Timestamp}{Incrementing sequence number} format. The sequence number increments and does not exceed five digits.
"table_name": "STRING", // The name of the table that is changed by using an SQL statement.
"db": "STRING", // The name of the database that is changed by using an SQL statement. For an OceanBase database, the database name includes the tenant name. The format is {tenant}.{database}.
"timestamp": "STRING", // The timestamp in seconds for data modification, which is available only for incremental synchronization.
"uniqueId": "STRING", // The sequence number as the identifier that is passed from the store in incremental migration.
"ddlType": "STRING" // The DDL operation type. This field is supported in OceanBase Migration Service (OMS) V4.0.0 and later.
},
"recordType": "INSERT/UPDATE/DELETE/HEARTBEAT/DDL" // The modification type.
}
A DDL record contains only one key, "ddl", in postStruct. The value of this key is the DDL statement.
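Because multi-column primary keys are joined with \u0001 in both record_primary_key and record_primary_value, a consumer usually has to split them again. The following minimal Python sketch shows one way to do that; the helper name `primary_key` is hypothetical, and the sample message is trimmed to the two fields that matter here.

```python
import json

PK_SEPARATOR = "\u0001"  # separator used for multi-column primary keys


def primary_key(message: dict) -> dict:
    """Map primary key column names to their values (values stay strings)."""
    meta = message["allMetaData"]
    names = meta.get("record_primary_key")
    values = meta.get("record_primary_value")
    if not names or values is None:
        return {}  # both fields can be null, for example in DDL records
    return dict(zip(names.split(PK_SEPARATOR), values.split(PK_SEPARATOR)))


raw = (
    '{"allMetaData": {"record_primary_key": "id1\\u0001id2", '
    '"record_primary_value": "3\\u0001129"}, "recordType": "INSERT"}'
)
print(primary_key(json.loads(raw)))  # {'id1': '3', 'id2': '129'}
```

The values are kept as strings because the message carries them as one string; converting them to column types is left to the consumer.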
Beforeimage and afterimage:
prevStruct: The beforeimage of incremental data, which is the data before an SQL statement is executed.
postStruct: The afterimage of incremental data, which is the data after an SQL statement is executed.
DELETE contains only the prevStruct field.
INSERT and DDL contain only the postStruct field.
UPDATE contains both prevStruct and postStruct.
HEARTBEAT (indicating a periodic heartbeat message) contains neither prevStruct nor postStruct.
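The per-type rules above can be turned into a small dispatch routine on recordType. This is a minimal Python sketch under the assumption that the message has already been parsed into a dictionary; the function name `row_images` is hypothetical.

```python
def row_images(message: dict):
    """Return (before, after) for a Default-format message.

    An image that the record type does not carry is returned as None.
    """
    record_type = message.get("recordType")
    before = message.get("prevStruct")
    after = message.get("postStruct")
    if record_type == "DELETE":
        return before, None
    if record_type in ("INSERT", "DDL"):
        return None, after
    if record_type == "UPDATE":
        return before, after
    if record_type == "HEARTBEAT":
        return None, None
    raise ValueError(f"unexpected recordType: {record_type!r}")
```

Raising on an unknown recordType is a design choice of this sketch: it surfaces new record types early instead of silently dropping them.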
Here are some examples:
Example of data insertion (INSERT)
{ "allMetaData":{ "checkpoint":null, "record_primary_key":"id1\u0001id2", "source_identity":null, "record_primary_value":"3\u0001129", "dbType":"OB_MYSQL", "table_name":"table_name", "db":"tenant.database", "timestamp":"1609344671" }, "prevStruct":null, "recordType":"INSERT", "postStruct":{ "col1":3, "col2":129, "col3":2147483646, "col4":9223372036854775806, "col5":10223372036854775806, "col6":1.2222, "col7":9.999999, "col8":"hello world", "col9":"aGVsbG8gd29ybGQ=", "col10":9.99999999999, "col11":"2020-11-25", "col12":"00:01:02", "col13":"2020-11-25 00:01:02", "col14":"1606233662.012345" } }
Example of data update (UPDATE)
{ "allMetaData": { "checkpoint": null, "record_primary_key": "id1\u0001id2", "source_identity": null, "record_primary_value": "3\u0001129", "dbType":"OB_MYSQL", "table_name": "table_name", "db": "tenant.database", "timestamp": "1609344671" }, "prevStruct": { "col1": 3, "col2": 129, "col3": 2147483646, "col4": 9223372036854775806, "col5": 10223372036854775806, "col6": 1.2222, "col7": 9.999999999999, "col8": "hello world", "col9": "aGVsbG8gd29ybGQ=", "col10": 9.999999999999, "col11": "2020-11-25", "col12": "00:01:02", "col13": "2020-11-25 00:01:02", "col14": "1606233662.012345" }, "recordType": "UPDATE", "postStruct": { "col1": 3, "col2": 129, "col3": 2147483646, "col4": 9223372036854775806, "col5": 10223372036854775806, "col6": 1.2222, "col7": 9.999999999999, "col8": "hello world 2020", "col9": "aGVsbG8gd29ybGQ=", "col10": 9.999999999999, "col11": "2020-11-25", "col12": "00:01:02", "col13": "2020-11-25 00:01:02", "col14": "1606233662.012345" } }
Example of data deletion (DELETE)
{ "allMetaData":{ "checkpoint":null, "record_primary_key":"id1\u0001id2", "source_identity":null, "record_primary_value":"3\u0001129", "dbType":"OB_MYSQL", "table_name":"table_name", "db":"tenant.database", "timestamp":"1609344671" }, "prevStruct":{ "col1":3, "col2":129, "col3":2147483646, "col4":9223372036854775806, "col5":10223372036854775806, "col6":1.2222, "col7":9.99999999, "col8":"hello world", "col9":"aGVsbG8gd29ybGQ=", "col10":9.999999999, "col11":"2020-11-25", "col12":"00:01:02", "col13":"2020-11-25 00:01:02", "col14":"1606233662.012345" }, "recordType":"DELETE", "postStruct":null }
Example of a DDL operation
alter table connector_test.all_mysql_type_test add column c90 varchar(30) default "test" comment 'test';
{ "prevStruct" : null, "postStruct" : { "ddl" : "alter table connector_test.all_mysql_type_test add column c90 varchar(30) default \"test\" comment 'test'" }, "allMetaData" : { "checkpoint" : "1671177057", "dbType" : "OB_MYSQL", "storeDataSequence" : null, "db" : "connector_test", "timestamp" : "1671177057", "uniqueId" : null, "ddlType" : "ALTER_TABLE", "record_primary_key" : null, "source_identity" : null, "record_primary_value" : null, "table_name" : "all_mysql_type_test" }, "recordType" : "DDL" }
Canal JSON message format
When you synchronize data to a Kafka, DataHub (BLOB type), or RocketMQ instance, the following JSON message format is used for the Canal serialization method:
{
"database": "STRING", // The name of the database that is changed by using an SQL statement. For an OceanBase database, you can specify only the database name and omit the tenant name.
"sqlType": {
"col1": INTEGER // The JDBC type code of the column, as an integer. For more information, see java.sql.Types.
},
"data": [ // The modified data key-value pair. This field exists in only one message.
{
"col1": "val1"
}
],
"pkNames": [ // The primary key column name.
"col1"
],
"old": [ // This field exists only in update messages and indicates the column modified by using an UPDATE statement. It indicates the column value before the modification.
{
"col1": "val1"
}
],
"mysqlType": { // The description of the column type.
"col": "STRING"
},
"type": "STRING", // The modification type.
"table": "STRING", // The name of the table that is changed by using an SQL statement.
"es": LONG, // The modification time, with a millisecond-level timestamp.
"isDdl": BOOLEAN, // Indicates whether it is a DDL statement.
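"ts": LONG, // The time at which the message is written to the destination, with a millisecond-level timestamp.
"sql": "STRING" // The SQL statement. This field is empty for DML messages and contains the DDL statement for DDL messages.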
}
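The integer codes in sqlType are java.sql.Types constants, so a consumer can translate them into readable names. The following minimal Python sketch covers only a subset of those constants; the names `JDBC_TYPE_NAMES` and `describe_sql_types` are hypothetical.

```python
# Subset of java.sql.Types constants (code -> name).
JDBC_TYPE_NAMES = {
    -7: "BIT", -6: "TINYINT", -5: "BIGINT", 3: "DECIMAL", 4: "INTEGER",
    5: "SMALLINT", 6: "FLOAT", 8: "DOUBLE", 12: "VARCHAR", 91: "DATE",
    92: "TIME", 93: "TIMESTAMP", 2004: "BLOB",
}


def describe_sql_types(message: dict) -> dict:
    """Map each column in `sqlType` to its java.sql.Types name."""
    sql_types = message.get("sqlType") or {}  # sqlType may be null
    return {
        column: JDBC_TYPE_NAMES.get(code, f"UNKNOWN({code})")
        for column, code in sql_types.items()
    }
```

Codes outside the table are reported as `UNKNOWN(code)` rather than rejected, so the lookup table can be extended as needed.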
Here are some examples:
Example of data insertion (
INSERT
){ "database":"database", "sqlType":{ "col1":93, "col2":12, "col3":6, "col4":8, "col5":5, "col6":92, "col7":4, "col8":-5, "col9":2004, "col10":-6, "col11":91, "col12":3, "col13":-5, "col14":93 }, "data":[ { "col1":"2020-11-25 00:01:02", "col2":"hello world", "col3":1.2222, "col4":9.99999999999999909326625337248461995470488734032045693707225049331647881341002217023668530611028595157578301758491822824378438792553200763769833775473829862512856683413461939989729065436937279228852476622948659167943435544622149348072943613294167216662821737555414480159115639791276054897201420389770580351533960771506199055664889770260291710977826725024401716523031627390652604144008597950935492433262042405635563993262949691698930975461134804791235994697938405200089317860731205010159117711704697471514344499487123311264707354172378099538737850219826145102366279591379660471881259976727356521602405329789906247763521525981391443887618575275588619928089116905061711975308467857756405810961619074331866883961080943542712559830853980002984826569445431232452392578125E-308, "col5":129, "col6":"00:01:02", "col7":2147483646, "col8":9223372036854775806, "col9":"aGVsbG8gd29ybGQ=", "col10":3, "col11":"2020-11-25", "col12":9.99999999999999909326625337248461995470488734032045693707225049331647881341002217023668530611028595157578301758491822824378438792553200763769833775473829862512856683413461939989729065436937279228852476622948659167943435544622149348072943613294167216662821737555414480159115639791276054897201420389770580351533960771506199055664889770260291710977826725024401716523031627390652604144008597950935492433262042405635563993262949691698930975461134804791235994697938405200089317860731205010159117711704697471514344499487123311264707354172378099538737850219826145102366279591379660471881259976727356521602405329789906247763521525981391443887618575275588619928089116905061711975308467857756405810961619074331866883961080943542712559830853980002984826569445431232452392578125E-308, 
"col13":10223372036854775806, "col14":"1606233662.012345" } ], "pkNames":[ "col1", "col2" ], "old":null, "mysqlType":{ "col1":"datetime", "col2":"varchar", "col3":"float", "col4":"double", "col5":"smallint", "col6":"time", "col7":"int", "col8":"int64", "col9":"blob", "col10":"tinyint", "col11":"date", "col12":"decimal", "col13":"bigint", "col14":"timestamp" }, "type":"INSERT", "table":"table", "es":1609344671000, "isDdl":false, "ts":1618323429026, "sql":"" }
Example of data update (
UPDATE
){ "database":"database", "sqlType":{ "col1":93, "col2":12, "col3":6, "col4":8, "col5":5, "col6":92, "col7":4, "col8":-5, "col9":2004, "col10":-6, "col11":91, "col12":3, "col13":-5, "col14":93 }, "data":[ { "col1":"2020-11-25 00:01:02", "col2":"hello world 2020", "col3":1.2222, "col4":9.99999999999999909326625337248461995470488734032045693707225049331647881341002217023668530611028595157578301758491822824378438792553200763769833775473829862512856683413461939989729065436937279228852476622948659167943435544622149348072943613294167216662821737555414480159115639791276054897201420389770580351533960771506199055664889770260291710977826725024401716523031627390652604144008597950935492433262042405635563993262949691698930975461134804791235994697938405200089317860731205010159117711704697471514344499487123311264707354172378099538737850219826145102366279591379660471881259976727356521602405329789906247763521525981391443887618575275588619928089116905061711975308467857756405810961619074331866883961080943542712559830853980002984826569445431232452392578125E-308, "col5":129, "col6":"00:01:02", "col7":2147483646, "col8":9223372036854775806, "col9":"aGVsbG8gd29ybGQ=", "col10":3, "col11":"2020-11-25", "col12":9.99999999999999909326625337248461995470488734032045693707225049331647881341002217023668530611028595157578301758491822824378438792553200763769833775473829862512856683413461939989729065436937279228852476622948659167943435544622149348072943613294167216662821737555414480159115639791276054897201420389770580351533960771506199055664889770260291710977826725024401716523031627390652604144008597950935492433262042405635563993262949691698930975461134804791235994697938405200089317860731205010159117711704697471514344499487123311264707354172378099538737850219826145102366279591379660471881259976727356521602405329789906247763521525981391443887618575275588619928089116905061711975308467857756405810961619074331866883961080943542712559830853980002984826569445431232452392578125E-308, 
"col13":10223372036854775806, "col14":"1606233662.012345" } ], "pkNames":[ "col1", "col2" ], "old":[ { "col2":"hello world" } ], "mysqlType":{ "col1":"datetime", "col2":"varchar", "col3":"float", "col4":"double", "col5":"smallint", "col6":"time", "col7":"int", "col8":"int64", "col9":"blob", "col10":"tinyint", "col11":"date", "col12":"decimal", "col13":"bigint", "col14":"timestamp" }, "type":"UPDATE", "table":"table", "es":1609344671000, "isDdl":false, "ts":1618364572908, "sql":"" }
Example of data deletion (
DELETE
){ "database":"database", "sqlType":{ "col1":93, "col2":12, "col3":6, "col4":8, "col5":5, "col6":92, "col7":4, "col8":-5, "col9":2004, "col10":-6, "col11":91, "col12":3, "col13":-5, "col14":93 }, "data":[ { "col1":"2020-11-25 00:01:02", "col2":"hello world", "col3":1.2222, "col4":9.99999999999999909326625337248461995470488734032045693707225049331647881341002217023668530611028595157578301758491822824378438792553200763769833775473829862512856683413461939989729065436937279228852476622948659167943435544622149348072943613294167216662821737555414480159115639791276054897201420389770580351533960771506199055664889770260291710977826725024401716523031627390652604144008597950935492433262042405635563993262949691698930975461134804791235994697938405200089317860731205010159117711704697471514344499487123311264707354172378099538737850219826145102366279591379660471881259976727356521602405329789906247763521525981391443887618575275588619928089116905061711975308467857756405810961619074331866883961080943542712559830853980002984826569445431232452392578125E-308, "col5":129, "col6":"00:01:02", "col7":2147483646, "col8":9223372036854775806, "col9":"aGVsbG8gd29ybGQ=", "col10":3, "col11":"2020-11-25", "col12":9.99999999999999909326625337248461995470488734032045693707225049331647881341002217023668530611028595157578301758491822824378438792553200763769833775473829862512856683413461939989729065436937279228852476622948659167943435544622149348072943613294167216662821737555414480159115639791276054897201420389770580351533960771506199055664889770260291710977826725024401716523031627390652604144008597950935492433262042405635563993262949691698930975461134804791235994697938405200089317860731205010159117711704697471514344499487123311264707354172378099538737850219826145102366279591379660471881259976727356521602405329789906247763521525981391443887618575275588619928089116905061711975308467857756405810961619074331866883961080943542712559830853980002984826569445431232452392578125E-308, 
"col13":10223372036854775806, "col14":"1606233662.012345" } ], "pkNames":[ "col1", "col2" ], "old":null, "mysqlType":{ "col1":"datetime", "col2":"varchar", "col3":"float", "col4":"double", "col5":"smallint", "col6":"time", "col7":"int", "col8":"int64", "col9":"blob", "col10":"tinyint", "col11":"date", "col12":"decimal", "col13":"bigint", "col14":"timestamp" }, "type":"DELETE", "table":"table", "es":1609344671000, "isDdl":false, "ts":1618364660278, "sql":"" }
Example of a DDL operation
alter table connector_test.all_mysql_type_test add column c90 varchar(30) default "test" comment 'test';
{ "database" : "connector_test", "sqlType" : null, "data" : null, "pkNames" : null, "old" : null, "mysqlType" : null, "type" : "ALTER", "table" : "all_mysql_type_test", "es" : 1671177209000, "isDdl" : true, "ts" : 1671177291475, "sql" : "alter table connector_test.all_mysql_type_test add column c90 varchar(30) default \"test\" comment 'test'" }
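A Canal update message carries the new row in data and the previous values in old. The following minimal Python sketch rebuilds a full beforeimage and converts the millisecond es timestamp. It assumes that each entry in old holds only the pre-update values of the columns that changed, which is how the field description above reads; the function names are hypothetical.

```python
from datetime import datetime, timezone


def canal_before_rows(message: dict) -> list:
    """Rebuild full before-images for a Canal UPDATE message.

    Assumes old[i] holds only the pre-update values of the columns that
    changed in data[i]; the remaining columns are taken from data[i].
    """
    data = message.get("data") or []
    old = message.get("old") or []  # null for INSERT, DELETE, and DDL
    return [{**row, **previous} for row, previous in zip(data, old)]


def canal_event_time(message: dict) -> datetime:
    """Convert the millisecond `es` timestamp to an aware UTC datetime."""
    return datetime.fromtimestamp(message["es"] / 1000, tz=timezone.utc)


sample = {
    "type": "UPDATE",
    "es": 1609344671000,
    "data": [{"col1": "2020-11-25 00:01:02", "col2": "hello world 2020"}],
    "old": [{"col2": "hello world"}],
}
print(canal_before_rows(sample))
print(canal_event_time(sample).isoformat())
```

For messages where old is null, `canal_before_rows` returns an empty list, so callers do not need a separate check on the message type.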
DataWorks JSON message format
When you synchronize data to a Kafka, DataHub (BLOB type), or RocketMQ instance, the following JSON message format is used for the DataWorks serialization method:
{
"version":"2.0", // The protocol version. Only DataWorks 2.0 is supported.
"schema": { // The modified metadata information, with only the column name and column type specified.
"source": {// The source of the modification.
"dbType": "mysql", // The source type.
"dbVersion": "5.7.35", // The database version.
"dbName": "myDatabase", // The database name.
"schema": "mySchema", // The schema name. This field is required in a system with schemas.
"table": "tableName" // The table name.
},
"column": [// The modified data column. This field indicates the updated record content in the target table.
{
"name": "id",
"type": "bigint"
},
{
"name": "name",
"type": "varchar(20)"
},
{
"name": "mydata",
"type": "binary"
},
{
"name": "ts",
"type": "datetime"
}
],
"pk": [// This field is required if a primary key or unique key is available. Otherwise, it is optional.
"pkName1",
"pkName2"
]
},
"payload": {
"before": {
"data":{
"id": 111,
"name":"scooter",
"mydata": "[base64 string]", // Base64 encoding is required for the binary type.
"ts": 1590315269000.123456789 // The timestamp with an integer part of 13 digits and a fractional part of 9 digits.
}
},
"after": {
"data":{
"id": 222,
"name":"donald",
"mydata": "[base64 string]",
"ts": 1590315269000
}
},
"op":"INSERT/UPDATE/DELETE/HEARTBEAT/TRANSACTION_BEGIN/TRANSACTION_END/CREATE/ALTER/ERASE/QUERY/TRUNCATE/RENAME/CINDEX/DINDEX/GTID/XACOMMIT/XAROLLBACK/...", // Case sensitive.
"timestamp": {
"eventTime": 1620457659000 // The modification time on the source database, which is a 13-digit timestamp in milliseconds.
},
"ddl": {
"text": "ADD COLUMN ..."
},
"scn": "The system change number (SCN) that increments."
},
"extend": { // The extension field reserved for future requirements. This field can be left unspecified if no extension is needed.
"load_fm":"CIBS" // The source system, for example, "CIBS".
}
}
Heartbeat message of a synchronization task:
{
"version":"2.0", // The protocol version.
"payload": {
"timestamp": {
"eventTime": 1620457659000 // The timestamp of the heartbeat packet.
},
"op": "HEARTBEAT" // Indicates a heartbeat packet.
}
}
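Heartbeat messages carry no schema or row data, so consumers typically filter them out and use only their eventTime to track progress. The following minimal Python sketch assumes the message has already been parsed into a dictionary; the function names `is_heartbeat` and `event_time` are hypothetical.

```python
from datetime import datetime, timezone


def is_heartbeat(message: dict) -> bool:
    """Return True for a DataWorks heartbeat message (op is case sensitive)."""
    return (message.get("payload") or {}).get("op") == "HEARTBEAT"


def event_time(message: dict) -> datetime:
    """Convert the 13-digit millisecond eventTime to an aware UTC datetime."""
    millis = message["payload"]["timestamp"]["eventTime"]
    return datetime.fromtimestamp(millis / 1000, tz=timezone.utc)


heartbeat = {
    "version": "2.0",
    "payload": {"timestamp": {"eventTime": 1620457659000}, "op": "HEARTBEAT"},
}
if is_heartbeat(heartbeat):
    print("heartbeat at", event_time(heartbeat).isoformat())
```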
Here are some examples:
Example of data insertion (INSERT)
{ "version":"2.0", "schema":{ "source":{ "dbType":"ob_mysql", "dbVersion":null, "dbName":"db", "schema":null, "table":"tab" }, "column":[ { "name":"int8", "type":"TINYINT" }, { "name":"int16", "type":"SMALLINT" }, { "name":"int32", "type":"INT" }, { "name":"int64", "type":"INT64" }, { "name":"float32", "type":"FLOAT" }, { "name":"float64", "type":"DOUBLE" }, { "name":"bigInt", "type":"BIGINT" }, { "name":"boolean", "type":"BOOLEAN" }, { "name":"string", "type":"VARCHAR" }, { "name":"bytes", "type":"BLOB" }, { "name":"decimal", "type":"DECIMAL" }, { "name":"localDate", "type":"DATE" }, { "name":"localTime", "type":"TIME" }, { "name":"localDateTime", "type":"DATETIME" }, { "name":"timestamp", "type":"TIMESTAMP" }, { "name":"zonedDateTime", "type":"ZONED_DATETIME" }, { "name":"intervalDayToSecond", "type":"INTERVAL_DAY_TO_SECOND" }, { "name":"intervalYearToMonth", "type":"INTERVAL_YEAR_TO_MONTH" } ], "pk":[ "pkName1", "pkName2" ] }, "payload":{ "before":null, "after":{ "data":{ "col1":3, "col2":129, "col3":2147483646, "col4":9223372036854775806, "col5":1.2222, 
"col6":0.0000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000999999999999999909326625337248461995470488734032045693707225049331647881341002217023668530611028595157578301758491822824378438792553200763769833775473829862512856683413461939989729065436937279228852476622948659167943435544622149348072943613294167216662821737555414480159115639791276054897201420389770580351533960771506199055664889770260291710977826725024401716523031627390652604144008597950935492433262042405635563993262949691698930975461134804791235994697938405200089317860731205010159117711704697471514344499487123311264707354172378099538737850219826145102366279591379660471881259976727356521602405329789906247763521525981391443887618575275588619928089116905061711975308467857756405810961619074331866883961080943542712559830853980002984826569445431232452392578125, "col7":10223372036854775806, "col8":1, "col9":"hello world", "col10":"aGVsbG8gd29ybGQ=", 
"col11":0.0000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000999999999999999909326625337248461995470488734032045693707225049331647881341002217023668530611028595157578301758491822824378438792553200763769833775473829862512856683413461939989729065436937279228852476622948659167943435544622149348072943613294167216662821737555414480159115639791276054897201420389770580351533960771506199055664889770260291710977826725024401716523031627390652604144008597950935492433262042405635563993262949691698930975461134804791235994697938405200089317860731205010159117711704697471514344499487123311264707354172378099538737850219826145102366279591379660471881259976727356521602405329789906247763521525981391443887618575275588619928089116905061711975308467857756405810961619074331866883961080943542712559830853980002984826569445431232452392578125, "col12":"2020-11-25", "col13":"00:01:02", "col14":"2020-11-25 00:01:02", "col15":"1606233662.012345", "col16":"2020-11-25 00:01:02.012345 Asia/Shanghai", "col17":"INTERVAL '3' DAY", "col18":"INTERVAL '4' YEAR" } }, "op":"INSERT", "timestamp":{ "eventTime":1647581000000, "systemTime":1647581000795, "checkpointTime":1647581000 }, "ddl":null, "scn":"null" }, "extend":{ "load_fm": "test" } }
Example of data update (UPDATE)
{ "version":"2.0", "schema":{ "source":{ "dbType":"ob_mysql", "dbVersion":null, "dbName":"db", "schema":null, "table":"tab" }, "column":[ { "name":"int8", "type":"TINYINT" }, { "name":"int16", "type":"SMALLINT" }, { "name":"int32", "type":"INT" }, { "name":"int64", "type":"INT64" }, { "name":"float32", "type":"FLOAT" }, { "name":"float64", "type":"DOUBLE" }, { "name":"bigInt", "type":"BIGINT" }, { "name":"boolean", "type":"BOOLEAN" }, { "name":"string", "type":"VARCHAR" }, { "name":"bytes", "type":"BLOB" }, { "name":"decimal", "type":"DECIMAL" }, { "name":"localDate", "type":"DATE" }, { "name":"localTime", "type":"TIME" }, { "name":"localDateTime", "type":"DATETIME" }, { "name":"timestamp", "type":"TIMESTAMP" }, { "name":"zonedDateTime", "type":"ZONED_DATETIME" }, { "name":"intervalDayToSecond", "type":"INTERVAL_DAY_TO_SECOND" }, { "name":"intervalYearToMonth", "type":"INTERVAL_YEAR_TO_MONTH" } ], "pk":[ "pkName1", "pkName2" ] }, "payload":{ "before":{ "data":{ "col1":3, "col2":129, "col3":2147483646, "col4":9223372036854775806, "col5":1.2222, 
"col6":0.0000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000999999999999999909326625337248461995470488734032045693707225049331647881341002217023668530611028595157578301758491822824378438792553200763769833775473829862512856683413461939989729065436937279228852476622948659167943435544622149348072943613294167216662821737555414480159115639791276054897201420389770580351533960771506199055664889770260291710977826725024401716523031627390652604144008597950935492433262042405635563993262949691698930975461134804791235994697938405200089317860731205010159117711704697471514344499487123311264707354172378099538737850219826145102366279591379660471881259976727356521602405329789906247763521525981391443887618575275588619928089116905061711975308467857756405810961619074331866883961080943542712559830853980002984826569445431232452392578125, "col7":10223372036854775806, "col8":1, "col9":"hello world", "col10":"aGVsbG8gd29ybGQ=", 
"col11":0.0000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000999999999999999909326625337248461995470488734032045693707225049331647881341002217023668530611028595157578301758491822824378438792553200763769833775473829862512856683413461939989729065436937279228852476622948659167943435544622149348072943613294167216662821737555414480159115639791276054897201420389770580351533960771506199055664889770260291710977826725024401716523031627390652604144008597950935492433262042405635563993262949691698930975461134804791235994697938405200089317860731205010159117711704697471514344499487123311264707354172378099538737850219826145102366279591379660471881259976727356521602405329789906247763521525981391443887618575275588619928089116905061711975308467857756405810961619074331866883961080943542712559830853980002984826569445431232452392578125, "col12":"2020-11-25", "col13":"00:01:02", "col14":"2020-11-25 00:01:02", "col15":"1606233662.012345", "col16":"2020-11-25 00:01:02.012345 Asia/Shanghai", "col17":"INTERVAL '3' DAY", "col18":"INTERVAL '4' YEAR" } }, "after":{ "data":{ "col1":3, "col2":129, "col3":2147483646, "col4":9223372036854775806, "col5":1.2222, 
"col6":0.0000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000999999999999999909326625337248461995470488734032045693707225049331647881341002217023668530611028595157578301758491822824378438792553200763769833775473829862512856683413461939989729065436937279228852476622948659167943435544622149348072943613294167216662821737555414480159115639791276054897201420389770580351533960771506199055664889770260291710977826725024401716523031627390652604144008597950935492433262042405635563993262949691698930975461134804791235994697938405200089317860731205010159117711704697471514344499487123311264707354172378099538737850219826145102366279591379660471881259976727356521602405329789906247763521525981391443887618575275588619928089116905061711975308467857756405810961619074331866883961080943542712559830853980002984826569445431232452392578125, "col7":10223372036854775806, "col8":1, "col9":"hello world 2020", "col10":"aGVsbG8gd29ybGQ=", 
"col11":0.0000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000999999999999999909326625337248461995470488734032045693707225049331647881341002217023668530611028595157578301758491822824378438792553200763769833775473829862512856683413461939989729065436937279228852476622948659167943435544622149348072943613294167216662821737555414480159115639791276054897201420389770580351533960771506199055664889770260291710977826725024401716523031627390652604144008597950935492433262042405635563993262949691698930975461134804791235994697938405200089317860731205010159117711704697471514344499487123311264707354172378099538737850219826145102366279591379660471881259976727356521602405329789906247763521525981391443887618575275588619928089116905061711975308467857756405810961619074331866883961080943542712559830853980002984826569445431232452392578125, "col12":"2020-11-25", "col13":"00:01:02", "col14":"2020-11-25 00:01:02", "col15":"1606233662.012345", "col16":"2020-11-25 00:01:02.012345 Asia/Shanghai", "col17":"INTERVAL '3' DAY", "col18":"INTERVAL '4' YEAR" } }, "op":"UPDATE", "timestamp":{ "eventTime":1647581038000, "systemTime":1647581038674, "checkpointTime":1647581038 }, "ddl":null, "scn":"null" }, "extend":{ "load_fm": "test" } }
Example of data deletion (DELETE)
{ "version":"2.0", "schema":{ "source":{ "dbType":"ob_mysql", "dbVersion":null, "dbName":"db", "schema":null, "table":"tab" }, "column":[ { "name":"int8", "type":"TINYINT" }, { "name":"int16", "type":"SMALLINT" }, { "name":"int32", "type":"INT" }, { "name":"int64", "type":"INT64" }, { "name":"float32", "type":"FLOAT" }, { "name":"float64", "type":"DOUBLE" }, { "name":"bigInt", "type":"BIGINT" }, { "name":"boolean", "type":"BOOLEAN" }, { "name":"string", "type":"VARCHAR" }, { "name":"bytes", "type":"BLOB" }, { "name":"decimal", "type":"DECIMAL" }, { "name":"localDate", "type":"DATE" }, { "name":"localTime", "type":"TIME" }, { "name":"localDateTime", "type":"DATETIME" }, { "name":"timestamp", "type":"TIMESTAMP" }, { "name":"zonedDateTime", "type":"ZONED_DATETIME" }, { "name":"intervalDayToSecond", "type":"INTERVAL_DAY_TO_SECOND" }, { "name":"intervalYearToMonth", "type":"INTERVAL_YEAR_TO_MONTH" } ], "pk":[ "pkName1", "pkName2" ] }, "payload":{ "before":{ "data":{ "col1":3, "col2":129, "col3":2147483646, "col4":9223372036854775806, "col5":1.2222, 
"col6":0.0000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000999999999999999909326625337248461995470488734032045693707225049331647881341002217023668530611028595157578301758491822824378438792553200763769833775473829862512856683413461939989729065436937279228852476622948659167943435544622149348072943613294167216662821737555414480159115639791276054897201420389770580351533960771506199055664889770260291710977826725024401716523031627390652604144008597950935492433262042405635563993262949691698930975461134804791235994697938405200089317860731205010159117711704697471514344499487123311264707354172378099538737850219826145102366279591379660471881259976727356521602405329789906247763521525981391443887618575275588619928089116905061711975308467857756405810961619074331866883961080943542712559830853980002984826569445431232452392578125, "col7":10223372036854775806, "col8":1, "col9":"hello world", "col10":"aGVsbG8gd29ybGQ=", 
"col11":0.0000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000999999999999999909326625337248461995470488734032045693707225049331647881341002217023668530611028595157578301758491822824378438792553200763769833775473829862512856683413461939989729065436937279228852476622948659167943435544622149348072943613294167216662821737555414480159115639791276054897201420389770580351533960771506199055664889770260291710977826725024401716523031627390652604144008597950935492433262042405635563993262949691698930975461134804791235994697938405200089317860731205010159117711704697471514344499487123311264707354172378099538737850219826145102366279591379660471881259976727356521602405329789906247763521525981391443887618575275588619928089116905061711975308467857756405810961619074331866883961080943542712559830853980002984826569445431232452392578125, "col12":"2020-11-25", "col13":"00:01:02", "col14":"2020-11-25 00:01:02", "col15":"1606233662.012345", "col16":"2020-11-25 00:01:02.012345 Asia/Shanghai", "col17":"INTERVAL '3' DAY", "col18":"INTERVAL '4' YEAR" } }, "after":null, "op":"DELETE", "timestamp":{ "eventTime":1647581072000, "systemTime":1647581072976, "checkpointTime":1647581072 }, "ddl":null, "scn":"null" }, "extend":{ "load_fm": "test" } }
Example of a DDL operation
alter table connector_test.all_mysql_type_test add column c90 varchar(30) default "test" comment 'test';
{ "version" : "2.0", "schema" : { "source" : { "dbType" : "ob_mysql", "dbVersion" : null, "dbName" : "connector_test", "schema" : null, "table" : "all_mysql_type_test" }, "column" : null, "pk" : null }, "payload" : { "before" : null, "after" : null, "op" : "ALTER", "timestamp" : { "eventTime" : 1671177209000, "systemTime" : 1671177291485, "checkpointTime" : 1671177200 }, "ddl" : { "text" : "alter table connector_test.all_mysql_type_test add column c90 varchar(30) default \"test\" comment 'test'" }, "scn" : "null" }, "extend" : { } }
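The examples above suggest that a consumer can tell DDL messages from DML messages by checking whether payload.ddl is null. The following Python snippet is a minimal sketch of that check. The route function is a hypothetical helper, and the field layout is assumed from the examples in this topic rather than from a formal schema.

```python
import json


def route(raw):
    """Classify a message as DDL or DML (hypothetical helper, layout assumed from the examples)."""
    msg = json.loads(raw)
    source = msg["schema"]["source"]
    payload = msg["payload"]
    ddl = payload.get("ddl")
    if ddl:
        # DDL messages carry the statement text under payload.ddl.text.
        return ("ddl", source["dbName"], source["table"], ddl["text"])
    # DML messages have a null ddl field; payload.op names the operation.
    return ("dml", source["dbName"], source["table"], payload["op"])


sample = json.dumps({
    "version": "2.0",
    "schema": {"source": {"dbType": "ob_mysql", "dbName": "connector_test",
                          "table": "all_mysql_type_test"}},
    "payload": {"before": None, "after": None, "op": "ALTER",
                "ddl": {"text": "alter table connector_test.all_mysql_type_test add column c90 varchar(30)"}},
})
print(route(sample))
```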
SharePlex JSON message format
When you synchronize data to a Kafka, DataHub (BLOB type), or RocketMQ instance, the following JSON message format is used for the SharePlex serialization method:
{
"data": { // The key-value pairs of the changed data. For an INSERT or a DELETE statement, the full values are provided. For an UPDATE statement, only the modified values are provided.
"col1": "val1"
},
"meta": {
"time": "YYYY-MM-DDTHH:mm:ss", // The modification time.
"op": "", // The change type. Valid values: ins, upd, del, and ddl.
"posttime": "YYYY-MM-DDTHH:mm:ss", // The time at which data is written to the destination.
"idx": "STRING", // The index of the message in the transaction and the number of messages in the transaction, separated with a slash (/). This field is deprecated.
"size": NUMBER, // The number of messages in the transaction. This field is deprecated.
"seq": "STRING", // The sorting sequence. This field is available only when transactionEnabled is set to true on the source database.
"table": "STRING", // The name of the modified table, in the {database}.{table} format.
"rowid": "STRING", // The row identifier, in the {database}.{table}-{primary key values} format. Multiple primary key values are separated with \u0001.
"trans": "STRING", // The transaction ID.
"scn": "STRING" // The sequence used for sorting. This field is available only during incremental synchronization when sequenceEnabled is set to true in the source.json configuration file. Default value: true. The sequence is generated during synchronization in the timestamp + sequence number format. The sequence number increments and does not exceed five digits.
},
"key": { // The key values before modification. This field is available only for UPDATE statements.
}
}
Here are some examples:
Example of data insertion (INSERT
){ "data":{ "col1":"2020-11-25 00:01:02", "col2":"hello world", "col3":"INTERVAL '3' DAY", "col4":1.2222, "col5":9.99999999999999909326625337248461995470488734032045693707225049331647881341002217023668530611028595157578301758491822824378438792553200763769833775473829862512856683413461939989729065436937279228852476622948659167943435544622149348072943613294167216662821737555414480159115639791276054897201420389770580351533960771506199055664889770260291710977826725024401716523031627390652604144008597950935492433262042405635563993262949691698930975461134804791235994697938405200089317860731205010159117711704697471514344499487123311264707354172378099538737850219826145102366279591379660471881259976727356521602405329789906247763521525981391443887618575275588619928089116905061711975308467857756405810961619074331866883961080943542712559830853980002984826569445431232452392578125E-308, "col6":129, "col7":"00:01:02", "col8":1, "col9":"2020-11-25 00:01:02.012345 Asia/Shanghai", "col10":2147483646, "col11":9223372036854775806, "col12":"aGVsbG8gd29ybGQ=", "col13":"INTERVAL '4' YEAR", "col14":3, "col15":"2020-11-25", "col16":9.99999999999999909326625337248461995470488734032045693707225049331647881341002217023668530611028595157578301758491822824378438792553200763769833775473829862512856683413461939989729065436937279228852476622948659167943435544622149348072943613294167216662821737555414480159115639791276054897201420389770580351533960771506199055664889770260291710977826725024401716523031627390652604144008597950935492433262042405635563993262949691698930975461134804791235994697938405200089317860731205010159117711704697471514344499487123311264707354172378099538737850219826145102366279591379660471881259976727356521602405329789906247763521525981391443887618575275588619928089116905061711975308467857756405810961619074331866883961080943542712559830853980002984826569445431232452392578125E-308, "col17":10223372036854775806, "col18":"1606233662.012345" }, "meta":{ 
"posttime":"2020-12-07T13:22:00", "op":"ins", "size":10, "time":"2020-11-25T00:01:02", "idx":"1/10", "seq":1, "table":"mock_database.mock_table", "rowid":"mock_database.mock_table-3\u0001129", "trans":"shareplex_transaction_id", "scn":"123456789" } }
Example of data update (UPDATE
){ "data":{ "string":"hello world 2020" }, "meta":{ "posttime":"2020-12-07T13:59:09", "op":"upd", "size":10, "time":"2020-11-25T00:01:02", "idx":"1/10", "seq":1, "table":"mock_database.mock_table", "rowid":"mock_database.mock_table-3\u0001129", "trans":"shareplex_transaction_id", "scn":"123456789" }, "key":{ "col1":"2020-11-25 00:01:02", "col2":"hello world", "col3":"INTERVAL '3' DAY", "col4":1.2222, "col5":9.99999999999999909326625337248461995470488734032045693707225049331647881341002217023668530611028595157578301758491822824378438792553200763769833775473829862512856683413461939989729065436937279228852476622948659167943435544622149348072943613294167216662821737555414480159115639791276054897201420389770580351533960771506199055664889770260291710977826725024401716523031627390652604144008597950935492433262042405635563993262949691698930975461134804791235994697938405200089317860731205010159117711704697471514344499487123311264707354172378099538737850219826145102366279591379660471881259976727356521602405329789906247763521525981391443887618575275588619928089116905061711975308467857756405810961619074331866883961080943542712559830853980002984826569445431232452392578125E-308, "col6":129, "col7":"00:01:02", "col8":1, "col9":"2020-11-25 00:01:02.012345 Asia/Shanghai", "col10":2147483646, "col11":9223372036854775806, "col12":"aGVsbG8gd29ybGQ=", "col13":"INTERVAL '4' YEAR", "col14":3, "col15":"2020-11-25", 
"col16":9.99999999999999909326625337248461995470488734032045693707225049331647881341002217023668530611028595157578301758491822824378438792553200763769833775473829862512856683413461939989729065436937279228852476622948659167943435544622149348072943613294167216662821737555414480159115639791276054897201420389770580351533960771506199055664889770260291710977826725024401716523031627390652604144008597950935492433262042405635563993262949691698930975461134804791235994697938405200089317860731205010159117711704697471514344499487123311264707354172378099538737850219826145102366279591379660471881259976727356521602405329789906247763521525981391443887618575275588619928089116905061711975308467857756405810961619074331866883961080943542712559830853980002984826569445431232452392578125E-308, "col17":10223372036854775806, "col18":"1606233662.012345" } }
Example of data deletion (DELETE
){ "data":{ "col1":"2020-11-25 00:01:02", "col2":"hello world", "col3":"INTERVAL '3' DAY", "col4":1.2222, "col5":9.99999999999999909326625337248461995470488734032045693707225049331647881341002217023668530611028595157578301758491822824378438792553200763769833775473829862512856683413461939989729065436937279228852476622948659167943435544622149348072943613294167216662821737555414480159115639791276054897201420389770580351533960771506199055664889770260291710977826725024401716523031627390652604144008597950935492433262042405635563993262949691698930975461134804791235994697938405200089317860731205010159117711704697471514344499487123311264707354172378099538737850219826145102366279591379660471881259976727356521602405329789906247763521525981391443887618575275588619928089116905061711975308467857756405810961619074331866883961080943542712559830853980002984826569445431232452392578125E-308, "col6":129, "col7":"00:01:02", "col8":1, "col9":"2020-11-25 00:01:02.012345 Asia/Shanghai", "col10":2147483646, "col11":9223372036854775806, "col12":"aGVsbG8gd29ybGQ=", "col13":"INTERVAL '4' YEAR", "col14":3, "col15":"2020-11-25", "col16":9.99999999999999909326625337248461995470488734032045693707225049331647881341002217023668530611028595157578301758491822824378438792553200763769833775473829862512856683413461939989729065436937279228852476622948659167943435544622149348072943613294167216662821737555414480159115639791276054897201420389770580351533960771506199055664889770260291710977826725024401716523031627390652604144008597950935492433262042405635563993262949691698930975461134804791235994697938405200089317860731205010159117711704697471514344499487123311264707354172378099538737850219826145102366279591379660471881259976727356521602405329789906247763521525981391443887618575275588619928089116905061711975308467857756405810961619074331866883961080943542712559830853980002984826569445431232452392578125E-308, "col17":10223372036854775806, "col18":"1606233662.012345" }, "meta":{ 
"posttime":"2020-12-07T13:34:10", "op":"del", "size":10, "time":"2020-11-25T00:01:02", "idx":"1/10", "seq":1, "table":"mock_database.mock_table", "rowid":"mock_database.mock_table-3\u0001129", "trans":"shareplex_transaction_id", "scn":"123456789" } }
Example of a DDL operation
alter table connector_test.all_mysql_type_test add column c90 varchar(30) default "test" comment 'test';
{ "data" : {}, "meta" : { "posttime" : "2022-12-16T15:54:51", "op" : "ddl", "size" : 0, "time" : "2022-12-16T15:53:29", "idx" : "0/0", "seq" : 0, "table" : "connector_test.all_mysql_type_test", "rowid" : "connector_test.all_mysql_type_test-", "trans" : null, "scn" : "null" }, "sql" : { "ddl" : "alter table connector_test.all_mysql_type_test add column c90 varchar(30) default \"test\" comment 'test'" } }
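A consumer of SharePlex-format messages typically needs to branch on meta.op and recover the primary key values from meta.rowid. The following Python snippet is a minimal sketch of that logic. The parse_shareplex function and the summary layout are hypothetical, and the snippet assumes that rowid always starts with the value of meta.table followed by a hyphen, as in the examples above.

```python
import json

PK_SEPARATOR = "\u0001"  # separates primary key values inside meta.rowid


def parse_shareplex(raw):
    """Summarize one SharePlex-format message (hypothetical helper, not part of OMS)."""
    msg = json.loads(raw)
    meta = msg["meta"]
    table = meta["table"]
    # rowid is "{database}.{table}-{pk values}"; strip the table prefix to get the keys.
    prefix = table + "-"
    rowid = meta.get("rowid") or ""
    pk_part = rowid[len(prefix):] if rowid.startswith(prefix) else ""
    summary = {
        "op": meta["op"],
        "table": table,
        "pk_values": pk_part.split(PK_SEPARATOR) if pk_part else [],
    }
    if meta["op"] == "ddl":
        # DDL messages carry the statement under sql.ddl and an empty data object.
        summary["ddl"] = (msg.get("sql") or {}).get("ddl")
    else:
        summary["data"] = msg.get("data") or {}
        if meta["op"] == "upd":
            # Only UPDATE messages include the pre-change values under "key".
            summary["key"] = msg.get("key") or {}
    return summary


sample = json.dumps({
    "data": {"col2": "hello world 2020"},
    "meta": {"op": "upd", "table": "mock_database.mock_table",
             "rowid": "mock_database.mock_table-3\u0001129"},
    "key": {"col2": "hello world"},
})
print(parse_shareplex(sample))
```

Stripping the table name as a prefix, rather than splitting rowid at the first hyphen, keeps the sketch correct for table names that themselves contain a hyphen.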
DefaultExtendColumnType JSON message format
When you synchronize data to a Kafka, DataHub (BLOB type), or RocketMQ instance, the following JSON message format is used for the DefaultExtendColumnType serialization method:
DefaultExtendColumnType JSON messages use the Default format with an additional __light_type field, which specifies the data type of each field.
{
"prevStruct": { // The image before modification.
},
"postStruct": { // The image after modification.
"__light_type": {
"col":{ // The field name.
"schemaType":"type" // The value type.
}
}
},
"allMetaData": {}
}
Here are some examples:
Example of data insertion (INSERT
){ "allMetaData":{ "checkpoint":null, "record_primary_key":"id1\u0001id2", "source_identity":null, "record_primary_value":"3\u0001129", "dbType":"OB_MYSQL", "table_name":"table", "db":"database", "timestamp":"1609344671" }, "prevStruct":null, "recordType":"INSERT", "postStruct":{ "col1":3, "col2":129, "col3":2147483646, "col4":9223372036854775806, "col5":10223372036854775806, "col6":1.2222, "col7":9.99999999999999909326625337248461995470488734032045693707225049331647881341002217023668530611028595157578301758491822824378438792553200763769833775473829862512856683413461939989729065436937279228852476622948659167943435544622149348072943613294167216662821737555414480159115639791276054897201420389770580351533960771506199055664889770260291710977826725024401716523031627390652604144008597950935492433262042405635563993262949691698930975461134804791235994697938405200089317860731205010159117711704697471514344499487123311264707354172378099538737850219826145102366279591379660471881259976727356521602405329789906247763521525981391443887618575275588619928089116905061711975308467857756405810961619074331866883961080943542712559830853980002984826569445431232452392578125E-308, "col8":"hello world", "col9":"aGVsbG8gd29ybGQ=", 
"col10":9.99999999999999909326625337248461995470488734032045693707225049331647881341002217023668530611028595157578301758491822824378438792553200763769833775473829862512856683413461939989729065436937279228852476622948659167943435544622149348072943613294167216662821737555414480159115639791276054897201420389770580351533960771506199055664889770260291710977826725024401716523031627390652604144008597950935492433262042405635563993262949691698930975461134804791235994697938405200089317860731205010159117711704697471514344499487123311264707354172378099538737850219826145102366279591379660471881259976727356521602405329789906247763521525981391443887618575275588619928089116905061711975308467857756405810961619074331866883961080943542712559830853980002984826569445431232452392578125E-308, "col11":"2020-11-25", "col12":"00:01:02", "col13":"2020-11-25 00:01:02", "col14":"1606233662.012345", "__light_type":{ "int8":{ "schemaType":"TINYINT" }, "int16":{ "schemaType":"SMALLINT" }, "int32":{ "schemaType":"INT" }, "int64":{ "schemaType":"INT64" }, "bigInt":{ "schemaType":"BIGINT" }, "float32":{ "schemaType":"FLOAT" }, "float64":{ "schemaType":"DOUBLE" }, "string":{ "schemaType":"VARCHAR" }, "bytes":{ "schemaType":"BLOB" }, "decimal":{ "schemaType":"DECIMAL" }, "localDate":{ "schemaType":"DATE" }, "localTime":{ "schemaType":"TIME" }, "localDateTime":{ "schemaType":"DATETIME" }, "timestamp_in_long":{ "schemaType":"TIMESTAMP" } } } }
Example of data update (UPDATE
){ "allMetaData": { "checkpoint": null, "record_primary_key": "id1\u0001id2", "source_identity": null, "record_primary_value": "3\u0001129", "dbType":"OB_MYSQL", "table_name": "table", "db": "database", "timestamp": "1609344671" }, "prevStruct": { "col1":3, "col2":129, "col3":2147483646, "col4":9223372036854775806, "col5":10223372036854775806, "col6":1.2222, "col7":9.99999999999999909326625337248461995470488734032045693707225049331647881341002217023668530611028595157578301758491822824378438792553200763769833775473829862512856683413461939989729065436937279228852476622948659167943435544622149348072943613294167216662821737555414480159115639791276054897201420389770580351533960771506199055664889770260291710977826725024401716523031627390652604144008597950935492433262042405635563993262949691698930975461134804791235994697938405200089317860731205010159117711704697471514344499487123311264707354172378099538737850219826145102366279591379660471881259976727356521602405329789906247763521525981391443887618575275588619928089116905061711975308467857756405810961619074331866883961080943542712559830853980002984826569445431232452392578125E-308, "col8":"hello world", "col9":"aGVsbG8gd29ybGQ=", "col10":9.99999999999999909326625337248461995470488734032045693707225049331647881341002217023668530611028595157578301758491822824378438792553200763769833775473829862512856683413461939989729065436937279228852476622948659167943435544622149348072943613294167216662821737555414480159115639791276054897201420389770580351533960771506199055664889770260291710977826725024401716523031627390652604144008597950935492433262042405635563993262949691698930975461134804791235994697938405200089317860731205010159117711704697471514344499487123311264707354172378099538737850219826145102366279591379660471881259976727356521602405329789906247763521525981391443887618575275588619928089116905061711975308467857756405810961619074331866883961080943542712559830853980002984826569445431232452392578125E-308, "col11":"2020-11-25", 
"col12":"00:01:02", "col13":"2020-11-25 00:01:02", "col14":"1606233662.012345", "__light_type": { "int8": { "schemaType": "TINYINT" }, "int16": { "schemaType": "SMALLINT" }, "int32": { "schemaType": "INT" }, "int64": { "schemaType": "INT64" }, "bigInt": { "schemaType": "BIGINT" }, "float32": { "schemaType": "FLOAT" }, "float64": { "schemaType": "DOUBLE" }, "string": { "schemaType": "VARCHAR" }, "bytes": { "schemaType": "BLOB" }, "decimal": { "schemaType": "DECIMAL" }, "localDate": { "schemaType": "DATE" }, "localTime": { "schemaType": "TIME" }, "localDateTime": { "schemaType": "DATETIME" }, "timestamp_in_long": { "schemaType": "TIMESTAMP" } } }, "recordType": "UPDATE", "postStruct": { "col1":3, "col2":129, "col3":2147483646, "col4":9223372036854775806, "col5":10223372036854775806, "col6":1.2222, "col7":9.99999999999999909326625337248461995470488734032045693707225049331647881341002217023668530611028595157578301758491822824378438792553200763769833775473829862512856683413461939989729065436937279228852476622948659167943435544622149348072943613294167216662821737555414480159115639791276054897201420389770580351533960771506199055664889770260291710977826725024401716523031627390652604144008597950935492433262042405635563993262949691698930975461134804791235994697938405200089317860731205010159117711704697471514344499487123311264707354172378099538737850219826145102366279591379660471881259976727356521602405329789906247763521525981391443887618575275588619928089116905061711975308467857756405810961619074331866883961080943542712559830853980002984826569445431232452392578125E-308, "col8":"hello world 2020", "col9":"aGVsbG8gd29ybGQ=", 
"col10":9.99999999999999909326625337248461995470488734032045693707225049331647881341002217023668530611028595157578301758491822824378438792553200763769833775473829862512856683413461939989729065436937279228852476622948659167943435544622149348072943613294167216662821737555414480159115639791276054897201420389770580351533960771506199055664889770260291710977826725024401716523031627390652604144008597950935492433262042405635563993262949691698930975461134804791235994697938405200089317860731205010159117711704697471514344499487123311264707354172378099538737850219826145102366279591379660471881259976727356521602405329789906247763521525981391443887618575275588619928089116905061711975308467857756405810961619074331866883961080943542712559830853980002984826569445431232452392578125E-308, "col11":"2020-11-25", "col12":"00:01:02", "col13":"2020-11-25 00:01:02", "col14":"1606233662.012345", "__light_type": { "int8": { "schemaType": "TINYINT" }, "int16": { "schemaType": "SMALLINT" }, "int32": { "schemaType": "INT" }, "int64": { "schemaType": "INT64" }, "bigInt": { "schemaType": "BIGINT" }, "float32": { "schemaType": "FLOAT" }, "float64": { "schemaType": "DOUBLE" }, "string": { "schemaType": "VARCHAR" }, "bytes": { "schemaType": "BLOB" }, "decimal": { "schemaType": "DECIMAL" }, "localDate": { "schemaType": "DATE" }, "localTime": { "schemaType": "TIME" }, "localDateTime": { "schemaType": "DATETIME" }, "timestamp_in_long": { "schemaType": "TIMESTAMP" } } } }
Example of data deletion (DELETE
){ "allMetaData":{ "checkpoint":null, "record_primary_key":"id1\u0001id2", "source_identity":null, "record_primary_value":"3\u0001129", "dbType":"OB_MYSQL", "table_name":"table", "db":"database", "timestamp":"1609344671" }, "prevStruct":{ "col1":3, "col2":129, "col3":2147483646, "col4":9223372036854775806, "col5":10223372036854775806, "col6":1.2222, "col7":9.99999999999999909326625337248461995470488734032045693707225049331647881341002217023668530611028595157578301758491822824378438792553200763769833775473829862512856683413461939989729065436937279228852476622948659167943435544622149348072943613294167216662821737555414480159115639791276054897201420389770580351533960771506199055664889770260291710977826725024401716523031627390652604144008597950935492433262042405635563993262949691698930975461134804791235994697938405200089317860731205010159117711704697471514344499487123311264707354172378099538737850219826145102366279591379660471881259976727356521602405329789906247763521525981391443887618575275588619928089116905061711975308467857756405810961619074331866883961080943542712559830853980002984826569445431232452392578125E-308, "col8":"hello world", "col9":"aGVsbG8gd29ybGQ=", "col10":9.99999999999999909326625337248461995470488734032045693707225049331647881341002217023668530611028595157578301758491822824378438792553200763769833775473829862512856683413461939989729065436937279228852476622948659167943435544622149348072943613294167216662821737555414480159115639791276054897201420389770580351533960771506199055664889770260291710977826725024401716523031627390652604144008597950935492433262042405635563993262949691698930975461134804791235994697938405200089317860731205010159117711704697471514344499487123311264707354172378099538737850219826145102366279591379660471881259976727356521602405329789906247763521525981391443887618575275588619928089116905061711975308467857756405810961619074331866883961080943542712559830853980002984826569445431232452392578125E-308, "col11":"2020-11-25", 
"col12":"00:01:02", "col13":"2020-11-25 00:01:02", "col14":"1606233662.012345", "__light_type":{ "int8":{ "schemaType":"TINYINT" }, "int16":{ "schemaType":"SMALLINT" }, "int32":{ "schemaType":"INT" }, "int64":{ "schemaType":"INT64" }, "bigInt":{ "schemaType":"BIGINT" }, "float32":{ "schemaType":"FLOAT" }, "float64":{ "schemaType":"DOUBLE" }, "string":{ "schemaType":"VARCHAR" }, "bytes":{ "schemaType":"BLOB" }, "decimal":{ "schemaType":"DECIMAL" }, "localDate":{ "schemaType":"DATE" }, "localTime":{ "schemaType":"TIME" }, "localDateTime":{ "schemaType":"DATETIME" }, "timestamp_in_long": { "schemaType": "TIMESTAMP" } } }, "recordType":"DELETE", "postStruct":null }
Example of a DDL operation
alter table connector_test.all_mysql_type_test add column c90 varchar(30) default "test" comment 'test';
{ "prevStruct" : null, "postStruct" : { "ddl" : "alter table connector_test.all_mysql_type_test add column c90 varchar(30) default \"test\" comment 'test'", "__light_type" : { "ddl" : { "schemaType" : "VAR_STRING" } } }, "allMetaData" : { "checkpoint" : "1671177200", "dbType" : "OB_MYSQL", "storeDataSequence" : null, "db" : "connector_test", "timestamp" : "1671177209", "uniqueId" : null, "ddlType" : "ALTER_TABLE", "record_primary_key" : null, "source_identity" : null, "record_primary_value" : null, "table_name" : "all_mysql_type_test" }, "recordType" : "DDL" }
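To use the type information, a consumer can pair each value in prevStruct or postStruct with its schemaType entry in __light_type. The following Python snippet is a minimal sketch of that pairing. The typed_columns function is a hypothetical helper, and the snippet assumes that the keys of __light_type are the field names, as the format description states. Passing parse_float=Decimal to json.loads is an optional choice made here so that long decimal values are not rounded to binary floats.

```python
import json
from decimal import Decimal


def typed_columns(struct):
    """Return {field: (value, schemaType)} for a prevStruct or postStruct image."""
    if not struct:
        # The image is null when it does not apply, for example prevStruct on INSERT.
        return {}
    types = struct.get("__light_type") or {}
    return {
        name: (value, (types.get(name) or {}).get("schemaType"))
        for name, value in struct.items()
        if name != "__light_type"
    }


raw = """
{
  "prevStruct": null,
  "postStruct": {
    "id": 3,
    "price": 1.2222,
    "__light_type": {
      "id": {"schemaType": "INT"},
      "price": {"schemaType": "DECIMAL"}
    }
  },
  "recordType": "INSERT"
}
"""
msg = json.loads(raw, parse_float=Decimal)
print(typed_columns(msg["postStruct"]))
print(typed_columns(msg["prevStruct"]))
```

A field without a matching __light_type entry is reported with a type of None instead of raising an error.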
Debezium JSON message format
When you synchronize data from a MySQL tenant of OceanBase Database to a Kafka, DataHub (BLOB type), or RocketMQ instance, the following JSON message formats are used for the Debezium serialization method. By default, only the struct in the payload field is displayed.
Format where the schema and payload fields are specified
{
"schema": { // The struct that describes the payload field. By default, this struct is not specified.
"type": "struct", // struct indicates that this field contains other structs.
"optional": false, // Specifies whether this field is optional. The value false indicates that the field is required.
"fields": [
{
"type": "int64", // The field type.
"optional": false, // Specifies whether this field is optional. The value false indicates that the field is required.
"field": "ts_ms" // The field name.
}
...
]
},
"payload": {
"op": "c", // The type of data modification. Valid values: c, u, d, and HEARTBEAT. c indicates full insert. u indicates update. d indicates delete. HEARTBEAT indicates a heartbeat message.
"source": {
"version": "", // The OMS version.
"connector": "OB_MYSQL", // The type of the data source.
"name": "OMS", // A fixed value, which is OMS.
"ts_ms": 0, // The timestamp in seconds for data modification, which is available only for incremental synchronization.
"db": "test", // The name of the database that is changed by using an SQL statement. For an OceanBase database, you can specify only the database name and omit the tenant name.
"table": "testTab", // The name of the table that is changed by using an SQL statement.
"pos": "553132@1668496109" // The log position in the binlog file. The value is in the format of [binlog file name]@[binlog file name offset].
},
"before": { // The image before modification.
"column": "value" // The key-value pair that contains the full key value.
},
"after": { // The image after modification.
"column": "value" // The key-value pair that contains the full key value.
},
"ts_ms": 1668497367188 // The timestamp of data processing.
}
}
Format where only the payload field is specified
{
"payload": {
"op": "c", // The type of data modification. Valid values: c, u, d, and HEARTBEAT. c indicates full insert. u indicates update. d indicates delete. HEARTBEAT indicates a heartbeat message.
"source": {
"version": "", // The OMS version.
"connector": "OB_MYSQL", // The type of the data source.
"name": "OMS", // A fixed value, which is OMS.
"ts_ms": 0, // The timestamp in seconds for data modification, which is available only for incremental synchronization.
"db": "test", // The name of the database that is changed by using an SQL statement. For an OceanBase database, you can specify only the database name and omit the tenant name.
"table": "testTab", // The name of the table that is changed by using an SQL statement.
"pos": "553132@16684****" // The log position in the binlog file. The value is in the format of [binlog file name]@[binlog file name offset].
},
"before": { // The image before modification.
"column": "value" // The key-value pair that contains the full key value.
},
"after": { // The image after modification.
"column": "value" // The key-value pair that contains the full key value.
},
"ts_ms": 1668497367188 // The timestamp of data processing.
}
}
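Because the schema field is optional, a consumer can read everything it needs from payload and branch on op. The following Python snippet is a minimal sketch of such a handler. The summarize function, the OP_NAMES mapping, and the summary layout are hypothetical, and the snippet assumes that before or after may be missing or null when an image does not apply.

```python
import json

OP_NAMES = {"c": "insert", "u": "update", "d": "delete"}  # labels chosen for this sketch


def summarize(raw):
    """Summarize one Debezium-format message (hypothetical helper)."""
    msg = json.loads(raw)
    payload = msg["payload"]
    op = payload["op"]
    if op == "HEARTBEAT":
        # Heartbeat messages carry no row images.
        return {"kind": "heartbeat"}
    source = payload.get("source") or {}
    before = payload.get("before") or {}
    after = payload.get("after") or {}
    # A column counts as changed when its value differs between the two images.
    changed = sorted(k for k in set(before) | set(after) if before.get(k) != after.get(k))
    return {
        "kind": OP_NAMES.get(op, op),
        "db": source.get("db"),
        "table": source.get("table"),
        "changed": changed,
        "has_schema": "schema" in msg,
    }


sample = json.dumps({
    "payload": {
        "op": "u",
        "source": {"connector": "OB_MYSQL", "name": "OMS", "db": "test", "table": "table_name"},
        "before": {"c01": 1, "c11": "a"},
        "after": {"c01": 1, "c11": "aa"},
        "ts_ms": 1668495906356,
    }
})
print(summarize(sample))
```

For an insert or a delete, every column of the single available image is reported as changed, because the other image is treated as empty.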
Here are some examples:
Example of data insertion (INSERT
){ "schema":{ "optional":false, "type":"STRUCT", "fields":[ { "field":"before", "optional":false, "type":"struct", "fields":[ { "field":"c01", "optional":false, "type":"int32" }, { "field":"c02", "optional":false, "type":"string" }, { "field":"c03", "optional":false, "type":"string" }, { "field":"c04", "optional":false, "type":"bytes" }, { "field":"c05", "optional":false, "type":"int16" }, { "field":"c06", "optional":false, "type":"int16" }, { "field":"c07", "optional":false, "type":"int32" }, { "field":"c08", "optional":false, "type":"int64" }, { "field":"c09", "optional":false, "type":"float64" }, { "field":"c10", "optional":false, "type":"float64" }, { "field":"c11", "optional":false, "type":"string" }, { "field":"c12", "optional":false, "type":"string" }, { "field":"c13", "optional":false, "type":"string" }, { "field":"c14", "optional":false, "type":"string" }, { "field":"c15", "optional":false, "type":"bytes" }, { "field":"c16", "optional":false, "type":"string" }, { "field":"c17", "optional":false, "type":"bytes" }, { "field":"c18", "optional":false, "type":"bytes" }, { "field":"c19", "optional":false, "type":"bytes" }, { "field":"c20", "optional":false, "type":"bytes" }, { "field":"c21", "optional":false, "type":"string" }, { "field":"c22", "optional":false, "type":"int32" }, { "field":"c23", "optional":false, "type":"int64" }, { "field":"c24", "optional":false, "type":"string" }, { "field":"c25", "optional":false, "type":"int32" }, { "field":"c26", "optional":false, "type":"bytes" } ] }, { "field":"after", "optional":false, "type":"struct", "fields":[ { "field":"c01", "optional":false, "type":"int32" }, { "field":"c02", "optional":false, "type":"string" }, { "field":"c03", "optional":false, "type":"string" }, { "field":"c04", "optional":false, "type":"bytes" }, { "field":"c05", "optional":false, "type":"int16" }, { "field":"c06", "optional":false, "type":"int16" }, { "field":"c07", "optional":false, "type":"int32" }, { "field":"c08", "optional":false, 
"type":"int64" }, { "field":"c09", "optional":false, "type":"float64" }, { "field":"c10", "optional":false, "type":"float64" }, { "field":"c11", "optional":false, "type":"string" }, { "field":"c12", "optional":false, "type":"string" }, { "field":"c13", "optional":false, "type":"string" }, { "field":"c14", "optional":false, "type":"string" }, { "field":"c15", "optional":false, "type":"bytes" }, { "field":"c16", "optional":false, "type":"string" }, { "field":"c17", "optional":false, "type":"bytes" }, { "field":"c18", "optional":false, "type":"bytes" }, { "field":"c19", "optional":false, "type":"bytes" }, { "field":"c20", "optional":false, "type":"bytes" }, { "field":"c21", "optional":false, "type":"string" }, { "field":"c22", "optional":false, "type":"int32" }, { "field":"c23", "optional":false, "type":"int64" }, { "field":"c24", "optional":false, "type":"string" }, { "field":"c25", "optional":false, "type":"int32" }, { "field":"c26", "optional":false, "type":"bytes" } ] }, { "field":"source", "optional":false, "type":"struct", "fields":[ { "field":"version", "optional":false, "type":"string" }, { "field":"connector", "optional":false, "type":"string" }, { "field":"name", "optional":false, "type":"string" }, { "field":"ts_ms", "optional":false, "type":"int64" }, { "field":"db", "optional":false, "type":"string" }, { "field":"table", "optional":false, "type":"string" }, { "field":"server_id", "optional":false, "type":"int64" }, { "field":"pos", "optional":false, "type":"string" } ] }, { "field":"op", "optional":false, "type":"string" }, { "field":"ts_ms", "optional":false, "type":"int64" } ] }, "payload":{ "op":"c", "source":{ "connector":"OB_MYSQL", "pos":"703223@166849****", "name":"OMS", "version":"", "ts_ms":1668491621000, "db":"test", "table":"table_name" }, "after":{ "c11":"a", "c10":2.4212412, "c13":"c", "c12":"b", "c15":"65", "c14":"d", "c17":"67", "c16":"f", 
"c19":"690000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000", "c18":"68", "c20":"6A", "c22":19311, "c21":"2022-11-15T05:12:11Z", "c02":"12312", "c24":1668489131000, "c01":2, "c23":36060000000, "c04":"61", "c26":"6B", "c03":"1241.41000", "c25":2022, "c06":141, "c05":11, "c08":412124124, "c07":4241, "c09":2.11111 }, "ts_ms":1668495423594 } }
Example of data update (UPDATE
){ "schema":{ "optional":false, "type":"STRUCT", "fields":[ { "field":"before", "optional":false, "type":"struct", "fields":[ { "field":"c01", "optional":false, "type":"int32" }, { "field":"c02", "optional":false, "type":"string" }, { "field":"c03", "optional":false, "type":"string" }, { "field":"c04", "optional":false, "type":"bytes" }, { "field":"c05", "optional":false, "type":"int16" }, { "field":"c06", "optional":false, "type":"int16" }, { "field":"c07", "optional":false, "type":"int32" }, { "field":"c08", "optional":false, "type":"int64" }, { "field":"c09", "optional":false, "type":"float64" }, { "field":"c10", "optional":false, "type":"float64" }, { "field":"c11", "optional":false, "type":"string" }, { "field":"c12", "optional":false, "type":"string" }, { "field":"c13", "optional":false, "type":"string" }, { "field":"c14", "optional":false, "type":"string" }, { "field":"c15", "optional":false, "type":"bytes" }, { "field":"c16", "optional":false, "type":"string" }, { "field":"c17", "optional":false, "type":"bytes" }, { "field":"c18", "optional":false, "type":"bytes" }, { "field":"c19", "optional":false, "type":"bytes" }, { "field":"c20", "optional":false, "type":"bytes" }, { "field":"c21", "optional":false, "type":"string" }, { "field":"c22", "optional":false, "type":"int32" }, { "field":"c23", "optional":false, "type":"int64" }, { "field":"c24", "optional":false, "type":"string" }, { "field":"c25", "optional":false, "type":"int32" }, { "field":"c26", "optional":false, "type":"bytes" } ] }, { "field":"after", "optional":false, "type":"struct", "fields":[ { "field":"c01", "optional":false, "type":"int32" }, { "field":"c02", "optional":false, "type":"string" }, { "field":"c03", "optional":false, "type":"string" }, { "field":"c04", "optional":false, "type":"bytes" }, { "field":"c05", "optional":false, "type":"int16" }, { "field":"c06", "optional":false, "type":"int16" }, { "field":"c07", "optional":false, "type":"int32" }, { "field":"c08", "optional":false, 
"type":"int64" }, { "field":"c09", "optional":false, "type":"float64" }, { "field":"c10", "optional":false, "type":"float64" }, { "field":"c11", "optional":false, "type":"string" }, { "field":"c12", "optional":false, "type":"string" }, { "field":"c13", "optional":false, "type":"string" }, { "field":"c14", "optional":false, "type":"string" }, { "field":"c15", "optional":false, "type":"bytes" }, { "field":"c16", "optional":false, "type":"string" }, { "field":"c17", "optional":false, "type":"bytes" }, { "field":"c18", "optional":false, "type":"bytes" }, { "field":"c19", "optional":false, "type":"bytes" }, { "field":"c20", "optional":false, "type":"bytes" }, { "field":"c21", "optional":false, "type":"string" }, { "field":"c22", "optional":false, "type":"int32" }, { "field":"c23", "optional":false, "type":"int64" }, { "field":"c24", "optional":false, "type":"string" }, { "field":"c25", "optional":false, "type":"int32" }, { "field":"c26", "optional":false, "type":"bytes" } ] }, { "field":"source", "optional":false, "type":"struct", "fields":[ { "field":"version", "optional":false, "type":"string" }, { "field":"connector", "optional":false, "type":"string" }, { "field":"name", "optional":false, "type":"string" }, { "field":"ts_ms", "optional":false, "type":"int64" }, { "field":"db", "optional":false, "type":"string" }, { "field":"table", "optional":false, "type":"string" }, { "field":"server_id", "optional":false, "type":"int64" }, { "field":"pos", "optional":false, "type":"string" } ] }, { "field":"op", "optional":false, "type":"string" }, { "field":"ts_ms", "optional":false, "type":"int64" } ] }, "payload":{ "op":"u", "before":{ "c11":"a", "c10":2.4212412, "c13":"c", "c12":"b", "c15":"65", "c14":"d", "c17":"67", "c16":"f", 
"c19":"690000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000", "c18":"68", "c20":"6A", "c22":19311, "c21":"2022-11-15T05:12:11Z", "c02":"12312", "c24":1668489131000, "c01":1, "c23":36060000000, "c04":"61", "c26":"6B", "c03":"1241.41000", "c25":2022, "c06":141, "c05":11, "c08":412124124, "c07":4241, "c09":2.11111 }, "source":{ "connector":"OB_MYSQL", "pos":"436999@166849****", "name":"OMS", "version":"", "ts_ms":1668495861000, "db":"test", "table":"table_name" }, "after":{ "c11":"aa", "c10":2.4212412, "c13":"c", "c12":"b", "c15":"65", "c14":"d", "c17":"67", "c16":"f", "c19":"690000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000", "c18":"68", "c20":"6A", "c22":19311, "c21":"2022-11-15T05:12:11Z", "c02":"12312", "c24":1668489131000, "c01":1, "c23":36060000000, "c04":"61", "c26":"6B", "c03":"1241.41000", "c25":2022, "c06":141, "c05":11, "c08":412124124, "c07":4241, "c09":2.11111 }, "ts_ms":1668495906356 } }
Example of data deletion (
DELETE
){ "schema":{ "optional":false, "type":"STRUCT", "fields":[ { "field":"before", "optional":false, "type":"struct", "fields":[ { "field":"c01", "optional":false, "type":"int32" }, { "field":"c02", "optional":false, "type":"string" }, { "field":"c03", "optional":false, "type":"string" }, { "field":"c04", "optional":false, "type":"bytes" }, { "field":"c05", "optional":false, "type":"int16" }, { "field":"c06", "optional":false, "type":"int16" }, { "field":"c07", "optional":false, "type":"int32" }, { "field":"c08", "optional":false, "type":"int64" }, { "field":"c09", "optional":false, "type":"float64" }, { "field":"c10", "optional":false, "type":"float64" }, { "field":"c11", "optional":false, "type":"string" }, { "field":"c12", "optional":false, "type":"string" }, { "field":"c13", "optional":false, "type":"string" }, { "field":"c14", "optional":false, "type":"string" }, { "field":"c15", "optional":false, "type":"bytes" }, { "field":"c16", "optional":false, "type":"string" }, { "field":"c17", "optional":false, "type":"bytes" }, { "field":"c18", "optional":false, "type":"bytes" }, { "field":"c19", "optional":false, "type":"bytes" }, { "field":"c20", "optional":false, "type":"bytes" }, { "field":"c21", "optional":false, "type":"string" }, { "field":"c22", "optional":false, "type":"int32" }, { "field":"c23", "optional":false, "type":"int64" }, { "field":"c24", "optional":false, "type":"string" }, { "field":"c25", "optional":false, "type":"int32" }, { "field":"c26", "optional":false, "type":"bytes" } ] }, { "field":"after", "optional":false, "type":"struct", "fields":[ { "field":"c01", "optional":false, "type":"int32" }, { "field":"c02", "optional":false, "type":"string" }, { "field":"c03", "optional":false, "type":"string" }, { "field":"c04", "optional":false, "type":"bytes" }, { "field":"c05", "optional":false, "type":"int16" }, { "field":"c06", "optional":false, "type":"int16" }, { "field":"c07", "optional":false, "type":"int32" }, { "field":"c08", "optional":false, 
"type":"int64" }, { "field":"c09", "optional":false, "type":"float64" }, { "field":"c10", "optional":false, "type":"float64" }, { "field":"c11", "optional":false, "type":"string" }, { "field":"c12", "optional":false, "type":"string" }, { "field":"c13", "optional":false, "type":"string" }, { "field":"c14", "optional":false, "type":"string" }, { "field":"c15", "optional":false, "type":"bytes" }, { "field":"c16", "optional":false, "type":"string" }, { "field":"c17", "optional":false, "type":"bytes" }, { "field":"c18", "optional":false, "type":"bytes" }, { "field":"c19", "optional":false, "type":"bytes" }, { "field":"c20", "optional":false, "type":"bytes" }, { "field":"c21", "optional":false, "type":"string" }, { "field":"c22", "optional":false, "type":"int32" }, { "field":"c23", "optional":false, "type":"int64" }, { "field":"c24", "optional":false, "type":"string" }, { "field":"c25", "optional":false, "type":"int32" }, { "field":"c26", "optional":false, "type":"bytes" } ] }, { "field":"source", "optional":false, "type":"struct", "fields":[ { "field":"version", "optional":false, "type":"string" }, { "field":"connector", "optional":false, "type":"string" }, { "field":"name", "optional":false, "type":"string" }, { "field":"ts_ms", "optional":false, "type":"int64" }, { "field":"db", "optional":false, "type":"string" }, { "field":"table", "optional":false, "type":"string" }, { "field":"server_id", "optional":false, "type":"int64" }, { "field":"pos", "optional":false, "type":"string" } ] }, { "field":"op", "optional":false, "type":"string" }, { "field":"ts_ms", "optional":false, "type":"int64" } ] }, "payload":{ "op":"d", "before":{ "c11":"aa", "c10":2.4212412, "c13":"c", "c12":"b", "c15":"65", "c14":"d", "c17":"67", "c16":"f", 
"c19":"690000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000", "c18":"68", "c20":"6A", "c22":19311, "c21":"2022-11-15T05:12:11Z", "c02":"12312", "c24":1668489131000, "c01":1, "c23":36060000000, "c04":"61", "c26":"6B", "c03":"1241.41000", "c25":2022, "c06":141, "c05":11, "c08":412124124, "c07":4241, "c09":2.11111 }, "source":{ "connector":"OB_MYSQL", "pos":"553132@1668****", "name":"OMS", "version":"", "ts_ms":1668496109000, "db":"test", "table":"table_name" }, "ts_ms":1668496119717 } }
DebeziumFlatten JSON message format
When you synchronize data from a MySQL tenant of OceanBase Database to a Kafka, DataHub (BLOB type), or RocketMQ instance, the following JSON message format is used for the DebeziumFlatten serialization method. Unlike the Debezium serialization method, the schema and payload fields are not provided in the DebeziumFlatten serialization method.
{
"op":"c", // The type of data modification. Valid values: c, u, d, and HEARTBEAT. c indicates full insert. u indicates update. d indicates delete. HEARTBEAT indicates a heartbeat message.
"source":{
"version":"", // The OMS version.
"connector":"OB_MYSQL", // The type of the data source.
"name":"OMS", // A fixed value, which is OMS.
"ts_ms":0, // The timestamp in seconds for data modification, which is available only for incremental synchronization.
"db":"test", // The name of the database that is changed by using an SQL statement. For an OceanBase database, you can specify only the database name and omit the tenant name.
"table":"testTab", // The name of the table that is changed by using an SQL statement.
"pos":"553132@16684****" // The log position in the binlog file. The value is in the format of [binlog file name]@[binlog file name offset].
},
"before":{ // The image before modification.
"column":"value" // The key-value pair that contains the full key value.
},
"after":{ // The image after modification.
"column":"value" // The key-value pair that contains the full key value.
},
"ts_ms":1668497367188 // The timestamp of data processing.
}
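A consumer typically branches on the op field described above. The following Python sketch shows one possible way to apply DebeziumFlatten messages to an in-memory copy of a table. The function name apply_flatten_event, the rows dictionary, and the key_column parameter are hypothetical names chosen for illustration and are not part of the data transmission service. The sketch also assumes that the message is plain JSON without the explanatory comments shown in the format description.

```python
import json


def apply_flatten_event(rows, message, key_column):
    """Apply one DebeziumFlatten message to a dict keyed by key_column.

    rows and key_column are hypothetical: rows maps a primary key value
    to the latest row image, and key_column names the primary key column.
    """
    event = json.loads(message)
    op = event["op"]
    if op == "HEARTBEAT":
        # Heartbeat messages carry no row change.
        return rows
    if op in ("c", "u"):
        after = event["after"]
        if op == "u":
            # Drop the old image first in case the key itself was updated.
            before = event.get("before") or {}
            rows.pop(before.get(key_column), None)
        rows[after[key_column]] = after
    elif op == "d":
        rows.pop(event["before"][key_column], None)
    else:
        raise ValueError(f"unexpected op: {op!r}")
    return rows
```

For a table whose primary key is c01, the call would look like apply_flatten_event(rows, message, "c01").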
Here are some examples:
Example of data insertion (
INSERT
){ "op":"c", "source":{ "connector":"OB_MYSQL", "pos":"703223@166849****", "name":"OMS", "version":"", "ts_ms":1668491621000, "db":"test", "table":"table_name" }, "after":{ "c11":"a", "c10":2.4212412, "c13":"c", "c12":"b", "c15":"65", "c14":"d", "c17":"67", "c16":"f", "c19":"690000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000", "c18":"68", "c20":"6A", "c22":19311, "c21":"2022-11-15T05:12:11Z", "c02":"12312", "c24":1668489131000, "c01":2, "c23":36060000000, "c04":"61", "c26":"6B", "c03":"1241.41000", "c25":2022, "c06":141, "c05":11, "c08":412124124, "c07":4241, "c09":2.11111 }, "ts_ms":1668495423594 }
Example of data update (
UPDATE
){ "op":"u", "before":{ "c11":"a", "c10":2.4212412, "c13":"c", "c12":"b", "c15":"65", "c14":"d", "c17":"67", "c16":"f", "c19":"690000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000", "c18":"68", "c20":"6A", "c22":19311, "c21":"2022-11-15T05:12:11Z", "c02":"12312", "c24":1668489131000, "c01":1, "c23":36060000000, "c04":"61", "c26":"6B", "c03":"1241.41000", "c25":2022, "c06":141, "c05":11, "c08":412124124, "c07":4241, "c09":2.11111 }, "source":{ "connector":"OB_MYSQL", "pos":"436999@166849****", "name":"OMS", "version":"", "ts_ms":1668495861000, "db":"test", "table":"table_name" }, "after":{ "c11":"aa", "c10":2.4212412, "c13":"c", "c12":"b", "c15":"65", "c14":"d", "c17":"67", "c16":"f", "c19":"690000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000", "c18":"68", "c20":"6A", "c22":19311, "c21":"2022-11-15T05:12:11Z", "c02":"12312", "c24":1668489131000, "c01":1, "c23":36060000000, "c04":"61", "c26":"6B", "c03":"1241.41000", "c25":2022, "c06":141, "c05":11, "c08":412124124, "c07":4241, "c09":2.11111 }, "ts_ms":1668495906356 }
Example of data deletion (
DELETE
){ "op":"d", "before":{ "c11":"aa", "c10":2.4212412, "c13":"c", "c12":"b", "c15":"65", "c14":"d", "c17":"67", "c16":"f", "c19":"690000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000", "c18":"68", "c20":"6A", "c22":19311, "c21":"2022-11-15T05:12:11Z", "c02":"12312", "c24":1668489131000, "c01":1, "c23":36060000000, "c04":"61", "c26":"6B", "c03":"1241.41000", "c25":2022, "c06":141, "c05":11, "c08":412124124, "c07":4241, "c09":2.11111 }, "source":{ "connector":"OB_MYSQL", "pos":"553132@1668****", "name":"OMS", "version":"", "ts_ms":1668496109000, "db":"test", "table":"table_name" }, "ts_ms":1668496119717 }
DebeziumSmt JSON message format
DebeziumSmt is a configuration method provided by the Debezium serialization method. It transforms and processes a single message by using event flattening single message transformation (SMT). When you synchronize data from a MySQL tenant of OceanBase Database to a Kafka, DataHub (BLOB type), or RocketMQ instance, only the key:value pairs in the after section of the JSON message format for the DebeziumSmt serialization method are displayed.
In the following example, the Debezium serialization method is used to update data:
{
"op": "u",
"source": {
"connector":"OB_MYSQL",
"name":"OMS",
...
},
"ts_ms" : 1668496119717,
"before" : {
"field1" : "before_value1",
"field2" : "before_value2"
},
"after" : {
"field1" : "after_value1",
"field2" : "after_value2"
}
}
SMT simplifies the preceding message. With the DebeziumSmt serialization method, the simplified JSON message is as follows:
{
"field1" : "after_value1",
"field2" : "after_value2"
}
Here are some examples:
Example of data insertion (INSERT)
{ "field1" : "after_value1", "field2" : "after_value2", "__deleted": "false" }
Example of data update (UPDATE)
{ "field1" : "after_value1", "field2" : "after_value2", "__deleted": "false" }
Example of data deletion (DELETE)
{ "field1" : "after_value1", "field2" : "after_value2", "__deleted": "true" }
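The flattening shown in the preceding examples can be approximated in a few lines of Python. This is only a sketch of the idea, not the service's implementation. The function name flatten_smt is hypothetical, and the sketch assumes that a DELETE message carries no after image, so the row values are taken from the before image instead.

```python
def flatten_smt(event):
    """Reduce a Debezium-style change event to the DebeziumSmt shape."""
    deleted = event["op"] == "d"
    # Assumption: a DELETE has no "after" image, so use "before" instead.
    image = event.get("before") if deleted else event.get("after")
    flat = dict(image or {})
    # The examples above encode the flag as the strings "true" and "false".
    flat["__deleted"] = "true" if deleted else "false"
    return flat
```

Keeping __deleted as a string rather than a Boolean mirrors the examples above, so a consumer that compares against "true" works on both the real messages and the output of this sketch.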
Avro JSON message format
When you synchronize data from a MySQL tenant of OceanBase Database to a Kafka instance, the following JSON message format is used for the Avro serialization method:
Full migration
{
"version": 1,
"id": 0,
"sourceTimestamp": 1702371565, // The timestamp security checkpoint.
"sourcePosition": "", // For full migration, the sourcePosition and safeSourcePosition fields are empty.
"safeSourcePosition": "",
"sourceTxid": "",
"source": {
"sourceType": "MySQL", // A fixed value, which is MySQL.
"version": "OBMySQL" // A fixed value, which is OBMySQL.
},
"operation": "INIT", // The operation type for full migration is INIT.
"objectName": "test***",
"processTimestamps": [1702371565238], // Only the delivery time is provided.
"tags": {
"pk_uk_info": "{\"PRIMARY\":[\"id\"]}" // Only the primary key type is provided.
},
"fields": [
{"name": "id", "dataTypeNumber": 246}, // The data type of each column.
{"name": "bid", "dataTypeNumber": 3},
{"name": "name", "dataTypeNumber": 15},
{"name": "address", "dataTypeNumber": 254}
],
"beforeImages": null, // The image before full migration is empty.
"afterImages": [ // The image after full migration. The precision of a field of the INTEGER or FLOAT type is always 8. The scale is always 64.
{"value": "1", "precision": 1, "scale": 0},
{"precision": 8, "value": "11"},
{"charset": "utf8mb4", "value": {"bytes": "yyy"}},
null
]
}
Incremental DML operation synchronization
Example of data insertion (INSERT)
{
"version": 1,
"id": 170236922143600000,
"sourceTimestamp": 1702369092,
"sourcePosition": "1702369080", // A checkpoint for the MySQL tenant of OceanBase Database.
"safeSourcePosition": "1702369080", // A checkpoint for the MySQL tenant of OceanBase Database.
"sourceTxid": "",
"source": {
"sourceType": "MySQL",
"version": "OBMySQL"
},
"operation": "INSERT",
"objectName": "test***",
"processTimestamps": [1702369221480],
"tags": {
"pk_uk_info": "{\"PRIMARY\":[\"id\"]}"
},
"fields": [
{"name": "id", "dataTypeNumber": 8},
{"name": "bid", "dataTypeNumber": 3},
{"name": "name", "dataTypeNumber": 15}
],
"beforeImages": null, // The image before the INSERT operation is empty.
"afterImages": [
{"precision": 8, "value": "2"},
{"precision": 8, "value": "12"},
{"charset": "utf8mb4", "value": {"bytes": "xxx"}}
]
}
Example of data update (UPDATE)
{
"version": 1,
"id": 170236975822100001,
"sourceTimestamp": 1702369757,
"sourcePosition": "1702369756",
"safeSourcePosition": "1702369756",
"sourceTxid": "",
"source": {
"sourceType": "MySQL",
"version": "OBMySQL"
},
"operation": "UPDATE",
"objectName": "test***",
"processTimestamps": [1702369758237],
"tags": {
"pk_uk_info": "{\"PRIMARY\":[\"id\"]}"
},
"fields": [
{"name": "id", "dataTypeNumber": 8},
{"name": "bid", "dataTypeNumber": 3},
{"name": "name", "dataTypeNumber": 15}
],
"beforeImages": [ // Images before and after the UPDATE operation.
{"precision": 8, "value": "3"},
{"precision": 8, "value": "22"},
{"charset": "utf8mb4", "value": {"bytes": "xxx"}}
],
"afterImages": [
{"precision": 8, "value": "3"},
{"precision": 8, "value": "44"},
{"charset": "utf8mb4", "value": {"bytes": "xxx"}}
]
}
Example of data deletion (DELETE)
{
"version": 1,
"id": 170236976527500000,
"sourceTimestamp": 1702369764,
"sourcePosition": "1702369763",
"safeSourcePosition": "1702369763",
"sourceTxid": "",
"source": {
"sourceType": "MySQL",
"version": "OBMySQL"
},
"operation": "DELETE",
"objectName": "test***",
"processTimestamps": [1702369765287],
"tags": {
"pk_uk_info": "{\"PRIMARY\":[\"id\"]}"
},
"fields": [
{"name": "id", "dataTypeNumber": 8},
{"name": "bid", "dataTypeNumber": 3},
{"name": "name", "dataTypeNumber": 15}
],
"beforeImages": [
{"precision": 8, "value": "3"},
{"precision": 8, "value": "44"},
{"charset": "utf8mb4", "value": {"bytes": "xxx"}}
],
"afterImages": null // The image after the DELETE operation is empty.
}
Incremental DDL operation synchronization
{
"version": 1,
"id": 170236979372400000,
"sourceTimestamp": 1702369793,
"sourcePosition": "1702369792",
"safeSourcePosition": "1702369792",
"sourceTxid": "",
"source": {
"sourceType": "MySQL",
"version": "OBMySQL"
},
"operation": "DDL",
"objectName": "test***",
"processTimestamps": [1702369794543],
"tags": {},
"fields": null, // The values of fields and beforeImages are null for incremental DDL operation synchronization.
"beforeImages": null,
"afterImages": "alter table multi_db_multi_tbl add column address char(20) default null" // The value of afterImages of the STRING type is a DDL statement.
}
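In the Avro messages above, column names are listed in fields and column values are listed in the same order in beforeImages or afterImages. The following Python sketch pairs the two lists to rebuild a row. It assumes that the Avro record has already been decoded into a dictionary shaped like the JSON shown in this section. The function name avro_row and its image_key parameter are hypothetical.

```python
def avro_row(record, image_key="afterImages"):
    """Pair each column name in "fields" with its entry in the chosen image.

    Assumes record is a dict shaped like the JSON renderings above.
    """
    if record["operation"] == "DDL":
        # For DDL, afterImages is the DDL statement itself (a string).
        return {"ddl": record["afterImages"]}
    images = record.get(image_key)
    if images is None:
        # For example, afterImages of a DELETE or beforeImages of an INSERT.
        return None
    row = {}
    for field, image in zip(record["fields"], images):
        value = None if image is None else image["value"]
        if isinstance(value, dict) and "bytes" in value:
            # Character values are wrapped as {"bytes": ...} in the examples.
            value = value["bytes"]
        row[field["name"]] = value
    return row
```

Passing image_key="beforeImages" returns the row as it was before an UPDATE or DELETE operation.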
Data formats used when data is transmitted from OceanBase Database to a message queue system
Take note of the following items when you synchronize data from OceanBase Database to a Kafka, DataHub (BLOB type), or RocketMQ instance:
The following tables describe the format mappings for the MySQL and Oracle tenants of OceanBase Database when the serialization method is Default, Canal, DataWorks (V2.0), SharePlex, or DefaultExtendColumnType.
MySQL tenant of OceanBase Database
Data type | Mapped-to type | Description
TINYINT, SMALLINT, MEDIUMINT, INT, INTEGER, YEAR, BOOL, BOOLEAN | Long | Integers that fit within 64 bits. Scientific notation is not used for a normal value such as 1000. For a BOOL or BOOLEAN value, true is converted to 1 and false is converted to 0.
DECIMAL, NUMERIC | BigDecimal | Exact decimal numerals and integers that exceed 64 bits. An integer is displayed without the decimal point or a decimal place. A decimal numeral is displayed based on the data passed to the database instance without removing zeros at the end. Scientific notation is used.
FLOAT, DOUBLE | Double | Floating-point numbers. The number of significant digits is determined based on the source data type. If the source data type is FLOAT, the converted value has 7 significant digits. If the source data type is DOUBLE, the converted value has 16 significant digits.
CHAR, VARCHAR, TINYTEXT, TEXT, MEDIUMTEXT, LONGTEXT, ENUM, SET | String | The string type.
TINYBLOB, BLOB, MEDIUMBLOB, LONGBLOB, BINARY, VARBINARY, BIT | Bytes | Byte arrays. By default, a byte array is displayed as a Base64-encoded string. Note: For a fixed-length BIT data type, the destination database removes zeros in higher-order bits after receiving a byte array for incremental synchronization but does not do so for full synchronization. As a result, the displayed Base64-encoded strings in the two scenarios may be different. However, the actual results obtained after decoding are consistent.
DATE | Date | The DATE type in the format of YYYY-MM-DD. If the value is invalid, the original string is displayed.
TIME | Time | The TIME type in the format of HH:mm:ss[.nnnnnnnnn]. The fractional seconds part can contain a maximum of 9 digits. Zeros at the end of the fractional seconds part are not displayed. If the value is invalid, the original string is displayed.
DATETIME | DateTime | The DATETIME type in the format of YYYY-MM-DD HH:mm:ss[.nnnnnnnnn] [zoneId], including the time zone. The fractional seconds part can contain a maximum of 9 digits. Zeros at the end of the fractional seconds part are not displayed. If the value is invalid, the original string is displayed.
TIMESTAMP | Timestamp | The TIMESTAMP type in the format of [second-level timestamp][.nnnnnnnnn]. The fractional seconds part can contain a maximum of 9 digits. Zeros at the end of the fractional seconds part are not displayed. If the time is invalid, 0000-00-00 00:00:00 is displayed.
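The note on the BIT type says that the Base64-encoded strings can differ between full and incremental synchronization while the decoded results stay consistent. One way to compare such values is to decode them and interpret the bytes as a number, as in the following Python sketch. The function name bit_value is hypothetical, and reading the bytes as a big-endian unsigned integer is an assumption made for illustration.

```python
import base64


def bit_value(encoded):
    """Decode a Base64 string and read the bytes as an unsigned integer.

    Assumption: the bytes are big-endian, so leading zero bytes do not
    change the resulting number.
    """
    return int.from_bytes(base64.b64decode(encoded), "big")
```

With this approach, a value that keeps its leading zero byte and a value that has lost it compare as equal even though their Base64-encoded strings differ.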
Oracle tenant of OceanBase Database
Data type | Mapped-to type | Description
INTEGER | Long | Integers that fit within 64 bits. Scientific notation is not used for a normal value such as 1000.
NUMBER, FLOAT | BigDecimal | Exact decimal numerals and integers that exceed 64 bits.
BINARY_FLOAT, BINARY_DOUBLE | Double | Floating-point numbers. The number of significant digits is determined based on the source data type. If the source data type is FLOAT, the converted value has 7 significant digits. If the source data type is DOUBLE, the converted value has 16 significant digits.
VARCHAR2, NVARCHAR2, INTERVAL YEAR TO MONTH, INTERVAL DAY TO SECOND, CLOB, NCLOB, ROWID, UROWID | String | The string type.
BLOB, BFILE, RAW | Bytes | Byte arrays. By default, a byte array is displayed as a Base64-encoded string.
DATE, TIMESTAMP, TIMESTAMP WITH TIME ZONE, TIMESTAMP WITH LOCAL TIME ZONE | DateTime | The DATETIME type in the format of YYYY-MM-DD HH:mm:ss[.nnnnnnnnn] [zoneId], including the time zone. The fractional seconds part can contain a maximum of 9 digits. Zeros at the end of the fractional seconds part are not displayed. If the value is invalid, the original string is displayed.
If the serialization method is Debezium, the data type mappings for MySQL tenants of OceanBase Database are described in the following table.
Important: When you migrate data from an Oracle tenant of OceanBase Database to a Kafka, DataHub (BLOB type), or RocketMQ instance, the Debezium serialization method is not supported.
Data type | Mapped-to type | Description
BOOLEAN, BOOL | BOOLEAN | Valid values: true and false.
TINYINT, SMALLINT, MEDIUMINT, INT/INTEGER, BIGINT, YEAR | LONG | Integers that range from -2^63 to 2^63.
BIGINT | STRING | Strings that display the data in full.
FLOAT, DOUBLE | DOUBLE | Floating-point numbers.
DECIMAL, NUMERIC | STRING | Strings that display the data in full. A decimal numeral is displayed based on the data passed to the database instance without removing zeros at the end. Scientific notation is used.
BIT, BINARY, VARBINARY, TINYBLOB, BLOB, MEDIUMBLOB, LONGBLOB | BYTES | Base16-encoded byte arrays.
CHAR, VARCHAR, TINYTEXT, TEXT, MEDIUMTEXT, LONGTEXT, ENUM, SET | STRING | The string type.
TIMESTAMP | STRING | Timestamp strings in the format of YYYY-MM-DDTHH:mm:ss[.nnnnnnnnn]Z. The time zone is UTC+0.
DATE | LONG | The number of days since January 1, 1970.
TIME | LONG | The temporal value, in microseconds, since 00:00:00, excluding the time zone.
DATETIME | LONG | The temporal value, in milliseconds, since 1970-01-01 00:00:00, excluding the time zone.
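The BYTES, DATE, TIME, and DATETIME encodings in the preceding table can be turned back into native values on the consumer side. The following Python sketch shows one possible set of helpers. The function names are hypothetical, and decode_time assumes a non-negative value of less than 24 hours, which does not cover every value that a TIME column can hold.

```python
from datetime import date, datetime, time, timedelta


def decode_bytes(value):
    """Base16 (hexadecimal) string to raw bytes."""
    return bytes.fromhex(value)


def decode_date(days):
    """Days since January 1, 1970 to a calendar date."""
    return date(1970, 1, 1) + timedelta(days=days)


def decode_time(micros):
    """Microseconds since 00:00:00 to a time of day.

    Assumption: 0 <= micros < 24 hours.
    """
    return (datetime.min + timedelta(microseconds=micros)).time()


def decode_datetime(millis):
    """Milliseconds since 1970-01-01 00:00:00 to a naive datetime."""
    return datetime(1970, 1, 1) + timedelta(milliseconds=millis)
```

The helpers return naive values without time zone information because the table states that these encodings exclude the time zone.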
If the serialization method is Avro, the data type mappings for MySQL tenants of OceanBase Database are described in the following table.
Important: The Avro serialization method is supported only when you synchronize data from a MySQL tenant of OceanBase Database to a Kafka instance.
Data type | Mapped-to type
TINYINT, BOOLEAN, SMALLINT, MEDIUMINT, INT, BIGINT, BIT | INTEGER
FLOAT, DOUBLE | FLOAT
DECIMAL, NUMERIC | DECIMAL
VARCHAR, CHAR, TINYTEXT, MEDIUMTEXT, LONGTEXT, TEXT | CHARACTER
BINARY, VARBINARY, TINYBLOB, MEDIUMBLOB, LONGBLOB, BLOB | BinaryObject
TIMESTAMP | TimestampObject. Note: For full migration and incremental synchronization, a value of the TIMESTAMP type is converted into a timestamp. The value -9223372022400L is invalid. For any other value, you can call the Instant.ofEpochSecond(ts, nanos) method of Java to obtain the correct wall clock time.
DATE, TIME, DATETIME, YEAR | DATETIME
JSON, ENUM, SET | TextObject
GEOMETRY | TextGeometry. Note: At present, the data transmission service uses the EWKT format for transparent data transmission. Therefore, the mapped-to type is TextGeometry.
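The note on TimestampObject refers to the Instant.ofEpochSecond(ts, nanos) method of Java. For consumers written in Python, a rough equivalent is sketched below. The function name timestamp_to_utc is hypothetical, and the names ts and nanos simply follow the Java call in the note. Python datetime values have microsecond resolution, so the sketch truncates any digits below one microsecond.

```python
from datetime import datetime, timedelta, timezone

# The value that the note above identifies as invalid.
INVALID_TS_SECONDS = -9223372022400


def timestamp_to_utc(ts, nanos=0):
    """Convert epoch seconds plus nanoseconds to a UTC datetime.

    Returns None for the invalid value. Digits below one microsecond
    are truncated.
    """
    if ts == INVALID_TS_SECONDS:
        return None
    epoch = datetime(1970, 1, 1, tzinfo=timezone.utc)
    return epoch + timedelta(seconds=ts, microseconds=nanos // 1000)
```

Returning None for the invalid value lets the caller decide how to represent it, for example as a NULL or as a zero date.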