すべてのプロダクト
Search
ドキュメントセンター

:TableStoreWriter を使用してデータを並行して書き込む

最終更新日:Dec 28, 2024

このトピックでは、TableStoreWriter を使用して、200 行を超えるデータを高並列で Tablestore に書き込む方法について説明します。

使用上の注意

複数のスレッドで同じ TableStoreWriter オブジェクトを共有することをお勧めします。

前提条件

  • Tablestore インスタンスが作成されていること。詳細については、インスタンスの作成を参照してください。

  • Alibaba Cloud アカウントまたは RAM ユーザーの AccessKey ペアが作成されていること。詳細については、AccessKey ペアの作成を参照してください。

手順

ステップ 1: TableStoreWriter をインポートする

TableStoreWriter は、Tablestore SDK for Java によって提供されるツールクラスです。 Tablestore SDK for Java をインストールすることで、TableStoreWriter をインポートできます。詳細については、Tablestore SDK for Java のインストールを参照してください。

ステップ 2: TableStoreWriter を初期化する

TableStoreWriter を初期化する際には、インスタンスとテーブルに関する情報、認証情報、および TableStoreWriter 関連の設定(TableStoreWriter がバッファ内のデータを Tablestore に書き込むために使用する並列リクエストの最大数や、バッチ書き込みリクエストで Tablestore に書き込むことができる行の最大数など)を指定する必要があります。

説明

TableStoreWriter に関連するパラメータの詳細については、TableStoreWriter パラメータを参照してください。

次のサンプルコードは、TableStoreWriter を初期化する方法の例を示しています。

private static TableStoreWriter createTablesStoreWriter() {
    
    /**
     * ほとんどの場合、TableStoreWriter パラメータのデフォルト設定を保持できます。ビジネス要件に基づいて、TableStoreWriter パラメータのカスタム設定を指定することもできます。
     * パラメータの詳細については、「TableStoreWriter パラメータ」のトピックを参照してください。
     * */
    WriterConfig config = new WriterConfig();
    // バッチ書き込みリクエストで Tablestore に書き込むことができる行の最大数を指定します。デフォルト値:200。
    config.setMaxBatchRowsCount(200); 
    // TableStoreWriter がバッファ内のデータを Tablestore に書き込むために使用する並列リクエストの最大数を指定します。デフォルト値:10。デフォルト設定を保持することをお勧めします。                          
    config.setConcurrency(10);    
            
    /**
     * 行レベルのコールバックを指定します。
     * この例では、コールバックは、Tablestore に書き込まれた行数と Tablestore への書き込みに失敗した行数の統計を収集するように設定されています。
     * */
    TableStoreCallback<RowChange, RowWriteResult> resultCallback = new TableStoreCallback<RowChange, RowWriteResult>() {
        @Override
        public void onCompleted(RowChange rowChange, RowWriteResult cc) {
            succeedRows.incrementAndGet();
        }

        @Override
        public void onFailed(RowChange rowChange, Exception ex) {
            failedRows.incrementAndGet();
        }
    };

    /** アクセス認証情報を設定します。   **/
    ServiceCredentials credentials = new DefaultCredentials(accessKeyId, accessKeySecret);

    /**
     * 使いやすく、初期化と解放のロジックを分離できる組み込みのスレッドプールとクライアントを使用することをお勧めします。
     * */
    DefaultTableStoreWriter writer = new DefaultTableStoreWriter(
        endpoint, credentials, instanceName, tableName, config, resultCallback);

    return writer;
}

ステップ 3: 書き込みリクエストを作成して実行する

追加、削除、変更の各操作に基づいて RowChange を作成します。次に、RowChange を TableStoreWriter に追加します。

説明

enableSchemaCheck が有効になっている場合、TableStoreWriter は行データがバッファにフラッシュされる前に、行データに対して次のチェックを実行します。

  • 行のプライマリキーのスキーマが、テーブルに定義されているスキーマと同じであるかどうかを確認します。

  • 行の各プライマリキー列または属性列の値のサイズが上限を超えていないかどうかを確認します。

  • 行の属性列の数が上限を超えていないかどうかを確認します。

  • 属性列の名前が行のプライマリキー列の名前と同じではないかどうかを確認します。

  • 行のサイズが、リクエストを使用して同時にインポートできる最大データ量を超えていないかどうかを確認します。

