Data Transmission Service (DTS) を使用してデータをKafkaクラスターに移行または同期する場合、データがKafkaクラスターに保存される形式を指定できます。 このトピックでは、Kafkaクラスターにデータを格納するために使用できる形式について説明します。 これらのデータ形式の定義に基づいてデータを解析できます。
データ形式
DTSを使用すると、次のいずれかの形式でデータをKafkaクラスターに保存できます。
DTS Avro: 保存と送信を容易にするためにデータ構造またはオブジェクトを変換できるデータシリアル化形式。
Shareplex Json: データレプリケーションソフトウェアSharePlexを使用してソースデータベースから読み取られたデータが格納される形式。
Canal Json: Canalがソースデータベースの増分データに関するログを解析し、その増分データをKafkaクラスターに送信した後、データがKafkaクラスターに格納される形式。
DTSアヴロ
デフォルトのデータ形式はDTS Avroです。 デフォルトでは、DTSを使用してKafkaクラスターに移行または同期されるデータは、DTS Avro形式で保存されます。 DTS Avroのスキーマ定義に基づいてデータを解析する必要があります。 詳細については、GitHubの「subscribe_example」をご参照ください。
DTS Avro形式では、データ定義言語 (DDL) ステートメントはSTRING型です。
Shareplex Json
表 1. Shareplex Json形式に関連するパラメーター
パラメーター | 説明 |
| データベース内のトランザクションがコミットされたUTC時刻。 このパラメーターの値は、yyyy-MM-ddTHH:mm:ssZ形式です。 |
| トランザクションをコミットしたユーザーのID。 |
| 操作タイプです。 有効な値: INSERT、UPDATE、DELETE、TRUNCATE、DROP COLUMN、UPDATE BEFORE、およびUPDATE AFTER。 |
| データベースが特定の時点でコミットするトランザクションのバージョンを識別するシステム変更番号 (SCN) 。 コミットされた各トランザクションには固有のSCNが割り当てられる。 |
| データベース内のレコードを識別するために使用される比較的一意のアドレス値。 |
| トランザクションのID。 |
| トランザクション内の操作のシーケンス番号。 番号は1から始まります。 |
| トランザクション内の操作の総数。 |
| テーブルの名前。 |
| トランザクション内の操作のインデックス ( |
| トランザクションがターゲットデータベースにコミットされた時刻。 |
例:
挿入されたデータ
{ "meta": { "time": "2017-06-16T14:24:34", "userid": 84, "op": "ins", "scn": "14589063118712", "rowid": "AAATGpAAIAAItcIAAA", "trans": "7.0.411499", "seq": 1, "size": 11, "table": "CL_BIZ1.MIO_LOG", "idx": "1/11", "posttime": "2017-06-16T14:33:52" }, "data": { "MIO_LOG_ID": "32539737" } }
更新されたデータ
{ "meta": { "time": "2017-06-16T15:38:13", "userid": 84, "op": "upd", "table": "CL_BIZ1.MIO_LOG" ... }, "data": { "CNTR_NO": "1171201606" }, "key": { "MIO_LOG_ID": "32537893", "PLNMIO_REC_ID": "31557806", "POL_CODE": null, "CNTR_TYPE": null, "CNTR_NO": "1171201606syui26" } }
削除されたデータ
{ "meta": { "time": "2017-06-16T15:51:35", "userid": 84, "op": "del", }, "data": { "MIO_LOG_ID": "32539739", "PLNMIO_REC_ID": "31557806", "POL_CODE": null, "CNTR_TYPE": null, "CG_NO": null } }
Json運河
表 2. Canal Json形式に関連するパラメーター
パラメーター | 説明 |
| データベースの名前。 |
| データベースで操作が実行された時刻。 値は13ビットのUNIXタイムスタンプです。 単位:ミリ秒。 説明 検索エンジンを使用して、UNIXタイムスタンプコンバーターを取得できます。 |
| 操作のシリアル番号。 |
| 操作がDDL操作かどうかを示します。
|
| フィールドのデータ型。 説明 精密タイプなどの操作パラメータはサポートされていません。 |
| 更新前と更新後のデータ。 説明 2022年3月20日より前に作成された変更追跡インスタンスの場合、 |
| 主キーの名前。 |
| SQL文。 |
| Canal Json形式で記録された各列のJava SQLタイプ。 詳細については、「SQLタイプフィールド」をご参照ください。 |
| テーブルの名前。 |
| ターゲットデータベースで操作の実行が開始された時刻。 値は13ビットのUNIXタイムスタンプです。 単位:ミリ秒。 説明 検索エンジンを使用して、UNIXタイムスタンプコンバーターを取得できます。 |
| 操作タイプです。 有効な値: DELETE、UPDATE、およびINSERT。 説明 完全データ同期または移行中、操作タイプはINITに固定されます。 |
| トランザクションを識別するグローバルトランザクション識別子 (GTID) 。 各トランザクションには、グローバルに一意のGTIDが割り当てられます。 |
更新されたデータの例
3月20日2022より前に作成され、ソーステーブルのDELETE
ステートメントを使用してkafkaクラスターに同期された変更追跡インスタンスの場合、old
の値はdataであり、data
の値はNULLです。 オープンソースコミュニティとの一貫性を保つために、2022年3月20日から作成または再起動された変更追跡インスタンスのdata
の値はdataで、old
の値はNULLです。
3月20日より前に作成された変更追跡インスタンス2022
{
"old": [
{
"shipping_type": "aaa"
}
],
"database": "dbname",
"es": 1600161894000,
"id": 58,
"isDdl": false,
"mysqlType": {
"id": "bigint(20)",
"shipping_type": "varchar(50)"
},
"pkNames": [
"id"
],
"sql": "",
"sqlType": {
"id": -5,
"shipping_type": 12
},
"table": "tablename",
"ts": 1600161894771,
"type": "DELETE"
}
3月20日から作成または再起動された変更追跡インスタンス2022
{
"data": [
{
"id": "500000287",
"shipping_type": null
}
],
"database": "dbname",
"es": 1600161894000,
"id": 58,
"isDdl": false,
"mysqlType": {
"id": "bigint(20)",
"shipping_type": "varchar(50)"
},
"pkNames": [
"id"
],
"sql": "",
"sqlType": {
"id": -5,
"shipping_type": 12
},
"table": "tablename",
"ts": 1600161894771,
"type": "DELETE"
}
DDL操作の例
{
"database":"dbname", the name of the source database.
"es":1600161894000, the time when the data in the source database is written to the binary logs.
"id":58, the offset of the DTS cache.
"isDdl":true, specifies whether to synchronize DDL operations.
"sql":"eg:createxxx", the DDL statements recorded in the binary logs.
"table":"tablename", the name of the source table.
"ts":1600161894771, the time when DTS writes data to the destination database.
"type":"DDL"
}