當使用情境中不關心整個結果集的順序時,您可以使用並發匯出資料功能以更快的速度將命中的資料全部返回。
前提條件
已初始化OTSClient。具體操作,請參見初始化OTSClient。
已在資料表上建立多元索引。具體操作,請參見建立多元索引。
參數
參數 | 說明 | |
table_name | 資料表名稱。 | |
index_name | 多元索引名稱。 | |
scan_query | query | 多元索引的查詢語句。支援精確查詢、模糊查詢、範圍查詢、地理位置查詢、巢狀查詢等,功能和Search介面一致。 |
limit | 掃描資料時一次能返回的資料行數。 | |
max_parallel | 最大並發數。請求支援的最大並發數由使用者資料量決定。資料量越大,支援的並發數越多,每次任務前可以通過ComputeSplits API進行擷取。 | |
current_parallel_id | 當前並發ID。取值範圍為[0, max_parallel)。 | |
token | 用於翻頁功能。ParallelScan請求結果中有下一次進行翻頁的token,使用該token可以接著上一次的結果繼續讀取資料。 | |
alive_time | ParallelScan的當前任務有效時間,也是token的有效時間。預設值為60,建議使用預設值,單位為秒。如果在有效時間內沒有發起下一次請求,則不能繼續讀取資料。持續發起請求會重新整理token有效時間。 說明 動態修改schema中的切換索引、服務端單台機器故障、服務端負載平衡等均會導致Session提前到期,此時需要重新建立Session。 | |
columns_to_get | 指定分組結果中需要返回的列名,可以通過將列名加入columns來實現。 如果需要返回多元索引中的所有列,則可以通過設定return_type為RETURN_ALL_FROM_INDEX實現。 | |
session_id | 本次並發掃描資料任務的sessionId。建立Session可以通過ComputeSplits API來建立,同時獲得本次任務支援的最大並發數。 |
樣本
以下樣本用於並發匯出資料。
def fetch_rows_per_thread(query, session_id, current_thread_id, max_thread_num):
token = None
while True:
try:
scan_query = ScanQuery(query, limit = 20, next_token = token, current_parallel_id = current_thread_id,
max_parallel = max_thread_num, alive_time = 30)
response = client.parallel_scan(
table_name, index_name, scan_query, session_id,
columns_to_get = ColumnsToGet(return_type=ColumnReturnType.ALL_FROM_INDEX))
for row in response.rows:
print("%s:%s" % (threading.currentThread().name, str(row)))
if len(response.next_token) == 0:
break
else:
token = response.next_token
except OTSServiceError as e:
print (e)
except OTSClientError as e:
print (e)
def parallel_scan(table_name, index_name):
response = client.compute_splits(table_name, index_name)
query = TermQuery('d', 0.1)
params = []
for i in range(response.splits_size):
params.append((([query, response.session_id, i, response.splits_size], None)))
pool = threadpool.ThreadPool(response.splits_size)
requests = threadpool.makeRequests(fetch_rows_per_thread, params)
[pool.putRequest(req) for req in requests]
pool.wait()