すべてのプロダクト
Search
ドキュメントセンター

Data Transmission Service:PolarDB for MySQLクラスターのデータをDataHubプロジェクトに同期する

最終更新日:Oct 31, 2024

このトピックでは、data Transmission Service (DTS) を使用して、PolarDB for MySQLクラスターの増分データをDataHubプロジェクトにリアルタイムで同期する方法について説明します。 データを同期した後、Realtime Compute for Apache Flinkなどのビッグデータサービスを使用して、データをリアルタイムで分析できます。

前提条件

制限事項

説明

DTSは、ソースデータベースの外部キーをターゲットデータベースに同期しません。 したがって、ソースデータベースのカスケードおよび削除操作は、ターゲットデータベースと同期されません。

カテゴリ

説明

ソースデータベースの制限

  • 同期するテーブルには、PRIMARY KEYまたはUNIQUE制約が必要であり、すべてのフィールドが一意である必要があります。 そうでない場合、宛先データベースは重複するデータレコードを含み得る。

  • 同期するオブジェクトとしてテーブルを選択し、ターゲットデータベースのテーブルや列の名前の変更など、テーブルを変更する必要がある場合は、1つのデータ同期タスクで最大1,000のテーブルを同期できます。 タスクを実行して1,000を超えるテーブルを同期すると、リクエストエラーが発生します。 この場合、複数のタスクを構成してテーブルを同期するか、タスクを構成してデータベース全体を同期することをお勧めします。

  • 増分データを同期する必要がある場合は、バイナリログ機能を有効にし、loose_polar_log_binパラメーターをonに設定する必要があります。 それ以外の場合、事前チェック中にエラーメッセージが返され、データ同期タスクを開始できません。 詳細については、「バイナリログの有効化」および「パラメーターの変更」をご参照ください。

    説明
    • PolarDB for MySQLクラスターのバイナリログ機能を有効にすると、バイナリログによって占有されているストレージ容量に対して課金されます。

    • 増分データ同期タスクの場合、ソースデータベースのバイナリログは少なくとも24時間保持されます。 そうしないと、DTSはバイナリログの取得に失敗し、タスクが失敗する可能性があります。 例外的な状況では、データの不整合または損失が発生します。 上記の要件に基づいて、バイナリログの保持期間を設定してください。 そうしないと、DTSのSLAに記載されているサービスの信頼性またはパフォーマンスが保証されない場合があります。

  • スキーマ同期および完全データ同期中は、DDL文を実行してデータベースまたはテーブルのスキーマを変更しないでください。 それ以外の場合、データ同期タスクは失敗します。

その他の制限

  • ターゲットDataHubプロジェクトの1つの文字列の長さは2 MBを超えることはできません。

  • 同期するオブジェクトとして選択できるのは、テーブルとデータベースだけです。

  • DTSは、ソースPolarDB for MySQLクラスターの読み取り専用ノードを同期しません。

  • DTSは、ソースPolarDB for MySQLクラスターからObject Storage Service (OSS) 外部テーブルを同期しません。

  • データ同期中にソーステーブルでDDL操作を実行するためにpt-online-schema-changeなどのツールを使用しないことをお勧めします。 それ以外の場合、データ同期タスクは失敗します。

  • データ同期中に他のソースからのデータがターゲットデータベースに書き込まれない場合は、data Management (DMS) を使用して、ソーステーブルに対してオンラインDDL操作を実行できます。 詳細については、「ロックフリーDDL操作の実行」をご参照ください。

  • データ同期中に他のソースからのデータがターゲットデータベースに書き込まれると、ソースデータベースとターゲットデータベース間のデータの不一致が発生します。 たとえば、他のソースからのデータがターゲットデータベースに書き込まれているときにDMSを使用してオンラインDDLステートメントを実行すると、ターゲットデータベースでデータが失われる可能性があります。

特別なケース

DTSは、CREATE DATABASE IF NOT EXISTS 'test' ステートメントをソースデータベースで実行し、バイナリログファイルの位置を前に移動します。

課金

同期タイプ

タスク設定料金

スキーマ同期と完全データ同期

無料です。

増分データ同期

有料。 詳細については、「課金の概要」をご参照ください。

サポートしている同期トポロジ

  • 一方向の 1 対 1 の同期

  • 一方向の 1 対多の同期

  • 一方向の多対 1 の同期

  • 一方向のカスケード同期

DTSでサポートされている同期トポロジの詳細については、「同期トポロジ」をご参照ください。

同期可能なSQL操作

挿入、更新、および削除

データベースアカウントに必要な権限

ソースPolarDB for MySQLクラスターのデータベースアカウントには、同期するオブジェクトに対する読み取り権限以上が必要です。

