すべてのプロダクト
Search
ドキュメントセンター

Data Transmission Service:自己管理型TiDBデータベースからAnalyticDB for MySQLクラスターへのデータの同期

最終更新日:Oct 18, 2024

このトピックでは、data Transmission Service (DTS) を使用して、自己管理型TiDBデータベースからAnalyticDB for MySQLクラスターにデータを同期する方法について説明します。 この例では、Pump、Drainer、およびKafkaクラスターがデプロイされています。

前提条件

  • AnalyticDB for MySQL クラスターが作成されます。 詳細については、「クラスターの作成」をご参照ください。

  • AnalyticDB for MySQLクラスターには十分なストレージがあります。

背景情報

Synchronize incremental data from TiDB

TiDBデータベースのバイナリログ形式と実装メカニズムは、MySQLデータベースのものとは異なります。 データを同期し、ソースTiDBデータベースへの変更を最小限に抑えるには、Pump、Drainer、およびKafkaクラスターをデプロイする必要があります。

Pumpは、TiDBで生成されたバイナリログファイルをリアルタイムで記録し、バイナリログファイルをDrainerに送信します。 Drainerは、バイナリログファイルを下流のKafkaクラスターに書き込みます。 増分データ同期中、DTSはKafkaクラスターからデータを取得し、そのデータを同期先データベースにリアルタイムで同期します。 たとえば、DTSはデータをAnalyticDB For MySQLクラスターに同期できます。

使用上の注意

  • DTSは、最初の完全データ同期中に、ソースRDSインスタンスとターゲットRDSインスタンスの読み取りおよび書き込みリソースを使用します。 これにより、RDSインスタンスの負荷が増加する可能性があります。 インスタンスのパフォーマンスが悪い場合、仕様が低い場合、またはデータ量が多い場合、データベースサービスが利用できなくなる可能性があります。 たとえば、ソースRDSインスタンスで多数の低速SQLクエリが実行されている場合、テーブルにプライマリキーがない場合、またはターゲットRDSインスタンスでデッドロックが発生する場合、DTSは大量の読み取りおよび書き込みリソースを占有します。 データ同期の前に、ソースRDSインスタンスとターゲットRDSインスタンスのパフォーマンスに対するデータ同期の影響を評価します。 オフピーク時にデータを同期することを推奨します。 たとえば、ソースRDSインスタンスとターゲットRDSインスタンスのCPU使用率が30% 未満の場合にデータを同期できます。

  • データ同期中に必要なオブジェクトに対してDDL操作を実行するために、gh-ostまたはpt-online-schema-changeを使用しないことを推奨します。 そうしないと、データの同期に失敗する可能性があります。

  • AnalyticDB for MySQLの制限により、AnalyticDB for MySQLのノードのディスク領域使用率 クラスターが80% を超えると、クラスターはロックされます。 同期するオブジェクトに基づいて、必要なディスク容量を見積もることを推奨します。 移行先クラスターに十分なストレージがあることを確認します。

  • プレフィックスインデックスは同期できません。 ソースデータベースにプレフィックスインデックスが含まれている場合、データの同期に失敗する可能性があります。

課金

同期タイプタスク設定料金
スキーマ同期と完全データ同期無料です。
増分データ同期有料。 詳細については、「課金の概要」をご参照ください。

同期可能なSQL操作

  • DDL操作: CREATE TABLE、DROP TABLE、RENAME TABLE、TRUNCATE TABLE、ADD COLUMN、およびDROP COLUMN

  • DML操作: INSERT、UPDATE、およびDELETE

説明

データ同期中にソーステーブルのフィールドのデータ型が変更された場合、エラーメッセージが報告され、データ同期タスクが中断されます。 この問題を解決する方法の詳細については、「フィールドタイプの変更により発生する同期障害のトラブルシューティング」トピックの「フィールドタイプの変更により発生する同期障害のトラブルシューティング」をご参照ください。

準備

説明

