このトピックでは、data Transmission Service (DTS) を使用して、PolarDB for MySQLクラスターの増分データをDataHubプロジェクトにリアルタイムで同期する方法について説明します。 データを同期した後、Realtime Compute for Apache Flinkなどのビッグデータサービスを使用して、データをリアルタイムで分析できます。
前提条件
PolarDB for MySQLクラスターが作成されました。 詳細については、「Enterprise Editionクラスターの購入」および「サブスクリプションクラスターの購入」をご参照ください。
DataHubが有効化され、同期するデータを受け取るプロジェクトが作成されます。 詳細については、「DataHubの使用を開始する」をご参照ください。 および プロジェクトの管理
PolarDB for MySQLクラスターでバイナリログ機能が有効になっています。 詳細については、「バイナリログの有効化」をご参照ください。
制限事項
DTSは、ソースデータベースの外部キーをターゲットデータベースに同期しません。 したがって、ソースデータベースのカスケードおよび削除操作は、ターゲットデータベースと同期されません。
カテゴリ | 説明 |
ソースデータベースの制限 |
|
その他の制限 |
|
特別なケース | DTSは、CREATE DATABASE IF NOT EXISTS 'test' ステートメントをソースデータベースで実行し、バイナリログファイルの位置を前に移動します。 |
課金
同期タイプ | タスク設定料金 |
スキーマ同期と完全データ同期 | 無料です。 |
増分データ同期 | 有料。 詳細については、「課金の概要」をご参照ください。 |
サポートしている同期トポロジ
一方向の 1 対 1 の同期
一方向の 1 対多の同期
一方向の多対 1 の同期
一方向のカスケード同期
DTSでサポートされている同期トポロジの詳細については、「同期トポロジ」をご参照ください。
同期可能なSQL操作
挿入、更新、および削除
データベースアカウントに必要な権限
ソースPolarDB for MySQLクラスターのデータベースアカウントには、同期するオブジェクトに対する読み取り権限以上が必要です。
手順
この手順は、新しいDTSコンソールに基づいて説明します。 DTSコンソールとデータ管理 (DMS) コンソールのDTSモジュールの間で操作に不一致が発生した場合は、DMSコンソールが優先されます。
[データ同期タスク] ページに移動します。
データ管理 (DMS) コンソールにログインします。
上部のナビゲーションバーで、DTSをクリックします。
左側のナビゲーションウィンドウで、を選択します。
説明操作は、DMSコンソールのモードとレイアウトによって異なります。 詳細については、「シンプルモード」および「DMSコンソールのレイアウトとスタイルのカスタマイズ」をご参照ください。
新しいDTSコンソールのデータ同期タスクページに行くこともできます。
データ同期タスクの右側で、データ同期インスタンスが存在するリージョンを選択します。
説明新しいDTSコンソールを使用する場合は、上部のナビゲーションバーでデータ同期インスタンスが存在するリージョンを選択する必要があります。
[タスクの作成] をクリックします。 [データ同期タスクの作成] ページで、ソースデータベースとターゲットデータベースを設定します。 次の表にパラメーターを示します。
セクション
パラメーター
説明
非該当
タスク名
DTSタスクの名前。 タスク名は自動生成されます。 タスクを簡単に識別できるように、わかりやすい名前を指定することをお勧めします。 一意のタスク名を指定する必要はありません。
ソースデータベース
既存のDMSデータベースインスタンスの選択
使用するデータベースインスタンス。 ビジネス要件に基づいて、既存のインスタンスを使用するかどうかを選択できます。
既存のインスタンスを選択すると、DTSはデータベースのパラメーターを自動的に入力します。
既存のインスタンスを選択しない場合は、次のデータベース情報を設定する必要があります。
データベースタイプ
移行元ディスクのタイプを設定します。 [PolarDB for MySQL] を選択します。
アクセス方法
ソースデータベースのアクセス方法。 [Alibaba Cloudインスタンス] を選択します。
インスタンスリージョン
ソースPolarDB for MySQLクラスターが存在するリージョン。
Alibaba Cloudアカウント全体でのデータの複製
Alibaba Cloudアカウント間でデータを同期するかどうかを指定します。 この例では、[いいえ] が選択されています。
PolarDBクラスターID
ソースPolarDB for MySQLクラスターのID。
データベースアカウント
ソースPolarDB for MySQLクラスターのデータベースアカウント。 アカウントに必要な権限の詳細については、このトピックの「データベースアカウントに必要な権限」をご参照ください。
データベースパスワード
データベースインスタンスへのアクセスに使用されるパスワード。
宛先データベース
既存のDMSデータベースインスタンスの選択
使用するデータベースインスタンス。 ビジネス要件に基づいて、既存のインスタンスを使用するかどうかを選択できます。
既存のインスタンスを選択すると、DTSはデータベースのパラメーターを自動的に入力します。
既存のインスタンスを選択しない場合は、次のデータベース情報を設定する必要があります。
データベースタイプ
ターゲットデータベースのタイプ。 [DataHub] を選択します。
アクセス方法
ターゲットデータベースのアクセス方法。 [Alibaba Cloudインスタンス] を選択します。
インスタンスリージョン
ターゲットDataHubプロジェクトが存在するリージョン。
プロジェクト
データの同期先のDataHubプロジェクト。
ページの下部で、[接続をテストして続行] をクリックします。
ソースまたはターゲットデータベースがAlibaba Cloudデータベースインスタンス (ApsaraDB RDS for MySQLインスタンスやApsaraDB for MongoDBインスタンスなど) の場合、DTSは自動的にDTSサーバーのCIDRブロックをインスタンスのホワイトリストに追加します。 ソースデータベースまたはターゲットデータベースがElastic Compute Service (ECS) インスタンスでホストされている自己管理データベースの場合、DTSサーバーのCIDRブロックがECSインスタンスのセキュリティグループルールに自動的に追加されます。ECSインスタンスがデータベースにアクセスできることを確認する必要があります。 データベースが複数のECSインスタンスにデプロイされている場合、DTSサーバーのCIDRブロックを各ECSインスタンスのセキュリティグループルールに手動で追加する必要があります。 ソースデータベースまたはターゲットデータベースが、データセンターにデプロイされているか、サードパーティのクラウドサービスプロバイダーによって提供される自己管理データベースである場合、DTSサーバーのCIDRブロックをデータベースのホワイトリストに手動で追加して、DTSがデータベースにアクセスできるようにする必要があります。 詳細については、「DTSサーバーのCIDRブロックの追加」トピックの「DTSサーバーのCIDRブロック」セクションをご参照ください。
警告DTSサーバーのCIDRブロックがデータベースまたはインスタンスのホワイトリスト、またはECSセキュリティグループルールに自動的または手動で追加されると、セキュリティリスクが発生する可能性があります。 したがって、DTSを使用してデータを同期する前に、潜在的なリスクを理解して認識し、次の対策を含む予防策を講じる必要があります。VPNゲートウェイ、またはSmart Access Gateway。
同期するオブジェクトと詳細設定を設定します。
パラメーター
説明
同期タイプ
デフォルトでは、増分データ同期が選択されています。 スキーマ同期のみ選択できます。 [フルデータ同期] は選択できません。
説明初期スキーマ同期中に、DTSは、ソースデータベースのテーブルなど、選択したオブジェクトのスキーマをターゲットDataHubプロジェクトに同期します。
競合テーブルの処理モード
エラーの事前チェックと報告: ターゲットデータベースに、ソースデータベースのテーブルと同じ名前のテーブルが含まれているかどうかを確認します。 ソースデータベースとターゲットデータベースに同じテーブル名のテーブルが含まれていない場合は、事前チェックに合格します。 それ以外の場合、事前チェック中にエラーが返され、データ同期タスクを開始できません。
説明ソースデータベースとターゲットデータベースに同じ名前のテーブルが含まれていて、ターゲットデータベース内のテーブルを削除または名前変更できない場合は、オブジェクト名マッピング機能を使用して、ターゲットデータベースに同期されるテーブルの名前を変更できます。 詳細については、「マップオブジェクト名」をご参照ください。
エラーを無視して続行: ソースデータベースとターゲットデータベースの同じテーブル名の事前チェックをスキップします。
警告エラーを無視して続行 を選択すると、データの不整合が発生し、ビジネスが潜在的なリスクにさらされる可能性があります。
ソースデータベースとターゲットデータベースが同じスキーマを持ち、ターゲットデータベースのデータレコードがソースデータベースのデータレコードと同じ主キー値または一意キー値を持つ場合:
完全データ同期中、DTSはデータレコードをターゲットデータベースに同期しません。 ターゲットデータベースの既存のデータレコードが保持されます。
増分データ同期中、DTSはデータレコードをターゲットデータベースに同期します。 ターゲットデータベースの既存のデータレコードが上書きされます。
ソースデータベースとターゲットデータベースのスキーマが異なる場合、データの初期化に失敗する可能性があります。 この場合、一部の列のみが同期されるか、データ同期タスクが失敗します。 作業は慎重に行ってください。
追加列の命名規則
DTSがデータをDataHubプロジェクトに同期すると、DTSはターゲットトピックに列を追加します。 追加の列の名前がターゲットトピックの既存の列の名前と同じである場合、データ同期タスクは失敗します。 [はい] または [いいえ] を選択して、ビジネス要件に基づいて追加の列の新しい命名規則を有効にするかどうかを指定できます。
警告このパラメーターを設定する前に、追加の列にターゲットトピックの既存の列と名前が競合しているかどうかを確認します。 そうしないと、データ同期タスクが失敗したり、データが失われたりします。 詳細については、「追加の列の命名規則の変更」トピックの「追加の列の命名規則」セクションをご参照ください。
宛先インスタンスでのオブジェクト名の大文字化
ターゲットインスタンスのデータベース名、テーブル名、および列名の大文字化。 デフォルトでは、DTSデフォルトポリシーが選択されています。 他のオプションを選択して、オブジェクト名の大文字化をソースまたはターゲットデータベースの大文字化と一致させることができます。 詳細については、「ターゲットインスタンスのオブジェクト名の大文字化の指定」をご参照ください。
ソースオブジェクト
[ソースオブジェクト] セクションから1つ以上のオブジェクトを選択し、アイコンをクリックして [選択済みオブジェクト] セクションにオブジェクトを追加します。
説明同期するオブジェクトとしてテーブルまたはデータベースを選択できます。
[選択済みオブジェクト]
同期先のインスタンスに同期するオブジェクトの名前を変更するには、選択中のオブジェクト セクションでオブジェクトを右クリックします。 詳細については、「オブジェクト名のマップ」トピックの「単一オブジェクトの名前のマップ」セクションをご参照ください。
一度に複数のオブジェクトの名前を変更するには、選択中のオブジェクト セクションの右上隅にある 一括編集 をクリックします。 詳細については、「オブジェクト名のマップ」トピックの「一度に複数のオブジェクト名をマップする」セクションをご参照ください。
[次へ:詳細設定] をクリックして詳細設定を構成します。
パラメーター
説明
Set Alerts
データ同期タスクのアラートを設定するかどうかを指定します。 タスクが失敗するか、同期レイテンシが指定されたしきい値を超えると、アラート送信先は通知を受け取ります。 有効な値:
No: アラートを有効にしません。
Yes: アラートを設定します。 この場合、アラートしきい値と アラート通知設定 詳細については、「モニタリングとアラートの設定」トピックの「DTSタスクを作成するときのモニタリングとアラートの設定」をご参照ください。
失敗した接続のリトライ時間範囲の指定
失敗した接続のリトライ時間範囲。 データ同期タスクの開始後にソースデータベースまたはターゲットデータベースの接続に失敗した場合、DTSはその時間範囲内ですぐに接続を再試行します。 有効な値: 10 ~ 1440 単位は分です。 デフォルト値: 720 このパラメーターを30より大きい値に設定することを推奨します。 DTSが指定された時間範囲内にソースデータベースとターゲットデータベースに再接続すると、DTSはデータ同期タスクを再開します。 それ以外の場合、データ同期タスクは失敗します。
説明ソースまたはターゲットデータベースが同じである複数のデータ同期タスクに対して異なるリトライ時間範囲を指定した場合、最も短いリトライ時間範囲が優先されます。
DTSが接続を再試行すると、DTSインスタンスに対して課金されます。 業務要件に基づいて再試行時間範囲を指定することを推奨します。 ソースインスタンスとターゲットインスタンスがリリースされた後、できるだけ早くDTSインスタンスをリリースすることもできます。
ETLの設定
抽出、変換、および読み込み (ETL) 機能を有効にするかどうかを指定します。 ビジネス要件に基づいて、[はい] または [いいえ] を選択します。 [はい] を選択した場合、コードエディターにドメイン固有言語 (DSL) ステートメントを入力する必要があります。 詳細については、「データ移行または同期タスクでのETLの設定」をご参照ください。
オプション: [選択したオブジェクト] セクションで、同期するトピックの名前の上にポインターを移動し、右クリックします。 表示されるダイアログボックスで、テーブルまたはデータベースの名前を変更し、ビジネス要件に基づいてパーティション分割のシャードキーを設定します。
タスク設定を保存し、事前チェックを実行します。
関連するAPI操作を呼び出してDTSタスクを設定するときに指定するパラメーターを表示するには、ポインターを 次:タスク設定の保存と事前チェック に移動し、OpenAPI パラメーターのプレビュー をクリックします。
パラメーターを表示または表示する必要がない場合は、ページ下部の 次:タスク設定の保存と事前チェック をクリックします。
説明データ同期タスクを開始する前に、DTSは事前チェックを実行します。 データ同期タスクは、タスクが事前チェックに合格した後にのみ開始できます。
データ同期タスクが事前チェックに失敗した場合は、失敗した各項目の横にある [詳細の表示] をクリックします。 チェック結果に基づいて原因を分析した後、問題のトラブルシューティングを行います。 次に、プレチェックを再実行します。
事前チェック中にアイテムに対してアラートがトリガーされた場合:
アラートアイテムを無視できない場合は、失敗したアイテムの横にある [詳細の表示] をクリックして、問題のトラブルシューティングを行います。 次に、もう一度プレチェックを実行します。
アラート項目を無視できる場合は、[アラート詳細の確認] をクリックします。 [詳細の表示] ダイアログボックスで、[無視] をクリックします。 表示されたメッセージボックスで、[OK] をクリックします。 次に、[再度事前チェック] をクリックして、事前チェックを再度実行します。 アラート項目を無視すると、データの不整合が発生し、ビジネスが潜在的なリスクにさらされる可能性があります。
成功率が100% になるまで待ちます。 次に、[次へ: インスタンスの購入] をクリックします。
購入ページで、データ同期インスタンスの課金方法とインスタンスクラスのパラメーターを設定します。 下表にパラメーターを示します。
セクション
パラメーター
説明
新しいインスタンスクラス
Billing Method
サブスクリプション: データ同期インスタンスの作成時にサブスクリプションの料金を支払います。 サブスクリプションの課金方法は、長期使用の場合、従量課金の課金方法よりも費用対効果が高くなります。
従量課金: 従量課金インスタンスは1時間ごとに課金されます。 従量課金方法は、短期使用に適しています。 従量課金データ同期インスタンスが不要になった場合は、インスタンスをリリースしてコストを削減できます。
リソースグループの設定
データ同期インスタンスが属するリソースグループ。 デフォルト値: Default resource group 詳細については、「リソース管理とは 」をご参照ください。
インスタンスクラス
DTSは、同期速度が異なるインスタンスクラスを提供します。 ビジネス要件に基づいてインスタンスクラスを選択できます。 詳細については、「データ同期インスタンスのインスタンスクラス」をご参照ください。
サブスクリプション期間
サブスクリプションの課金方法を選択した場合は、サブスクリプション期間と作成するデータ同期インスタンスの数を指定します。 サブスクリプション期間は、1〜9か月、1年、2年、3年、または5年とすることができる。
説明このパラメーターは、サブスクリプション の課金方法を選択した場合にのみ使用できます。
データ伝送サービス (従量課金) サービス規約を読んで選択します。
[購入して開始] をクリックします。 表示されるダイアログボックスで、OK をクリックします。
タスクリストでタスクの進行状況を確認できます。
DataHubトピックのスキーマ
DTSが増分データをDataHubトピックに同期すると、DTSはトピック内のメタデータを格納する列を追加します。 次の図は、DataHubトピックのスキーマを示しています。
この例では、id
、name
、およびaddress
はデータフィールドです。 DTSは、以前のバージョンの追加の列の命名規則が使用されているため、ソースデータベースからターゲットデータベースに同期される元のデータフィールドを含むデータフィールドにdts_
プレフィックスを追加します。 追加の列に新しい命名規則を使用する場合、DTSは、ソースデータベースからターゲットデータベースに同期される元のデータフィールドにプレフィックスを追加しません。
次の表に、DataHubトピックの追加の列を示します。
前の追加の列名 | 新しい追加列名 | タイプ | 説明 |
|
| String | 増分ログエントリの一意のID。 説明
|
|
| String | 操作タイプ。 有効な値:
|
|
| String | データベースのサーバーID。 |
|
| String | データベース名。 |
|
| String | <td class="en-UStry align-left colsep-1 rowsep-1">テーブル名。</td> |
|
| String | UTCに表示される操作のタイムスタンプ。 ログファイルのタイムスタンプでもあります。 |
|
| String | 列の値が更新前の値かどうかを示します。 有効値: YとN。 |
|
| String | 列の値が更新後の値であるかどうかを示します。 有効値: YとN。 |
dts_before_flagおよびdts_after_flagフィールドに関する追加情報
増分ログエントリのdts_before_flag
およびdts_after_flag
フィールドの値は、操作の種類によって異なります。
INSERT
INSERT操作の場合、列の値は新しく挿入されたレコードの値 (更新後の値) です。
dts_before_flag
フィールドの値はNであり、dts_after_flag
フィールドの値はYである。UPDATE
DTSは、UPDATE操作用に2つの増分ログエントリを生成します。 2つの増分ログエントリは、
dts_record_id
、dts_operation_flag
、およびdts_utc_timestamp
フィールドの値が同じです。第1のログエントリは、更新前の値を記録する。 したがって、
dts_before_flag
フィールドの値はYであり、dts_after_flag
フィールドの値はNである。第2のログエントリは、更新後の値を記録する。 したがって、dts_before_flag
フィールドの値はNであり、dts_after_flag
フィールドの値はYである。DELETE
DELETE操作の場合、列の値は削除されたレコードの値 (更新前の値) です。
dts_before_flag
フィールドの値はYであり、dts_after_flag
フィールドの値はNである。
次のステップ
データ同期タスクを設定した後、Alibaba Cloud Realtime Compute for Apache Flinkを使用して、DataHubプロジェクトに同期されたデータを分析できます。 詳細については、「Alibaba Cloud Realtime Compute for Apache Flinkとは 」をご参照ください。