すべてのプロダクト
Search
ドキュメントセンター

Data Transmission Service:Express Connect、VPN Gateway、またはSmart Access Gatewayを介して接続された自己管理型MySQLデータベースのデータを自己管理型Kafkaクラスターに同期する

最終更新日:Nov 11, 2024

Kafkaは、高スループットと高スケーラビリティを備えた分散メッセージキューサービスです。 Kafkaは、ログ収集、モニタリングデータ集約、ストリーミング処理、オンラインおよびオフライン分析などのビッグデータ分析に広く使用されています。 ビッグデータエコシステムにとって重要です。 このトピックでは、data Transmission Service (DTS) を使用して、Express Connect、VPN Gateway、またはSmart Access Gateway経由で接続された自己管理型MySQLデータベースから自己管理型Kafkaクラスターにデータを同期する方法について説明します。 データ同期機能を使用すると、メッセージ処理機能を拡張できます。

前提条件

  • Kafkaクラスターが作成され、Kafkaのバージョンは0.10.1.0〜2.7.0です。

  • 自己管理型MySQLデータベースのエンジンバージョンは、5.1、5.5、5.6、5.7、または8.0です。

  • 自己管理型MySQLデータベースは、Alibaba Cloud仮想プライベートクラウド (VPC) に接続されています。 詳細については、「CENを使用したオンプレミスデータベースのDTSへの接続」をご参照ください。

使用上の注意

  • DTSは、最初の完全データ同期中に、ソースRDSインスタンスとターゲットRDSインスタンスの読み取りおよび書き込みリソースを使用します。 これにより、RDSインスタンスの負荷が増加する可能性があります。 インスタンスのパフォーマンスが悪い場合、仕様が低い場合、またはデータ量が多い場合、データベースサービスが利用できなくなる可能性があります。 たとえば、ソースRDSインスタンスで多数の低速SQLクエリが実行されている場合、テーブルにプライマリキーがない場合、またはターゲットRDSインスタンスでデッドロックが発生する場合、DTSは大量の読み取りおよび書き込みリソースを占有します。 データ同期の前に、ソースRDSインスタンスとターゲットRDSインスタンスのパフォーマンスに対するデータ同期の影響を評価します。 オフピーク時にデータを同期することを推奨します。 たとえば、ソースRDSインスタンスとターゲットRDSインスタンスのCPU使用率が30% 未満の場合にデータを同期できます。

  • ソースデータベースにはPRIMARY KEYまたはUNIQUE制約が必要で、すべてのフィールドが一意である必要があります。 そうでない場合、宛先データベースは重複するデータレコードを含み得る。

制限事項

  • 同期するオブジェクトとして選択できるのはテーブルのみです。

  • DTSは、名前が変更されたテーブルのデータをターゲットKafkaクラスターに同期しません。 これは、新しいテーブル名が同期するオブジェクトに含まれていない場合に適用されます。 名前が変更されたテーブルのデータをターゲットKafkaクラスターに同期する場合は、同期するオブジェクトを再選択する必要があります。 詳細については、「データ同期タスクへのオブジェクトの追加」をご参照ください。

サポートしている同期トポロジ

  • 一方向の 1 対 1 の同期

  • 一方向の 1 対多の同期

  • 一方向の多対 1 の同期

  • 一方向のカスケード同期

準備

データ同期タスクを設定する前に、データベースアカウントを作成し、自己管理型MySQLデータベースのバイナリログを設定する必要があります。 詳細については、「自己管理型MySQLデータベースのアカウントの作成とバイナリログの設定」をご参照ください。

