すべてのプロダクト
Search
ドキュメントセンター

Tablestore:並列スキャン

最終更新日:Dec 28, 2024

クエリ結果の順序に要件がない場合は、並列スキャン機能を使用してクエリ結果を効率的に取得できます。

重要

Tablestore SDK for Java V5.6.0 以降では、並列スキャン機能がサポートされています。並列スキャン機能を使用する前に、正しいバージョンの Tablestore SDK for Java を取得していることを確認してください。Tablestore SDK for Java のバージョン履歴の詳細については、Tablestore SDK for Java のバージョン履歴を参照してください。

前提条件

パラメーター

パラメーター

説明

tableName

データテーブルの名前。

indexName

検索インデックスの名前。

scanQuery

query

検索インデックスのクエリステートメント。この操作は、Search 操作と同様に、タームクエリ、あいまいクエリ、範囲クエリ、地理クエリ、およびネストされたクエリをサポートします。

limit

各 ParallelScan 呼び出しで返される行の最大数。

maxParallel

リクエストあたりの並列スキャンタスクの最大数。リクエストあたりの並列スキャンタスクの最大数は、データ量によって異なります。データ量が多いほど、リクエストあたりの並列スキャンタスクが多くなります。 ComputeSplits 操作を使用して、リクエストあたりの並列スキャンタスクの最大数をクエリできます。

currentParallelId

リクエスト内の並列スキャンタスクの ID。有効な値: [0, maxParallel の値)

token

クエリ結果をページ分割するために使用されるトークン。ParallelScan リクエストの結果には、次のページのトークンが含まれています。トークンを使用して次のページを取得できます。

aliveTime

現在の並列スキャンタスクの有効期間。この有効期間は、トークンの有効期間でもあります。単位: 秒。デフォルト値: 60。単位: 秒。デフォルト値を使用することをお勧めします。有効期間内に次のリクエストが開始されない場合、それ以上のデータはクエリできません。トークンの有効期間は、リクエストを送信するたびに更新されます。

説明

スキーマでスイッチインデックスが動的に変更された場合、単一サーバーに障害が発生した場合、またはサーバーエンドの負荷分散が実行された場合、セッションは期限前に期限切れになります。この場合、セッションを再作成する必要があります。

columnsToGet

グループ化結果で返される列の名前。列名を Columns に追加できます。

検索インデックス内のすべての列を返す場合は、より簡潔な ReturnAllFromIndex 操作を使用できます。

重要

ReturnAll はここでは使用できません。

sessionId

並列スキャンタスクのセッション ID。ComputeSplits 操作を呼び出してセッションを作成し、並列スキャンリクエストでサポートされる並列スキャンタスクの最大数をクエリできます。

ビジネス要件に基づいて、単一スレッドまたは複数スレッドを使用してデータをスキャンできます。

単一スレッドを使用したデータのスキャン

並列スキャンを使用する場合、単一スレッドを使用するリクエストのコードは、複数スレッドを使用するリクエストのコードよりも簡単です。単一スレッドを使用するリクエストでは、currentParallelId パラメーターと maxParallel パラメーターは不要です。単一スレッドを使用する ParallelScan リクエストは、Search リクエストよりも高いスループットを提供します。ただし、単一スレッドを使用する ParallelScan リクエストは、複数スレッドを使用する ParallelScan リクエストよりも低いスループットを提供します。

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import com.alicloud.openservices.tablestore.SyncClient;
import com.alicloud.openservices.tablestore.model.ComputeSplitsRequest;
import com.alicloud.openservices.tablestore.model.ComputeSplitsResponse;
import com.alicloud.openservices.tablestore.model.Row;
import com.alicloud.openservices.tablestore.model.SearchIndexSplitsOptions;
import com.alicloud.openservices.tablestore.model.iterator.RowIterator;
import com.alicloud.openservices.tablestore.model.search.ParallelScanRequest;
import com.alicloud.openservices.tablestore.model.search.ParallelScanResponse;
import com.alicloud.openservices.tablestore.model.search.ScanQuery;
import com.alicloud.openservices.tablestore.model.search.SearchRequest.ColumnsToGet;
import com.alicloud.openservices.tablestore.model.search.query.MatchAllQuery;
import com.alicloud.openservices.tablestore.model.search.query.Query;
import com.alicloud.openservices.tablestore.model.search.query.QueryBuilders;

public class Test {

