MaxCompute を使用して Tablestore データをバックアップする場合、または Tablestore から MaxCompute にデータを移行する場合は、DataWorks コンソールでバッチ同期タスクを作成および構成して、フルデータをエクスポートできます。フルデータが MaxCompute にエクスポートされた後、DataWorks の DataAnalysis サービスを使用して、エクスポートされたデータを表示および分析できます。
使用上の注意
Tablestore のフィールド名は大文字と小文字が区別されます。MaxCompute のフィールド名が Tablestore のフィールド名と同じであることを確認してください。
ステップ 1: Tablestore データソースを追加する
Tablestore データベースをデータソースとして追加するには、次の手順を実行します。
データ統合ページに移動します。
DataWorks コンソール にログオンし、左上隅でリージョンを選択し、 を選択し、ドロップダウンリストからワークスペースを選択して、データ統合に移動 をクリックします。
左側のナビゲーションペインで、データソース をクリックします。
データソース ページで、データソースを追加 をクリックします。
データソースを追加 ダイアログボックスで、Tablestore ブロックをクリックします。
OTS データソースを追加 ダイアログボックスで、次の表に示すパラメータを構成します。
パラメータ
説明
データソース名
データソースの名前。名前には、文字、数字、アンダースコア (_) を使用でき、文字で始める必要があります。
データソースの説明
データソースの説明。説明は 80 文字以内にする必要があります。
エンドポイント
Tablestore インスタンスのエンドポイント。詳細については、エンドポイント を参照してください。
Tablestore インスタンスと宛先データソースのリソースが同じリージョンにある場合は、仮想プライベートクラウド (VPC) エンドポイントを入力します。それ以外の場合は、パブリックエンドポイントを入力します。
Table Store インスタンス名
Tablestore インスタンスの名前。詳細については、インスタンス を参照してください。
AccessKey ID
Alibaba Cloud アカウントまたは RAM ユーザーの AccessKey ID と AccessKey シークレット。AccessKey ペアの作成方法の詳細については、AccessKey ペアの作成 を参照してください。
AccessKey Secret
データソースと選択したリソースグループ間のネットワーク接続をテストします。
同期ノードが想定どおりに実行されるようにするには、データソースと、同期ノードが実行されるすべてのリソースグループタイプとの間の接続をテストする必要があります。
重要同期タスクは、1 つのタイプのリソースグループのみを使用できます。デフォルトでは、データ統合の共有リソースグループのみがリソースグループリストに表示されます。データ同期の安定性とパフォーマンスを確保するために、データ統合専用の排他的リソースグループを使用することをお勧めします。
購入 をクリックして新しいリソースグループを作成するか、購入したリソースグループを関連付ける をクリックして既存のリソースグループを関連付けます。詳細については、データ統合専用の排他的リソースグループの作成と使用 を参照してください。
リソースグループが起動した後、リソースグループの 接続ステータス (本番環境) 列にある ネットワーク接続をテスト をクリックします。
接続済み と表示された場合、接続テストは合格です。
データソースがネットワーク接続テストに合格したら、完了 をクリックします。
新しく作成されたデータソースがデータソースリストに表示されます。
ステップ 2: MaxCompute データソースを追加する
ステップ 1 と同様の操作を実行して、MaxCompute をデータソースとして追加します。
データソース ページで、データソースを追加 をクリックします。
データソースを追加 ダイアログボックスで、MaxCompute ブロックを見つけてクリックします。
Maxcompute データソースを追加 ダイアログボックスで、パラメータを構成し、リソースグループの接続をテストして、完了 をクリックします。詳細については、MaxCompute データソースの追加 を参照してください。
重要同期タスクは、1 つのタイプのリソースグループのみを使用できます。デフォルトでは、リソースグループリストには、データ統合専用の排他的リソースグループのみが表示されます。データ同期の安定性とパフォーマンスを確保するために、データ統合専用の排他的リソースグループを使用することをお勧めします。
使用可能なリソースグループがない場合は、購入 をクリックして、データ統合専用の排他的リソースグループを作成します。詳細については、データ統合専用の排他的リソースグループの作成と使用 を参照してください。
ステップ 3: バッチ同期ノードを作成する
DataStudio コンソールに移動します。
DataWorks コンソール にログオンし、左上隅でリージョンを選択し、 を選択し、ドロップダウンリストからワークスペースを選択して、Datastudio に移動 をクリックします。
DataStudio コンソールの スケジュール済みワークフロー ページで、ビジネスフローをクリックしてビジネスフローを選択します。
ワークフローの作成方法については、ワークフローの作成 を参照してください。
データ統合 ノードを右クリックし、ノードの作成 > オフライン同期 を選択します。
ノードの作成 ダイアログボックスで、パスを選択し、ノード名を入力します。
確認 をクリックします。
新しく作成されたオフライン同期ノードが データ統合 ノードの下に表示されます。
ステップ 4: バッチ同期タスクを構成して開始する
Tablestore から MaxCompute にデータを同期するタスクを作成および構成するには、次の手順を実行します。
データ統合 フォルダで、作成したバッチ同期ノードをダブルクリックします。
リソースグループとデータソース間のネットワーク接続を確立します。
バッチ同期タスクのソースと宛先、およびバッチ同期タスクの実行に使用するリソースグループを選択します。リソースグループとデータソース間のネットワーク接続を確立し、接続をテストします。
重要データ同期タスクは、リソースグループを使用して実行されます。リソースグループを選択し、リソースグループとデータソース間のネットワーク接続が確立されていることを確認してください。
ネットワーク接続とリソースグループの構成 ステップで、ソース パラメータを Tablestore に設定し、データソース名 パラメータを ステップ 1: Tablestore データソースを追加する で追加した Tablestore データソースの名前に設定します。
リソースグループを選択します。
リソースグループを選択すると、システムはリソースグループのリージョンと仕様を表示し、リソースグループとソースデータソース間の接続を自動的にテストします。
重要リソースグループが、データソースを追加したときに選択したものと同じであることを確認してください。
宛先 パラメータを MaxCompute(ODPS) に設定し、データソース名 パラメータを ステップ 2: MaxCompute データソースを追加する で追加した宛先データソースの名前に設定します。
システムは、リソースグループと宛先データソース間の接続を自動的にテストします。
次へ をクリックします。
タスクを構成します。
コードレスユーザーインターフェース (UI) またはコードエディタを使用してタスクを構成できます。
(推奨) コードレス UI を使用する
タスクの構成 ステップの ソースと宛先の構成 セクションで、ビジネス要件に基づいてデータソースと宛先を構成します。
データソースの構成
パラメータ
説明
テーブル
Tablestore データテーブルの名前。
プライマリキーの範囲 (開始)
読み取るデータの範囲を指定するために使用される開始プライマリキーと終了プライマリキー。値は JSON 配列である必要があります。
開始プライマリキーと終了プライマリキーは、INF_MIN タイプと INF_MAX タイプの値で構成される有効なプライマリキーまたは仮想ポイントである必要があります。仮想ポイントの列数は、プライマリキー列の数と同じである必要があります。
INF_MIN タイプは、無限に小さい値を指定します。他のすべてのタイプの値は、INF_MIN タイプの値よりも大きくなります。INF_MAX タイプは、無限に大きい値を指定します。他のすべてのタイプの値は、INF_MAX タイプの値よりも小さくなります。
データテーブルの行は、プライマリキーによって昇順にソートされます。読み取るデータの範囲は、左閉区間、右開区間です。プライマリキーが開始プライマリキー以上で終了プライマリキー未満のすべての行が返されます。
たとえば、テーブルに pk1 と pk2 のプライマリキー列が含まれているとします。pk1 列は String タイプで、pk2 列は Integer タイプです。
テーブルからフルデータをエクスポートするには、次のパラメータを指定します。
プライマリキーのサンプル範囲 (開始) パラメータ
[ { "type": "INF_MIN" }, { "type": "INF_MIN" } ]
プライマリキーのサンプル範囲 (終了) パラメータ
[ { "type": "INF_MAX" }, { "type": "INF_MAX" } ]
pk1 列の値が tablestore である行をエクスポートするには、次のパラメータを指定します。
プライマリキーのサンプル範囲 (開始) パラメータ
[ { "type": "STRING", "value": "tablestore" }, { "type": "INF_MIN" } ]
プライマリキーのサンプル範囲 (終了) パラメータ
[ { "type": "STRING", "value": "tablestore" }, { "type": "INF_MAX" } ]
プライマリキーの範囲 (終了)
分割構成情報
データを分割するために使用されるカスタムルール。一般的なシナリオでは、このパラメータを構成しないことをお勧めします。Tablestore テーブル内のデータの分布が不均一で、Tablestore Reader の自動分割機能が失敗した場合は、カスタムルールを指定してデータを分割できます。開始プライマリキーと終了プライマリキーの間の範囲内で分割キーを構成できます。すべてのプライマリキーを指定する必要はありません。値は JSON 配列です。
データ宛先の構成
パラメータ
説明
トンネルリソースグループ
トンネルクォータ。デフォルト値は共通伝送リソースで、MaxCompute のフリートンネルクォータを指定します。
説明延滞または期限切れのため、排他的トンネルクォータが使用できない場合、実行中のタスクは排他的トンネルクォータからフリートンネルクォータに自動的に切り替わります。
テーブル
MaxCompute 内のテーブルの名前。
パーティション情報
日次増分データを対応する日付のパーティションに保存する場合は、pt パラメータを構成して日次増分データの同期を許可します。たとえば、pt パラメータを ${bizdate} に設定できます。
書き込みモード
MaxCompute にデータを書き込むモード。有効な値:
書き込む前に既存のデータを保持する (Insert Into): データはテーブルまたは静的パーティションに直接インポートされます。
書き込む前に既存のデータをクリーンアップする (Insert Overwrite): データがテーブルまたは静的パーティションにインポートされる前に、宛先の既存のデータがクリアされます。
空の文字列を Null に変換して書き込む
ソースの空の文字列を宛先の Null に変換するかどうかを指定します。
同期後に表示
このパラメータは、詳細設定 をクリックした後にのみ表示されます。
MaxCompute に同期されたデータが同期完了後にクエリできるかどうかを指定します。
フィールドマッピング セクションで、ソースフィールドとターゲットフィールドの横にある アイコンをクリックして、ソースフィールドと宛先フィールドを変更します。
重要ソースフィールドの数とタイプが宛先フィールドの数とタイプと一致していることを確認してください。
JSON 形式でソースフィールドを指定する必要があります。例:
{"name":"id","type":"STRING"}
。ターゲットフィールドに各宛先フィールドの名前を指定する必要があります。1 行が 1 つのフィールドを表します。ソースフィールドの行は、ターゲットフィールドの同じ行にマッピングされます。チャネル制御 セクションで、タスクの予想最大同時実行数、同期レート、ダーティデータレコードのポリシー、分散実行パラメータなど、タスク実行のパラメータを構成します。パラメータの詳細については、チャネル制御ポリシーの構成 を参照してください。
アイコンをクリックして構成を保存します。
説明構成を保存しないと、後続の操作を実行したときに構成を保存するように求めるメッセージが表示されます。OK をクリックして構成を保存します。
コードエディタを使用する
Tablestore から MaxCompute にフルデータを同期するには、Tablestore Reader プラグインと MaxCompute Writer プラグインを使用します。スクリプト構成ルールの詳細については、Tablestore データソース と MaxCompute データソース を参照してください。
重要コードレス UI とコードエディタを切り替えることはできません。注意して進めてください。
タスクの構成 ステップで、 アイコンをクリックします。表示されるメッセージで、OK をクリックします。
コードエディタで、次のサンプルコードに基づいてパラメータを指定します。
重要ほとんどの場合、フルデータをエクスポートするタスクは 1 回だけ実行されます。タスクのスケジューリングパラメータを構成する必要はありません。
スクリプト構成に変数 (例:
${date}
) が含まれている場合は、データを同期するタスクを実行するときに各変数を特定の値に設定します。サンプルコードには、構成を理解するのに役立つコメントが記載されています。サンプルコードを使用する場合は、すべてのコメントを削除してください。
{ "type": "job", "version": "2.0", "steps": [ { "stepType": "ots", // リーダーのタイプ。このパラメータは変更できません。 "parameter": { "datasource": "", // Tablestore データソースの名前。ビジネス要件に基づいてこのパラメータを構成します。 "column": [ // Tablestore ソースからエクスポートする列の名前。 { "name": "column1" }, { "name": "column2" }, { "name": "column3" }, { "name": "column4" }, { "name": "column5" } ], "range": { "split": [ // Tablestore 内のテーブルに関するパーティション情報。エクスポートを高速化できます。ほとんどの場合、このパラメータを指定する必要はありません。 { "type": "INF_MIN" }, { "type": "STRING", "value": "splitPoint1" }, { "type": "STRING", "value": "splitPoint2" }, { "type": "STRING", "value": "splitPoint3" }, { "type": "INF_MAX" } ], "end": [ { "type": "INF_MAX" // Tablestore の最初のプライマリキー列の終了位置。フルデータをエクスポートするには、このパラメータを INF_MAX に設定します。データの一部のみをエクスポートするには、ビジネス要件に基づいてこのパラメータを構成します。テーブルに複数のプライマリキー列が含まれている場合は、各プライマリキー列の end パラメータを構成します。 }, { "type": "INF_MAX" }, { "type": "STRING", "value": "end1" }, { "type": "INT", "value": "100" } ], "begin": [ { "type": "INF_MIN" // Tablestore の最初のプライマリキー列の開始位置。フルデータをエクスポートするには、このパラメータを INF_MIN に設定します。データの一部のみをエクスポートするには、ビジネス要件に基づいてこのパラメータを構成します。テーブルに複数のプライマリキー列が含まれている場合は、各プライマリキー列の begin パラメータを構成します。 }, { "type": "INF_MIN" }, { "type": "STRING", "value": "begin1" }, { "type": "INT", "value": "0" } ] }, "table": "" // Tablestore 内のテーブルの名前。 }, "name": "Reader", "category": "reader" }, { "stepType": "odps", // ライターのタイプ。このパラメータは変更できません。 "parameter": { "partition": "", // MaxCompute テーブルがパーティション分割されている場合、このパラメータは必須です。MaxCompute テーブルがパーティション分割されていない場合は、このパラメータを指定しないでください。データを書き込むパーティションの名前。最後のレベルのパーティションを指定する必要があります。 "truncate": true, // すべての履歴データを削除するかどうかを指定します。 "datasource": "", // MaxCompute データソースの名前。ビジネス要件に基づいてこのパラメータを構成します。 "column": [ // MaxCompute の列の名前。Tablestore の列名と同じ順序で列名を指定します。 "*" ], "table": "" // MaxCompute 内のテーブルの名前。テーブルは事前に作成する必要があります。そうしないと、タスクが失敗する可能性があります。 }, "name": "Writer", "category": "writer" }, { "name": "Processor", "stepType": null, "category": "processor", "parameter": {} } ], "setting": { "executeMode": null, "errorLimit": { "record": "0" // エラー数がレコード数を超えると、タスクは失敗します。 }, "speed": { "throttle":true, // スロットリングを有効にするかどうかを指定します。false の値はスロットリングが無効になっていることを指定し、true の値はスロットリングが有効になっていることを指定します。mbps パラメータは、throttle パラメータを true に設定した場合にのみ有効になります。 "concurrent":1 // 同時タスクの最大数。 "mbps":"12" // トラフィックが制限されるレート。 } }, "order": { "hops": [ { "from": "Reader", "to": "Writer" } ] } }
begin パラメータと end パラメータを構成して、エクスポートするデータの範囲を指定できます。たとえば、テーブルに pk1 と pk2 のプライマリキー列が含まれているとします。pk1 列は String タイプで、pk2 列は Integer タイプです。
テーブルからフルデータをエクスポートするには、次のパラメータを指定します。
"begin": [ // エクスポートするデータの開始位置。 { "type": "INF_MIN" }, { "type": "INF_MIN" } ], "end": [ // エクスポートするデータの終了位置。 { "type": "INF_MAX" }, { "type": "INF_MAX" } ],
pk1 列の値が tablestore である行をエクスポートするには、次のパラメータを指定します。
"begin": [ // エクスポートするデータの開始位置。 { "type": "STRING", "value": "tablestore" }, { "type": "INF_MIN" } ], "end": [ // エクスポートするデータの終了位置。 { "type": "STRING", "value": "tablestore" }, { "type": "INF_MAX" } ],
アイコンをクリックして構成を保存します。
説明スクリプトを保存しないと、後続の操作を実行したときにスクリプトを保存するように求めるメッセージが表示されます。OK をクリックしてスクリプトを保存します。
同期タスクを実行する
重要ほとんどの場合、フルデータは 1 回だけ同期する必要があり、スケジューリングプロパティを構成する必要はありません。
アイコンをクリックします。
パラメータ ダイアログボックスで、ドロップダウンリストからリソースグループの名前を選択します。
実行 をクリックします。
同期タスクが完了したら、ランタイムログ タブの実行ログの URL をクリックして、実行ログの詳細ページに移動します。実行ログの詳細ページで、
現在のタスクステータス
パラメータの値を確認します。現在のタスクステータス
パラメータの値が FINISH の場合、タスクは完了です。
ステップ 5: MaxCompute にインポートされたデータを表示する
MaxCompute SQL タスクを開発するか、アドホッククエリを作成して、SQL ステートメントを実行することで MaxCompute テーブルのデータをクエリできます。詳細については、MaxCompute SQL タスクの開発、アドホッククエリの作成、テーブル操作 を参照してください。