すべてのプロダクト
Search
ドキュメントセンター

Data Transmission Service:Kafkaクラスターのデータ形式

最終更新日:Nov 14, 2024

Data Transmission Service (DTS) を使用してデータをKafkaクラスターに移行または同期する場合、データがKafkaクラスターに保存される形式を指定できます。 このトピックでは、Kafkaクラスターにデータを格納するために使用できる形式について説明します。 これらのデータ形式の定義に基づいてデータを解析できます。

データ形式

DTSを使用すると、次のいずれかの形式でデータをKafkaクラスターに保存できます。

  • DTS Avro: 保存と送信を容易にするためにデータ構造またはオブジェクトを変換できるデータシリアル化形式。

  • Shareplex Json: データレプリケーションソフトウェアSharePlexを使用してソースデータベースから読み取られたデータが格納される形式。

  • Canal Json: Canalがソースデータベースの増分データに関するログを解析し、その増分データをKafkaクラスターに送信した後、データがKafkaクラスターに格納される形式。

DTSアヴロ

デフォルトのデータ形式はDTS Avroです。 デフォルトでは、DTSを使用してKafkaクラスターに移行または同期されるデータは、DTS Avro形式で保存されます。 DTS Avroのスキーマ定義に基づいてデータを解析する必要があります。 詳細については、GitHubの「subscribe_example」をご参照ください。

説明

DTS Avro形式では、DDLステートメントはSTRING型です。

Shareplex Json

表 1. Shareplex Json形式に関連するパラメーター

パラメーター

説明

時間

データベース内のトランザクションがコミットされたUTC時刻。 このパラメーターの値は、yyyy-MM-ddTHH:mm:ssZ形式です。

userid

トランザクションをコミットしたユーザーのID。

op

操作タイプです。 有効な値: INSERT、UPDATE、DELETE、TRUNCATE、DROP COLUMN、UPDATE BEFORE、およびUPDATE AFTER。

scn

データベースが特定の時点でコミットするトランザクションのバージョンを識別するシステム変更番号 (SCN) 。 コミットされた各トランザクションには固有のSCNが割り当てられる。

rowid

データベース内のレコードを識別するために使用される比較的一意のアドレス値。

トランス

トランザクションのID。

seq

トランザクション内の操作のシーケンス番号。 番号は1から始まります。

サイズ

トランザクション内の操作の総数。

テーブル

テーブルの名前。

idx

トランザクション内の操作のインデックス (seq/size形式) 。 たとえば、1/11は、11個の操作を含むトランザクションで操作のシーケンス番号が1であることを示します。

ポストタイム

トランザクションがターゲットデータベースにコミットされた時刻。

例:

  • 挿入されたデータ

    {
        "meta": {
            "time": "2017-06-16T14:24:34", 
            "userid": 84,                                    
            "op": "ins",                                   
              "scn": "14589063118712",                  
              "rowid": "AAATGpAAIAAItcIAAA",      
            "trans": "7.0.411499",                 
            "seq": 1,                                          
            "size": 11,                                         
            "table": "CL_BIZ1.MIO_LOG",       
              "idx": "1/11",                                       
            "posttime": "2017-06-16T14:33:52"
        },
        "data": {
            "MIO_LOG_ID": "32539737"
         }
    }
  • 更新されたデータ

    {
        "meta": {
            "time": "2017-06-16T15:38:13",
            "userid": 84,
            "op": "upd",                             
            "table": "CL_BIZ1.MIO_LOG"
            ...
        },
        "data": {                                          
            "CNTR_NO": "1171201606"
        },
        "key": {                                            
            "MIO_LOG_ID": "32537893",
            "PLNMIO_REC_ID": "31557806",
            "POL_CODE": null,
            "CNTR_TYPE": null,
            "CNTR_NO": "1171201606syui26"
        }
    }
  • 削除されたデータ

    {
        "meta": {
            "time": "2017-06-16T15:51:35",
            "userid": 84,
            "op": "del",                      
         },
        "data": {                                    
            "MIO_LOG_ID": "32539739",
            "PLNMIO_REC_ID": "31557806",
            "POL_CODE": null,
            "CNTR_TYPE": null,
            "CG_NO": null
         }
    }

