當使用情境中不關心整個結果集的順序時,您可以使用並發匯出資料功能以更快的速度將命中的資料全部返回。
TablestoreJava SDK從5.6.0版本開始支援並發匯出資料功能。使用並發匯出資料功能時,請確保擷取了正確的Java SDK版本。關於Java SDK歷史迭代版本的更多資訊,請參見Java SDK歷史迭代版本。
前提條件
已初始化OTSClient。具體操作,請參見初始化OTSClient。
已在資料表上建立多元索引。具體操作,請參見建立多元索引。
參數
參數 | 說明 | |
tableName | 資料表名稱。 | |
indexName | 多元索引名稱。 | |
scanQuery | query | 多元索引的查詢語句。支援精確查詢、模糊查詢、範圍查詢、地理位置查詢、巢狀查詢等,功能和Search介面一致。 |
limit | 掃描資料時一次能返回的資料行數。 | |
maxParallel | 最大並發數。請求支援的最大並發數由使用者資料量決定。資料量越大,支援的並發數越多,每次任務前可以通過ComputeSplits API進行擷取。 | |
currentParallelId | 當前並發ID。取值範圍為[0, maxParallel)。 | |
token | 用於翻頁功能。ParallelScan請求結果中有下一次進行翻頁的token,使用該token可以接著上一次的結果繼續讀取資料。 | |
aliveTime | ParallelScan的當前任務有效時間,也是token的有效時間。預設值為60,建議使用預設值,單位為秒。如果在有效時間內沒有發起下一次請求,則不能繼續讀取資料。持續發起請求會重新整理token有效時間。 說明 由於服務端採用非同步方式清理到期任務,因此當前任務只保證在設定的有效時間內不會到期,但不能保證有效時間之後一定到期。 | |
columnsToGet | ParallelScan目前僅可以掃描多元索引中的資料,需要在建立多元索引時設定附加儲存(即store=true)。 | |
sessionId | 本次並發掃描資料任務的sessionId。建立Session可以通過ComputeSplits API來建立,同時獲得本次任務支援的最大並發數。 |
樣本
請根據實際選擇單並發掃描資料和多線程並發掃描資料。
單並發掃描資料
相對於多並發掃描資料,單並發掃描資料的代碼更簡單,單並發代碼無需關心currentParallelId和maxParallel參數。單並發使用方式的整體吞吐比Search介面方式高,但是比多線程多並發使用方式的吞吐低。
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import com.alicloud.openservices.tablestore.SyncClient;
import com.alicloud.openservices.tablestore.model.ComputeSplitsRequest;
import com.alicloud.openservices.tablestore.model.ComputeSplitsResponse;
import com.alicloud.openservices.tablestore.model.Row;
import com.alicloud.openservices.tablestore.model.SearchIndexSplitsOptions;
import com.alicloud.openservices.tablestore.model.iterator.RowIterator;
import com.alicloud.openservices.tablestore.model.search.ParallelScanRequest;
import com.alicloud.openservices.tablestore.model.search.ParallelScanResponse;
import com.alicloud.openservices.tablestore.model.search.ScanQuery;
import com.alicloud.openservices.tablestore.model.search.SearchRequest.ColumnsToGet;
import com.alicloud.openservices.tablestore.model.search.query.MatchAllQuery;
import com.alicloud.openservices.tablestore.model.search.query.Query;
import com.alicloud.openservices.tablestore.model.search.query.QueryBuilders;
public class Test {
public static List<Row> scanQuery(final SyncClient client) {
String tableName = "<TableName>";
String indexName = "<IndexName>";
//擷取sessionId和本次請求支援的最大並發數。
ComputeSplitsRequest computeSplitsRequest = new ComputeSplitsRequest();
computeSplitsRequest.setTableName(tableName);
computeSplitsRequest.setSplitsOptions(new SearchIndexSplitsOptions(indexName));
ComputeSplitsResponse computeSplitsResponse = client.computeSplits(computeSplitsRequest);
byte[] sessionId = computeSplitsResponse.getSessionId();
int splitsSize = computeSplitsResponse.getSplitsSize();
/*
* 建立並發掃描資料請求。
*/
ParallelScanRequest parallelScanRequest = new ParallelScanRequest();
parallelScanRequest.setTableName(tableName);
parallelScanRequest.setIndexName(indexName);
ScanQuery scanQuery = new ScanQuery();
//該query決定了掃描出的資料範圍,可用於構建嵌套的複雜的query。
Query query = new MatchAllQuery();
scanQuery.setQuery(query);
//設定單次請求返回的資料行數。
scanQuery.setLimit(2000);
parallelScanRequest.setScanQuery(scanQuery);
ColumnsToGet columnsToGet = new ColumnsToGet();
columnsToGet.setColumns(Arrays.asList("col_1", "col_2"));
parallelScanRequest.setColumnsToGet(columnsToGet);
parallelScanRequest.setSessionId(sessionId);
/*
* 使用builder模式建立並發掃描資料請求,功能與前面一致。
*/
ParallelScanRequest parallelScanRequestByBuilder = ParallelScanRequest.newBuilder()
.tableName(tableName)
.indexName(indexName)
.scanQuery(ScanQuery.newBuilder()
.query(QueryBuilders.matchAll())
.limit(2000)
.build())
.addColumnsToGet("col_1", "col_2")
.sessionId(sessionId)
.build();
List<Row> result = new ArrayList<>();
/*
* 使用原生的API掃描資料。
*/
{
ParallelScanResponse parallelScanResponse = client.parallelScan(parallelScanRequest);
//下次請求的ScanQuery的token。
byte[] nextToken = parallelScanResponse.getNextToken();
//擷取資料。
List<Row> rows = parallelScanResponse.getRows();
result.addAll(rows);
while (nextToken != null) {
//設定token。
parallelScanRequest.getScanQuery().setToken(nextToken);
//繼續掃描資料。
parallelScanResponse = client.parallelScan(parallelScanRequest);
//擷取資料。
rows = parallelScanResponse.getRows();
result.addAll(rows);
nextToken = parallelScanResponse.getNextToken();
}
}
/*
* 推薦方式。
* 使用iterator方式掃描所有匹配資料。使用方式上更簡單,速度和前面方法一致。
*/
{
RowIterator iterator = client.createParallelScanIterator(parallelScanRequestByBuilder);
while (iterator.hasNext()) {
Row row = iterator.next();
result.add(row);
//擷取具體的值。
String col_1 = row.getLatestColumn("col_1").getValue().asString();
long col_2 = row.getLatestColumn("col_2").getValue().asLong();
}
}
/*
* 關於失敗重試的問題,如果本函數的外部調用者有重試機制或者不需要考慮失敗重試問題,可以忽略此部分內容。
* 為了保證可用性,遇到任何異常均推薦進行任務層級的重試,重新開始一個新的ParallelScan任務。
* 異常分為如下兩種:
* 1、服務端Session異常OTSSessionExpired。
* 2、調用者用戶端網路等異常。
*/
try {
//正常處理邏輯。
{
RowIterator iterator = client.createParallelScanIterator(parallelScanRequestByBuilder);
while (iterator.hasNext()) {
Row row = iterator.next();
//處理row,記憶體足夠大時可直接放到list中。
result.add(row);
}
}
} catch (Exception ex) {
//重試。
{
result.clear();
RowIterator iterator = client.createParallelScanIterator(parallelScanRequestByBuilder);
while (iterator.hasNext()) {
Row row = iterator.next();
//處理row,記憶體足夠大時可直接放到list中。
result.add(row);
}
}
}
return result;
}
}
多線程並發掃描資料
import com.alicloud.openservices.tablestore.SyncClient;
import com.alicloud.openservices.tablestore.model.ComputeSplitsRequest;
import com.alicloud.openservices.tablestore.model.ComputeSplitsResponse;
import com.alicloud.openservices.tablestore.model.Row;
import com.alicloud.openservices.tablestore.model.SearchIndexSplitsOptions;
import com.alicloud.openservices.tablestore.model.iterator.RowIterator;
import com.alicloud.openservices.tablestore.model.search.ParallelScanRequest;
import com.alicloud.openservices.tablestore.model.search.ScanQuery;
import com.alicloud.openservices.tablestore.model.search.query.QueryBuilders;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicLong;
public class Test {
public static void scanQueryWithMultiThread(final SyncClient client, String tableName, String indexName) throws InterruptedException {
// 擷取機器的CPU數量。
final int cpuProcessors = Runtime.getRuntime().availableProcessors();
// 指定用戶端多線程的並發數量。建議和用戶端的CPU核心數一致,避免用戶端壓力太大,影響查詢效能。
final Semaphore semaphore = new Semaphore(cpuProcessors);
// 擷取sessionId和本次請求支援的最大並發數。
ComputeSplitsRequest computeSplitsRequest = new ComputeSplitsRequest();
computeSplitsRequest.setTableName(tableName);
computeSplitsRequest.setSplitsOptions(new SearchIndexSplitsOptions(indexName));
ComputeSplitsResponse computeSplitsResponse = client.computeSplits(computeSplitsRequest);
final byte[] sessionId = computeSplitsResponse.getSessionId();
final int maxParallel = computeSplitsResponse.getSplitsSize();
// 業務統計行數使用。
AtomicLong rowCount = new AtomicLong(0);
/*
* 為了使用一個函數實現多線程功能,此處構建一個內部類繼承Thread來使用多線程。
* 您也可以構建一個正常的外部類,使代碼更有條理。
*/
final class ThreadForScanQuery extends Thread {
private final int currentParallelId;
private ThreadForScanQuery(int currentParallelId) {
this.currentParallelId = currentParallelId;
this.setName("ThreadForScanQuery:" + maxParallel + "-" + currentParallelId); // 設定線程名稱。
}
@Override
public void run() {
System.out.println("start thread:" + this.getName());
try {
// 正常處理邏輯。
{
ParallelScanRequest parallelScanRequest = ParallelScanRequest.newBuilder()
.tableName(tableName)
.indexName(indexName)
.scanQuery(ScanQuery.newBuilder()
.query(QueryBuilders.range("col_long").lessThan(10_0000)) // 此處的query決定了擷取什麼資料。
.limit(2000)
.currentParallelId(currentParallelId)
.maxParallel(maxParallel)
.build())
.addColumnsToGet("col_long", "col_keyword", "col_bool") // 設定要返回的多元索引中的部分欄位,或者使用下行注釋的內容擷取多元索引中全部資料。
//.returnAllColumnsFromIndex(true)
.sessionId(sessionId)
.build();
// 使用Iterator形式擷取所有資料。
RowIterator ltr = client.createParallelScanIterator(parallelScanRequest);
long count = 0;
while (ltr.hasNext()) {
Row row = ltr.next();
// 增加自訂的處理邏輯,此處代碼以統計行數為例介紹。
count++;
}
rowCount.addAndGet(count);
System.out.println("thread[" + this.getName() + "] finished. this thread get rows:" + count);
}
} catch (Exception ex) {
// 如果有異常,此處需要考慮重試正常處理邏輯。
} finally {
semaphore.release();
}
}
}
// 多個線程同時執行,currentParallelId取值範圍為[0, maxParallel)。
List<ThreadForScanQuery> threadList = new ArrayList<ThreadForScanQuery>();
for (int currentParallelId = 0; currentParallelId < maxParallel; currentParallelId++) {
ThreadForScanQuery thread = new ThreadForScanQuery(currentParallelId);
threadList.add(thread);
}
// 同時啟動。
for (ThreadForScanQuery thread : threadList) {
// 利用semaphore限制同時啟動的線程數量,避免用戶端瓶頸。
semaphore.acquire();
thread.start();
}
// 主線程阻塞等待所有線程完成任務。
for (ThreadForScanQuery thread : threadList) {
thread.join();
}
System.out.println("all thread finished! total rows:" + rowCount.get());
}
}