Data Transmission Service (DTS) を使用してデータをKafkaクラスターに移行または同期する場合、データがKafkaクラスターに保存される形式を指定できます。 このトピックでは、Kafkaクラスターにデータを格納するために使用できる形式について説明します。 これらのデータ形式の定義に基づいてデータを解析できます。
データ形式
DTSを使用すると、次のいずれかの形式でデータをKafkaクラスターに保存できます。
DTS Avro: 保存と送信を容易にするためにデータ構造またはオブジェクトを変換できるデータシリアル化形式。
Shareplex Json: データレプリケーションソフトウェアSharePlexを使用してソースデータベースから読み取られたデータが格納される形式。
Canal Json: Canalがソースデータベースの増分データに関するログを解析し、その増分データをKafkaクラスターに送信した後、データがKafkaクラスターに格納される形式。
DTSアヴロ
デフォルトのデータ形式はDTS Avroです。 デフォルトでは、DTSを使用してKafkaクラスターに移行または同期されるデータは、DTS Avro形式で保存されます。 DTS Avroのスキーマ定義に基づいてデータを解析する必要があります。 詳細については、GitHubの「subscribe_example」をご参照ください。
DTS Avro形式では、DDLステートメントはSTRING型です。
Shareplex Json
表 1. Shareplex Json形式に関連するパラメーター
パラメーター | 説明 |
| データベース内のトランザクションがコミットされたUTC時刻。 このパラメーターの値は、yyyy-MM-ddTHH:mm:ssZ形式です。 |
| トランザクションをコミットしたユーザーのID。 |
| 操作タイプです。 有効な値: INSERT、UPDATE、DELETE、TRUNCATE、DROP COLUMN、UPDATE BEFORE、およびUPDATE AFTER。 |
| データベースが特定の時点でコミットするトランザクションのバージョンを識別するシステム変更番号 (SCN) 。 コミットされた各トランザクションには固有のSCNが割り当てられる。 |
| データベース内のレコードを識別するために使用される比較的一意のアドレス値。 |
| トランザクションのID。 |
| トランザクション内の操作のシーケンス番号。 番号は1から始まります。 |
| トランザクション内の操作の総数。 |
| テーブルの名前。 |
| トランザクション内の操作のインデックス ( |
| トランザクションがターゲットデータベースにコミットされた時刻。 |
例:
挿入されたデータ
{ "meta": { "time": "2017-06-16T14:24:34", "userid": 84, "op": "ins", "scn": "14589063118712", "rowid": "AAATGpAAIAAItcIAAA", "trans": "7.0.411499", "seq": 1, "size": 11, "table": "CL_BIZ1.MIO_LOG", "idx": "1/11", "posttime": "2017-06-16T14:33:52" }, "data": { "MIO_LOG_ID": "32539737" } }
更新されたデータ
{ "meta": { "time": "2017-06-16T15:38:13", "userid": 84, "op": "upd", "table": "CL_BIZ1.MIO_LOG" ... }, "data": { "CNTR_NO": "1171201606" }, "key": { "MIO_LOG_ID": "32537893", "PLNMIO_REC_ID": "31557806", "POL_CODE": null, "CNTR_TYPE": null, "CNTR_NO": "1171201606syui26" } }
削除されたデータ
{ "meta": { "time": "2017-06-16T15:51:35", "userid": 84, "op": "del", }, "data": { "MIO_LOG_ID": "32539739", "PLNMIO_REC_ID": "31557806", "POL_CODE": null, "CNTR_TYPE": null, "CG_NO": null } }
Json運河
表 2. Canal Json形式に関連するパラメーター
パラメーター | 説明 |
| データベースの名前。 |
| データベースで操作が実行された時刻。 値は13ビットのUNIXタイムスタンプです。 単位:ミリ秒。 説明 検索エンジンを使用して、UNIXタイムスタンプコンバーターを取得できます。 |
| 操作のシリアル番号。 |
| 操作がDDL操作かどうかを示します。
|
| フィールドのデータ型。 |
| 更新前と更新後のデータ。 説明 2022年3月20日より前に作成された変更追跡インスタンスの場合、 |
| 主キーの名前。 |
| SQL文。 |
| 変換されたフィールドタイプ。 たとえば、LONG型はUNSIGNED INTEGERから変換され、BIGDECIMAL型はUNSIGNED LONGから変換されます。 |
| テーブルの名前。 |
| ターゲットデータベースで操作の実行が開始された時刻。 値は13ビットのUNIXタイムスタンプです。 単位:ミリ秒。 説明 検索エンジンを使用して、UNIXタイムスタンプコンバーターを取得できます。 |
| 操作タイプです。 有効な値には、DELETE、UPDATE、およびINSERTが含まれます。 |
| トランザクションを識別するグローバルトランザクション識別子 (GTID) 。 各トランザクションには、グローバルに一意のGTIDが割り当てられます。 |
更新されたデータの例
3月20日より前、2022ソーステーブルのDELETE
ステートメントが同期された後に作成された変更追跡インスタンスの場合、old
の値はdataであり、data
の値はNULLです。 オープンソースコミュニティとの一貫性を保つために、2022年3月20日から作成または再起動された変更追跡インスタンスでは、data
の値はdataであり、old
の値はNULLです。
3月20日より前に作成された変更トラッキングインスタンス2022
{
"old": [
{
"shipping_type": "aaa"
}
],
"database": "dbname",
"es": 1600161894000,
"id": 58,
"isDdl": false,
"mysqlType": {
"id": "bigint(20)",
"shipping_type": "varchar(50)"
},
"pkNames": [
"id"
],
"sql": "",
"sqlType": {
"id": -5,
"shipping_type": 12
},
"table": "tablename",
"ts": 1600161894771,
"type": "DELETE"
}
3月20日から作成または再起動された変更追跡インスタンス2022
{
"data": [
{
"id": "500000287",
"shipping_type": null
}
],
"database": "dbname",
"es": 1600161894000,
"id": 58,
"isDdl": false,
"mysqlType": {
"id": "bigint(20)",
"shipping_type": "varchar(50)"
},
"pkNames": [
"id"
],
"sql": "",
"sqlType": {
"id": -5,
"shipping_type": 12
},
"table": "tablename",
"ts": 1600161894771,
"type": "DELETE"
}
DDL操作の例
{
"database":"dbname", the name of the source database.
"es":1600161894000, the time when the data in the source database is written to the binary logs.
"id":58, the offset of the DTS cache.
"isDdl":true, specifies whether to synchronize DDL operations.
"sql":"eg:createxxx", the DDL statements recorded in the binary logs.
"table":"tablename", the name of the source table.
"ts":1600161894771, the time when DTS writes data to the destination database.
"type":"DDL"
}