全部产品
Search
文档中心

大数据开发治理平台 DataWorks:附录:消息格式

更新时间:Apr 03, 2024

本文介绍写入Kafka消息的消息结构及各字段含义。

背景信息

同步整库数据至kafka任务,是将从上游数据源读取的数据,按照下面描述的JSON格式写入到Kafka的topic。消息总体格式包括变更记录的列信息、以及数据变更前后的状态信息等。为确保消费Kafka中数据时能够准确判断同步任务进度,同步任务还将定时产生op字段作为MHEARTBEAT的同步任务心跳记录写入Kafka的topic中。以下为您介绍写入Kafka的消息总体格式同步任务心跳消息格式源端更改数据对应的消息格式,关于字段类型及参数说明等信息,详情请参见字段类型参数说明

消息总体格式

写入Kafka消息的总体格式如下所示:

{
    "schema": { //变更的元数据信息,仅指定列名与列类型信息
        "dataColumn": [//变更的数据列信息,更新目标表记录内容
            {
                "name": "id",
                "type": "LONG"
            },
            {
                "name": "name",
                "type": "STRING"
            },
            {
                "name": "binData",
                "type": "BYTES"
            },
            {
                "name": "ts",
                "type": "DATE"
            },
            {
              "name":"rowid",// 数据源为Oracle时,rowid会放在数据列中
              "type":"STRING"
            }
        ],
        "primaryKey": [
            "pkName1",
            "pkName2"
        ],
        "source": {
            "dbType": "mysql",
            "dbVersion": "1.0.0",
            "dbName": "myDatabase",
            "schemaName": "mySchema",
            "tableName": "tableName"
        }
    },
    "payload": {
        "before": {
            "dataColumn":{
                "id": 111,
                "name":"scooter",
                "binData": "[base64 string]",
                "ts": 1590315269000,
                "rowid": "AAIUMPAAFAACxExAAE"//字符串类型,Oracle的rowid信息
            }
        },
        "after": {
            "dataColumn":{
                "id": 222,
                "name":"donald",
                "binData": "[base64 string]",
                "ts": 1590315269000,
                "rowid": "AAIUMPAAFAACxExAAE"//字符串类型,Oracle的rowid信息
            }
        },
        "sequenceId":"XXX",//字符串类型,用于增全量数据合并的数据排序,
        "scn":"xxxx",//字符串类型,Oracle的scn信息
        "op": "INSERT/UPDATE_BEFOR/UPDATE_AFTER/DELETE/TRANSACTION_BEGIN/TRANSACTION_END/CREATE/ALTER/ERASE/QUERY/TRUNCATE/RENAME/CINDEX/DINDEX/GTID/XACOMMIT/XAROLLBACK/MHEARTBEAT...",//大小写敏感,
        "timestamp": {
            "eventTime": 1,//必选,记录源端库发生变更的时间,毫秒精度的13位时间戳
            "systemTime": 2,//可选,同步任务处理该条变更消息的时间,毫秒精度的13位时间戳
            "checkpointTime": 3//可选,重置同步位点时的设置时间,毫秒精度的13位时间戳,一般等于eventTime
        },
        "ddl": {
            "text": "ADD COLUMN ...",
            "ddlMeta": "[SQLStatement serialized binary, expressed in base64 string]"
        }
    },
    "version":"1.0.0"
}
说明

关于字段类型及参数说明等信息,详情请参见字段类型参数说明

同步任务心跳消息格式

{
    "schema": {
        "dataColumn": null,
        "primaryKey": null,
        "source": null
    },
    "payload": {
        "before": null,
        "after": null,
        "sequenceId": null,
        "timestamp": {
            "eventTime": 1620457659000,
            "checkpointTime": 1620457659000
        },
        "op": "MHEARTBEAT",
        "ddl": null
    },
    "version": "0.0.1"
}
说明

关于字段类型及参数说明等信息,详情请参见字段类型参数说明

