すべてのプロダクト
Search
ドキュメントセンター

Data Transmission Service:PolarDB-X 1.0インスタンスからApsaraMQ for Kafkaインスタンスへのデータの同期

最終更新日:Oct 31, 2024

このトピックでは、data Transmission Service (DTS) を使用して、PolarDB-X 1.0インスタンスからApsaraMQ for Kafkaインスタンスにデータを同期する方法について説明します。

前提条件

  • ソースPolarDB-X 1.0インスタンスが作成されました。 詳細については、次をご参照ください: PolarDB-X 1.0インスタンスの作成

  • ターゲットApsaraMQ for Kafkaインスタンスで、同期データを受信するトピックが作成されます。 詳細については、「概要」をご参照ください。

  • ターゲットApsaraMQ for Kafkaインスタンスの使用可能なストレージ容量が、ソースPolarDB-X 1.0インスタンスのデータの合計サイズよりも大きいこと。

制限事項

カテゴリ

説明

ソースデータベースの制限

  • 同期するテーブルには、PRIMARY KEYまたはUNIQUE制約が必要であり、すべてのフィールドが一意である必要があります。 そうでない場合、宛先データベースは重複するデータレコードを含み得る。 UNIQUE制約のみがあるテーブルは、スキーマ同期をサポートしません。 したがって、PRIMARY KEY制約があるテーブルを同期することを推奨します。 セカンダリインデックスを持つテーブルは同期できません。

  • 同期するオブジェクトとしてテーブルを選択し、ターゲットデータベースのテーブルを編集する場合 (テーブルや列の名前の変更など) 、1つのデータ同期タスクで最大5,000のテーブルを同期できます。 タスクを実行して5,000を超えるテーブルを同期すると、リクエストエラーが発生します。 この場合、複数のタスクを構成してテーブルをバッチで同期するか、タスクを構成してデータベース全体を同期することをお勧めします。

  • にアタッチされたApsaraDB RDS for MySQLインスタンスのバイナリログの次の要件 PolarDB-X 1.0インスタンスを満たす必要があります。

    • バイナリログ機能が有効になっています。 binlog_row_imageパラメーターの値がfullに設定されています。 それ以外の場合、事前チェック中にエラーメッセージが返され、データ同期タスクを開始できません。

    • 増分データ同期タスクの場合、ソースデータベースのバイナリログは少なくとも24時間保持する必要があります。 完全および増分データ同期タスクの場合、ソースデータベースのバイナリログは少なくとも7日間保持する必要があります。 そうしないと、DTSはバイナリログの取得に失敗し、タスクが失敗する可能性があります。 例外的な状況では、データが矛盾または失われる可能性があります。 完全なデータ同期が完了したら、保持期間を24時間以上に設定できます。 上記の要件に従って、バイナリログの保持期間を設定してください。 そうしないと、DTSのサービスレベル契約 (SLA) に記載されているサービスの信頼性とパフォーマンスを達成できません。

  • ソースデータベースで実行する操作の制限:

    • のネットワークタイプを変更すると、 PolarDB-X 1.0インスタンスのデータ同期中に、ネットワークタイプを変更した後、データ同期タスクのネットワーク接続情報を変更する必要があります。

    • データ同期中は、ソースインスタンスにアタッチされているApsaraDB RDS for MySQLインスタンスをアップグレードまたはダウングレードしたり、ApsaraDB RDS for MySQLインスタンスの論理データベースおよびテーブルに対応する物理データベースおよびテーブルの分布を変更したり、シャードキーを変更したり、ソースオブジェクトに対してDDL操作を実行したりしないでください。 そうしないと、データ同期タスクが失敗するか、データの不一致が発生します。

    • 完全データ同期および増分データ同期中、DTSはセッションレベルで外部キーに対する制約チェックおよびカスケード操作を一時的に無効にします。 データ同期中にソースデータベースに対してカスケード更新および削除操作を実行すると、データの不整合が発生する可能性があります。

    • データ同期中にPolarDB-X 1.0インスタンスのネットワークタイプを変更する場合は、データ同期タスクのネットワーク接続情報も変更する必要があります。

    • 完全データ同期のみを実行する場合は、データ同期中にソースデータベースにデータを書き込まないでください。 そうしないと、ソースデータベースとターゲットデータベースの間でデータの不一致が発生する可能性があります。 データの一貫性を確保するために、同期タイプとしてスキーマ同期、完全データ同期、および増分データ同期を選択することを推奨します。

