すべてのプロダクト
Search
ドキュメントセンター

Data Transmission Service:メッセージキューのデータストレージ形式

最終更新日:Feb 07, 2026

Data Transmission Service (DTS) では、Kafka や RocketMQ などのメッセージキューにデータを同期または移行する際に、ストレージ形式を選択できます。このトピックでは、データを解析するのに役立つデータ形式について説明します。

データストレージ形式

DTS は、メッセージキューに書き込まれるデータに対して、次のストレージ形式をサポートしています:

  • DTS Avro:データ構造やオブジェクトを、保存や転送が容易な形式に変換するデータシリアル化形式です。

  • Shareplex Json」:データレプリケーションソフトウェア SharePlex がソースデータベースからデータを読み取り、メッセージキューにデータを書き込む場合、データは Shareplex Json フォーマットで保存されます。

  • Canal JsonCanal はデータベースから増分ログを解析し、増分データをメッセージキューに転送します。データは Canal Json 形式で保存されます。

DTS Avro

DTS Avro スキーマ定義に基づいてデータを解析する必要があります。詳細については、「DTS Avro schema definition」および「DTS Avro deserialization example」をご参照ください。

説明

DTS Avro 形式では、DDL 文は String 型です。

Shareplex Json

パラメーターの説明

パラメーター

説明

time

データベースでトランザクションがコミットされた時刻。形式は yyyy-MM-ddTHH:mm:ssZ (UTC) です。

userid

トランザクションをコミットしたユーザーの ID。

op

データ操作タイプ。有効な値は、INSERT、UPDATE、DELETE、TRUNCATE、DROP COLUMN、UPDATE BEFORE、UPDATE AFTER です。

scn

システム変更番号 (SCN)。データベース内の特定の時刻にコミットされたトランザクションのバージョンを識別します。コミットされた各トランザクションには、一意の SCN が割り当てられます。

rowid

データベース内のレコードを特定するために使用される、比較的一意なアドレス値。

trans

トランザクション ID。

seq

トランザクション内での操作の序数。値は 1 から始まります。

size

トランザクション内の操作の総数。

table

テーブル名。

idx

トランザクション内での操作のインデックス。形式は seq/size です。たとえば、1/11 は、11 個の操作を含むトランザクションの最初の操作であることを示します。

posttime

トランザクションがターゲットデータベースにコミットされた時刻。

データの挿入

{
    "meta": {
        "time": "2017-06-16T14:24:34", 
        "userid": 84,                                    
        "op": "ins",                                   
          "scn": "14589063118712",                  
          "rowid": "AAATGpAAIAAItcIAAA",      
        "trans": "7.0.411499",                 
        "seq": 1,                                          
        "size": 11,                                         
        "table": "CL_BIZ1.MIO_LOG",       
          "idx": "1/11",                                       
        "posttime": "2017-06-16T14:33:52"
    },
    "data": {
        "MIO_LOG_ID": "32539737"
     }
}

データの更新

{
    "meta": {
        "time": "2017-06-16T15:38:13",
        "userid": 84,
        "op": "upd",                             
        "table": "CL_BIZ1.MIO_LOG"
        ….
    },
    "data": {                                          
        "CNTR_NO": "1171201606"
    },
    "key": {                                            
        "MIO_LOG_ID": "32537893",
        "PLNMIO_REC_ID": "31557806",
        "POL_CODE": null,
        "CNTR_TYPE": null,
        "CNTR_NO": "1171201606syui26"
    }
}

データの削除

{
    "meta": {
        "time": "2017-06-16T15:51:35",
        "userid": 84,
        "op": "del",                      
     },
    "data": {                                    
        "MIO_LOG_ID": "32539739",
        "PLNMIO_REC_ID": "31557806",
        "POL_CODE": null,
        "CG_NO": null
     }
}

Canal Json

説明

[パーティションキーの更新後のメッセージ配信の分割]」を有効にすると、パーティションキーが変更されたときに、Kafka は DELETE メッセージと INSERT メッセージを配信します。Kafka は、各メッセージのパーティションキーの値に基づいて、それぞれのメッセージのパーティションを選択します。

例: パーティションキー id の値は 1 です。メッセージは partition-1 に配信されます。「[パーティションキーの更新後にメッセージ配信を分割]」を有効にする前後での違いについては、以下のリストで説明します。

  • 有効:ソースデータベースで UPDATE SET id = 2 WHERE id = 1 コマンドを実行すると、id=1DELETE メッセージが partition-1 に配信され、id=2INSERT メッセージが partition-2 に配信されます。

  • 無効:partition-1 には UPDATE メッセージが 1 つだけ配信されます。UPDATE 操作では、変更前の値に基づいて配信先のパーティションが選択されます。

