If you do not have requirements on the order of query results, you can use the parallel scan feature to obtain query results in an efficient manner.
Tablestore SDK for Java V5.6.0 or later supports the parallel scan feature. Before you use the parallel scan feature, make sure that you obtain a correct version of Tablestore SDK for Java. For more information about the version history of Tablestore SDK for Java, see Version history of Tablestore SDK for Java.
Background information
The search index feature allows you to call the Search operation to use all query features and analytics capabilities such as sorting and aggregation. The query results are returned from the Search operation in a specific order.
In some cases, for example, if you connect Tablestore to a computing environment such as Spark or Presto or you want to query a specific group of objects, a faster query speed may be more important than the order of query results. To improve query speeds, Tablestore provides the ParallelScan operation for the search index feature.
Compared with the Search operation, the ParallelScan operation supports all query features but does not provide analytics capabilities such as sorting and aggregation. This way, query speeds are improved by more than five times. You can call the ParallelScan operation to export hundreds of millions of data rows within a minute. The capability to export data can be horizontally scaled without upper limits.
The maximum number of rows that can be returned by each ParallelScan call is greater than the maximum number of rows that can be returned by each Search call. The Search operation returns up to 100 rows per call, whereas the ParallelScan operation returns up to 2,000 rows per call. The parallel scan feature allows you to use multiple threads to initiate requests in a session in parallel, which accelerates data export.
Scenarios
If you want to sort or aggregate query results, or the query request is sent from an end user, use the Search operation
If you do not need to sort query results and want to return all matched results in an efficient manner, or the data is pulled by a computing environment such as Spark or Presto, use the ParallelScan operation.
Features
The following items describe the differences between the ParallelScan operation and the Search operation.
Stable results
Parallel scan tasks are stateful. In a session, the result set of the scanned data is determined by the data status when the first request is initiated. If data is inserted or modified after the first request is sent, the result set is not affected.
Sessions
ImportantIf the session ID is difficult to obtain, you can call the ParallelScan operation to initiate a request without a session ID. However, if you send a request without a session ID, duplicate data may occur in the obtained result set at a very low probability.
Parallel scan-related operations use sessions. The session ID can be used to determine the result set of scanned data. The following process describes how to obtain and use a session ID:
Call the ComputeSplits operation to query the maximum number of parallel scan tasks and the current session ID.
Initiate multiple parallel scan requests to read data. You must specify the current session ID and the parallel scan task IDs in these requests.
Tablestore returns the OTSSessionExpired error code when network exceptions, thread exceptions, dynamic modifications on schemas, or index switchovers occur in the parallel scan process and data scans stop. In these cases, you must initiate another parallel scan task to scan data again.
Parallel scan tasks that have the same session ID and the same ScanQuery parameter value are considered one task. A parallel scan task starts from the time when you send the first ParallelScan request, and ends when all data is scanned or the token expires.
Maximum number of parallel scan tasks in a single request
The maximum number of parallel scan tasks in a single request supported by the ParallelScan operation is determined by the response of the ComputeSplits request. A larger volume of data requires more parallel scan tasks in a session.
A single request is specified by one query statement. For example, if you use the Search operation to query results in which the value of city is Hangzhou, all data that matches this condition is returned in the result. However, if you use the ParallelScan operation and the number of parallel scan tasks in a session is 2, each ParallelScan request returns half of the results. The complete result set consists of the two parallel result sets.
Performance
The query speed of a ParallelScan request that includes a parallel scan task is five times faster than the query speed of a Search request. When you use the parallel scan feature, the query speed increases together with the number of parallel scan tasks in a session. For example, if eight parallel scan tasks are included in a session, the query speed can be improved by four times.
Cost
ParallelScan requests consume fewer resources and are offered at a lower price. To export large amounts of data, we recommend that you use the ParallelScan operation.
Limits
The maximum number of parallel scan tasks is 10. You can adjust this limit based on your business requirements.
Only existing columns can be returned from search indexes. However, the columns of the DATE and NESTED types cannot be returned.
The ParallelScan operation can return values of the ARRAY and GEOPOINT columns. However, the return values are formatted and may be different from the values that are written to the data table. For example, if you write [1,2, 3, 4] to an ARRAY column, the ParallelScan operation returns [1,2,3,4] as the value. If you write
10,50
to a GEOPOINT column, the ParallelScan operation returns10.0,50.0
as the value.You can set the ReturnType parameter to RETURN_ALL_INDEX or RETURN_SPECIFIED, but not to RETURN_ALL.
The maximum number of rows that can be returned by each ParallelScan call is specified by the limit parameter. The default value of the limit parameter is 2,000. If you specify a value greater than 2,000, the performance almost does not change with the increase of the limit.
API operations
You can call the following API operations to use the parallel scan feature:
ComputeSplits: You can call this operation to query the maximum number of parallel scan tasks for a single ParallelScan request.
ParallelScan: You can call this operation to export data.
Use Tablestore SDKs
You can use the following Tablestore SDKs to scan data in parallel:
Tablestore SDK for Java: Parallel scan
Tablestore SDK for Go: Parallel scan
Tablestore SDK for Python: Parallel scan
Tablestore SDK for Node.js: Parallel scan
Tablestore SDK for .NET: Parallel scan
Tablestore SDK for PHP: Parallel scan
Parameters
Parameter | Description | |
tableName | The name of the data table. | |
indexName | The name of the search index. | |
scanQuery | query | The query statement for the search index. The operation supports term query, fuzzy query, range query, geo query, and nested query, which are similar to those of the Search operation. |
limit | The maximum number of rows that can be returned by each ParallelScan call. | |
maxParallel | The maximum number of parallel scan tasks per request. The maximum number of parallel scan tasks per request varies based on the data volume. A larger volume of data requires more parallel scan tasks per request. You can use the ComputeSplits operation to query the maximum number of parallel scan tasks per request. | |
currentParallelId | The ID of the parallel scan task in the request. Valid values: [0, Value of maxParallel) | |
token | The token that is used to paginate query results. The results of the ParallelScan request contain the token for the next page. You can use the token to retrieve the next page. | |
aliveTime | The validity period of the current parallel scan task. This validity period is also the validity period of the token. Unit: seconds. Default value: 60. Unit: seconds. We recommend that you use the default value. If the next request is not initiated within the validity period, no more data can be queried. The validity time of the token is refreshed each time you send a request. Note Sessions expire ahead of time if switch indexes are dynamically changed in schemas, a single server fails, or a server-end load balancing is performed. In this case, you must recreate sessions. | |
columnsToGet | The name of the column to be returned in the grouping result. You can add the column name to Columns. If you want all columns to be returned in the search index, you can use the more concise ReturnAllFromIndex operation. Important ReturnAll cannot be used here. | |
sessionId | The session ID of the parallel scan task. You can call the ComputeSplits operation to create a session and query the maximum number of parallel scan tasks that are supported by the parallel scan request. |
Example
You can scan data by using a single thread or by using multiple threads at a time based on your business requirements.
Scan data by using a single thread
When you use parallel scan, the code for a request that uses a single thread is simpler than the code for a request that uses multiple threads. The currentParallelId and maxParallel parameters are not required for a request that uses a single thread. The ParallelScan request that uses a single thread provides higher throughput than the Search request. However, the ParallelScan request that uses a single thread provides lower throughput than the ParallelScan request that uses multiple threads.
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import com.alicloud.openservices.tablestore.SyncClient;
import com.alicloud.openservices.tablestore.model.ComputeSplitsRequest;
import com.alicloud.openservices.tablestore.model.ComputeSplitsResponse;
import com.alicloud.openservices.tablestore.model.Row;
import com.alicloud.openservices.tablestore.model.SearchIndexSplitsOptions;
import com.alicloud.openservices.tablestore.model.iterator.RowIterator;
import com.alicloud.openservices.tablestore.model.search.ParallelScanRequest;
import com.alicloud.openservices.tablestore.model.search.ParallelScanResponse;
import com.alicloud.openservices.tablestore.model.search.ScanQuery;
import com.alicloud.openservices.tablestore.model.search.SearchRequest.ColumnsToGet;
import com.alicloud.openservices.tablestore.model.search.query.MatchAllQuery;
import com.alicloud.openservices.tablestore.model.search.query.Query;
import com.alicloud.openservices.tablestore.model.search.query.QueryBuilders;
public class Test {
public static List<Row> scanQuery(final SyncClient client) {
String tableName = "<TableName>";
String indexName = "<IndexName>";
// Query the session ID and the maximum number of parallel scan tasks supported by the request.
ComputeSplitsRequest computeSplitsRequest = new ComputeSplitsRequest();
computeSplitsRequest.setTableName(tableName);
computeSplitsRequest.setSplitsOptions(new SearchIndexSplitsOptions(indexName));
ComputeSplitsResponse computeSplitsResponse = client.computeSplits(computeSplitsRequest);
byte[] sessionId = computeSplitsResponse.getSessionId();
int splitsSize = computeSplitsResponse.getSplitsSize();
/*
* Create a parallel scan request.
*/
ParallelScanRequest parallelScanRequest = new ParallelScanRequest();
parallelScanRequest.setTableName(tableName);
parallelScanRequest.setIndexName(indexName);
ScanQuery scanQuery = new ScanQuery();
// This query determines the range of the data to scan. You can create a nested and complex query.
Query query = new MatchAllQuery();
scanQuery.setQuery(query);
// Specify the maximum number of rows that can be returned by each ParallelScan call.
scanQuery.setLimit(2000);
parallelScanRequest.setScanQuery(scanQuery);
ColumnsToGet columnsToGet = new ColumnsToGet();
columnsToGet.setColumns(Arrays.asList("col_1", "col_2"));
parallelScanRequest.setColumnsToGet(columnsToGet);
parallelScanRequest.setSessionId(sessionId);
/*
* Use builder to create a parallel scan request that has the same features as the preceding request.
*/
ParallelScanRequest parallelScanRequestByBuilder = ParallelScanRequest.newBuilder()
.tableName(tableName)
.indexName(indexName)
.scanQuery(ScanQuery.newBuilder()
.query(QueryBuilders.matchAll())
.limit(2000)
.build())
.addColumnsToGet("col_1", "col_2")
.sessionId(sessionId)
.build();
List<Row> result = new ArrayList<>();
/*
* Use the native API operation to scan data.
*/
{
ParallelScanResponse parallelScanResponse = client.parallelScan(parallelScanRequest);
// Query the token of ScanQuery for the next request.
byte[] nextToken = parallelScanResponse.getNextToken();
// Obtain the data.
List<Row> rows = parallelScanResponse.getRows();
result.addAll(rows);
while (nextToken != null) {
// Specify the token.
parallelScanRequest.getScanQuery().setToken(nextToken);
// Continue to scan the data.
parallelScanResponse = client.parallelScan(parallelScanRequest);
// Obtain the data.
rows = parallelScanResponse.getRows();
result.addAll(rows);
nextToken = parallelScanResponse.getNextToken();
}
}
/*
* Recommended method.
* Use an iterator to scan all matched data. This method has the same query speed but is easier to use compared with the previous method.
*/
{
RowIterator iterator = client.createParallelScanIterator(parallelScanRequestByBuilder);
while (iterator.hasNext()) {
Row row = iterator.next();
result.add(row);
// Obtain the specific values.
String col_1 = row.getLatestColumn("col_1").getValue().asString();
long col_2 = row.getLatestColumn("col_2").getValue().asLong();
}
}
/*
* If the operation fails, retry the operation. If the caller of this function has a retry mechanism or if you do not want to retry the failed operation, you can ignore this part.
* To ensure availability, we recommend that you start a new parallel scan task when exceptions occur.
* The following exceptions may occur when you send a ParallelScan request:
* 1. A session exception occurs on the server side. The error code is OTSSessionExpired.
* 2. An exception such as a network exception occurs on the client side.
*/
try {
// Execute the processing logic.
{
RowIterator iterator = client.createParallelScanIterator(parallelScanRequestByBuilder);
while (iterator.hasNext()) {
Row row = iterator.next();
// Process rows of data. If you have sufficient memory resources, you can add the rows to a list.
result.add(row);
}
}
} catch (Exception ex) {
// Retry the processing logic.
{
result.clear();
RowIterator iterator = client.createParallelScanIterator(parallelScanRequestByBuilder);
while (iterator.hasNext()) {
Row row = iterator.next();
// Process rows of data. If you have sufficient memory resources, you can add the rows to a list.
result.add(row);
}
}
}
return result;
}
}
Scan data by using multiple threads
import com.alicloud.openservices.tablestore.SyncClient;
import com.alicloud.openservices.tablestore.model.ComputeSplitsRequest;
import com.alicloud.openservices.tablestore.model.ComputeSplitsResponse;
import com.alicloud.openservices.tablestore.model.Row;
import com.alicloud.openservices.tablestore.model.SearchIndexSplitsOptions;
import com.alicloud.openservices.tablestore.model.iterator.RowIterator;
import com.alicloud.openservices.tablestore.model.search.ParallelScanRequest;
import com.alicloud.openservices.tablestore.model.search.ScanQuery;
import com.alicloud.openservices.tablestore.model.search.query.QueryBuilders;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicLong;
public class Test {
public static void scanQueryWithMultiThread(final SyncClient client, String tableName, String indexName) throws InterruptedException {
// Query the number of CPU cores on the client.
final int cpuProcessors = Runtime.getRuntime().availableProcessors();
// Specify the number of parallel threads for the client. We recommend that you specify the number of CPU cores on the client as the number of parallel threads for the client to prevent impact on the query performance.
final Semaphore semaphore = new Semaphore(cpuProcessors);
// Query the session ID and the maximum number of parallel scan tasks supported by the request.
ComputeSplitsRequest computeSplitsRequest = new ComputeSplitsRequest();
computeSplitsRequest.setTableName(tableName);
computeSplitsRequest.setSplitsOptions(new SearchIndexSplitsOptions(indexName));
ComputeSplitsResponse computeSplitsResponse = client.computeSplits(computeSplitsRequest);
final byte[] sessionId = computeSplitsResponse.getSessionId();
final int maxParallel = computeSplitsResponse.getSplitsSize();
// Create an AtomicLong object if you need to obtain the row count for your business.
AtomicLong rowCount = new AtomicLong(0);
/*
* If you want to perform multithreading by using a function, you can build an internal class to inherit the threads.
* You can also build an external class to organize the code.
*/
final class ThreadForScanQuery extends Thread {
private final int currentParallelId;
private ThreadForScanQuery(int currentParallelId) {
this.currentParallelId = currentParallelId;
this.setName("ThreadForScanQuery:" + maxParallel + "-" + currentParallelId); // Specify the thread name.
}
@Override
public void run() {
System.out.println("start thread:" + this.getName());
try {
// Execute the processing logic.
{
ParallelScanRequest parallelScanRequest = ParallelScanRequest.newBuilder()
.tableName(tableName)
.indexName(indexName)
.scanQuery(ScanQuery.newBuilder()
.query(QueryBuilders.range("col_long").lessThan(10_0000)) // Specify the data to query.
.limit(2000)
.currentParallelId(currentParallelId)
.maxParallel(maxParallel)
.build())
.addColumnsToGet("col_long", "col_keyword", "col_bool") // Specify the fields to return from the search index. To return all fields from the search index, set returnAllColumnsFromIndex to true.
//.returnAllColumnsFromIndex(true)
.sessionId(sessionId)
.build();
// Use an iterator to obtain all the data.
RowIterator ltr = client.createParallelScanIterator(parallelScanRequest);
long count = 0;
while (ltr.hasNext()) {
Row row = ltr.next();
// Add a custom processing logic. The following sample code shows how to add a custom processing logic to count the number of rows:
count++;
}
rowCount.addAndGet(count);
System.out.println("thread[" + this.getName() + "] finished. this thread get rows:" + count);
}
} catch (Exception ex) {
// If exceptions occur, you can retry the processing logic.
} finally {
semaphore.release();
}
}
}
// Simultaneously execute threads. Valid values of currentParallelId: [0, Value of maxParallel).
List<ThreadForScanQuery> threadList = new ArrayList<ThreadForScanQuery>();
for (int currentParallelId = 0; currentParallelId < maxParallel; currentParallelId++) {
ThreadForScanQuery thread = new ThreadForScanQuery(currentParallelId);
threadList.add(thread);
}
// Simultaneously initiate the threads.
for (ThreadForScanQuery thread : threadList) {
// Specify a value for semaphore to limit the number of threads that can be initiated at the same time to prevent bottlenecks on the client.
semaphore.acquire();
thread.start();
}
// The main thread is blocked until all threads are complete.
for (ThreadForScanQuery thread : threadList) {
thread.join();
}
System.out.println("all thread finished! total rows:" + rowCount.get());
}
}