Data Transmission Service (DTS) を使用して、KafkaクラスターやApsaraMQ for RocketMQなどのメッセージキューにデータを移行または同期する場合、メッセージキューにデータを格納する形式を指定できます。このトピックでは、メッセージキューにデータを格納するために使用できる形式について説明します。 これらのデータ形式の定義に基づいてデータを解析できます。
データ形式
DTSを使用すると、次のいずれかの形式でデータをメッセージキューに格納できます。
DTS Avro: 保存と送信を容易にするためにデータ構造またはオブジェクトを変換できるデータシリアル化形式。
Shareplex Json: データレプリケーションソフトウェアを使用してソースデータベースからデータを読み取る形式
SharePlexはメッセージキューに格納されます。Canal Json: Canalがソースデータベースの増分データに関するログを解析し、増分データをメッセージキューに送信した後に、データがメッセージキューに格納される形式。
DTSアヴロ
デフォルトのデータ形式はDTS Avroです。 DTS Avroのスキーマ定義に基づいてデータを解析する必要があります。 詳細については、GitHubの「subscribe_example」をご参照ください。
DTS Avro形式では、データ定義言語 (DDL) ステートメントはSTRING型です。
Shareplex Json
パラメーター
パラメーター | 説明 |
| データベース内のトランザクションがコミットされたUTC時刻。 このパラメーターの値は、yyyy-MM-ddTHH:mm:ssZ形式です。 |
| トランザクションをコミットしたユーザーのID。 |
| 操作タイプです。 有効な値: INSERT、UPDATE、DELETE、TRUNCATE、DROP COLUMN、UPDATE BEFORE、およびUPDATE AFTER。 |
| データベースが特定の時点でコミットするトランザクションのバージョンを識別するシステム変更番号 (SCN) 。 コミットされた各トランザクションには固有のSCNが割り当てられる。 |
| データベース内のレコードを識別するために使用される比較的一意のアドレス値。 |
| トランザクションのID。 |
| トランザクション内の操作のシーケンス番号。 番号は1から始まります。 |
| トランザクション内の操作の総数。 |
| テーブルの名前。 |
| トランザクション内の操作のインデックス ( |
| トランザクションがターゲットデータベースにコミットされた時刻。 |
例:
{
"meta": {
"time": "2017-06-16T14:24:34",
"userid": 84,
"op": "ins",
"scn": "14589063118712",
"rowid": "AAATGpAAIAAItcIAAA",
"trans": "7.0.411499",
"seq": 1,
"size": 11,
"table": "CL_BIZ1.MIO_LOG",
"idx": "1/11",
"posttime": "2017-06-16T14:33:52"
},
"data": {
"MIO_LOG_ID": "32539737"
}
}
{
"meta": {
"time": "2017-06-16T15:38:13",
"userid": 84,
"op": "upd",
"table": "CL_BIZ1.MIO_LOG"
….
},
"data": {
"CNTR_NO": "1171201606"
},
"key": {
"MIO_LOG_ID": "32537893",
"PLNMIO_REC_ID": "31557806",
"POL_CODE": null,
"CNTR_TYPE": null,
"CNTR_NO": "1171201606syui26"
}
}
{
"meta": {
"time": "2017-06-16T15:51:35",
"userid": 84,
"op": "del",
},
"data": {
"MIO_LOG_ID": "32539739",
"PLNMIO_REC_ID": "31557806",
"POL_CODE": null,
"CNTR_TYPE": null,
"CG_NO": null
}
}
Json運河
パラメーター
パラメーター | 説明 |
| データベースの名前。 |
| データベースで操作が実行された時刻。 値は13ビットのUNIXタイムスタンプです。 単位:ミリ秒。 検索エンジンを使用して、UNIXタイムスタンプコンバーターを取得できます。 |
| 操作のシリアル番号。 |
| 操作がDDL操作かどうかを示します。
|
| フィールドのデータ型。 精密タイプなどの操作パラメータはサポートされていません。 |
| 更新前と更新後のデータ。 2022年3月20日より前に作成されたデータ同期または移行インスタンスの場合、 |
| 主キーの名前。 |
| SQL文。 |
| 変換されたフィールドタイプ。 有効な値は、dataTypeNumberパラメーターの有効な値と同じです。 詳細については、「Kafkaクライアントを使用して追跡データを消費する」のトピックの「MySQLデータ型とdataTypeNumber値のマッピング」を参照してください。 |
| テーブルの名前。 |
| ターゲットデータベースで操作の実行が開始された時刻。 値は13ビットのUNIXタイムスタンプです。 単位:ミリ秒。 検索エンジンを使用して、UNIXタイムスタンプコンバーターを取得できます。 |
| 操作タイプです。 有効な値: DELETE、UPDATE、およびINSERT。 完全データ同期または移行中、操作タイプはINITに固定されます。 |
| トランザクションを識別するグローバルトランザクション識別子 (GTID) 。 各トランザクションには、グローバルに一意のGTIDが割り当てられます。 |
例
3月20日2022より前に作成され、ソーステーブルのDELETE
ステートメントを使用してkafkaクラスターに同期された変更追跡インスタンスの場合、old
の値はdataであり、data
の値はNULLです。 オープンソースコミュニティとの一貫性を保つために、2022年3月20日から作成または再起動された変更追跡インスタンスのdata
の値はdataで、old
の値はNULLです。
{
"old": [
{
"shipping_type": "aaa"
}
],
"database": "dbname",
"es": 1600161894000,
"id": 58,
"isDdl": false,
"mysqlType": {
"id": "bigint",
"shipping_type": "varchar"
},
"pkNames": [
"id"
],
"sql": "",
"sqlType": {
"id": -5,
"shipping_type": 12
},
"table": "tablename",
"ts": 1600161894771,
"type": "DELETE"
}
{
"data": [
{
"id": "500000287",
"shipping_type": null
}
],
"database": "dbname",
"es": 1600161894000,
"id": 58,
"isDdl": false,
"mysqlType": {
"id": "bigint",
"shipping_type": "varchar"
},
"pkNames": [
"id"
],
"sql": "",
"sqlType": {
"id": -5,
"shipping_type": 12
},
"table": "tablename",
"ts": 1600161894771,
"type": "DELETE"
}
{
"database":"dbname", the name of the source database
"es":1600161894000, the time when the data in the source database is written to the binary logs.
"id":58, the offset of the DTS cache.
"isDdl":true, specifies whether to synchronize or migrate DDL operations.
"sql":"eg:createxxx", the DDL statements recorded in the binary logs.
"table":"tablename", the name of the source table.
"ts":1600161894771, the time when DTS writes data to the destination database.
"type":"DDL"
}