手順

説明

この手順は、新しいDTSコンソールに基づいて説明します。 DTSコンソールとデータ管理 (DMS) コンソールのDTSモジュールの間で操作に不一致が発生した場合は、DMSコンソールが優先されます。

  1. [データ同期タスク] ページに移動します。

    1. データ管理 (DMS) コンソールにログインします。

    2. 上部のナビゲーションバーで、DTSをクリックします。

    3. 左側のナビゲーションウィンドウで、DTS (DTS) > データ同期を選択します。

    説明
  2. データ同期タスクの右側で、データ同期インスタンスが存在するリージョンを選択します。

    説明

    新しいDTSコンソールを使用する場合は、上部のナビゲーションバーでデータ同期インスタンスが存在するリージョンを選択する必要があります。

  3. [タスクの作成] をクリックします。 [データ同期タスクの作成] ページで、ソースデータベースとターゲットデータベースを設定します。 次の表にパラメーターを示します。

    セクション

    パラメーター

    説明

    非該当

    タスク名

    DTSタスクの名前。 タスク名は自動生成されます。 タスクを簡単に識別できるように、わかりやすい名前を指定することをお勧めします。 一意のタスク名を指定する必要はありません。

    ソースデータベース

    既存のDMSデータベースインスタンスの選択

    使用するデータベースインスタンス。 ビジネス要件に基づいて、既存のインスタンスを使用するかどうかを選択できます。

    • 既存のインスタンスを選択すると、DTSはデータベースのパラメーターを自動的に入力します。

    • 既存のインスタンスを選択しない場合は、次のデータベース情報を設定する必要があります。

    データベースタイプ

    移行元ディスクのタイプを設定します。 [PolarDB for MySQL] を選択します。

    アクセス方法

    ソースデータベースのアクセス方法。 [Alibaba Cloudインスタンス] を選択します。

    インスタンスリージョン

    ソースPolarDB for MySQLクラスターが存在するリージョン。

    Alibaba Cloudアカウント全体でのデータの複製

    Alibaba Cloudアカウント間でデータを同期するかどうかを指定します。 この例では、[いいえ] が選択されています。

    PolarDBクラスターID

    ソースPolarDB for MySQLクラスターのID。

    データベースアカウント

    ソースPolarDB for MySQLクラスターのデータベースアカウント。 アカウントに必要な権限の詳細については、このトピックの「データベースアカウントに必要な権限」をご参照ください。

    データベースパスワード

    データベースインスタンスへのアクセスに使用されるパスワード。

    宛先データベース

    既存のDMSデータベースインスタンスの選択

    使用するデータベースインスタンス。 ビジネス要件に基づいて、既存のインスタンスを使用するかどうかを選択できます。

    • 既存のインスタンスを選択すると、DTSはデータベースのパラメーターを自動的に入力します。

    • 既存のインスタンスを選択しない場合は、次のデータベース情報を設定する必要があります。

    データベースタイプ

    ターゲットデータベースのタイプ。 [DataHub] を選択します。

    アクセス方法

    ターゲットデータベースのアクセス方法。 [Alibaba Cloudインスタンス] を選択します。

    インスタンスリージョン

    ターゲットDataHubプロジェクトが存在するリージョン。

    プロジェクト

    データの同期先のDataHubプロジェクト。

  4. ページの下部で、[接続をテストして続行] をクリックします。

    ソースまたはターゲットデータベースがAlibaba Cloudデータベースインスタンス (ApsaraDB RDS for MySQLインスタンスやApsaraDB for MongoDBインスタンスなど) の場合、DTSは自動的にDTSサーバーのCIDRブロックをインスタンスのホワイトリストに追加します。 ソースデータベースまたはターゲットデータベースがElastic Compute Service (ECS) インスタンスでホストされている自己管理データベースの場合、DTSサーバーのCIDRブロックがECSインスタンスのセキュリティグループルールに自動的に追加されます。ECSインスタンスがデータベースにアクセスできることを確認する必要があります。 データベースが複数のECSインスタンスにデプロイされている場合、DTSサーバーのCIDRブロックを各ECSインスタンスのセキュリティグループルールに手動で追加する必要があります。 ソースデータベースまたはターゲットデータベースが、データセンターにデプロイされているか、サードパーティのクラウドサービスプロバイダーによって提供される自己管理データベースである場合、DTSサーバーのCIDRブロックをデータベースのホワイトリストに手動で追加して、DTSがデータベースにアクセスできるようにする必要があります。 詳細については、「DTSサーバーのCIDRブロックの追加」トピックの「DTSサーバーのCIDRブロック」セクションをご参照ください。

    警告

    DTSサーバーのCIDRブロックがデータベースまたはインスタンスのホワイトリスト、またはECSセキュリティグループルールに自動的または手動で追加されると、セキュリティリスクが発生する可能性があります。 したがって、DTSを使用してデータを同期する前に、潜在的なリスクを理解して認識し、次の対策を含む予防策を講じる必要があります。VPNゲートウェイ、またはSmart Access Gateway。

  5. 同期するオブジェクトと詳細設定を設定します。

    パラメーター

    説明

    同期タイプ

    デフォルトでは、増分データ同期が選択されています。 スキーマ同期のみ選択できます。 [フルデータ同期] は選択できません。

    説明

    初期スキーマ同期中に、DTSは、ソースデータベースのテーブルなど、選択したオブジェクトのスキーマをターゲットDataHubプロジェクトに同期します。

    競合テーブルの処理モード

    • エラーの事前チェックと報告: ターゲットデータベースに、ソースデータベースのテーブルと同じ名前のテーブルが含まれているかどうかを確認します。 ソースデータベースとターゲットデータベースに同じテーブル名のテーブルが含まれていない場合は、事前チェックに合格します。 それ以外の場合、事前チェック中にエラーが返され、データ同期タスクを開始できません。

      説明

      ソースデータベースとターゲットデータベースに同じ名前のテーブルが含まれていて、ターゲットデータベース内のテーブルを削除または名前変更できない場合は、オブジェクト名マッピング機能を使用して、ターゲットデータベースに同期されるテーブルの名前を変更できます。 詳細については、「マップオブジェクト名」をご参照ください。

    • エラーを無視して続行: ソースデータベースとターゲットデータベースの同じテーブル名の事前チェックをスキップします。

      警告

      エラーを無視して続行 を選択すると、データの不整合が発生し、ビジネスが潜在的なリスクにさらされる可能性があります。

      • ソースデータベースとターゲットデータベースが同じスキーマを持ち、ターゲットデータベースのデータレコードがソースデータベースのデータレコードと同じ主キー値または一意キー値を持つ場合:

        • 完全データ同期中、DTSはデータレコードをターゲットデータベースに同期しません。 ターゲットデータベースの既存のデータレコードが保持されます。

        • 増分データ同期中、DTSはデータレコードをターゲットデータベースに同期します。 ターゲットデータベースの既存のデータレコードが上書きされます。

      • ソースデータベースとターゲットデータベースのスキーマが異なる場合、データの初期化に失敗する可能性があります。 この場合、一部の列のみが同期されるか、データ同期タスクが失敗します。 作業は慎重に行ってください。

    追加列の命名規則

    DTSがデータをDataHubプロジェクトに同期すると、DTSはターゲットトピックに列を追加します。 追加の列の名前がターゲットトピックの既存の列の名前と同じである場合、データ同期タスクは失敗します。 [はい] または [いいえ] を選択して、ビジネス要件に基づいて追加の列の新しい命名規則を有効にするかどうかを指定できます。

    警告

    このパラメーターを設定する前に、追加の列にターゲットトピックの既存の列と名前が競合しているかどうかを確認します。 そうしないと、データ同期タスクが失敗したり、データが失われたりします。 詳細については、「追加の列の命名規則の変更」トピックの「追加の列の命名規則」セクションをご参照ください。

    宛先インスタンスでのオブジェクト名の大文字化

    ターゲットインスタンスのデータベース名、テーブル名、および列名の大文字化。 デフォルトでは、DTSデフォルトポリシーが選択されています。 他のオプションを選択して、オブジェクト名の大文字化をソースまたはターゲットデータベースの大文字化と一致させることができます。 詳細については、「ターゲットインスタンスのオブジェクト名の大文字化の指定」をご参照ください。

    ソースオブジェクト

    [ソースオブジェクト] セクションから1つ以上のオブジェクトを選択し、Rightwards arrowアイコンをクリックして [選択済みオブジェクト] セクションにオブジェクトを追加します。

    説明

    同期するオブジェクトとしてテーブルまたはデータベースを選択できます。

    [選択済みオブジェクト]

    • 同期先のインスタンスに同期するオブジェクトの名前を変更するには、選択中のオブジェクト セクションでオブジェクトを右クリックします。 詳細については、「オブジェクト名のマップ」トピックの「単一オブジェクトの名前のマップ」セクションをご参照ください。

    • 一度に複数のオブジェクトの名前を変更するには、選択中のオブジェクト セクションの右上隅にある 一括編集 をクリックします。 詳細については、「オブジェクト名のマップ」トピックの「一度に複数のオブジェクト名をマップする」セクションをご参照ください。

  6. [次へ:詳細設定] をクリックして詳細設定を構成します。

    パラメーター

    説明

    Set Alerts

    データ同期タスクのアラートを設定するかどうかを指定します。 タスクが失敗するか、同期レイテンシが指定されたしきい値を超えると、アラート送信先は通知を受け取ります。 有効な値:

    失敗した接続のリトライ時間範囲の指定

    失敗した接続のリトライ時間範囲。 データ同期タスクの開始後にソースデータベースまたはターゲットデータベースの接続に失敗した場合、DTSはその時間範囲内ですぐに接続を再試行します。 有効な値: 10 ~ 1440 単位は分です。 デフォルト値: 720 このパラメーターを30より大きい値に設定することを推奨します。 DTSが指定された時間範囲内にソースデータベースとターゲットデータベースに再接続すると、DTSはデータ同期タスクを再開します。 それ以外の場合、データ同期タスクは失敗します。

    説明
    • ソースまたはターゲットデータベースが同じである複数のデータ同期タスクに対して異なるリトライ時間範囲を指定した場合、最も短いリトライ時間範囲が優先されます。

    • DTSが接続を再試行すると、DTSインスタンスに対して課金されます。 業務要件に基づいて再試行時間範囲を指定することを推奨します。 ソースインスタンスとターゲットインスタンスがリリースされた後、できるだけ早くDTSインスタンスをリリースすることもできます。

    ETLの設定

    抽出、変換、および読み込み (ETL) 機能を有効にするかどうかを指定します。 ビジネス要件に基づいて、[はい] または [いいえ] を選択します。 [はい] を選択した場合、コードエディターにドメイン固有言語 (DSL) ステートメントを入力する必要があります。 詳細については、「データ移行または同期タスクでのETLの設定」をご参照ください。

  7. オプション: [選択したオブジェクト] セクションで、同期するトピックの名前の上にポインターを移動し、右クリックします。 表示されるダイアログボックスで、テーブルまたはデータベースの名前を変更し、ビジネス要件に基づいてパーティション分割のシャードキーを設定します。

  8. タスク設定を保存し、事前チェックを実行します。

    • 関連するAPI操作を呼び出してDTSタスクを設定するときに指定するパラメーターを表示するには、ポインターを 次:タスク設定の保存と事前チェック に移動し、OpenAPI パラメーターのプレビュー をクリックします。

    • パラメーターを表示または表示する必要がない場合は、ページ下部の 次:タスク設定の保存と事前チェック をクリックします。

    説明
    • データ同期タスクを開始する前に、DTSは事前チェックを実行します。 データ同期タスクは、タスクが事前チェックに合格した後にのみ開始できます。

    • データ同期タスクが事前チェックに失敗した場合は、失敗した各項目の横にある [詳細の表示] をクリックします。 チェック結果に基づいて原因を分析した後、問題のトラブルシューティングを行います。 次に、プレチェックを再実行します。

    • 事前チェック中にアイテムに対してアラートがトリガーされた場合:

      • アラートアイテムを無視できない場合は、失敗したアイテムの横にある [詳細の表示] をクリックして、問題のトラブルシューティングを行います。 次に、もう一度プレチェックを実行します。

      • アラート項目を無視できる場合は、[アラート詳細の確認] をクリックします。 [詳細の表示] ダイアログボックスで、[無視] をクリックします。 表示されたメッセージボックスで、[OK] をクリックします。 次に、[再度事前チェック] をクリックして、事前チェックを再度実行します。 アラート項目を無視すると、データの不整合が発生し、ビジネスが潜在的なリスクにさらされる可能性があります。

  9. 成功率100% になるまで待ちます。 次に、[次へ: インスタンスの購入] をクリックします。

  10. 購入ページで、データ同期インスタンスの課金方法とインスタンスクラスのパラメーターを設定します。 下表にパラメーターを示します。

    セクション

    パラメーター

    説明

    新しいインスタンスクラス

    Billing Method

    • サブスクリプション: データ同期インスタンスの作成時にサブスクリプションの料金を支払います。 サブスクリプションの課金方法は、長期使用の場合、従量課金の課金方法よりも費用対効果が高くなります。

    • 従量課金: 従量課金インスタンスは1時間ごとに課金されます。 従量課金方法は、短期使用に適しています。 従量課金データ同期インスタンスが不要になった場合は、インスタンスをリリースしてコストを削減できます。

    リソースグループの設定

    データ同期インスタンスが属するリソースグループ。 デフォルト値: Default resource group 詳細については、「リソース管理とは 」をご参照ください。

    インスタンスクラス

    DTSは、同期速度が異なるインスタンスクラスを提供します。 ビジネス要件に基づいてインスタンスクラスを選択できます。 詳細については、「データ同期インスタンスのインスタンスクラス」をご参照ください。

    サブスクリプション期間

    サブスクリプションの課金方法を選択した場合は、サブスクリプション期間と作成するデータ同期インスタンスの数を指定します。 サブスクリプション期間は、1〜9か月、1年、2年、3年、または5年とすることができる。

    説明

    このパラメーターは、サブスクリプション の課金方法を選択した場合にのみ使用できます。

  11. データ伝送サービス (従量課金) サービス規約を読んで選択します。

  12. [購入して開始] をクリックします。 表示されるダイアログボックスで、OK をクリックします。

    タスクリストでタスクの進行状況を確認できます。