行データが上記のチェックに失敗した場合、TableStoreWriter は行データをダーティデータと判断します。ダーティデータはコールバックをトリガーしません。

ビジネス要件に基づいて、一度に 1 行のデータまたは複数行のデータを書き込むことによって、データを書き込むことができます。

一度に 1 行のデータを書き込む

次のサンプルコードは、一度に 1 行のデータを書き込むことによって、1,000 行のデータをデータテーブルに書き込む方法の例を示しています。

public void writeSingleRowWithFuture(TableStoreWriter writer) {
    System.out.println("=========================================================[Start]");
    System.out.println("Write Single Row With Future");
    int rowsCount = 1000;
    int columnsCount = 10;
    String strValue = "1234567890";
    AtomicLong rowIndex = new AtomicLong(-1);

    List<Future<WriterResult>> futures = new LinkedList<Future<WriterResult>>();
    for (long index = rowIndex.incrementAndGet(); index < rowsCount; index = rowIndex.incrementAndGet()) {

        PrimaryKey pk = PrimaryKeyBuilder.createPrimaryKeyBuilder()
                // プライマリキー列 "pk_0" を追加 (文字列型)
                .addPrimaryKeyColumn("pk_0", PrimaryKeyValue.fromString(md5Hex(index + "")))
                // プライマリキー列 "pk_1" を追加 (文字列型)
                .addPrimaryKeyColumn("pk_1", PrimaryKeyValue.fromString("pk" + index))
                // プライマリキー列 "pk_2" を追加 (整数型)
                .addPrimaryKeyColumn("pk_2", PrimaryKeyValue.fromLong(index % 5))
                .build();

        RowUpdateChange rowChange = new RowUpdateChange(tableName, pk);
        for (int j = 0; j < columnsCount; j++) {
            rowChange.put("column_" + j, ColumnValue.fromString(strValue));
        }
        rowChange.put("index", ColumnValue.fromLong(index));
        Future<WriterResult> future = writer.addRowChangeWithFuture(rowChange);
        futures.add(future);
    }

    System.out.println("Write thread finished.");
    // バッファ内のデータを Tablestore にフラッシュします。TableStoreWriter は、flushInterval パラメータと maxBatchSize パラメータに基づいて、バッファ内のデータを Tablestore にフラッシュするタイミングを決定します。flushInterval パラメータは、バッファ内のデータが Tablestore にフラッシュされる間隔を指定します。maxBatchSize パラメータは、バッファ内のデータ量に基づいてバッファ内のデータを Tablestore にフラッシュするかどうかを指定します。
    writer.flush();
    
    // Future オブジェクトを表示します。
    // printFutureResult(futures);

    System.out.println("=========================================================[Finish]");
}

一度に複数行のデータを書き込む

次のサンプルコードは、一度に複数行のデータを書き込むことによって、1,000 行のデータをデータテーブルに書き込む方法の例を示しています。

public void writeRowListWithFuture(TableStoreWriter writer) {
    System.out.println("=========================================================[Start]");
    System.out.println("Write Row List With Future");

    int rowsCount = 1000;
    int columnsCount = 10;
    String strValue = "1234567890";
    AtomicLong rowIndex = new AtomicLong(-1);

    List<Future<WriterResult>> futures = new LinkedList<Future<WriterResult>>();
    List<RowChange> rowChanges = new LinkedList<RowChange>();
    for (long index = rowIndex.incrementAndGet(); index < rowsCount; index = rowIndex.incrementAndGet()) {

        PrimaryKey pk = PrimaryKeyBuilder.createPrimaryKeyBuilder()
                // プライマリキー列 "pk_0" を追加 (文字列型)
                .addPrimaryKeyColumn("pk_0", PrimaryKeyValue.fromString(md5Hex(index + "")))
                // プライマリキー列 "pk_1" を追加 (文字列型)
                .addPrimaryKeyColumn("pk_1", PrimaryKeyValue.fromString("pk" + index))
                // プライマリキー列 "pk_2" を追加 (整数型)
                .addPrimaryKeyColumn("pk_2", PrimaryKeyValue.fromLong(index % 5))
                .build();

        RowUpdateChange rowChange = new RowUpdateChange(tableName, pk);
        for (int j = 0; j < columnsCount; j++) {
            rowChange.put("column_" + j, ColumnValue.fromString(strValue));
        }
        rowChange.put("index", ColumnValue.fromLong(index));
        rowChanges.add(rowChange);
        if (Math.random() > 0.995 || index == rowsCount - 1) {
            Future<WriterResult> future = writer.addRowChangeWithFuture(rowChanges);
            futures.add(future);
            rowChanges.clear();
        }
    }

    System.out.println("Write thread finished.");
    // バッファ内のデータを Tablestore にフラッシュします。TableStoreWriter は、flushInterval パラメータと maxBatchSize パラメータに基づいて、バッファ内のデータを Tablestore にフラッシュするタイミングを決定します。flushInterval パラメータは、バッファ内のデータが Tablestore にフラッシュされる間隔を指定します。maxBatchSize パラメータは、バッファ内のデータ量に基づいてバッファ内のデータを Tablestore にフラッシュするかどうかを指定します。
    writer.flush();
    
    // Future オブジェクトを表示します。
    // printFutureResult(futures);
    
    System.out.println("=========================================================[Finish]");
}

