本文為您介紹DataHub不同資料類型對應操作的支援情況,不同資料類型的分區策略、資料格式及相關訊息樣本。
不同資料類型對應操作的支援情況
Topic是DataHub訂閱和發布的最小單位,使用者可以用Topic來表示一類或者一種流資料。目前支援Tuple與Blob兩種類型:
DataHub類型 | 寫入DML訊息 | 寫入上遊心跳訊息 | 寫入DDL訊息 | 源表和目標topic映射方式 | 資料類型 |
Tuple | 支援 | 不支援 | 不支援 | 單表對單topic | DataHub支援的類型 |
Blob | 支援 | 支援 | 支援 | 單庫(多表)對單topic | Blob位元據 |
Tuple類型由於schema各欄位在topic建立後無法更改,所以適用於schema固定,且源表無add column、drop column等改變schema的DDL操作環境。Tuple類型不支援保留上遊傳遞的DDL訊息以及心跳訊息,即Tuple不能將此類訊息透穿給消費DataHub的下遊。而且源表和topic的映射方式為單表對單topic,如果源表數量過多,則需要建立大量的topic,將不便於下遊消費。
Blob類型由於不存在schema,topic內只存放Blob位元據,因此有較大的自由度。支援存放源表的DDL訊息和源端的心跳訊息,可傳遞給下遊消費,且採用單庫(多表)對單topic的映射方式,不論源表數量多少僅需建立一個topic,方便下遊消費。更適用於DataHub作為中間訊息佇列進行整庫遷移的情境。
不同資料類型分區策略
Shard表示對DataHub的一個topic進行資料轉送的並發通道,單個Shard寫入速率有上限,多個Shard可以提高寫入效能,但DataHub僅能保證單個Shard在消費時的有序性,不保證多個Shard之間訊息的順序。因此,為了既能通過增加Shard數量提高寫入效能,又能夠保證多個Shard之間訊息的有序性,同時避免資料扭曲,現針對Blob和Tuple類型提供以下分區策略。
情境 | Tuple | Blob |
有主鍵(包含自訂主鍵) | 按主鍵進行分區 | 按主鍵進行分區 |
順序保證 | 同一主鍵訊息保證有序 | 同一主鍵訊息保證有序 |
無主鍵 | 隨機分區 | 按表名進行分區 |
順序保證 | 不保證有序 | 同一表訊息保證有序 |
同步資料格式
Tuple
資料類型為DataHub Tuple topic自身支援的類型。當使用Data Integration建立的topic時,會增加部分中繼資料列。其中
_sequence_id_
、_excute_time_
、_source_table_
、_before_image_
、_after_image_
為元資訊列。參數
描述
_sequence_id_
string類型,由數字組成,每條訊息唯一(update before和update after共用一個sequence id)。
_excute_time_
資料產生時間。
_source_table_
資料來源表名。
_before_image_
前鏡像(update before和delete為Y,update after和insert為N)。
_after_image_
後鏡像(update before和delete為N,update after和insert為Y)。
樣本:下表為一條Insert、Update、Delete語句同步到DataHub的結果。
_sequence_id_
_operation_type_
_excute_time_
_before_image_
_after_image_
1649991610688000000
I
1649991726000
N
Y
1649991610688000001
U
1649991756000
Y
N
1649991610688000001
U
1649991756000
N
Y
1649991610688000002
D
1649991774000
Y
N
Blob
Blob類型的訊息格式為JSON字串轉化的位元據,其對應的JSON格式如下:
{ "schema": { //變更的中繼資料資訊,僅指定列名與列類型資訊 "dataColumn": [//變更的資料列資訊,更新目標表記錄內容 { "name": "id", "type": "LONG" }, { "name": "name", "type": "STRING" }, { "name": "binData", "type": "BYTES" }, { "name": "ts", "type": "DATE" } ], "primaryKey": [ "pkName1", "pkName2" ], "source": { "dbType": "mysql", "dbVersion": "1.0.0", "dbName": "myDatabase", "schemaName": "mySchema", "tableName": "tableName" } }, "payload": { "before": { "dataColumn":{ "id": 111, "name":"scooter", "binData": "[base64 string]", "ts": 1590315269000 } }, "after": { "dataColumn":{ "id": 222, "name":"donald", "binData": "[base64 string]", "ts": 1590315269000 } }, "sequenceId":XXX//字串類型,用於增全量資料合併的資料排序, "op": "INSERT/UPDATE/DELETE/TRANSACTION_BEGIN/TRANSACTION_END/CREATE/ALTER/ERASE/QUERY/TRUNCATE/RENAME/CINDEX/DINDEX/GTID/XACOMMIT/XAROLLBACK/MHEARTBEAT..."//大小寫敏感, "timestamp": { "eventTime": 1,//必選,記錄的變更時間,13為時間戳記,ms精度 "systemTime": 2,//可選,oracle CDC等部分資料來源存在 "checkpointTime": 3//可選,部分資料庫如oceanbase等資料來源包含 }, "ddl": { "text": "ADD COLUMN ...", "ddlMeta": "[SQLStatement serialized binary, expressed in base64 string]" } }, "version":"1.0.0" }
Blob欄位說明
重要訊息中的所有欄位類型範圍為StreamX定義的BOOLEAN、DOUBLE、DATE、BYTES、LONG,STRING六種類型。
BOOLEAN:取值為true,false DATE:取值為13為整形,時間精確到ms級 BYTES: 儲存bytes類型,格式為base64編碼後的字串 BASE64編解碼使用java.util.Base64中的介面實現: String text = "測試text123"; //編碼 Base64.getEncoder().encodeToString(text.getBytes("UTF-8")) //編碼 Base64.getDecoder().decode(encodedText)//解碼
一級元素
二級元素
說明
schema
dataColumn
JSONArray類型,資料列的類型資訊。dataColumn記錄上遊資料變更記錄的所有列和對應的列類型資訊。變更操作包括資料庫對資料的更改(新增、刪除及修改)和資料庫表結構等變更。
name:列名
type:列類型
primaryKey
List類型,主鍵資訊。
pk:主鍵名。
source
Object 類型,源端資料庫或表資訊。
dbType:String類型,資料庫類型
dbVersion:String類型,資料庫版本
dbName:String類型,資料庫名
schemaName:String類型,Schema名(針對Postgres和SQL Server等)
tableName:String 類型,資料表名
payload
before
JSONObject類型,修改前的資料。例如:資料來源端為mysql,做了一次記錄的update操作,before欄位儲存記錄被update之前的資料內容。
在從源端讀取到更新、刪除操作訊息時,在寫入記錄中填充該欄位。
dataColumn:JSONObject類型,表示資料資訊。格式為列名:列值, 列名為字串,列值取決於本身類型,BYTES類型使用Base64 String進行表示,DATE類型採用long表示的13位時間戳記,其餘類型的值均為本身類型。
after
修改後的資料。格式同before相同。
說明在更新、插入操作時必填。
op
操作類型。取值如下:
INSERT:資料插入
UPDATE_BEFOR:資料更新前
UPDATE_AFTER:資料更新後
DELETE:資料刪除
TRANSACTION_BEGIN:資料庫事務開始
TRANSACTION_END:資料庫事務結束
CREATE:資料庫建表
ALTER:資料庫表變更
QUERY:資料庫變更的原始SQL
TRUNCATE:資料庫表清空
RENAME:資料庫表重新命名
CINDEX:建立索引
DINDEX:刪除索引
MHEARTBEAT:用於在源端無新增資料時身分識別同步處理仍正常進行的心跳訊息
timestamp
JSONObject 類型,本條資料的相關時間戳記。
eventTime:Long類型,記錄源端庫發生變更的時間,毫秒精度的13位時間戳記。
systemTime:Long類型,同步任務處理該條變更訊息的時間,毫秒精度的13位時間戳記。
checkpointTime:Long類型,重設同步位點時的設定時間,毫秒精度的13位時間戳記,一般與eventTime值一致。
ddl
該欄位只在更改資料庫的表結構時才會填充資料,更改資料(包括新增、刪除和修改)時對應的ddl直接填充為null。
text:String類型,資料庫DDL語句文本。
ddlMeta:String類型,使用FastSQL對DDL進行解析後產生的SQLStatement Object進行序列化的二進位表示,並使用Base64編碼為String儲存。
開啟ddl支援時,需要傳遞的SQLStatement序列化對象,下遊鏈路還原序列化解析對象後,還原成目標資料來源的ddl語句做變更。
version
無
格式的版本號碼。
Blob序列化說明
本文定義的JSON格式,一條訊息對應一個JSONObject,JSONObject內部按照訊息格式,逐級映射為相應的格式(JSONObject,JSONArray,相應類型的value等)。
整個JSONObject中每個欄位的存放類型均按照上述欄位說明。序列化將JSONObject轉換為String(如fastJSON的toJSONString方法)然後再採用String的getBytes(Charsets.UTF_8)方法,指定UTF_8字元集轉化為byte[]。
相關訊息的JSON範例
Insert:
{ "schema": { "dataColumn": [ { "name": "id", "type": "LONG" }, { "name": "name", "type": "STRING" }, { "name": "comment", "type": "STRING" } ], "source": { "dbName": "yunshi_db", "dbType": "MySQL", "tableName": "t_shiyu_pk" }, "primaryKey": [ "id", "name" ] }, "payload": { "op": "INSERT", "after": { "dataColumn": { "name": "joe", "comment": "comment", "id": 1 } }, "sequenceId": "1605339516000000004", "timestamp": { "eventTime": 1605339932000, "systemTime": 1605339932736, "checkpointTime": 1605339932000 } }, "version": "0.0.1" }
update before:
{ "schema": { "dataColumn": [ { "name": "id", "type": "LONG" }, { "name": "name", "type": "STRING" }, { "name": "comment", "type": "STRING" } ], "source": { "dbName": "yunshi_db", "dbType": "MySQL", "tableName": "t_shiyu_pk" }, "primaryKey": [ "id", "name" ] }, "payload": { "op": "UPDATE_BEFOR", "before": { "dataColumn": { "name": "joe", "comment": "comment", "id": 1 } }, "sequenceId": "1605339516000000005", "timestamp": { "eventTime": 1605339934000, "systemTime": 1605339934951, "checkpointTime": 1605339934000 } }, "version": "0.0.1" }
update after:
{ "schema": { "dataColumn": [ { "name": "id", "type": "LONG" }, { "name": "name", "type": "STRING" }, { "name": "comment", "type": "STRING" } ], "source": { "dbName": "yunshi_db", "dbType": "MySQL", "tableName": "t_shiyu_pk" }, "primaryKey": [ "id", "name" ] }, "payload": { "op": "UPDATE_AFTER", "after": { "dataColumn": { "name": "joe", "comment": "com1", "id": 1 } }, "sequenceId": "1605339516000000005", "timestamp": { "eventTime": 1605339934000, "systemTime": 1605339934951, "checkpointTime": 1605339934000 } }, "version": "0.0.1" }
delete:
{ "schema": { "dataColumn": [ { "name": "id", "type": "LONG" }, { "name": "name", "type": "STRING" }, { "name": "comment", "type": "STRING" } ], "source": { "dbName": "yunshi_db", "dbType": "MySQL", "tableName": "t_shiyu_pk" }, "primaryKey": [ "id", "name" ] }, "payload": { "op": "DELETE", "before": { "dataColumn": { "name": "joe", "comment": "com1", "id": 1 } }, "sequenceId": "1605339516000000006", "timestamp": { "eventTime": 1605339937000, "systemTime": 1605339937671, "checkpointTime": 1605339937000 } }, "version": "0.0.1" }
Heartbeat:
{ "schema": {}, "payload": { "op": "MHEARTBEAT", "timestamp": { "eventTime": 1605339953629, "checkpointTime": 1605339953629 } }, "version": "0.0.1" }
DDL:
{ "schema": { "source": { "dbName": "yunshi_db", "dbType": "MySQL", "tableName": "t_shiyu_nopk" } }, "payload": { "op": "ALTER", "sequenceId": "1605339516000000035", "ddl": { "text": "alter table t_shiyu_nopk add column holo text", "ddlMeta": "rO0ABXNyACljb20uYWxpYmFiYS5kaS5wbHVnaW4uY2VudGVyLm1ldGEuRERMTWV0YQLb5Cx/YWXtAgACTAAHZGRsVGV4dHQAEkxqYXZhL2xhbmcvU3RyaW5nO0wACXN0YXRlbWVudHQAKkxjb20vYWxpYmFiYS9mYXN0c3FsL3NxbC9hc3QvU1FMU3RhdGVtZW50O3hwdAAtYWx0ZXIgdGFibGUgdF9zaGl5dV9ub3BrIGFkZCBjb2x1bW4gaG9sbyB0ZXh0c3IAPGNvbS5hbGliYWJhLmZhc3RzcWwuc3FsLmFzdC5zdGF0ZW1lbnQuU1FMQWx0ZXJUYWJsZVN0YXRlbWVudBQPP3vMUl2cAgAPSQAHYnVja2V0c1oABmlnbm9yZVoAF2ludmFsaWRhdGVHbG9iYWxJbmRleGVzWgAPbWVyZ2VTbWFsbEZpbGVzWgAHb2ZmbGluZVoABm9ubGluZVoADnJlbW92ZVBhdGl0aW5nWgATdXBkYXRlR2xvYmFsSW5kZXhlc1oAD3VwZ3JhZGVQYXRpdGluZ0wAC2NsdXN0ZXJlZEJ5dAAQTGphdmEvdXRpbC9MaXN0O0wABWl0ZW1zcQB+AAZMAAlwYXJ0aXRpb250ACxMY29tL2FsaWJhYmEvZmFzdHNxbC9zcWwvYXN0L1NRTFBhcnRpdGlvbkJ5O0wACHNvcnRlZEJ5cQB+AAZMAAx0YWJsZU9wdGlvbnNxAH4ABkwAC3RhYmxlU291cmNldAA6TGNvbS9hbGliYWJhL2Zhc3RzcWwvc3FsL2FzdC9zdGF0ZW1lbnQvU1FMRXhwclRhYmxlU291cmNlO3hyACxjb20uYWxpYmFiYS5mYXN0c3FsLnNxbC5hc3QuU1FMU3RhdGVtZW50SW1wbEOxUUDVCJMGAgADWgAJYWZ0ZXJTZW1pTAAGZGJUeXBldAAcTGNvbS9hbGliYWJhL2Zhc3RzcWwvRGJUeXBlO0wACWhlYWRIaW50c3EAfgAGeHIAKWNvbS5hbGliYWJhLmZhc3RzcWwuc3FsLmFzdC5TUUxPYmplY3RJbXBs5LvqLFggFVECAAVJAAxzb3VyY2VDb2x1bW5JAApzb3VyY2VMaW5lTAAKYXR0cmlidXRlc3QAD0xqYXZhL3V0aWwvTWFwO0wABGhpbnR0ACxMY29tL2FsaWJhYmEvZmFzdHNxbC9zcWwvYXN0L1NRTENvbW1lbnRIaW50O0wABnBhcmVudHQAJ0xjb20vYWxpYmFiYS9mYXN0c3FsL3NxbC9hc3QvU1FMT2JqZWN0O3hwAAAAAAAAAABwcHAAfnIAGmNvbS5hbGliYWJhLmZhc3RzcWwuRGJUeXBlAAAAAAAAAAASAAB4cgAOamF2YS5sYW5nLkVudW0AAAAAAAAAABIAAHhwdAAFbXlzcWxwAAAAAAAAAAAAAAAAc3IAE2phdmEudXRpbC5BcnJheUxpc3R4gdIdmcdhnQMAAUkABHNpemV4cAAAAAB3BAAAAAB4c3EAfgAUAAAAAXcEAAAAAXNyADxjb20uYWxpYmFiYS5mYXN0c3FsLnNxbC5hc3Quc3RhdGVtZW50LlNRTEFsdGVyVGFibGVBZGRDb2x1bW4l5T6CFe//BAIABloAB2Nhc2NhZGVaAAVmaXJzdEwAC2FmdGVyQ29sdW1udAAlTGNvbS9hbGliYWJhL2Zhc3RzcWwvc3FsL2FzdC9TUUxOYW1lO0wAB2NvbHVtbnNxAH4ABkwAC2ZpcnN0Q29sdW1ucQB+ABhMAAhyZXN0cmljdHQAE0xqYXZhL2xhbmcvQm9vbGVhbjt4cQB+AAsAAAAAAAAAAHBwcQB+AA8AAHBzcQB+ABQAAAABdwQAAAABc3IAOWNvbS5hbGliYWJhLmZhc3RzcWwuc3FsLmFzdC5zdGF0ZW1lbnQuU1FMQ29sdW1uRGVmaW5pdGlvbst0gLKZ0qAtAgAmWgANYXV0b0luY3JlbWVudFoADGRpc2FibGVJbmRleFoAB3ByZVNvcnRJAAxwcmVTb3J0T3JkZXJaAAZzdG9yZWRaAAd2aXJ0dWFsWgAHdmlzaWJsZUwACGFubkluZGV4dAApTGNvbS9hbGliYWJhL2Zhc3RzcWwvc3FsL2FzdC9TUUxBbm5JbmRleDtMAAZhc0V4cHJ0ACVMY29tL2FsaWJhYmEvZmFzdHNxbC9zcWwvYXN0L1NRTEV4cHI7TAALY2hhcnNldEV4cHJxAH4AHkwADWNvbFByb3BlcnRpZXNxAH4ABkwAC2NvbGxhdGVFeHBycQB+AB5MAAdjb21tZW50cQB+AB5MAAtjb21wcmVzc2lvbnQALkxjb20vYWxpYmFiYS9mYXN0c3FsL3NxbC9hc3QvZXhwci9TUUxDaGFyRXhwcjtMAAtjb25zdHJhaW50c3EAfgAGTAAIZGF0YVR5cGV0AClMY29tL2FsaWJhYmEvZmFzdHNxbC9zcWwvYXN0L1NRTERhdGFUeXBlO0wABmRiVHlwZXEAfgAKTAALZGVmYXVsdEV4cHJxAH4AHkwACWRlbGltaXRlcnEAfgAeTAASZGVsaW1pdGVyVG9rZW5pemVycQB+AB5MAAZlbmFibGVxAH4AGUwABmVuY29kZXEAfgAfTAAGZm9ybWF0cQB+AB5MABBnZW5lcmF0ZWRBbGF3c0FzcQB+AB5MAAhpZGVudGl0eXQARExjb20vYWxpYmFiYS9mYXN0c3FsL3NxbC9hc3Qvc3RhdGVtZW50L1NRTENvbHVtbkRlZmluaXRpb24kSWRlbnRpdHk7TAASanNvbkluZGV4QXR0cnNFeHBycQB+AB5MAAhtYXBwZWRCeXEAfgAGTAAEbmFtZXEAfgAYTAAMbmxwVG9rZW5pemVycQB+AB5MAAhvblVwZGF0ZXEAfgAeTAAEcmVseXEAfgAZTAAMc2VxdWVuY2VUeXBldAAvTGNvbS9hbGliYWJhL2Zhc3RzcWwvc3FsL2FzdC9BdXRvSW5jcmVtZW50VHlwZTtMAARzdGVwcQB+AB5MAAdzdG9yYWdlcQB+AB5MAAl1bml0Q291bnRxAH4AHkwACXVuaXRJbmRleHEAfgAeTAAIdmFsaWRhdGVxAH4AGUwACXZhbHVlVHlwZXEAfgAeeHEAfgALAAAAAAAAAABwcHEAfgAaAAAAAAAAAAAAAHBwcHBwcHBzcQB+ABQAAAAAdwQAAAAAeHNyADpjb20uYWxpYmFiYS5mYXN0c3FsLnNxbC5hc3Quc3RhdGVtZW50LlNRTENoYXJhY3RlckRhdGFUeXBlqtJac/d+04cCAAVaAAloYXNCaW5hcnlMAAtjaGFyU2V0TmFtZXEAfgABTAAIY2hhclR5cGVxAH4AAUwAB2NvbGxhdGVxAH4AAUwABWhpbnRzcQB+AAZ4cgArY29tLmFsaWJhYmEuZmFzdHNxbC5zcWwuYXN0LlNRTERhdGFUeXBlSW1wbEWL29pc1gZFAgAJSgAObmFtZUhhc2hDb2RlNjRaAAh1bnNpZ25lZFoAEXdpdGhMb2NhbFRpbWVab25lWgAIemVyb2ZpbGxMAAlhcmd1bWVudHNxAH4ABkwABmRiVHlwZXEAfgAKTAAHaW5kZXhCeXEAfgAeTAAEbmFtZXEAfgABTAAMd2l0aFRpbWVab25lcQB+ABl4cQB+AAsAAAAAAAAAAHBwcQB+ACP6BPTvGZVAfgAAAHNxAH4AFAAAAAB3BAAAAAB4cHB0AAR0ZXh0cABwcHBwcQB+ABJwcHBwcHBwcHBwc3IAMmNvbS5hbGliYWJhLmZhc3RzcWwuc3FsLmFzdC5leHByLlNRTElkZW50aWZpZXJFeHBy3DXH1zvWbgkCAARKAApoYXNoQ29kZTY0TAAEbmFtZXEAfgABTAAOcmVzb2x2ZWRDb2x1bW5xAH4ADkwAE3Jlc29sdmVkT3duZXJPYmplY3RxAH4ADnhyACdjb20uYWxpYmFiYS5mYXN0c3FsLnNxbC5hc3QuU1FMRXhwckltcGxs2ypmFJxWrQIAAHhxAH4ACwAAAAAAAAAAcHBwQCnxzH5tIDl0AARob2xvcHBwcHBwcHBwcHBweHBweHBzcQB+ABQAAAAAdwQAAAAAeHNxAH4AFAAAAAB3BAAAAAB4c3IAOGNvbS5hbGliYWJhLmZhc3RzcWwuc3FsLmFzdC5zdGF0ZW1lbnQuU1FMRXhwclRhYmxlU291cmNlRHD7eYJ4eswCAAVMAAdjb2x1bW5zcQB+AAZMAARleHBycQB+AB5MAApwYXJ0aXRpb25zcQB+AAZMAAhzYW1wbGluZ3QAOExjb20vYWxpYmFiYS9mYXN0c3FsL3NxbC9hc3Qvc3RhdGVtZW50L1NRTFRhYmxlU2FtcGxpbmc7TAAMc2NoZW1hT2JqZWN0dAAxTGNvbS9hbGliYWJhL2Zhc3RzcWwvc3FsL3JlcG9zaXRvcnkvU2NoZW1hT2JqZWN0O3hyADhjb20uYWxpYmFiYS5mYXN0c3FsLnNxbC5hc3Quc3RhdGVtZW50LlNRTFRhYmxlU291cmNlSW1wbAqEMenTm5zUAgAESgAPYWxpYXNIYXNoQ29kZTY0TAAFYWxpYXNxAH4AAUwACWZsYXNoYmFja3EAfgAeTAAFaGludHNxAH4ABnhxAH4ACwAAAAAAAAAAcHBwAAAAAAAAAABwcHBwc3EAfgAqAAAAAAAAAABwcHEAfgA0NH7o4UvP9Dt0AAx0X3NoaXl1X25vcGtwcHBwcA==" }, "timestamp": { "eventTime": 1605342109000, "systemTime": 1605342109259, "checkpointTime": 1605342109000 } }, "version": "0.0.1" }