すべてのプロダクト
Search
ドキュメントセンター

Tablestore:Tablestore SDK を使用して Tablestore データを OSS に配信する

最終更新日:Dec 28, 2024

Tablestore SDK を使用してデータを配信する前に、使用上の注意と操作について理解する必要があります。Tablestore コンソールで配信タスクを作成して、Tablestore テーブルから OSS バケットにデータを配信できます。

使用上の注意

  • データ配信は、中国 (杭州)、中国 (上海)、中国 (北京)、中国 (深セン)、中国 (張家口) リージョンでご利用いただけます。

  • Tablestore データの削除操作は、データの配信時には無視されます。Tablestore データを削除操作を実行しても、OSS に配信された Tablestore データは削除されません。

  • 配信タスクの作成時の初期化には最大 1 分かかります。

  • データが一定の速度で書き込まれる場合、レイテンシは 3 分以内です。データが同期される場合、P99 レイテンシは 10 分以内です。

    説明

    P99 レイテンシは、過去 10 秒間のリクエストのうち最も遅い 1% の平均レイテンシを示します。

前提条件

  • 以下の操作は、オブジェクトストレージサービス (OSS) コンソールで実行されます。

    OSS がアクティブ化されています。Tablestore インスタンスが存在するリージョンにバケットが作成されます。詳細については、OSS をアクティブ化する を参照してください。

    説明

    データ配信では、Tablestore インスタンスから、Tablestore インスタンスと同じリージョンにある OSS バケットにデータを配信できます。MaxCompute などの他のウェアハウスにデータを配信するには、チケットを送信してください。

  • 以下の操作は、Tablestore コンソールで実行されます。

    • Tablestore インスタンスのエンドポイントは、[インスタンス管理] ページの [インスタンスの詳細] タブから取得します。詳細については、エンドポイント を参照してください。

    • データテーブルが作成されます。詳細については、データテーブルの操作 を参照してください。

  • 以下の操作は、Resource Access Management (RAM) コンソールで実行されます。

    • RAM ユーザーが作成され、AliyunOTSFullAccess ポリシーが RAM ユーザーにアタッチされて、RAM ユーザーに Tablestore を管理する権限が付与されます。詳細については、RAM ユーザーの作成 および RAM ユーザーへの権限の付与 を参照してください。

      警告

      Alibaba Cloud アカウントの AccessKey ペアが漏洩した場合、リソースが潜在的なリスクにさらされます。操作を実行するには、RAM ユーザーの AccessKey ペアを使用することをお勧めします。これにより、Alibaba Cloud アカウントの AccessKey ペアの漏洩を防ぐことができます。

      RAM ユーザーの AccessKey ペアが作成されます。詳細については、AccessKey ペアの作成 を参照してください。

  • アクセス認証情報が構成されます。詳細については、アクセス認証情報の構成 を参照してください。

操作

操作

説明

CreateDeliveryTask

配信タスクを作成します。

ListDeliveryTask

データテーブルに作成されたすべての配信タスクに関する情報を一覧表示します。

DescribeDeliveryTask

配信タスクの詳細情報をクエリします。

DeleteDeliveryTask

配信タスクを削除します。

パラメーター

パラメーター

説明

tableName

データテーブルの名前。

taskName

配信タスクの名前。

名前は 3 ~ 16 文字で、小文字、数字、ハイフン (-) のみを含めることができます。名前は小文字または数字で始まり、小文字または数字で終わる必要があります。

taskConfig

配信タスクの構成。以下の内容が含まれます。

  • ossPrefix: OSS バケット内のディレクトリのプレフィックス。Tablestore データはこのディレクトリに配信されます。ディレクトリのパスは、次の時間変数をサポートしています: $yyyy、$MM、$dd、$HH、$mm。

    • パスで時間変数を使用する場合、OSS ディレクトリはデータが書き込まれた時刻に基づいて動的に生成されます。このようにして、データは hive パーティションの命名スタイルに基づいてパーティション化されます。OSS 内のオブジェクトは、時間に基づいて編成、パーティション化、および配布されます。

    • パスで時間変数を使用しない場合、すべてのファイルは、指定されたプレフィックスを含む名前の OSS ディレクトリに配信されます。

  • ossBucket: OSS バケットの名前。

  • ossEndpoint: OSSバケットが配置されているリージョンのエンドポイント。

  • ossStsRole: Tablestore サービスリンクロールの Alibaba Cloud リソースネーム (ARN)。

  • format: 配信されるデータの保存形式。デフォルト値: Parquet。

    デフォルトでは、配信されるすべてのタイプのデータのエンコードにPLAINが使用されます。

    現在、Parquetのみがサポートされています。このパラメータを指定する必要はありません。

  • eventTimeColumn: イベント時刻の列。このパラメータは、列のデータの時刻に基づいてデータがパーティション分割されることを指定します。このパラメータの値は、列の名前と形式 (EventTimeFormat) で構成されます。EventTimeFormat パラメータの有効な値: RFC822、RFC850、RFC1123、RFC3339、および Unix。要件に基づいて形式を指定してください。

    eventTimeColumn パラメータを指定しない場合、データは Tablestore に書き込まれた時刻に基づいてパーティション分割されます。

  • parquetSchema: 配信するフィールド。このパラメータの値は、ソースフィールド、宛先フィールド、および宛先フィールドの型で構成されます。このパラメータは必須です。

    スキーマには、ソースフィールドと宛先フィールドの名前、およびソースフィールドを配信する順序を指定できます。データがOSSに配信されると、スキーマ内のフィールドの順序に基づいてデータが配信されます。

    重要

    ソースフィールドの型は、宛先フィールドの型と一致する必要があります。一致しない場合、フィールドはダーティデータとして破棄されます。詳細については、データ型マッピングを参照してください。

