すべてのプロダクト
Search
ドキュメントセンター

Data Transmission Service:PolarDB for MySQLクラスターからKafkaクラスターへのデータの移行

最終更新日:Oct 31, 2024

このトピックでは、data Transmission Service (DTS) を使用して、PolarDB for MySQLクラスターからKafkaクラスターにデータを移行する方法について説明します。 これにより、メッセージの管理機能が向上します。

前提条件

  • 移行先のセルフマネージドKafkaクラスターまたはApsaraMQ for Kafkaインスタンスが作成されました。

    説明

    ApsaraMQ for Kafkaインスタンスがターゲットインスタンスとして使用されている場合、インスタンスが自己管理型Kafkaクラスターとして設定され、移行するデータを受け取るトピックが作成されていることを確認します。 トピックの作成方法については、「手順1: トピックの作成」をご参照ください。

  • ターゲットインスタンスの使用可能なストレージ容量が、ソースPolarDB for MySQLクラスターのデータの合計サイズよりも大きいこと。

制限事項

説明

DTSは、ソースデータベースの外部キーをターゲットデータベースに移行しません。 したがって、ソースデータベースのカスケードおよび削除操作は、ターゲットデータベースに移行されません。

カテゴリ

説明

ソースデータベースの制限

  • ソースデータベースが属するサーバーには、十分なアウトバウンド帯域幅が必要です。 そうしないと、データ移行速度が低下します。

  • 移行するテーブルには、PRIMARY KEYまたはUNIQUE制約が必要であり、すべてのフィールドが一意である必要があります。 そうでない場合、宛先データベースは重複するデータレコードを含み得る。

  • 移行するオブジェクトとしてテーブルを選択し、移行先データベースのテーブルを編集する場合 (テーブルまたは列の名前の変更など) 、1つのデータ移行タスクで最大1,000のテーブルを移行できます。 タスクを実行して1,000を超えるテーブルを移行すると、リクエストエラーが発生します。 この場合、複数のタスクを構成してテーブルをバッチで移行するか、タスクを構成してデータベース全体を移行することをお勧めします。

  • ソースの読み取り専用ノードのデータクラスターは移行できません。

  • 増分データを移行する必要がある場合は、バイナリロギング機能を有効にし、loose_polar_log_binパラメーターをonに設定する必要があります。 それ以外の場合、事前チェック中にエラーメッセージが返され、データ移行タスクを開始できません。 バイナリログ機能を有効にし、loose_polar_log_binパラメーターを設定する方法の詳細については、「バイナリログの有効化」および「パラメーターの変更」をご参照ください。

    説明
    • PolarDB for MySQLクラスターのバイナリログ機能を有効にすると、バイナリログによって占有されているストレージ容量に対して課金されます。

    • 増分データ移行タスクの場合、ソースデータベースのバイナリログを24時間以上保存する必要があります。 完全データおよび増分データ移行タスクの場合、ソースデータベースのバイナリログを少なくとも7日間保存する必要があります。 そうしないと、DTSはバイナリログの取得に失敗し、タスクが失敗する可能性があります。 例外的な状況では、データの不整合または損失が発生します。 完全なデータ移行が完了したら、保持期間を24時間以上に設定できます。 上記の要件に基づいて、バイナリログの保持期間を設定してください。 それ以外の場合、DTSのサービスレベル契約 (SLA) はサービスの信頼性またはパフォーマンスを保証しません。

  • ソースデータベースで実行する操作の制限:

    • スキーマ移行中および完全データ移行中は、DDL操作を実行してデータベースまたはテーブルのスキーマを変更しないでください。 それ以外の場合、データ移行タスクは失敗します。

    • フルデータ移行のみを実行する場合は、データ移行中にソースデータベースにデータを書き込まないでください。 そうしないと、ソースデータベースとターゲットデータベース間でデータの不一致が発生します。 データの一貫性を確保するために、移行タイプとしてスキーマ移行、フルデータ移行、および増分データ移行を選択することを推奨します。

