
Tablestore: Use TableStoreWriter to write data concurrently

Updated: Dec 06, 2024

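This topic describes how to use the TableStoreWriter interface of Tablestore to write data concurrently, so that you are no longer bound by the 200-row limit of a single batch write.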

Usage notes

When you use multiple threads, we recommend that they share a single TableStoreWriter object.
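The recommendation to share one writer can be illustrated without the SDK. The stdlib-only sketch below uses a hypothetical `SharedRowBuffer` class (not part of the Tablestore SDK) as a stand-in for a thread-safe writer: several producer threads add rows to the same instance, so all rows end up in one buffer instead of each thread creating its own writer (and, with the recommended constructor, its own client and thread pool).

```java
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for a thread-safe writer; NOT the Tablestore SDK API.
class SharedRowBuffer {
    private final ConcurrentLinkedQueue<String> rows = new ConcurrentLinkedQueue<>();

    // Safe to call from many threads at the same time.
    void addRow(String row) {
        rows.add(row);
    }

    int size() {
        return rows.size();
    }
}

class SharedWriterDemo {
    // Starts `threads` producer threads that all write into ONE shared buffer.
    static int writeConcurrently(int threads, int rowsPerThread) {
        SharedRowBuffer shared = new SharedRowBuffer();
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int t = 0; t < threads; t++) {
            final int threadId = t;
            pool.submit(() -> {
                for (int i = 0; i < rowsPerThread; i++) {
                    shared.addRow("thread" + threadId + "-row" + i);
                }
            });
        }
        pool.shutdown();
        try {
            // Wait for all producers to finish before counting.
            pool.awaitTermination(1, TimeUnit.MINUTES);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return shared.size();
    }

    public static void main(String[] args) {
        System.out.println(writeConcurrently(4, 250)); // prints 1000
    }
}
```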

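Prerequisites

  • A Tablestore instance is created. For more information, see Create an instance.

  • An AccessKey pair is created for your Alibaba Cloud account or RAM user. For more information, see Create an AccessKey pair.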
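The complete sample code later in this topic reads the AccessKey pair from the environment variables `OTS_AK_ENV` and `OTS_SK_ENV` instead of hard-coding it. A minimal sketch that fails fast when either variable is missing might look like this; `AccessKeyPair` and `loadFromEnv` are hypothetical helpers, and the environment is passed in as a `Map` so the check can be exercised without real variables.

```java
import java.util.Map;

// Hypothetical holder for an AccessKey pair; NOT part of the Tablestore SDK.
final class AccessKeyPair {
    final String id;
    final String secret;

    AccessKeyPair(String id, String secret) {
        this.id = id;
        this.secret = secret;
    }

    // Reads the pair from the given environment map; throws if either value is missing or blank.
    static AccessKeyPair loadFromEnv(Map<String, String> env) {
        String id = env.get("OTS_AK_ENV");
        String secret = env.get("OTS_SK_ENV");
        if (id == null || id.trim().isEmpty() || secret == null || secret.trim().isEmpty()) {
            throw new IllegalStateException("Set OTS_AK_ENV and OTS_SK_ENV before running the example");
        }
        return new AccessKeyPair(id, secret);
    }

    public static void main(String[] args) {
        // In a real program, pass the process environment.
        AccessKeyPair pair = loadFromEnv(System.getenv());
        System.out.println("Loaded an AccessKey ID of length " + pair.id.length());
    }
}
```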

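Procedure

Step 1: Import the TableStoreWriter utility class

TableStoreWriter is a utility class provided by the Tablestore Java SDK. It becomes available once you install the Tablestore Java SDK. For more information, see Install the Java SDK.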

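Step 2: Initialize TableStoreWriter

When you initialize TableStoreWriter, you must specify the instance and table information, the credentials, and the TableStoreWriter configuration, such as the maximum concurrency and the maximum number of rows imported in a single batch request.

Note

For more information about TableStoreWriter configuration, see TableStoreWriter configuration items.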
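To build intuition for the two settings used below, here is a stdlib-only sketch of the general technique: split buffered rows into batches of at most `maxBatchRowsCount` rows and send them on a pool of `concurrency` threads. `BatchingSketch` and `sendInBatches` are hypothetical names; the real TableStoreWriter has its own buffering and flush logic, which this sketch does not model.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration of batch splitting plus bounded concurrency; NOT the Tablestore SDK.
class BatchingSketch {
    final AtomicInteger requests = new AtomicInteger();
    final AtomicInteger rowsSent = new AtomicInteger();
    final AtomicInteger inFlight = new AtomicInteger();
    final AtomicInteger maxInFlight = new AtomicInteger();

    void sendInBatches(List<Integer> rows, int maxBatchRowsCount, int concurrency) {
        // At most `concurrency` batches are processed at the same time.
        ExecutorService pool = Executors.newFixedThreadPool(concurrency);
        for (int start = 0; start < rows.size(); start += maxBatchRowsCount) {
            // Each batch holds at most maxBatchRowsCount rows; the last one may be smaller.
            List<Integer> batch = new ArrayList<>(
                    rows.subList(start, Math.min(start + maxBatchRowsCount, rows.size())));
            pool.submit(() -> send(batch));
        }
        pool.shutdown();
        try {
            pool.awaitTermination(1, TimeUnit.MINUTES);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Stands in for one batch write request and records how many run at once.
    private void send(List<Integer> batch) {
        int now = inFlight.incrementAndGet();
        maxInFlight.accumulateAndGet(now, Math::max);
        requests.incrementAndGet();
        rowsSent.addAndGet(batch.size());
        inFlight.decrementAndGet();
    }

    public static void main(String[] args) {
        List<Integer> rows = new ArrayList<>();
        for (int i = 0; i < 1000; i++) {
            rows.add(i);
        }
        BatchingSketch sketch = new BatchingSketch();
        sketch.sendInBatches(rows, 200, 10);
        System.out.println("requests=" + sketch.requests.get() + ", rows=" + sketch.rowsSent.get()); // requests=5, rows=1000
    }
}
```

With 1,000 rows and a batch limit of 200, the sketch always issues 5 requests; the real writer may issue more, because it also flushes on time and size thresholds.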

The following example shows how to initialize TableStoreWriter:

private static TableStoreWriter createTablesStoreWriter() {
    
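    /**
     * In most cases, you can keep the default configuration. You can also customize the TableStoreWriter configuration as needed.
     * For more information about the parameters, see the "Configure TableStoreWriter" topic.
     * */
    WriterConfig config = new WriterConfig();
    // Maximum number of rows imported in a single batch request. Default: 200.
    config.setMaxBatchRowsCount(200);
    // Maximum number of concurrent requests. Default: 10. We recommend keeping the default.
    config.setConcurrency(10);

    /**
     * Custom row-level callback.
     * This example demonstrates the callback by counting succeeded and failed rows.
     * */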
    TableStoreCallback<RowChange, RowWriteResult> resultCallback = new TableStoreCallback<RowChange, RowWriteResult>() {
        @Override
        public void onCompleted(RowChange rowChange, RowWriteResult cc) {
            succeedRows.incrementAndGet();
        }

        @Override
        public void onFailed(RowChange rowChange, Exception ex) {
            failedRows.incrementAndGet();
        }
    };

    /** Configure the access credentials. **/
    ServiceCredentials credentials = new DefaultCredentials(accessKeyId, accessKeySecret);

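    /**
     * We recommend letting the writer build its own internal thread pool and client. This is easier to use and keeps initialization and release logic isolated.
     * */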
    DefaultTableStoreWriter writer = new DefaultTableStoreWriter(
        endpoint, credentials, instanceName, tableName, config, resultCallback);

    return writer;
}
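The row-level callback is invoked once per row, potentially from several worker threads, which is why the example counts with `AtomicLong` rather than a plain `long`. The following stdlib-only sketch mimics that pattern with a hypothetical `RowCallback` interface whose two methods mirror `onCompleted` and `onFailed`; it is not the SDK's `TableStoreCallback`, and the failures are simulated.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical interface shaped like a row-level callback; NOT the SDK's TableStoreCallback.
interface RowCallback<T> {
    void onCompleted(T row);

    void onFailed(T row, Exception ex);
}

class CountingCallbackDemo {
    // Thread-safe counters, because callbacks may run on several threads at once.
    static final AtomicLong succeedRows = new AtomicLong();
    static final AtomicLong failedRows = new AtomicLong();

    static final RowCallback<Long> COUNTER = new RowCallback<Long>() {
        @Override
        public void onCompleted(Long row) {
            succeedRows.incrementAndGet();
        }

        @Override
        public void onFailed(Long row, Exception ex) {
            failedRows.incrementAndGet();
        }
    };

    // Simulates rows finishing on 4 threads; every 20th row is reported as failed.
    static void simulate(int rows) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        for (long i = 0; i < rows; i++) {
            final long row = i;
            pool.submit(() -> {
                if (row % 20 == 0) {
                    COUNTER.onFailed(row, new RuntimeException("simulated failure"));
                } else {
                    COUNTER.onCompleted(row);
                }
            });
        }
        pool.shutdown();
        try {
            pool.awaitTermination(1, TimeUnit.MINUTES);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        simulate(100);
        System.out.println("failed=" + failedRows.get() + ", succeed=" + succeedRows.get()); // failed=5, succeed=95
    }
}
```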

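Step 3: Construct and execute write requests

Construct a RowChange for each insert, update, or delete operation (for example, RowPutChange, RowUpdateChange, or RowDeleteChange), and then add the RowChange to TableStoreWriter.

In practice, use only one of the two write methods: single-row writes or batch writes.

Write single rows

The following example writes 1,000 rows to a data table concurrently, adding one row at a time.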

public void writeSingleRowWithFuture(TableStoreWriter writer) {
    System.out.println("=========================================================[Start]");
    System.out.println("Write Single Row With Future");
    int rowsCount = 1000;
    int columnsCount = 10;
    String strValue = "1234567890";
    AtomicLong rowIndex = new AtomicLong(-1);

    List<Future<WriterResult>> futures = new LinkedList<Future<WriterResult>>();
    for (long index = rowIndex.incrementAndGet(); index < rowsCount; index = rowIndex.incrementAndGet()) {

        PrimaryKey pk = PrimaryKeyBuilder.createPrimaryKeyBuilder()
                .addPrimaryKeyColumn("pk_0", PrimaryKeyValue.fromString(md5Hex(index + "")))
                .addPrimaryKeyColumn("pk_1", PrimaryKeyValue.fromString("pk" + index))
                .addPrimaryKeyColumn("pk_2", PrimaryKeyValue.fromLong(index % 5))
                .build();

        RowUpdateChange rowChange = new RowUpdateChange(tableName, pk);
        for (int j = 0; j < columnsCount; j++) {
            rowChange.put("column_" + j, ColumnValue.fromString(strValue));
        }
        rowChange.put("index", ColumnValue.fromLong(index));
        Future<WriterResult> future = writer.addRowChangeWithFuture(rowChange);
        futures.add(future);
    }

    System.out.println("Write thread finished.");
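    // Flush the data in the buffer. TableStoreWriter also flushes the buffer automatically based on flushInterval and maxBatchSize: flushInterval triggers periodic, time-based flushes, and maxBatchSize triggers a flush when the amount of buffered data reaches the limit.
    writer.flush();

    // Print the Future results. printFutureResult is defined in the complete sample code.
    // printFutureResult(futures);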

    System.out.println("=========================================================[Finish]");
}
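`addRowChangeWithFuture` returns one `Future<WriterResult>` per call, and the example keeps them in a list so they can be inspected after `flush()` (see `printFutureResult` in the complete sample code). The general pattern of collecting futures and then blocking on each with `get()` can be sketched with the JDK alone; here `CompletableFuture` and plain `Integer` row counts are hypothetical stand-ins for the SDK's futures and `WriterResult`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class FutureCollectionSketch {
    // Submits `batches` asynchronous "writes" of `rowsPerBatch` rows each, then waits for all of them.
    static int writeAndWait(int batches, int rowsPerBatch) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        List<Future<Integer>> futures = new ArrayList<>();
        for (int b = 0; b < batches; b++) {
            // Each future completes with the number of rows it "wrote".
            futures.add(CompletableFuture.supplyAsync(() -> rowsPerBatch, pool));
        }
        int totalRows = 0;
        for (int i = 0; i < futures.size(); i++) {
            try {
                // get() blocks until this write has finished.
                totalRows += futures.get(i).get();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            } catch (ExecutionException e) {
                System.err.println("Future[" + i + "] failed: " + e.getCause());
            }
        }
        pool.shutdown();
        return totalRows;
    }

    public static void main(String[] args) {
        System.out.println("totalFinished=" + writeAndWait(1000, 1)); // totalFinished=1000
    }
}
```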

Write data in batches

The following example writes 1,000 rows to a data table concurrently, adding rows as lists.

public void writeRowListWithFuture(TableStoreWriter writer) {
    System.out.println("=========================================================[Start]");
    System.out.println("Write Row List With Future");

    int rowsCount = 1000;
    int columnsCount = 10;
    String strValue = "1234567890";
    AtomicLong rowIndex = new AtomicLong(-1);

    List<Future<WriterResult>> futures = new LinkedList<Future<WriterResult>>();
    List<RowChange> rowChanges = new LinkedList<RowChange>();
    for (long index = rowIndex.incrementAndGet(); index < rowsCount; index = rowIndex.incrementAndGet()) {

        PrimaryKey pk = PrimaryKeyBuilder.createPrimaryKeyBuilder()
                .addPrimaryKeyColumn("pk_0", PrimaryKeyValue.fromString(md5Hex(index + "")))
                .addPrimaryKeyColumn("pk_1", PrimaryKeyValue.fromString("pk" + index))
                .addPrimaryKeyColumn("pk_2", PrimaryKeyValue.fromLong(index % 5))
                .build();

        RowUpdateChange rowChange = new RowUpdateChange(tableName, pk);
        for (int j = 0; j < columnsCount; j++) {
            rowChange.put("column_" + j, ColumnValue.fromString(strValue));
        }
        rowChange.put("index", ColumnValue.fromLong(index));
        rowChanges.add(rowChange);
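        // Hand the accumulated rows to the writer at random points (on average about every 200 rows) and always after the last row, so that no rows are left in the list.
        if (Math.random() > 0.995 || index == rowsCount - 1) {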
            Future<WriterResult> future = writer.addRowChangeWithFuture(rowChanges);
            futures.add(future);
            rowChanges.clear();
        }
    }

    System.out.println("Write thread finished.");
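    // Flush the data in the buffer. TableStoreWriter also flushes the buffer automatically based on flushInterval and maxBatchSize: flushInterval triggers periodic, time-based flushes, and maxBatchSize triggers a flush when the amount of buffered data reaches the limit.
    writer.flush();

    // Print the Future results. printFutureResult is defined in the complete sample code.
    // printFutureResult(futures);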
    
    System.out.println("=========================================================[Finish]");
}
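The flush comment in both examples mentions two automatic triggers: `flushInterval` (time-based) and `maxBatchSize` (amount of buffered data). The sketch below shows that size-or-time decision rule in isolation. `FlushPolicy` is a hypothetical class, sizes are counted in bytes, and the clock is passed in explicitly so the behavior is deterministic; unlike a real writer, which would typically also flush from a background timer, this sketch only checks the interval when a row is added.

```java
// Hypothetical size-or-time flush rule; NOT the TableStoreWriter implementation.
class FlushPolicy {
    private final long maxBatchSizeBytes;
    private final long flushIntervalMillis;
    private long bufferedBytes = 0;
    private long lastFlushMillis;
    int flushCount = 0;

    FlushPolicy(long maxBatchSizeBytes, long flushIntervalMillis, long nowMillis) {
        this.maxBatchSizeBytes = maxBatchSizeBytes;
        this.flushIntervalMillis = flushIntervalMillis;
        this.lastFlushMillis = nowMillis;
    }

    // Buffers a row of the given size, then flushes if either threshold is reached.
    void add(long rowBytes, long nowMillis) {
        bufferedBytes += rowBytes;
        if (bufferedBytes >= maxBatchSizeBytes || nowMillis - lastFlushMillis >= flushIntervalMillis) {
            flush(nowMillis);
        }
    }

    // Explicit flush, analogous to writer.flush(): sends whatever is buffered.
    void flush(long nowMillis) {
        if (bufferedBytes > 0) {
            flushCount++;
            bufferedBytes = 0;
        }
        lastFlushMillis = nowMillis;
    }

    long bufferedBytes() {
        return bufferedBytes;
    }

    public static void main(String[] args) {
        FlushPolicy p = new FlushPolicy(1000, 5000, 0);
        p.add(600, 10);   // 600 bytes buffered, no flush
        p.add(600, 20);   // 1200 >= 1000 bytes: size-triggered flush
        p.add(100, 6000); // 6000 - 20 >= 5000 ms: time-triggered flush
        p.add(50, 6100);  // stays buffered
        p.flush(6200);    // explicit flush drains the remainder
        System.out.println("flushes=" + p.flushCount); // flushes=3
    }
}
```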

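Step 4: Close TableStoreWriter

Before your application exits, you must close TableStoreWriter manually. When TableStoreWriter is closed, it first flushes all data in the buffer.

Important

If you call addRowChange to add data to the buffer while TableStoreWriter is closing or after it has been closed, that data is not guaranteed to be written to Tablestore.

// Close the writer explicitly. It waits until all queued data is written, and then shuts down the client and the internal thread pool.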
writer.close();
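The close behavior described above (flush everything first, then stop accepting data) can be sketched as a small state machine. `ClosableBuffer` is hypothetical, and it rejects late rows explicitly by returning `false`; the real writer only states that such rows are not guaranteed to be written, so do not rely on any particular failure signal from it.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical buffer illustrating "flush on close, reject afterwards"; NOT the TableStoreWriter API.
class ClosableBuffer {
    private final List<String> buffer = new ArrayList<>();
    private final List<String> written = new ArrayList<>();
    private boolean closed = false;

    // Returns false if the row was not accepted because the buffer is closed.
    synchronized boolean add(String row) {
        if (closed) {
            return false;
        }
        buffer.add(row);
        return true;
    }

    // Moves all buffered rows to the "written" list.
    synchronized void flush() {
        written.addAll(buffer);
        buffer.clear();
    }

    // Flushes pending rows first, then marks the buffer closed.
    synchronized void close() {
        if (!closed) {
            flush();
            closed = true;
        }
    }

    synchronized int writtenCount() {
        return written.size();
    }

    public static void main(String[] args) {
        ClosableBuffer b = new ClosableBuffer();
        b.add("row-1");
        b.add("row-2");
        b.close();                     // both rows are flushed before closing
        boolean late = b.add("row-3"); // rejected: added after close
        System.out.println(b.writtenCount() + " " + late); // 2 false
    }
}
```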

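Complete sample code

The following example creates a data table and writes data to it concurrently. Note that tryCreateTable first deletes any existing table with the name specified by tableName, so do not run the example against a table whose data you need to keep.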

import com.alicloud.openservices.tablestore.DefaultTableStoreWriter;
import com.alicloud.openservices.tablestore.SyncClient;
import com.alicloud.openservices.tablestore.TableStoreCallback;
import com.alicloud.openservices.tablestore.TableStoreWriter;
import com.alicloud.openservices.tablestore.core.auth.DefaultCredentials;
import com.alicloud.openservices.tablestore.core.auth.ServiceCredentials;
import com.alicloud.openservices.tablestore.model.*;
import com.alicloud.openservices.tablestore.writer.RowWriteResult;
import com.alicloud.openservices.tablestore.writer.WriterConfig;
import com.alicloud.openservices.tablestore.writer.WriterResult;

import com.aliyuncs.exceptions.ClientException;

import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicLong;

import static org.apache.commons.codec.digest.DigestUtils.md5Hex;

public class TableStoreWriterDemo {

    private static String endpoint = "<ENDPOINT>";
    private static String instanceName = "<INSTANCE_NAME>";
    private static String accessKeyId = System.getenv("OTS_AK_ENV");
    private static String accessKeySecret = System.getenv("OTS_SK_ENV");
    private static String tableName = "<TABLE_NAME>";

    private static AtomicLong succeedRows = new AtomicLong();
    private static AtomicLong failedRows = new AtomicLong();

    public static void main(String[] args) throws ClientException {
        TableStoreWriterDemo sample = new TableStoreWriterDemo();

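        /**
         * Make sure the table exists before you use the writer:
         * 1. The writer checks whether the table exists.
         * 2. The writer checks whether the written data matches the table's columns and types.
         * */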
        sample.tryCreateTable();

        /**
         * Recommended constructor:
         * DefaultTableStoreWriter(
         *      String endpoint,                                                   // Endpoint of the instance.
         *      ServiceCredentials credentials,                                    // Credentials: an AccessKey pair; a security token is also supported.
         *      String instanceName,                                               // Instance name.
         *      String tableName,                                                  // Table name: each writer serves exactly one table.
         *      WriterConfig config,                                               // Writer configuration.
         *      TableStoreCallback<RowChange, RowWriteResult> resultCallback       // Row-level callback.
         * )
         * */
        TableStoreWriter writer = sample.createTablesStoreWriter();

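        /**
         * Using Future: single-row writes.
         * */
        sample.writeSingleRowWithFuture(writer);
        /**
         * Using Future: batch writes. To switch, uncomment this call and comment out the call above; use only one of the two methods.
         * */
        //sample.writeRowListWithFuture(writer);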

        System.out.println("Count by TablestoreCallback: failedRow=" + failedRows.get() + ", succeedRow=" + succeedRows.get());
        System.out.println("Count by WriterStatics: " + writer.getWriterStatistics());

        /**
         * You must close the writer explicitly. It waits until all queued data is written, and then shuts down the client and the internal thread pool.
         * */
        writer.close();
    }

    private static TableStoreWriter createTablesStoreWriter() {

        WriterConfig config = new WriterConfig();
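        // Maximum number of rows imported in a single batch request. Default: 200. The writer splits rows across multiple requests, so this value does not cap the total number of rows you can write.
        config.setMaxBatchRowsCount(200);
        // Maximum number of concurrent requests. Default: 10. We recommend keeping the default.
        config.setConcurrency(10);

        /**
         * Custom row-level callback.
         * This example demonstrates the callback by counting succeeded and failed rows.
         * */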
        TableStoreCallback<RowChange, RowWriteResult> resultCallback = new TableStoreCallback<RowChange, RowWriteResult>() {
            @Override
            public void onCompleted(RowChange rowChange, RowWriteResult cc) {
                succeedRows.incrementAndGet();
            }

            @Override
            public void onFailed(RowChange rowChange, Exception ex) {
                failedRows.incrementAndGet();
            }
        };

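        ServiceCredentials credentials = new DefaultCredentials(accessKeyId, accessKeySecret);

        /**
         * We recommend letting the writer build its own internal thread pool and client. This is easier to use and keeps initialization and release logic isolated.
         * */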
        DefaultTableStoreWriter writer = new DefaultTableStoreWriter(
                endpoint, credentials, instanceName, tableName, config, resultCallback);

        return writer;
    }


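    private static void tryCreateTable() throws ClientException {
        SyncClient ots = new SyncClient(endpoint, accessKeyId, accessKeySecret, instanceName);

        // Warning: this demo deletes any existing table with the same name before recreating it.
        try {
            ots.deleteTable(new DeleteTableRequest(tableName));
        } catch (Exception e) {
            // Ignore the error, for example when the table does not exist yet.
        }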

        TableMeta tableMeta = new TableMeta(tableName);
        tableMeta.addPrimaryKeyColumn("pk_0", PrimaryKeyType.STRING);
        tableMeta.addPrimaryKeyColumn("pk_1", PrimaryKeyType.STRING);
        tableMeta.addPrimaryKeyColumn("pk_2", PrimaryKeyType.INTEGER);
        // Time to live of -1: data never expires. Max versions: 1.
        TableOptions tableOptions = new TableOptions(-1, 1);
        CreateTableRequest request = new CreateTableRequest(
                tableMeta, tableOptions, new ReservedThroughput(new CapacityUnit(0, 0)));

        try {
            CreateTableResponse res = ots.createTable(request);
        } catch (Exception e) {
            throw new ClientException(e);
        } finally {
            ots.shutdown();
        }
    }

    public static void writeSingleRowWithFuture(TableStoreWriter writer) {
        System.out.println("=========================================================[Start]");
        System.out.println("Write Single Row With Future");
        int rowsCount = 1000;
        int columnsCount = 10;
        String strValue = "1234567890";
        AtomicLong rowIndex = new AtomicLong(-1);

        List<Future<WriterResult>> futures = new LinkedList<Future<WriterResult>>();
        for (long index = rowIndex.incrementAndGet(); index < rowsCount; index = rowIndex.incrementAndGet()) {

            PrimaryKey pk = PrimaryKeyBuilder.createPrimaryKeyBuilder()
                .addPrimaryKeyColumn("pk_0", PrimaryKeyValue.fromString(md5Hex(index + "")))
                .addPrimaryKeyColumn("pk_1", PrimaryKeyValue.fromString("pk" + index))
                .addPrimaryKeyColumn("pk_2", PrimaryKeyValue.fromLong(index % 5))
                .build();

            RowUpdateChange rowChange = new RowUpdateChange(tableName, pk);
            for (int j = 0; j < columnsCount; j++) {
                rowChange.put("column_" + j, ColumnValue.fromString(strValue));
            }
            rowChange.put("index", ColumnValue.fromLong(index));
            Future<WriterResult> future = writer.addRowChangeWithFuture(rowChange);
            futures.add(future);
        }

        System.out.println("Write thread finished.");
        writer.flush();
        // Print the Future results.
        // printFutureResult(futures);

        System.out.println("=========================================================[Finish]");
    }
    
    public void writeRowListWithFuture(TableStoreWriter writer) {
        System.out.println("=========================================================[Start]");
        System.out.println("Write Row List With Future");

        int rowsCount = 1000;
        int columnsCount = 10;
        String strValue = "1234567890";
        AtomicLong rowIndex = new AtomicLong(-1);

        List<Future<WriterResult>> futures = new LinkedList<Future<WriterResult>>();
        List<RowChange> rowChanges = new LinkedList<RowChange>();
        for (long index = rowIndex.incrementAndGet(); index < rowsCount; index = rowIndex.incrementAndGet()) {

            PrimaryKey pk = PrimaryKeyBuilder.createPrimaryKeyBuilder()
                    .addPrimaryKeyColumn("pk_0", PrimaryKeyValue.fromString(md5Hex(index + "")))
                    .addPrimaryKeyColumn("pk_1", PrimaryKeyValue.fromString("pk" + index))
                    .addPrimaryKeyColumn("pk_2", PrimaryKeyValue.fromLong(index % 5))
                    .build();

            RowUpdateChange rowChange = new RowUpdateChange(tableName, pk);
            for (int j = 0; j < columnsCount; j++) {
                rowChange.put("column_" + j, ColumnValue.fromString(strValue));
            }
            rowChange.put("index", ColumnValue.fromLong(index));
            rowChanges.add(rowChange);
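            // Hand the accumulated rows to the writer at random points (on average about every 200 rows) and always after the last row, so that no rows are left in the list.
            if (Math.random() > 0.995 || index == rowsCount - 1) {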
                Future<WriterResult> future = writer.addRowChangeWithFuture(rowChanges);
                futures.add(future);
                rowChanges.clear();
            }
        }

        System.out.println("Write thread finished.");
        writer.flush();
        // Print the Future results.
        // printFutureResult(futures);
        System.out.println("=========================================================[Finish]");
    }


    private static void printFutureResult(List<Future<WriterResult>> futures) {
        int totalRow = 0;

        for (int index = 0; index < futures.size(); index++) {
            try {
                WriterResult result = futures.get(index).get();
                totalRow += result.getTotalCount();
                System.out.println(String.format(
                        "Future[%d] finished:\tfailed: %d\tsucceed: %d\tfutureBatch: %d\ttotalFinished: %d",
                        index, result.getFailedRows().size(), result.getSucceedRows().size(),
                        result.getTotalCount(), totalRow));

            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
        }
    }
}

Sample output:

=========================================================[Start]
Write Single Row With Future
Write thread finished.
=========================================================[Finish]
Count by TablestoreCallback: failedRow=0, succeedRow=1000
Count by WriterStatics: WriterStatistics: {
    totalRequestCount=6,
    totalRowsCount=1000,
    totalSucceedRowsCount=1000,
    totalFailedRowsCount=0,
    totalSingleRowRequestCount=0,
}

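Billing

When you write data by using TableStoreWriter, you are charged for data writes and data storage. For more information, see Billing overview.

FAQ

Error when writing data with the Java SDK: The count of attribute columns exceeds the maximum:128

References

For more information about the use cases and architecture of TableStoreWriter, see TableStoreWriter overview.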