その他の制限

  • PolarDB-X 1.0インスタンスからのデータ同期は分散同期です。 PolarDB-X 1.0インスタンスにアタッチされているApsaraDB RDS for MySQLインスタンスごとに1つのデータ同期サブタスクのみを設定できます。 タスクトポロジ内の各サブタスクのステータスを照会できます。

  • データ同期中にターゲットApsaraMQ for Kafkaインスタンスがアップグレードまたはダウングレードされた場合、インスタンスを再起動する必要があります。

  • データを同期する前に、ソースデータベースとターゲットデータベースのパフォーマンスに対するデータ同期の影響を評価します。 オフピーク時にデータを同期することを推奨します。 最初の完全データ同期中、DTSはソースデータベースとターゲットデータベースの読み取りおよび書き込みリソースを使用します。 これにより、データベースサーバーの負荷が増加する可能性があります。

  • 初期の完全データ同期中に、同時INSERT操作により、ターゲットデータベースのテーブルが断片化されます。 完全なデータ同期が完了すると、ターゲットデータベースの使用されるテーブルスペースのサイズは、ソースデータベースのサイズよりも大きくなります。

  • 同期するオブジェクトに対してDDL操作を実行するために、gh-ostまたはpt-online-schema-changeを使用しないことを推奨します。 そうしないと、データ同期が失敗する可能性があります。

  • データ同期中は、DTSのみを使用してデータをターゲットデータベースに書き込むことをお勧めします。 これにより、ソースデータベースとターゲットデータベース間のデータの不一致が防止されます。 DTS以外のツールを使用してターゲットデータベースにデータを書き込む場合、DMSを使用してオンラインDDL操作を実行すると、ターゲットデータベースでデータ損失が発生する可能性があります。

サポートしている同期トポロジ

  • 一方向の 1 対 1 の同期

  • 一方向の 1 対多の同期

  • 一方向のカスケード同期

  • 一方向の多対 1 の同期

DTSでサポートされている同期トポロジの詳細については、「同期トポロジ」をご参照ください。

同期可能なSQL操作

操作タイプ

SQL文

DML

挿入、更新、および削除

データベースアカウントに必要な権限

データベース

必要な権限

関連ドキュメント

ソースPolarDB-X 1.0インスタンス

同期するオブジェクトの読み取り権限。

アカウントの管理