メトリックの説明

パラメーター

説明

database

データベース名。

es

ソースデータベースで操作が実行された時刻。ミリ秒単位の 13 桁の UNIX タイムスタンプです。

説明
  • 実際の実行時間は秒単位です。単位をミリ秒に変換するために、3 つのゼロが追加されます。

  • 検索エンジンを使用して、UNIX タイムスタンプ変換ツールを見つけることができます。

id

操作のシリアル番号。

説明

これは、タイムスタンプと内部 DTS オフセットから生成されます。レコードの順序を決定するのに役立ちます。

isDdl

操作が DDL 操作であるかどうかを指定します。

  • true:はい。

  • false:いいえ。

mysqlType

フィールドのデータの型。

説明

精度などのデータの型のパラメーターはサポートされていません。

old および data

変更前または変更後のデータ。

説明

2022 年 3 月 20 日より前に作成された同期または移行インスタンスの場合、old の値は変更後のデータ、data の値は変更前のデータです。デフォルトでは、old の値には変更された列だけでなく、すべての列が含まれます。オープンソースコミュニティとの整合性を保つため、2022 年 3 月 20 日以降に作成または再起動されたインスタンスの場合、data の値は変更後のデータ、old の値は変更前のデータになります。

pkNames

プライマリキー名。

sql

SQL 文。

sqlType

変換されたフィールドタイプ。値は dataTypeNumber の値と同じです。詳細については、「フィールドタイプと dataTypeNumber 値のマッピング」をご参照ください。

table

テーブル名。

ts

操作がターゲットデータベースへのデータ書き込みを開始した時刻。ミリ秒単位の 13 桁の UNIX タイムスタンプです。

説明

検索エンジンを使用して、UNIX タイムスタンプ変換ツールを見つけることができます。

type

DELETE、UPDATE、INSERT などの操作タイプ。

説明

完全データ同期または移行タスクの場合、値は [INIT] に固定されています。

gtid

グローバルトランザクション識別子 (GTID)。GTID はグローバルに一意です。各トランザクションは 1 つの GTID に対応します。

データの更新

説明

2022 年 3 月 20 日より前に作成された同期または移行インスタンスの場合、ソーステーブルからの DELETE 文が Kafka に同期または移行されると、old フィールドにデータが含まれ、data フィールドは null になります。オープンソースコミュニティとの整合性を保つため、2022 年 3 月 20 日以降に作成または再起動されたインスタンスの場合、data フィールドにデータが含まれ、old フィールドは null になります。

2022 年 3 月 20 日より前に作成された同期または移行インスタンス

{
    "old": [
        {
            "shipping_type": "aaa"
        }
    ], 
    "database": "dbname", 
    "es": 1600161894000, 
    "id": 58, 
    "isDdl": false, 
    "mysqlType": {
        "id": "bigint", 
        "shipping_type": "varchar"
    }, 
    "pkNames": [
        "id"
    ], 
    "sql": "", 
    "sqlType": {
        "id": -5, 
        "shipping_type": 12
    }, 
    "table": "tablename", 
    "ts": 1600161894771, 
    "type": "DELETE"
}

2022 年 3 月 20 日以降に作成または再起動された同期または移行インスタンス

{
    "data": [
        {
            "id": "500000287", 
            "shipping_type": null
        }
    ], 
    "database": "dbname", 
    "es": 1600161894000, 
    "id": 58, 
    "isDdl": false, 
    "mysqlType": {
        "id": "bigint", 
        "shipping_type": "varchar"
    }, 
    "pkNames": [
        "id"
    ], 
    "sql": "", 
    "sqlType": {
        "id": -5, 
        "shipping_type": 12
    }, 
    "table": "tablename", 
    "ts": 1600161894771, 
    "type": "DELETE"
}
            

DDL 操作

{
    "database":"dbname", // 同期または移行対象のデータベース名。
    "es":1600161894000, // ソースデータがバイナリログに書き込まれた時刻。
    "id":58, // DTS キャッシュ内のオフセット。
    "isDdl":true, // DDL 文を同期または移行するかどうかを指定します。
    "sql":"eg:createxxx", // バイナリログからの DDL 文。
    "table":"tablename", // 同期または移行対象のテーブル名。
    "ts":1600161894771, // DTS がターゲットにデータを書き込んだ時刻。
    "type":"DDL"
}