DataHubトピックのスキーマ

DTSが増分データをDataHubトピックに同期すると、DTSはトピック内のメタデータを格納する列を追加します。 次の図は、DataHubトピックのスキーマを示しています。

説明

この例では、idname、およびaddressはデータフィールドです。 DTSは、以前のバージョンの追加の列の命名規則が使用されているため、ソースデータベースからターゲットデータベースに同期される元のデータフィールドを含むデータフィールドにdts_ プレフィックスを追加します。 追加の列に新しい命名規則を使用する場合、DTSは、ソースデータベースからターゲットデータベースに同期される元のデータフィールドにプレフィックスを追加しません。

Topic定义

次の表に、DataHubトピックの追加の列を示します。

前の追加の列名

新しい追加列名

タイプ

説明

dts_record_id

new_dts_sync_dts_record_id

String

増分ログエントリの一意のID。

説明
  • デフォルトでは、IDは新しいログエントリごとに自動インクリメントされます。 ディザスタリカバリシナリオでは、ロールバックが発生し、IDが自動インクリメントされない場合があります。 したがって、いくつかのIDが重複することがある。

  • UPDATE操作が実行されると、DTSは2つの増分ログエントリを生成して、更新前と更新後の値を記録します。 2つの増分ログエントリのdts_record_idフィールドの値は同じです。

