すべてのプロダクト
Search
ドキュメントセンター

Tablestore:並列スキャン

最終更新日:Dec 28, 2024

クエリ結果の順序に要件がない場合は、並列スキャン機能を使用してクエリ結果を効率的に取得できます。

重要

Tablestore Java SDK V5.6.0 以降では、並列スキャン機能がサポートされています。並列スキャン機能を使用する前に、正しいバージョンの Tablestore Java SDK を取得していることを確認してください。Tablestore Java SDK のバージョン履歴の詳細については、Tablestore Java SDK のバージョン履歴を参照してください。

背景情報

検索インデックス機能を使用すると、Search オペレーションを呼び出して、並べ替えや集計などのすべてのクエリ機能と分析機能を使用できます。クエリ結果は、Search オペレーションから特定の順序で返されます。

場合によっては、たとえば、Tablestore を Spark や Presto などのコンピューティング環境に接続する場合や、特定のオブジェクトグループをクエリする場合、クエリ結果の順序よりもクエリ速度の方が重要になることがあります。クエリ速度を向上させるために、Tablestore は検索インデックス機能に ParallelScan オペレーションを提供しています。

Search オペレーションと比較して、ParallelScan オペレーションはすべてのクエリ機能をサポートしますが、並べ替えや集計などの分析機能は提供しません。これにより、クエリ速度が 5 倍以上向上します。ParallelScan オペレーションを呼び出して、数億のデータ行を 1 分以内にエクスポートできます。データエクスポート機能は、上限なしで水平方向にスケーリングできます。

各 ParallelScan 呼び出しで返される行の最大数は、各 Search 呼び出しで返される行の最大数よりも大きくなります。Search オペレーションは呼び出しごとに最大 100 行を返しますが、ParallelScan オペレーションは呼び出しごとに最大 2,000 行を返します。並列スキャン機能を使用すると、複数のスレッドを使用してセッションでリクエストを並行して開始できるため、データエクスポートが高速化されます。

シナリオ

  • クエリ結果を並べ替えたり集計したりする場合、またはクエリリクエストがエンドユーザーから送信される場合は、Search オペレーションを使用します。

  • クエリ結果を並べ替える必要がなく、一致するすべての結果を効率的に返したい場合、またはデータが Spark や Presto などのコンピューティング環境によってプルされる場合は、ParallelScan オペレーションを使用します。

機能

以下の項目では、ParallelScan オペレーションと Search オペレーションの違いについて説明します。

  • 安定した結果

    並列スキャンのタスクはステートフルです。セッションでは、スキャンされたデータの結果セットは、最初のリクエストが開始されたときのデータステータスによって決定されます。最初のリクエストが送信された後にデータが挿入または変更されても、結果セットは影響を受けません。

  • セッション

    重要

    セッション ID を取得するのが難しい場合は、セッション ID を指定せずに ParallelScan オペレーションを呼び出してリクエストを開始できます。ただし、セッション ID を指定せずにリクエストを送信すると、取得した結果セットに重複データが発生する可能性が非常に低くなります。

    並列スキャン関連のオペレーションではセッションを使用します。セッション ID を使用して、スキャンされたデータの結果セットを決定できます。次のプロセスでは、セッション ID を取得して使用する方法について説明します。

    1. ComputeSplits オペレーションを呼び出して、並列スキャンタスクの最大数と現在のセッション ID をクエリします。

    2. 複数の並列スキャンリクエストを開始してデータを読み取ります。これらのリクエストでは、現在のセッション ID と並列スキャンタスク ID を指定する必要があります。

    並列スキャンプロセスでネットワーク例外、スレッド例外、スキーマの動的変更、またはインデックスの切り替えが発生し、データスキャンが停止すると、Tablestore は OTSSessionExpired エラーコードを返します。このような場合は、別の並列スキャンタスクを開始してデータを再度スキャンする必要があります。

    同じセッション ID と同じ ScanQuery パラメータ値を持つ並列スキャンタスクは、1 つのタスクと見なされます。並列スキャンタスクは、最初の ParallelScan リクエストを送信したときに開始し、すべてのデータがスキャンされるか、トークンの有効期限が切れると終了します。

  • 単一リクエストでの並列スキャンタスクの最大数

    ParallelScan オペレーションでサポートされる単一リクエストでの並列スキャンタスクの最大数は、ComputeSplits リクエストのレスポンスによって決定されます。データ量が多いほど、セッションで必要な並列スキャンタスクが多くなります。

    単一のリクエストは、1 つのクエリステートメントで指定されます。たとえば、Search オペレーションを使用して、city の値が杭州である結果をクエリする場合、この条件に一致するすべてのデータが結果に返されます。ただし、ParallelScan オペレーションを使用し、セッションの並列スキャンタスクの数が 2 の場合、各 ParallelScan リクエストは結果の半分を返します。完全な結果セットは、2 つの並列結果セットで構成されます。

  • パフォーマンス

    並列スキャンタスクを含む ParallelScan リクエストのクエリ速度は、Search リクエストのクエリ速度の 5 倍です。並列スキャン機能を使用する場合、クエリ速度はセッションの並列スキャンタスクの数とともに増加します。たとえば、セッションに 8 つの並列スキャンタスクが含まれている場合、クエリ速度は 4 倍向上します。

  • コスト

    ParallelScan リクエストは消費リソースが少なく、低価格で提供されます。大量のデータをエクスポートするには、ParallelScan オペレーションを使用することをお勧めします。

