
Tablestore: Use TableStoreWriter to concurrently write data

Last Updated: Dec 05, 2024

This topic describes how to use TableStoreWriter to concurrently write large amounts of data, such as more than 200 rows, to Tablestore.

Usage notes

We recommend that multiple threads share the same TableStoreWriter object.
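As an SDK-free illustration of this recommendation, the following sketch creates one writer-like object and shares it across several producer threads instead of creating one writer per thread. InMemoryRowSink and SharedWriterSketch are hypothetical names: the sink is only a thread-safe in-memory stand-in for TableStoreWriter and never contacts Tablestore. In real code, each producer would call addRowChange on the single shared writer.

```java
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for a thread-safe writer such as TableStoreWriter.
// It only collects row keys in memory and never contacts Tablestore.
class InMemoryRowSink {
    private final ConcurrentLinkedQueue<String> buffer = new ConcurrentLinkedQueue<>();

    void addRow(String rowKey) {
        buffer.add(rowKey);
    }

    int bufferedRows() {
        return buffer.size();
    }
}

class SharedWriterSketch {
    // Runs `threads` producer tasks that all write to ONE shared sink instance
    // and returns the number of rows the sink received.
    static int writeConcurrently(int threads, int rowsPerThread) {
        InMemoryRowSink sharedSink = new InMemoryRowSink(); // created once, shared by every task
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int t = 0; t < threads; t++) {
            final int threadId = t;
            pool.submit(() -> {
                for (int i = 0; i < rowsPerThread; i++) {
                    // With the real SDK, this call would be writer.addRowChange(rowChange).
                    sharedSink.addRow("thread" + threadId + "-row" + i);
                }
            });
        }
        pool.shutdown();
        try {
            pool.awaitTermination(1, TimeUnit.MINUTES);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return sharedSink.bufferedRows();
    }

    public static void main(String[] args) {
        System.out.println(writeConcurrently(4, 250)); // prints 1000
    }
}
```

The design choice being illustrated is that the writer, not each thread, owns the buffer and the connection resources, so adding threads does not multiply buffers, thread pools, or clients.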

Prerequisites

Procedure

Step 1: Import TableStoreWriter

TableStoreWriter is a tool class provided by Tablestore SDK for Java. You can import TableStoreWriter by installing Tablestore SDK for Java. For more information, see Install Tablestore SDK for Java.

Step 2: Initialize TableStoreWriter

When you initialize TableStoreWriter, you must specify the instance and table information, the access credentials, and the TableStoreWriter configurations. The configurations include, for example, the maximum number of concurrent requests that TableStoreWriter uses to write buffered data to Tablestore and the maximum number of rows in a single batch write request.

Note

For more information about the parameters related to TableStoreWriter, see TableStoreWriter parameters.

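The following sample code shows how to initialize TableStoreWriter. The snippet assumes that endpoint, accessKeyId, accessKeySecret, instanceName, tableName, succeedRows, and failedRows are defined as static fields, as in the complete sample code.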

private static TableStoreWriter createTablesStoreWriter() {
    
    /**
     * In most cases, you can retain the default settings for the TableStoreWriter parameters. You can also specify custom settings for the TableStoreWriter parameters based on your business requirements. 
     * For more information about the parameters, see the "TableStoreWriter parameters" topic. 
     * */
    WriterConfig config = new WriterConfig();
    // Specify the maximum number of rows that can be written to Tablestore in a batch write request. Default value: 200. 
    config.setMaxBatchRowsCount(200); 
    // Specify the maximum number of parallel requests that TableStoreWriter uses to write data in the buffer to Tablestore. Default value: 10. We recommend that you retain the default settings.                          
    config.setConcurrency(10);    
            
    /**
     * Specify row-level callbacks. 
     * In this example, callbacks are configured to collect statistics on the number of rows that are written to Tablestore and the number of rows that fail to be written to Tablestore. 
     * */
    TableStoreCallback<RowChange, RowWriteResult> resultCallback = new TableStoreCallback<RowChange, RowWriteResult>() {
        @Override
        public void onCompleted(RowChange rowChange, RowWriteResult cc) {
            succeedRows.incrementAndGet();
        }

        @Override
        public void onFailed(RowChange rowChange, Exception ex) {
            failedRows.incrementAndGet();
        }
    };

    /** Configure access credentials.   **/
    ServiceCredentials credentials = new DefaultCredentials(accessKeyId, accessKeySecret);

    /**
     * We recommend that you use the built-in thread pools and client. They are easier to use, and the writer manages their initialization and release for you.
     * */
    DefaultTableStoreWriter writer = new DefaultTableStoreWriter(
        endpoint, credentials, instanceName, tableName, config, resultCallback);

    return writer;
}
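To see how maxBatchRowsCount and concurrency relate, the following back-of-the-envelope model computes the fewest batch write requests needed for a given number of rows, and the fewest sequential rounds if at most `concurrency` requests run at the same time. BatchMath is a hypothetical helper and this is a simplified model under stated assumptions, not the SDK's scheduling logic. Time-based or size-based flushes can only split rows into more, smaller batches, so the first value is a lower bound.

```java
// Simplified model (an assumption), not the SDK's scheduling logic.
class BatchMath {
    // Fewest batch write requests needed if every request carries at most
    // maxBatchRowsCount rows. Earlier flushes can only increase this number.
    static long minBatchRequests(long totalRows, int maxBatchRowsCount) {
        if (maxBatchRowsCount <= 0) {
            throw new IllegalArgumentException("maxBatchRowsCount must be positive");
        }
        return (totalRows + maxBatchRowsCount - 1) / maxBatchRowsCount; // ceiling division
    }

    // If at most `concurrency` requests are in flight at once, the requests
    // need at least this many sequential rounds.
    static long minRounds(long requests, int concurrency) {
        if (concurrency <= 0) {
            throw new IllegalArgumentException("concurrency must be positive");
        }
        return (requests + concurrency - 1) / concurrency;
    }

    public static void main(String[] args) {
        long requests = minBatchRequests(1000, 200);
        System.out.println(requests + " requests, " + minRounds(requests, 10) + " round(s)");
        // prints: 5 requests, 1 round(s)
    }
}
```

For comparison, the output of the complete sample code reports totalRequestCount=6 for 1,000 rows, which is consistent with 5 being only a lower bound.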

Step 3: Construct and execute a write request

Construct a RowChange for each put, update, or delete operation, and then add the RowChange to TableStoreWriter.
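The SDK represents these operations with RowPutChange, RowUpdateChange, and RowDeleteChange; the sample code in this step uses RowUpdateChange. As a rough, SDK-free illustration of how the three kinds of change differ, the following hypothetical in-memory model (RowChangeModel is an illustrative name, not an SDK class) applies each kind to a map keyed by primary key. It assumes the usual Tablestore semantics: a put overwrites the whole row, an update changes only the listed columns and creates the row if it does not exist, and a delete removes the row. Versions and row conditions are ignored.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory model of the three kinds of row changes. It imitates
// put, update, and delete semantics; it does not use the Tablestore SDK.
class RowChangeModel {
    private final Map<String, Map<String, Object>> rows = new HashMap<>();

    // Put: writes the row, replacing ALL existing attribute columns.
    void put(String primaryKey, Map<String, Object> columns) {
        rows.put(primaryKey, new HashMap<>(columns));
    }

    // Update: creates the row if it does not exist, then overwrites only the given columns.
    void update(String primaryKey, Map<String, Object> columns) {
        rows.computeIfAbsent(primaryKey, k -> new HashMap<>()).putAll(columns);
    }

    // Delete: removes the entire row.
    void delete(String primaryKey) {
        rows.remove(primaryKey);
    }

    // Returns the row's columns, or null if the row does not exist.
    Map<String, Object> get(String primaryKey) {
        return rows.get(primaryKey);
    }
}
```

The practical consequence is that a put with fewer columns than the stored row drops the missing columns, while an update leaves them in place.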

Note

If enableSchemaCheck is enabled, TableStoreWriter performs the following checks on each row before the row is written to the buffer:

  • Check whether the primary key schema of the row matches the primary key schema defined for the table.

  • Check whether the value size of each primary key column or attribute column of the row exceeds the upper limit.

  • Check whether the number of attribute columns in the row exceeds the upper limit.

  • Check whether the name of an attribute column is the same as the name of a primary key column of the row.

  • Check whether the size of the row exceeds the maximum amount of data that a single request can write.

If a row fails any of the preceding checks, TableStoreWriter treats the row as dirty data. Dirty data does not trigger callbacks.
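The following simplified sketch shows how several of these checks could decide whether a row is dirty. SchemaCheckSketch is a hypothetical helper, not the SDK's implementation: primary keys are compared by name and order only (types are ignored), only attribute value sizes are checked, the total row size check is omitted, and the limits are parameters rather than the service's real values. The FAQ error message "The count of attribute columns exceeds the maximum:128" suggests 128 as a realistic attribute column limit, which the usage example below passes in.

```java
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;

// Simplified, hypothetical model of some enableSchemaCheck rules.
class SchemaCheckSketch {
    // Returns null if the row passes; otherwise the reason it counts as dirty data.
    static String check(List<String> tablePkNames, List<String> rowPkNames,
                        Map<String, String> attributes,
                        int maxAttributeColumns, int maxValueBytes) {
        // Primary key columns must match the table definition (names and order only here).
        if (!tablePkNames.equals(rowPkNames)) {
            return "primary key schema mismatch";
        }
        if (attributes.size() > maxAttributeColumns) {
            return "too many attribute columns";
        }
        for (Map.Entry<String, String> column : attributes.entrySet()) {
            if (tablePkNames.contains(column.getKey())) {
                return "attribute name equals a primary key name: " + column.getKey();
            }
            if (column.getValue().getBytes(StandardCharsets.UTF_8).length > maxValueBytes) {
                return "value too large: " + column.getKey();
            }
        }
        return null;
    }

    public static void main(String[] args) {
        List<String> pk = List.of("pk_0", "pk_1", "pk_2");
        System.out.println(check(pk, pk, Map.of("index", "1"), 128, 1024)); // prints null
    }
}
```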

Depending on your business requirements, you can add rows one at a time or add a list of rows in a single call.

Write a single row of data at a time

The following sample code shows how to write 1,000 rows to a data table by adding one row at a time. Each call to addRowChangeWithFuture returns a Future for that row.

public void writeSingleRowWithFuture(TableStoreWriter writer) {
    System.out.println("=========================================================[Start]");
    System.out.println("Write Single Row With Future");
    int rowsCount = 1000;
    int columnsCount = 10;
    String strValue = "1234567890";
    AtomicLong rowIndex = new AtomicLong(-1);

    List<Future<WriterResult>> futures = new LinkedList<Future<WriterResult>>();
    for (long index = rowIndex.incrementAndGet(); index < rowsCount; index = rowIndex.incrementAndGet()) {

        PrimaryKey pk = PrimaryKeyBuilder.createPrimaryKeyBuilder()
                .addPrimaryKeyColumn("pk_0", PrimaryKeyValue.fromString(md5Hex(index + "")))
                .addPrimaryKeyColumn("pk_1", PrimaryKeyValue.fromString("pk" + index))
                .addPrimaryKeyColumn("pk_2", PrimaryKeyValue.fromLong(index % 5))
                .build();

        RowUpdateChange rowChange = new RowUpdateChange(tableName, pk);
        for (int j = 0; j < columnsCount; j++) {
            rowChange.put("column_" + j, ColumnValue.fromString(strValue));
        }
        rowChange.put("index", ColumnValue.fromLong(index));
        Future<WriterResult> future = writer.addRowChangeWithFuture(rowChange);
        futures.add(future);
    }

    System.out.println("Write thread finished.");
    // Explicitly flush the data in the buffer to Tablestore. If you do not call flush(), TableStoreWriter flushes the buffer automatically based on the flushInterval parameter (the interval between automatic flushes) and the maxBatchSize parameter (the amount of buffered data that triggers a flush).
    writer.flush();

    // Optionally print the result of each Future. printFutureResult is defined in the complete sample code.
    // printFutureResult(futures);

    System.out.println("=========================================================[Finish]");
}
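The sample derives pk_0 from md5Hex(index + ""), which comes from DigestUtils in Apache Commons Codec. Because Tablestore partitions data by the first primary key column, hashing that column is a common way to spread sequential keys across partitions; treat this as the likely intent, since the sample does not state it. If you prefer not to add the Commons Codec dependency, the following sketch (Md5HexSketch is a hypothetical name) produces the same lowercase hex string by using only the JDK.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

class Md5HexSketch {
    // Lowercase hex MD5 of the UTF-8 bytes of `s`, the same output format as
    // DigestUtils.md5Hex(String) in Apache Commons Codec.
    static String md5Hex(String s) {
        try {
            MessageDigest md = MessageDigest.getInstance("MD5");
            byte[] digest = md.digest(s.getBytes(StandardCharsets.UTF_8));
            StringBuilder hex = new StringBuilder(digest.length * 2);
            for (byte b : digest) {
                hex.append(String.format("%02x", b)); // two lowercase hex digits per byte
            }
            return hex.toString();
        } catch (NoSuchAlgorithmException e) {
            // Every standard JDK provides MD5, so this should not happen.
            throw new IllegalStateException("MD5 is not available", e);
        }
    }

    public static void main(String[] args) {
        System.out.println(md5Hex("0")); // prints cfcd208495d565ef66e7dff9f98764da
    }
}
```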

Write multiple rows of data at the same time

The following sample code shows how to write 1,000 rows to a data table by adding lists of rows. Each call to addRowChangeWithFuture returns a Future for the whole list.

public void writeRowListWithFuture(TableStoreWriter writer) {
    System.out.println("=========================================================[Start]");
    System.out.println("Write Row List With Future");

    int rowsCount = 1000;
    int columnsCount = 10;
    String strValue = "1234567890";
    AtomicLong rowIndex = new AtomicLong(-1);

    List<Future<WriterResult>> futures = new LinkedList<Future<WriterResult>>();
    List<RowChange> rowChanges = new LinkedList<RowChange>();
    for (long index = rowIndex.incrementAndGet(); index < rowsCount; index = rowIndex.incrementAndGet()) {

        PrimaryKey pk = PrimaryKeyBuilder.createPrimaryKeyBuilder()
                .addPrimaryKeyColumn("pk_0", PrimaryKeyValue.fromString(md5Hex(index + "")))
                .addPrimaryKeyColumn("pk_1", PrimaryKeyValue.fromString("pk" + index))
                .addPrimaryKeyColumn("pk_2", PrimaryKeyValue.fromLong(index % 5))
                .build();

        RowUpdateChange rowChange = new RowUpdateChange(tableName, pk);
        for (int j = 0; j < columnsCount; j++) {
            rowChange.put("column_" + j, ColumnValue.fromString(strValue));
        }
        rowChange.put("index", ColumnValue.fromLong(index));
        rowChanges.add(rowChange);
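        // For demonstration, submit the accumulated rows as one list with a probability of about 0.5% per row, and always at the last row. This produces lists of random size.
        if (Math.random() > 0.995 || index == rowsCount - 1) {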
            Future<WriterResult> future = writer.addRowChangeWithFuture(rowChanges);
            futures.add(future);
            rowChanges.clear();
        }
    }

    System.out.println("Write thread finished.");
    // Explicitly flush the data in the buffer to Tablestore. If you do not call flush(), TableStoreWriter flushes the buffer automatically based on the flushInterval parameter (the interval between automatic flushes) and the maxBatchSize parameter (the amount of buffered data that triggers a flush).
    writer.flush();

    // Optionally print the result of each Future. printFutureResult is defined in the complete sample code.
    // printFutureResult(futures);
    
    System.out.println("=========================================================[Finish]");
}
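The preceding sample cuts the list at random points only for demonstration. If you want predictable list sizes, you could split the rows into fixed-size batches first and pass each batch to addRowChangeWithFuture. The following generic helper is a hypothetical sketch (FixedSizeBatches is not an SDK class); it copies each slice so that later changes to the source list do not affect a submitted batch.

```java
import java.util.ArrayList;
import java.util.List;

class FixedSizeBatches {
    // Splits rows into consecutive batches of at most batchSize elements each.
    static <T> List<List<T>> partition(List<T> rows, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        List<List<T>> batches = new ArrayList<>();
        for (int from = 0; from < rows.size(); from += batchSize) {
            int to = Math.min(from + batchSize, rows.size());
            // Copy the slice so each batch is independent of the source list.
            batches.add(new ArrayList<>(rows.subList(from, to)));
        }
        return batches;
    }

    public static void main(String[] args) {
        List<Integer> rows = new ArrayList<>();
        for (int i = 0; i < 1000; i++) {
            rows.add(i);
        }
        System.out.println(partition(rows, 300).size()); // prints 4 (300 + 300 + 300 + 100)
    }
}
```

In the sample, the loop body would then become something like `futures.add(writer.addRowChangeWithFuture(batch))` for each batch of RowChange objects.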

Step 4: Shut down TableStoreWriter

Before your application exits, you must manually shut down TableStoreWriter. During shutdown, TableStoreWriter first flushes all data in the buffer to Tablestore.

Important

If addRowChange is called to write data to the buffer while TableStoreWriter is shutting down or after it has been shut down, the data may not be written to Tablestore.

// Proactively shut down TableStoreWriter. After all data in the queues is written to Tablestore, the system automatically shuts down the client and the internal thread pools. 
writer.close();
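The following toy model illustrates why closing the writer matters: rows that have not yet reached a flush threshold stay in the buffer until something flushes them. WriteBufferModel is a hypothetical class and not how the SDK is implemented. It sends a batch when the buffer reaches maxRows rows or maxBytes bytes, sends the remainder on close(), and, as a modeling choice, rejects rows added after close(); the real writer's behavior in that case is only described as "may not be written".

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of a write buffer; it is NOT how the SDK is implemented.
class WriteBufferModel {
    private final int maxRows;
    private final long maxBytes;
    private int pendingRows = 0;
    private long pendingBytes = 0;
    private boolean closed = false;
    // Row count of every batch "sent" so far, in order.
    final List<Integer> sentBatches = new ArrayList<>();

    WriteBufferModel(int maxRows, long maxBytes) {
        this.maxRows = maxRows;
        this.maxBytes = maxBytes;
    }

    void add(int rowBytes) {
        if (closed) {
            // Modeling choice: fail loudly instead of silently losing the row.
            throw new IllegalStateException("writer is closed");
        }
        pendingRows++;
        pendingBytes += rowBytes;
        if (pendingRows >= maxRows || pendingBytes >= maxBytes) {
            flush(); // row-count or size threshold reached
        }
    }

    void flush() {
        if (pendingRows == 0) {
            return;
        }
        sentBatches.add(pendingRows);
        pendingRows = 0;
        pendingBytes = 0;
    }

    void close() {
        flush(); // rows still in the buffer are sent before shutdown
        closed = true;
    }
}
```

For example, with maxRows = 200, adding 450 rows sends two batches of 200 and leaves 50 rows buffered; only close() (or an explicit flush()) sends those last 50.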

Complete sample code

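The following sample code shows how to create a data table and concurrently write data to it. The tryCreateTable method first deletes any existing table that has the specified name and then creates a new table, so do not run this sample against a table that contains data you need.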

import com.alicloud.openservices.tablestore.DefaultTableStoreWriter;
import com.alicloud.openservices.tablestore.SyncClient;
import com.alicloud.openservices.tablestore.TableStoreCallback;
import com.alicloud.openservices.tablestore.TableStoreWriter;
import com.alicloud.openservices.tablestore.core.auth.DefaultCredentials;
import com.alicloud.openservices.tablestore.core.auth.ServiceCredentials;
import com.alicloud.openservices.tablestore.model.*;
import com.alicloud.openservices.tablestore.writer.RowWriteResult;
import com.alicloud.openservices.tablestore.writer.WriterConfig;
import com.alicloud.openservices.tablestore.writer.WriterResult;

import com.aliyuncs.exceptions.ClientException;

import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicLong;

import static org.apache.commons.codec.digest.DigestUtils.md5Hex;

public class TableStoreWriterDemo {

    private static String endpoint = "<ENDPOINT>";
    private static String instanceName = "<INSTANCE_NAME>";
    private static String accessKeyId = System.getenv("OTS_AK_ENV");
    private static String accessKeySecret = System.getenv("OTS_SK_ENV");
    private static String tableName = "<TABLE_NAME>";

    private static AtomicLong succeedRows = new AtomicLong();
    private static AtomicLong failedRows = new AtomicLong();

    public static void main(String[] args) throws ClientException {
        TableStoreWriterDemo sample = new TableStoreWriterDemo();

        /**
         * Make sure that the table exists before you use TableStoreWriter:
         * 1. TableStoreWriter verifies whether the table exists.
         * 2. Make sure that the fields and field types of the data that you want to write match the fields and field types of the table.
         * Note: tryCreateTable deletes any existing table with the same name before it creates the table.
         * */
        sample.tryCreateTable();

        /**
         * We recommend that you use DefaultTableStoreWriter for initialization.
         * DefaultTableStoreWriter(
         *      String endpoint,                                                   // The endpoint of the instance.
         *      ServiceCredentials credentials,                                    // The access credentials, including the AccessKey pair and token.
         *      String instanceName,                                               // The instance name.
         *      String tableName,                                                  // The table name. Each TableStoreWriter writes to only one table.
         *      WriterConfig config,                                               // The TableStoreWriter configurations.
         *      TableStoreCallback<RowChange, RowWriteResult> resultCallback       // The row-level callback.
         * )
         * */
        TableStoreWriter writer = sample.createTablesStoreWriter();

        /**
         * Add rows one at a time and collect a Future for each row.
         * */
        sample.writeSingleRowWithFuture(writer);
        /**
         * Alternatively, add rows in lists and collect a Future for each list.
         * */
        //sample.writeRowListWithFuture(writer);

        System.out.println("Count by TablestoreCallback: failedRow=" + failedRows.get() + ", succeedRow=" + succeedRows.get());
        System.out.println("Count by WriterStatics: " + writer.getWriterStatistics());

        /**
         * Proactively shut down TableStoreWriter. After all data in the queues is written to Tablestore, the system automatically shuts down the client and the internal thread pools. 
         * */
        writer.close();
    }

    private static TableStoreWriter createTablesStoreWriter() {

        WriterConfig config = new WriterConfig();
        // Specify the maximum number of rows that can be written to Tablestore in a batch write request. Default value: 200. If you want to write more than 200 rows of data in a batch write request, increase the value of this parameter. 
        config.setMaxBatchRowsCount(200); 
        // Specify the maximum number of parallel requests that TableStoreWriter uses to write data in the buffer to Tablestore. Default value: 10. We recommend that you retain the default settings.                          
        config.setConcurrency(10);                                   

        /**
         * Specify row-level callbacks. 
         * In this example, callbacks are configured to collect statistics on the number of rows that are written to Tablestore and the number of rows that fail to be written to Tablestore. 
         * */
        TableStoreCallback<RowChange, RowWriteResult> resultCallback = new TableStoreCallback<RowChange, RowWriteResult>() {
            @Override
            public void onCompleted(RowChange rowChange, RowWriteResult cc) {
                succeedRows.incrementAndGet();
            }

            @Override
            public void onFailed(RowChange rowChange, Exception ex) {
                failedRows.incrementAndGet();
            }
        };

        ServiceCredentials credentials = new DefaultCredentials(accessKeyId, accessKeySecret);


        /**
         * We recommend that you use the built-in thread pools and client. They are easier to use, and the writer manages their initialization and release for you.
         * */
        DefaultTableStoreWriter writer = new DefaultTableStoreWriter(
                endpoint, credentials, instanceName, tableName, config, resultCallback);

        return writer;
    }


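    // Warning: this helper first DELETES the table if a table with the same name exists,
    // and then re-creates it. Do not use it on a table that contains data you need.
    private static void tryCreateTable() throws ClientException {
        SyncClient ots = new SyncClient(endpoint, accessKeyId, accessKeySecret, instanceName);

        try {
            ots.deleteTable(new DeleteTableRequest(tableName));
        } catch (Exception e) {
            // Ignore the error, for example, because the table does not exist yet.
        }

        TableMeta tableMeta = new TableMeta(tableName);
        tableMeta.addPrimaryKeyColumn("pk_0", PrimaryKeyType.STRING);
        tableMeta.addPrimaryKeyColumn("pk_1", PrimaryKeyType.STRING);
        tableMeta.addPrimaryKeyColumn("pk_2", PrimaryKeyType.INTEGER);
        // A time to live (TTL) of -1 means that data never expires. Keep at most 1 version of each attribute column.
        TableOptions tableOptions = new TableOptions(-1, 1);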
        CreateTableRequest request = new CreateTableRequest(
                tableMeta, tableOptions, new ReservedThroughput(new CapacityUnit(0, 0)));

        try {
            CreateTableResponse res = ots.createTable(request);
        } catch (Exception e) {
            throw new ClientException(e);
        } finally {
            ots.shutdown();
        }
    }

    public static void writeSingleRowWithFuture(TableStoreWriter writer) {
        System.out.println("=========================================================[Start]");
        System.out.println("Write Single Row With Future");
        int rowsCount = 1000;
        int columnsCount = 10;
        String strValue = "1234567890";
        AtomicLong rowIndex = new AtomicLong(-1);

        List<Future<WriterResult>> futures = new LinkedList<Future<WriterResult>>();
        for (long index = rowIndex.incrementAndGet(); index < rowsCount; index = rowIndex.incrementAndGet()) {

            PrimaryKey pk = PrimaryKeyBuilder.createPrimaryKeyBuilder()
                .addPrimaryKeyColumn("pk_0", PrimaryKeyValue.fromString(md5Hex(index + "")))
                .addPrimaryKeyColumn("pk_1", PrimaryKeyValue.fromString("pk" + index))
                .addPrimaryKeyColumn("pk_2", PrimaryKeyValue.fromLong(index % 5))
                .build();

            RowUpdateChange rowChange = new RowUpdateChange(tableName, pk);
            for (int j = 0; j < columnsCount; j++) {
                rowChange.put("column_" + j, ColumnValue.fromString(strValue));
            }
            rowChange.put("index", ColumnValue.fromLong(index));
            Future<WriterResult> future = writer.addRowChangeWithFuture(rowChange);
            futures.add(future);
        }

        System.out.println("Write thread finished.");
        writer.flush();
        // Display the Future object. 
        // printFutureResult(futures);

        System.out.println("=========================================================[Finish]");
    }
    
    public void writeRowListWithFuture(TableStoreWriter writer) {
        System.out.println("=========================================================[Start]");
        System.out.println("Write Row List With Future");

        int rowsCount = 1000;
        int columnsCount = 10;
        String strValue = "1234567890";
        AtomicLong rowIndex = new AtomicLong(-1);

        List<Future<WriterResult>> futures = new LinkedList<Future<WriterResult>>();
        List<RowChange> rowChanges = new LinkedList<RowChange>();
        for (long index = rowIndex.incrementAndGet(); index < rowsCount; index = rowIndex.incrementAndGet()) {

            PrimaryKey pk = PrimaryKeyBuilder.createPrimaryKeyBuilder()
                    .addPrimaryKeyColumn("pk_0", PrimaryKeyValue.fromString(md5Hex(index + "")))
                    .addPrimaryKeyColumn("pk_1", PrimaryKeyValue.fromString("pk" + index))
                    .addPrimaryKeyColumn("pk_2", PrimaryKeyValue.fromLong(index % 5))
                    .build();

            RowUpdateChange rowChange = new RowUpdateChange(tableName, pk);
            for (int j = 0; j < columnsCount; j++) {
                rowChange.put("column_" + j, ColumnValue.fromString(strValue));
            }
            rowChange.put("index", ColumnValue.fromLong(index));
            rowChanges.add(rowChange);
            if (Math.random() > 0.995 || index == rowsCount - 1) {
                Future<WriterResult> future = writer.addRowChangeWithFuture(rowChanges);
                futures.add(future);
                rowChanges.clear();
            }
        }

        System.out.println("Write thread finished.");
        writer.flush();
        // Display the Future object.
        // printFutureResult(futures);
        System.out.println("=========================================================[Finish]");
    }


    private static void printFutureResult(List<Future<WriterResult>> futures) {
        int totalRow = 0;

        for (int index = 0; index < futures.size(); index++) {
            try {
                WriterResult result = futures.get(index).get();
                totalRow += result.getTotalCount();
                System.out.println(String.format(
                        "Future[%d] finished:\tfailed: %d\tsucceed: %d\tfutureBatch: %d\ttotalFinished: %d",
                        index, result.getFailedRows().size(), result.getSucceedRows().size(),
                        result.getTotalCount(), totalRow));

            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
        }
    }
}

The following result is returned in this example:

=========================================================[Start]
Write Single Row With Future
Write thread finished.
=========================================================[Finish]
Count by TablestoreCallback: failedRow=0, succeedRow=1000
Count by WriterStatics: WriterStatistics: {
    totalRequestCount=6,
    totalRowsCount=1000,
    totalSucceedRowsCount=1000,
    totalFailedRowsCount=0,
    totalSingleRowRequestCount=0,
}

Billing rules

When you use TableStoreWriter to write data to Tablestore, you are charged for data writes and for data storage. For more information, see Billing overview.

FAQ

What do I do if the error "The count of attribute columns exceeds the maximum:128" is reported when I use Tablestore SDK for Java to write data to a Tablestore data table?

References

For information about the scenarios and architecture of TableStoreWriter, see TableStoreWriter overview.