Data Transmission Service (DTS) では、Kafka や RocketMQ などのメッセージキューにデータを同期または移行する際に、ストレージ形式を選択できます。このトピックでは、データを解析するのに役立つデータ形式について説明します。
データストレージ形式
DTS は、メッセージキューに書き込まれるデータに対して、次のストレージ形式をサポートしています:
DTS Avro:データ構造やオブジェクトを、保存や転送が容易な形式に変換するデータシリアル化形式です。
「Shareplex Json」:データレプリケーションソフトウェア SharePlex がソースデータベースからデータを読み取り、メッセージキューにデータを書き込む場合、データは Shareplex Json フォーマットで保存されます。
Canal Json:Canal はデータベースから増分ログを解析し、増分データをメッセージキューに転送します。データは Canal Json 形式で保存されます。
DTS Avro
DTS Avro スキーマ定義に基づいてデータを解析する必要があります。詳細については、「DTS Avro schema definition」および「DTS Avro deserialization example」をご参照ください。
DTS Avro 形式では、DDL 文は String 型です。
Shareplex Json
パラメーターの説明
パラメーター | 説明 |
| データベースでトランザクションがコミットされた時刻。形式は yyyy-MM-ddTHH:mm:ssZ (UTC) です。 |
| トランザクションをコミットしたユーザーの ID。 |
| データ操作タイプ。有効な値は、INSERT、UPDATE、DELETE、TRUNCATE、DROP COLUMN、UPDATE BEFORE、UPDATE AFTER です。 |
| システム変更番号 (SCN)。データベース内の特定の時刻にコミットされたトランザクションのバージョンを識別します。コミットされた各トランザクションには、一意の SCN が割り当てられます。 |
| データベース内のレコードを特定するために使用される、比較的一意なアドレス値。 |
| トランザクション ID。 |
| トランザクション内での操作の序数。値は 1 から始まります。 |
| トランザクション内の操作の総数。 |
| テーブル名。 |
| トランザクション内での操作のインデックス。形式は |
| トランザクションがターゲットデータベースにコミットされた時刻。 |
例
データの挿入
{
"meta": {
"time": "2017-06-16T14:24:34",
"userid": 84,
"op": "ins",
"scn": "14589063118712",
"rowid": "AAATGpAAIAAItcIAAA",
"trans": "7.0.411499",
"seq": 1,
"size": 11,
"table": "CL_BIZ1.MIO_LOG",
"idx": "1/11",
"posttime": "2017-06-16T14:33:52"
},
"data": {
"MIO_LOG_ID": "32539737"
}
}データの更新
{
"meta": {
"time": "2017-06-16T15:38:13",
"userid": 84,
"op": "upd",
"table": "CL_BIZ1.MIO_LOG"
….
},
"data": {
"CNTR_NO": "1171201606"
},
"key": {
"MIO_LOG_ID": "32537893",
"PLNMIO_REC_ID": "31557806",
"POL_CODE": null,
"CNTR_TYPE": null,
"CNTR_NO": "1171201606syui26"
}
}データの削除
{
"meta": {
"time": "2017-06-16T15:51:35",
"userid": 84,
"op": "del",
},
"data": {
"MIO_LOG_ID": "32539739",
"PLNMIO_REC_ID": "31557806",
"POL_CODE": null,
"CG_NO": null
}
}Canal Json
「[パーティションキーの更新後のメッセージ配信の分割]」を有効にすると、パーティションキーが変更されたときに、Kafka は DELETE メッセージと INSERT メッセージを配信します。Kafka は、各メッセージのパーティションキーの値に基づいて、それぞれのメッセージのパーティションを選択します。
例: パーティションキー id の値は 1 です。メッセージは partition-1 に配信されます。「[パーティションキーの更新後にメッセージ配信を分割]」を有効にする前後での違いについては、以下のリストで説明します。
有効:ソースデータベースで
UPDATE SET id = 2 WHERE id = 1コマンドを実行すると、id=1のDELETEメッセージがpartition-1に配信され、id=2のINSERTメッセージがpartition-2に配信されます。無効:
partition-1にはUPDATEメッセージが 1 つだけ配信されます。UPDATE操作では、変更前の値に基づいて配信先のパーティションが選択されます。
メトリックの説明
パラメーター | 説明 |
| データベース名。 |
| ソースデータベースで操作が実行された時刻。ミリ秒単位の 13 桁の UNIX タイムスタンプです。 説明
|
| 操作のシリアル番号。 説明 これは、タイムスタンプと内部 DTS オフセットから生成されます。レコードの順序を決定するのに役立ちます。 |
| 操作が DDL 操作であるかどうかを指定します。
|
| フィールドのデータの型。 説明 精度などのデータの型のパラメーターはサポートされていません。 |
| 変更前または変更後のデータ。 説明 2022 年 3 月 20 日より前に作成された同期または移行インスタンスの場合、old の値は変更後のデータ、 |
| プライマリキー名。 |
| SQL 文。 |
| 変換されたフィールドタイプ。値は dataTypeNumber の値と同じです。詳細については、「フィールドタイプと dataTypeNumber 値のマッピング」をご参照ください。 |
| テーブル名。 |
| 操作がターゲットデータベースへのデータ書き込みを開始した時刻。ミリ秒単位の 13 桁の UNIX タイムスタンプです。 説明 検索エンジンを使用して、UNIX タイムスタンプ変換ツールを見つけることができます。 |
| DELETE、UPDATE、INSERT などの操作タイプ。 説明 完全データ同期または移行タスクの場合、値は [INIT] に固定されています。 |
| グローバルトランザクション識別子 (GTID)。GTID はグローバルに一意です。各トランザクションは 1 つの GTID に対応します。 |
例
データの更新
2022 年 3 月 20 日より前に作成された同期または移行インスタンスの場合、ソーステーブルからの DELETE 文が Kafka に同期または移行されると、old フィールドにデータが含まれ、data フィールドは null になります。オープンソースコミュニティとの整合性を保つため、2022 年 3 月 20 日以降に作成または再起動されたインスタンスの場合、data フィールドにデータが含まれ、old フィールドは null になります。
2022 年 3 月 20 日より前に作成された同期または移行インスタンス
{
"old": [
{
"shipping_type": "aaa"
}
],
"database": "dbname",
"es": 1600161894000,
"id": 58,
"isDdl": false,
"mysqlType": {
"id": "bigint",
"shipping_type": "varchar"
},
"pkNames": [
"id"
],
"sql": "",
"sqlType": {
"id": -5,
"shipping_type": 12
},
"table": "tablename",
"ts": 1600161894771,
"type": "DELETE"
}2022 年 3 月 20 日以降に作成または再起動された同期または移行インスタンス
{
"data": [
{
"id": "500000287",
"shipping_type": null
}
],
"database": "dbname",
"es": 1600161894000,
"id": 58,
"isDdl": false,
"mysqlType": {
"id": "bigint",
"shipping_type": "varchar"
},
"pkNames": [
"id"
],
"sql": "",
"sqlType": {
"id": -5,
"shipping_type": 12
},
"table": "tablename",
"ts": 1600161894771,
"type": "DELETE"
}
DDL 操作
{
"database":"dbname", // 同期または移行対象のデータベース名。
"es":1600161894000, // ソースデータがバイナリログに書き込まれた時刻。
"id":58, // DTS キャッシュ内のオフセット。
"isDdl":true, // DDL 文を同期または移行するかどうかを指定します。
"sql":"eg:createxxx", // バイナリログからの DDL 文。
"table":"tablename", // 同期または移行対象のテーブル名。
"ts":1600161894771, // DTS がターゲットにデータを書き込んだ時刻。
"type":"DDL"
}