源端更改数据对应的消息格式

  • 源端插入数据对应的Kafka消息格式:

    {
        "schema": {
            "dataColumn": [
                {
                    "name": "name",
                    "type": "STRING"
                },
                {
                    "name": "job",
                    "type": "STRING"
                },
                {
                    "name": "sex",
                    "type": "STRING"
                },
                {
                    "name": "#alibaba_rds_row_id#",
                    "type": "LONG"
                }
            ],
            "primaryKey": null,
            "source": {
                "dbType": "MySQL",
                "dbName": "pkset_test",
                "tableName": "pkset_test_no_pk"
            }
        },
        "payload": {
            "before": null,
            "after": {
                "dataColumn": {
                    "name": "name11",
                    "job": "job11",
                    "sex": "man",
                    "#alibaba_rds_row_id#": 15
                }
            },
            "sequenceId": "1620457642589000000",
            "timestamp": {
                "eventTime": 1620457896000,
                "systemTime": 1620457896977,
                "checkpointTime": 1620457896000
            },
            "op": "INSERT",
            "ddl": null
        },
        "version": "0.0.1"
    }
  • 源端更新数据对应的Kafka消息格式:

    • 当未勾选源端update变更对应一条Kafka记录时,源端更新数据对应的Kafka消息格式包含两条Kafka消息,分别描述更新前的数据状态和更新后的数据状态。具体消息格式如下:

      更新前的数据状态消息格式:

      {
          "schema": {
              "dataColumn": [
                  {
                      "name": "name",
                      "type": "STRING"
                  },
                  {
                      "name": "job",
                      "type": "STRING"
                  },
                  {
                      "name": "sex",
                      "type": "STRING"
                  },
                  {
                      "name": "#alibaba_rds_row_id#",
                      "type": "LONG"
                  }
              ],
              "primaryKey": null,
              "source": {
                  "dbType": "MySQL",
                  "dbName": "pkset_test",
                  "tableName": "pkset_test_no_pk"
              }
          },
          "payload": {
              "before": {
                  "dataColumn": {
                      "name": "name11",
                      "job": "job11",
                      "sex": "man",
                      "#alibaba_rds_row_id#": 15
                  }
              },
              "after": null,
              "sequenceId": "1620457642589000001",
              "timestamp": {
                  "eventTime": 1620458077000,
                  "systemTime": 1620458077779,
                  "checkpointTime": 1620458077000
              },
              "op": "UPDATE_BEFOR",
              "ddl": null
          },
          "version": "0.0.1"
      }

      更新后的数据状态消息格式:

      {
          "schema": {
              "dataColumn": [
                  {
                      "name": "name",
                      "type": "STRING"
                  },
                  {
                      "name": "job",
                      "type": "STRING"
                  },
                  {
                      "name": "sex",
                      "type": "STRING"
                  },
                  {
                      "name": "#alibaba_rds_row_id#",
                      "type": "LONG"
                  }
              ],
              "primaryKey": null,
              "source": {
                  "dbType": "MySQL",
                  "dbName": "pkset_test",
                  "tableName": "pkset_test_no_pk"
              }
          },
          "payload": {
              "before": null,
              "after": {
                  "dataColumn": {
                      "name": "name11",
                      "job": "job11",
                      "sex": "woman",
                      "#alibaba_rds_row_id#": 15
                  }
              },
              "sequenceId": "1620457642589000001",
              "timestamp": {
                  "eventTime": 1620458077000,
                  "systemTime": 1620458077779,
                  "checkpointTime": 1620458077000
              },
              "op": "UPDATE_AFTER",
              "ddl": null
          },
          "version": "0.0.1"
      }
    • 当勾选了源端update变更对应一条Kafka记录时,源端更新数据对应的Kafka消息格式包含一条Kafka消息,描述更新前的数据状态和更新后的数据状态。具体消息格式如下:

      {
          "schema": {
              "dataColumn": [
                  {
                      "name": "name",
                      "type": "STRING"
                  },
                  {
                      "name": "job",
                      "type": "STRING"
                  },
                  {
                      "name": "sex",
                      "type": "STRING"
                  },
                  {
                      "name": "#alibaba_rds_row_id#",
                      "type": "LONG"
                  }
              ],
              "primaryKey": null,
              "source": {
                  "dbType": "MySQL",
                  "dbName": "pkset_test",
                  "tableName": "pkset_test_no_pk"
              }
          },
          "payload": {
              "before": {
                  "dataColumn": {
                      "name": "name11",
                      "job": "job11",
                      "sex": "man",
                      "#alibaba_rds_row_id#": 15
                  }
              },
              "after": {
                  "dataColumn": {
                      "name": "name11",
                      "job": "job11",
                      "sex": "woman",
                      "#alibaba_rds_row_id#": 15
                  }
              },
              "sequenceId": "1620457642589000001",
              "timestamp": {
                  "eventTime": 1620458077000,
                  "systemTime": 1620458077779,
                  "checkpointTime": 1620458077000
              },
              "op": "UPDATE_AFTER",
              "ddl": null
          },
          "version": "0.0.1"
      }
  • 源端删除数据对应的Kafka消息格式:

    {
        "schema": {
            "dataColumn": [
                {
                    "name": "name",
                    "type": "STRING"
                },
                {
                    "name": "job",
                    "type": "STRING"
                },
                {
                    "name": "sex",
                    "type": "STRING"
                },
                {
                    "name": "#alibaba_rds_row_id#",
                    "type": "LONG"
                }
            ],
            "primaryKey": null,
            "source": {
                "dbType": "MySQL",
                "dbName": "pkset_test",
                "tableName": "pkset_test_no_pk"
            }
        },
        "payload": {
            "before": {
                "dataColumn": {
                    "name": "name11",
                    "job": "job11",
                    "sex": "woman",
                    "#alibaba_rds_row_id#": 15
                }
            },
            "after": null,
            "sequenceId": "1620457642589000002",
            "timestamp": {
                "eventTime": 1620458266000,
                "systemTime": 1620458266101,
                "checkpointTime": 1620458266000
            },
            "op": "DELETE",
            "ddl": null
        },
        "version": "0.0.1"
    }
