すべてのプロダクト
Search
ドキュメントセンター

Data Transmission Service:PolarDB-X 2.0インスタンスのデータをMaxComputeプロジェクトに同期する

最終更新日:Oct 31, 2024

このトピックでは、data Transmission Service (DTS) を使用して、PolarDB-XインスタンスからMaxComputeプロジェクトにデータを同期する方法について説明します。

前提条件

制限事項

説明

DTSは、ソースデータベースの外部キーをターゲットデータベースに同期しません。 したがって、ソースデータベースのカスケードおよび削除操作は、ターゲットデータベースと同期されません。

カテゴリ

説明

ソースデータベースの制限

  • 移行するテーブルには、PRIMARY KEYまたはUNIQUE制約が必要であり、すべてのフィールドが一意である必要があります。 そうでない場合、宛先データベースは重複するデータレコードを含み得る。

  • 同期するオブジェクトとしてテーブルを選択し、ターゲットデータベースのテーブルを編集する必要がある場合 (テーブルまたは列の名前の変更など) 、1つのデータ同期タスクで最大5,000のテーブルを同期できます。 タスクを実行して5,000を超えるテーブルを同期すると、リクエストエラーが発生します。 この場合、複数のタスクを構成してテーブルをバッチで同期するか、タスクを構成してデータベース全体を同期することをお勧めします。

  • バイナリログの次の要件を満たす必要があります。

    • PolarDB-X 2.0コンソールでバイナリログ機能を有効にする必要があります。 詳細については、「パラメーター設定」をご参照ください。 binlog_row_imageパラメーター値がfullに設定されています。 それ以外の場合、事前チェック中にエラーメッセージが返され、データ同期タスクを開始できません。

    • 増分データ同期のみを実行する場合、ソースデータベースのバイナリログは少なくとも24時間保持されます。 完全データ同期と増分データ同期の両方を実行する場合、ソースデータベースのバイナリログは少なくとも7日間保持されます。 完全なデータ同期が完了したら、保持期間を24時間以上に設定できます。 そうしないと、DTSはバイナリログの取得に失敗し、タスクが失敗する可能性があります。 例外的な状況では、データの不整合または損失が発生します。 上記の要件に従って、バイナリログの保持期間を設定してください。 そうしないと、DTSのサービスレベル契約 (SLA) に記載されているサービスの信頼性とパフォーマンスを達成できません。

  • PolarDB-XインスタンスはMySQL V5.7と互換性がある必要があります。

その他の制限

  • 同期するオブジェクトとして選択できるのはテーブルのみです。

  • データを同期する前に、ソースデータベースとターゲットデータベースのパフォーマンスに対するデータ同期の影響を評価します。 オフピーク時にデータを同期することを推奨します。 最初の完全データ同期中、DTSはソースデータベースとターゲットデータベースの読み取りおよび書き込みリソースを使用します。 これにより、データベースサーバーの負荷が増加する可能性があります。

  • 初期の完全データ同期中に、同時INSERT操作により、ターゲットデータベースのテーブルが断片化されます。 完全なデータ同期が完了すると、ターゲットデータベースの使用されるテーブルスペースのサイズは、ソースデータベースのサイズよりも大きくなります。

  • オブジェクトに対してDDL操作を実行するためにpt-online-schema-changeを使用しないことを推奨します。 そうしないと、データ同期が失敗する可能性があります。

  • データ同期中は、DTSのみを使用してデータをターゲットデータベースに書き込むことをお勧めします。 これにより、ソースデータベースとターゲットデータベース間のデータの不一致が防止されます。 DTS以外のツールを使用してターゲットデータベースにデータを書き込む場合、DMSを使用してオンラインDDL操作を実行すると、ターゲットデータベースでデータ損失が発生する可能性があります。

  • MaxComputeはPRIMARY KEY制約をサポートしていません。 ネットワークエラーが発生した場合、DTSは重複するデータレコードをMaxComputeプロジェクトに同期する可能性があります。

注意事項

DTSは、バイナリログファイルの位置を移動するようにスケジュールされたソースデータベースの 'dts_health_check '.'ha_health_check' テーブルを更新します。

課金

同期タイプ

タスク設定料金

スキーマ同期と完全データ同期

