すべてのプロダクト
Search
ドキュメントセンター

Tablestore:Tablestore SDK を使用して Tablestore データを OSS に配信する

最終更新日:Jun 19, 2025

Tablestore SDK を使用してデータを配信する前に、使用方法と操作について理解する必要があります。Tablestore コンソールで配信タスクを作成して、Tablestore テーブルから OSS バケットにデータを配信できます。

使用方法

  • データ配信は、中国 (杭州)、中国 (上海)、中国 (北京)、中国 (深圳)、中国 (張家口) リージョンでご利用いただけます。

  • Tablestore データに対する削除操作は、データの配信時に無視されます。Tablestore データを OSS に配信した後、そのデータに対して削除操作を実行しても、OSS に配信されたデータは削除されません。

  • 配信タスクの作成時の初期化には最大 1 分かかります。

  • データが一定の速度で書き込まれる場合、レイテンシは 3 分以内です。データが同期される場合、P99 レイテンシは 10 分以内です。

    説明

    P99 レイテンシは、過去 10 秒間のリクエストのうち最も遅い 1% の平均レイテンシを示します。

前提条件

  • Object Storage Service (OSS) コンソールで次の操作を実行します。

    OSS がアクティブ化されています。Tablestore インスタンスが存在するリージョンにバケットが作成されています。詳細については、「OSS をアクティブ化する」をご参照ください。

    説明

    データ配信では、Tablestore インスタンスから、Tablestore インスタンスと同じリージョンにある OSS バケットにデータを配信できます。MaxCompute などの他のウェアハウスにデータを配信するには、チケットの送信 を行ってください。

  • Tablestore コンソールで次の操作を実行します。

    • [インスタンス管理] ページの [インスタンスの詳細] タブから、Tablestore インスタンスのエンドポイントを取得します。詳細については、「エンドポイント」をご参照ください。

    • データテーブルが作成されています。詳細については、「データテーブルの操作」をご参照ください。

  • Resource Access Management (RAM) コンソールで次の操作を実行します。

    • RAM ユーザーを作成し、AliyunOTSFullAccess ポリシーを RAM ユーザーにアタッチして、Tablestore を管理するための権限を RAM ユーザーに付与します。詳細については、「RAM ユーザーの作成」および「RAM ユーザーへの権限付与」をご参照ください。

      警告

      Alibaba Cloud アカウントの AccessKey ペアが漏洩した場合、リソースが潜在的なリスクにさらされます。RAM ユーザーの AccessKey ペアを使用して操作を実行することをお勧めします。これにより、Alibaba Cloud アカウントの AccessKey ペアの漏洩を防ぐことができます。

      RAM ユーザーの AccessKey ペアが作成されています。詳細については、「AccessKey ペアを作成する」をご参照ください。

  • アクセス認証情報が構成されています。詳細については、「アクセス認証情報を構成する」をご参照ください。

操作

操作

説明

CreateDeliveryTask

配信タスクを作成します。

ListDeliveryTask

データテーブルに対して作成されたすべての配信タスクに関する情報を一覧表示します。

DescribeDeliveryTask

配信タスクの詳細情報を照会します。

DeleteDeliveryTask

配信タスクを削除します。

パラメーター

パラメーター

説明

tableName

データテーブルの名前。

taskName

配信タスクの名前。

名前は 3 ~ 16 文字で、小文字、数字、およびハイフン (-) のみを含めることができます。名前は小文字または数字で始まり、小文字または数字で終わる必要があります。

taskConfig

配信タスクの構成。以下の内容が含まれます。

  • ossPrefix: OSS バケット内のディレクトリのプレフィックス。Tablestore データはディレクトリに配信されます。ディレクトリのパスは、次の時間変数をサポートしています: $yyyy、$MM、$dd、$HH、$mm。

    • パスで時間変数を使用する場合、OSS ディレクトリはデータが書き込まれた時刻に基づいて動的に生成されます。このようにして、データは Hive パーティションの命名規則に基づいてパーティション化されます。OSS 内のオブジェクトは、時間に基づいて編成、パーティション化、および分散されます。

    • パスで時間変数を使用しない場合、すべてのファイルは、指定されたプレフィックスを含む名前の OSS ディレクトリに配信されます。

  • ossBucket: OSS バケットの名前。

  • ossEndpoint: OSS バケットが配置されているリージョンのエンドポイント。

  • ossStsRole: Tablestore サービスロールの Alibaba Cloud Resource Name (ARN)。

  • format: 配信されたデータが保存されるフォーマット。デフォルト値: Parquet。

    デフォルトでは、PLAIN を使用してすべてのタイプのデータが配信用にエンコードされます。

    現在、Parquet のみがサポートされています。このパラメーターを指定する必要はありません。

  • eventTimeColumn: イベント時間列。このパラメーターは、列のデータの時刻に基づいてデータをパーティション化するように指定します。このパラメーターの値は、列の名前とフォーマット (EventTimeFormat) で構成されます。EventTimeFormat パラメーターの有効な値: RFC822、RFC850、RFC1123、RFC3339、および Unix。要件に基づいてフォーマットを指定します。

    eventTimeColumn パラメーターを指定しない場合、データは Tablestore に書き込まれた時刻に基づいてパーティション化されます。

  • parquetSchema: 配信するフィールド。このパラメーターの値は、ソースフィールド、デスティネーションフィールド、およびデスティネーションフィールドタイプで構成されます。このパラメーターを指定する必要があります。

    スキーマでは、ソースフィールドとデスティネーションフィールドの名前、およびソースフィールドを配信する順序を指定できます。データが OSS に配信されると、データはスキーマのフィールドの順序に基づいて分散されます。

    重要

    ソースフィールドのタイプは、デスティネーションフィールドのタイプと一致する必要があります。一致しない場合、フィールドはダーティデータとして破棄されます。詳細については、「データ型マッピング」をご参照ください。

