Tablestore: Parallel scan

Last Updated: Aug 16, 2024

If the order of query results does not matter, you can use the parallel scan feature to retrieve query results more efficiently.

Important

Tablestore SDK for Java V5.6.0 or later supports the parallel scan feature. Before you use the parallel scan feature, make sure that you use a supported version of Tablestore SDK for Java. For more information about the version history of Tablestore SDK for Java, see Version history of Tablestore SDK for Java.

Background information

The search index feature allows you to call the Search operation to use all query features and analytics capabilities such as sorting and aggregation. The query results are returned from the Search operation in a specific order.

In some cases, for example, if you connect Tablestore to a computing environment such as Spark or Presto or you want to query a specific group of objects, a faster query speed may be more important than the order of query results. To improve query speeds, Tablestore provides the ParallelScan operation for the search index feature.

Compared with the Search operation, the ParallelScan operation supports all query features but does not provide analytics capabilities such as sorting and aggregation. In exchange, query speeds improve by more than five times. You can call the ParallelScan operation to export hundreds of millions of rows within a minute, and the export capability scales horizontally without an upper limit.

The maximum number of rows that can be returned by each ParallelScan call is greater than the maximum number of rows that can be returned by each Search call. The Search operation returns up to 100 rows per call, whereas the ParallelScan operation returns up to 2,000 rows per call. The parallel scan feature allows you to use multiple threads to initiate requests in a session in parallel, which accelerates data export.
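To make the per-call limits concrete, the following sketch estimates how many calls each operation needs to return a given number of rows. The class and method names are illustrative and are not part of Tablestore SDK for Java. Only the two row limits (100 and 2,000) come from this topic.

```java
final class CallCountEstimate {
    // Per-call row limits stated in this topic.
    static final int SEARCH_MAX_ROWS = 100;
    static final int PARALLEL_SCAN_MAX_ROWS = 2000;

    // Minimum number of calls needed to return totalRows rows (ceiling division).
    static long callsNeeded(long totalRows, int rowsPerCall) {
        if (totalRows < 0 || rowsPerCall <= 0) {
            throw new IllegalArgumentException("totalRows must be >= 0 and rowsPerCall must be > 0");
        }
        return (totalRows + rowsPerCall - 1) / rowsPerCall;
    }

    public static void main(String[] args) {
        long totalRows = 1_000_000L;
        System.out.println("Search calls: " + callsNeeded(totalRows, SEARCH_MAX_ROWS));
        System.out.println("ParallelScan calls: " + callsNeeded(totalRows, PARALLEL_SCAN_MAX_ROWS));
    }
}
```

For one million rows, this gives 10,000 Search calls compared with 500 ParallelScan calls, before any additional gain from running several parallel scan tasks.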

Scenarios

  • If you want to sort or aggregate query results, or the query request is sent from an end user, use the Search operation.

  • If you do not need to sort query results and want to return all matched results in an efficient manner, or the data is pulled by a computing environment such as Spark or Presto, use the ParallelScan operation.

Features

The following items describe the differences between the ParallelScan operation and the Search operation.

  • Stable results

    Parallel scan tasks are stateful. In a session, the result set of the scanned data is determined by the data status when the first request is initiated. If data is inserted or modified after the first request is sent, the result set is not affected.

  • Sessions

    Important

    If the session ID is difficult to obtain, you can call the ParallelScan operation without a session ID. However, a request that is sent without a session ID may, with a very low probability, return duplicate data in the result set.

    Parallel scan-related operations use sessions. The session ID can be used to determine the result set of scanned data. The following process describes how to obtain and use a session ID:

    1. Call the ComputeSplits operation to query the maximum number of parallel scan tasks and the current session ID.

    2. Initiate multiple parallel scan requests to read data. You must specify the current session ID and the parallel scan task IDs in these requests.

    If a network exception, a thread exception, a dynamic schema modification, or an index switchover occurs during a parallel scan, the scan stops and Tablestore returns the OTSSessionExpired error code. In these cases, you must start a new parallel scan task and scan the data again.

    Parallel scan tasks that have the same session ID and the same ScanQuery parameter value are considered one task. A parallel scan task starts from the time when you send the first ParallelScan request, and ends when all data is scanned or the token expires.

  • Maximum number of parallel scan tasks in a single request

    The maximum number of parallel scan tasks in a single request supported by the ParallelScan operation is determined by the response of the ComputeSplits request. A larger volume of data requires more parallel scan tasks in a session.

    A single request is specified by one query statement. For example, if you use the Search operation to query results in which the value of city is Hangzhou, all data that matches this condition is returned in the result. However, if you use the ParallelScan operation and the number of parallel scan tasks in a session is 2, each ParallelScan request returns half of the results. The complete result set consists of the two parallel result sets.

  • Performance

    A ParallelScan request that uses a single parallel scan task is about five times as fast as a Search request. When you use the parallel scan feature, the query speed increases with the number of parallel scan tasks in a session. For example, if eight parallel scan tasks are included in a session, the query speed can be improved by four times.

  • Cost

    ParallelScan requests consume fewer resources and are offered at a lower price. To export large amounts of data, we recommend that you use the ParallelScan operation.
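The city example under "Maximum number of parallel scan tasks in a single request" can be modeled with a small in-memory sketch: each task reads a disjoint part of the matched rows, and the parts together form the complete result set. The class name and the round-robin assignment are assumptions made for illustration only. How Tablestore actually assigns rows to tasks is internal to the service.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class SplitModel {
    // Returns the rows that the task with the given ID reads in this toy model.
    // Assumption: rows are assigned round-robin by position. The real split rule is not documented here.
    static List<String> rowsForTask(List<String> matchedRows, int currentParallelId, int maxParallel) {
        if (currentParallelId < 0 || currentParallelId >= maxParallel) {
            throw new IllegalArgumentException("currentParallelId must be in [0, maxParallel)");
        }
        List<String> part = new ArrayList<>();
        for (int i = currentParallelId; i < matchedRows.size(); i += maxParallel) {
            part.add(matchedRows.get(i));
        }
        return part;
    }

    public static void main(String[] args) {
        // Stand-in for all rows in which the value of city is Hangzhou.
        List<String> hangzhouRows = Arrays.asList("row-0", "row-1", "row-2", "row-3");
        int maxParallel = 2;
        List<String> combined = new ArrayList<>();
        for (int id = 0; id < maxParallel; id++) {
            List<String> part = rowsForTask(hangzhouRows, id, maxParallel);
            System.out.println("task " + id + " -> " + part);
            combined.addAll(part);
        }
        // The two partial result sets together contain every matched row exactly once.
        System.out.println("combined size: " + combined.size());
    }
}
```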

Limits

  • The maximum number of parallel scan tasks is 10. You can adjust this limit based on your business requirements.

  • Only existing columns can be returned from search indexes. However, the columns of the DATE and NESTED types cannot be returned.

    The ParallelScan operation can return values of the ARRAY and GEOPOINT columns. However, the return values are formatted and may be different from the values that are written to the data table. For example, if you write [1,2, 3, 4] to an ARRAY column, the ParallelScan operation returns [1,2,3,4] as the value. If you write 10,50 to a GEOPOINT column, the ParallelScan operation returns 10.0,50.0 as the value.

    You can set the ReturnType parameter to RETURN_ALL_INDEX or RETURN_SPECIFIED, but not to RETURN_ALL.

  • The maximum number of rows that can be returned by each ParallelScan call is specified by the limit parameter. The default value of the limit parameter is 2,000. Values greater than 2,000 yield almost no further performance improvement.

API operations

You can call the following API operations to use the parallel scan feature:

  • ComputeSplits: You can call this operation to query the maximum number of parallel scan tasks for a single ParallelScan request.

  • ParallelScan: You can call this operation to export data.

Use Tablestore SDKs

You can use the following Tablestore SDKs to scan data in parallel:

Parameters

  • tableName: The name of the data table.

  • indexName: The name of the search index.

  • scanQuery: The scan settings, which consist of the following parameters:

      • query: The query statement for the search index. The operation supports term query, fuzzy query, range query, geo query, and nested query, which are similar to those of the Search operation.

      • limit: The maximum number of rows that can be returned by each ParallelScan call.

      • maxParallel: The maximum number of parallel scan tasks per request. This value varies based on the data volume: a larger volume of data requires more parallel scan tasks per request. You can call the ComputeSplits operation to query this value.

      • currentParallelId: The ID of the parallel scan task in the request. Valid values: [0, Value of maxParallel).

      • token: The token that is used to paginate query results. The response to a ParallelScan request contains the token for the next page. You can use the token to retrieve the next page.

      • aliveTime: The validity period of the current parallel scan task, which is also the validity period of the token. Unit: seconds. Default value: 60. We recommend that you use the default value. If the next request is not initiated within the validity period, no more data can be queried. The validity period of the token is refreshed each time you send a request.

        Note

        Sessions expire ahead of time if the schema is dynamically modified, indexes are switched, a single server fails, or server-side load balancing is performed. In this case, you must recreate the session.

  • columnsToGet: The names of the columns to return. You can add the column names to Columns.

    If you want to return all columns in the search index, you can use the more concise ReturnAllFromIndex setting.

    Important

    ReturnAll cannot be used here.

  • sessionId: The session ID of the parallel scan task. You can call the ComputeSplits operation to create a session and query the maximum number of parallel scan tasks that are supported by the parallel scan request.
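The aliveTime behavior described above (a validity period that restarts with every request) can be modeled on the client side as follows. This is a sketch for reasoning about timing only. TokenLease and recordRequest are hypothetical names that do not exist in Tablestore SDK for Java, and treating a request that arrives exactly at the deadline as still valid is an assumption.

```java
final class TokenLease {
    private final long aliveTimeMillis;
    private long deadlineMillis;

    // aliveTimeSeconds mirrors the aliveTime parameter (default value: 60).
    // nowMillis is the time at which the first request is sent.
    TokenLease(int aliveTimeSeconds, long nowMillis) {
        if (aliveTimeSeconds <= 0) {
            throw new IllegalArgumentException("aliveTimeSeconds must be > 0");
        }
        this.aliveTimeMillis = aliveTimeSeconds * 1000L;
        this.deadlineMillis = nowMillis + this.aliveTimeMillis;
    }

    // Records a request sent at nowMillis.
    // Returns false if the validity period had already elapsed.
    // Otherwise restarts the validity period and returns true.
    boolean recordRequest(long nowMillis) {
        if (nowMillis > deadlineMillis) {
            return false;
        }
        deadlineMillis = nowMillis + aliveTimeMillis;
        return true;
    }

    public static void main(String[] args) {
        TokenLease lease = new TokenLease(60, 0L);
        System.out.println(lease.recordRequest(50_000L));   // true: within the first 60 seconds
        System.out.println(lease.recordRequest(109_000L));  // true: the period restarted at 50 seconds
        System.out.println(lease.recordRequest(230_000L));  // false: more than 60 seconds since the last request
    }
}
```

A model like this can help you decide whether per-row processing between two ParallelScan calls is short enough for the default aliveTime value.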

Example

You can scan data by using a single thread or by using multiple threads at a time based on your business requirements.

Scan data by using a single thread

When you use parallel scan, the code for a request that uses a single thread is simpler than the code for a request that uses multiple threads. The currentParallelId and maxParallel parameters are not required for a request that uses a single thread. The ParallelScan request that uses a single thread provides higher throughput than the Search request. However, the ParallelScan request that uses a single thread provides lower throughput than the ParallelScan request that uses multiple threads.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import com.alicloud.openservices.tablestore.SyncClient;
import com.alicloud.openservices.tablestore.model.ComputeSplitsRequest;
import com.alicloud.openservices.tablestore.model.ComputeSplitsResponse;
import com.alicloud.openservices.tablestore.model.Row;
import com.alicloud.openservices.tablestore.model.SearchIndexSplitsOptions;
import com.alicloud.openservices.tablestore.model.iterator.RowIterator;
import com.alicloud.openservices.tablestore.model.search.ParallelScanRequest;
import com.alicloud.openservices.tablestore.model.search.ParallelScanResponse;
import com.alicloud.openservices.tablestore.model.search.ScanQuery;
import com.alicloud.openservices.tablestore.model.search.SearchRequest.ColumnsToGet;
import com.alicloud.openservices.tablestore.model.search.query.MatchAllQuery;
import com.alicloud.openservices.tablestore.model.search.query.Query;
import com.alicloud.openservices.tablestore.model.search.query.QueryBuilders;

public class Test {

    public static List<Row> scanQuery(final SyncClient client) {
        String tableName = "<TableName>";
        String indexName = "<IndexName>";
        // Query the session ID and the maximum number of parallel scan tasks supported by the request. 
        ComputeSplitsRequest computeSplitsRequest = new ComputeSplitsRequest();
        computeSplitsRequest.setTableName(tableName);
        computeSplitsRequest.setSplitsOptions(new SearchIndexSplitsOptions(indexName));
        ComputeSplitsResponse computeSplitsResponse = client.computeSplits(computeSplitsRequest);
        byte[] sessionId = computeSplitsResponse.getSessionId();
        int splitsSize = computeSplitsResponse.getSplitsSize();
        /*
         * Create a parallel scan request. 
         */
        ParallelScanRequest parallelScanRequest = new ParallelScanRequest();
        parallelScanRequest.setTableName(tableName);
        parallelScanRequest.setIndexName(indexName);
        ScanQuery scanQuery = new ScanQuery();
        // This query determines the range of the data to scan. You can create a nested and complex query. 
        Query query = new MatchAllQuery();
        scanQuery.setQuery(query);

        // Specify the maximum number of rows that can be returned by each ParallelScan call. 
        scanQuery.setLimit(2000);
        parallelScanRequest.setScanQuery(scanQuery);
        ColumnsToGet columnsToGet = new ColumnsToGet();
        columnsToGet.setColumns(Arrays.asList("col_1", "col_2"));
        parallelScanRequest.setColumnsToGet(columnsToGet);
        parallelScanRequest.setSessionId(sessionId);

        /*
         * Use builder to create a parallel scan request that has the same features as the preceding request. 
         */
        ParallelScanRequest parallelScanRequestByBuilder = ParallelScanRequest.newBuilder()
            .tableName(tableName)
            .indexName(indexName)
            .scanQuery(ScanQuery.newBuilder()
                .query(QueryBuilders.matchAll())
                .limit(2000)
                .build())
            .addColumnsToGet("col_1", "col_2")
            .sessionId(sessionId)
            .build();
        List<Row> result = new ArrayList<>();

        /*
         * Use the native API operation to scan data. 
         */
        {
            ParallelScanResponse parallelScanResponse = client.parallelScan(parallelScanRequest);
            // Query the token of ScanQuery for the next request. 
            byte[] nextToken = parallelScanResponse.getNextToken();
            // Obtain the data. 
            List<Row> rows = parallelScanResponse.getRows();
            result.addAll(rows);
            while (nextToken != null) {
                // Specify the token. 
                parallelScanRequest.getScanQuery().setToken(nextToken);
                // Continue to scan the data. 
                parallelScanResponse = client.parallelScan(parallelScanRequest);
                // Obtain the data. 
                rows = parallelScanResponse.getRows();
                result.addAll(rows);
                nextToken = parallelScanResponse.getNextToken();
            }
        }

        /*
         * Recommended method. 
         * Use an iterator to scan all matched data. This method has the same query speed but is easier to use compared with the previous method. 
         */
        {
            RowIterator iterator = client.createParallelScanIterator(parallelScanRequestByBuilder);
            while (iterator.hasNext()) {
                Row row = iterator.next();
                result.add(row);
                // Obtain the specific values. 
                String col_1 = row.getLatestColumn("col_1").getValue().asString();
                long col_2 = row.getLatestColumn("col_2").getValue().asLong();
            }
        }

        /*
         * If the operation fails, retry the operation. If the caller of this function has a retry mechanism or if you do not want to retry the failed operation, you can ignore this part. 
         * To ensure availability, we recommend that you start a new parallel scan task when exceptions occur. 
         * The following exceptions may occur when you send a ParallelScan request:
         * 1. A session exception occurs on the server side. The error code is OTSSessionExpired. 
         * 2. An exception such as a network exception occurs on the client side. 
         */
        try {
            // Execute the processing logic. 
            {
                RowIterator iterator = client.createParallelScanIterator(parallelScanRequestByBuilder);
                while (iterator.hasNext()) {
                    Row row = iterator.next();
                    // Process rows of data. If you have sufficient memory resources, you can add the rows to a list. 
                    result.add(row);
                }
            }
        } catch (Exception ex) {
            // Retry the processing logic. 
            {
                result.clear();
                RowIterator iterator = client.createParallelScanIterator(parallelScanRequestByBuilder);
                while (iterator.hasNext()) {
                    Row row = iterator.next();
                    // Process rows of data. If you have sufficient memory resources, you can add the rows to a list. 
                    result.add(row);
                }
            }
        }
        return result;
    }
}
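The preceding sample retries a failed scan once. If you want a bounded number of retries, the same idea can be generalized as in the following sketch. ScanRetry and scanAll are hypothetical names, and the supplier is a stand-in for whatever starts a new scan. With the SDK, the supplier would presumably wrap a call such as client.createParallelScanIterator(request), assuming that RowIterator can be used as a java.util.Iterator and that failures surface as unchecked exceptions. The supplier is also the place to create a new session if the previous one expired.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

final class ScanRetry {
    // Drains a fresh iterator on each attempt. Rows collected by a failed attempt are
    // discarded so that the returned list never mixes results from different scans.
    static <T> List<T> scanAll(Supplier<Iterator<T>> newScan, int maxAttempts) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        RuntimeException lastFailure = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            List<T> rows = new ArrayList<>();
            try {
                Iterator<T> iterator = newScan.get();
                while (iterator.hasNext()) {
                    rows.add(iterator.next());
                }
                return rows;
            } catch (RuntimeException ex) {
                lastFailure = ex; // Remember the failure and start over with a new scan.
            }
        }
        throw lastFailure;
    }

    public static void main(String[] args) {
        AtomicInteger attempts = new AtomicInteger();
        // Simulated scan: the first attempt fails, the second attempt succeeds.
        List<String> rows = scanAll(() -> {
            if (attempts.incrementAndGet() == 1) {
                throw new IllegalStateException("simulated OTSSessionExpired");
            }
            return Arrays.asList("row-1", "row-2").iterator();
        }, 3);
        System.out.println(rows + " after " + attempts.get() + " attempts");
    }
}
```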

Scan data by using multiple threads

import com.alicloud.openservices.tablestore.SyncClient;
import com.alicloud.openservices.tablestore.model.ComputeSplitsRequest;
import com.alicloud.openservices.tablestore.model.ComputeSplitsResponse;
import com.alicloud.openservices.tablestore.model.Row;
import com.alicloud.openservices.tablestore.model.SearchIndexSplitsOptions;
import com.alicloud.openservices.tablestore.model.iterator.RowIterator;
import com.alicloud.openservices.tablestore.model.search.ParallelScanRequest;
import com.alicloud.openservices.tablestore.model.search.ScanQuery;
import com.alicloud.openservices.tablestore.model.search.query.QueryBuilders;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicLong;

public class Test {

    public static void scanQueryWithMultiThread(final SyncClient client, String tableName, String indexName) throws InterruptedException {
        // Query the number of CPU cores on the client. 
        final int cpuProcessors = Runtime.getRuntime().availableProcessors();
        // Specify the number of parallel threads for the client. We recommend that you specify the number of CPU cores on the client as the number of parallel threads for the client to prevent impact on the query performance. 
        final Semaphore semaphore = new Semaphore(cpuProcessors);

        // Query the session ID and the maximum number of parallel scan tasks supported by the request. 
        ComputeSplitsRequest computeSplitsRequest = new ComputeSplitsRequest();
        computeSplitsRequest.setTableName(tableName);
        computeSplitsRequest.setSplitsOptions(new SearchIndexSplitsOptions(indexName));
        ComputeSplitsResponse computeSplitsResponse = client.computeSplits(computeSplitsRequest);
        final byte[] sessionId = computeSplitsResponse.getSessionId();
        final int maxParallel = computeSplitsResponse.getSplitsSize();

        // Create an AtomicLong object if you need to obtain the row count for your business. 
        AtomicLong rowCount = new AtomicLong(0);
        /*
         * To run the scan in multiple threads within a single method, you can define a local class that extends Thread. 
         * You can also define a top-level class to organize the code. 
         */
        final class ThreadForScanQuery extends Thread {
            private final int currentParallelId;

            private ThreadForScanQuery(int currentParallelId) {
                this.currentParallelId = currentParallelId;
                this.setName("ThreadForScanQuery:" + maxParallel + "-" + currentParallelId);  // Specify the thread name. 
            }

            @Override
            public void run() {
                System.out.println("start thread:" + this.getName());
                try {
                    // Execute the processing logic. 
                    {
                        ParallelScanRequest parallelScanRequest = ParallelScanRequest.newBuilder()
                                .tableName(tableName)
                                .indexName(indexName)
                                .scanQuery(ScanQuery.newBuilder()
                                        .query(QueryBuilders.range("col_long").lessThan(100_000)) // Specify the data to query. 
                                        .limit(2000)
                                        .currentParallelId(currentParallelId)
                                        .maxParallel(maxParallel)
                                        .build())
                                .addColumnsToGet("col_long", "col_keyword", "col_bool")  // Specify the fields to return from the search index. To return all fields from the search index, set returnAllColumnsFromIndex to true. 
                                //.returnAllColumnsFromIndex(true)
                                .sessionId(sessionId)
                                .build();
                        // Use an iterator to obtain all the data. 
                        RowIterator iterator = client.createParallelScanIterator(parallelScanRequest);
                        long count = 0;
                        while (iterator.hasNext()) {
                            Row row = iterator.next();
                            // Add a custom processing logic. The following sample code shows how to add a custom processing logic to count the number of rows: 
                            count++;
                        }
                        rowCount.addAndGet(count);
                        System.out.println("thread[" + this.getName() + "] finished. this thread get rows:" + count);
                    }
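                } catch (Exception ex) {
                    // Do not discard the exception silently. Log it, and retry the processing logic if required. 
                    System.err.println("thread[" + this.getName() + "] failed: " + ex);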
                } finally {
                    semaphore.release();
                }
            }
        }

        // Create one thread for each parallel scan task. Valid values of currentParallelId: [0, Value of maxParallel). 
        List<ThreadForScanQuery> threadList = new ArrayList<ThreadForScanQuery>();
        for (int currentParallelId = 0; currentParallelId < maxParallel; currentParallelId++) {
            ThreadForScanQuery thread = new ThreadForScanQuery(currentParallelId);
            threadList.add(thread);
        }

        // Simultaneously initiate the threads. 
        for (ThreadForScanQuery thread : threadList) {
            // Acquire a permit before starting each thread. This limits the number of threads that run at the same time and prevents bottlenecks on the client. 
            semaphore.acquire();
            thread.start();
        }

        // The main thread is blocked until all threads are complete. 
        for (ThreadForScanQuery thread : threadList) {
            thread.join();
        }
        System.out.println("all thread finished! total rows:" + rowCount.get());
    }
}
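As an alternative to the Semaphore and Thread subclass used in the preceding sample, the same fan-out can be sketched with a fixed-size thread pool from java.util.concurrent. PooledScan and countAllRows are hypothetical names, and scanOneSplit is a stand-in for the per-task logic (building a ParallelScanRequest with the given currentParallelId and iterating over its rows). No Tablestore calls are made in this sketch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.IntFunction;

final class PooledScan {
    // Runs one task per parallel scan ID on a pool of poolSize threads and returns the total row count.
    // scanOneSplit receives currentParallelId and returns the number of rows read for that ID.
    static long countAllRows(int maxParallel, int poolSize, IntFunction<Long> scanOneSplit) {
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        try {
            List<Future<Long>> futures = new ArrayList<>();
            for (int id = 0; id < maxParallel; id++) {
                final int currentParallelId = id;
                Callable<Long> task = () -> scanOneSplit.apply(currentParallelId);
                futures.add(pool.submit(task));
            }
            long total = 0;
            for (Future<Long> future : futures) {
                total += future.get(); // Blocks until the task is complete.
            }
            return total;
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for scan tasks", ex);
        } catch (ExecutionException ex) {
            // A task failed. Surface the cause instead of discarding it.
            throw new IllegalStateException("a scan task failed", ex.getCause());
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        int poolSize = Runtime.getRuntime().availableProcessors();
        // Simulated splits: the task with ID n reports 1000 + n rows.
        long total = countAllRows(4, poolSize, id -> 1000L + id);
        System.out.println("total rows: " + total);
    }
}
```

The pool size plays the role of the semaphore permits in the preceding sample, and a failure in any task reaches the caller instead of being lost inside the thread.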