無料です。

増分データ同期

有料。 詳細については、「課金の概要」をご参照ください。

同期可能なSQL操作

操作タイプ

SQL文

DML

挿入、更新、および削除

手順

  1. [データ同期タスク] ページに移動します。

    1. データ管理 (DMS) コンソールにログインします。

    2. 上部のナビゲーションバーで、DTSをクリックします。

    3. 左側のナビゲーションウィンドウで、DTS (DTS) > データ同期を選択します。

    説明
  2. データ同期タスクの右側で、データ同期インスタンスが存在するリージョンを選択します。

    説明

    新しいDTSコンソールを使用する場合は、上部のナビゲーションバーでデータ同期インスタンスが存在するリージョンを選択する必要があります。

  3. [タスクの作成] をクリックします。 [データ同期タスクの作成] ページで、ソースデータベースとターゲットデータベースを設定します。 次の表にパラメーターを示します。

    セクション

    パラメーター

    説明

    非該当

    タスク名

    DTSタスクの名前。 タスク名は自動生成されます。 タスクを簡単に識別できるように、わかりやすい名前を指定することをお勧めします。 一意のタスク名を指定する必要はありません。

    ソースデータベース

    既存のDMSデータベースインスタンスの選択

    使用するデータベースインスタンス。 ビジネス要件に基づいて、既存のインスタンスを使用するかどうかを選択できます。

    • 既存のインスタンスを選択すると、DTSはデータベースのパラメーターを自動的に入力します。

    • 既存のインスタンスを選択しない場合は、次のデータベース情報を設定する必要があります。

    データベースタイプ

    移行元ディスクのタイプを設定します。 [PolarDB-X 2.0] を選択します。

    アクセス方法

    ソースデータベースのアクセス方法。 [Alibaba Cloudインスタンス] を選択します。

    インスタンスリージョン

    PolarDB-Xインスタンスが存在するリージョン。

    データベースアカウント

    ソースPolarDB-Xインスタンスのデータベースアカウント。 アカウントには、同期するオブジェクトに対するSELECT権限、およびREPLICATION CLIENT権限とREPLICATION SLAVE権限が必要です。

    説明

    データベースアカウントに権限を付与する方法の詳細については、「PolarDB-Xのデータ同期ツール」をご参照ください。

    データベースパスワード

    データベースインスタンスへのアクセスに使用されるパスワード。

    宛先データベース

    既存のDMSデータベースインスタンスの選択

    使用するデータベースインスタンス。 ビジネス要件に基づいて、既存のインスタンスを使用するかどうかを選択できます。

    • 既存のインスタンスを選択すると、DTSはデータベースのパラメーターを自動的に入力します。

    • 既存のインスタンスを選択しない場合は、次のデータベース情報を設定する必要があります。

    データベースタイプ

    ターゲットデータベースのタイプ。 [MaxCompute] を選択します。

    アクセス方法

    ターゲットデータベースのアクセス方法。 [Alibaba Cloudインスタンス] を選択します。

    インスタンスリージョン

    ターゲットMaxComputeプロジェクトが存在するリージョン。

    プロジェクト

    ターゲットMaxComputeプロジェクトの名前。 DataWorksコンソールの [ワークスペース] ページでプロジェクトを検索できます。

    accessKeyId

    ターゲットMaxComputeプロジェクトへの接続に使用するアカウントのAccessKey ID。 AccessKey IDの取得方法の詳細については、「AccessKeyペアの取得」をご参照ください。

    accessSecret

    ターゲットMaxComputeプロジェクトへの接続に使用するアカウントのAccessKeyシークレット。 AccessKeyシークレットの取得方法の詳細については、「AccessKeyペアの取得」をご参照ください。

  4. ページの下部で、接続性をテストして続行をクリックします。

    ソースまたはターゲットデータベースがAlibaba Cloudデータベースインスタンス (ApsaraDB RDS for MySQLインスタンスやApsaraDB for MongoDBインスタンスなど) の場合、DTSは自動的にDTSサーバーのCIDRブロックをインスタンスのホワイトリストに追加します。 ソースデータベースまたはターゲットデータベースがElastic Compute Service (ECS) インスタンスでホストされている自己管理データベースの場合、DTSサーバーのCIDRブロックがECSインスタンスのセキュリティグループルールに自動的に追加されます。ECSインスタンスがデータベースにアクセスできるようにする必要があります。 ソースデータベースまたはターゲットデータベースが、データセンターにデプロイされているか、サードパーティのクラウドサービスプロバイダーによって提供される自己管理データベースである場合、DTSサーバーのCIDRブロックをデータベースのホワイトリストに手動で追加して、DTSがデータベースにアクセスできるようにする必要があります。 詳細については、「DTSサーバーのCIDRブロックの追加」をご参照ください。

    警告

    DTSサーバーのCIDRブロックがデータベースまたはインスタンスのホワイトリスト、またはECSセキュリティグループルールに自動的または手動で追加されると、セキュリティリスクが発生する可能性があります。 したがって、DTSを使用してデータを同期する前に、潜在的なリスクを理解して認識し、次の対策を含む予防策を講じる必要があります。VPNゲートウェイ、またはSmart Access Gateway。

  5. [OK] をクリックして、MaxComputeアカウントに権限を付与します。

  6. 同期するオブジェクトと詳細設定を設定します。

    パラメーター

    説明

    増分データテーブルのパーティション定義

    ビジネス要件に基づいてパーティション名を選択します。 パーティションの詳細については、 「パーティション」をご参照ください。

    同期タイプ

    同期タイプ。 デフォルトでは、増分データ同期が選択されています。 [スキーマ同期] および [完全データ同期] も選択する必要があります。 事前チェックが完了すると、DTSは選択したオブジェクトの履歴データをソースデータベースからターゲットクラスターに同期します。 履歴データは、その後の増分同期の基礎となる。

    競合テーブルの処理モード

    • エラーの事前チェックと報告: ターゲットデータベースに、ソースデータベースのテーブルと同じ名前のテーブルが含まれているかどうかを確認します。 ソースデータベースとターゲットデータベースに同じテーブル名のテーブルが含まれていない場合は、事前チェックに合格します。 それ以外の場合、事前チェック中にエラーが返され、データ同期タスクを開始できません。

      説明

      ソースデータベースとターゲットデータベースに同じ名前のテーブルが含まれていて、ターゲットデータベース内のテーブルを削除または名前変更できない場合は、オブジェクト名マッピング機能を使用して、ターゲットデータベースに同期されるテーブルの名前を変更できます。 詳細については、「マップオブジェクト名」をご参照ください。

    • エラーを無視して続行: ソースデータベースとターゲットデータベースの同じテーブル名の事前チェックをスキップします。

      警告

      エラーを無視して続行 を選択すると、データの不整合が発生し、ビジネスが潜在的なリスクにさらされる可能性があります。

      • ソースデータベースとターゲットデータベースが同じスキーマを持ち、ターゲットデータベースのデータレコードがソースデータベースのデータレコードと同じ主キー値または一意キー値を持つ場合:

        • 完全データ同期中、DTSはデータレコードをターゲットデータベースに同期しません。 ターゲットデータベースの既存のデータレコードが保持されます。

        • 増分データ同期中、DTSはデータレコードをターゲットデータベースに同期します。 ターゲットデータベースの既存のデータレコードが上書きされます。

      • ソースデータベースとターゲットデータベースのスキーマが異なる場合、データの初期化に失敗する可能性があります。 この場合、一部の列のみが同期されるか、データ同期タスクが失敗します。 作業は慎重に行ってください。

    追加列の命名規則

    DTSがMaxComputeにデータを同期した後、DTSは宛先テーブルに追加の列を追加します。 追加の列の名前がターゲットテーブルの既存の列の名前と同じである場合、データ同期は失敗します。 ビジネス要件に基づいて、[新しいルール] または [以前のルール] を選択します。

    警告

    このパラメーターを指定する前に、宛先テーブルの追加の列と既存の列に名前の競合があるかどうかを確認してください。 追加の列の命名規則と定義の詳細については、「追加の列の命名規則」をご参照ください。

    宛先インスタンスでのオブジェクト名の大文字化

    ターゲットインスタンスのデータベース名、テーブル名、および列名の大文字化。 デフォルトでは、DTSデフォルトポリシーが選択されています。 他のオプションを選択して、オブジェクト名の大文字化をソースまたはターゲットデータベースの大文字化と一致させることができます。 詳細については、「ターゲットインスタンスのオブジェクト名の大文字化の指定」をご参照ください。

    ソースオブジェクト

    ソースオブジェクト セクションから1つ以上のオブジェクトを選択し、向右アイコンをクリックして 選択中のオブジェクト セクションにオブジェクトを追加します。

    説明

    同期するオブジェクトとして、列、テーブル、またはデータベースを選択できます。 同期するオブジェクトとしてテーブルまたは列を選択した場合、DTSはビュー、トリガー、ストアドプロシージャなどの他のオブジェクトをターゲットデータベースに同期しません。

    [選択済みオブジェクト]

    • 同期先のインスタンスに同期するオブジェクトの名前を変更するには、選択中のオブジェクト セクションでオブジェクトを右クリックします。 詳細については、「オブジェクト名のマップ」トピックの「単一オブジェクトの名前のマップ」セクションをご参照ください。

    • 一度に複数のオブジェクトの名前を変更するには、選択中のオブジェクト セクションの右上隅にある 一括編集 をクリックします。 詳細については、「オブジェクト名のマップ」トピックの「一度に複数のオブジェクト名をマップする」セクションをご参照ください。

    説明
    • 特定のデータベースまたはテーブルで実行されるSQL操作を選択するには、次の手順を実行します。[選択されたオブジェクト] セクションで、オブジェクトを右クリックします。 表示されるダイアログボックスで、同期するSQL操作を選択します。 同期できるSQL操作の詳細については、このトピックの「同期できるSQL操作」をご参照ください。

    • データをフィルタリングするWHERE条件を指定するには、[選択済みオブジェクト] セクションでオブジェクトを右クリックします。 表示されるダイアログボックスで、条件を指定します。 詳細については、「フィルター条件の指定」をご参照ください。

  7. [次へ:詳細設定] をクリックして詳細設定を構成します。

    パラメーター

    説明

    Set Alerts

    データ同期タスクのアラートを設定するかどうかを指定します。 タスクが失敗するか、同期レイテンシが指定されたしきい値を超えると、アラート送信先は通知を受け取ります。 有効な値:

    失敗した接続のリトライ時間範囲の指定

    失敗した接続のリトライ時間範囲。 データ同期タスクの開始後にソースデータベースまたはターゲットデータベースの接続に失敗した場合、DTSはその時間範囲内ですぐに接続を再試行します。 有効な値: 10 ~ 1440 単位は分です。 デフォルト値: 720 このパラメーターを30より大きい値に設定することを推奨します。 DTSが指定された時間範囲内にソースデータベースとターゲットデータベースに再接続すると、DTSはデータ同期タスクを再開します。 それ以外の場合、データ同期タスクは失敗します。

    説明
    • ソースまたはターゲットデータベースが同じである複数のデータ同期タスクに対して異なるリトライ時間範囲を指定した場合、最も短いリトライ時間範囲が優先されます。

    • DTSが接続を再試行すると、DTSインスタンスに対して課金されます。 業務要件に基づいて再試行時間範囲を指定することを推奨します。 ソースインスタンスとターゲットインスタンスがリリースされた後、できるだけ早くDTSインスタンスをリリースすることもできます。

    ETLの設定

    抽出、変換、および読み込み (ETL) 機能を有効にするかどうかを指定します。 詳細については、「ETLとは何ですか? 」をご参照ください。有効な値:

  8. タスク設定を保存し、事前チェックを実行します。

    • 関連するAPI操作を呼び出してDTSタスクを設定するときに指定するパラメーターを表示するには、ポインターを 次:タスク設定の保存と事前チェック に移動し、OpenAPI パラメーターのプレビュー をクリックします。

    • パラメーターを表示または表示する必要がない場合は、ページ下部の 次:タスク設定の保存と事前チェック をクリックします。

    説明
    • データ同期タスクを開始する前に、DTSは事前チェックを実行します。 データ同期タスクは、タスクが事前チェックに合格した後にのみ開始できます。

    • データ同期タスクが事前チェックに失敗した場合は、失敗した各項目の横にある [詳細の表示] をクリックします。 チェック結果に基づいて原因を分析した後、問題のトラブルシューティングを行います。 次に、プレチェックを再実行します。

    • 事前チェック中にアイテムに対してアラートがトリガーされた場合:

      • アラートアイテムを無視できない場合は、失敗したアイテムの横にある [詳細の表示] をクリックして、問題のトラブルシューティングを行います。 次に、もう一度プレチェックを実行します。

      • アラート項目を無視できる場合は、[アラート詳細の確認] をクリックします。 [詳細の表示] ダイアログボックスで、[無視] をクリックします。 表示されたメッセージボックスで、[OK] をクリックします。 次に、[再度事前チェック] をクリックして、事前チェックを再度実行します。 アラート項目を無視すると、データの不整合が発生し、ビジネスが潜在的なリスクにさらされる可能性があります。

  9. 成功率100% になるまで待ちます。 次に、[次へ: インスタンスの購入] をクリックします。

  10. 購入ページで、データ同期インスタンスの課金方法とインスタンスクラスのパラメーターを設定します。 下表にパラメーターを示します。

    セクション

    パラメーター

    説明

    新しいインスタンスクラス

    Billing Method

    • サブスクリプション: データ同期インスタンスの作成時にサブスクリプションの料金を支払います。 サブスクリプションの課金方法は、長期使用の場合、従量課金の課金方法よりも費用対効果が高くなります。

    • 従量課金: 従量課金インスタンスは1時間ごとに課金されます。 従量課金方法は、短期使用に適しています。 従量課金データ同期インスタンスが不要になった場合は、インスタンスをリリースしてコストを削減できます。

    リソースグループの設定

    データ同期インスタンスが属するリソースグループ。 デフォルト値: Default resource group 詳細については、「リソース管理とは 」をご参照ください。

    インスタンスクラス

    DTSは、同期速度が異なるインスタンスクラスを提供します。 ビジネス要件に基づいてインスタンスクラスを選択できます。 詳細については、「データ同期インスタンスのインスタンスクラス」をご参照ください。

    サブスクリプション期間

    サブスクリプションの課金方法を選択した場合は、サブスクリプション期間と作成するデータ同期インスタンスの数を指定します。 サブスクリプション期間は、1〜9か月、1年、2年、3年、または5年とすることができる。

    説明

    このパラメーターは、サブスクリプション の課金方法を選択した場合にのみ使用できます。

  11. データ伝送サービス (従量課金) サービス規約を読んで選択します。

  12. [購入して開始] をクリックします。 表示されるダイアログボックスで、OK をクリックします。

    タスクリストでタスクの進行状況を確認できます。

