このトピックでは、data Transmission Service (DTS) を使用して、PolarDB for MySQLクラスターからKafkaクラスターにデータを移行する方法について説明します。 これにより、メッセージの管理機能が向上します。
前提条件
移行先のセルフマネージドKafkaクラスターまたはApsaraMQ for Kafkaインスタンスが作成されました。
説明ApsaraMQ for Kafkaインスタンスがターゲットインスタンスとして使用されている場合、インスタンスが自己管理型Kafkaクラスターとして設定され、移行するデータを受け取るトピックが作成されていることを確認します。 トピックの作成方法については、「手順1: トピックの作成」をご参照ください。
ターゲットインスタンスの使用可能なストレージ容量が、ソースPolarDB for MySQLクラスターのデータの合計サイズよりも大きいこと。
制限事項
DTSは、ソースデータベースの外部キーをターゲットデータベースに移行しません。 したがって、ソースデータベースのカスケードおよび削除操作は、ターゲットデータベースに移行されません。
カテゴリ | 説明 |
ソースデータベースの制限 |
|
その他の制限 |
|
使用上の注意 |
|
課金
移行タイプ | インスタンス設定料金 | インターネットトラフィック料金 |
スキーマ移行とフルデータ移行 | 無料です。 | インターネット経由でAlibaba Cloudからデータが移行された場合にのみ課金されます。 詳細については、「課金の概要」をご参照ください。 |
増分データ移行 | 有料。 詳細については、「課金の概要」をご参照ください。 |
移行タイプ
スキーマ移行
DTSは、選択したオブジェクトのスキーマをソースデータベースからターゲットデータベースに移行します。
完全なデータ移行
DTSは、必要なオブジェクトの履歴データをソースデータベースからターゲットデータベースに移行します。
増分データ移行
完全データ移行が完了すると、DTSは増分データをソースデータベースからターゲットデータベースに移行します。 増分データ移行により、データ移行中に自己管理型アプリケーションのサービスを中断することなく、データをスムーズに移行できます。
増分移行可能なSQL操作
操作タイプ | SQL文 |
DML | 挿入、更新、および削除 |
DDL | CREATE TABLE、ALTER TABLE、DROP TABLE、RENAME TABLE、およびTRUNCATE TABLE |
データベースアカウントに必要な権限
データベース | 必要な権限 | 関連ドキュメント |
PolarDB for MySQLクラスター | 移行するオブジェクトの読み取り権限 |
手順
[データ移行タスク] ページに移動します。
データ管理 (DMS) コンソール にログインします。
上部のナビゲーションバーで、ポインタをDTSの上に移動します。
を選択します。
説明実際の操作は、DMSコンソールのモードとレイアウトによって異なります。 詳細については、「シンプルモード」および「DMSコンソールのレイアウトとスタイルのカスタマイズ」をご参照ください。
新しいDTSコンソールのデータ移行ページに行くこともできます 。
データ移行タスクの右側にあるドロップダウンリストから、データ移行インスタンスが存在するリージョンを選択します。
説明新しいDTSコンソールを使用する場合は、左上隅にデータ移行インスタンスが存在するリージョンを選択する必要があります。
[タスクの作成] をクリックします。 [データ移行タスクの作成] ページで、ソースデータベースとターゲットデータベースを設定します。 次の表にパラメーターを示します。
セクション
パラメーター
説明
非該当
タスク名
タスクの名前。 タスク名は自動生成されます。 タスクを識別するために、有益な名前を指定することを推奨します。 一意のタスク名を指定する必要はありません。
ソースデータベース
既存のDMSデータベースインスタンスの選択
使用するデータベースインスタンス。 ビジネス要件に基づいて、既存のインスタンスを使用するかどうかを選択できます。
既存のインスタンスを選択すると、DTSはデータベースのパラメーターを自動的に入力します。
既存のインスタンスを選択しない場合は、次のデータベース情報を設定する必要があります。
データベースタイプ
移行元ディスクのタイプを設定します。 [PolarDB for MySQL] を選択します。
アクセス方法
ソースデータベースのアクセス方法。 [Alibaba Cloudインスタンス] を選択します。
インスタンスリージョン
ソースPolarDB for MySQLインスタンスが存在するリージョン。
Alibaba Cloudアカウント全体でのデータの複製
Alibaba Cloudアカウント間でデータを移行するかどうかを指定します。 この例では、[いいえ] が選択されています。
PolarDBクラスターID
ソースPolarDB for MySQLクラスターのID。
データベースアカウント
ソースPolarDB for MySQLクラスターのデータベースアカウント。 アカウントに必要な権限の詳細については、このトピックの「データベースアカウントに必要な権限」をご参照ください。
データベースパスワード
データベースインスタンスへのアクセスに使用されるパスワード。
宛先データベース
既存のDMSデータベースインスタンスの選択
使用するデータベースインスタンス。 ビジネス要件に基づいて、既存のインスタンスを使用するかどうかを選択できます。
既存のインスタンスを選択すると、DTSはデータベースのパラメーターを自動的に入力します。
既存のインスタンスを選択しない場合は、次のデータベース情報を設定する必要があります。
データベースタイプ
ターゲットデータベースのタイプ。 Kafkaを選択します。
アクセス方法
ターゲットデータベースのアクセス方法。 この例では、ECS上の自己管理データベースが選択されています。
説明ターゲットデータベースが自己管理データベースの場合、データベースのネットワーク環境を展開する必要があります。 詳細については、「準備の概要」をご参照ください。
DTSでは、ApsaraMQ for Kafkaインスタンスのアクセス方法を直接設定することはできません。 ApsaraMQ for Kafkaインスタンスをターゲットインスタンスとして使用する場合、Express Connect、VPN Gateway、またはSmart Access Gatewayを選択し、インスタンスをセルフマネージドKafkaクラスターとして設定します。
インスタンスリージョン
ターゲットKafkaクラスターが存在するリージョン。
ECSインスタンスID
ターゲットKafkaクラスターがデプロイされているElastic Compute Service (ECS) インスタンスのID。
ポート番号
宛先Kafkaクラスターのサービスポート番号。 デフォルト値: 9092
データベースアカウント
ターゲットKafkaクラスターへのログインに使用されるユーザー名。 Kafkaクラスターで認証が有効になっていない場合は、ユーザー名を入力する必要はありません。
データベースパスワード
ユーザー名のパスワード。 Kafkaクラスターで認証が有効になっていない場合は、パスワードを入力する必要はありません。
Kafkaバージョン
ターゲットKafkaクラスターのバージョン。
説明セルフマネージドKafkaクラスターのバージョンが1.0以降の場合は、later Than 1.0を選択します。
暗号化
接続を暗号化するかどうかを指定します。 ビジネスとセキュリティの要件に基づいて、[非暗号化] または [SCRAM-SHA 256] を選択します。
トピック
ドロップダウンリストからトピックを選択します。
DDL情報を保存するトピック
ドロップダウンリストからトピックを選択します。 このトピックは、DDL情報を格納するために使用されます。 このパラメーターを設定しない場合、DDL情報はtopicパラメーターで指定されたトピックに保存されます。
Kafkaスキーマレジストリの使用
メタデータのサービングレイヤーを提供するKafka Schema Registryを使用するかどうかを指定します。 Avroスキーマを保存および取得するためのRESTful APIを提供します。 有効な値:
いいえ: Kafka Schema Registryを使用しません。
はい: Kafka Schema Registryを使用します。 この場合、AvroスキーマのKafka Schema Registryに登録されているURLまたはIPアドレスを入力する必要があります。
ページの下部で、[接続のテストと続行] をクリックします。
、ソースまたはターゲットデータベースがAlibaba Cloudデータベースインスタンス (ApsaraDB RDS for MySQL、ApsaraDB for MongoDBインスタンスなど) の場合、DTSは自動的にDTSサーバーのCIDRブロックをインスタンスのIPアドレスホワイトリストに追加します。 ソースデータベースまたはターゲットデータベースがElastic Compute Service (ECS) インスタンスでホストされている自己管理データベースの場合、DTSサーバーのCIDRブロックがECSインスタンスのセキュリティグループルールに自動的に追加されます。ECSインスタンスがデータベースにアクセスできることを確認する必要があります。 自己管理データベースが複数のECSインスタンスでホストされている場合、DTSサーバーのCIDRブロックを各ECSインスタンスのセキュリティグループルールに手動で追加する必要があります。 ソースデータベースまたはターゲットデータベースが、データセンターにデプロイされているか、サードパーティのクラウドサービスプロバイダーによって提供される自己管理データベースである場合、DTSサーバーのCIDRブロックをデータベースのIPアドレスホワイトリストに手動で追加して、DTSがデータベースにアクセスできるようにする必要があります。 詳細については、「DTSサーバーのCIDRブロックの追加」トピックの「DTSサーバーのCIDRブロック」セクションをご参照ください。
警告DTSサーバーのパブリックCIDRブロックがデータベースインスタンスのホワイトリストまたはECSインスタンスのセキュリティグループルールに自動的または手動で追加されると、セキュリティリスクが発生する可能性があります。 したがって、DTSを使用してデータを移行する前に、潜在的なリスクを理解して認識し、ユーザー名とパスワードのセキュリティの強化、公開されるポートの制限、API呼び出しの認証、ホワイトリストまたはセキュリティグループルールの定期的なチェック、CIDRブロックの禁止、またはExpress Connectを使用したデータベースインスタンスのDTSへの接続、VPNゲートウェイ、またはSmart Access Gateway。
移行するオブジェクトと詳細設定を設定します。
パラメーター
説明
移行タイプ
フルデータ移行のみを実行するには、[スキーマ移行] と [フルデータ移行] を選択します。
データ移行中のサービスの継続性を確保するには、[スキーマ移行] 、[フルデータ移行] 、および [増分データ移行] を選択します。
説明増分データ移行を選択しない場合、データ移行中にソースデータベースにデータを書き込まないことを推奨します。 これにより、ソースデータベースとターゲットデータベース間のデータの整合性が確保されます。
競合テーブルの処理モード
事前チェックエラーとレポートエラー: ターゲットインスタンスに、ソースデータベースのテーブルと同じ名前のテーブルが含まれているかどうかを確認します。 ソースデータベースとターゲットインスタンスに同じテーブル名を持つテーブルが含まれていない場合、事前チェックが渡されます。 それ以外の場合、事前チェック中にエラーが返され、データ同期タスクを開始できません。
説明オブジェクト名マッピング機能を使用して、ターゲットインスタンスに同期されるテーブルの名前を変更できます。 ソースデータベースとターゲットインスタンスに同じテーブル名が含まれており、ターゲットインスタンスのテーブルを削除または名前変更できない場合、この機能を使用できます。 詳細については、「マップオブジェクト名」をご参照ください。
宛先テーブルのクリア: 宛先テーブルからデータをクリアします。 作業は慎重に行ってください。
エラーを無視して続行: ソースデータベースとターゲットインスタンスの同じテーブル名の事前チェックをスキップします。
警告[エラーを無視して続行] を選択すると、データの不整合が発生し、ビジネスが潜在的なリスクにさらされる可能性があります。
ソースデータベースとターゲットデータベースが同じスキーマを持ち、データレコードがターゲットデータベースの既存のデータレコードと同じ主キー値を持つ場合:
完全データ同期中、DTSはデータレコードをターゲットデータベースに同期しません。 ターゲットデータベースの既存のデータレコードが保持されます。
増分データ同期中、DTSはデータレコードをターゲットデータベースに同期します。 ターゲットデータベースの既存のデータレコードが上書きされます。
ソースデータベースとターゲットデータベースのスキーマが異なる場合、データの初期化に失敗する可能性があります。 この場合、一部の列のみが移行されるか、データ移行タスクが失敗します。
Kafkaのデータ形式
データレコードがMessage Queue for Apache Kafkaインスタンスに格納される形式。
DTS Avroを選択した場合、データはDTS Avroのスキーマ定義に基づいて解析されます。 詳細については、『GitHub』をご参照ください。
Canal Jsonを選択した場合、データはCanal JSON形式で保存されます。 関連するパラメーターと例の詳細については、「Kafkaクラスターのデータ形式」トピックの「Canal JSON」セクションをご参照ください。
Kafkaパーティションへの出荷データのポリシー
ビジネス要件に基づいて、Kafkaパーティションへのデータ移行のポリシーを選択します。 詳細については、「データをKafkaパーティションに移行するためのポリシーの指定」をご参照ください。
宛先インスタンスでのオブジェクト名の大文字化
ターゲットインスタンスのデータベース名、テーブル名、および列名の大文字化。 デフォルトでは、DTSデフォルトポリシーが選択されています。 他のオプションを選択して、オブジェクト名の大文字化が、ソースまたはターゲットデータベースのオブジェクト名のデフォルトの大文字化と一致するようにすることができます。 詳細については、「ターゲットインスタンスのオブジェクト名の大文字化の指定」をご参照ください。
ソースオブジェクト
ソースオブジェクト セクションから1つ以上のオブジェクトを選択します。 アイコンをクリックし、[選択済みオブジェクト] セクションにオブジェクトを追加します。
説明移行するオブジェクトとしてデータベースのみを選択できます。
[選択済みオブジェクト]
移行先インスタンスに移行するオブジェクトの名前を変更するには、[選択済みオブジェクト] セクションでオブジェクトを右クリックします。 詳細については、「単一オブジェクトの名前のマッピング」をご参照ください。
一度に複数のオブジェクトの名前を変更するには、[選択済みオブジェクト] セクションの右上隅にある [一括編集] をクリックします。 詳細については、「一度に複数のオブジェクト名をマップする」をご参照ください。
説明オブジェクト名マッピング機能を使用してオブジェクトの名前を変更すると、そのオブジェクトに依存する他のオブジェクトの移行に失敗する可能性があります。
データをフィルタリングするWHERE条件を指定するには、[選択済みオブジェクト] セクションでオブジェクトを右クリックします。 表示されるダイアログボックスで、条件を指定します。 詳細については、「フィルター条件の指定」をご参照ください。
特定のデータベースまたはテーブルで実行されたSQL操作を選択するには、[選択済みオブジェクト] セクションでオブジェクトを右クリックします。 表示されるダイアログボックスで、移行するSQL操作を選択します。 移行可能なSQL操作の詳細については、「SQL操作の増分移行」をご参照ください。
[次へ: 詳細設定] をクリックします。
パラメーター
説明
Set Alerts
データ移行タスクのアラートを設定するかどうかを指定します。 タスクが失敗するか、移行の待ち時間が指定されたしきい値を超えると、アラート送信先は通知を受け取ります。 有効な値:
No: アラートを設定しません。
Yes: アラートを設定します。 この場合、アラートしきい値と アラート通知設定 詳細については、「モニタリングとアラートの設定」トピックの「DTSタスクを作成するときのモニタリングとアラートの設定」をご参照ください。
失敗した接続の再試行時間
失敗した接続のリトライ時間範囲。 データ移行タスクの開始後にソースデータベースまたはターゲットデータベースの接続に失敗した場合、DTSはその時間範囲内ですぐに接続を再試行します。 有効な値: 10 ~ 1440 単位は分です。 デフォルト値: 720 パラメーターを30より大きい値に設定することを推奨します。 DTSが指定された時間範囲内にソースデータベースとターゲットデータベースに再接続すると、DTSはデータ移行タスクを再開します。 それ以外の場合、データ移行タスクは失敗します。
説明ソースまたはターゲットデータベースが同じである複数のデータ移行タスクに対して異なるリトライ時間範囲を設定した場合、設定された最短のリトライ時間範囲が優先されます。
DTSが接続を再試行すると、DTSインスタンスに対して課金されます。 業務要件に基づいて再試行時間範囲を指定することを推奨します。 ソースインスタンスとターゲットインスタンスがリリースされた後、できるだけ早くDTSインスタンスをリリースすることもできます。
ETLの設定
抽出、変換、および読み込み (ETL) 機能を有効にするかどうかを指定します。 詳細については、「ETLとは何ですか? 」をご参照ください。有効な値:
Yes: ETL機能を設定します。 コードエディターでデータ処理ステートメントを入力できます。 詳細については、「データ移行またはデータ同期タスクでのETLの設定」をご参照ください。
No: ETL機能を設定しません。
順方向および逆方向タスクのハートビートテーブル sql を削除
DTSインスタンスの実行中に、ハートビートテーブルのSQL操作をソースデータベースに書き込むかどうかを指定します。 有効な値:
Yes: ハートビートテーブルにSQL操作を書き込みません。 この場合、DTSインスタンスのレイテンシが表示され得る。
No: ハートビートテーブルにSQL操作を書き込みます。 この場合、ソースデータベースの物理バックアップやクローニングなどの機能が影響を受ける可能性があります。
ページの下部で、[次へ: タスク設定の保存と事前チェック] をクリックします。
ポインタを 次:タスク設定の保存と事前チェック に移動し、[OpenAPIパラメーターのプレビュー] をクリックして、関連するAPI操作を呼び出してDTSタスクを設定するときに指定するパラメーターを表示できます。
説明データ移行タスクを開始する前に、DTSは事前チェックを実行します。 データ移行タスクは、タスクが事前チェックに合格した後にのみ開始できます。
タスクが事前チェックに合格しなかった場合は、失敗した各項目の横にある [詳細の表示] をクリックします。 チェック結果に基づいて原因を分析した後、問題のトラブルシューティングを行います。 次に、もう一度プレチェックを実行します。
事前チェック中にアイテムに対してアラートがトリガーされた場合:
アラートアイテムを無視できない場合は、失敗したアイテムの横にある [詳細の表示] をクリックして問題のトラブルシューティングを行います。 次に、もう一度プレチェックを実行します。
アラート項目を無視できる場合は、[アラート詳細の確認] をクリックします。 [詳細の表示] ダイアログボックスで、[無視] をクリックします。 表示されたメッセージボックスで、[OK] をクリックします。 次に、[再度事前チェック] をクリックして、事前チェックを再度実行します。 アラート項目を無視すると、データの不整合が発生し、ビジネスが潜在的なリスクにさらされる可能性があります。
成功率が100% になるまで待ちます。 次に、[次へ: インスタンスの購入] をクリックします。
[インスタンスの購入] ページで、データ移行インスタンスのインスタンスクラスパラメーターを設定します。 下表にパラメーターを示します。
セクション
パラメーター
説明
新しいインスタンスクラス
リソースグループ
データ移行インスタンスが属するリソースグループ。 デフォルト値: Default resource group 詳細については、「リソース管理とは 」をご参照ください。
インスタンスクラス
DTSは、移行速度が異なるインスタンスクラスを提供します。 ビジネスシナリオに基づいてインスタンスクラスを選択できます。 詳細については、「データ移行インスタンスのインスタンスクラス」をご参照ください。
読んで同意するデータ伝送サービス (従量課金) サービス規約チェックボックスを選択します。
[購入して開始] をクリックします。 表示されるメッセージで、 [OK] をクリックします。
[データ移行] ページでタスクの進行状況を確認できます。