ステップ 4: TableStoreWriter をシャットダウンする

アプリケーションを終了する前に、TableStoreWriter を手動でシャットダウンする必要があります。TableStoreWriter をシャットダウンする前に、システムはバッファ内のすべてのデータを Tablestore にフラッシュします。

重要

TableStoreWriter のシャットダウン中またはシャットダウン後に addRowChange 操作が呼び出されてバッファにデータが書き込まれた場合、データは Tablestore に書き込まれない可能性があります。

// TableStoreWriter を事前にシャットダウンします。キュー内のすべてのデータが Tablestore に書き込まれた後、システムはクライアントと内部スレッドプールを自動的にシャットダウンします。
writer.close();

完全なサンプルコード

次のサンプルコードは、データテーブルを作成し、テーブルにデータを並行して書き込む方法の例を示しています。

import com.alicloud.openservices.tablestore.DefaultTableStoreWriter;
import com.alicloud.openservices.tablestore.SyncClient;
import com.alicloud.openservices.tablestore.TableStoreCallback;
import com.alicloud.openservices.tablestore.TableStoreWriter;
import com.alicloud.openservices.tablestore.core.auth.DefaultCredentials;
import com.alicloud.openservices.tablestore.core.auth.ServiceCredentials;
import com.alicloud.openservices.tablestore.model.*;
import com.alicloud.openservices.tablestore.writer.RowWriteResult;
import com.alicloud.openservices.tablestore.writer.WriterConfig;
import com.alicloud.openservices.tablestore.writer.WriterResult;

import com.aliyuncs.exceptions.ClientException;

import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicLong;

import static org.apache.commons.codec.digest.DigestUtils.md5Hex;

public class TableStoreWriterDemo {

    private static String endpoint = "<エンドポイント>"; // インスタンスのエンドポイントを指定します
    private static String instanceName = "<インスタンス名>"; // インスタンス名を指定します
    private static String accessKeyId = System.getenv("OTS_AK_ENV"); // AccessKey ID を指定します
    private static String accessKeySecret = System.getenv("OTS_SK_ENV"); // AccessKey Secret を指定します
    private static String tableName = "<テーブル名>"; // テーブル名を指定します

    private static AtomicLong succeedRows = new AtomicLong();
    private static AtomicLong failedRows = new AtomicLong();

    public static void main(String[] args) throws ClientException {
        TableStoreWriterDemo sample = new TableStoreWriterDemo();

        /**
         * TableStoreWriter を使用する前に、テーブルが既に存在することを確認してください。
         * 1. TableStoreWriter は、テーブルが存在するかどうかを確認します。
         * 2. 書き込むデータのフィールドとフィールドタイプが、テーブルのフィールドとフィールドタイプと同じであるかどうかを確認します。
         * */
        sample.tryCreateTable();

        /**
         * 初期化には DefaultTablestoreWriter を使用することをお勧めします。
         * DefaultTableStoreWriter(
         *      String endpoint,                                                   // インスタンスのエンドポイント。
         *      ServiceCredentials credentials,                                    // アクセス認証情報(AccessKey ペアとトークンを含む)。
         *      String instanceName,                                               // インスタンス名。
         *      String tableName,                                                  // テーブル名。TableStoreWriter は 1 つのテーブルに対してのみ指定されます。
         *      WriterConfig config,                                               // TableStoreWriter の設定。
         *      TableStoreCallback<RowChange, RowWriteResult> resultCallback       // 行レベルのコールバックを設定します。
         * )
         * */
        TableStoreWriter writer = sample.createTablesStoreWriter();

        /**
         * Future オブジェクトでは、一度に 1 行のデータを書き込む方法が使用されます。
         * */
        sample.writeSingleRowWithFuture(writer);
        /**
         * Future オブジェクトでは、一度に複数行のデータを書き込む方法が使用されます。
         * */   
        //sample.writeRowListWithFuture(writer);

        System.out.println("Count by TablestoreCallback: failedRow=" + failedRows.get() + ", succeedRow=" + succeedRows.get());
        System.out.println("Count by WriterStatics: " + writer.getWriterStatistics());

        /**
         * TableStoreWriter を事前にシャットダウンします。キュー内のすべてのデータが Tablestore に書き込まれた後、システムはクライアントと内部スレッドプールを自動的にシャットダウンします。
         * */
        writer.close();
    }

