当使用场景中不关心整个结果集的顺序时,您可以使用并发导出数据功能以更快的速度将命中的数据全部返回。
前提条件
已初始化OTSClient。具体操作,请参见初始化OTSClient。
已在数据表上创建多元索引。具体操作,请参见创建多元索引。
参数
参数 | 说明 | |
table_name | 数据表名称。 | |
index_name | 多元索引名称。 | |
scan_query | query | 多元索引的查询语句。支持精确查询、模糊查询、范围查询、地理位置查询、嵌套查询等,功能和Search接口一致。 |
limit | 扫描数据时一次能返回的数据行数。 | |
max_parallel | 最大并发数。请求支持的最大并发数由用户数据量决定。数据量越大,支持的并发数越多,每次任务前可以通过ComputeSplits API进行获取。 | |
current_parallel_id | 当前并发ID。取值范围为[0, max_parallel)。 | |
token | 用于翻页功能。ParallelScan请求结果中有下一次进行翻页的token,使用该token可以接着上一次的结果继续读取数据。 | |
alive_time | ParallelScan的当前任务有效时间,也是token的有效时间。默认值为60,建议使用默认值,单位为秒。如果在有效时间内没有发起下一次请求,则不能继续读取数据。持续发起请求会刷新token有效时间。 说明 动态修改schema中的切换索引、服务端单台机器故障、服务端负载均衡等均会导致Session提前过期,此时需要重新创建Session。 | |
columns_to_get | 指定分组结果中需要返回的列名,可以通过将列名加入columns来实现。 如果需要返回多元索引中的所有列,则可以通过设置return_type为RETURN_ALL_FROM_INDEX实现。 | |
session_id | 本次并发扫描数据任务的sessionId。创建Session可以通过ComputeSplits API来创建,同时获得本次任务支持的最大并发数。 |
示例
以下示例用于并发导出数据。
def fetch_rows_per_thread(query, session_id, current_thread_id, max_thread_num):
token = None
while True:
try:
scan_query = ScanQuery(query, limit = 20, next_token = token, current_parallel_id = current_thread_id,
max_parallel = max_thread_num, alive_time = 30)
response = client.parallel_scan(
table_name, index_name, scan_query, session_id,
columns_to_get = ColumnsToGet(return_type=ColumnReturnType.ALL_FROM_INDEX))
for row in response.rows:
print("%s:%s" % (threading.currentThread().name, str(row)))
if len(response.next_token) == 0:
break
else:
token = response.next_token
except OTSServiceError as e:
print (e)
except OTSClientError as e:
print (e)
def parallel_scan(table_name, index_name):
response = client.compute_splits(table_name, index_name)
query = TermQuery('d', 0.1)
params = []
for i in range(response.splits_size):
params.append((([query, response.session_id, i, response.splits_size], None)))
pool = threadpool.ThreadPool(response.splits_size)
requests = threadpool.makeRequests(fetch_rows_per_thread, params)
[pool.putRequest(req) for req in requests]
pool.wait()