このトピックでは、data Transmission Service (DTS) を使用して、セルフマネージドMySQLデータベースからMessage Queue for Apache Kafkaインスタンスにデータを同期する方法について説明します。 データ同期機能を使用すると、メッセージ処理機能を拡張できます。
前提条件
自己管理型MySQLデータベースのバージョンは、5.1、5.5、5.6、5.7、または8.0です。
Apache Kafkaインスタンスの宛先Message Queueのバージョンは、0.10.1.0〜2.xです。
Apache Kafkaインスタンスの宛先Message Queueで、同期されたデータを受信するトピックが作成されます。 詳細については、次をご参照ください: トピックを作成します。
背景情報
ApsaraMQ for Kafkaは、Alibaba Cloudが提供する分散型、高スループット、スケーラブルなメッセージキューサービスです。 オープンソースのApache Kafkaにフルマネージドサービスを提供し、オープンソース製品の長年の欠点を解決します。 Apache KafkaのMessage Queueを使用すると、デプロイとO&Mに多くの時間を費やすことなく、ビジネス開発に集中できます。Message Queue for Apache Kafkaは、ログ収集、モニタリングデータ集約、ストリーミングデータ処理、オンラインおよびオフライン分析などのビッグデータシナリオで使用されます。 ビッグデータエコシステムにとって重要です。
使用上の注意
DTSは、最初の完全データ同期中に、ソースRDSインスタンスとターゲットRDSインスタンスの読み取りおよび書き込みリソースを使用します。 これにより、RDSインスタンスの負荷が増加する可能性があります。 インスタンスのパフォーマンスが悪い場合、仕様が低い場合、またはデータ量が多い場合、データベースサービスが利用できなくなる可能性があります。 たとえば、ソースRDSインスタンスで多数の低速SQLクエリが実行されている場合、テーブルにプライマリキーがない場合、またはターゲットRDSインスタンスでデッドロックが発生する場合、DTSは大量の読み取りおよび書き込みリソースを占有します。 データ同期の前に、ソースRDSインスタンスとターゲットRDSインスタンスのパフォーマンスに対するデータ同期の影響を評価します。 オフピーク時にデータを同期することを推奨します。 たとえば、ソースRDSインスタンスとターゲットRDSインスタンスのCPU使用率が30% 未満の場合にデータを同期できます。
ソースデータベースにはPRIMARY KEYまたはUNIQUE制約が必要で、すべてのフィールドが一意である必要があります。 そうでない場合、宛先データベースは重複するデータレコードを含み得る。
制限事項
同期するオブジェクトとして選択できるのはテーブルのみです。
DTSは、名前が変更されたテーブルのデータをターゲットKafkaクラスターに同期しません。 これは、新しいテーブル名が同期するオブジェクトに含まれていない場合に適用されます。 名前が変更されたテーブルのデータをターゲットKafkaクラスターに同期する場合は、同期するオブジェクトを再選択する必要があります。 詳細については、「データ同期タスクへのオブジェクトの追加」をご参照ください。
始める前に
手順
データ同期インスタンスを購入します。 詳細については、「DTSインスタンスの購入」をご参照ください。
説明購入ページで、ソースインスタンスをMySQLに設定し、DstinationインスタンスをKafkaに設定し、同期トポロジを一方向同期に設定します。
にログインします。 DTSコンソール。
説明データ管理 (DMS) コンソールにリダイレクトされている場合は、にあるアイコンをクリックして、以前のバージョンのDTSコンソールに移動し。
左側のナビゲーションウィンドウで、[データ同期] をクリックします。
[同期タスク] ページの上部で、ターゲットインスタンスが存在するリージョンを選択します。
データ同期インスタンスを見つけ、[操作] 列の [タスクの設定] をクリックします。
ソースインスタンスとターゲットインスタンスを設定します。
セクション
パラメーター
説明
なし
同期タスク名
タスク名は自動生成されます。 タスクを識別するために、有益な名前を指定することを推奨します。 一意のタスク名を使用する必要はありません。
ソースインスタンスの詳細
インスタンスタイプ
ソースデータベースのデプロイに基づいてインスタンスタイプを選択します。 この例では、[ECSインスタンスのユーザー作成データベース] を選択します。 この手順に従って、他のタイプの自己管理型MySQLデータベースのデータ同期タスクを設定することもできます。
インスタンスリージョン
購入ページで選択したソースリージョン。 このパラメーターの値は変更できません。
ECS インスタンス ID
自己管理型MySQLデータベースをホストするElastic Compute Service (ECS) インスタンスのIDを選択します。
データベースエンジン
このパラメーターはMySQLに設定されており、変更できません。
ポート番号
自己管理型MySQLデータベースのサービスポート番号を入力します。
データベースアカウント
自己管理型MySQLデータベースのアカウントを入力します。 アカウントには、必要なオブジェクトに対するSELECT権限、REPLICATION CLIENT権限、REPLICATION SLAVE権限、およびSHOW VIEW権限が必要です。
データベースパスワード
データベースアカウントのパスワードを入力します。
ターゲットインスタンスの詳細
インスタンスタイプ
Express Connect、VPN Gateway、またはSmart Access Gateway経由で接続されたユーザー作成データベースを選択します。
説明インスタンスタイプとしてApache KafkaのMessage Queueを選択することはできません。 Apache KafkaのMessage Queueを自己管理Kafkaデータベースとして使用して、データ同期を設定できます。
インスタンスリージョン
購入ページで選択したターゲットリージョン。 このパラメーターの値は変更できません。
ピア VPC
Apache Kafkaインスタンスの宛先Message Queueが属する仮想プライベートクラウド (VPC) のIDを選択します。 VPC IDを取得するには、Message Queue for Apache Kafkaコンソールにログインし、Message Queue for Apache Kafkaインスタンスの [インスタンスの詳細] ページに移動します。 [基本情報] セクションで、VPC IDを表示できます。
データベースエンジン
Kafkaを選択します。
IP アドレス
Message Queue for Apache KafkaインスタンスのDefault Endpointパラメーターに含まれるIPアドレスを入力します。
説明IPアドレスを取得するには、Message Queue for Apache Kafkaコンソールにログインし、Message Queue for Apache Kafkaインスタンスの [インスタンスの詳細] ページに移動します。 [基本情報] セクションでは、[デフォルトのエンドポイント] パラメーターからIPアドレスを取得できます。
ポート番号
Message Queue for Apache Kafkaインスタンスのサービスポート番号を入力します。 デフォルトのポート番号は9092です。
データベースアカウント
Apache KafkaインスタンスのMessage Queueへのログインに使用するユーザー名を入力します。
説明Apache KafkaインスタンスのMessage QueueのインスタンスタイプがVPCインスタンスの場合、データベースアカウントまたはデータベースパスワードを指定する必要はありません。
データベースパスワード
ユーザー名のパスワードを入力します。
トピック
[トピックリストの取得] をクリックし、ドロップダウンリストからトピック名を選択します。
Kafkaバージョン
Apache KafkaインスタンスのMessage Queueのバージョンを選択します。
暗号化
ビジネスとセキュリティの要件に基づいて、[非暗号化] または [SCRAM-SHA 256] を選択します。
ページの右下隅にあるをクリックします。ホワイトリストと次への設定.
、ソースまたはターゲットデータベースがAlibaba Cloudデータベースインスタンス (ApsaraDB RDS for MySQL、ApsaraDB for MongoDBインスタンスなど) の場合、DTSは自動的にDTSサーバーのCIDRブロックをインスタンスのIPアドレスホワイトリストに追加します。 ソースデータベースまたはターゲットデータベースがElastic Compute Service (ECS) インスタンスでホストされている自己管理データベースの場合、DTSサーバーのCIDRブロックがECSインスタンスのセキュリティグループルールに自動的に追加されます。ECSインスタンスがデータベースにアクセスできることを確認する必要があります。 自己管理データベースが複数のECSインスタンスでホストされている場合、DTSサーバーのCIDRブロックを各ECSインスタンスのセキュリティグループルールに手動で追加する必要があります。 ソースデータベースまたはターゲットデータベースが、データセンターにデプロイされているか、サードパーティのクラウドサービスプロバイダーによって提供される自己管理データベースである場合、DTSサーバーのCIDRブロックをデータベースのIPアドレスホワイトリストに手動で追加して、DTSがデータベースにアクセスできるようにする必要があります。 詳細については、「DTSサーバーのCIDRブロックの追加」をご参照ください。
警告DTSサーバーのCIDRブロックがデータベースまたはインスタンスのホワイトリスト、またはECSセキュリティグループルールに自動的または手動で追加されると、セキュリティリスクが発生する可能性があります。 したがって、DTSを使用してデータを同期する前に、潜在的なリスクを理解して認識し、次の対策を含む予防策を講じる必要があります。VPNゲートウェイ、またはSmart Access Gateway。
同期するオブジェクトを選択します。
パラメーター
説明
Kafkaのデータ形式
Kafkaクラスターに同期されるデータは、AvroまたはCanal JSON形式で保存されます。 詳細については、「Kafkaクラスターのデータ形式」をご参照ください。
Kafkaパーティションへの出荷データのポリシー
データをKafkaパーティションに同期するために使用されるポリシー。 ビジネス要件に基づいてポリシーを選択します。 詳細については、「Kafkaパーティションにデータを同期するためのポリシーの指定」をご参照ください。
同期するオブジェクト
[使用可能] セクションから1つ以上のテーブルを選択し、アイコンをクリックして、[選択済み] セクションにテーブルを追加します。
説明DTSは、テーブル名をステップ6で選択したトピック名にマップします。 テーブル名マッピング機能を使用して、ターゲットクラスターに同期されるトピックを変更できます。 詳細については、「同期するオブジェクトの名前変更」をご参照ください。
データベースとテーブルの名前変更
オブジェクト名マッピング機能を使用して、ターゲットインスタンスに同期されるオブジェクトの名前を変更できます。 詳細は、オブジェクト名のマッピングをご参照ください。
失敗した接続の再試行時間
既定では、DTSがソースデータベースまたはターゲットデータベースへの接続に失敗した場合、DTSは次の720分 (12時間) 以内に再試行します。 必要に応じて再試行時間を指定できます。 DTSが指定された時間内にソースデータベースとターゲットデータベースに再接続すると、DTSはデータ同期タスクを再開します。 それ以外の場合、データ同期タスクは失敗します。
説明DTSが接続を再試行すると、DTSインスタンスに対して課金されます。 ビジネスニーズに基づいて再試行時間を指定することを推奨します。 ソースインスタンスとターゲットインスタンスがリリースされた後、できるだけ早くDTSインスタンスをリリースすることもできます。
ページの右下隅にある [次へ] をクリックします。
初期同期を設定します。
パラメーター
説明
初期同期
[初期スキーマ同期] と [初期フルデータ同期] の両方を選択します。 DTSは、必要なオブジェクトのスキーマと履歴データを同期し、増分データを同期します。
フィルターオプション
デフォルトでは、増分同期フェーズでDDLを無視が選択されています。 この場合、DTSは、増分データ同期中にソースデータベースで実行されるDDL操作を同期しません。
ページの右下隅にあるをクリックします。事前チェック.
説明データ同期タスクを開始する前に、DTSは事前チェックを実行します。 データ同期タスクは、タスクが事前チェックに合格した後にのみ開始できます。
タスクが事前チェックに合格しなかった場合は、失敗した各項目の横にあるアイコンをクリックして詳細を表示できます。
詳細に基づいて問題をトラブルシューティングした後、新しい事前チェックを開始します。
問題をトラブルシューティングする必要がない場合は、失敗した項目を無視して新しい事前チェックを開始してください。
次のメッセージが表示されたら、[事前チェック] ダイアログボックスを閉じます。[事前チェックの合格] その後、データ同期タスクが開始されます。
[データ同期] ページで、データ同期タスクのステータスを表示できます。