すべてのプロダクト
Search
ドキュメントセンター

Tablestore:並列スキャンを実行する

最終更新日:Dec 28, 2024

クエリ結果の順序に要件がない場合は、並列スキャン機能を使用してクエリ結果を効率的に取得できます。

前提条件

パラメータ

パラメータ

説明

TableName

データテーブルの名前。

IndexName

検索インデックスの名前。

ScanQuery

クエリ

検索インデックスのクエリステートメント。用語クエリ、あいまいクエリ、範囲クエリ、地理クエリ、ネストクエリなどのクエリタイプがサポートされており、Search オペレーションでサポートされているものと同じです。

Limit

各 ParallelScan 呼び出しで返されることができる行の最大数。

MaxParallel

リクエストあたりの並列スキャンタスクの最大数。リクエストあたりの並列スキャンタスクの最大数は、データ量によって異なります。データ量が多いほど、リクエストあたりの並列スキャンタスクが多くなります。 ComputeSplits オペレーションを呼び出して、リクエストあたりの並列スキャンタスクの最大数をクエリできます。

CurrentParallelID

リクエスト内の並列スキャンタスクの ID。有効な値: [0,MaxParallel)。

Token

クエリ結果をページングするために使用されるトークン。ParallelScan リクエストの結果には、次のページのトークンが含まれています。トークンを使用して次のページを取得できます。

AliveTime

現在の並列スキャンタスクの有効期間。この有効期間はトークンの有効期間でもあります。デフォルト値: 60。単位: 秒。デフォルト値を使用することをお勧めします。有効期間内に次のリクエストが開始されない場合、それ以上のデータはクエリできません。リクエストを送信するたびにトークンの有効時間が更新されます。

説明

スキーマでスイッチインデックスが動的に変更された場合、単一サーバーに障害が発生した場合、またはサーバーの負荷分散が実行された場合、セッションは期限前に期限切れになります。この場合、セッションを再作成する必要があります。

ColumnsToGet

グループ化結果で返される列の名前。Columns パラメータを列名に設定できます。

検索インデックスのすべての列を返す場合は、ReturnAllFromIndex パラメータを指定できます。

重要

ReturnAll パラメータはサポートされていません。

SessionId

並列スキャンタスクのセッション ID。ComputeSplits オペレーションを呼び出してセッションを作成し、並列スキャンリクエストでサポートされている並列スキャンタスクの最大数をクエリできます。

ビジネス要件に基づいて、単一スレッドまたは複数スレッドを使用してデータをスキャンできます。

単一スレッドを使用してデータをスキャンする

並列スキャン機能を使用する場合、単一スレッドを使用するリクエストのコードは、複数スレッドを使用するリクエストのコードよりも簡単です。単一スレッドを使用するリクエストには、currentParallelId パラメータと maxParallel パラメータは必要ありません。単一スレッドを使用する ParallelScan リクエストは、Search リクエストよりも高いスループットを提供します。ただし、単一スレッドを使用する ParallelScan リクエストは、複数スレッドを使用する ParallelScan リクエストよりも低いスループットを提供します。

func computeSplits(client *tablestore.TableStoreClient, tableName string, indexName string) (*tablestore.ComputeSplitsResponse, error) {
	req := &tablestore.ComputeSplitsRequest{}
	req.
		SetTableName(tableName).
		SetSearchIndexSplitsOptions(tablestore.SearchIndexSplitsOptions{IndexName: indexName})
	res, err := client.ComputeSplits(req)
	if err != nil {
		return nil, err
	}
	return res, nil
}

/**
 * 並列スキャンを実行して、単一スレッドを使用してデータをスキャンします。
 */
func ParallelScanSingleConcurrency(client *tablestore.TableStoreClient, tableName string, indexName string) {
        computeSplitsResp, err := computeSplits(client, tableName, indexName)
        if err != nil {
                fmt.Printf("%#v", err)
                return
        }

        query := search.NewScanQuery().SetQuery(&search.MatchAllQuery{}).SetLimit(2)

        req := &tablestore.ParallelScanRequest{}
        req.SetTableName(tableName).
                SetIndexName(indexName).
                SetColumnsToGet(&tablestore.ColumnsToGet{ReturnAllFromIndex: false}).
                SetScanQuery(query).
                SetSessionId(computeSplitsResp.SessionId)

        res, err := client.ParallelScan(req)
        if err != nil {
                fmt.Printf("%#v", err)
                return
        }

        total := len(res.Rows)
        for res.NextToken != nil {
                req.SetScanQuery(query.SetToken(res.NextToken))
                res, err = client.ParallelScan(req)
                if err != nil {
                        fmt.Printf("%#v", err)
                        return
                }

                total += len(res.Rows) // 各ループで行を処理します
        }
        fmt.Println("total: ", total)
}

複数スレッドを使用してデータをスキャンする

func computeSplits(client *tablestore.TableStoreClient, tableName string, indexName string) (*tablestore.ComputeSplitsResponse, error) {
	req := &tablestore.ComputeSplitsRequest{}
	req.
		SetTableName(tableName).
		SetSearchIndexSplitsOptions(tablestore.SearchIndexSplitsOptions{IndexName: indexName})
	res, err := client.ComputeSplits(req)
	if err != nil {
		return nil, err
	}
	return res, nil
}

/**
 * 並列スキャンを実行して、複数スレッドを使用してデータをスキャンします。
 */
func ParallelScanMultiConcurrency(client *tablestore.TableStoreClient, tableName string, indexName string) {
        computeSplitsResp, err := computeSplits(client, tableName, indexName)
        if err != nil {
                fmt.Printf("%#v", err)
                return
        }

        var wg sync.WaitGroup
        wg.Add(int(computeSplitsResp.SplitsSize))

        for i := int32(0); i < computeSplitsResp.SplitsSize; i++ {
                current := i
                go func() {
                        defer wg.Done()
                        query := search.NewScanQuery().
                                SetQuery(&search.MatchAllQuery{}).
                                SetCurrentParallelID(current).
                                SetMaxParallel(computeSplitsResp.SplitsSize).
                                SetLimit(2)

                        req := &tablestore.ParallelScanRequest{}
                        req.SetTableName(tableName).
                                SetIndexName(indexName).
                                SetColumnsToGet(&tablestore.ColumnsToGet{ReturnAllFromIndex: false}).
                                SetScanQuery(query).
                                SetSessionId(computeSplitsResp.SessionId)

                        res, err := client.ParallelScan(req)
                        if err != nil {
                                fmt.Printf("%#v", err)
                                return
                        }

                        total := len(res.Rows)
                        for res.NextToken != nil {
                                req.SetScanQuery(query.SetToken(res.NextToken))
                                res, err = client.ParallelScan(req)
                                if err != nil {
                                        fmt.Printf("%#v", err)
                                        return
                                }

                                total += len(res.Rows) // 各ループで行を処理します
                        }
                        fmt.Println("total: ", total)
                }()
        }
        wg.Wait()
}

FAQ

Search オペレーションを呼び出してもデータが見つからない場合はどうすればよいですか?

関連情報