手順

  1. 新しいDTSコンソールのデータ同期タスクページに移動します。

    説明

    データ管理 (DMS) コンソール にログインすることもできます。 上部のナビゲーションバーで、ポインタを [データ開発] に移動し、[DTS (DTS)] > [データ同期] を選択します。

  2. 上部のナビゲーションバーで、データ同期タスクを作成するリージョンを選択します。

  3. [タスクの作成] をクリックします。 表示されるページで、ソースデータベースとターゲットデータベースを設定します。

    カテゴリ

    パラメーター

    説明

    非該当

    タスク名

    DTSタスクの名前。 タスク名は自動生成されます。 タスクを簡単に識別できるように、わかりやすい名前を指定することをお勧めします。 一意のタスク名を指定する必要はありません。

    ソースデータベース

    既存のDMSデータベースインスタンスの選択

    使用するデータベースインスタンス。 ビジネス要件に基づいて、既存のインスタンスを使用するかどうかを選択できます。

    • 既存のインスタンスを選択すると、DTSはデータベースのパラメーターを自動的に入力します。

    • 既存のインスタンスを選択しない場合は、次のデータベース情報を設定する必要があります。

    データベースタイプ

    移行元ディスクのタイプを設定します。 [PolarDB-X 1.0] を選択します。

    アクセス方法

    ソースデータベースのアクセス方法。 [Alibaba Cloudインスタンス] を選択します。

    インスタンスリージョン

    ソースPolarDB-X 1.0インスタンスが存在するリージョン。

    Alibaba Cloudアカウント全体でのデータの複製

    同じAlibaba Cloudアカウント内でデータが同期される場合は、[いいえ] を選択します。

    インスタンスID

    ソースPolarDB-X 1.0インスタンスのID。

    データベースアカウント

    ソースPolarDB-Xデータベースインスタンスの1.0アカウント。 アカウントに必要な権限の詳細については、このトピックの「データベースアカウントに必要な権限」をご参照ください。

    データベースパスワード

    データベースインスタンスへのアクセスに使用されるパスワード。

    宛先データベース

    既存のDMSデータベースインスタンスの選択

    使用するデータベースインスタンス。 ビジネス要件に基づいて、既存のインスタンスを使用するかどうかを選択できます。

    • 既存のインスタンスを選択すると、DTSはデータベースのパラメーターを自動的に入力します。

    • 既存のインスタンスを選択しない場合は、次のデータベース情報を設定する必要があります。

    データベースタイプ

    ターゲットデータベースのタイプ。 Kafkaを選択します。

    アクセス方法

    ターゲットデータベースのアクセス方法。 Express Connect、VPN Gateway、またはSmart Access Gatewayを選択します。

    説明

    Alibaba Cloudインスタンスはサポートされていません。

    インスタンスリージョン

    ターゲットApsaraMQ for Kafkaインスタンスが存在するリージョン。

    接続済みVPC

    ターゲットApsaraMQ for Kafkaインスタンスが属する仮想プライベートクラウド (VPC) のID。 ApsaraMQ for Kafkaコンソールにログインし、ApsaraMQ for Kafkaインスタンスの [インスタンスの詳細] ページに移動します。 [インスタンス情報] タブの [設定情報] セクションで、VPC IDを表示します。

    ドメイン名または IP アドレス

    宛先ApsaraMQ for KafkaインスタンスのIPアドレス。

    説明

    ApsaraMQ for KafkaインスタンスのIPアドレスを取得するには、次の操作を実行します。ApsaraMQ for Kafkaコンソールにログインし、ApsaraMQ for Kafkaインスタンスの [インスタンスの詳細] ページに移動します。 [インスタンス情報] タブの [エンドポイント情報] セクションで、Default EndpointパラメーターからIPアドレスを取得します。

    ポート番号

    宛先ApsaraMQ for Kafkaインスタンスのサービスポート番号。 デフォルト値: 9092

    データベースアカウント

    ターゲットApsaraMQ for Kafkaインスタンスのデータベースアカウント。

    説明

    データベースアカウントとデータベースパスワードは、アクセス制御リスト (ACL) 機能が有効になっているApsaraMQ for Kafkaインスタンスにのみ必要です。 ACL機能を有効にする方法の詳細については、「SASLユーザーへの権限の付与」をご参照ください。

    データベースのパスワード

    Kafkaバージョン

    ターゲットApsaraMQ for Kafkaインスタンスのバージョン。

    暗号化

    接続先インスタンスへの接続を暗号化するかどうかを指定します。 ビジネスとセキュリティの要件に基づいて、[非暗号化] または [SCRAM-SHA 256] を選択します。

    トピック

    同期されたデータの受信に使用されるトピック。 ドロップダウンリストからトピックを選択します。

    DDL情報を保存するトピック

    DDL情報を格納するために使用されるトピック。 ドロップダウンリストからトピックを選択します。 このパラメーターを指定しない場合、DDL情報はtopicパラメーターで指定されたトピックに格納されます。

    Kafkaスキーマレジストリの使用

    Kafka Schema Registryを使用するかどうかを指定します。 Kafka Schema Registryは、メタデータの提供レイヤーを提供します。 Avroスキーマを保存および取得するためのRESTful APIを提供します。 有効な値:

    • いいえ: Kafka Schema Registryを使用しません。

    • はい: Kafka Schema Registryを使用します。 この場合、AvroスキーマのKafka Schema Registryに登録されているURLまたはIPアドレスを入力する必要があります。

  4. ページの下部で、接続性をテストして続行をクリックします。

    ソースまたはターゲットデータベースがAlibaba Cloudデータベースインスタンス (ApsaraDB RDS for MySQLインスタンスやApsaraDB for MongoDBインスタンスなど) の場合、DTSは自動的にDTSサーバーのCIDRブロックをインスタンスのホワイトリストに追加します。 ソースデータベースまたはターゲットデータベースがElastic Compute Service (ECS) インスタンスでホストされている自己管理データベースの場合、DTSサーバーのCIDRブロックがECSインスタンスのセキュリティグループルールに自動的に追加されます。ECSインスタンスがデータベースにアクセスできることを確認する必要があります。 データベースが複数のECSインスタンスにデプロイされている場合、DTSサーバーのCIDRブロックを各ECSインスタンスのセキュリティグループルールに手動で追加する必要があります。 ソースデータベースまたはターゲットデータベースが、データセンターにデプロイされているか、サードパーティのクラウドサービスプロバイダーによって提供される自己管理データベースである場合、DTSサーバーのCIDRブロックをデータベースのホワイトリストに手動で追加して、DTSがデータベースにアクセスできるようにする必要があります。 詳細については、「DTSサーバーのCIDRブロックの追加」トピックの「DTSサーバーのCIDRブロック」セクションをご参照ください。

    警告

    DTSサーバーのCIDRブロックがデータベースまたはインスタンスのホワイトリスト、またはECSセキュリティグループルールに自動的または手動で追加されると、セキュリティリスクが発生する可能性があります。 したがって、DTSを使用してデータを同期する前に、潜在的なリスクを理解して認識し、次の対策を含む予防策を講じる必要があります。VPNゲートウェイ、またはSmart Access Gateway。

  5. タスクに同期するオブジェクトを選択し、詳細設定を構成します。

    パラメーター

    説明

    同期タイプ

    同期タイプ。 デフォルトでは、増分データ同期が選択されています。 [スキーマ同期] および [完全データ同期] も選択する必要があります。 事前チェックが完了すると、DTSは選択したオブジェクトの履歴データをソースデータベースからターゲットクラスターに同期します。 履歴データは、その後の増分同期の基礎となる。

    説明

    [フルデータ同期] が選択されている場合、CREATE TABLEステートメントを使用して作成されたテーブルのスキーマとデータをターゲットデータベースに同期できます。

    競合テーブルの処理モード

    • エラーの事前チェックと報告: ターゲットデータベースに、ソースデータベースのテーブルと同じ名前のテーブルが含まれているかどうかを確認します。 ソースデータベースとターゲットデータベースに同じテーブル名のテーブルが含まれていない場合は、事前チェックに合格します。 それ以外の場合、事前チェック中にエラーが返され、データ同期タスクを開始できません。

      説明

      ソースデータベースとターゲットデータベースに同じ名前のテーブルが含まれていて、ターゲットデータベース内のテーブルを削除または名前変更できない場合は、オブジェクト名マッピング機能を使用して、ターゲットデータベースに同期されるテーブルの名前を変更できます。 詳細については、「マップオブジェクト名」をご参照ください。

    • エラーを無視して続行: ソースデータベースとターゲットデータベースの同じテーブル名の事前チェックをスキップします。

      警告

      エラーを無視して続行 を選択すると、データの不整合が発生し、ビジネスが潜在的なリスクにさらされる可能性があります。

      • ソースデータベースとターゲットデータベースが同じスキーマを持ち、ターゲットデータベースのデータレコードがソースデータベースのデータレコードと同じ主キー値または一意キー値を持つ場合:

        • 完全データ同期中、DTSはデータレコードをターゲットデータベースに同期しません。 ターゲットデータベースの既存のデータレコードが保持されます。

        • 増分データ同期中、DTSはデータレコードをターゲットデータベースに同期します。 ターゲットデータベースの既存のデータレコードが上書きされます。

      • ソースデータベースとターゲットデータベースのスキーマが異なる場合、データの初期化に失敗する可能性があります。 この場合、一部の列のみが同期されるか、データ同期タスクが失敗します。 作業は慎重に行ってください。

    Kafka のデータ形式

    ApsaraMQ for Kafkaインスタンスにデータが格納される形式。

    • DTS Avroを選択した場合、データはDTS Avroのスキーマ定義に基づいて解析されます。 詳細については、『GitHub』をご参照ください。

    • Canal Jsonを選択した場合、データはCanal JSON形式で保存されます。 関連するパラメーターと例の詳細については、「Kafkaクラスターのデータ形式」トピックの「Canal Json」セクションをご参照ください。

    説明

    PolarDB-X 1.0はCanal Jsonをサポートしていません。 デフォルトでは、DTS Avroが選択されています。

    Kafka パーティションへのデータ転送ポリシー

    この機能はサポートされていません。

    宛先インスタンスでのオブジェクト名の大文字化

    ターゲットインスタンスのデータベース名、テーブル名、および列名の大文字化。 デフォルトでは、DTSデフォルトポリシーが選択されています。 他のオプションを選択して、オブジェクト名の大文字化をソースまたはターゲットデータベースの大文字化と一致させることができます。 詳細については、「ターゲットインスタンスのオブジェクト名の大文字化の指定」をご参照ください。

    ソースオブジェクト

    ソースオブジェクト セクションから1つ以上のオブジェクトを選択し、向右アイコンをクリックして 選択中のオブジェクト セクションにオブジェクトを追加します。

    説明

    同期するオブジェクトとしてテーブルを選択できます。

    同期するオブジェクトとしてデータベース全体を選択した場合、ソースデータベースにテーブルを作成したり、ソースデータベースからテーブルを削除したりするために行われた変更は、同期されません。

    [選択済みオブジェクト]

    • 同期先のインスタンスに同期するオブジェクトの名前を変更するには、選択中のオブジェクト セクションでオブジェクトを右クリックします。 詳細については、「オブジェクト名のマップ」トピックの「単一オブジェクトの名前のマップ」セクションをご参照ください。

    • 一度に複数のオブジェクトの名前を変更するには、選択中のオブジェクト セクションの右上隅にある 一括編集 をクリックします。 詳細については、「オブジェクト名のマップ」トピックの「一度に複数のオブジェクト名をマップする」セクションをご参照ください。

    説明

    データをフィルタリングするWHERE条件を指定するには、[選択済みオブジェクト] セクションでオブジェクトを右クリックします。 表示されるダイアログボックスで、条件を指定します。 詳細については、「フィルター条件の指定」をご参照ください。

  6. [次へ: 詳細設定] をクリックして詳細設定を構成します。

    パラメーター

    説明

    専用クラスターから共有クラスターに DTS インスタンスを移行

    この例では、タスクのスケジュールに使用するDTS専用クラスターを選択する必要はありません。 詳細については、「DTS専用クラスターとは 」をご参照ください。

    Set Alerts

    データ同期タスクのアラートを設定するかどうかを指定します。 タスクが失敗するか、同期レイテンシが指定されたしきい値を超えると、アラート送信先は通知を受け取ります。 有効な値:

    失敗した接続のリトライ時間範囲の指定

    失敗した接続のリトライ時間範囲。 データ同期タスクの開始後にソースデータベースまたはターゲットデータベースの接続に失敗した場合、DTSはその時間範囲内ですぐに接続を再試行します。 有効な値: 10 ~ 1440 単位は分です。 デフォルト値: 720 このパラメーターを30より大きい値に設定することを推奨します。 DTSが指定された時間範囲内にソースデータベースとターゲットデータベースに再接続すると、DTSはデータ同期タスクを再開します。 それ以外の場合、データ同期タスクは失敗します。

    説明
    • ソースまたはターゲットデータベースが同じである複数のデータ同期タスクに対して異なるリトライ時間範囲を指定した場合、最も短いリトライ時間範囲が優先されます。

    • DTSが接続を再試行すると、DTSインスタンスに対して課金されます。 業務要件に基づいて再試行時間範囲を指定することを推奨します。 ソースインスタンスとターゲットインスタンスがリリースされた後、できるだけ早くDTSインスタンスをリリースすることもできます。

    ソースデータベースとターゲットデータベースで他の問題が発生した場合の再試行前の待機時間

    その他の問題の再試行時間範囲。 たとえば、データ同期タスクの開始後にDDLまたはDML操作の実行に失敗した場合、DTSはその時間範囲内ですぐに操作を再試行します。 有効な値: 1 ~ 1440 単位: 分。 デフォルト値は 10 です。 このパラメーターを10より大きい値に設定することを推奨します。 指定された時間範囲内で失敗した操作が正常に実行されると、DTSはデータ同期タスクを再開します。 それ以外の場合、データ同期タスクは失敗します。

    重要

    移行元データベースと移行先データベースで他の問題が発生した場合の、再試行までの待機時間です。 パラメーターの値は、失敗した接続の再試行時間 パラメーターの値よりも小さくする必要があります。

    ETLの設定

    抽出、変換、および読み込み (ETL) 機能を設定するかどうかを指定します。 ETLの詳細については、「ETLとは何ですか? 」をご参照ください。

  7. タスク設定を保存し、事前チェックを実行します。

    • 関連するAPI操作を呼び出してDTSタスクを設定するときに指定するパラメーターを表示するには、ポインターを 次:タスク設定の保存と事前チェック に移動し、OpenAPI パラメーターのプレビュー をクリックします。

    • パラメーターを表示または表示する必要がない場合は、ページ下部の 次:タスク設定の保存と事前チェック をクリックします。

    説明
    • データ同期タスクを開始する前に、DTSは事前チェックを実行します。 データ同期タスクは、タスクが事前チェックに合格した後にのみ開始できます。

    • データ同期タスクが事前チェックに失敗した場合は、失敗した各項目の横にある [詳細の表示] をクリックします。 チェック結果に基づいて原因を分析した後、問題のトラブルシューティングを行います。 次に、プレチェックを再実行します。

    • 事前チェック中にアイテムに対してアラートがトリガーされた場合:

      • アラートアイテムを無視できない場合は、失敗したアイテムの横にある [詳細の表示] をクリックして、問題のトラブルシューティングを行います。 次に、もう一度プレチェックを実行します。

      • アラート項目を無視できる場合は、[アラート詳細の確認] をクリックします。 [詳細の表示] ダイアログボックスで、[無視] をクリックします。 表示されたメッセージボックスで、[OK] をクリックします。 次に、[再度事前チェック] をクリックして、事前チェックを再度実行します。 アラート項目を無視すると、データの不整合が発生し、ビジネスが潜在的なリスクにさらされる可能性があります。

  8. 成功率100% になるまで待ちます。 次に、[次へ: インスタンスの購入] をクリックします。

  9. 購入ページで、データ同期インスタンスの課金方法とインスタンスクラスのパラメーターを設定します。 下表にパラメーターを示します。

    セクション

    パラメーター

    説明

    新しいインスタンスクラス

    Billing Method

    • サブスクリプション: データ同期インスタンスの作成時にサブスクリプションの料金を支払います。 サブスクリプションの課金方法は、長期使用の場合、従量課金の課金方法よりも費用対効果が高くなります。

    • 従量課金: 従量課金インスタンスは1時間ごとに課金されます。 従量課金方法は、短期使用に適しています。 従量課金データ同期インスタンスが不要になった場合は、インスタンスをリリースしてコストを削減できます。

    リソースグループの設定

    データ同期インスタンスが属するリソースグループ。 デフォルト値: Default resource group 詳細については、「リソース管理とは 」をご参照ください。

    インスタンスクラス

    DTSは、同期速度が異なるインスタンスクラスを提供します。 ビジネス要件に基づいてインスタンスクラスを選択できます。 詳細については、「データ同期インスタンスのインスタンスクラス」をご参照ください。

    サブスクリプション期間

    サブスクリプションの課金方法を選択した場合は、サブスクリプション期間と作成するデータ同期インスタンスの数を指定します。 サブスクリプション期間は、1〜9か月、1年、2年、3年、または5年とすることができる。

    説明

    このパラメーターは、サブスクリプション の課金方法を選択した場合にのみ使用できます。

  10. データ伝送サービス (従量課金) サービス規約を読んで選択します。

  11. [購入して開始] をクリックします。 表示されるダイアログボックスで、OK をクリックします。

    タスクリストでタスクの進行状況を確認できます。