手順

  1. データ同期インスタンスを購入します。 詳細については、「DTSインスタンスの購入」をご参照ください。

    説明

    購入ページで、ソースインスタンスをMySQL、宛先インスタンスをKafka、同期トポロジを片道同期に設定します。

  2. DTSコンソールにログインします。

    説明

    Data Management (DMS) コンソールにリダイレクトされている場合は、右下隅にあるoldアイコンをクリックして、以前のバージョンのDTSコンソールに移動します。

  3. 左側のナビゲーションウィンドウで、[データ同期] をクリックします。

  4. [データ同期タスク] ページの上部で、ターゲットインスタンスが存在するリージョンを選択します。

  5. データ同期インスタンスを見つけ、[操作] 列の [タスクの設定] をクリックします。

  6. ソースインスタンスとターゲットインスタンスを設定します。

    Configure the source and destination instances

    セクション

    パラメーター

    説明

    なし

    同期タスク名

    DTSが自動的に生成するタスク名。 タスクを簡単に識別できるように、わかりやすい名前を指定することをお勧めします。 一意のタスク名を使用する必要はありません。

    ソースインスタンスの詳細

    インスタンスタイプ

    Express Connect、VPN Gateway、またはSmart Access Gateway経由で接続されたユーザー作成データベースを選択します。

    インスタンスリージョン

    購入ページで選択したソースリージョン。 このパラメーターの値は変更できません。

    ピア VPC

    自己管理型MySQLデータベースに接続されているVPCのID。

    データベースエンジン

    このパラメーターはMySQLに設定されており、変更できません。

    IP アドレス

    自己管理型MySQLデータベースのサーバーIPアドレス。

    ポート番号

    自己管理型MySQLデータベースのサービスポート番号。 デフォルト値:3306。

    データベースアカウント

    自己管理型MySQLデータベースのアカウント。 アカウントには、必要なオブジェクトに対するSELECT権限と、REPLICATION CLIENT、REPLICATION SLAVE、およびSHOW VIEW権限が必要です。

    データベースパスワード

    データベースアカウントのパスワードを設定します。

    ターゲットインスタンスの詳細

    インスタンスタイプ

    Kafkaクラスターのインスタンスタイプ。 この例では、このパラメーターに [ECSインスタンスのユーザー作成データベース] が選択されています。

    説明

    他のインスタンスタイプを選択した場合、Kafkaクラスターのネットワーク環境をデプロイする必要があります。 詳細については、「準備の概要」をご参照ください。

    インスタンスリージョン

    購入ページで選択したターゲットリージョン。 このパラメーターの値は変更できません。

    ECS インスタンス ID

    KafkaクラスターがデプロイされているECS (Elastic Compute Service) インスタンスのID。

    データベースエンジン

    Kafkaを選択します。

    ポート番号

    Kafkaクラスターのサービスポート番号。 デフォルト値: 9092

    データベースアカウント

    Kafkaクラスターへのログインに使用されるユーザー名。 Kafkaクラスターで認証が有効になっていない場合は、ユーザー名を入力する必要はありません。

    データベースパスワード

    ユーザー名のパスワード。 Kafkaクラスターで認証が有効になっていない場合は、パスワードを入力する必要はありません。

    トピック

    [トピックリストの取得] をクリックし、ドロップダウンリストからトピック名を選択します。

    Kafkaバージョン

    ターゲットKafkaクラスターのバージョン。

    暗号化

    ビジネスとセキュリティの要件に基づいて、[非暗号化] または [SCRAM-SHA 256] を選択します。

  7. ページの右下隅にある [ホワイトリストと次への設定] をクリックします。

    説明
    • ApsaraDBインスタンス (ApsaraDB RDS for MySQLおよびApsaraDB for MongoDBなど) およびECSホスト型データベースのセキュリティ設定を変更する必要はありません。 DTSは、DTSサーバーのCIDRブロックをApsaraDBインスタンスのホワイトリストまたはElastic Compute Service (ECS) インスタンスのセキュリティグループルールに自動的に追加します。 詳細については、「DTSサーバーのCIDRブロックをオンプレミスデータベースのセキュリティ設定に追加する」をご参照ください。

    • データ同期が完了したら、DTSサーバーのCIDRブロックをホワイトリストまたはセキュリティグループから削除することを推奨します。

  8. 同期するオブジェクトを選択します。

    Select the objects to be synchronized

    パラメーター

    説明

    Kafkaのデータ形式

    Kafkaクラスターに同期されるデータは、AvroまたはCanal JSON形式で保存されます。 詳細については、「Kafkaクラスターのデータ形式」をご参照ください。

    Kafkaパーティションへの出荷データのポリシー

    データをKafkaパーティションに同期するために使用されるポリシー。 ビジネス要件に基づいてポリシーを選択します。 詳細については、「Kafkaパーティションにデータを同期するためのポリシーの指定」をご参照ください。

    同期するオブジェクト

    [使用可能] セクションから1つ以上のテーブルを選択し、Rightwards arrowアイコンをクリックして、[選択済み] セクションにテーブルを追加します。

    説明

    DTSは、テーブル名をステップ6で選択したトピック名にマップします。 テーブル名マッピング機能を使用して、ターゲットクラスターに同期されるトピックを変更できます。 詳細については、「同期するオブジェクトの名前変更」をご参照ください。

    データベースとテーブルの名前変更

    オブジェクト名マッピング機能を使用して、ターゲットインスタンスに同期されるオブジェクトの名前を変更できます。 詳細は、オブジェクト名のマッピングをご参照ください。

    失敗した接続の再試行時間

    既定では、DTSがソースデータベースまたはターゲットデータベースへの接続に失敗した場合、DTSは次の720分 (12時間) 以内に再試行します。 必要に応じて再試行時間を指定できます。 DTSが指定された時間内にソースデータベースとターゲットデータベースに再接続すると、DTSはデータ同期タスクを再開します。 それ以外の場合、データ同期タスクは失敗します。

    説明

    DTSが接続を再試行すると、DTSインスタンスに対して課金されます。 ビジネスニーズに基づいて再試行時間を指定することを推奨します。 ソースインスタンスとターゲットインスタンスがリリースされた後、できるだけ早くDTSインスタンスをリリースすることもできます。

  9. ページの右下隅にある [次へ] をクリックします。

  10. 初期同期を設定します。

    Kafka: Configure initial synchronization

    パラメーター

    説明

    初期同期

    [初期スキーマ同期][初期フルデータ同期] の両方を選択します。 DTSは、必要なオブジェクトのスキーマと履歴データを同期し、増分データを同期します。

    フィルターオプション

    デフォルトでは、増分同期フェーズでDDLを無視が選択されています。 この場合、DTSは、増分データ同期中にソースデータベースで実行されるDDL操作を同期しません。

  11. ページの右下隅にある [事前チェック] をクリックします。

    説明
    • データ同期タスクを開始する前に、DTSは事前チェックを実行します。 データ同期タスクは、タスクが事前チェックに合格した後にのみ開始できます。

    • タスクが事前チェックに合格しなかった場合は、失敗した各項目の横にある提示アイコンをクリックして詳細を表示できます。

      • 詳細に基づいて問題をトラブルシューティングした後、新しい事前チェックを開始します。

      • 問題をトラブルシューティングする必要がない場合は、失敗した項目を無視して新しい事前チェックを開始してください。

  12. 次のメッセージが表示されたら、[事前チェック] ダイアログボックスを閉じます。[事前チェックの合格] その後、データ同期タスクが開始されます。

    [データ同期] ページで、データ同期タスクのステータスを表示できます。 View the status of a data synchronization task