その他の制限

  • データ移行中にオブジェクトに対してDDL操作を実行するためにpt-online-schema-changeなどのツールを使用しないことを推奨します。 そうしないと、データ移行が失敗する可能性があります。

  • DTSはROUND(COLUMN,PRECISION) 関数を使用して、FLOATまたはDOUBLEデータ型の列から値を取得します。 精度を指定しない場合、DTSはFLOATデータ型の精度を38桁に設定し、DOUBLEデータ型の精度を308桁に設定します。 業務要件を満足する精度が設定されていることを確認する必要があります。

  • データを移行する前に、移行元データベースと移行先データベースのパフォーマンスに対するデータ移行の影響を評価します。 オフピーク時にデータを移行することを推奨します。 完全データ移行中、DTSはソースデータベースとターゲットデータベースの読み取りおよび書き込みリソースを使用します。 これは、データベースサーバの負荷を増加させる可能性がある。

  • DTSは、過去7日以内に失敗したデータ移行タスクを再開しようとします。 ワークロードをターゲットデータベースに切り替える前に、失敗したタスクを停止またはリリースします。 REVOKEステートメントを実行して、DTSがターゲットデータベースにアクセスするために使用するアカウントの書き込み権限を取り消すこともできます。 そうしないと、タスクの再開後に、ソースデータベースのデータがターゲットデータベースのデータを上書きします。

使用上の注意

  • DTSは、CREATE DATABASE IF NOT EXISTS 'test' ステートメントをソースデータベースで実行し、バイナリログファイルの位置を前に移動します。

  • 完全データ移行中、同時INSERT操作により、ターゲットデータベースのテーブルが断片化されます。 完全データ移行が完了すると、移行先データベースの使用表領域のサイズが移行元データベースのサイズよりも大きくなります。

課金

移行タイプ

インスタンス設定料金

インターネットトラフィック料金

スキーマ移行とフルデータ移行

無料です。

インターネット経由でAlibaba Cloudからデータが移行された場合にのみ課金されます。 詳細については、「課金の概要」をご参照ください。

増分データ移行

有料。 詳細については、「課金の概要」をご参照ください。

移行タイプ

  • スキーマ移行

    DTSは、選択したオブジェクトのスキーマをソースデータベースからターゲットデータベースに移行します。

  • 完全なデータ移行

    DTSは、必要なオブジェクトの履歴データをソースデータベースからターゲットデータベースに移行します。

  • 増分データ移行

    完全データ移行が完了すると、DTSは増分データをソースデータベースからターゲットデータベースに移行します。 増分データ移行により、データ移行中に自己管理型アプリケーションのサービスを中断することなく、データをスムーズに移行できます。

増分移行可能なSQL操作

操作タイプ

SQL文

DML

挿入、更新、および削除

DDL

CREATE TABLE、ALTER TABLE、DROP TABLE、RENAME TABLE、およびTRUNCATE TABLE

データベースアカウントに必要な権限

データベース

必要な権限

関連ドキュメント

PolarDB for MySQLクラスター

移行するオブジェクトの読み取り権限

データベースアカウントの作成と管理

