DataWorks でバッチ同期タスクを設定して、Tablestore から OSS に新規および変更されたデータを定期的に同期します。これにより、データのバックアップとその後の処理が可能になります。
準備
ソース Tablestore テーブルのインスタンス名、エンドポイント、およびリージョン ID を取得します。また、ソーステーブルのストリーム機能を有効にする必要があります。
データテーブルの場合、テーブルの作成または変更時にストリーム機能を有効にします。時系列テーブルの場合、この機能はデフォルトで有効になっています。
Alibaba Cloud アカウントまたは Tablestore と OSS の権限を持つ RAM ユーザーの AccessKey を作成します。
DataWorks をアクティブ化し、OSS バケットまたは Tablestore インスタンスと同じリージョンにワークスペースを作成します。
サーバーレスリソースグループを作成し、ワークスペースにアタッチします。課金の詳細については、「サーバーレスリソースグループの課金」をご参照ください。
DataWorks と Tablestore のインスタンスが異なるリージョンにある場合は、VPC ピアリング接続を作成して、リージョン間のネットワーク接続を確立する必要があります。
手順
ステップ 1: Tablestore データソースの追加
DataWorks で Tablestore データソースを設定して、ソースデータに接続します。
DataWorks コンソールにログインします。ターゲットリージョンに切り替えます。左側のナビゲーションウィンドウで、 を選択します。ドロップダウンリストからワークスペースを選択し、[データ統合へ] をクリックします。
左側のナビゲーションウィンドウで、[データソース] をクリックします。
[データソース] ページで、[データソースの追加] をクリックします。
[データソースの追加] ダイアログボックスで、データソースタイプとして [Tablestore] を検索して選択します。
[OTS データソースの追加] ダイアログボックスで、次の表に従ってデータソースパラメーターを設定します。
パラメーター
説明
データソース名
データソース名は、文字、数字、アンダースコア (_) の組み合わせである必要があります。数字またはアンダースコア (_) で始めることはできません。
データソースの説明
データソースの簡単な説明。説明は 80 文字以内にする必要があります。
リージョン
Tablestore インスタンスが存在するリージョンを選択します。
Tablestore インスタンス名
Tablestore インスタンスの名前。
エンドポイント
Tablestore インスタンスのエンドポイント。VPC アドレスを使用します。
AccessKey ID
Alibaba Cloud アカウントまたは RAM ユーザーの AccessKey ID と AccessKey Secret。
AccessKey Secret
リソースグループの接続性をテストします。
データソースを作成するときは、リソースグループの接続性をテストして、同期タスクのリソースグループがデータソースに接続できることを確認する必要があります。そうしないと、データ同期タスクを実行できません。
[接続設定] セクションで、リソースグループの [接続ステータス] 列にある [ネットワーク接続のテスト] をクリックします。
接続性テストに合格したら、[完了] をクリックします。新しいデータソースがデータソースリストに表示されます。
接続性テストが失敗した場合は、[ネットワーク接続診断ツール] を使用して問題をトラブルシューティングします。
ステップ 2: OSS データソースの追加
データエクスポートの宛先として OSS データソースを設定します。
再度 [データソースの追加] をクリックします。ダイアログボックスで、データソースタイプとして [OSS] を検索して選択し、データソースパラメーターを設定します。
パラメーター
説明
データソース名
データソース名は、文字、数字、アンダースコア (_) で構成する必要があります。数字またはアンダースコア (_) で始めることはできません。
データソースの説明
データソースの簡単な説明。説明は 80 文字以内にする必要があります。
アクセスモード
[RAM ロール認証モード]: DataWorks サービスアカウントは、RAM ロールを引き受けることでデータソースにアクセスします。このモードを初めて選択する場合は、画面の指示に従って必要な権限を付与します。
[AccessKey モード]: Alibaba Cloud アカウントまたは RAM ユーザーの AccessKey ID と AccessKey Secret を使用してデータソースにアクセスします。
ロール
このパラメーターは、[アクセスモード] を [RAM ロール認証モード] に設定した場合にのみ必要です。
AccessKey ID
これらのパラメーターは、[アクセスモード] を [AccessKey モード] に設定した場合にのみ必要です。Alibaba Cloud アカウントまたは RAM ユーザーの AccessKey ID と AccessKey Secret。
AccessKey Secret
リージョン
バケットが配置されているリージョン。
エンドポイント
OSS ドメイン名。詳細については、「リージョンとエンドポイント」をご参照ください。
バケット
バケットの名前。
パラメーターを設定し、接続性テストに合格したら、[完了] をクリックしてデータソースを追加します。
ステップ 3: バッチ同期タスクの設定
データ同期タスクを作成および設定して、Tablestore から OSS へのデータ転送ルールを定義します。
タスクノードの作成
[データ開発] ページに移動します。
DataWorks コンソールにログインします。
上部のナビゲーションバーで、リソースグループとリージョンを選択します。
左側のナビゲーションウィンドウで、 をクリックします。
対応するワークスペースを選択し、[Data Studio へ] をクリックします。
Data Studio コンソールで、[ワークスペースディレクトリ] の横にある
アイコンをクリックし、 を選択します。[ノードの作成] ダイアログボックスで、[パス] を選択します。ソースを [Tablestore Stream] に、宛先を [OSS] に設定します。[名前] を入力し、[OK] をクリックします。
同期タスクの設定
プロジェクトディレクトリで、新しいバッチ同期タスクノードをクリックし、コードレス UI またはコードエディタを使用して設定します。
時系列テーブルを同期する場合は、コードエディタのみを使用して同期タスクを設定します。
コードレス UI (デフォルト)
次の項目を設定します:
[データソース]: ソースと宛先のデータソースを選択します。
[ランタイムリソース]: リソースグループを選択します。システムは自動的にデータソースの接続性をテストします。
[データソース]: ソースデータテーブルを選択します。他のパラメーターはデフォルト設定を維持するか、必要に応じて変更します。
[宛先]: [テキストタイプ] を選択し、対応するパラメーターを設定します。
[テキストタイプ]: サポートされているテキストタイプは [csv]、[text]、[orc]、および [parquet] です。
[オブジェクト名 (パスを含む)]: OSS バケット内のファイルのパスと名前。例:
tablestore/resource_table.csv。[列区切り文字]: デフォルトは
,です。非表示の区切り文字の場合は、\u001bや\u007cなどの Unicode エンコーディングを入力します。[オブジェクトパス]: OSS バケット内のファイルのパス。このパラメーターは [parquet] ファイルの場合にのみ必要です。
[ファイル名]: OSS バケット内のファイルの名前。このパラメーターは、ファイルタイプが [parquet] の場合にのみ必要です。
[宛先フィールドマッピング]: マッピングは、ソーステーブルのプライマリキーと増分変更データに基づいて自動的に設定されます。必要に応じてマッピングを変更します。
設定が完了したら、ページの上部にある [保存] をクリックします。
コードエディタ
ページの上部にある [コードエディタ] をクリックしてスクリプトを編集します。
データテーブル
次の例は、宛先ファイルタイプが CSV のタスクを設定する方法を示しています。ソースデータテーブルには、id という名前の 1 つの int プライマリキー列と、name という名前の 1 つの string プライマリキー列を含むプライマリキーがあります。タスクを設定するときは、サンプルスクリプト内の datasource、table、および宛先ファイル名 (object) を置き換えます。
{
"type": "job",
"version": "2.0",
"steps": [
{
"stepType": "otsstream",
"parameter": {
"statusTable": "TableStoreStreamReaderStatusTable",
"maxRetries": 31,
"isExportSequenceInfo": false,
"datasource": "source_data",
"column": [
"id",
"name",
"colName",
"version",
"colValue",
"opType",
"sequenceInfo"
],
"startTimeString": "${startTime}",
"table": "source_table",
"endTimeString": "${endTime}"
},
"name": "Reader",
"category": "reader"
},
{
"stepType": "oss",
"parameter": {
"dateFormat": "yyyy-MM-dd HH:mm:ss",
"datasource": "target_data",
"writeSingleObject": false,
"column": [
"0",
"1",
"2",
"3",
"4",
"5",
"6"
],
"writeMode": "truncate",
"encoding": "UTF-8",
"fieldDelimiter": ",",
"fileFormat": "csv",
"object": "tablestore/source_table.csv"
},
"name": "Writer",
"category": "writer"
}
],
"setting": {
"errorLimit": {
"record": "0"
},
"speed": {
"concurrent": 2,
"throttle": false
}
},
"order": {
"hops": [
{
"from": "Reader",
"to": "Writer"
}
]
}
}時系列テーブル
次の例は、宛先ファイルタイプが CSV のタスクを設定する方法を示しています。ソース時系列テーブルのタイムラインデータには、value という名前の 1 つの int 属性列が含まれています。タスクを設定するときは、サンプルスクリプト内の datasource、table、および宛先ファイル名 (object) を置き換えます。
{
"type": "job",
"version": "2.0",
"steps": [
{
"stepType": "otsstream",
"parameter": {
"statusTable": "TableStoreStreamReaderStatusTable",
"maxRetries": 31,
"isExportSequenceInfo": false,
"datasource": "source_data",
"column": [
{
"name": "_m_name"
},
{
"name": "_data_source"
},
{
"name": "_tags"
},
{
"name": "_time"
},
{
"name": "value",
"type": "int"
}
],
"startTimeString": "${startTime}",
"table": "source_series",
"isTimeseriesTable":"true",
"mode": "single_version_and_update_only",
"endTimeString": "${endTime}"
},
"name": "Reader",
"category": "reader"
},
{
"stepType": "oss",
"parameter": {
"dateFormat": "yyyy-MM-dd HH:mm:ss",
"datasource": "target_data",
"writeSingleObject": false,
"writeMode": "truncate",
"encoding": "UTF-8",
"fieldDelimiter": ",",
"fileFormat": "csv",
"object": "tablestore/source_series.csv"
},
"name": "Writer",
"category": "writer"
}
],
"setting": {
"errorLimit": {
"record": "0"
},
"speed": {
"concurrent": 2,
"throttle": false
}
},
"order": {
"hops": [
{
"from": "Reader",
"to": "Writer"
}
]
}
}スクリプトの編集が完了したら、ページの上部にある [保存] をクリックします。
同期タスクのデバッグ
ページの右側で、[デバッグ設定] をクリックします。タスクのリソースグループを選択し、[スクリプトパラメーター] を指定します。
[startTime]: 増分データ同期の開始時刻 (この時刻を含む)。例:
20251119200000。[endTime]: 増分データ同期の終了時刻 (この時刻を含まない)。例:
20251119205000。増分同期は 5 分ごとに実行される定期的なスケジュールを使用します。プラグインは 5 分の遅延を発生させます。これにより、合計同期遅延は 5〜10 分になります。終了時刻を設定するときは、現在時刻から 10 分以内の時刻を設定しないでください。
ページの上部にある [実行] をクリックして、同期タスクを開始します。
上記の例の値は、増分データが
2025/11/19の20:00 から20:50(この時刻を含まない) まで同期されることを示しています。
ステップ 4: 同期結果の表示
同期タスクが完了したら、ログで実行ステータスを表示し、OSS バケットで結果ファイルを確認できます。
ページの下部でタスクの実行ステータスと結果を表示します。次のログ情報は、同期タスクが正常に実行されたことを示しています。
2025-11-18 11:16:23 INFO Shell run successfully! 2025-11-18 11:16:23 INFO Current task status: FINISH 2025-11-18 11:16:23 INFO Cost time is: 77.208s宛先バケット内のファイルを表示します。
バケットリストに移動します。宛先バケットをクリックして、結果ファイルを表示またはダウンロードします。
本番稼働
デバッグが完了したら、ページの右側にある [スケジューリング設定] ペインで startTime および endTime スケジューリングパラメーターと定期的なスケジューリングポリシーを設定します。その後、タスクを本番環境に公開します。設定ルールの詳細については、「スケジューリングパラメーターの設定と使用」、「スケジューリングポリシー」、および「スケジューリング時間」をご参照ください。