taskType

配信タスクのタイプ。デフォルト値: BASE_INC。有効な値:

  • INC: 増分データ配信。増分データのみが同期されます。

  • BASE: 完全データ配信。テーブル内のすべてのデータがスキャンされ、同期されます。

  • BASE_INC: 差分データ配信。完全データが同期された後、Tablestore は増分データを同期します。

    Tablestore が増分データを同期すると、データが最後に配信された時刻と配信タスクの現在のステータスを表示できます。

Tablestore SDK を使用する

Tablestore SDK for JavaTablestore SDK for Go を使用して、OSS にデータを配信できます。この例では、Tablestore SDK for Java を使用します。

次のサンプルコードは、データテーブルの配信タスクを作成する方法の例を示しています。

import com.alicloud.openservices.tablestore.ClientException;
import com.alicloud.openservices.tablestore.SyncClient;
import com.alicloud.openservices.tablestore.TableStoreException;
import com.alicloud.openservices.tablestore.model.delivery.*;
public class DeliveryTask {

        public static void main(String[] args) {
            // インスタンス名を指定します。
            final String instanceName = "yourInstanceName";
            // インスタンスのエンドポイントを指定します。
            final String endPoint = "yourEndpoint";
            // 環境変数から AccessKey ID と AccessKey シークレットを取得します。
            final String accessKeyId = System.getenv("TABLESTORE_ACCESS_KEY_ID");            
            final String accessKeySecret = System.getenv("TABLESTORE_ACCESS_KEY_SECRET");

            SyncClient client = new SyncClient(endPoint, accessKeyId, accessKeySecret, instanceName);
            try {
                createDeliveryTask(client);
                System.out.println("end");
            } catch (TableStoreException e) {
                System.err.println("操作に失敗しました。詳細: " + e.getMessage() + e.getErrorCode() + e.toString());
                System.err.println("リクエスト ID: " + e.getRequestId());
            } catch (ClientException e) {
                System.err.println("リクエストに失敗しました。詳細: " + e.getMessage());
            } finally {
                client.shutdown();
            }
        }

        private static void createDeliveryTask(SyncClient client){
            String tableName = "sampleTable";
            String taskName = "sampledeliverytask";
            OSSTaskConfig taskConfig = new OSSTaskConfig();
            taskConfig.setOssPrefix("sampledeliverytask/year=$yyyy/month=$MM");
            taskConfig.setOssBucket("datadeliverytest");
            taskConfig.setOssEndpoint("oss-cn-hangzhou.aliyuncs.com");
            taskConfig.setOssStsRole("acs:ram::17************45:role/aliyunserviceroleforotsdatadelivery");
            // eventColumn パラメーターはオプションです。このパラメーターを指定すると、このパラメーターで指定された列のデータの時刻に基づいてデータがパーティション化されます。このパラメーターを指定しない場合、データは Tablestore に書き込まれた時刻に基づいてパーティション化されます。
            EventColumn eventColumn = new EventColumn("Col1", EventTimeFormat.RFC1123);
            taskConfig.setEventTimeColumn(eventColumn);
            taskConfig.addParquetSchema(new ParquetSchema("PK1", "PK1", DataType.UTF8));
            taskConfig.addParquetSchema(new ParquetSchema("PK2", "PK2", DataType.BOOL));
            taskConfig.addParquetSchema(new ParquetSchema("Col1", "Col1", DataType.UTF8));
            CreateDeliveryTaskRequest request = new CreateDeliveryTaskRequest();
            request.setTableName(tableName);
            request.setTaskName(taskName);
            request.setTaskConfig(taskConfig);
            request.setTaskType(DeliveryTaskType.BASE_INC);
            CreateDeliveryTaskResponse response = client.createDeliveryTask(request);
            System.out.println("リクエスト ID: "+ response.getRequestId());
            System.out.println("トレース ID: " + response.getTraceId());
            System.out.println("配信タスクの作成に成功しました");
        }
}