    private static TableStoreWriter createTablesStoreWriter() {

        WriterConfig config = new WriterConfig();
        // バッチ書き込みリクエストで Tablestore に書き込むことができる行の最大数を指定します。デフォルト値:200。バッチ書き込みリクエストで 200 行を超えるデータを書き込む場合は、このパラメータの値を増やします。
        config.setMaxBatchRowsCount(200); 
        // TableStoreWriter がバッファ内のデータを Tablestore に書き込むために使用する並列リクエストの最大数を指定します。デフォルト値:10。デフォルト設定を保持することをお勧めします。                          
        config.setConcurrency(10);                                   

        /**
         * 行レベルのコールバックを指定します。
         * この例では、コールバックは、Tablestore に書き込まれた行数と Tablestore への書き込みに失敗した行数の統計を収集するように設定されています。
         * */
        TableStoreCallback<RowChange, RowWriteResult> resultCallback = new TableStoreCallback<RowChange, RowWriteResult>() {
            @Override
            public void onCompleted(RowChange rowChange, RowWriteResult cc) {
                succeedRows.incrementAndGet();
            }

            @Override
            public void onFailed(RowChange rowChange, Exception ex) {
                failedRows.incrementAndGet();
            }
        };

        ServiceCredentials credentials = new DefaultCredentials(accessKeyId, accessKeySecret);


        /**
         * 使いやすく、初期化と解放のロジックを分離できる組み込みのスレッドプールとクライアントを使用することをお勧めします。
         * */
        DefaultTableStoreWriter writer = new DefaultTableStoreWriter(
                endpoint, credentials, instanceName, tableName, config, resultCallback);

        return writer;
    }


    private static void tryCreateTable() throws ClientException {
        SyncClient ots = new SyncClient(endpoint, accessKeyId, accessKeySecret, instanceName);

        try {
            ots.deleteTable(new DeleteTableRequest(tableName));
        } catch (Exception e) {
        }

        TableMeta tableMeta = new TableMeta(tableName);
        tableMeta.addPrimaryKeyColumn("pk_0", PrimaryKeyType.STRING);
        tableMeta.addPrimaryKeyColumn("pk_1", PrimaryKeyType.STRING);
        tableMeta.addPrimaryKeyColumn("pk_2", PrimaryKeyType.INTEGER);
        TableOptions tableOptions = new TableOptions(-1, 1);
        CreateTableRequest request = new CreateTableRequest(
                tableMeta, tableOptions, new ReservedThroughput(new CapacityUnit(0, 0)));

        try {
            CreateTableResponse res = ots.createTable(request);
        } catch (Exception e) {
            throw new ClientException(e);
        } finally {
            ots.shutdown();
        }
    }