制限

  • 並列スキャンタスクの最大数は 10 です。この制限は、ビジネス要件に基づいて調整できます。

  • 検索インデックスからは既存の列のみを返すことができます。ただし、DATE 型と NESTED 型の列は返すことができません。

    ParallelScan オペレーションは、ARRAY 列と GEOPOINT 列の値を返すことができます。ただし、戻り値はフォーマットされており、データテーブルに書き込まれた値とは異なる場合があります。たとえば、ARRAY 列に [1,2, 3, 4] を書き込むと、ParallelScan オペレーションは値として [1,2,3,4] を返します。GEOPOINT 列に 10,50 を書き込むと、ParallelScan オペレーションは値として 10.0,50.0 を返します。

    ReturnType パラメータは RETURN_ALL_INDEX または RETURN_SPECIFIED に設定できますが、RETURN_ALL には設定できません。

  • 各 ParallelScan 呼び出しで返される行の最大数は、limit パラメータで指定されます。limit パラメータのデフォルト値は 2,000 です。2,000 より大きい値を指定した場合、パフォーマンスは limit の増加に伴ってほとんど変化しません。

API オペレーション

並列スキャン機能を使用するには、次の API オペレーションを呼び出すことができます。

  • ComputeSplits: このオペレーションを呼び出して、単一の ParallelScan リクエストでサポートされる並列スキャンタスクの最大数をクエリできます。

  • ParallelScan: このオペレーションを呼び出してデータをエクスポートできます。

Tablestore SDK の使用

次の Tablestore SDK を使用して、データを並列にスキャンできます。

パラメータ

パラメータ

説明

tableName

データテーブルの名前。

indexName

検索インデックスの名前。

scanQuery

query

検索インデックスのクエリステートメント。このオペレーションは、Search オペレーションと同様に、用語クエリ、あいまいクエリ、範囲クエリ、地理クエリ、およびネストされたクエリをサポートします。

limit

各 ParallelScan 呼び出しで返される行の最大数。

maxParallel

リクエストあたりの並列スキャンタスクの最大数。リクエストあたりの並列スキャンタスクの最大数は、データ量によって異なります。データ量が多いほど、リクエストあたりの並列スキャンタスクが多くなります。ComputeSplits オペレーションを使用して、リクエストあたりの並列スキャンタスクの最大数をクエリできます。

currentParallelId

リクエストの並列スキャンタスクの ID。有効な値: [0, maxParallel の値)

token

クエリ結果をページ分割するために使用されるトークン。ParallelScan リクエストの結果には、次のページのトークンが含まれています。トークンを使用して次のページを取得できます。

aliveTime

現在の並列スキャンタスクの有効期間。この有効期間は、トークンの有効期間でもあります。単位: 秒。デフォルト値: 60。単位: 秒。デフォルト値を使用することをお勧めします。有効期間内に次のリクエストが開始されない場合、それ以上のデータはクエリできません。トークンの有効期間は、リクエストを送信するたびに更新されます。

