このページは機械翻訳によるものです。内容の正確さは保証しておりません。 人力翻訳を依頼する

メッセージキュー内のデータ形式

更新日時2025-02-20 02:19

Data Transmission Service (DTS) を使用して、KafkaクラスターやApsaraMQ for RocketMQなどのメッセージキューにデータを移行または同期する場合、メッセージキューにデータを格納する形式を指定できます。このトピックでは、メッセージキューにデータを格納するために使用できる形式について説明します。 これらのデータ形式の定義に基づいてデータを解析できます。

データ形式

DTSを使用すると、次のいずれかの形式でデータをメッセージキューに格納できます。

  • DTS Avro: 保存と送信を容易にするためにデータ構造またはオブジェクトを変換できるデータシリアル化形式。

  • Shareplex Json: データレプリケーションソフトウェアを使用してソースデータベースからデータを読み取る形式

    SharePlexはメッセージキューに格納されます。

  • Canal Json: Canalがソースデータベースの増分データに関するログを解析し、増分データをメッセージキューに送信した後に、データがメッセージキューに格納される形式。

DTSアヴロ

デフォルトのデータ形式はDTS Avroです。 DTS Avroのスキーマ定義に基づいてデータを解析する必要があります。 詳細については、GitHubの「subscribe_example」をご参照ください。

説明

DTS Avro形式では、データ定義言語 (DDL) ステートメントはSTRING型です。

Shareplex Json

パラメーター

パラメーター

説明

時間

データベース内のトランザクションがコミットされたUTC時刻。 このパラメーターの値は、yyyy-MM-ddTHH:mm:ssZ形式です。

userid

トランザクションをコミットしたユーザーのID。

op

操作タイプです。 有効な値: INSERT、UPDATE、DELETE、TRUNCATE、DROP COLUMN、UPDATE BEFORE、およびUPDATE AFTER。

scn

データベースが特定の時点でコミットするトランザクションのバージョンを識別するシステム変更番号 (SCN) 。 コミットされた各トランザクションには固有のSCNが割り当てられる。

rowid

データベース内のレコードを識別するために使用される比較的一意のアドレス値。

トランス

トランザクションのID。

seq

トランザクション内の操作のシーケンス番号。 番号は1から始まります。

サイズ

トランザクション内の操作の総数。

テーブル

テーブルの名前。

idx

トランザクション内の操作のインデックス (seq/size形式) 。 たとえば、1/11は、11個の操作を含むトランザクションで操作のシーケンス番号が1であることを示します。

ポストタイム

トランザクションがターゲットデータベースにコミットされた時刻。

例:

挿入されたデータ
更新されたデータ
削除されたデータ
{
    "meta": {
        "time": "2017-06-16T14:24:34", 
        "userid": 84,                                    
        "op": "ins",                                   
          "scn": "14589063118712",                  
          "rowid": "AAATGpAAIAAItcIAAA",      
        "trans": "7.0.411499",                 
        "seq": 1,                                          
        "size": 11,                                         
        "table": "CL_BIZ1.MIO_LOG",       
          "idx": "1/11",                                       
        "posttime": "2017-06-16T14:33:52"
    },
    "data": {
        "MIO_LOG_ID": "32539737"
     }
}
{
    "meta": {
        "time": "2017-06-16T15:38:13",
        "userid": 84,
        "op": "upd",                             
        "table": "CL_BIZ1.MIO_LOG"
        ….
    },
    "data": {                                          
        "CNTR_NO": "1171201606"
    },
    "key": {                                            
        "MIO_LOG_ID": "32537893",
        "PLNMIO_REC_ID": "31557806",
        "POL_CODE": null,
        "CNTR_TYPE": null,
        "CNTR_NO": "1171201606syui26"
    }
}
{
    "meta": {
        "time": "2017-06-16T15:51:35",
        "userid": 84,
        "op": "del",                      
     },
    "data": {                                    
        "MIO_LOG_ID": "32539739",
        "PLNMIO_REC_ID": "31557806",
        "POL_CODE": null,
        "CNTR_TYPE": null,
        "CG_NO": null
     }
}

Json運河

パラメーター

パラメーター

説明

データベース

データベースの名前。

es