dts_operation_フラグ

new_dts_sync_dts_operation_フラグ

String

操作タイプ。 有効な値:

  • I: INSERT操作。

  • D: DELETE操作。

  • U: UPDATE操作。

  • F: 完全なデータ同期。

dts_instance_id

new_dts_sync_dts_instance_id

String

データベースのサーバーID。

dts_db_name

new_dts_sync_dts_db_名

String

データベース名。

dts_table_name

new_dts_sync_dts_table_name

String

<td class="en-UStry align-left colsep-1 rowsep-1">テーブル名。</td>

dts_utc_タイムスタンプ

new_dts_sync_dts_utc_タイムスタンプ

String

UTCに表示される操作のタイムスタンプ。 ログファイルのタイムスタンプでもあります。

dts_before_フラグ

new_dts_sync_dts_before_フラグ

String

列の値が更新前の値かどうかを示します。 有効値: YとN。

dts_after_flag

new_dts_sync_dts_after_フラグ

String

列の値が更新後の値であるかどうかを示します。 有効値: YとN。

dts_before_flagおよびdts_after_flagフィールドに関する追加情報

増分ログエントリのdts_before_flagおよびdts_after_flagフィールドの値は、操作の種類によって異なります。

  • INSERT

    INSERT操作の場合、列の値は新しく挿入されたレコードの値 (更新後の値) です。 dts_before_flagフィールドの値はNであり、dts_after_flagフィールドの値はYである。

    INSERT操作

  • UPDATE

    DTSは、UPDATE操作用に2つの増分ログエントリを生成します。 2つの増分ログエントリは、dts_record_iddts_operation_flag、およびdts_utc_timestampフィールドの値が同じです。

    第1のログエントリは、更新前の値を記録する。 したがって、dts_before_flagフィールドの値はYであり、dts_after_flagフィールドの値はNである。第2のログエントリは、更新後の値を記録する。 したがって、dts_before_flagフィールドの値はNであり、dts_after_flagフィールドの値はYである。

    UPDATE操作

  • DELETE

    DELETE操作の場合、列の値は削除されたレコードの値 (更新前の値) です。 dts_before_flagフィールドの値はYであり、dts_after_flagフィールドの値はNである。

    DELETE操作

次のステップ

データ同期タスクを設定した後、Alibaba Cloud Realtime Compute for Apache Flinkを使用して、DataHubプロジェクトに同期されたデータを分析できます。 詳細については、「Alibaba Cloud Realtime Compute for Apache Flinkとは 」をご参照ください。