説明

スキーマでインデックスの切り替えが動的に変更された場合、単一サーバーで障害が発生した場合、またはサーバーエンドの負荷分散が実行された場合、セッションは期限切れになります。この場合は、セッションを再作成する必要があります。

columnsToGet

グループ化結果で返される列の名前。列名を Columns に追加できます。

検索インデックスのすべての列を返す場合は、より簡潔な ReturnAllFromIndex オペレーションを使用できます。

重要

ReturnAll はここでは使用できません。

sessionId

並列スキャンタスクのセッション ID。ComputeSplits オペレーションを呼び出してセッションを作成し、並列スキャンリクエストでサポートされる並列スキャンタスクの最大数をクエリできます。

ビジネス要件に基づいて、単一スレッドを使用するか、複数のスレッドを同時に使用してデータをスキャンできます。

単一スレッドを使用したデータのスキャン

並列スキャンを使用する場合、単一スレッドを使用するリクエストのコードは、複数スレッドを使用するリクエストのコードよりも簡単です。単一スレッドを使用するリクエストには、currentParallelId パラメータと maxParallel パラメータは必要ありません。単一スレッドを使用する ParallelScan リクエストは、Search リクエストよりも高いスループットを提供します。ただし、単一スレッドを使用する ParallelScan リクエストは、複数スレッドを使用する ParallelScan リクエストよりも低いスループットを提供します。

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import com.alicloud.openservices.tablestore.SyncClient;
import com.alicloud.openservices.tablestore.model.ComputeSplitsRequest;
import com.alicloud.openservices.tablestore.model.ComputeSplitsResponse;
import com.alicloud.openservices.tablestore.model.Row;
import com.alicloud.openservices.tablestore.model.SearchIndexSplitsOptions;
import com.alicloud.openservices.tablestore.model.iterator.RowIterator;
import com.alicloud.openservices.tablestore.model.search.ParallelScanRequest;
import com.alicloud.openservices.tablestore.model.search.ParallelScanResponse;
import com.alicloud.openservices.tablestore.model.search.ScanQuery;
import com.alicloud.openservices.tablestore.model.search.SearchRequest.ColumnsToGet;
import com.alicloud.openservices.tablestore.model.search.query.MatchAllQuery;
import com.alicloud.openservices.tablestore.model.search.query.Query;
import com.alicloud.openservices.tablestore.model.search.query.QueryBuilders;

public class Test {