ソースデータベースがデプロイされているサーバーは、Pump、Drainer、およびKafkaクラスターがデプロイされているサーバーと同じ内部ネットワークにある必要があります。 これにより、データ同期に対するネットワーク待ち時間の影響が最小限に抑えられます。

  1. ポンプと水切りを展開します。 詳細については、「TiDB Binlogクラスターのデプロイ」をご参照ください。

  2. Drainerの設定ファイルを変更し、Drainerからデータを受信するKafkaクラスターを指定します。 詳細については、「Binlog Slave Clientユーザーガイド」をご参照ください。

  3. 次のいずれかの方法を使用してKafkaクラスターをデプロイします。

    • 自己管理Kafkaクラスターをデプロイします。 詳細については、Apache Kafka公式Webサイトをご覧ください。

      警告

      Kafkaブローカーのmessage.max.bytesおよびreplica.fetch.max.bytesパラメーターをより大きな値に設定することを推奨します。 また、Kafkaコンシューマーのfetch.message.max.bytesパラメーターをより大きな値に設定することを推奨します。 これらの設定により、KafkaクラスターはTiDBで生成されたバイナリログファイルを受け取ることができます。 詳細については、「Kafka 2.5ドキュメント」をご参照ください。

    • Message Queue for Apache Kafkaインスタンスを購入してデプロイします。 詳細については、「Apache KafkaのMessage Queueのクイックスタート」をご参照ください。

      説明

      Message Queue for Apache Kafkaインスタンスは、ソースデータベースサーバーと同じ仮想プライベートクラウド (VPC) にデプロイする必要があります。 これにより、信頼性の高いデータ送信が保証され、データ同期に対するネットワーク待ち時間の影響が最小限に抑えられます。

  4. セルフマネージドKafkaクラスターまたはApache KafkaインスタンスのMessage Queueでトピックを作成します。

  5. DTSサーバーのCIDRブロックをTiDBデータベースのホワイトリストに追加します。 詳細については、「DTSサーバーのCIDRブロックの追加」をご参照ください。