増分データテーブルのスキーマ

説明

MaxComputeプロジェクトのフルテーブルスキャンを許可するには、MaxComputeでset odps.sql.allow.fullscan=true; ステートメントを実行する必要があります。

DTSは、ソースPolarDB-X 2.0インスタンスで生成された増分データをMaxComputeの増分データテーブルに同期します。 増分データテーブルは、増分データおよび特定のメタデータを格納する。 次の図は、増分データテーブルのスキーマを示しています。

Schema of an incremental data table

説明

この例では、modifytime_yearmodifytime_monthmodifytime_daymodifytime_hour、およびmodifytime_minuteフィールドがパーティションキーを形成します。 これらのフィールドはステップ6で指定されます。

増分データテーブルのスキーマ

項目

説明

record_id

増分ログエントリの一意のID。

説明
  • IDは、新しいログエントリごとに自動インクリメントされます。

  • UPDATE操作が実行されると、DTSは2つの増分ログエントリを生成して、更新前と更新後の値を記録します。 2つの増分ログエントリは同じレコードIDを持ちます。

operation_flag

操作のタイプ。 有効な値:

  • I: INSERT操作

  • D: DELETE操作

  • U: UPDATE操作

utc_timestamp

操作のタイムスタンプ (UTC) 。 また、バイナリログファイルのタイムスタンプでもあります。