    public static List<Row> scanQuery(final SyncClient client) {
        String tableName = "<テーブル名>"; // テーブル名を指定します。
        String indexName = "<インデックス名>"; // インデックス名を指定します。
        // セッション ID とリクエストでサポートされる並列スキャンタスクの最大数をクエリします。
        ComputeSplitsRequest computeSplitsRequest = new ComputeSplitsRequest();
        computeSplitsRequest.setTableName(tableName);
        computeSplitsRequest.setSplitsOptions(new SearchIndexSplitsOptions(indexName));
        ComputeSplitsResponse computeSplitsResponse = client.computeSplits(computeSplitsRequest);
        byte[] sessionId = computeSplitsResponse.getSessionId();
        int splitsSize = computeSplitsResponse.getSplitsSize();
        /*
         * 並列スキャンリクエストを作成します。
         */
        ParallelScanRequest parallelScanRequest = new ParallelScanRequest();
        parallelScanRequest.setTableName(tableName);
        parallelScanRequest.setIndexName(indexName);
        ScanQuery scanQuery = new ScanQuery();
        // このクエリは、スキャンするデータの範囲を決定します。ネストされた複雑なクエリを作成できます。
        Query query = new MatchAllQuery();
        scanQuery.setQuery(query);

        // 各 ParallelScan 呼び出しで返される行の最大数を指定します。
        scanQuery.setLimit(2000);
        parallelScanRequest.setScanQuery(scanQuery);
        ColumnsToGet columnsToGet = new ColumnsToGet();
        columnsToGet.setColumns(Arrays.asList("col_1", "col_2"));
        parallelScanRequest.setColumnsToGet(columnsToGet);
        parallelScanRequest.setSessionId(sessionId);

        /*
         * builder を使用して、前のリクエストと同じ機能を持つ並列スキャンリクエストを作成します。
         */
        ParallelScanRequest parallelScanRequestByBuilder = ParallelScanRequest.newBuilder()
            .tableName(tableName)
            .indexName(indexName)
            .scanQuery(ScanQuery.newBuilder()
                .query(QueryBuilders.matchAll())
                .limit(2000)
                .build())
            .addColumnsToGet("col_1", "col_2")
            .sessionId(sessionId)
            .build();
        List<Row> result = new ArrayList<>();

        /*
         * ネイティブ API オペレーションを使用してデータをスキャンします。
         */
        {
            ParallelScanResponse parallelScanResponse = client.parallelScan(parallelScanRequest);
            // 次のリクエストの ScanQuery のトークンをクエリします。
            byte[] nextToken = parallelScanResponse.getNextToken();
            // データを取得します。
            List<Row> rows = parallelScanResponse.getRows();
            result.addAll(rows);
            while (nextToken != null) {
                // トークンを指定します。
                parallelScanRequest.getScanQuery().setToken(nextToken);
                // データのスキャンを続けます。
                parallelScanResponse = client.parallelScan(parallelScanRequest);
                // データを取得します。
                rows = parallelScanResponse.getRows();
                result.addAll(rows);
                nextToken = parallelScanResponse.getNextToken();
            }
        }

        /*
         * 推奨される方法。
         * イテレータを使用して、一致するすべてのデータをスキャンします。この方法は、前の方法と比較して、同じクエリ速度ですが、使いやすくなっています。
         */
        {
            RowIterator iterator = client.createParallelScanIterator(parallelScanRequestByBuilder);
            while (iterator.hasNext()) {
                Row row = iterator.next();
                result.add(row);
                // 特定の値を取得します。
                String col_1 = row.getLatestColumn("col_1").getValue().asString();
                long col_2 = row.getLatestColumn("col_2").getValue().asLong();
            }
        }

        /*
         * オペレーションが失敗した場合は、オペレーションを再試行します。この関数の呼び出し元に再試行メカニズムがある場合、または失敗したオペレーションを再試行したくない場合は、この部分を無視できます。
         * 可可用性を確保するために、例外が発生した場合は新しい並列スキャンタスクを開始することをお勧めします。
         * ParallelScan リクエストを送信すると、次の例外が発生する可能性があります。
         * 1. サーバー側でセッション例外が発生します。エラーコードは OTSSessionExpired です。
         * 2. クライアント側でネットワーク例外などの例外が発生します。
         */
        try {
            // 処理ロジックを実行します。
            {
                RowIterator iterator = client.createParallelScanIterator(parallelScanRequestByBuilder);
                while (iterator.hasNext()) {
                    Row row = iterator.next();
                    // データ行を処理します。十分なメモリリソースがある場合は、行をリストに追加できます。
                    result.add(row);
                }
            }
        } catch (Exception ex) {
            // 処理ロジックを再試行します。
            {
                result.clear();
                RowIterator iterator = client.createParallelScanIterator(parallelScanRequestByBuilder);
                while (iterator.hasNext()) {
                    Row row = iterator.next();
                    // データ行を処理します。十分なメモリリソースがある場合は、行をリストに追加できます。
                    result.add(row);
                }
            }
        }
        return result;
    }
}

複数スレッドを使用したデータのスキャン

import com.alicloud.openservices.tablestore.SyncClient;
import com.alicloud.openservices.tablestore.model.ComputeSplitsRequest;
import com.alicloud.openservices.tablestore.model.ComputeSplitsResponse;
import com.alicloud.openservices.tablestore.model.Row;
import com.alicloud.openservices.tablestore.model.SearchIndexSplitsOptions;
import com.alicloud.openservices.tablestore.model.iterator.RowIterator;
import com.alicloud.openservices.tablestore.model.search.ParallelScanRequest;
import com.alicloud.openservices.tablestore.model.search.ScanQuery;
import com.alicloud.openservices.tablestore.model.search.query.QueryBuilders;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicLong;

public class Test {

