クエリ結果の順序に要件がない場合は、パラレル スキャン機能を使用してクエリ結果を効率的に取得できます。
前提条件
OTSClient インスタンスが初期化されていること。詳細については、OTSClient インスタンスの初期化を参照してください。
データテーブルが作成され、データがデータテーブルに書き込まれていること。詳細については、データテーブルの作成とデータの書き込みを参照してください。
データテーブルの検索インデックスが作成されていること。詳細については、検索インデックスの作成を参照してください。
パラメーター
パラメーター | 説明 | |
table_name | データテーブルの名前。 | |
index_name | 検索インデックスの名前。 | |
scan_query | query | クエリのタイプ。この操作は、Search 操作でサポートされているものと同様の、タームクエリ、あいまいクエリ、範囲クエリ、地理クエリ、およびネストされたクエリをサポートしています。 |
limit | 各 ParallelScan 呼び出しで返されることができる行の最大数。 | |
max_parallel | リクエストごとのパラレル スキャン タスクの最大数。リクエストごとのパラレル スキャン タスクの最大数は、データ量によって異なります。データ量が多いほど、リクエストごとのパラレル スキャン タスクが多くなります。 ComputeSplits 操作を使用して、リクエストごとのパラレル スキャン タスクの最大数をクエリできます。 | |
current_parallel_id | リクエスト内のパラレル スキャン タスクの ID。有効な値: [0, max_parallel)。 | |
token | クエリ結果をページ分割するために使用されるトークン。 ParallelScan リクエストの結果には、次のページのトークンが含まれています。トークンを使用して次のページを取得できます。 | |
alive_time | 現在のパラレル スキャン タスクの有効期間。この有効期間は、トークンの有効期間でもあります。単位: 秒。デフォルト値: 60。デフォルト値を使用することをお勧めします。有効期間内に次のリクエストが開始されない場合、それ以上のデータはクエリできません。トークンの有効期間は、リクエストを送信するたびに更新されます。 説明 ソースインデックスとカナリアインデックスのスキーマが切り替えられた場合、単一サーバーに障害が発生した場合、またはサーバーエンドの負荷分散が実行された場合、セッションは早期に期限切れになります。この場合、セッションを再作成する必要があります。 | |
columns_to_get | クエリ条件を満たす各行について返される列の名前。 検索インデックス内のすべての列を返すには、return_type パラメーターを RETURN_ALL_FROM_INDEX に設定します。 | |
session_id | パラレル スキャン タスクのセッション ID。 ComputeSplits 操作を呼び出してセッションを作成し、パラレル スキャン リクエストでサポートされているパラレル スキャン タスクの最大数をクエリできます。 |
例
次のサンプルコードは、パラレル スキャンを実行する方法の例を示しています。
def fetch_rows_per_thread(query, session_id, current_thread_id, max_thread_num):
# スレッドごとにデータを取得
token = None
while True:
try:
scan_query = ScanQuery(query, limit = 20, next_token = token, current_parallel_id = current_thread_id,
max_parallel = max_thread_num, alive_time = 30)
response = client.parallel_scan(
table_name, index_name, scan_query, session_id,
columns_to_get = ColumnsToGet(return_type=ColumnReturnType.ALL_FROM_INDEX))
for row in response.rows:
print("%s:%s" % (threading.currentThread().name, str(row)))
if len(response.next_token) == 0:
break
else:
token = response.next_token
except OTSServiceError as e:
print (e)
except OTSClientError as e:
print (e)
def parallel_scan(table_name, index_name):
# parallel_scan の実行例
response = client.compute_splits(table_name, index_name)
query = TermQuery('d', 0.1)
params = []
for i in range(response.splits_size):
params.append((([query, response.session_id, i, response.splits_size], None)))
pool = threadpool.ThreadPool(response.splits_size)
requests = threadpool.makeRequests(fetch_rows_per_thread, params)
[pool.putRequest(req) for req in requests]
pool.wait()