すべてのプロダクト
Search
ドキュメントセンター

Data Transmission Service:ApsaraDB RDS for MySQLインスタンスのデータをセルフマネージドKafkaクラスターに同期する

最終更新日:Nov 11, 2024

Kafkaは、高スループットと高スケーラビリティを備えた分散メッセージキューサービスです。 Kafkaは、ログ収集、モニタリングデータ集約、ストリーミング処理、オンラインおよびオフライン分析などのビッグデータ分析に広く使用されています。 Kafkaはビッグデータエコシステムにとって不可欠なサービスです。 このトピックでは、RDS MySQLからデータを同期する方法について説明します。 Data Transmission Service (DTS) を使用して、インスタンスを自己管理Kafkaクラスターに追加します。 データ同期機能を使用すると、メッセージ処理機能を拡張できます。

前提条件

  • Kafkaクラスターが作成され、Kafkaのバージョンは0.10.1.0〜2.7.0です。

  • ApsaraDB RDS for MySQLインスタンスが作成されました。 詳細については、「ApsaraDB RDS For MySQLインスタンスの作成」をご参照ください。

使用上の注意

  • DTSは、最初の完全データ同期中に、ソースRDSインスタンスとターゲットRDSインスタンスの読み取りおよび書き込みリソースを使用します。 これにより、RDSインスタンスの負荷が増加する可能性があります。 インスタンスのパフォーマンスが悪い場合、仕様が低い場合、またはデータ量が多い場合、データベースサービスが利用できなくなる可能性があります。 たとえば、ソースRDSインスタンスで多数の低速SQLクエリが実行されている場合、テーブルにプライマリキーがない場合、またはターゲットRDSインスタンスでデッドロックが発生する場合、DTSは大量の読み取りおよび書き込みリソースを占有します。 データ同期の前に、ソースRDSインスタンスとターゲットRDSインスタンスのパフォーマンスに対するデータ同期の影響を評価します。 オフピーク時にデータを同期することを推奨します。 たとえば、ソースRDSインスタンスとターゲットRDSインスタンスのCPU使用率が30% 未満の場合にデータを同期できます。

  • ソースデータベースにはPRIMARY KEYまたはUNIQUE制約が必要で、すべてのフィールドが一意である必要があります。 そうでない場合、宛先データベースは重複するデータレコードを含み得る。

課金

同期タイプ

タスク設定料金

スキーマ同期と完全データ同期

無料です。

増分データ同期

有料。 詳細については、「課金の概要」をご参照ください。

制限事項

  • 同期するオブジェクトとして選択できるのはテーブルのみです。

  • DTSは、名前が変更されたテーブルのデータをターゲットKafkaクラスターに同期しません。 これは、新しいテーブル名が同期するオブジェクトに含まれていない場合に適用されます。 名前が変更されたテーブルのデータをターゲットKafkaクラスターに同期する場合は、同期するオブジェクトを再選択する必要があります。 詳細については、「データ同期タスクへのオブジェクトの追加」をご参照ください。

サポートしている同期トポロジ

  • 一方向の 1 対 1 の同期

  • 一方向の 1 対多の同期

  • 一方向の多対 1 の同期

  • 一方向のカスケード同期