    public static void scanQueryWithMultiThread(final SyncClient client, String tableName, String indexName) throws InterruptedException {
        // クライアントの CPU コア数をクエリします。
        final int cpuProcessors = Runtime.getRuntime().availableProcessors();
        // クライアントの並列スレッド数を指定します。クエリのパフォーマンスへの影響を防ぐために、クライアントの CPU コア数をクライアントの並列スレッド数として指定することをお勧めします。
        final Semaphore semaphore = new Semaphore(cpuProcessors);

        // セッション ID とリクエストでサポートされる並列スキャンタスクの最大数をクエリします。
        ComputeSplitsRequest computeSplitsRequest = new ComputeSplitsRequest();
        computeSplitsRequest.setTableName(tableName);
        computeSplitsRequest.setSplitsOptions(new SearchIndexSplitsOptions(indexName));
        ComputeSplitsResponse computeSplitsResponse = client.computeSplits(computeSplitsRequest);
        final byte[] sessionId = computeSplitsResponse.getSessionId();
        final int maxParallel = computeSplitsResponse.getSplitsSize();

        // ビジネスで行数を取得する必要がある場合は、AtomicLong オブジェクトを作成します。
        AtomicLong rowCount = new AtomicLong(0);
        /*
         * 関数を使用してマルチスレッドを実行する場合は、スレッドを継承する内部クラスを構築できます。
         * また、コードを整理するために外部クラスを構築することもできます。
         */
        final class ThreadForScanQuery extends Thread {
            private final int currentParallelId;

            private ThreadForScanQuery(int currentParallelId) {
                this.currentParallelId = currentParallelId;
                this.setName("ThreadForScanQuery:" + maxParallel + "-" + currentParallelId);  // スレッド名を指定します。
            }

            @Override
            public void run() {
                System.out.println("スレッドを開始します:" + this.getName());
                try {
                    // 処理ロジックを実行します。
                    {
                        ParallelScanRequest parallelScanRequest = ParallelScanRequest.newBuilder()
                                .tableName(tableName)
                                .indexName(indexName)
                                .scanQuery(ScanQuery.newBuilder()
                                        .query(QueryBuilders.range("col_long").lessThan(10_0000)) // クエリするデータを指定します。
                                        .limit(2000)
                                        .currentParallelId(currentParallelId)
                                        .maxParallel(maxParallel)
                                        .build())
                                .addColumnsToGet("col_long", "col_keyword", "col_bool")  // 検索インデックスから返すフィールドを指定します。検索インデックスからすべてのフィールドを返すには、returnAllColumnsFromIndex を true に設定します。
                                //.returnAllColumnsFromIndex(true)
                                .sessionId(sessionId)
                                .build();
                        // イテレータを使用してすべてのデータを取得します。
                        RowIterator ltr = client.createParallelScanIterator(parallelScanRequest);
                        long count = 0;
                        while (ltr.hasNext()) {
                            Row row = ltr.next();
                            // カスタム処理ロジックを追加します。次のサンプルコードは、行数をカウントするためのカスタム処理ロジックを追加する方法を示しています。
                            count++;
                        }
                        rowCount.addAndGet(count);
                        System.out.println("スレッド[" + this.getName() + "] が完了しました。このスレッドが取得した行:" + count);
                    }
                } catch (Exception ex) {
                    // 例外が発生した場合は、処理ロジックを再試行できます。
                } finally {
                    semaphore.release();
                }
            }
        }

        // currentParallelId の有効な値: [0, maxParallel の値)。同時にスレッドを実行します。
        List<ThreadForScanQuery> threadList = new ArrayList<ThreadForScanQuery>();
        for (int currentParallelId = 0; currentParallelId < maxParallel; currentParallelId++) {
            ThreadForScanQuery thread = new ThreadForScanQuery(currentParallelId);
            threadList.add(thread);
        }

        // スレッドを同時に開始します。
        for (ThreadForScanQuery thread : threadList) {
            // セマフォの値を指定して、同時に開始できるスレッドの数を制限し、クライアントのボトルネックを防ぎます。
            semaphore.acquire();
            thread.start();
        }

        // すべてのスレッドが完了するまで、メインスレッドはブロックされます。
        for (ThreadForScanQuery thread : threadList) {
            thread.join();
        }
        System.out.println("すべてのスレッドが完了しました! 合計行数:" + rowCount.get());
    }
}