    public static List<Row> scanQuery(final SyncClient client) {
        String tableName = "<TableName>"; // テーブル名
        String indexName = "<IndexName>"; // インデックス名
        // セッションIDとリクエストでサポートされる並列スキャンタスクの最大数をクエリします。
        ComputeSplitsRequest computeSplitsRequest = new ComputeSplitsRequest();
        computeSplitsRequest.setTableName(tableName);
        computeSplitsRequest.setSplitsOptions(new SearchIndexSplitsOptions(indexName));
        ComputeSplitsResponse computeSplitsResponse = client.computeSplits(computeSplitsRequest);
        byte[] sessionId = computeSplitsResponse.getSessionId();
        int splitsSize = computeSplitsResponse.getSplitsSize();
        /*
         * 並列スキャンリクエストを作成します。
         */
        ParallelScanRequest parallelScanRequest = new ParallelScanRequest();
        parallelScanRequest.setTableName(tableName);
        parallelScanRequest.setIndexName(indexName);
        ScanQuery scanQuery = new ScanQuery();
        // このクエリは、スキャンするデータの範囲を決定します。ネストされた複雑なクエリを作成できます。
        Query query = new MatchAllQuery();
        scanQuery.setQuery(query);

        // 各ParallelScan呼び出しで返される行の最大数を指定します。
        scanQuery.setLimit(2000);
        parallelScanRequest.setScanQuery(scanQuery);
        ColumnsToGet columnsToGet = new ColumnsToGet();
        columnsToGet.setColumns(Arrays.asList("col_1", "col_2"));
        parallelScanRequest.setColumnsToGet(columnsToGet);
        parallelScanRequest.setSessionId(sessionId);

        /*
         * builderを使用して、前のリクエストと同じ機能を持つ並列スキャンリクエストを作成します。
         */
        ParallelScanRequest parallelScanRequestByBuilder = ParallelScanRequest.newBuilder()
            .tableName(tableName)
            .indexName(indexName)
            .scanQuery(ScanQuery.newBuilder()
                .query(QueryBuilders.matchAll())
                .limit(2000)
                .build())
            .addColumnsToGet("col_1", "col_2")
            .sessionId(sessionId)
            .build();
        List<Row> result = new ArrayList<>();

        /*
         * ネイティブAPI操作を使用してデータをスキャンします。
         */
        {
            ParallelScanResponse parallelScanResponse = client.parallelScan(parallelScanRequest);
            // 次のリクエストのScanQueryのトークンをクエリします。
            byte[] nextToken = parallelScanResponse.getNextToken();
            // データを取得します。
            List<Row> rows = parallelScanResponse.getRows();
            result.addAll(rows);
            while (nextToken != null) {
                // トークンを指定します。
                parallelScanRequest.getScanQuery().setToken(nextToken);
                // データのスキャンを続けます。
                parallelScanResponse = client.parallelScan(parallelScanRequest);
                // データを取得します。
                rows = parallelScanResponse.getRows();
                result.addAll(rows);
                nextToken = parallelScanResponse.getNextToken();
            }
        }

        /*
         * 推奨される方法。
         * イテレータを使用して、一致するすべてのデータをスキャンします。この方法は、前の方法と比較して、同じクエリ速度ですが、使いやすくなっています。
         */
        {
            RowIterator iterator = client.createParallelScanIterator(parallelScanRequestByBuilder);
            while (iterator.hasNext()) {
                Row row = iterator.next();
                result.add(row);
                // 特定の値を取得します。
                String col_1 = row.getLatestColumn("col_1").getValue().asString();
                long col_2 = row.getLatestColumn("col_2").getValue().asLong();
            }
        }

        /*
         * 操作が失敗した場合は、操作を再試行します。この関数の呼び出し元に再試行メカニズムがある場合、または失敗した操作を再試行したくない場合は、この部分を無視できます。
         * 可用性を確保するために、例外が発生した場合は新しい並列スキャンタスクを開始することをお勧めします。
         * ParallelScanリクエストを送信すると、次の例外が発生する可能性があります。
         * 1. サーバー側でセッション例外が発生します。エラーコードはOTSSessionExpiredです。
         * 2. クライアント側でネットワーク例外などの例外が発生します。
         */
        try {
            // 処理ロジックを実行します。
            {
                RowIterator iterator = client.createParallelScanIterator(parallelScanRequestByBuilder);
                while (iterator.hasNext()) {
                    Row row = iterator.next();
                    // データ行を処理します。十分なメモリリソースがある場合は、行をリストに追加できます。
                    result.add(row);
                }
            }
        } catch (Exception ex) {
            // 処理ロジックを再試行します。
            {
                result.clear();
                RowIterator iterator = client.createParallelScanIterator(parallelScanRequestByBuilder);
                while (iterator.hasNext()) {
                    Row row = iterator.next();
                    // データ行を処理します。十分なメモリリソースがある場合は、行をリストに追加できます。
                    result.add(row);
                }
            }
        }
        return result;
    }
}

複数スレッドを使用したデータのスキャン