before_flag

列の値が更新前の値かどうかを示します。 有効値: YとN。

after_flag

列の値が更新後の値であるかどうかを示します。 有効値: YとN。

before_flagおよびafter_flagフィールドに関する追加情報

増分ログエントリのbefore_flagフィールドとafter_flagフィールドは、操作の種類に応じて定義されます。

  • INSERT

    INSERT操作の場合、列の値は新しく挿入されたレコードの値 (更新後の値) です。 before_flagフィールドの値はNであり、after_flagフィールドの値はYである。

    Example of an INSERT operation

  • UPDATE

    DTSは、UPDATE操作用に2つの増分ログエントリを生成します。 2つの増分ログエントリは、record_id、operation_flag、およびutc_timestampフィールドに同じ値を持ちます。

    第1のログエントリは、更新前の値を記録するので、before_flagフィールドの値はYであり、after_flagフィールドの値はNである。第2のログエントリは、更新後の値を記録するので、before_flagフィールドの値はNであり、after_flagフィールドの値はYである。

    Example of an UPDATE operation

  • DELETE

    DELETE操作の場合、列の値は削除されたレコードの値 (更新前の値) です。 before_flagフィールドの値はYであり、after_flagフィールドの値はNである。

    Example of a DELETE operation

完全なベースラインテーブルと増分データテーブルのマージ

