本文为您介绍DataHub不同数据类型对应操作的支持情况,不同数据类型的分片策略、数据格式及相关消息示例。
不同数据类型对应操作的支持情况
Topic是DataHub订阅和发布的最小单位,用户可以用Topic来表示一类或者一种流数据。目前支持Tuple与Blob两种类型:
DataHub类型 | 写入DML消息 | 写入上游心跳消息 | 写入DDL消息 | 源表和目标topic映射方式 | 数据类型 |
Tuple | 支持 | 不支持 | 不支持 | 单表对单topic | DataHub支持的类型 |
Blob | 支持 | 支持 | 支持 | 单库(多表)对单topic | Blob二进制数据 |
Tuple类型由于schema各字段在topic创建后无法更改,所以适用于schema固定,且源表无add column、drop column等改变schema的DDL操作场景。Tuple类型不支持保留上游传递的DDL消息以及心跳消息,即Tuple不能将此类消息透穿给消费DataHub的下游。而且源表和topic的映射方式为单表对单topic,如果源表数量过多,则需要创建大量的topic,将不便于下游消费。
Blob类型由于不存在schema,topic内只存放Blob二进制数据,因此有较大的自由度。支持存放源表的DDL消息和源端的心跳消息,可传递给下游消费,且采用单库(多表)对单topic的映射方式,不论源表数量多少仅需创建一个topic,方便下游消费。更适用于DataHub作为中间消息队列进行整库迁移的场景。
不同数据类型分片策略
Shard表示对DataHub的一个topic进行数据传输的并发通道,单个Shard写入速率有上限,多个Shard可以提高写入性能,但DataHub仅能保证单个Shard在消费时的有序性,不保证多个Shard之间消息的顺序。因此,为了既能通过增加Shard数量提高写入性能,又能够保证多个Shard之间消息的有序性,同时避免数据倾斜,现针对Blob和Tuple类型提供以下分片策略。
场景 | Tuple | Blob |
有主键(包含自定义主键) | 按主键进行分片 | 按主键进行分片 |
顺序保证 | 同一主键消息保证有序 | 同一主键消息保证有序 |
无主键 | 随机分片 | 按表名进行分片 |
顺序保证 | 不保证有序 | 同一表消息保证有序 |
同步数据格式
Tuple
数据类型为DataHub Tuple topic自身支持的类型。当使用数据集成创建的topic时,会增加部分元数据列。其中
_sequence_id_
、_excute_time_
、_source_table_
、_before_image_
、_after_image_
为元信息列。参数
描述
_sequence_id_
string类型,由数字组成,每条消息唯一(update before和update after共用一个sequence id)。
_excute_time_
数据产生时间。
_source_table_
数据源表名。
_before_image_
前镜像(update before和delete为Y,update after和insert为N)。
_after_image_
后镜像(update before和delete为N,update after和insert为Y)。
示例:下表为一条Insert、Update、Delete语句同步到DataHub的结果。
_sequence_id_
_operation_type_
_excute_time_
_before_image_
_after_image_
1649991610688000000
I
1649991726000
N
Y
1649991610688000001
U
1649991756000
Y
N
1649991610688000001
U
1649991756000
N
Y
1649991610688000002
D
1649991774000
Y
N
Blob
Blob类型的消息格式为JSON字符串转化的二进制数据,其对应的JSON格式如下:
{ "schema": { //变更的元数据信息,仅指定列名与列类型信息 "dataColumn": [//变更的数据列信息,更新目标表记录内容 { "name": "id", "type": "LONG" }, { "name": "name", "type": "STRING" }, { "name": "binData", "type": "BYTES" }, { "name": "ts", "type": "DATE" } ], "primaryKey": [ "pkName1", "pkName2" ], "source": { "dbType": "mysql", "dbVersion": "1.0.0", "dbName": "myDatabase", "schemaName": "mySchema", "tableName": "tableName" } }, "payload": { "before": { "dataColumn":{ "id": 111, "name":"scooter", "binData": "[base64 string]", "ts": 1590315269000 } }, "after": { "dataColumn":{ "id": 222, "name":"donald", "binData": "[base64 string]", "ts": 1590315269000 } }, "sequenceId":XXX//字符串类型,用于增全量数据合并的数据排序, "op": "INSERT/UPDATE/DELETE/TRANSACTION_BEGIN/TRANSACTION_END/CREATE/ALTER/ERASE/QUERY/TRUNCATE/RENAME/CINDEX/DINDEX/GTID/XACOMMIT/XAROLLBACK/MHEARTBEAT..."//大小写敏感, "timestamp": { "eventTime": 1,//必选,记录的变更时间,13为时间戳,ms精度 "systemTime": 2,//可选,oracle CDC等部分数据源存在 "checkpointTime": 3//可选,部分数据库如oceanbase等数据源包含 }, "ddl": { "text": "ADD COLUMN ...", "ddlMeta": "[SQLStatement serialized binary, expressed in base64 string]" } }, "version":"1.0.0" }
Blob字段说明
重要消息中的所有字段类型范围为StreamX定义的BOOLEAN、DOUBLE、DATE、BYTES、LONG,STRING六种类型。
BOOLEAN:取值为true,false DATE:取值为13为整形,时间精确到ms级 BYTES: 存储bytes类型,格式为base64编码后的字符串 BASE64编解码使用java.util.Base64中的接口实现: String text = "测试text123"; //编码 Base64.getEncoder().encodeToString(text.getBytes("UTF-8")) //编码 Base64.getDecoder().decode(encodedText)//解码
一级元素
二级元素
说明
schema
dataColumn
JSONArray类型,数据列的类型信息。dataColumn记录上游数据变更记录的所有列和对应的列类型信息。变更操作包括数据库对数据的更改(新增、删除及修改)和数据库表结构等变更。
name:列名
type:列类型
primaryKey
List类型,主键信息。
pk:主键名。
source
Object 类型,源端数据库或表信息。
dbType:String类型,数据库类型
dbVersion:String类型,数据库版本
dbName:String类型,数据库名
schemaName:String类型,Schema名(针对Postgres和SQL Server等)
tableName:String 类型,数据表名
payload
before
JSONObject类型,修改前的数据。例如:数据源端为mysql,做了一次记录的update操作,before字段存储记录被update之前的数据内容。
在从源端读取到更新、删除操作消息时,在写入记录中填充该字段。
dataColumn:JSONObject类型,表示数据信息。格式为列名:列值, 列名为字符串,列值取决于本身类型,BYTES类型使用Base64 String进行表示,DATE类型采用long表示的13位时间戳,其余类型的值均为本身类型。
after
修改后的数据。格式同before相同。
说明在更新、插入操作时必填。
op
操作类型。取值如下:
INSERT:数据插入
UPDATE_BEFOR:数据更新前
UPDATE_AFTER:数据更新后
DELETE:数据删除
TRANSACTION_BEGIN:数据库事务开始
TRANSACTION_END:数据库事务结束
CREATE:数据库建表
ALTER:数据库表变更
QUERY:数据库变更的原始SQL
TRUNCATE:数据库表清空
RENAME:数据库表重命名
CINDEX:创建索引
DINDEX:删除索引
MHEARTBEAT:用于在源端无新增数据时标识同步仍正常进行的心跳消息
timestamp
JSONObject 类型,本条数据的相关时间戳。
eventTime:Long类型,记录源端库发生变更的时间,毫秒精度的13位时间戳。
systemTime:Long类型,同步任务处理该条变更消息的时间,毫秒精度的13位时间戳。
checkpointTime:Long类型,重置同步位点时的设置时间,毫秒精度的13位时间戳,一般与eventTime值一致。
ddl
该字段只在更改数据库的表结构时才会填充数据,更改数据(包括新增、删除和修改)时对应的ddl直接填充为null。
text:String类型,数据库DDL语句文本。
ddlMeta:String类型,使用FastSQL对DDL进行解析后生成的SQLStatement Object进行序列化的二进制表示,并使用Base64编码为String存储。
开启ddl支持时,需要传递的SQLStatement序列化对象,下游链路反序列化解析对象后,还原成目标数据源的ddl语句做变更。
version
无
格式的版本号。
Blob序列化说明
本文定义的JSON格式,一条消息对应一个JSONObject,JSONObject内部按照消息格式,逐级映射为相应的格式(JSONObject,JSONArray,相应类型的value等)。
整个JSONObject中每个字段的存放类型均按照上述字段说明。序列化将JSONObject转换为String(如fastJSON的toJSONString方法)然后再采用String的getBytes(Charsets.UTF_8)方法,指定UTF_8字符集转化为byte[]。
相关消息的JSON样例
Insert:
{ "schema": { "dataColumn": [ { "name": "id", "type": "LONG" }, { "name": "name", "type": "STRING" }, { "name": "comment", "type": "STRING" } ], "source": { "dbName": "yunshi_db", "dbType": "MySQL", "tableName": "t_shiyu_pk" }, "primaryKey": [ "id", "name" ] }, "payload": { "op": "INSERT", "after": { "dataColumn": { "name": "joe", "comment": "comment", "id": 1 } }, "sequenceId": "1605339516000000004", "timestamp": { "eventTime": 1605339932000, "systemTime": 1605339932736, "checkpointTime": 1605339932000 } }, "version": "0.0.1" }
update before:
{ "schema": { "dataColumn": [ { "name": "id", "type": "LONG" }, { "name": "name", "type": "STRING" }, { "name": "comment", "type": "STRING" } ], "source": { "dbName": "yunshi_db", "dbType": "MySQL", "tableName": "t_shiyu_pk" }, "primaryKey": [ "id", "name" ] }, "payload": { "op": "UPDATE_BEFOR", "before": { "dataColumn": { "name": "joe", "comment": "comment", "id": 1 } }, "sequenceId": "1605339516000000005", "timestamp": { "eventTime": 1605339934000, "systemTime": 1605339934951, "checkpointTime": 1605339934000 } }, "version": "0.0.1" }
update after:
{ "schema": { "dataColumn": [ { "name": "id", "type": "LONG" }, { "name": "name", "type": "STRING" }, { "name": "comment", "type": "STRING" } ], "source": { "dbName": "yunshi_db", "dbType": "MySQL", "tableName": "t_shiyu_pk" }, "primaryKey": [ "id", "name" ] }, "payload": { "op": "UPDATE_AFTER", "after": { "dataColumn": { "name": "joe", "comment": "com1", "id": 1 } }, "sequenceId": "1605339516000000005", "timestamp": { "eventTime": 1605339934000, "systemTime": 1605339934951, "checkpointTime": 1605339934000 } }, "version": "0.0.1" }
delete:
{ "schema": { "dataColumn": [ { "name": "id", "type": "LONG" }, { "name": "name", "type": "STRING" }, { "name": "comment", "type": "STRING" } ], "source": { "dbName": "yunshi_db", "dbType": "MySQL", "tableName": "t_shiyu_pk" }, "primaryKey": [ "id", "name" ] }, "payload": { "op": "DELETE", "before": { "dataColumn": { "name": "joe", "comment": "com1", "id": 1 } }, "sequenceId": "1605339516000000006", "timestamp": { "eventTime": 1605339937000, "systemTime": 1605339937671, "checkpointTime": 1605339937000 } }, "version": "0.0.1" }
Heartbeat:
{ "schema": {}, "payload": { "op": "MHEARTBEAT", "timestamp": { "eventTime": 1605339953629, "checkpointTime": 1605339953629 } }, "version": "0.0.1" }
DDL:
{ "schema": { "source": { "dbName": "yunshi_db", "dbType": "MySQL", "tableName": "t_shiyu_nopk" } }, "payload": { "op": "ALTER", "sequenceId": "1605339516000000035", "ddl": { "text": "alter table t_shiyu_nopk add column holo text", "ddlMeta": "rO0ABXNyACljb20uYWxpYmFiYS5kaS5wbHVnaW4uY2VudGVyLm1ldGEuRERMTWV0YQLb5Cx/YWXtAgACTAAHZGRsVGV4dHQAEkxqYXZhL2xhbmcvU3RyaW5nO0wACXN0YXRlbWVudHQAKkxjb20vYWxpYmFiYS9mYXN0c3FsL3NxbC9hc3QvU1FMU3RhdGVtZW50O3hwdAAtYWx0ZXIgdGFibGUgdF9zaGl5dV9ub3BrIGFkZCBjb2x1bW4gaG9sbyB0ZXh0c3IAPGNvbS5hbGliYWJhLmZhc3RzcWwuc3FsLmFzdC5zdGF0ZW1lbnQuU1FMQWx0ZXJUYWJsZVN0YXRlbWVudBQPP3vMUl2cAgAPSQAHYnVja2V0c1oABmlnbm9yZVoAF2ludmFsaWRhdGVHbG9iYWxJbmRleGVzWgAPbWVyZ2VTbWFsbEZpbGVzWgAHb2ZmbGluZVoABm9ubGluZVoADnJlbW92ZVBhdGl0aW5nWgATdXBkYXRlR2xvYmFsSW5kZXhlc1oAD3VwZ3JhZGVQYXRpdGluZ0wAC2NsdXN0ZXJlZEJ5dAAQTGphdmEvdXRpbC9MaXN0O0wABWl0ZW1zcQB+AAZMAAlwYXJ0aXRpb250ACxMY29tL2FsaWJhYmEvZmFzdHNxbC9zcWwvYXN0L1NRTFBhcnRpdGlvbkJ5O0wACHNvcnRlZEJ5cQB+AAZMAAx0YWJsZU9wdGlvbnNxAH4ABkwAC3RhYmxlU291cmNldAA6TGNvbS9hbGliYWJhL2Zhc3RzcWwvc3FsL2FzdC9zdGF0ZW1lbnQvU1FMRXhwclRhYmxlU291cmNlO3hyACxjb20uYWxpYmFiYS5mYXN0c3FsLnNxbC5hc3QuU1FMU3RhdGVtZW50SW1wbEOxUUDVCJMGAgADWgAJYWZ0ZXJTZW1pTAAGZGJUeXBldAAcTGNvbS9hbGliYWJhL2Zhc3RzcWwvRGJUeXBlO0wACWhlYWRIaW50c3EAfgAGeHIAKWNvbS5hbGliYWJhLmZhc3RzcWwuc3FsLmFzdC5TUUxPYmplY3RJbXBs5LvqLFggFVECAAVJAAxzb3VyY2VDb2x1bW5JAApzb3VyY2VMaW5lTAAKYXR0cmlidXRlc3QAD0xqYXZhL3V0aWwvTWFwO0wABGhpbnR0ACxMY29tL2FsaWJhYmEvZmFzdHNxbC9zcWwvYXN0L1NRTENvbW1lbnRIaW50O0wABnBhcmVudHQAJ0xjb20vYWxpYmFiYS9mYXN0c3FsL3NxbC9hc3QvU1FMT2JqZWN0O3hwAAAAAAAAAABwcHAAfnIAGmNvbS5hbGliYWJhLmZhc3RzcWwuRGJUeXBlAAAAAAAAAAASAAB4cgAOamF2YS5sYW5nLkVudW0AAAAAAAAAABIAAHhwdAAFbXlzcWxwAAAAAAAAAAAAAAAAc3IAE2phdmEudXRpbC5BcnJheUxpc3R4gdIdmcdhnQMAAUkABHNpemV4cAAAAAB3BAAAAAB4c3EAfgAUAAAAAXcEAAAAAXNyADxjb20uYWxpYmFiYS5mYXN0c3FsLnNxbC5hc3Quc3RhdGVtZW50LlNRTEFsdGVyVGFibGVBZGRDb2x1bW4l5T6CFe//BAIABloAB2Nhc2NhZGVaAAVmaXJzdEwAC2FmdGVyQ29sdW1udAAlTGNvbS9hbGliYWJhL2Zhc3RzcWwvc3FsL2FzdC9TUUxOYW1lO0wAB2NvbHVtbnNxAH4ABkwAC2ZpcnN0Q29sdW1ucQB+ABhMAAhyZXN0cmljdHQAE0xqYXZhL2xhbmcvQm9vbGVhbjt4cQB+AAsAAAAAAAAAAHBwcQB+AA8AAHBzcQB+ABQAAAABdwQAAAABc3IAOWNvbS5hbGliYWJhLmZhc3RzcWwuc3FsLmFzdC5zdGF0ZW1lbnQuU1FMQ29sdW1uRGVmaW5pdGlvbst0gLKZ0qAtAgAmWgANYXV0b0luY3JlbWVudFoADGRpc2FibGVJbmRleFoAB3ByZVNvcnRJAAxwcmVTb3J0T3JkZXJaAAZzdG9yZWRaAAd2aXJ0dWFsWgAHdmlzaWJsZUwACGFubkluZGV4dAApTGNvbS9hbGliYWJhL2Zhc3RzcWwvc3FsL2FzdC9TUUxBbm5JbmRleDtMAAZhc0V4cHJ0ACVMY29tL2FsaWJhYmEvZmFzdHNxbC9zcWwvYXN0L1NRTEV4cHI7TAALY2hhcnNldEV4cHJxAH4AHkwADWNvbFByb3BlcnRpZXNxAH4ABkwAC2NvbGxhdGVFeHBycQB+AB5MAAdjb21tZW50cQB+AB5MAAtjb21wcmVzc2lvbnQALkxjb20vYWxpYmFiYS9mYXN0c3FsL3NxbC9hc3QvZXhwci9TUUxDaGFyRXhwcjtMAAtjb25zdHJhaW50c3EAfgAGTAAIZGF0YVR5cGV0AClMY29tL2FsaWJhYmEvZmFzdHNxbC9zcWwvYXN0L1NRTERhdGFUeXBlO0wABmRiVHlwZXEAfgAKTAALZGVmYXVsdEV4cHJxAH4AHkwACWRlbGltaXRlcnEAfgAeTAASZGVsaW1pdGVyVG9rZW5pemVycQB+AB5MAAZlbmFibGVxAH4AGUwABmVuY29kZXEAfgAfTAAGZm9ybWF0cQB+AB5MABBnZW5lcmF0ZWRBbGF3c0FzcQB+AB5MAAhpZGVudGl0eXQARExjb20vYWxpYmFiYS9mYXN0c3FsL3NxbC9hc3Qvc3RhdGVtZW50L1NRTENvbHVtbkRlZmluaXRpb24kSWRlbnRpdHk7TAASanNvbkluZGV4QXR0cnNFeHBycQB+AB5MAAhtYXBwZWRCeXEAfgAGTAAEbmFtZXEAfgAYTAAMbmxwVG9rZW5pemVycQB+AB5MAAhvblVwZGF0ZXEAfgAeTAAEcmVseXEAfgAZTAAMc2VxdWVuY2VUeXBldAAvTGNvbS9hbGliYWJhL2Zhc3RzcWwvc3FsL2FzdC9BdXRvSW5jcmVtZW50VHlwZTtMAARzdGVwcQB+AB5MAAdzdG9yYWdlcQB+AB5MAAl1bml0Q291bnRxAH4AHkwACXVuaXRJbmRleHEAfgAeTAAIdmFsaWRhdGVxAH4AGUwACXZhbHVlVHlwZXEAfgAeeHEAfgALAAAAAAAAAABwcHEAfgAaAAAAAAAAAAAAAHBwcHBwcHBzcQB+ABQAAAAAdwQAAAAAeHNyADpjb20uYWxpYmFiYS5mYXN0c3FsLnNxbC5hc3Quc3RhdGVtZW50LlNRTENoYXJhY3RlckRhdGFUeXBlqtJac/d+04cCAAVaAAloYXNCaW5hcnlMAAtjaGFyU2V0TmFtZXEAfgABTAAIY2hhclR5cGVxAH4AAUwAB2NvbGxhdGVxAH4AAUwABWhpbnRzcQB+AAZ4cgArY29tLmFsaWJhYmEuZmFzdHNxbC5zcWwuYXN0LlNRTERhdGFUeXBlSW1wbEWL29pc1gZFAgAJSgAObmFtZUhhc2hDb2RlNjRaAAh1bnNpZ25lZFoAEXdpdGhMb2NhbFRpbWVab25lWgAIemVyb2ZpbGxMAAlhcmd1bWVudHNxAH4ABkwABmRiVHlwZXEAfgAKTAAHaW5kZXhCeXEAfgAeTAAEbmFtZXEAfgABTAAMd2l0aFRpbWVab25lcQB+ABl4cQB+AAsAAAAAAAAAAHBwcQB+ACP6BPTvGZVAfgAAAHNxAH4AFAAAAAB3BAAAAAB4cHB0AAR0ZXh0cABwcHBwcQB+ABJwcHBwcHBwcHBwc3IAMmNvbS5hbGliYWJhLmZhc3RzcWwuc3FsLmFzdC5leHByLlNRTElkZW50aWZpZXJFeHBy3DXH1zvWbgkCAARKAApoYXNoQ29kZTY0TAAEbmFtZXEAfgABTAAOcmVzb2x2ZWRDb2x1bW5xAH4ADkwAE3Jlc29sdmVkT3duZXJPYmplY3RxAH4ADnhyACdjb20uYWxpYmFiYS5mYXN0c3FsLnNxbC5hc3QuU1FMRXhwckltcGxs2ypmFJxWrQIAAHhxAH4ACwAAAAAAAAAAcHBwQCnxzH5tIDl0AARob2xvcHBwcHBwcHBwcHBweHBweHBzcQB+ABQAAAAAdwQAAAAAeHNxAH4AFAAAAAB3BAAAAAB4c3IAOGNvbS5hbGliYWJhLmZhc3RzcWwuc3FsLmFzdC5zdGF0ZW1lbnQuU1FMRXhwclRhYmxlU291cmNlRHD7eYJ4eswCAAVMAAdjb2x1bW5zcQB+AAZMAARleHBycQB+AB5MAApwYXJ0aXRpb25zcQB+AAZMAAhzYW1wbGluZ3QAOExjb20vYWxpYmFiYS9mYXN0c3FsL3NxbC9hc3Qvc3RhdGVtZW50L1NRTFRhYmxlU2FtcGxpbmc7TAAMc2NoZW1hT2JqZWN0dAAxTGNvbS9hbGliYWJhL2Zhc3RzcWwvc3FsL3JlcG9zaXRvcnkvU2NoZW1hT2JqZWN0O3hyADhjb20uYWxpYmFiYS5mYXN0c3FsLnNxbC5hc3Quc3RhdGVtZW50LlNRTFRhYmxlU291cmNlSW1wbAqEMenTm5zUAgAESgAPYWxpYXNIYXNoQ29kZTY0TAAFYWxpYXNxAH4AAUwACWZsYXNoYmFja3EAfgAeTAAFaGludHNxAH4ABnhxAH4ACwAAAAAAAAAAcHBwAAAAAAAAAABwcHBwc3EAfgAqAAAAAAAAAABwcHEAfgA0NH7o4UvP9Dt0AAx0X3NoaXl1X25vcGtwcHBwcA==" }, "timestamp": { "eventTime": 1605342109000, "systemTime": 1605342109259, "checkpointTime": 1605342109000 } }, "version": "0.0.1" }