手順

  1. [データ移行タスク] ページに移動します。

    1. データ管理 (DMS) コンソール にログインします。

    2. 上部のナビゲーションバーで、ポインタをDTS上に移動します。

    3. DTS (DTS) > データ移行を選択します。

    説明
  2. データ移行タスクの右側にあるドロップダウンリストから、データ移行インスタンスが存在するリージョンを選択します。

    説明

    新しいDTSコンソールを使用する場合は、左上隅にデータ移行インスタンスが存在するリージョンを選択する必要があります。

  3. [タスクの作成] をクリックします。 [データ移行タスクの作成] ページで、ソースデータベースとターゲットデータベースを設定します。 次の表にパラメーターを示します。

    セクション

    パラメーター

    説明

    非該当

    タスク名

    タスクの名前。 タスク名は自動生成されます。 タスクを識別するために、有益な名前を指定することを推奨します。 一意のタスク名を指定する必要はありません。

    ソースデータベース

    既存のDMSデータベースインスタンスの選択

    使用するデータベースインスタンス。 ビジネス要件に基づいて、既存のインスタンスを使用するかどうかを選択できます。

    • 既存のインスタンスを選択すると、DTSはデータベースのパラメーターを自動的に入力します。

    • 既存のインスタンスを選択しない場合は、次のデータベース情報を設定する必要があります。

    データベースタイプ

    移行元ディスクのタイプを設定します。 [PolarDB for MySQL] を選択します。

    アクセス方法

    ソースデータベースのアクセス方法。 [Alibaba Cloudインスタンス] を選択します。

    インスタンスリージョン

    ソースPolarDB for MySQLインスタンスが存在するリージョン。

    Alibaba Cloudアカウント全体でのデータの複製

    Alibaba Cloudアカウント間でデータを移行するかどうかを指定します。 この例では、[いいえ] が選択されています。

    PolarDBクラスターID

    ソースPolarDB for MySQLクラスターのID。

    データベースアカウント

    ソースPolarDB for MySQLクラスターのデータベースアカウント。 アカウントに必要な権限の詳細については、このトピックの「データベースアカウントに必要な権限」をご参照ください。

    データベースパスワード

    データベースインスタンスへのアクセスに使用されるパスワード。

    宛先データベース

    既存のDMSデータベースインスタンスの選択

    使用するデータベースインスタンス。 ビジネス要件に基づいて、既存のインスタンスを使用するかどうかを選択できます。

    • 既存のインスタンスを選択すると、DTSはデータベースのパラメーターを自動的に入力します。

    • 既存のインスタンスを選択しない場合は、次のデータベース情報を設定する必要があります。

    データベースタイプ

    ターゲットデータベースのタイプ。 Kafkaを選択します。

    アクセス方法

    ターゲットデータベースのアクセス方法。 この例では、ECS上の自己管理データベースが選択されています。

    説明
    • ターゲットデータベースが自己管理データベースの場合、データベースのネットワーク環境を展開する必要があります。 詳細については、「準備の概要」をご参照ください。

    • DTSでは、ApsaraMQ for Kafkaインスタンスのアクセス方法を直接設定することはできません。 ApsaraMQ for Kafkaインスタンスをターゲットインスタンスとして使用する場合、Express Connect、VPN Gateway、またはSmart Access Gatewayを選択し、インスタンスをセルフマネージドKafkaクラスターとして設定します。

    インスタンスリージョン

    ターゲットKafkaクラスターが存在するリージョン。

    ECSインスタンスID

    ターゲットKafkaクラスターがデプロイされているElastic Compute Service (ECS) インスタンスのID。

    ポート番号

    宛先Kafkaクラスターのサービスポート番号。 デフォルト値: 9092

    データベースアカウント

    ターゲットKafkaクラスターへのログインに使用されるユーザー名。 Kafkaクラスターで認証が有効になっていない場合は、ユーザー名を入力する必要はありません。

    データベースパスワード

    ユーザー名のパスワード。 Kafkaクラスターで認証が有効になっていない場合は、パスワードを入力する必要はありません。

    Kafkaバージョン

    ターゲットKafkaクラスターのバージョン。

    説明

    セルフマネージドKafkaクラスターのバージョンが1.0以降の場合は、later Than 1.0を選択します。

    暗号化

    接続を暗号化するかどうかを指定します。 ビジネスとセキュリティの要件に基づいて、[非暗号化] または [SCRAM-SHA 256] を選択します。

    トピック

    ドロップダウンリストからトピックを選択します。

    DDL情報を保存するトピック

    ドロップダウンリストからトピックを選択します。 このトピックは、DDL情報を格納するために使用されます。 このパラメーターを設定しない場合、DDL情報はtopicパラメーターで指定されたトピックに保存されます。

    Kafkaスキーマレジストリの使用

    メタデータのサービングレイヤーを提供するKafka Schema Registryを使用するかどうかを指定します。 Avroスキーマを保存および取得するためのRESTful APIを提供します。 有効な値:

    • いいえ: Kafka Schema Registryを使用しません。

    • はい: Kafka Schema Registryを使用します。 この場合、AvroスキーマのKafka Schema Registryに登録されているURLまたはIPアドレスを入力する必要があります。

  4. ページの下部で、[接続のテストと続行] をクリックします。

    、ソースまたはターゲットデータベースがAlibaba Cloudデータベースインスタンス (ApsaraDB RDS for MySQLApsaraDB for MongoDBインスタンスなど) の場合、DTSは自動的にDTSサーバーのCIDRブロックをインスタンスのIPアドレスホワイトリストに追加します。 ソースデータベースまたはターゲットデータベースがElastic Compute Service (ECS) インスタンスでホストされている自己管理データベースの場合、DTSサーバーのCIDRブロックがECSインスタンスのセキュリティグループルールに自動的に追加されます。ECSインスタンスがデータベースにアクセスできることを確認する必要があります。 自己管理データベースが複数のECSインスタンスでホストされている場合、DTSサーバーのCIDRブロックを各ECSインスタンスのセキュリティグループルールに手動で追加する必要があります。 ソースデータベースまたはターゲットデータベースが、データセンターにデプロイされているか、サードパーティのクラウドサービスプロバイダーによって提供される自己管理データベースである場合、DTSサーバーのCIDRブロックをデータベースのIPアドレスホワイトリストに手動で追加して、DTSがデータベースにアクセスできるようにする必要があります。 詳細については、「DTSサーバーのCIDRブロックの追加」トピックの「DTSサーバーのCIDRブロック」セクションをご参照ください。

    警告

    DTSサーバーのパブリックCIDRブロックがデータベースインスタンスのホワイトリストまたはECSインスタンスのセキュリティグループルールに自動的または手動で追加されると、セキュリティリスクが発生する可能性があります。 したがって、DTSを使用してデータを移行する前に、潜在的なリスクを理解して認識し、ユーザー名とパスワードのセキュリティの強化、公開されるポートの制限、API呼び出しの認証、ホワイトリストまたはセキュリティグループルールの定期的なチェック、CIDRブロックの禁止、またはExpress Connectを使用したデータベースインスタンスのDTSへの接続、VPNゲートウェイ、またはSmart Access Gateway。

  5. 移行するオブジェクトと詳細設定を設定します。

    パラメーター

    説明

    移行タイプ

    • フルデータ移行のみを実行するには、[スキーマ移行][フルデータ移行] を選択します。

    • データ移行中のサービスの継続性を確保するには、[スキーマ移行][フルデータ移行] 、および [増分データ移行] を選択します。

    説明

    増分データ移行を選択しない場合、データ移行中にソースデータベースにデータを書き込まないことを推奨します。 これにより、ソースデータベースとターゲットデータベース間のデータの整合性が確保されます。

    競合テーブルの処理モード

    • 事前チェックエラーとレポートエラー: ターゲットインスタンスに、ソースデータベースのテーブルと同じ名前のテーブルが含まれているかどうかを確認します。 ソースデータベースとターゲットインスタンスに同じテーブル名を持つテーブルが含まれていない場合、事前チェックが渡されます。 それ以外の場合、事前チェック中にエラーが返され、データ同期タスクを開始できません。

      説明

      オブジェクト名マッピング機能を使用して、ターゲットインスタンスに同期されるテーブルの名前を変更できます。 ソースデータベースとターゲットインスタンスに同じテーブル名が含まれており、ターゲットインスタンスのテーブルを削除または名前変更できない場合、この機能を使用できます。 詳細については、「マップオブジェクト名」をご参照ください。

    • 宛先テーブルのクリア: 宛先テーブルからデータをクリアします。 作業は慎重に行ってください。

    • エラーを無視して続行: ソースデータベースとターゲットインスタンスの同じテーブル名の事前チェックをスキップします。

      警告

      [エラーを無視して続行] を選択すると、データの不整合が発生し、ビジネスが潜在的なリスクにさらされる可能性があります。

      • ソースデータベースとターゲットデータベースが同じスキーマを持ち、データレコードがターゲットデータベースの既存のデータレコードと同じ主キー値を持つ場合:

        • 完全データ同期中、DTSはデータレコードをターゲットデータベースに同期しません。 ターゲットデータベースの既存のデータレコードが保持されます。

        • 増分データ同期中、DTSはデータレコードをターゲットデータベースに同期します。 ターゲットデータベースの既存のデータレコードが上書きされます。

      • ソースデータベースとターゲットデータベースのスキーマが異なる場合、データの初期化に失敗する可能性があります。 この場合、一部の列のみが移行されるか、データ移行タスクが失敗します。

    Kafkaのデータ形式

    データレコードがMessage Queue for Apache Kafkaインスタンスに格納される形式。

    • DTS Avroを選択した場合、データはDTS Avroのスキーマ定義に基づいて解析されます。 詳細については、『GitHub』をご参照ください。

    • Canal Jsonを選択した場合、データはCanal JSON形式で保存されます。 関連するパラメーターと例の詳細については、「Kafkaクラスターのデータ形式」トピックの「Canal JSON」セクションをご参照ください。

    Kafkaパーティションへの出荷データのポリシー

    ビジネス要件に基づいて、Kafkaパーティションへのデータ移行のポリシーを選択します。 詳細については、「データをKafkaパーティションに移行するためのポリシーの指定」をご参照ください。

    宛先インスタンスでのオブジェクト名の大文字化

    ターゲットインスタンスのデータベース名、テーブル名、および列名の大文字化。 デフォルトでは、DTSデフォルトポリシーが選択されています。 他のオプションを選択して、オブジェクト名の大文字化が、ソースまたはターゲットデータベースのオブジェクト名のデフォルトの大文字化と一致するようにすることができます。 詳細については、「ターゲットインスタンスのオブジェクト名の大文字化の指定」をご参照ください。

    ソースオブジェクト

    ソースオブジェクト セクションから1つ以上のオブジェクトを選択します。 Rightwards arrowアイコンをクリックし、[選択済みオブジェクト] セクションにオブジェクトを追加します。

    説明

    移行するオブジェクトとしてデータベースのみを選択できます。

    [選択済みオブジェクト]

    • 移行先インスタンスに移行するオブジェクトの名前を変更するには、[選択済みオブジェクト] セクションでオブジェクトを右クリックします。 詳細については、「単一オブジェクトの名前のマッピング」をご参照ください。

    • 一度に複数のオブジェクトの名前を変更するには、[選択済みオブジェクト] セクションの右上隅にある [一括編集] をクリックします。 詳細については、「一度に複数のオブジェクト名をマップする」をご参照ください。

    説明
    • オブジェクト名マッピング機能を使用してオブジェクトの名前を変更すると、そのオブジェクトに依存する他のオブジェクトの移行に失敗する可能性があります。

    • データをフィルタリングするWHERE条件を指定するには、[選択済みオブジェクト] セクションでオブジェクトを右クリックします。 表示されるダイアログボックスで、条件を指定します。 詳細については、「フィルター条件の指定」をご参照ください。

    • 特定のデータベースまたはテーブルで実行されたSQL操作を選択するには、[選択済みオブジェクト] セクションでオブジェクトを右クリックします。 表示されるダイアログボックスで、移行するSQL操作を選択します。 移行可能なSQL操作の詳細については、「SQL操作の増分移行」をご参照ください。

  6. [次へ: 詳細設定] をクリックします。

    パラメーター

    説明

    Set Alerts

    データ移行タスクのアラートを設定するかどうかを指定します。 タスクが失敗するか、移行の待ち時間が指定されたしきい値を超えると、アラート送信先は通知を受け取ります。 有効な値:

    失敗した接続の再試行時間

    失敗した接続のリトライ時間範囲。 データ移行タスクの開始後にソースデータベースまたはターゲットデータベースの接続に失敗した場合、DTSはその時間範囲内ですぐに接続を再試行します。 有効な値: 10 ~ 1440 単位は分です。 デフォルト値: 720 パラメーターを30より大きい値に設定することを推奨します。 DTSが指定された時間範囲内にソースデータベースとターゲットデータベースに再接続すると、DTSはデータ移行タスクを再開します。 それ以外の場合、データ移行タスクは失敗します。

    説明
    • ソースまたはターゲットデータベースが同じである複数のデータ移行タスクに対して異なるリトライ時間範囲を設定した場合、設定された最短のリトライ時間範囲が優先されます。

    • DTSが接続を再試行すると、DTSインスタンスに対して課金されます。 業務要件に基づいて再試行時間範囲を指定することを推奨します。 ソースインスタンスとターゲットインスタンスがリリースされた後、できるだけ早くDTSインスタンスをリリースすることもできます。

    ETLの設定

    抽出、変換、および読み込み (ETL) 機能を有効にするかどうかを指定します。 詳細については、「ETLとは何ですか? 」をご参照ください。有効な値:

    順方向および逆方向タスクのハートビートテーブル sql を削除

    DTSインスタンスの実行中に、ハートビートテーブルのSQL操作をソースデータベースに書き込むかどうかを指定します。 有効な値:

    • Yes: ハートビートテーブルにSQL操作を書き込みません。 この場合、DTSインスタンスのレイテンシが表示され得る。

    • No: ハートビートテーブルにSQL操作を書き込みます。 この場合、ソースデータベースの物理バックアップやクローニングなどの機能が影響を受ける可能性があります。

  7. ページの下部で、[次へ: タスク設定の保存と事前チェック] をクリックします。

    ポインタを 次:タスク設定の保存と事前チェック に移動し、[OpenAPIパラメーターのプレビュー] をクリックして、関連するAPI操作を呼び出してDTSタスクを設定するときに指定するパラメーターを表示できます。

    説明
    • データ移行タスクを開始する前に、DTSは事前チェックを実行します。 データ移行タスクは、タスクが事前チェックに合格した後にのみ開始できます。

    • タスクが事前チェックに合格しなかった場合は、失敗した各項目の横にある [詳細の表示] をクリックします。 チェック結果に基づいて原因を分析した後、問題のトラブルシューティングを行います。 次に、もう一度プレチェックを実行します。

    • 事前チェック中にアイテムに対してアラートがトリガーされた場合:

      • アラートアイテムを無視できない場合は、失敗したアイテムの横にある [詳細の表示] をクリックして問題のトラブルシューティングを行います。 次に、もう一度プレチェックを実行します。

      • アラート項目を無視できる場合は、[アラート詳細の確認] をクリックします。 [詳細の表示] ダイアログボックスで、[無視] をクリックします。 表示されたメッセージボックスで、[OK] をクリックします。 次に、[再度事前チェック] をクリックして、事前チェックを再度実行します。 アラート項目を無視すると、データの不整合が発生し、ビジネスが潜在的なリスクにさらされる可能性があります。

  8. 成功率100% になるまで待ちます。 次に、[次へ: インスタンスの購入] をクリックします。

  9. [インスタンスの購入] ページで、データ移行インスタンスのインスタンスクラスパラメーターを設定します。 下表にパラメーターを示します。

    セクション

    パラメーター

    説明

    新しいインスタンスクラス

    リソースグループ

    データ移行インスタンスが属するリソースグループ。 デフォルト値: Default resource group 詳細については、「リソース管理とは 」をご参照ください。

    インスタンスクラス

    DTSは、移行速度が異なるインスタンスクラスを提供します。 ビジネスシナリオに基づいてインスタンスクラスを選択できます。 詳細については、「データ移行インスタンスのインスタンスクラス」をご参照ください。

  10. 読んで同意するデータ伝送サービス (従量課金) サービス規約チェックボックスを選択します。

  11. [購入して開始] をクリックします。 表示されるメッセージで、 [OK] をクリックします。

    [データ移行] ページでタスクの進行状況を確認できます。