Json運河

表 2. Canal Json形式に関連するパラメーター

パラメーター

説明

データベース

データベースの名前。

es

データベースで操作が実行された時刻。 値は13ビットのUNIXタイムスタンプです。 単位:ミリ秒。

説明

検索エンジンを使用して、UNIXタイムスタンプコンバーターを取得できます。

id

操作のシリアル番号。

isDdl

操作がDDL操作かどうかを示します。

  • true: 操作はDDL操作です。

  • false: 操作はDDL操作ではありません。

mysqlType

フィールドのデータ型。

古いデータ

更新前と更新後のデータ。

説明

2022年3月20日より前に作成された変更追跡インスタンスの場合、oldの値は更新後のデータであり、dataの値は更新前のデータです。 オープンソースコミュニティとの一貫性を保つために、dataの値は更新後のデータであり、oldの値は2022年3月20日から作成または再起動された変更追跡インスタンスの更新前のデータです。

pkNames

主キーの名前。

sql

SQL文。

sqlType

変換されたフィールドタイプ。 たとえば、LONG型はUNSIGNED INTEGERから変換され、BIGDECIMAL型はUNSIGNED LONGから変換されます。

テーブル

テーブルの名前。

ts

ターゲットデータベースで操作の実行が開始された時刻。 値は13ビットのUNIXタイムスタンプです。 単位:ミリ秒。

説明

検索エンジンを使用して、UNIXタイムスタンプコンバーターを取得できます。

タイプ

操作タイプです。 有効な値には、DELETE、UPDATE、およびINSERTが含まれます。

gtid

トランザクションを識別するグローバルトランザクション識別子 (GTID) 。 各トランザクションには、グローバルに一意のGTIDが割り当てられます。

更新されたデータの例

説明

3月20日より前、2022ソーステーブルのDELETEステートメントが同期された後に作成された変更追跡インスタンスの場合、oldの値はdataであり、dataの値はNULLです。 オープンソースコミュニティとの一貫性を保つために、2022年3月20日から作成または再起動された変更追跡インスタンスでは、dataの値はdataであり、oldの値はNULLです。

3月20日より前に作成された変更トラッキングインスタンス2022

{
    "old": [
        {
            "shipping_type": "aaa"
        }
    ], 
    "database": "dbname", 
    "es": 1600161894000, 
    "id": 58, 
    "isDdl": false, 
    "mysqlType": {
        "id": "bigint(20)", 
        "shipping_type": "varchar(50)"
    }, 
    "pkNames": [
        "id"
    ], 
    "sql": "", 
    "sqlType": {
        "id": -5, 
        "shipping_type": 12
    }, 
    "table": "tablename", 
    "ts": 1600161894771, 
    "type": "DELETE"
}

3月20日から作成または再起動された変更追跡インスタンス2022

{
    "data": [
        {
            "id": "500000287", 
            "shipping_type": null
        }
    ], 
    "database": "dbname", 
    "es": 1600161894000, 
    "id": 58, 
    "isDdl": false, 
    "mysqlType": {
        "id": "bigint(20)", 
        "shipping_type": "varchar(50)"
    }, 
    "pkNames": [
        "id"
    ], 
    "sql": "", 
    "sqlType": {
        "id": -5, 
        "shipping_type": 12
    }, 
    "table": "tablename", 
    "ts": 1600161894771, 
    "type": "DELETE"
}
            

DDL操作の例

{
    "database":"dbname", the name of the source database.
    "es":1600161894000, the time when the data in the source database is written to the binary logs.
    "id":58, the offset of the DTS cache.
    "isDdl":true, specifies whether to synchronize DDL operations.
    "sql":"eg:createxxx", the DDL statements recorded in the binary logs.
    "table":"tablename", the name of the source table.
    "ts":1600161894771, the time when DTS writes data to the destination database.
    "type":"DDL"
}