データ同期タスクが開始されると、DTSはMaxComputeに完全なベースラインテーブルと増分データテーブルを作成します。 SQL文を使用して、2つのテーブルをマージできます。 これにより、特定の時点で完全なデータを取得できます。

このセクションでは、customerという名前のテーブルのデータをマージする方法について説明します。 次の図は、顧客テーブルのスキーマを示しています。

Schema of the customer table

  1. ソーステーブルのスキーマに基づいて、MaxComputeでテーブルを作成します。 テーブルは、マージされたデータを格納するために使用されます。

    たとえば、1565944878の時点で顧客テーブルの完全なデータを取得できます。 次のSQL文を実行して、必要なテーブルを作成します。

    CREATE TABLE `customer_1565944878` (
        `id` bigint NULL,
        `register_time` datetime NULL,
        `address` string);
    説明
  2. MaxComputeで次のSQL文を実行して、完全なベースラインテーブルと増分データテーブルをマージし、特定の時点で完全なデータを取得します。

    set odps.sql.allow.fullscan=true;
    insert overwrite table <result_storage_table>
    select <col1>,
           <col2>,
           <colN>
      from(
    select row_number() over(partition by t.<primary_key_column>
     order by record_id desc, after_flag desc) as row_number, record_id, operation_flag, after_flag, <col1>, <col2>, <colN>
      from(
    select incr.record_id, incr.operation_flag, incr.after_flag, incr.<col1>, incr.<col2>,incr.<colN>
      from <table_log> incr
     where utc_timestamp< <timestamp>
     union all
    select 0 as record_id, 'I' as operation_flag, 'Y' as after_flag, base.<col1>, base.<col2>,base.<colN>
      from <table_base> base) t) gt
    where row_number=1 
      and after_flag='Y'
    説明
    • <result_storage_table>: マージされたデータを格納するテーブルの名前。

    • <col1>/<col2>/<colN>: マージするテーブル内の列の名前。

    • <primary_key_column>: マージするテーブルの主キー列の名前。

    • <table_log>: 増分データテーブルの名前。

    • <table_base>: 完全なベースラインテーブルの名前。

    • <timestamp>: フルデータが取得されたときに生成されるタイムスタンプ。

    次のSQL文を実行して、1565944878の時点で顧客テーブルの完全なデータを取得します。

    set odps.sql.allow.fullscan=true;
    insert overwrite table customer_1565944878
    select id,
           register_time,
           address
      from(
    select row_number() over(partition by t.id
     order by record_id desc, after_flag desc) as row_number, record_id, operation_flag, after_flag, id, register_time, address
      from(
    select incr.record_id, incr.operation_flag, incr.after_flag, incr.id, incr.register_time, incr.address
      from customer_log incr
     where utc_timestamp< 1565944878
     union all
    select 0 as record_id, 'I' as operation_flag, 'Y' as after_flag, base.id, base.register_time, base.address
      from customer_base base) t) gt
     where gt.row_number= 1
       and gt.after_flag= 'Y';
  3. customer_1565944878テーブルからマージされたデータを照会します。

    Query the merged data