当使用场景中不关心整个结果集的顺序时,您可以使用并发导出数据功能以更快的速度将命中的数据全部返回。
表格存储Java SDK从5.6.0版本开始支持并发导出数据功能。使用并发导出数据功能时,请确保获取了正确的Java SDK版本。关于Java SDK历史迭代版本的更多信息,请参见Java SDK历史迭代版本。
前提条件
已初始化OTSClient。具体操作,请参见初始化OTSClient。
已在数据表上创建多元索引。具体操作,请参见创建多元索引。
参数
参数 | 说明 | |
tableName | 数据表名称。 | |
indexName | 多元索引名称。 | |
scanQuery | query | 多元索引的查询语句。支持精确查询、模糊查询、范围查询、地理位置查询、嵌套查询等,功能和Search接口一致。 |
limit | 扫描数据时一次能返回的数据行数。 | |
maxParallel | 最大并发数。请求支持的最大并发数由用户数据量决定。数据量越大,支持的并发数越多,每次任务前可以通过ComputeSplits API进行获取。 | |
currentParallelId | 当前并发ID。取值范围为[0, maxParallel)。 | |
token | 用于翻页功能。ParallelScan请求结果中有下一次进行翻页的token,使用该token可以接着上一次的结果继续读取数据。 | |
aliveTime | ParallelScan的当前任务有效时间,也是token的有效时间。默认值为60,建议使用默认值,单位为秒。如果在有效时间内没有发起下一次请求,则不能继续读取数据。持续发起请求会刷新token有效时间。 说明 动态修改schema中的切换索引、服务端单台机器故障、服务端负载均衡等均会导致Session提前过期,此时需要重新创建Session。 | |
columnsToGet | 指定分组结果中需要返回的列名,可以通过将列名加入Columns来实现。 如果需要返回多元索引中的所有列,则可以使用更简洁的ReturnAllFromIndex实现。 重要 此处不能使用ReturnAll。 | |
sessionId | 本次并发扫描数据任务的sessionId。创建Session可以通过ComputeSplits API来创建,同时获得本次任务支持的最大并发数。 |
示例
请根据实际选择单并发扫描数据和多线程并发扫描数据。
单并发扫描数据
相对于多并发扫描数据,单并发扫描数据的代码更简单,单并发代码无需关心currentParallelId和maxParallel参数。单并发使用方式的整体吞吐比Search接口方式高,但是比多线程多并发使用方式的吞吐低。
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import com.alicloud.openservices.tablestore.SyncClient;
import com.alicloud.openservices.tablestore.model.ComputeSplitsRequest;
import com.alicloud.openservices.tablestore.model.ComputeSplitsResponse;
import com.alicloud.openservices.tablestore.model.Row;
import com.alicloud.openservices.tablestore.model.SearchIndexSplitsOptions;
import com.alicloud.openservices.tablestore.model.iterator.RowIterator;
import com.alicloud.openservices.tablestore.model.search.ParallelScanRequest;
import com.alicloud.openservices.tablestore.model.search.ParallelScanResponse;
import com.alicloud.openservices.tablestore.model.search.ScanQuery;
import com.alicloud.openservices.tablestore.model.search.SearchRequest.ColumnsToGet;
import com.alicloud.openservices.tablestore.model.search.query.MatchAllQuery;
import com.alicloud.openservices.tablestore.model.search.query.Query;
import com.alicloud.openservices.tablestore.model.search.query.QueryBuilders;
public class Test {
public static List<Row> scanQuery(final SyncClient client) {
String tableName = "<TableName>";
String indexName = "<IndexName>";
//获取sessionId和本次请求支持的最大并发数。
ComputeSplitsRequest computeSplitsRequest = new ComputeSplitsRequest();
computeSplitsRequest.setTableName(tableName);
computeSplitsRequest.setSplitsOptions(new SearchIndexSplitsOptions(indexName));
ComputeSplitsResponse computeSplitsResponse = client.computeSplits(computeSplitsRequest);
byte[] sessionId = computeSplitsResponse.getSessionId();
int splitsSize = computeSplitsResponse.getSplitsSize();
/*
* 创建并发扫描数据请求。
*/
ParallelScanRequest parallelScanRequest = new ParallelScanRequest();
parallelScanRequest.setTableName(tableName);
parallelScanRequest.setIndexName(indexName);
ScanQuery scanQuery = new ScanQuery();
//该query决定了扫描出的数据范围,可用于构建嵌套的复杂的query。
Query query = new MatchAllQuery();
scanQuery.setQuery(query);
//设置单次请求返回的数据行数。
scanQuery.setLimit(2000);
parallelScanRequest.setScanQuery(scanQuery);
ColumnsToGet columnsToGet = new ColumnsToGet();
columnsToGet.setColumns(Arrays.asList("col_1", "col_2"));
parallelScanRequest.setColumnsToGet(columnsToGet);
parallelScanRequest.setSessionId(sessionId);
/*
* 使用builder模式创建并发扫描数据请求,功能与前面一致。
*/
ParallelScanRequest parallelScanRequestByBuilder = ParallelScanRequest.newBuilder()
.tableName(tableName)
.indexName(indexName)
.scanQuery(ScanQuery.newBuilder()
.query(QueryBuilders.matchAll())
.limit(2000)
.build())
.addColumnsToGet("col_1", "col_2")
.sessionId(sessionId)
.build();
List<Row> result = new ArrayList<>();
/*
* 使用原生的API扫描数据。
*/
{
ParallelScanResponse parallelScanResponse = client.parallelScan(parallelScanRequest);
//下次请求的ScanQuery的token。
byte[] nextToken = parallelScanResponse.getNextToken();
//获取数据。
List<Row> rows = parallelScanResponse.getRows();
result.addAll(rows);
while (nextToken != null) {
//设置token。
parallelScanRequest.getScanQuery().setToken(nextToken);
//继续扫描数据。
parallelScanResponse = client.parallelScan(parallelScanRequest);
//获取数据。
rows = parallelScanResponse.getRows();
result.addAll(rows);
nextToken = parallelScanResponse.getNextToken();
}
}
/*
* 推荐方式。
* 使用iterator方式扫描所有匹配数据。使用方式上更简单,速度和前面方法一致。
*/
{
RowIterator iterator = client.createParallelScanIterator(parallelScanRequestByBuilder);
while (iterator.hasNext()) {
Row row = iterator.next();
result.add(row);
//获取具体的值。
String col_1 = row.getLatestColumn("col_1").getValue().asString();
long col_2 = row.getLatestColumn("col_2").getValue().asLong();
}
}
/*
* 关于失败重试的问题,如果本函数的外部调用者有重试机制或者不需要考虑失败重试问题,可以忽略此部分内容。
* 为了保证可用性,遇到任何异常均推荐进行任务级别的重试,重新开始一个新的ParallelScan任务。
* 异常分为如下两种:
* 1、服务端Session异常OTSSessionExpired。
* 2、调用者客户端网络等异常。
*/
try {
//正常处理逻辑。
{
RowIterator iterator = client.createParallelScanIterator(parallelScanRequestByBuilder);
while (iterator.hasNext()) {
Row row = iterator.next();
//处理row,内存足够大时可直接放到list中。
result.add(row);
}
}
} catch (Exception ex) {
//重试。
{
result.clear();
RowIterator iterator = client.createParallelScanIterator(parallelScanRequestByBuilder);
while (iterator.hasNext()) {
Row row = iterator.next();
//处理row,内存足够大时可直接放到list中。
result.add(row);
}
}
}
return result;
}
}
多线程并发扫描数据
import com.alicloud.openservices.tablestore.SyncClient;
import com.alicloud.openservices.tablestore.model.ComputeSplitsRequest;
import com.alicloud.openservices.tablestore.model.ComputeSplitsResponse;
import com.alicloud.openservices.tablestore.model.Row;
import com.alicloud.openservices.tablestore.model.SearchIndexSplitsOptions;
import com.alicloud.openservices.tablestore.model.iterator.RowIterator;
import com.alicloud.openservices.tablestore.model.search.ParallelScanRequest;
import com.alicloud.openservices.tablestore.model.search.ScanQuery;
import com.alicloud.openservices.tablestore.model.search.query.QueryBuilders;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicLong;
public class Test {
public static void scanQueryWithMultiThread(final SyncClient client, String tableName, String indexName) throws InterruptedException {
// 获取机器的CPU数量。
final int cpuProcessors = Runtime.getRuntime().availableProcessors();
// 指定客户端多线程的并发数量。建议和客户端的CPU核数一致,避免客户端压力太大,影响查询性能。
final Semaphore semaphore = new Semaphore(cpuProcessors);
// 获取sessionId和本次请求支持的最大并发数。
ComputeSplitsRequest computeSplitsRequest = new ComputeSplitsRequest();
computeSplitsRequest.setTableName(tableName);
computeSplitsRequest.setSplitsOptions(new SearchIndexSplitsOptions(indexName));
ComputeSplitsResponse computeSplitsResponse = client.computeSplits(computeSplitsRequest);
final byte[] sessionId = computeSplitsResponse.getSessionId();
final int maxParallel = computeSplitsResponse.getSplitsSize();
// 业务统计行数使用。
AtomicLong rowCount = new AtomicLong(0);
/*
* 为了使用一个函数实现多线程功能,此处构建一个内部类继承Thread来使用多线程。
* 您也可以构建一个正常的外部类,使代码更有条理。
*/
final class ThreadForScanQuery extends Thread {
private final int currentParallelId;
private ThreadForScanQuery(int currentParallelId) {
this.currentParallelId = currentParallelId;
this.setName("ThreadForScanQuery:" + maxParallel + "-" + currentParallelId); // 设置线程名称。
}
@Override
public void run() {
System.out.println("start thread:" + this.getName());
try {
// 正常处理逻辑。
{
ParallelScanRequest parallelScanRequest = ParallelScanRequest.newBuilder()
.tableName(tableName)
.indexName(indexName)
.scanQuery(ScanQuery.newBuilder()
.query(QueryBuilders.range("col_long").lessThan(10_0000)) // 此处的query决定了获取什么数据。
.limit(2000)
.currentParallelId(currentParallelId)
.maxParallel(maxParallel)
.build())
.addColumnsToGet("col_long", "col_keyword", "col_bool") // 设置要返回的多元索引中的部分字段,或者使用下行注释的内容获取多元索引中全部数据。
//.returnAllColumnsFromIndex(true)
.sessionId(sessionId)
.build();
// 使用Iterator形式获取所有数据。
RowIterator ltr = client.createParallelScanIterator(parallelScanRequest);
long count = 0;
while (ltr.hasNext()) {
Row row = ltr.next();
// 增加自定义的处理逻辑,此处代码以统计行数为例介绍。
count++;
}
rowCount.addAndGet(count);
System.out.println("thread[" + this.getName() + "] finished. this thread get rows:" + count);
}
} catch (Exception ex) {
// 如果有异常,此处需要考虑重试正常处理逻辑。
} finally {
semaphore.release();
}
}
}
// 多个线程同时执行,currentParallelId取值范围为[0, maxParallel)。
List<ThreadForScanQuery> threadList = new ArrayList<ThreadForScanQuery>();
for (int currentParallelId = 0; currentParallelId < maxParallel; currentParallelId++) {
ThreadForScanQuery thread = new ThreadForScanQuery(currentParallelId);
threadList.add(thread);
}
// 同时启动。
for (ThreadForScanQuery thread : threadList) {
// 利用semaphore限制同时启动的线程数量,避免客户端瓶颈。
semaphore.acquire();
thread.start();
}
// 主线程阻塞等待所有线程完成任务。
for (ThreadForScanQuery thread : threadList) {
thread.join();
}
System.out.println("all thread finished! total rows:" + rowCount.get());
}
}