This topic describes the formats of messages that are written to Kafka and the meaning of each field in the messages.
Background information
A synchronization node that synchronizes all data in a data source to Kafka writes the data that DataWorks reads from the source to Kafka topics in the JSON format. Each message that is written to Kafka contains the column change information and the status of the data before and after the change. To let a consumer track the progress of a synchronization node while it consumes Kafka messages, the node also periodically generates a heartbeat message, whose op field is set to MHEARTBEAT, and writes it to the Kafka topics. For more information about the formats of messages that are written to Kafka, see the Format of a Kafka message, Format of a heartbeat message generated by a synchronization node, and Format of a Kafka message for data change in a source sections. For more information about the type and meaning of each field in a message, see the Field types and Fields sections.
Format of a Kafka message
Format of a message that is written to Kafka:
{
    "schema": { // The metadata change information. Only column names and column types are specified.
        "dataColumn": [ // The column change information. The data in a destination topic is updated based on this information.
            {
                "name": "id",
                "type": "LONG"
            },
            {
                "name": "name",
                "type": "STRING"
            },
            {
                "name": "binData",
                "type": "BYTES"
            },
            {
                "name": "ts",
                "type": "DATE"
            },
            {
                "name": "rowid", // If an Oracle data source is used, rowid is added as a column.
                "type": "STRING"
            }
        ],
        "primaryKey": [
            "pkName1",
            "pkName2"
        ],
        "source": {
            "dbType": "mysql",
            "dbVersion": "1.0.0",
            "dbName": "myDatabase",
            "schemaName": "mySchema",
            "tableName": "tableName"
        }
    },
    "payload": {
        "before": {
            "dataColumn": {
                "id": 111,
                "name": "scooter",
                "binData": "[base64 string]",
                "ts": 1590315269000,
                "rowid": "AAIUMPAAFAACxExAAE" // The ID of a row in the Oracle data source. The value is of the STRING type.
            }
        },
        "after": {
            "dataColumn": {
                "id": 222,
                "name": "donald",
                "binData": "[base64 string]",
                "ts": 1590315269000,
                "rowid": "AAIUMPAAFAACxExAAE" // The ID of a row in the Oracle data source. The value is of the STRING type.
            }
        },
        "sequenceId": "XXX", // The unique sequence ID of each data record that is generated after the full data and incremental data are merged. The value is of the STRING type.
        "scn": "xxxx", // The system change number (SCN) of the Oracle data source. The value is of the STRING type.
        "op": "INSERT/UPDATE_BEFOR/UPDATE_AFTER/DELETE/TRANSACTION_BEGIN/TRANSACTION_END/CREATE/ALTER/ERASE/QUERY/TRUNCATE/RENAME/CINDEX/DINDEX/GTID/XACOMMIT/XAROLLBACK/MHEARTBEAT...", // The operation that is performed. The value is case-sensitive.
        "timestamp": {
            "eventTime": 1, // Required. The time when the data in the source database changes. The value is a 13-digit timestamp in milliseconds.
            "systemTime": 2, // Optional. The time when the synchronization node reads the change message. The value is a 13-digit timestamp in milliseconds.
            "checkpointTime": 3 // Optional. The specified time when the synchronization offset is reset. The value is a 13-digit timestamp in milliseconds and equals the value of the eventTime field in most cases.
        },
        "ddl": {
            "text": "ADD COLUMN ...",
            "ddlMeta": "[SQLStatement serialized binary, expressed in base64 string]"
        }
    },
    "version": "1.0.0"
}
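The following minimal sketch shows how a consumer might parse these messages and separate heartbeat messages (described in the next section) from data-change messages by inspecting the op field. It assumes the standard Kafka Java client and the Jackson library; the broker address, consumer group, and topic name are hypothetical placeholders.
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class SyncMessageReader {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // hypothetical broker address
        props.put("group.id", "sync-consumer");           // hypothetical consumer group
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        ObjectMapper mapper = new ObjectMapper();
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("my_topic")); // hypothetical topic name
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, String> record : records) {
                    JsonNode payload = mapper.readTree(record.value()).get("payload");
                    String op = payload.get("op").asText();
                    if ("MHEARTBEAT".equals(op)) {
                        // Heartbeat messages carry no data; eventTime reports how far
                        // the synchronization node has read in the source.
                        System.out.println("progress: " + payload.get("timestamp").get("eventTime").asLong());
                        continue;
                    }
                    // Data-change message: before/after hold the row images described above.
                    System.out.printf("op=%s before=%s after=%s%n",
                            op, payload.get("before"), payload.get("after"));
                }
            }
        }
    }
}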
Format of a heartbeat message generated by a synchronization node
{
    "schema": {
        "dataColumn": null,
        "primaryKey": null,
        "source": null
    },
    "payload": {
        "before": null,
        "after": null,
        "sequenceId": null,
        "timestamp": {
            "eventTime": 1620457659000,
            "checkpointTime": 1620457659000
        },
        "op": "MHEARTBEAT",
        "ddl": null
    },
    "version": "0.0.1"
}
Format of a Kafka message for data change in a source
- Format of a Kafka message for data insertion into a source:
{ "schema": { "dataColumn": [ { "name": "name", "type": "STRING" }, { "name": "job", "type": "STRING" }, { "name": "sex", "type": "STRING" }, { "name": "#alibaba_rds_row_id#", "type": "LONG" } ], "primaryKey": null, "source": { "dbType": "MySQL", "dbName": "pkset_test", "tableName": "pkset_test_no_pk" } }, "payload": { "before": null, "after": { "dataColumn": { "name": "name11", "job": "job11", "sex": "man", "#alibaba_rds_row_id#": 15 } }, "sequenceId": "1620457642589000000", "timestamp": { "eventTime": 1620457896000, "systemTime": 1620457896977, "checkpointTime": 1620457896000 }, "op": "INSERT", "ddl": null }, "version": "0.0.1" }
- Format of a Kafka message for data update in a source:
- If "When one record in the source is updated, one Kafka record is generated" is not selected, two Kafka messages are generated for each data update in the source: one message describes the status of the data before the update, and the other describes the status of the data after the update. A consumer can pair the two messages by their shared sequenceId (see the pairing sketch after this list). The following sample messages show the formats:
Format of the Kafka message that describes the status of data before the change:
{ "schema": { "dataColumn": [ { "name": "name", "type": "STRING" }, { "name": "job", "type": "STRING" }, { "name": "sex", "type": "STRING" }, { "name": "#alibaba_rds_row_id#", "type": "LONG" } ], "primaryKey": null, "source": { "dbType": "MySQL", "dbName": "pkset_test", "tableName": "pkset_test_no_pk" } }, "payload": { "before": { "dataColumn": { "name": "name11", "job": "job11", "sex": "man", "#alibaba_rds_row_id#": 15 } }, "after": null, "sequenceId": "1620457642589000001", "timestamp": { "eventTime": 1620458077000, "systemTime": 1620458077779, "checkpointTime": 1620458077000 }, "op": "UPDATE_BEFOR", "ddl": null }, "version": "0.0.1" }
Format of the Kafka message that describes the status of data after the change:
{
    "schema": {
        "dataColumn": [
            { "name": "name", "type": "STRING" },
            { "name": "job", "type": "STRING" },
            { "name": "sex", "type": "STRING" },
            { "name": "#alibaba_rds_row_id#", "type": "LONG" }
        ],
        "primaryKey": null,
        "source": { "dbType": "MySQL", "dbName": "pkset_test", "tableName": "pkset_test_no_pk" }
    },
    "payload": {
        "before": null,
        "after": { "dataColumn": { "name": "name11", "job": "job11", "sex": "woman", "#alibaba_rds_row_id#": 15 } },
        "sequenceId": "1620457642589000001",
        "timestamp": { "eventTime": 1620458077000, "systemTime": 1620458077779, "checkpointTime": 1620458077000 },
        "op": "UPDATE_AFTER",
        "ddl": null
    },
    "version": "0.0.1"
}
- If "When one record in the source is updated, one Kafka record is generated" is selected, only one Kafka message is generated for each data update in the source. The message describes the status of the data both before and after the update. The following sample message shows the format:
{ "schema": { "dataColumn": [ { "name": "name", "type": "STRING" }, { "name": "job", "type": "STRING" }, { "name": "sex", "type": "STRING" }, { "name": "#alibaba_rds_row_id#", "type": "LONG" } ], "primaryKey": null, "source": { "dbType": "MySQL", "dbName": "pkset_test", "tableName": "pkset_test_no_pk" } }, "payload": { "before": { "dataColumn": { "name": "name11", "job": "job11", "sex": "man", "#alibaba_rds_row_id#": 15 } }, "after": { "dataColumn": { "name": "name11", "job": "job11", "sex": "woman", "#alibaba_rds_row_id#": 15 } }, "sequenceId": "1620457642589000001", "timestamp": { "eventTime": 1620458077000, "systemTime": 1620458077779, "checkpointTime": 1620458077000 }, "op": "UPDATE_AFTER", "ddl": null }, "version": "0.0.1" }
- Format of a Kafka message for data deletion from a source:
{ "schema": { "dataColumn": [ { "name": "name", "type": "STRING" }, { "name": "job", "type": "STRING" }, { "name": "sex", "type": "STRING" }, { "name": "#alibaba_rds_row_id#", "type": "LONG" } ], "primaryKey": null, "source": { "dbType": "MySQL", "dbName": "pkset_test", "tableName": "pkset_test_no_pk" } }, "payload": { "before": { "dataColumn": { "name": "name11", "job": "job11", "sex": "woman", "#alibaba_rds_row_id#": 15 } }, "after": null, "sequenceId": "1620457642589000002", "timestamp": { "eventTime": 1620458266000, "systemTime": 1620458266101, "checkpointTime": 1620458266000 }, "op": "DELETE", "ddl": null }, "version": "0.0.1" }
Field types
Field type | Description |
---|---|
BOOLEAN | Corresponds to the BOOLEAN type in JSON. Valid values: true and false. |
DATE | Corresponds to the NUMBER type in JSON. The value is a 13-digit timestamp in milliseconds. |
BYTES | Corresponds to the STRING type in JSON. Before data is written to Kafka, byte arrays are Base64-encoded into strings. A consumer must decode the Base64-encoded strings before it uses the values. Base64.getEncoder().encodeToString(text.getBytes("UTF-8")) is used for encoding, and Base64.getDecoder().decode(encodedText) is used for decoding. See the sketch after this table. |
STRING | Corresponds to the STRING type in JSON. |
LONG | Corresponds to the NUMBER type in JSON. |
DOUBLE | Corresponds to the NUMBER type in JSON. |
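For example, a BYTES value can be round-tripped with the JDK's java.util.Base64 class, using the same encode and decode calls that are listed in the table:
import java.nio.charset.StandardCharsets;
import java.util.Base64;

public class BytesFieldDemo {
    public static void main(String[] args) {
        byte[] original = "binary payload".getBytes(StandardCharsets.UTF_8);

        // What the synchronization node does before writing a BYTES column to Kafka.
        String encoded = Base64.getEncoder().encodeToString(original);

        // What a consumer must do before using the BYTES value.
        byte[] decoded = Base64.getDecoder().decode(encoded);

        System.out.println(new String(decoded, StandardCharsets.UTF_8)); // prints: binary payload
    }
}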
Fields
The following table describes the meaning of each field in a message that is written to Kafka.
Level-1 field | Level-2 field | Description |
---|---|---|
schema | dataColumn | The names and types of columns. The value is of the JSONArray type. dataColumn records the names and types of the columns that are changed in the source. A change operation can be data addition, deletion, or modification, or a table schema change in the source. |
schema | primaryKey | The primary key information. The value is of the List type. Each element is the name of a primary key. |
schema | source | The information about the source database or source table. The value is of the Object type. |
payload | before | The data before a change. The value is of the JSONObject type. For example, if a MySQL database is the source and data in this source is updated, the before field records the data before the update. |
payload | after | The data after a change. The after field records the data after a change in the same format as the before field. |
payload | sequenceId | The unique sequence ID of each data record that is generated by StreamX after the full data and incremental data are merged. The value is of the STRING type. Note: After a data update message is read from the source, two write records are generated: UPDATE_BEFOR and UPDATE_AFTER. The two records have the same sequence ID. |
payload | scn | The system change number (SCN) of the source. This field is valid only when the source is an Oracle database. |
payload | op | The type of the operation that is performed on the data in the source. Valid values: INSERT, UPDATE_BEFOR, UPDATE_AFTER, DELETE, TRANSACTION_BEGIN, TRANSACTION_END, CREATE, ALTER, ERASE, QUERY, TRUNCATE, RENAME, CINDEX, DINDEX, GTID, XACOMMIT, XAROLLBACK, and MHEARTBEAT. The values are case-sensitive. |
payload | timestamp | The timestamp of a data record. The value is of the JSONObject type. eventTime: the time when the data in the source database changes (required). systemTime: the time when the synchronization node reads the change message (optional). checkpointTime: the specified time when the synchronization offset is reset (optional); it equals eventTime in most cases. All values are 13-digit timestamps in milliseconds. |
payload | ddl | This field is specified only if the schema of a table in the source is changed. The value is null when a DML operation, such as data addition, deletion, or modification, is performed in the source. |
version | N/A | The version of the JSON message format. |