データベースで操作が実行された時刻。 値は13ビットのUNIXタイムスタンプです。 単位:ミリ秒。

説明

検索エンジンを使用して、UNIXタイムスタンプコンバーターを取得できます。

id

操作のシリアル番号。

isDdl

操作がDDL操作かどうかを示します。

  • true: 操作はDDL操作です。

  • false: 操作はDDL操作ではありません。

mysqlType

フィールドのデータ型。

説明

精密タイプなどの操作パラメータはサポートされていません。

古いデータ

更新前と更新後のデータ。

説明

2022年3月20日より前に作成されたデータ同期または移行インスタンスの場合、oldの値は更新後のデータであり、dataの値は更新前のデータです。 デフォルトでは、すべての列のデータが含まれます。 オープンソースコミュニティとの整合性を保つために、dataの値は更新後のデータであり、oldの値は2022年3月20日から作成または再起動されたデータ同期または移行インスタンスの更新前のデータです。

pkNames

主キーの名前。

sql

SQL文。

sqlType

変換されたフィールドタイプ。 有効な値は、dataTypeNumberパラメーターの有効な値と同じです。 詳細については、「Kafkaクライアントを使用して追跡データを消費する」のトピックの「MySQLデータ型とdataTypeNumber値のマッピング」を参照してください。

テーブル

テーブルの名前。

ts

ターゲットデータベースで操作の実行が開始された時刻。 値は13ビットのUNIXタイムスタンプです。 単位:ミリ秒。

説明

検索エンジンを使用して、UNIXタイムスタンプコンバーターを取得できます。

タイプ

操作タイプです。 有効な値: DELETE、UPDATE、およびINSERT。

説明

完全データ同期または移行中、操作タイプはINITに固定されます。

gtid

トランザクションを識別するグローバルトランザクション識別子 (GTID) 。 各トランザクションには、グローバルに一意のGTIDが割り当てられます。

更新されたデータ
DDL操作
説明

3月20日2022より前に作成され、ソーステーブルのDELETEステートメントを使用してkafkaクラスターに同期された変更追跡インスタンスの場合、oldの値はdataであり、dataの値はNULLです。 オープンソースコミュニティとの一貫性を保つために、2022年3月20日から作成または再起動された変更追跡インスタンスのdataの値はdataで、oldの値はNULLです。

D ata同期または移行インスタンスが3月20日より前に作成されました2022
データ同期または移行インスタンスが3月20日から作成または再起動されました。2022
{
    "old": [
        {
            "shipping_type": "aaa"
        }
    ], 
    "database": "dbname", 
    "es": 1600161894000, 
    "id": 58, 
    "isDdl": false, 
    "mysqlType": {
        "id": "bigint", 
        "shipping_type": "varchar"
    }, 
    "pkNames": [
        "id"
    ], 
    "sql": "", 
    "sqlType": {
        "id": -5, 
        "shipping_type": 12
    }, 
    "table": "tablename", 
    "ts": 1600161894771, 
    "type": "DELETE"
}
{
    "data": [
        {
            "id": "500000287", 
            "shipping_type": null
        }
    ], 
    "database": "dbname", 
    "es": 1600161894000, 
    "id": 58, 
    "isDdl": false, 
    "mysqlType": {
        "id": "bigint", 
        "shipping_type": "varchar"
    }, 
    "pkNames": [
        "id"
    ], 
    "sql": "", 
    "sqlType": {
        "id": -5, 
        "shipping_type": 12
    }, 
    "table": "tablename", 
    "ts": 1600161894771, 
    "type": "DELETE"
}
            
{
    "database":"dbname", the name of the source database
    "es":1600161894000, the time when the data in the source database is written to the binary logs.
    "id":58, the offset of the DTS cache.
    "isDdl":true, specifies whether to synchronize or migrate DDL operations.
    "sql":"eg:createxxx", the DDL statements recorded in the binary logs.
    "table":"tablename", the name of the source table.
    "ts":1600161894771, the time when DTS writes data to the destination database.
    "type":"DDL"
}

  • 目次 (1, M)
  • データ形式
  • DTSアヴロ
  • Shareplex Json
  • パラメーター
  • 例:
  • Json運河
  • パラメーター
フィードバック