このトピックでは、data Transmission Service (DTS) を使用して、PolarDB-XインスタンスからMaxComputeプロジェクトにデータを同期する方法について説明します。
前提条件
MySQL 5.7と互換性のあるソースPolarDB-Xインスタンスが作成されました。 詳細については、次をご参照ください: インスタンスの作成と データベースの作成
MaxComputeが有効化されています。 詳細については、「MaxComputeとDataWorksの有効化」をご参照ください。
MaxComputeプロジェクトが作成されます。 詳細については、「MaxComputeプロジェクトの作成」をご参照ください。
制限事項
DTSは、ソースデータベースの外部キーをターゲットデータベースに同期しません。 したがって、ソースデータベースのカスケードおよび削除操作は、ターゲットデータベースと同期されません。
カテゴリ | 説明 |
ソースデータベースの制限 |
|
その他の制限 |
|
注意事項 | DTSは、バイナリログファイルの位置を移動するようにスケジュールされたソースデータベースの 'dts_health_check '.'ha_health_check' テーブルを更新します。 |
課金
同期タイプ | タスク設定料金 |
スキーマ同期と完全データ同期 | 無料です。 |
増分データ同期 | 有料。 詳細については、「課金の概要」をご参照ください。 |
同期可能なSQL操作
操作タイプ | SQL文 |
DML | 挿入、更新、および削除 |
手順
[データ同期タスク] ページに移動します。
データ管理 (DMS) コンソールにログインします。
上部のナビゲーションバーで、DTSをクリックします。
左側のナビゲーションウィンドウで、を選択します。
説明操作は、DMSコンソールのモードとレイアウトによって異なります。 詳細については、「シンプルモード」および「DMSコンソールのレイアウトとスタイルのカスタマイズ」をご参照ください。
新しいDTSコンソールのデータ同期タスクページに行くこともできます。
データ同期タスクの右側で、データ同期インスタンスが存在するリージョンを選択します。
説明新しいDTSコンソールを使用する場合は、上部のナビゲーションバーでデータ同期インスタンスが存在するリージョンを選択する必要があります。
[タスクの作成] をクリックします。 [データ同期タスクの作成] ページで、ソースデータベースとターゲットデータベースを設定します。 次の表にパラメーターを示します。
セクション
パラメーター
説明
非該当
タスク名
DTSタスクの名前。 タスク名は自動生成されます。 タスクを簡単に識別できるように、わかりやすい名前を指定することをお勧めします。 一意のタスク名を指定する必要はありません。
ソースデータベース
既存のDMSデータベースインスタンスの選択
使用するデータベースインスタンス。 ビジネス要件に基づいて、既存のインスタンスを使用するかどうかを選択できます。
既存のインスタンスを選択すると、DTSはデータベースのパラメーターを自動的に入力します。
既存のインスタンスを選択しない場合は、次のデータベース情報を設定する必要があります。
データベースタイプ
移行元ディスクのタイプを設定します。 [PolarDB-X 2.0] を選択します。
アクセス方法
ソースデータベースのアクセス方法。 [Alibaba Cloudインスタンス] を選択します。
インスタンスリージョン
PolarDB-Xインスタンスが存在するリージョン。
データベースアカウント
ソースPolarDB-Xインスタンスのデータベースアカウント。 アカウントには、同期するオブジェクトに対するSELECT権限、およびREPLICATION CLIENT権限とREPLICATION SLAVE権限が必要です。
説明データベースアカウントに権限を付与する方法の詳細については、「PolarDB-Xのデータ同期ツール」をご参照ください。
データベースパスワード
データベースインスタンスへのアクセスに使用されるパスワード。
宛先データベース
既存のDMSデータベースインスタンスの選択
使用するデータベースインスタンス。 ビジネス要件に基づいて、既存のインスタンスを使用するかどうかを選択できます。
既存のインスタンスを選択すると、DTSはデータベースのパラメーターを自動的に入力します。
既存のインスタンスを選択しない場合は、次のデータベース情報を設定する必要があります。
データベースタイプ
ターゲットデータベースのタイプ。 [MaxCompute] を選択します。
アクセス方法
ターゲットデータベースのアクセス方法。 [Alibaba Cloudインスタンス] を選択します。
インスタンスリージョン
ターゲットMaxComputeプロジェクトが存在するリージョン。
プロジェクト
ターゲットMaxComputeプロジェクトの名前。 DataWorksコンソールの [ワークスペース] ページでプロジェクトを検索できます。
accessKeyId
ターゲットMaxComputeプロジェクトへの接続に使用するアカウントのAccessKey ID。 AccessKey IDの取得方法の詳細については、「AccessKeyペアの取得」をご参照ください。
accessSecret
ターゲットMaxComputeプロジェクトへの接続に使用するアカウントのAccessKeyシークレット。 AccessKeyシークレットの取得方法の詳細については、「AccessKeyペアの取得」をご参照ください。
ページの下部で、接続性をテストして続行をクリックします。
ソースまたはターゲットデータベースがAlibaba Cloudデータベースインスタンス (ApsaraDB RDS for MySQLインスタンスやApsaraDB for MongoDBインスタンスなど) の場合、DTSは自動的にDTSサーバーのCIDRブロックをインスタンスのホワイトリストに追加します。 ソースデータベースまたはターゲットデータベースがElastic Compute Service (ECS) インスタンスでホストされている自己管理データベースの場合、DTSサーバーのCIDRブロックがECSインスタンスのセキュリティグループルールに自動的に追加されます。ECSインスタンスがデータベースにアクセスできるようにする必要があります。 ソースデータベースまたはターゲットデータベースが、データセンターにデプロイされているか、サードパーティのクラウドサービスプロバイダーによって提供される自己管理データベースである場合、DTSサーバーのCIDRブロックをデータベースのホワイトリストに手動で追加して、DTSがデータベースにアクセスできるようにする必要があります。 詳細については、「DTSサーバーのCIDRブロックの追加」をご参照ください。
警告DTSサーバーのCIDRブロックがデータベースまたはインスタンスのホワイトリスト、またはECSセキュリティグループルールに自動的または手動で追加されると、セキュリティリスクが発生する可能性があります。 したがって、DTSを使用してデータを同期する前に、潜在的なリスクを理解して認識し、次の対策を含む予防策を講じる必要があります。VPNゲートウェイ、またはSmart Access Gateway。
[OK] をクリックして、MaxComputeアカウントに権限を付与します。
同期するオブジェクトと詳細設定を設定します。
パラメーター
説明
増分データテーブルのパーティション定義
ビジネス要件に基づいてパーティション名を選択します。 パーティションの詳細については、 「パーティション」をご参照ください。
同期タイプ
同期タイプ。 デフォルトでは、増分データ同期が選択されています。 [スキーマ同期] および [完全データ同期] も選択する必要があります。 事前チェックが完了すると、DTSは選択したオブジェクトの履歴データをソースデータベースからターゲットクラスターに同期します。 履歴データは、その後の増分同期の基礎となる。
競合テーブルの処理モード
エラーの事前チェックと報告: ターゲットデータベースに、ソースデータベースのテーブルと同じ名前のテーブルが含まれているかどうかを確認します。 ソースデータベースとターゲットデータベースに同じテーブル名のテーブルが含まれていない場合は、事前チェックに合格します。 それ以外の場合、事前チェック中にエラーが返され、データ同期タスクを開始できません。
説明ソースデータベースとターゲットデータベースに同じ名前のテーブルが含まれていて、ターゲットデータベース内のテーブルを削除または名前変更できない場合は、オブジェクト名マッピング機能を使用して、ターゲットデータベースに同期されるテーブルの名前を変更できます。 詳細については、「マップオブジェクト名」をご参照ください。
エラーを無視して続行: ソースデータベースとターゲットデータベースの同じテーブル名の事前チェックをスキップします。
警告エラーを無視して続行 を選択すると、データの不整合が発生し、ビジネスが潜在的なリスクにさらされる可能性があります。
ソースデータベースとターゲットデータベースが同じスキーマを持ち、ターゲットデータベースのデータレコードがソースデータベースのデータレコードと同じ主キー値または一意キー値を持つ場合:
完全データ同期中、DTSはデータレコードをターゲットデータベースに同期しません。 ターゲットデータベースの既存のデータレコードが保持されます。
増分データ同期中、DTSはデータレコードをターゲットデータベースに同期します。 ターゲットデータベースの既存のデータレコードが上書きされます。
ソースデータベースとターゲットデータベースのスキーマが異なる場合、データの初期化に失敗する可能性があります。 この場合、一部の列のみが同期されるか、データ同期タスクが失敗します。 作業は慎重に行ってください。
追加列の命名規則
DTSがMaxComputeにデータを同期した後、DTSは宛先テーブルに追加の列を追加します。 追加の列の名前がターゲットテーブルの既存の列の名前と同じである場合、データ同期は失敗します。 ビジネス要件に基づいて、[新しいルール] または [以前のルール] を選択します。
警告このパラメーターを指定する前に、宛先テーブルの追加の列と既存の列に名前の競合があるかどうかを確認してください。 追加の列の命名規則と定義の詳細については、「追加の列の命名規則」をご参照ください。
宛先インスタンスでのオブジェクト名の大文字化
ターゲットインスタンスのデータベース名、テーブル名、および列名の大文字化。 デフォルトでは、DTSデフォルトポリシーが選択されています。 他のオプションを選択して、オブジェクト名の大文字化をソースまたはターゲットデータベースの大文字化と一致させることができます。 詳細については、「ターゲットインスタンスのオブジェクト名の大文字化の指定」をご参照ください。
ソースオブジェクト
ソースオブジェクト セクションから1つ以上のオブジェクトを選択し、アイコンをクリックして 選択中のオブジェクト セクションにオブジェクトを追加します。
説明同期するオブジェクトとして、列、テーブル、またはデータベースを選択できます。 同期するオブジェクトとしてテーブルまたは列を選択した場合、DTSはビュー、トリガー、ストアドプロシージャなどの他のオブジェクトをターゲットデータベースに同期しません。
[選択済みオブジェクト]
同期先のインスタンスに同期するオブジェクトの名前を変更するには、選択中のオブジェクト セクションでオブジェクトを右クリックします。 詳細については、「オブジェクト名のマップ」トピックの「単一オブジェクトの名前のマップ」セクションをご参照ください。
一度に複数のオブジェクトの名前を変更するには、選択中のオブジェクト セクションの右上隅にある 一括編集 をクリックします。 詳細については、「オブジェクト名のマップ」トピックの「一度に複数のオブジェクト名をマップする」セクションをご参照ください。
説明特定のデータベースまたはテーブルで実行されるSQL操作を選択するには、次の手順を実行します。[選択されたオブジェクト] セクションで、オブジェクトを右クリックします。 表示されるダイアログボックスで、同期するSQL操作を選択します。 同期できるSQL操作の詳細については、このトピックの「同期できるSQL操作」をご参照ください。
データをフィルタリングするWHERE条件を指定するには、[選択済みオブジェクト] セクションでオブジェクトを右クリックします。 表示されるダイアログボックスで、条件を指定します。 詳細については、「フィルター条件の指定」をご参照ください。
[次へ:詳細設定] をクリックして詳細設定を構成します。
パラメーター
説明
Set Alerts
データ同期タスクのアラートを設定するかどうかを指定します。 タスクが失敗するか、同期レイテンシが指定されたしきい値を超えると、アラート送信先は通知を受け取ります。 有効な値:
No: アラートを有効にしません。
Yes: アラートを設定します。 この場合、アラートしきい値と アラート通知設定 詳細については、「モニタリングとアラートの設定」トピックの「DTSタスクを作成するときのモニタリングとアラートの設定」をご参照ください。
失敗した接続のリトライ時間範囲の指定
失敗した接続のリトライ時間範囲。 データ同期タスクの開始後にソースデータベースまたはターゲットデータベースの接続に失敗した場合、DTSはその時間範囲内ですぐに接続を再試行します。 有効な値: 10 ~ 1440 単位は分です。 デフォルト値: 720 このパラメーターを30より大きい値に設定することを推奨します。 DTSが指定された時間範囲内にソースデータベースとターゲットデータベースに再接続すると、DTSはデータ同期タスクを再開します。 それ以外の場合、データ同期タスクは失敗します。
説明ソースまたはターゲットデータベースが同じである複数のデータ同期タスクに対して異なるリトライ時間範囲を指定した場合、最も短いリトライ時間範囲が優先されます。
DTSが接続を再試行すると、DTSインスタンスに対して課金されます。 業務要件に基づいて再試行時間範囲を指定することを推奨します。 ソースインスタンスとターゲットインスタンスがリリースされた後、できるだけ早くDTSインスタンスをリリースすることもできます。
ETLの設定
抽出、変換、および読み込み (ETL) 機能を有効にするかどうかを指定します。 詳細については、「ETLとは何ですか? 」をご参照ください。有効な値:
Yes: ETL機能を設定します。 コードエディターでデータ処理ステートメントを入力できます。 詳細については、「データ移行またはデータ同期タスクでのETLの設定」をご参照ください。
No: ETL機能を設定しません。
タスク設定を保存し、事前チェックを実行します。
関連するAPI操作を呼び出してDTSタスクを設定するときに指定するパラメーターを表示するには、ポインターを 次:タスク設定の保存と事前チェック に移動し、OpenAPI パラメーターのプレビュー をクリックします。
パラメーターを表示または表示する必要がない場合は、ページ下部の 次:タスク設定の保存と事前チェック をクリックします。
説明データ同期タスクを開始する前に、DTSは事前チェックを実行します。 データ同期タスクは、タスクが事前チェックに合格した後にのみ開始できます。
データ同期タスクが事前チェックに失敗した場合は、失敗した各項目の横にある [詳細の表示] をクリックします。 チェック結果に基づいて原因を分析した後、問題のトラブルシューティングを行います。 次に、プレチェックを再実行します。
事前チェック中にアイテムに対してアラートがトリガーされた場合:
アラートアイテムを無視できない場合は、失敗したアイテムの横にある [詳細の表示] をクリックして、問題のトラブルシューティングを行います。 次に、もう一度プレチェックを実行します。
アラート項目を無視できる場合は、[アラート詳細の確認] をクリックします。 [詳細の表示] ダイアログボックスで、[無視] をクリックします。 表示されたメッセージボックスで、[OK] をクリックします。 次に、[再度事前チェック] をクリックして、事前チェックを再度実行します。 アラート項目を無視すると、データの不整合が発生し、ビジネスが潜在的なリスクにさらされる可能性があります。
成功率が100% になるまで待ちます。 次に、[次へ: インスタンスの購入] をクリックします。
購入ページで、データ同期インスタンスの課金方法とインスタンスクラスのパラメーターを設定します。 下表にパラメーターを示します。
セクション
パラメーター
説明
新しいインスタンスクラス
Billing Method
サブスクリプション: データ同期インスタンスの作成時にサブスクリプションの料金を支払います。 サブスクリプションの課金方法は、長期使用の場合、従量課金の課金方法よりも費用対効果が高くなります。
従量課金: 従量課金インスタンスは1時間ごとに課金されます。 従量課金方法は、短期使用に適しています。 従量課金データ同期インスタンスが不要になった場合は、インスタンスをリリースしてコストを削減できます。
リソースグループの設定
データ同期インスタンスが属するリソースグループ。 デフォルト値: Default resource group 詳細については、「リソース管理とは 」をご参照ください。
インスタンスクラス
DTSは、同期速度が異なるインスタンスクラスを提供します。 ビジネス要件に基づいてインスタンスクラスを選択できます。 詳細については、「データ同期インスタンスのインスタンスクラス」をご参照ください。
サブスクリプション期間
サブスクリプションの課金方法を選択した場合は、サブスクリプション期間と作成するデータ同期インスタンスの数を指定します。 サブスクリプション期間は、1〜9か月、1年、2年、3年、または5年とすることができる。
説明このパラメーターは、サブスクリプション の課金方法を選択した場合にのみ使用できます。
データ伝送サービス (従量課金) サービス規約を読んで選択します。
[購入して開始] をクリックします。 表示されるダイアログボックスで、OK をクリックします。
タスクリストでタスクの進行状況を確認できます。
増分データテーブルのスキーマ
MaxComputeプロジェクトのフルテーブルスキャンを許可するには、MaxComputeでset odps.sql.allow.fullscan=true;
ステートメントを実行する必要があります。
DTSは、ソースPolarDB-X 2.0インスタンスで生成された増分データをMaxComputeの増分データテーブルに同期します。 増分データテーブルは、増分データおよび特定のメタデータを格納する。 次の図は、増分データテーブルのスキーマを示しています。
この例では、modifytime_year
、modifytime_month
、modifytime_day
、modifytime_hour
、およびmodifytime_minute
フィールドがパーティションキーを形成します。 これらのフィールドはステップ6で指定されます。
増分データテーブルのスキーマ
項目 | 説明 |
record_id | 増分ログエントリの一意のID。 説明
|
operation_flag | 操作のタイプ。 有効な値:
|
utc_timestamp | 操作のタイムスタンプ (UTC) 。 また、バイナリログファイルのタイムスタンプでもあります。 |
before_flag | 列の値が更新前の値かどうかを示します。 有効値: YとN。 |
after_flag | 列の値が更新後の値であるかどうかを示します。 有効値: YとN。 |
before_flagおよびafter_flagフィールドに関する追加情報
増分ログエントリのbefore_flagフィールドとafter_flagフィールドは、操作の種類に応じて定義されます。
INSERT
INSERT操作の場合、列の値は新しく挿入されたレコードの値 (更新後の値) です。 before_flagフィールドの値はNであり、after_flagフィールドの値はYである。
UPDATE
DTSは、UPDATE操作用に2つの増分ログエントリを生成します。 2つの増分ログエントリは、record_id、operation_flag、およびutc_timestampフィールドに同じ値を持ちます。
第1のログエントリは、更新前の値を記録するので、before_flagフィールドの値はYであり、after_flagフィールドの値はNである。第2のログエントリは、更新後の値を記録するので、before_flagフィールドの値はNであり、after_flagフィールドの値はYである。
DELETE
DELETE操作の場合、列の値は削除されたレコードの値 (更新前の値) です。 before_flagフィールドの値はYであり、after_flagフィールドの値はNである。
完全なベースラインテーブルと増分データテーブルのマージ
データ同期タスクが開始されると、DTSはMaxComputeに完全なベースラインテーブルと増分データテーブルを作成します。 SQL文を使用して、2つのテーブルをマージできます。 これにより、特定の時点で完全なデータを取得できます。
このセクションでは、customerという名前のテーブルのデータをマージする方法について説明します。 次の図は、顧客テーブルのスキーマを示しています。
ソーステーブルのスキーマに基づいて、MaxComputeでテーブルを作成します。 テーブルは、マージされたデータを格納するために使用されます。
たとえば、
1565944878
の時点で顧客テーブルの完全なデータを取得できます。 次のSQL文を実行して、必要なテーブルを作成します。CREATE TABLE `customer_1565944878` ( `id` bigint NULL, `register_time` datetime NULL, `address` string);
説明アドホッククエリ機能を使用して、SQL文を実行できます。 詳細については、「アドホッククエリ機能を使用したSQL文の実行 (オプション) 」をご参照ください。
MaxComputeでサポートされているデータ型の詳細については、「データ型のエディション」をご参照ください。
MaxComputeで次のSQL文を実行して、完全なベースラインテーブルと増分データテーブルをマージし、特定の時点で完全なデータを取得します。
set odps.sql.allow.fullscan=true; insert overwrite table <result_storage_table> select <col1>, <col2>, <colN> from( select row_number() over(partition by t.<primary_key_column> order by record_id desc, after_flag desc) as row_number, record_id, operation_flag, after_flag, <col1>, <col2>, <colN> from( select incr.record_id, incr.operation_flag, incr.after_flag, incr.<col1>, incr.<col2>,incr.<colN> from <table_log> incr where utc_timestamp< <timestamp> union all select 0 as record_id, 'I' as operation_flag, 'Y' as after_flag, base.<col1>, base.<col2>,base.<colN> from <table_base> base) t) gt where row_number=1 and after_flag='Y'
説明<result_storage_table>: マージされたデータを格納するテーブルの名前。
<col1>/<col2>/<colN>: マージするテーブル内の列の名前。
<primary_key_column>: マージするテーブルの主キー列の名前。
<table_log>: 増分データテーブルの名前。
<table_base>: 完全なベースラインテーブルの名前。
<timestamp>: フルデータが取得されたときに生成されるタイムスタンプ。
次のSQL文を実行して、
1565944878
の時点で顧客テーブルの完全なデータを取得します。set odps.sql.allow.fullscan=true; insert overwrite table customer_1565944878 select id, register_time, address from( select row_number() over(partition by t.id order by record_id desc, after_flag desc) as row_number, record_id, operation_flag, after_flag, id, register_time, address from( select incr.record_id, incr.operation_flag, incr.after_flag, incr.id, incr.register_time, incr.address from customer_log incr where utc_timestamp< 1565944878 union all select 0 as record_id, 'I' as operation_flag, 'Y' as after_flag, base.id, base.register_time, base.address from customer_base base) t) gt where gt.row_number= 1 and gt.after_flag= 'Y';
customer_1565944878テーブルからマージされたデータを照会します。