说明

关于字段类型及参数说明等信息,详情请参见字段类型参数说明

字段类型

写入Kafka topic中的消息将从源端读取数据映射为BOOLEAN、DOUBLE、DATE、BYTES、LONG,STRING六种类型,再以不同的JSON格式写入kafka topic中。

类型

说明

BOOLEAN

对应JSON中的布尔类型,取值为true,false

DATE

对应JSON中的数值类型,取值为13位数字时间戳,精确到毫秒(ms)级。

BYTES

对应JSON中的字符串类型,写入Kafka前会先对字节数组进行base64编码转换为字符串,消费时需要进行base64解码(编码Base64.getEncoder().encodeToString(text.getBytes("UTF-8"));解码Base64.getDecoder().decode(encodedText))

STRING

对应JSON中的字符串类型

LONG

对应JSON中的数值类型

DOUBLE

对应JSON中的数值类型

参数说明

以下为您介绍写入Kafka的消息中的各个字段的含义及说明。

一级元素

二级元素

说明

schema

dataColumn

JSONArray类型,数据列的类型信息。dataColumn记录上游数据变更记录的所有列和对应的列类型信息。变更操作包括数据库对数据的更改(新增、删除及修改)和数据库表结构等变更。

  • name:列名

  • type:列类型

primaryKey

List类型,主键信息。

pk:主键名。

source

Object 类型,源端数据库或表信息。

  • dbType:String类型,数据库类型

  • dbVersion:String类型,数据库版本

  • dbName:String类型,数据库名

  • schemaName:String类型,Schema名(针对Postgres和SQL Server等)

  • tableName:String 类型,数据表名

payload

before

JSONObject类型,修改前的数据。例如:数据源端为mysql,做了一次记录的update操作,before字段存储记录被update之前的数据内容。

  • 在从源端读取到更新、删除操作消息时,在写入记录中填充该字段。

  • dataColumn:JSONObject类型,表示数据信息。格式为列名:列值, 列名为字符串,列值BOOLEAN、DOUBLE、DATE、BYTES、LONG,STRING。

after

修改后的数据。格式同before相同。

sequenceId

字符串类型,Streamx产生,用于增量数据和全量数据合并的数据排序,每个streamx record都是唯一的。

说明

对于从源端读取的更新操作消息,会生成两条写入记录,一条update before记录和一条update after记录,这两条记录的sequenceId相同。

scn

当源端为Oracle数据库时有效,对应Oracle的scn信息。

op

对应源端读取到的消息类型,取值如下:

  • INSERT:数据插入

  • UPDATE_BEFOR:数据更新前

  • UPDATE_AFTER:数据更新后

  • DELETE:数据删除

  • TRANSACTION_BEGIN:数据库事务开始

  • TRANSACTION_END:数据库事务结束

  • CREATE:数据库建表

  • ALTER:数据库表变更

  • QUERY:数据库变更的原始SQL

  • TRUNCATE:数据库表清空

  • RENAME:数据库表重命名

  • CINDEX:创建索引

  • DINDEX:删除索引

  • MHEARTBEAT:用于在源端无新增数据时标识同步仍正常进行的心跳消息

timestamp

JSONObject 类型,本条数据的相关时间戳。

  • eventTime:Long类型,记录源端库发生变更的时间,毫秒精度的13位时间戳。

  • systemTime:Long类型,同步任务处理该条变更消息的时间,毫秒精度的13位时间戳。

  • checkpointTime:Long类型,重置同步位点时的设置时间,毫秒精度的13位时间戳,一般与eventTime值一致。

ddl

该字段只在更改数据库的表结构时才会填充数据,更改数据(包括新增、删除和修改)时对应的ddl直接填充为null。

  • text:String类型,数据库DDL语句文本。

  • ddlMeta:String类型,将数据库ddl类型变更记录到一个Java对象,使用对象序列化后再进行base64编码得到的字符串。

version

格式的版本号。