taskType

配信タスクのタイプ。デフォルト値: BASE_INC。有効な値:

  • INC: 差分データ配信。差分データのみが同期されます。

  • BASE: フルデータ配信。テーブル内のすべてのデータがスキャンされ、同期されます。

  • BASE_INC: 差分データ配信。フルデータが同期された後、Tablestoreは差分データを同期します。

    Tablestoreが差分データを同期する場合、データが最後に配信された時刻と配信タスクの現在のステータスを確認できます。

Tablestore SDK の使用

Tablestore SDK for Java および Tablestore SDK for Go を使用して、OSS にデータを配信できます。この例では、Tablestore SDK for Java を使用します。

次のサンプルコードは、データテーブルの配信タスクを作成する方法の例を示しています。

import com.alicloud.openservices.tablestore.ClientException;
import com.alicloud.openservices.tablestore.SyncClient;
import com.alicloud.openservices.tablestore.TableStoreException;
import com.alicloud.openservices.tablestore.model.delivery.*;
public class DeliveryTask {

        public static void main(String[] args) {
            final String endPoint = "https://yourinstancename.cn-hangzhou.ots.aliyuncs.com";

            final String accessKeyId = System.getenv("OTS_AK_ENV");
            
            final String accessKeySecret = System.getenv("OTS_SK_ENV");

            final String instanceName = "yourinstancename";

            SyncClient client = new SyncClient(endPoint, accessKeyId, accessKeySecret, instanceName);
            try {
                createDeliveryTask(client);
                System.out.println("end");
            } catch (TableStoreException e) {
                System.err.println("The operation failed. Details:" + e.getMessage() + e.getErrorCode() + e.toString()); // 操作に失敗しました。詳細:
                System.err.println("Request ID:" + e.getRequestId()); // リクエストID:
            } catch (ClientException e) {
                System.err.println("The request failed. Details:" + e.getMessage()); // リクエストに失敗しました。詳細:
            } finally {
                client.shutdown();
            }
        }

        private static void createDeliveryTask(SyncClient client){
            String tableName = "sampleTable";
            String taskName = "sampledeliverytask";
            OSSTaskConfig taskConfig = new OSSTaskConfig();
            taskConfig.setOssPrefix("sampledeliverytask/year=$yyyy/month=$MM");
            taskConfig.setOssBucket("datadeliverytest");
            taskConfig.setOssEndpoint("oss-cn-hangzhou.aliyuncs.com");
            taskConfig.setOssStsRole("acs:ram::17************45:role/aliyunserviceroleforotsdatadelivery");
            // The eventColumn parameter is optional. If you specify this parameter, data is partitioned based on the time of the data in the column that is specified by this parameter. If you do not specify this parameter, data is partitioned based on the time at which the data is written to Tablestore.  // eventColumn パラメーターはオプションです。このパラメーターを指定すると、このパラメーターで指定された列のデータの時刻に基づいてデータがパーティション化されます。このパラメーターを指定しない場合、データは Tablestore に書き込まれた時刻に基づいてパーティション化されます。
            EventColumn eventColumn = new EventColumn("Col1", EventTimeFormat.RFC1123);
            taskConfig.setEventTimeColumn(eventColumn);
            taskConfig.addParquetSchema(new ParquetSchema("PK1", "PK1", DataType.UTF8));
            taskConfig.addParquetSchema(new ParquetSchema("PK2", "PK2", DataType.BOOL));
            taskConfig.addParquetSchema(new ParquetSchema("Col1", "Col1", DataType.UTF8));
            CreateDeliveryTaskRequest request = new CreateDeliveryTaskRequest();
            request.setTableName(tableName);
            request.setTaskName(taskName);
            request.setTaskConfig(taskConfig);
            request.setTaskType(DeliveryTaskType.BASE_INC);
            CreateDeliveryTaskResponse response = client.createDeliveryTask(request);
            System.out.println("resquestID: "+ response.getRequestId()); // リクエストID:
            System.out.println("traceID: " + response.getTraceId()); // トレースID:
            System.out.println("create delivery task success"); // 配信タスクの作成に成功しました
        }
}