手順

  1. データ同期インスタンスを購入します。 詳細については、「DTSインスタンスの購入」をご参照ください。

    説明

    購入ページで、ソースインスタンスパラメーターをMySQLに、宛先インスタンスパラメーターをKafkaに、同期トポロジパラメーターを片道同期に設定します。

  2. Data Transmission Service (DTS) コンソールにログインします。

  3. 左側のナビゲーションウィンドウで、データ同期をクリックします。

  4. データ同期タスクページの上部で、ターゲットインスタンスが存在するリージョンを選択します。

  5. データ同期タスクを見つけて、タスクの設定[アクション] 列に表示されます。

  6. ソースインスタンスとターゲットクラスターを設定します。

    Configure the source instance and destination cluster

    セクション

    パラメーター

    説明

    非該当

    同期タスク名

    DTSが自動的に生成するタスク名。 タスクを簡単に識別できるように、わかりやすい名前を指定することをお勧めします。 一意のタスク名を使用する必要はありません。

    ソースインスタンスの詳細

    インスタンスタイプ

    ソースインスタンスの種類。 RDS インスタンスを選択します。

    インスタンスリージョン

    購入ページで選択したソースリージョン。 このパラメーターの値は変更できません。

    インスタンス ID

    ソースApsaraDB RDSインスタンスのID。

    データベースアカウント

    ソースデータベースへの接続に使用されるアカウント。 アカウントには、必要なオブジェクトに対するSELECT権限と、REPLICATION CLIENT、REPLICATION SLAVE、およびSHOW VIEW権限が必要です。

    データベースパスワード

    ソースデータベースアカウントのパスワード。

    暗号化

    ソースインスタンスへの接続を暗号化するかどうかを指定します。 ビジネスとセキュリティの要件に基づいて、[非暗号化] または [SSL暗号化] を選択します。 SSL暗号化を選択した場合、データ同期タスクを設定する前に、ApsaraDB RDSインスタンスのSSL暗号化を有効にする必要があります。 詳細については、次をご参照ください: クラウド証明書を使用してSSL暗号化を有効にします

    重要

    Encryptionパラメーターは、中国本土および中国 (香港) リージョン内でのみ使用できます。

    ターゲットインスタンスの詳細

    インスタンスタイプ

    Kafkaクラスターのデプロイ。 この例では、ECSインスタンスのユーザー作成データベースが選択されています。

    説明

    インスタンスタイプパラメーターを他の値に設定した場合、Kafkaクラスターのネットワーク環境をデプロイする必要があります。 詳細については、「準備の概要」をご参照ください。

    インスタンスリージョン

    購入ページで選択したターゲットリージョン。 このパラメーターの値は変更できません。

    ECS インスタンス ID

    KafkaクラスターがデプロイされているECS (Elastic Compute Service) インスタンスのID。

    説明

    Kafkaがクラスターアーキテクチャにデプロイされている場合、クラスターの1つのノードが存在するECSインスタンスのIDのみを選択する必要があります。 DTSは、Kafkaクラスター内のすべてのノードのトピック情報を自動的に取得します。

    データベースエンジン

    ターゲットデータベースのタイプ。 Kafkaを選択します。

    ポート番号

    Kafkaクラスターのサービスポート番号。 デフォルト値: 9092

    データベースアカウント

    Kafkaクラスターへのログインに使用されるユーザー名。 Kafkaクラスターで認証が有効になっていない場合は、ユーザー名を入力する必要はありません。

    データベースパスワード

    Kafkaクラスターのパスワード。 Kafkaクラスターで認証が有効になっていない場合は、パスワードを入力する必要はありません。

    Kafkaバージョン

    ターゲットKafkaクラスターのバージョン。

    暗号化

    接続先クラスターへの接続を暗号化するかどうかを指定します。 ビジネスとセキュリティの要件に基づいて、[非暗号化] または [SCRAM-SHA 256] を選択します。

    トピック

    データが同期されるトピックの名前。 [トピックリストの取得] をクリックし、ドロップダウンリストからトピック名を選択します。

    DDL情報を保存するトピック

    DDL情報の格納に使用されるトピック。 ドロップダウンリストからトピックを選択します。 このパラメーターを指定しない場合、DDL情報はtopicパラメーターで指定されたトピックに格納されます。

    Kafkaスキーマレジストリの使用

    Kafka Schema Registryは、メタデータの提供レイヤーを提供します。 Avroスキーマを保存および取得するためのRESTful APIを提供します。

    • いいえ: Kafka Schema Registryを使用しません。

    • はい: Kafka Schema Registryを使用します。 この場合、AvroスキーマのKafka Schema Registryに登録されているURLまたはIPアドレスを入力する必要があります。

  7. ページの右下隅にあるホワイトリストと次への設定をクリックします。

    、ソースまたはターゲットデータベースがAlibaba Cloudデータベースインスタンス (ApsaraDB RDS for MySQLApsaraDB for MongoDBインスタンスなど) の場合、DTSは自動的にDTSサーバーのCIDRブロックをインスタンスのIPアドレスホワイトリストに追加します。 ソースデータベースまたはターゲットデータベースがElastic Compute Service (ECS) インスタンスでホストされている自己管理データベースの場合、DTSサーバーのCIDRブロックがECSインスタンスのセキュリティグループルールに自動的に追加されます。ECSインスタンスがデータベースにアクセスできることを確認する必要があります。 自己管理データベースが複数のECSインスタンスでホストされている場合、DTSサーバーのCIDRブロックを各ECSインスタンスのセキュリティグループルールに手動で追加する必要があります。 ソースデータベースまたはターゲットデータベースが、データセンターにデプロイされているか、サードパーティのクラウドサービスプロバイダーによって提供される自己管理データベースである場合、DTSサーバーのCIDRブロックをデータベースのIPアドレスホワイトリストに手動で追加して、DTSがデータベースにアクセスできるようにする必要があります。 詳細については、「DTSサーバーのCIDRブロックの追加」をご参照ください。

    警告

    DTSサーバーのCIDRブロックがデータベースまたはインスタンスのホワイトリスト、またはECSセキュリティグループルールに自動的または手動で追加されると、セキュリティリスクが発生する可能性があります。 したがって、DTSを使用してデータを同期する前に、潜在的なリスクを理解して認識し、次の対策を含む予防策を講じる必要があります。VPNゲートウェイ、またはSmart Access Gateway。

  8. 同期するオブジェクトを選択します。

    Select the objects to be synchronized

    パラメーター

    説明

    Kafkaのデータ形式

    Kafkaクラスターに同期されるデータは、AvroまたはCanal JSON形式で保存されます。 詳細については、「Kafkaクラスターのデータ形式」をご参照ください。

    Kafkaパーティションへの出荷データのポリシー

    データをKafkaパーティションに同期するために使用されるポリシー。 ビジネス要件に基づいてポリシーを選択します。 詳細については、「Kafkaパーティションにデータを同期するためのポリシーの指定」をご参照ください。

    同期するオブジェクト

    [使用可能] セクションから1つ以上のテーブルを選択し、Rightwards arrowアイコンをクリックして、[選択済み] セクションにテーブルを追加します。

    説明

    DTSは、テーブル名をステップ6で選択したトピック名にマップします。 テーブル名マッピング機能を使用して、ターゲットクラスターに同期されるトピックを変更できます。 詳細については、「同期するオブジェクトの名前変更」をご参照ください。

    データベースとテーブルの名前変更

    オブジェクト名マッピング機能を使用して、ターゲットインスタンスに同期されるオブジェクトの名前を変更できます。 詳細は、オブジェクト名のマッピングをご参照ください。

    失敗した接続の再試行時間

    既定では、DTSがソースデータベースまたはターゲットデータベースへの接続に失敗した場合、DTSは次の720分 (12時間) 以内に再試行します。 必要に応じて再試行時間を指定できます。 DTSが指定された時間内にソースデータベースとターゲットデータベースに再接続すると、DTSはデータ同期タスクを再開します。 それ以外の場合、データ同期タスクは失敗します。

    説明

    DTSが接続を再試行すると、DTSインスタンスに対して課金されます。 ビジネスニーズに基づいて再試行時間を指定することを推奨します。 ソースインスタンスとターゲットインスタンスがリリースされた後、できるだけ早くDTSインスタンスをリリースすることもできます。

  9. ページの右下隅にある [次へ] をクリックします。

  10. 初期同期を設定します。

    Kafka: Configure initial synchronization

    パラメーター

    説明

    初期同期

    [初期スキーマ同期][初期フルデータ同期] の両方を選択します。 DTSは、必要なオブジェクトのスキーマと履歴データを同期し、増分データを同期します。

    フィルターオプション

    デフォルトでは、増分同期フェーズでDDLを無視が選択されています。 この場合、DTSは、増分データ同期中にソースデータベースで実行されるDDL操作を同期しません。

  11. ページの右下隅にある事前チェックをクリックします。

    説明
    • データ同期タスクを開始する前に、DTSは事前チェックを実行します。 データ同期タスクは、タスクが事前チェックに合格した後にのみ開始できます。

    • タスクが事前チェックに合格しなかった場合は、失敗した各項目の横にある提示アイコンをクリックして詳細を表示できます。

      • 詳細に基づいて問題をトラブルシューティングした後、新しい事前チェックを開始します。

      • 問題をトラブルシューティングする必要がない場合は、失敗した項目を無視して新しい事前チェックを開始してください。

  12. 次のメッセージが表示されたら、[事前チェック] ダイアログボックスを閉じます。[事前チェックの合格] その後、データ同期タスクが開始されます。

    [データ同期] ページで、データ同期タスクのステータスを表示できます。 View the status of a data synchronization task