    public static void writeSingleRowWithFuture(TableStoreWriter writer) {
        System.out.println("=========================================================[Start]");
        System.out.println("Write Single Row With Future");
        int rowsCount = 1000;
        int columnsCount = 10;
        String strValue = "1234567890";
        AtomicLong rowIndex = new AtomicLong(-1);

        List<Future<WriterResult>> futures = new LinkedList<Future<WriterResult>>();
        for (long index = rowIndex.incrementAndGet(); index < rowsCount; index = rowIndex.incrementAndGet()) {

            PrimaryKey pk = PrimaryKeyBuilder.createPrimaryKeyBuilder()
                // プライマリキー列 "pk_0" を追加 (文字列型)
                .addPrimaryKeyColumn("pk_0", PrimaryKeyValue.fromString(md5Hex(index + "")))
                // プライマリキー列 "pk_1" を追加 (文字列型)
                .addPrimaryKeyColumn("pk_1", PrimaryKeyValue.fromString("pk" + index))
                // プライマリキー列 "pk_2" を追加 (整数型)
                .addPrimaryKeyColumn("pk_2", PrimaryKeyValue.fromLong(index % 5))
                .build();

            RowUpdateChange rowChange = new RowUpdateChange(tableName, pk);
            for (int j = 0; j < columnsCount; j++) {
                rowChange.put("column_" + j, ColumnValue.fromString(strValue));
            }
            rowChange.put("index", ColumnValue.fromLong(index));
            Future<WriterResult> future = writer.addRowChangeWithFuture(rowChange);
            futures.add(future);
        }

        System.out.println("Write thread finished.");
        writer.flush();
        // Future オブジェクトを表示します。
        // printFutureResult(futures);

        System.out.println("=========================================================[Finish]");
    }
    
    public void writeRowListWithFuture(TableStoreWriter writer) {
        System.out.println("=========================================================[Start]");
        System.out.println("Write Row List With Future");

        int rowsCount = 1000;
        int columnsCount = 10;
        String strValue = "1234567890";
        AtomicLong rowIndex = new AtomicLong(-1);

        List<Future<WriterResult>> futures = new LinkedList<Future<WriterResult>>();
        List<RowChange> rowChanges = new LinkedList<RowChange>();
        for (long index = rowIndex.incrementAndGet(); index < rowsCount; index = rowIndex.incrementAndGet()) {

            PrimaryKey pk = PrimaryKeyBuilder.createPrimaryKeyBuilder()
                    // プライマリキー列 "pk_0" を追加 (文字列型)
                    .addPrimaryKeyColumn("pk_0", PrimaryKeyValue.fromString(md5Hex(index + "")))
                    // プライマリキー列 "pk_1" を追加 (文字列型)
                    .addPrimaryKeyColumn("pk_1", PrimaryKeyValue.fromString("pk" + index))
                    // プライマリキー列 "pk_2" を追加 (整数型)
                    .addPrimaryKeyColumn("pk_2", PrimaryKeyValue.fromLong(index % 5))
                    .build();

            RowUpdateChange rowChange = new RowUpdateChange(tableName, pk);
            for (int j = 0; j < columnsCount; j++) {
                rowChange.put("column_" + j, ColumnValue.fromString(strValue));
            }
            rowChange.put("index", ColumnValue.fromLong(index));
            rowChanges.add(rowChange);
            if (Math.random() > 0.995 || index == rowsCount - 1) {
                Future<WriterResult> future = writer.addRowChangeWithFuture(rowChanges);
                futures.add(future);
                rowChanges.clear();
            }
    }

    System.out.println("Write thread finished.");
    writer.flush();
    // Future オブジェクトを表示します。
    // printFutureResult(futures);
    System.out.println("=========================================================[Finish]");
    }


    private static void printFutureResult(List<Future<WriterResult>> futures) {
        int totalRow = 0;

        for (int index = 0; index < futures.size(); index++) {
            try {
                WriterResult result = futures.get(index).get();
                totalRow += result.getTotalCount();
                System.out.println(String.format(
                        "Future[%d] finished:\tfailed: %d\tsucceed: %d\tfutureBatch: %d\ttotalFinished: %d",
                        index, result.getFailedRows().size(), result.getSucceedRows().size(),
                        result.getTotalCount(), totalRow));

            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
        }
    }
}

この例では、次の結果が返されます。

=========================================================[Start]
Write Single Row With Future
Write thread finished.
=========================================================[Finish]
Count by TablestoreCallback: failedRow=0, succeedRow=1000
Count by WriterStatics: WriterStatistics: {
    totalRequestCount=6,
    totalRowsCount=1000,
    totalSucceedRowsCount=1000,
    totalFailedRowsCount=0,
    totalSingleRowRequestCount=0,
}

課金ルール

TableStoreWriter を使用して Tablestore にデータを書き込む場合、Tablestore へのデータ書き込みとデータストレージの料金が発生します。詳細については、課金概要を参照してください。

FAQ

Tablestore SDK for Java を使用して Tablestore データテーブルにデータを書き込むときに、「属性列の数が最大値 128 を超えています」というエラーが報告された場合はどうすればよいですか?

参考資料

TableStoreWriter のシナリオとアーキテクチャについては、TableStoreWriter の概要を参照してください。