手順

  1. データ同期インスタンスを購入します。 詳細については、「DTSインスタンスの購入」をご参照ください。

    説明

    購入ページで、ソースインスタンスをTiDBに設定し、デスティネーションインスタンスをAnalyticDB for MySQLに設定します。

  2. にログインします。 DTSコンソール

    説明

    データ管理 (DMS) コンソールにリダイレクトされている場合は、にあるoldアイコンをクリックして、以前のバージョンのDTSコンソールに移動しimage

  3. 左側のナビゲーションウィンドウで、データ同期.

  4. の上部にデータ同期タスクページで、データ同期タスクが作成されているリージョンを選択します。

  5. データ同期タスクを見つけて、タスクの設定[アクション] 列に表示されます。

  6. ソースインスタンスとターゲットインスタンスを設定します。

    1. タスク名とソースデータベースを設定します。

      Configure the task name and the source database

      パラメーター

      説明

      同期タスク名

      DTSが自動的に生成するタスク名。 タスクを簡単に識別できるように、わかりやすい名前を指定することをお勧めします。 一意のタスク名を使用する必要はありません。

      インスタンスタイプ

      ソースデータベースのアクセス方法。 この例では、ECSインスタンスのユーザー作成データベースが選択されています。

      説明

      他のインスタンスタイプを選択した場合、自己管理データベースのネットワーク環境をデプロイする必要があります。 詳細については、「準備の概要」をご参照ください。

      インスタンスリージョン

      購入ページで選択したソースリージョン。 このパラメーターの値は変更できません。

      データベースエンジン

      このパラメーターの値はTiDBに設定されており、変更することはできません。

      ポート番号

      ソースTiDBデータベースのサービスポート番号。 デフォルト値: 4000

      データベースアカウント

      ソースTiDBデータベースのアカウント。 アカウントには、同期するオブジェクトに対するSELECT権限とSHOW VIEW権限が必要です。 詳細については、「特権管理」をご参照ください。

      データベースパスワード

      データベースアカウントのパスワードを設定します。

      Kafkaクラスタタイプ

      Kafkaクラスターのアクセス方法。 この例では、ECSインスタンスのユーザー作成データベースが選択されています。 Kafkaクラスターが他の方法で接続されている場合は、Kafkaクラスターのネットワーク環境をデプロイする必要があります。 詳細については、「準備の概要」をご参照ください。

      説明

      Kafka Cluster TypeパラメーターにApache KafkaのMessage Queueを選択することはできません。 Apache KafkaインスタンスにMessage Queueをデプロイする場合は、[Express Connect、VPN Gateway、またはSmart Access Gateway経由で接続されたユーザー作成データベース] を選択する必要があります。 次に、Apache KafkaインスタンスのMessage Queueが属するVPCを選択する必要があります。

      インスタンスリージョン

      このパラメーターの値は、ソースデータベースのリージョンと同じであり、変更することはできません。

      ECS インスタンス ID

      自己管理型KafkaクラスターをホストするElastic Compute Service (ECS) インスタンスのID。

      カフカポート番号

      自己管理Kafkaクラスターのサービスポート番号。 デフォルト値: 9092

      Kafkaクラスターアカウント

      Kafkaクラスターへのログインに使用されるユーザー名。 Kafkaクラスターで認証が有効になっていない場合は、ユーザー名を入力する必要はありません。

      Kafkaクラスターパスワード

      ユーザー名に対応するパスワード。 Kafkaクラスターで認証が有効になっていない場合は、パスワードを入力する必要はありません。

      トピック

      [トピックリストの取得] をクリックし、ドロップダウンリストからトピック名を選択します。

      Kafkaバージョン

      自己管理Kafkaクラスターのバージョン。

      Kafkaクラスター暗号化

      ビジネスとセキュリティの要件に基づいて、[非暗号化] または [SCRAM-SHA 256] を選択します。

    2. ターゲットデータベースを設定します。

      Configure the destination database

      パラメーター

      説明

      インスタンスタイプ

      このパラメーターの値はAnalyticDBに設定されており、変更することはできません。

      インスタンスリージョン

      購入ページで選択したターゲットリージョン。 このパラメーターの値は変更できません。

      Version

      このパラメーターの値は3.0に設定されており、変更できません。

      データベース

      移行先のAnalyticDB for MySQLクラスターのID。

      データベースアカウント

      ターゲットAnalyticDB for MySQLクラスターのデータベースアカウント。 アカウントには、ターゲットデータベースに対する読み取りおよび書き込み権限が必要です。 詳細については、「データベースアカウントの作成」をご参照ください。

      データベースパスワード

      データベースアカウントのパスワードを設定します。

  7. ページの右下隅にあるをクリックします。ホワイトリストと次への設定.

    、ソースまたはターゲットデータベースがAlibaba Cloudデータベースインスタンス (ApsaraDB RDS for MySQLApsaraDB for MongoDBインスタンスなど) の場合、DTSは自動的にDTSサーバーのCIDRブロックをインスタンスのIPアドレスホワイトリストに追加します。 ソースデータベースまたはターゲットデータベースがElastic Compute Service (ECS) インスタンスでホストされている自己管理データベースの場合、DTSサーバーのCIDRブロックがECSインスタンスのセキュリティグループルールに自動的に追加されます。ECSインスタンスがデータベースにアクセスできることを確認する必要があります。 自己管理データベースが複数のECSインスタンスでホストされている場合、DTSサーバーのCIDRブロックを各ECSインスタンスのセキュリティグループルールに手動で追加する必要があります。 ソースデータベースまたはターゲットデータベースが、データセンターにデプロイされているか、サードパーティのクラウドサービスプロバイダーによって提供される自己管理データベースである場合、DTSサーバーのCIDRブロックをデータベースのIPアドレスホワイトリストに手動で追加して、DTSがデータベースにアクセスできるようにする必要があります。 詳細については、「DTSサーバーのCIDRブロックの追加」をご参照ください。

    警告

    DTSサーバーのCIDRブロックがデータベースまたはインスタンスのホワイトリスト、またはECSセキュリティグループルールに自動的または手動で追加されると、セキュリティリスクが発生する可能性があります。 したがって、DTSを使用してデータを同期する前に、潜在的なリスクを理解して認識し、次の対策を含む予防策を講じる必要があります。VPNゲートウェイ、またはSmart Access Gateway。

  8. 同期ポリシーと同期するオブジェクトを選択します。

    パラメータまたは設定

    説明

    初期同期タイプの選択

    多くの場合、[スキーマ同期初期化][完全データ同期初期化 ] 両方を選択する必要があります。 事前チェックが完了すると、DTSは必要なオブジェクトのスキーマとデータをソースインスタンスからターゲットクラスターに同期します。 スキーマとデータは、後続の増分同期のベースラインとなります。

    競合するテーブルの処理モードを選択する

    • 事前チェックエラーとレポートエラー: ターゲットデータベースに、ソースデータベースのテーブルと同じ名前のテーブルが含まれているかどうかを確認します。 移行先データベースに、移行元データベースのテーブルと同じ名前のテーブルが含まれていない場合は、事前チェックに合格します。 それ以外の場合、事前チェック中にエラーが返され、データ同期タスクを開始できません。

      説明

      オブジェクト名マッピング機能を使用して、ターゲットデータベースに同期されるテーブルの名前を変更できます。 ソースデータベースとターゲットデータベースに同じテーブル名が含まれていて、ターゲットデータベース内のテーブルを削除または名前変更できない場合は、この機能を使用できます。 詳細については、「同期するオブジェクトの名前変更」をご参照ください。

    • エラーを無視して続行: ソースデータベースとターゲットデータベースの同じテーブル名の事前チェックをスキップします。

      警告

      [エラーを無視して続行] を選択すると、データの不整合が発生し、ビジネスが潜在的なリスクにさらされる可能性があります。

      • ソースデータベースとターゲットデータベースのスキーマが同じである場合、DTSは、ターゲットデータベースのデータレコードと同じ主キーを持つデータレコードを同期しません。

      • ソースデータベースとターゲットデータベースのスキーマが異なる場合、初期データ同期が失敗する可能性があります。 この場合、特定の列のみが同期されるか、データ同期タスクが失敗します。

    テーブルをマージするかどうかの指定

    • [はい] を選択した場合、DTSはデータソースを格納する各テーブルに __dts_data_source列を追加します。 この場合、DDL操作は同期できません。

    • デフォルトでは [いいえ] が選択されています。 この場合、DDL動作を同期させることができる。

    説明

    このパラメーターを [はい] に設定すると、タスクで選択したすべてのソーステーブルがターゲットテーブルにマージされます。 特定のテーブルのデータソース列のみをマージするには、2つのデータ同期タスクを作成できます。

    同期する操作タイプを選択する

    ビジネス要件に基づいて、同期する操作の種類を選択します。 すべての操作タイプがデフォルトで選択されています。 詳細については、「同期可能なSQL操作」をご参照ください。

    同期するオブジェクトを選択する

    [使用可能] セクションから1つ以上のオブジェクトを選択し、Rightwards arrowアイコンをクリックして、オブジェクトを [選択済み] セクションに追加します。

    同期するオブジェクトとしてテーブルまたはデータベースを選択できます。

    説明
    • 同期するオブジェクトとしてデータベースを選択した場合、データベース内のすべてのスキーマ変更が同期先データベースに同期されます。

    • 同期するオブジェクトとしてテーブルを選択した場合、テーブルに対して実行されたADD COLUMN操作のみがターゲットデータベースに同期されます。

    • デフォルトでは、オブジェクトがターゲットクラスターに同期された後、オブジェクトの名前は変更されません。 オブジェクト名マッピング機能を使用して、同期先クラスターに同期されるオブジェクトの名前を変更できます。 詳細については、「同期するオブジェクトの名前変更」をご参照ください。

    データベースとテーブルの名前変更

    オブジェクト名マッピング機能を使用して、ターゲットインスタンスに同期されるオブジェクトの名前を変更できます。 詳細は、オブジェクト名のマッピングをご参照ください。

    DMSがDDL操作を実行するときの一時テーブルのレプリケート

    DMSを使用してソースデータベースでオンラインDDL操作を実行する場合、オンラインDDL操作によって生成された一時テーブルを同期するかどうかを指定できます。

    • Yes: DTSは、オンラインDDL操作によって生成された一時テーブルのデータを同期します。

      説明

      オンラインDDL操作が大量のデータを生成する場合、データ同期タスクが遅延する可能性があります。

    • No: DTSは、オンラインDDL操作によって生成された一時テーブルのデータを同期しません。 ソースデータベースの元のDDLデータのみが同期されます。

      説明

      [いいえ] を選択すると、ターゲットデータベースのテーブルがロックされる可能性があります。

    失敗した接続の再試行時間

    既定では、DTSがソースデータベースまたはターゲットデータベースへの接続に失敗した場合、DTSは次の720分 (12時間) 以内に再試行します。 必要に応じて再試行時間を指定できます。 DTSが指定された時間内にソースデータベースとターゲットデータベースに再接続すると、DTSはデータ同期タスクを再開します。 それ以外の場合、データ同期タスクは失敗します。

    説明

    DTSが接続を再試行すると、DTSインスタンスに対して課金されます。 ビジネスニーズに基づいて再試行時間を指定することを推奨します。 ソースインスタンスとターゲットインスタンスがリリースされた後、できるだけ早くDTSインスタンスをリリースすることもできます。

  9. ページの右下隅にあるをクリックします。次へ.

  10. ターゲットデータベースに同期するテーブルのタイプを指定します。

    Specify a type for the tables that you want to synchronize to the destination database

    説明

    [初期スキーマ同期] を選択した後、同期先AnalyticDB for MySQLクラスターに同期するテーブルのタイププライマリキー列、およびパーティションキー列を指定する必要があります。 詳細については、次をご参照ください: テーブルを作成します。

  11. ページの右下隅にあるをクリックします。事前チェック.

    説明
    • データ同期タスクを開始する前に、DTSは事前チェックを実行します。 データ同期タスクは、タスクが事前チェックに合格した後にのみ開始できます。

    • タスクが事前チェックに合格しなかった場合は、失敗した各項目の横にある提示アイコンをクリックして詳細を表示できます。

      • 詳細に基づいて問題をトラブルシューティングした後、新しい事前チェックを開始します。

      • 問題をトラブルシューティングする必要がない場合は、失敗した項目を無視して新しい事前チェックを開始してください。

  12. 次のメッセージが表示されたら、[事前チェック] ダイアログボックスを閉じます。[事前チェックの合格] その後、データ同期タスクが開始されます。

  13. 初期同期が完了し、データ同期タスクが同期状態になるまで待ちます。

    データ同期タスクのステータスは、[同期タスク] ページで確認できます。 View the status of a data synchronization task