import com.alicloud.openservices.tablestore.SyncClient;
import com.alicloud.openservices.tablestore.model.ComputeSplitsRequest;
import com.alicloud.openservices.tablestore.model.ComputeSplitsResponse;
import com.alicloud.openservices.tablestore.model.Row;
import com.alicloud.openservices.tablestore.model.SearchIndexSplitsOptions;
import com.alicloud.openservices.tablestore.model.iterator.RowIterator;
import com.alicloud.openservices.tablestore.model.search.ParallelScanRequest;
import com.alicloud.openservices.tablestore.model.search.ScanQuery;
import com.alicloud.openservices.tablestore.model.search.query.QueryBuilders;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicLong;

public class Test {

    public static void scanQueryWithMultiThread(final SyncClient client, String tableName, String indexName) throws InterruptedException {
        // クライアントのCPUコア数をクエリします。
        final int cpuProcessors = Runtime.getRuntime().availableProcessors();
        // クライアントの並列スレッド数を指定します。クエリのパフォーマンスへの影響を防ぐため、クライアントのCPUコア数をクライアントの並列スレッド数として指定することをお勧めします。
        final Semaphore semaphore = new Semaphore(cpuProcessors);

        // セッションIDとリクエストでサポートされる並列スキャンタスクの最大数をクエリします。
        ComputeSplitsRequest computeSplitsRequest = new ComputeSplitsRequest();
        computeSplitsRequest.setTableName(tableName);
        computeSplitsRequest.setSplitsOptions(new SearchIndexSplitsOptions(indexName));
        ComputeSplitsResponse computeSplitsResponse = client.computeSplits(computeSplitsRequest);
        final byte[] sessionId = computeSplitsResponse.getSessionId();
        final int maxParallel = computeSplitsResponse.getSplitsSize();

        // ビジネスで行数を取得する必要がある場合は、AtomicLongオブジェクトを作成します。
        AtomicLong rowCount = new AtomicLong(0);
        /*
         * 関数を使用してマルチスレッドを実行する場合は、スレッドを継承する内部クラスを構築できます。
         * また、コードを整理するために外部クラスを構築することもできます。
         */
        final class ThreadForScanQuery extends Thread {
            private final int currentParallelId;

            private ThreadForScanQuery(int currentParallelId) {
                this.currentParallelId = currentParallelId;
                this.setName("ThreadForScanQuery:" + maxParallel + "-" + currentParallelId);  // スレッド名を指定します。
            }

            @Override
            public void run() {
                System.out.println("start thread:" + this.getName());
                try {
                    // 処理ロジックを実行します。
                    {
                        ParallelScanRequest parallelScanRequest = ParallelScanRequest.newBuilder()
                                .tableName(tableName)
                                .indexName(indexName)
                                .scanQuery(ScanQuery.newBuilder()
                                        .query(QueryBuilders.range("col_long").lessThan(10_0000)) // クエリするデータを指定します。
                                        .limit(2000)
                                        .currentParallelId(currentParallelId)
                                        .maxParallel(maxParallel)
                                        .build())
                                .addColumnsToGet("col_long", "col_keyword", "col_bool")  // 検索インデックスから返すフィールドを指定します。検索インデックスからすべてのフィールドを返すには、returnAllColumnsFromIndexをtrueに設定します。
                                //.returnAllColumnsFromIndex(true)
                                .sessionId(sessionId)
                                .build();
                        // イテレータを使用してすべてのデータを取得します。
                        RowIterator ltr = client.createParallelScanIterator(parallelScanRequest);
                        long count = 0;
                        while (ltr.hasNext()) {
                            Row row = ltr.next();
                            // カスタム処理ロジックを追加します。次のサンプルコードは、行数をカウントするためのカスタム処理ロジックを追加する方法を示しています。
                            count++;
                        }
                        rowCount.addAndGet(count);
                        System.out.println("thread[" + this.getName() + "] finished. this thread get rows:" + count);
                    }
                } catch (Exception ex) {
                    // 例外が発生した場合は、処理ロジックを再試行できます。
                } finally {
                    semaphore.release();
                }
            }
        }

        // スレッドを同時に実行します。currentParallelIdの有効な値:[0、maxParallelの値)。
        List<ThreadForScanQuery> threadList = new ArrayList<ThreadForScanQuery>();
        for (int currentParallelId = 0; currentParallelId < maxParallel; currentParallelId++) {
            ThreadForScanQuery thread = new ThreadForScanQuery(currentParallelId);
            threadList.add(thread);
        }

        // スレッドを同時に開始します。
        for (ThreadForScanQuery thread : threadList) {
            // セマフォに値を指定して、同時に開始できるスレッド数を制限し、クライアントのボトルネックを防ぎます。
            semaphore.acquire();
            thread.start();
        }

        // メインスレッドは、すべてのスレッドが完了するまでブロックされます。
        for (ThreadForScanQuery thread : threadList) {
            thread.join();
        }
        System.out.println("all thread finished! total rows:" + rowCount.get());
    }
}