全部產品
Search
文件中心

Tablestore:讀取資料

更新時間:Jun 30, 2024

Tablestore提供了單行讀取、批量讀取、範圍讀取、迭代讀取和並行讀取的查詢方式用於讀取資料表中資料。資料寫入到資料表後,您可以選擇所需資料查詢方式進行資料讀取。

查詢方式

Tablestore提供的資料讀取介面包括GetRow、BatchGetRow和GetRange。讀取資料時,請根據實際查詢情境使用相應查詢方式讀取資料。

重要

當要讀取帶有自增主鍵列的表資料時,請確保已擷取到包含自增主鍵列值在內的完整主鍵。更多資訊,請參見主鍵列自增。如果未記錄自增主鍵列的值,您可以使用範圍讀取資料按照第一個主鍵列確定範圍讀取資料。

查詢方式

說明

適用情境

讀取單行資料

調用GetRow介面讀取一行資料。

適用於能確定完整主鍵且要讀取行數較少的情境。

批量讀取資料

調用BatchGetRow介面一次請求讀取多行資料或者一次對多張表進行讀取。

BatchGetRow操作由多個GetRow子操作組成,構造子操作的過程與使用GetRow介面時相同。

適用於能確定完整主鍵,且要讀取行數較多或者要讀取多個表中資料的情境。

範圍讀取資料

調用GetRange介面讀取一個範圍內的資料。

GetRange操作支援按照確定範圍進行正序讀取和逆序讀取,可以設定要讀取的行數。如果範圍較大,已掃描的行數或者資料量超過一定限制,會停止掃描,並返回已擷取的行和下一個主鍵資訊。您可以根據返回的下一個主鍵資訊,繼續發起請求,擷取範圍內剩餘的行。

適用於能確定完整主鍵範圍或者主鍵首碼的情境。

重要

如果不能確定主鍵首碼,您也可以通過設定完整主鍵範圍均為虛擬點INF_MIN和INF_MAX進行全表資料掃描,但是執行此操作會消耗較多計算資源,請謹慎使用。

迭代讀取資料

通過createRangeIterator介面迭代讀取資料。

適用於能確定完整主鍵範圍或者主鍵首碼,且需要迭代讀取的情境。

並發讀取資料

TableStoreReader是Table StoreJava SDK提供的工具類,封裝了BatchGetRow介面,可以實現並發查詢表中資料。同時支援多表查詢、查詢狀態統計、行層級回調和自訂配置功能。

適用於能確定完整主鍵,且要讀取行數較多或者要讀取多個表中資料的情境。

前提條件

  • 已初始化OTSClient。具體操作,請參見初始化OTSClient

  • 已建立資料表並寫入資料。

讀取單行資料

調用GetRow介面讀取一行資料。讀取的結果可能有如下兩種:

  • 如果該行存在,則返回該行的各主鍵列以及屬性列。

  • 如果該行不存在,則返回中不包含行,並且不會報錯。

參數

參數

說明

tableName

資料表名稱。

primaryKey

行的主鍵。主鍵包括主鍵列名、主鍵類型和主索引值。

重要

設定的主鍵個數和類型必須和表的主鍵個數和類型一致。

columnsToGet

讀取的列集合,列名可以是主鍵列或屬性列。

  • 如果不設定返回的列名,則返回整行資料。

  • 如果設定了返回的列名,當某行中指定的列均不存在時,則不返回該行,即傳回值為null;當某行中存在部分指定的列時,則返回該行且只返回存在的列。

說明
  • 查詢一行資料時,預設返回此行所有列的資料。如果需要只返回特定列,可以通過設定columnsToGet參數限制。如果將col0和col1加入到columnsToGet中,則只返回col0和col1列的值。

  • 當columnsToGet和filter同時使用時,執行順序是先擷取columnsToGet指定的列,再在返回的列中進行條件過濾。

maxVersions

最多讀取的版本數。

重要

maxVersions與timeRange必須至少設定一個。

  • 如果僅設定maxVersions,則最多返回所有版本中從新到舊指定數量版本的資料。

  • 如果僅設定timeRange,則返回該範圍內所有資料或指定版本資料。

  • 如果同時設定maxVersions和timeRange,則最多返回版本號碼範圍內從新到舊指定數量版本的資料。

timeRange

讀取版本號碼範圍或特定版本號碼的資料。更多資訊,請參見TimeRange

重要

maxVersions與timeRange必須至少設定一個。

  • 如果僅設定maxVersions,則最多返回所有版本中從新到舊指定數量版本的資料。

  • 如果僅設定timeRange,則返回該範圍內所有資料或指定版本資料。

  • 如果同時設定maxVersions和timeRange,則最多返回版本號碼範圍內從新到舊指定數量版本的資料。

  • 如果要查詢一個範圍的資料,則需要設定start和end。start和end分別表示起始時間戳記和結束時間戳記,範圍為前閉後開區間,即[start, end)

  • 如果要查詢特定版本號碼的資料,則需要設定timestamp。timestamp表示特定的時間戳記。

timestamp和[start, end)中只需要設定一個。

時間戳記的單位為毫秒,最小值為0,最大值為Long.MAX_VALUE

filter

使用過濾器,在服務端對讀取結果再進行一次過濾,只返回符合過濾器中條件的資料行。更多資訊,請參見過濾器

說明

當columnsToGet和filter同時使用時,執行順序是先擷取columnsToGet指定的列,再在返回的列中進行條件過濾。

樣本

讀取資料時,您可以指定要讀取的資料版本、要讀取的列、過濾器、正則過濾等。

讀取最新版本資料和指定列

以下樣本用於讀取資料表中的一行資料,設定讀取最新版本的資料和讀取指定的列。

private static void getRow(SyncClient client, String pkValue) {
    //構造主鍵。
    PrimaryKeyBuilder primaryKeyBuilder = PrimaryKeyBuilder.createPrimaryKeyBuilder();
    primaryKeyBuilder.addPrimaryKeyColumn("pk", PrimaryKeyValue.fromString(pkValue));
    PrimaryKey primaryKey = primaryKeyBuilder.build();

    //讀取一行資料,設定資料表名稱。
    SingleRowQueryCriteria criteria = new SingleRowQueryCriteria("<TABLE_NAME>", primaryKey);
    //設定讀取最新版本。
    criteria.setMaxVersions(1);
    GetRowResponse getRowResponse = client.getRow(new GetRowRequest(criteria));
    Row row = getRowResponse.getRow();

    System.out.println("讀取完畢,結果為: ");
    System.out.println(row);

    //設定讀取某些列。
    criteria.addColumnsToGet("Col0");
    getRowResponse = client.getRow(new GetRowRequest(criteria));
    row = getRowResponse.getRow();

    System.out.println("讀取完畢,結果為:");
    System.out.println(row);
} 

讀取資料時使用過濾器

以下樣本用於讀取資料表中的一行資料,設定讀取最新版本的資料以及根據Col0列的值過濾資料。

private static void getRow(SyncClient client, String pkValue) {
    //構造主鍵。
    PrimaryKeyBuilder primaryKeyBuilder = PrimaryKeyBuilder.createPrimaryKeyBuilder();
    primaryKeyBuilder.addPrimaryKeyColumn("pk", PrimaryKeyValue.fromString(pkValue));
    PrimaryKey primaryKey = primaryKeyBuilder.build();

    //讀取一行資料,設定資料表名稱。
    SingleRowQueryCriteria criteria = new SingleRowQueryCriteria("<TABLE_NAME>", primaryKey);
    //設定讀取最新版本。
    criteria.setMaxVersions(1);

    //設定過濾器,當Col0列的值為0時,返回該行。
    SingleColumnValueFilter singleColumnValueFilter = new SingleColumnValueFilter("Col0",
            SingleColumnValueFilter.CompareOperator.EQUAL, ColumnValue.fromLong(0));
    //如果Col0列不存在,則不返回該行。
    singleColumnValueFilter.setPassIfMissing(false);
    criteria.setFilter(singleColumnValueFilter);

    GetRowResponse getRowResponse = client.getRow(new GetRowRequest(criteria));
    Row row = getRowResponse.getRow();

    System.out.println("讀取完畢,結果為: ");
    System.out.println(row);
}

讀取資料時使用正則過濾

以下樣本用於讀取資料表一行中Col1列的資料,並對該列的資料執行正則過濾。

private static void getRow(SyncClient client, String pkValue) {
    //設定資料表名稱。
    SingleRowQueryCriteria criteria = new SingleRowQueryCriteria("<TABLE_NAME>");
 
    //構造主鍵。
    PrimaryKey primaryKey = PrimaryKeyBuilder.createPrimaryKeyBuilder()
        .addPrimaryKeyColumn("pk", PrimaryKeyValue.fromString(pkValue))
        .build();
    criteria.setPrimaryKey(primaryKey);
 
    // 設定讀取最新版本。
    criteria.setMaxVersions(1);
 
    // 設定過濾器,當cast<int>(regex(Col1)) > 100時,返回該行。
    RegexRule regexRule = new RegexRule("t1:([0-9]+),", RegexRule.CastType.VT_INTEGER);
    SingleColumnValueRegexFilter filter =  new SingleColumnValueRegexFilter("Col1",
        regexRule,SingleColumnValueRegexFilter.CompareOperator.GREATER_THAN, ColumnValue.fromLong(100));
    criteria.setFilter(filter);
 
    GetRowResponse getRowResponse = client.getRow(new GetRowRequest(criteria));
    Row row = getRowResponse.getRow();

    System.out.println("讀取完畢,結果為: ");
    System.out.println(row);
}

批量讀取資料

調用BatchGetRow介面一次請求讀取多行資料,也支援一次對多張表進行讀取。BatchGetRow由多個GetRow子操作組成。構造子操作的過程與使用GetRow介面時相同。

BatchGetRow的各個子操作獨立執行,Tablestore會分別返回各個子操作的執行結果。

注意事項

  • 批量讀取的所有行採用相同的參數條件,例如ColumnsToGet=[colA],則要讀取的所有行都唯讀取colA列。

  • 由於批量讀取可能存在部分行失敗的情況,失敗行的錯誤資訊在返回的BatchGetRowResponse中,但並不拋出異常。因此調用BatchGetRow介面時,需要檢查傳回值,可通過BatchGetRowResponse的isAllSucceed方法判斷是否所有行都擷取成功;通過BatchGetRowResponse的getFailedRows方法擷取失敗行的資訊。

  • BatchGetRow操作單次支援讀取的最大行數為100行。

參數

更多資訊,請參見讀取單行資料參數

樣本

以下樣本用於讀取10行,設定版本條件、要讀取的列、過濾器等。

private static void batchGetRow(SyncClient client) {
    //設定資料表名稱。
    MultiRowQueryCriteria multiRowQueryCriteria = new MultiRowQueryCriteria("<TABLE_NAME>");
    //加入10個要讀取的行。
    for (int i = 0; i < 10; i++) {
        PrimaryKeyBuilder primaryKeyBuilder = PrimaryKeyBuilder.createPrimaryKeyBuilder();
        primaryKeyBuilder.addPrimaryKeyColumn("pk", PrimaryKeyValue.fromString("pk" + i));
        PrimaryKey primaryKey = primaryKeyBuilder.build();
        multiRowQueryCriteria.addRow(primaryKey);
    }
    //添加條件。
    multiRowQueryCriteria.setMaxVersions(1);
    multiRowQueryCriteria.addColumnsToGet("Col0");
    multiRowQueryCriteria.addColumnsToGet("Col1");
    SingleColumnValueFilter singleColumnValueFilter = new SingleColumnValueFilter("Col0",
            SingleColumnValueFilter.CompareOperator.EQUAL, ColumnValue.fromLong(0));
    singleColumnValueFilter.setPassIfMissing(false);
    multiRowQueryCriteria.setFilter(singleColumnValueFilter);

    BatchGetRowRequest batchGetRowRequest = new BatchGetRowRequest();
    //BatchGetRow支援讀取多個表的資料,一個multiRowQueryCriteria對應一個表的查詢條件,可以添加多個multiRowQueryCriteria。
    batchGetRowRequest.addMultiRowQueryCriteria(multiRowQueryCriteria);

    BatchGetRowResponse batchGetRowResponse = client.batchGetRow(batchGetRowRequest);

    System.out.println("是否全部成功:" + batchGetRowResponse.isAllSucceed());
    System.out.println("讀取完畢,結果為: ");
    for (BatchGetRowResponse.RowResult rowResult : batchGetRowResponse.getSucceedRows()) {
        System.out.println(rowResult.getRow());
    }
    if (!batchGetRowResponse.isAllSucceed()) {
        for (BatchGetRowResponse.RowResult rowResult : batchGetRowResponse.getFailedRows()) {
            System.out.println("失敗的行:" + batchGetRowRequest.getPrimaryKey(rowResult.getTableName(), rowResult.getIndex()));
            System.out.println("失敗原因:" + rowResult.getError());
        }

        /**
         * 可以通過createRequestForRetry方法再構造一個請求對失敗的行進行重試。此處只給出構造重試請求的部分。
         * 推薦的重試方法是使用SDK的自訂重試策略功能,支援對batch操作的部分行錯誤進行重試。設定重試策略後,調用介面處無需增加重試代碼。
         */
        BatchGetRowRequest retryRequest = batchGetRowRequest.createRequestForRetry(batchGetRowResponse.getFailedRows());
    }
}

詳細代碼請參見BatchGetRow@GitHub

範圍讀取資料

調用GetRange介面讀取一個範圍內的資料。

GetRange操作支援按照確定範圍進行正序讀取和逆序讀取,可以設定要讀取的行數。如果範圍較大,已掃描的行數或者資料量超過一定限制,會停止掃描,並返回已擷取的行和下一個主鍵資訊。您可以根據返回的下一個主鍵資訊,繼續發起請求,擷取範圍內剩餘的行。

說明

Tablestore表中的行都是按照主鍵排序的,而主鍵是由全部主鍵列按照順序組成的,所以不能理解為Tablestore會按照某列主鍵排序,這是常見的誤區。

注意事項

GetRange操作遵循最左匹配原則,讀取資料時,依次比較第一主鍵列到第四主鍵列。例如表的主鍵包括PK1、PK2、PK3三個主鍵列,讀取資料時,優先比較PK1是否在開始主鍵與結束主鍵的範圍內,如果PK1在設定的主鍵範圍內,則不會再比較其他的主鍵,返回在PK1主鍵範圍內的資料;如果PK1在設定的主鍵邊界上,則繼續比較PK2是否在開始主鍵與結束主鍵的範圍內,以此類推。

GetRange操作可能在如下情況停止執行並返回資料。

  • 掃描的行資料大小之和達到4 MB。

  • 掃描的行數等於5000。

  • 返回的行數等於最大返回行數。

  • 當前剩餘的預留讀輸送量已全部使用,餘量不足以讀取下一條資料。

當使用GetRange掃描的資料量較大時,Tablestore每次請求僅會掃描一次(行數大於5000或者大小大於4 MB停止掃描),超過限制的資料不會繼續返回,需要通過翻頁繼續擷取後面的資料。

參數

參數

說明

tableName

資料表名稱。

direction

讀取方向。

  • 如果值為正序(FORWARD),則起始主鍵必須小於結束主鍵,返回的行按照主鍵由小到大的順序進行排列。

  • 如果值為逆序(BACKWARD),則起始主鍵必須大於結束主鍵,返回的行按照主鍵由大到小的順序進行排列。

假設同一表中有兩個主鍵A和B,A小於B,如果正序讀取[A, B),則按從A到B的順序返回主鍵大於等於A且小於B的行資料;如果逆序讀取[B, A),則按從B到A的順序返回大於A且小於等於B的行資料。

inclusiveStartPrimaryKey

本次範圍讀取的起始主鍵和結束主鍵,起始主鍵和結束主鍵需要是有效主鍵或者是由INF_MIN和INF_MAX類型組成的虛擬點,虛擬點的列數必須與主鍵相同。

其中INF_MIN表示無限小,任何類型的值都比它大;INF_MAX表示無限大,任何類型的值都比它小。

  • inclusiveStartPrimaryKey表示起始主鍵,如果該行存在,則返回結果中一定會包含此行。

  • exclusiveEndPrimaryKey表示結束主鍵,無論該行是否存在,返回結果中都不會包含此行。

資料表中的行按主鍵從小到大排序,讀取範圍是一個左閉右開的區間,正序讀取時,返回的是大於等於起始主鍵且小於結束主鍵的所有的行。

exclusiveEndPrimaryKey

limit

資料的最大返回行數,此值必須大於 0。

Tablestore按照正序或者逆序返回指定的最大返回行數後即結束該操作的執行,即使該區間內仍有未返回的資料。此時可以通過返回結果中的nextStartPrimaryKey記錄本次讀取到的位置,用於下一次讀取。

columnsToGet

讀取的列集合,列名可以是主鍵列或屬性列。

  • 如果不設定返回的列名,則返回整行資料。

  • 如果設定了返回的列名,當某行中指定的列均不存在時,則不返回該行,即傳回值為null;當某行中存在部分指定的列時,則返回該行且只返回存在的列。

說明
  • 查詢一行資料時,預設返回此行所有列的資料。如果需要只返回特定列,可以通過設定columnsToGet參數限制。如果將col0和col1加入到columnsToGet中,則只返回col0和col1列的值。

  • 如果某行資料的主鍵屬於讀取範圍,但是該行資料不包含指定返回的列,則返回結果中不包含該行資料。

  • 當columnsToGet和filter同時使用時,執行順序是先擷取columnsToGet指定的列,再在返回的列中進行條件過濾。

maxVersions

最多讀取的版本數。

重要

maxVersions與timeRange必須至少設定一個。

  • 如果僅設定maxVersions,則最多返回所有版本中從新到舊指定數量版本的資料。

  • 如果僅設定timeRange,則返回該範圍內所有資料或指定版本資料。

  • 如果同時設定maxVersions和timeRange,則最多返回版本號碼範圍內從新到舊指定數量版本的資料。

timeRange

讀取版本號碼範圍或特定版本號碼的資料。更多資訊,請參見TimeRange

重要

maxVersions與timeRange必須至少設定一個。

  • 如果僅設定maxVersions,則最多返回所有版本中從新到舊指定數量版本的資料。

  • 如果僅設定timeRange,則返回該範圍內所有資料或指定版本資料。

  • 如果同時設定maxVersions和timeRange,則最多返回版本號碼範圍內從新到舊指定數量版本的資料。

  • 如果要查詢一個範圍的資料,則需要設定start和end。start和end分別表示起始時間戳記和結束時間戳記,範圍為前閉後開區間,即[start, end)

  • 如果要查詢特定版本號碼的資料,則需要設定timestamp。timestamp表示特定的時間戳記。

timestamp和[start, end)中只需要設定一個。

時間戳記的單位為毫秒,最小值為0,最大值為Long.MAX_VALUE

filter

使用過濾器,在服務端對讀取結果再進行一次過濾,只返回符合過濾器中條件的資料行。更多資訊,請參見過濾器

說明

當columnsToGet和filter同時使用時,執行順序是先擷取columnsToGet指定的列,再在返回的列中進行條件過濾。

nextStartPrimaryKey

根據返回結果中的nextStartPrimaryKey判斷資料是否全部讀取。

  • 當返回結果中nextStartPrimaryKey不為空白時,可以使用此傳回值作為下一次GetRange操作的起始點繼續讀取資料。

  • 當返回結果中nextStartPrimaryKey為空白時,表示讀取範圍內的資料全部返回。

樣本

按照確定範圍讀取資料

以下樣本用於按照確定範圍進行正序讀取,判斷nextStartPrimaryKey是否為空白,讀取完範圍內的全部資料。

private static void getRange(SyncClient client, String startPkValue, String endPkValue) {
    //設定資料表名稱。
    RangeRowQueryCriteria rangeRowQueryCriteria = new RangeRowQueryCriteria("<TABLE_NAME>");

    //設定起始主鍵。
    PrimaryKeyBuilder primaryKeyBuilder = PrimaryKeyBuilder.createPrimaryKeyBuilder();
    primaryKeyBuilder.addPrimaryKeyColumn("pk", PrimaryKeyValue.fromString(startPkValue));
    rangeRowQueryCriteria.setInclusiveStartPrimaryKey(primaryKeyBuilder.build());

    //設定結束主鍵。
    primaryKeyBuilder = PrimaryKeyBuilder.createPrimaryKeyBuilder();
    primaryKeyBuilder.addPrimaryKeyColumn("pk", PrimaryKeyValue.fromString(endPkValue));
    rangeRowQueryCriteria.setExclusiveEndPrimaryKey(primaryKeyBuilder.build());

    rangeRowQueryCriteria.setMaxVersions(1);

    System.out.println("GetRange的結果為:");
    while (true) {
        GetRangeResponse getRangeResponse = client.getRange(new GetRangeRequest(rangeRowQueryCriteria));
        for (Row row : getRangeResponse.getRows()) {
            System.out.println(row);
        }

        //如果NextStartPrimaryKey不為null,則繼續讀取。
        if (getRangeResponse.getNextStartPrimaryKey() != null) {
            rangeRowQueryCriteria.setInclusiveStartPrimaryKey(getRangeResponse.getNextStartPrimaryKey());
        } else {
            break;
        }
    }
}         

按照第一個主鍵列確定範圍讀取資料

以下樣本用於按照第一個主鍵列確定範圍、第二主鍵列從最小值(INF_MIN)到最大值(INF_MAX)進行正序讀取,判斷nextStartPrimaryKey是否為null,讀取完範圍內的全部資料。

private static void getRange(SyncClient client, String startPkValue, String endPkValue) {
    //設定資料表名稱。
    RangeRowQueryCriteria rangeRowQueryCriteria = new RangeRowQueryCriteria("<TABLE_NAME>");
    //設定起始主鍵,以兩個主鍵為例。
    PrimaryKeyBuilder primaryKeyBuilder = PrimaryKeyBuilder.createPrimaryKeyBuilder();
    primaryKeyBuilder.addPrimaryKeyColumn("pk1", PrimaryKeyValue.fromString(startPkValue));//確定值。
    primaryKeyBuilder.addPrimaryKeyColumn("pk2", PrimaryKeyValue.INF_MIN);//最小值。
    rangeRowQueryCriteria.setInclusiveStartPrimaryKey(primaryKeyBuilder.build());

    //設定結束主鍵。
    primaryKeyBuilder = PrimaryKeyBuilder.createPrimaryKeyBuilder();
    primaryKeyBuilder.addPrimaryKeyColumn("pk1", PrimaryKeyValue.fromString(endPkValue));//確定值。
    primaryKeyBuilder.addPrimaryKeyColumn("pk2", PrimaryKeyValue.INF_MAX);//最大值。
    rangeRowQueryCriteria.setExclusiveEndPrimaryKey(primaryKeyBuilder.build());

    rangeRowQueryCriteria.setMaxVersions(1);

    System.out.println("GetRange的結果為:");
    while (true) {
        GetRangeResponse getRangeResponse = client.getRange(new GetRangeRequest(rangeRowQueryCriteria));
        for (Row row : getRangeResponse.getRows()) {
            System.out.println(row);
        }

        //如果nextStartPrimaryKey不為null,則繼續讀取。
        if (getRangeResponse.getNextStartPrimaryKey() != null) {
            rangeRowQueryCriteria.setInclusiveStartPrimaryKey(getRangeResponse.getNextStartPrimaryKey());
        } else {
            break;
        }
    }
}    

按照確定範圍讀取資料並對指定列使用正則過濾

以下樣本用於讀取主鍵範圍為["pk:2020-01-01.log", "pk:2021-01-01.log")時Col1列的資料,並對該列的資料執行正則過濾。

private static void getRange(SyncClient client) {
    //設定資料表名稱。
    RangeRowQueryCriteria criteria = new RangeRowQueryCriteria("<TABLE_NAME>");
 
    //設定主鍵範圍為["pk:2020-01-01.log", "pk:2021-01-01.log"),讀取範圍為左閉右開的區間。
    PrimaryKey pk0 = PrimaryKeyBuilder.createPrimaryKeyBuilder()
        .addPrimaryKeyColumn("pk", PrimaryKeyValue.fromString("2020-01-01.log"))
        .build();
    PrimaryKey pk1 = PrimaryKeyBuilder.createPrimaryKeyBuilder()
        .addPrimaryKeyColumn("pk", PrimaryKeyValue.fromString("2021-01-01.log"))
        .build();
    criteria.setInclusiveStartPrimaryKey(pk0);
    criteria.setExclusiveEndPrimaryKey(pk1);
 
    //設定讀取最新版本。
    criteria.setMaxVersions(1);
 
    //設定過濾器,當cast<int>(regex(Col1)) > 100時,返回該行。
    RegexRule regexRule = new RegexRule("t1:([0-9]+),", RegexRule.CastType.VT_INTEGER);
    SingleColumnValueRegexFilter filter =  new SingleColumnValueRegexFilter("Col1",
        regexRule,SingleColumnValueRegexFilter.CompareOperator.GREATER_THAN,ColumnValue.fromLong(100));
    criteria.setFilter(filter);

    while (true) {
        GetRangeResponse resp = client.getRange(new GetRangeRequest(criteria));
        for (Row row : resp.getRows()) {
            // do something
            System.out.println(row);
        }
        if (resp.getNextStartPrimaryKey() != null) {
            criteria.setInclusiveStartPrimaryKey(resp.getNextStartPrimaryKey());
        } else {
            break;
        }
   }
}

詳細代碼請參見GetRange@GitHub

迭代讀取資料

以下樣本用於通過createRangeIterator介面迭代讀取exampletable表中的資料。

private static void getRangeByIterator(SyncClient client, String startPkValue, String endPkValue) {
    //設定資料表名稱。
    RangeIteratorParameter rangeIteratorParameter = new RangeIteratorParameter("<TABLE_NAME>");

    //設定起始主鍵。
    PrimaryKeyBuilder primaryKeyBuilder = PrimaryKeyBuilder.createPrimaryKeyBuilder();
    primaryKeyBuilder.addPrimaryKeyColumn("pk", PrimaryKeyValue.fromString(startPkValue));
    rangeIteratorParameter.setInclusiveStartPrimaryKey(primaryKeyBuilder.build());

    //設定結束主鍵。
    primaryKeyBuilder = PrimaryKeyBuilder.createPrimaryKeyBuilder();
    primaryKeyBuilder.addPrimaryKeyColumn("pk", PrimaryKeyValue.fromString(endPkValue));
    rangeIteratorParameter.setExclusiveEndPrimaryKey(primaryKeyBuilder.build());

    rangeIteratorParameter.setMaxVersions(1);

    Iterator<Row> iterator = client.createRangeIterator(rangeIteratorParameter);

    System.out.println("使用Iterator進行GetRange的結果為:");
    while (iterator.hasNext()) {
        Row row = iterator.next();
        System.out.println(row);
    }
}           

詳細代碼請參見GetRangeByIterator@GitHub

並發讀取資料

TableStoreReader是Table StoreJava SDK提供的工具類,封裝了BatchGetRow介面,可以實現並發查詢表中資料。同時支援多表查詢、查詢狀態統計、行層級回調和自訂配置功能。

重要

Table Store從Java SDK 5.16.1版本開始支援此功能,請確保使用了正確的SDK版本。關於Java SDK歷史迭代版本的更多資訊,請參見Java SDK歷史迭代版本

快速上手

  1. 構造TableStoreReader。

    String endpoint = "<ENDPOINT>";
    String accessKeyId = System.getenv("OTS_AK_ENV");
    String accessKeySecret = System.getenv("OTS_SK_ENV");
    String instanceName = "<INSTANCE_NAME>";
    
    AsyncClientInterface client = new AsyncClient(endpoint, accessKeyId, accessKeySecret, instanceName);
    TableStoreReaderConfig config = new TableStoreReaderConfig();
    ThreadPoolExecutor executor = new ThreadPoolExecutor(4, 4, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue(1024))
    
    TableStoreReader reader = new DefaultTableStoreReader(client, config, executor, null);
  2. 構造查詢請求。

    將要查詢的資料緩衝到記憶體中,支援一次添加一條或多條資料。

    PrimaryKey pk1 = PrimaryKeyBuilder.createPrimaryKeyBuilder()
            .addPrimaryKeyColumn("pk1", PrimaryKeyValue.fromLong(0))
            .addPrimaryKeyColumn("pk2", PrimaryKeyValue.fromLong(0))
            .build();
    //新增一列查詢表中,pk1的屬性列。
    Future<ReaderResult> readerResult = reader.addPrimaryKeyWithFuture("<TABLE_NAME1>", pk1);
    //也可以使用List一次添加多列。
    List<PrimaryKey> primaryKeyList = new ArrayList<PrimaryKey>();
    Future<ReaderResult> readerResult = reader.addPrimaryKeysWithFuture("<TABLE_NAME2>", primaryKeyList);
  3. 查詢資料。

    將記憶體中緩衝的查詢發送出去。您可以通過同步阻塞方式或者非同步方式查詢資料。

    • 同步阻塞方式

      reader.flush();
    • 非同步方式

      reader.send();
  4. 擷取查詢結果。

    //列印成功查詢和失敗查詢的資訊。
    for (RowReadResult success : readerResult.get().getSucceedRows()) {
        System.out.println(success.getRowResult());
    }
    
    for (RowReadResult fail : readerResult.get().getFailedRows()) {
        System.out.println(fail.getRowResult());
    }
  5. 關閉TableStoreReader。

    reader.close();
    //根據實際需要關閉client和executor。
    client.shutdown();
    executor.shutdown();

配置項

您可以通過修改TableStoreReaderConfig自訂配置TableStoreReader。

參數

說明

checkTableMeta

是否在添加查詢行時檢查表的結構。預設值為true。

如果在添加查詢行時不需要檢查表結構,請設定此參數為false。

bucketCount

Reader記憶體中的緩衝分桶數。預設值為4。

bufferSize

每個分桶的RingBuffer緩衝區大小,預設值為1024。

concurrency

調用batchGetRow的最大並發度。預設值為10。

maxBatchRowsCount

每次batchGetRow的最大請求行數。預設值為100,最大值為100。

defaultMaxVersions

預設情況下,getRow查詢的最大版本數。預設值為1。

flushInterval

自動flush緩衝中查詢請求的間隔。預設值為10000。單位為毫秒。

logInterval

自動列印任務狀態的間隔。預設值為10000。單位為毫秒

指定查詢條件

您可以指定表層級的查詢參數,例如查詢的最大版本數,要查詢的列、時間範圍等。

//查詢表的col1列過去60秒的最多10個版本。
//設定資料表名稱。
RowQueryCriteria criteria = new RowQueryCriteria("<TABLE_NAME>");
//設定要返回的列。
criteria.addColumnsToGet("col1");
//設定返回的最大版本數。
criteria.setMaxVersions(10);
criteria.setTimeRange(new TimeRange(System.currentTimeMillis() - 60 * 1000, System.currentTimeMillis()));
reader.setRowQueryCriteria(criteria);

完整樣本

public class TableStoreReaderDemo {
    private static final String endpoint = "<ENDPOINT>";
    private static final String accessKeyId = System.getenv("OTS_AK_ENV");
    private static final String accessKeySecret = System.getenv("OTS_SK_ENV");
    private static final String instanceName = "<INSTANCE_NAME>";
    private static AsyncClientInterface client;
    private static ExecutorService executor;
    private static AtomicLong succeedRows = new AtomicLong();
    private static AtomicLong failedRows = new AtomicLong();

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        /**
         * 步驟一:構造TableStoreReader。
         */
        //構造AsyncClient。
        client = new AsyncClient(endpoint, accessKeyId, accessKeySecret, instanceName);
        //構造reader配置類。
        TableStoreReaderConfig config = new TableStoreReaderConfig();
        {
            //以下參數均有預設值,可不配置。
            //向reader添加要查詢的資料前,會先檢查表的結構。
            config.setCheckTableMeta(true);  
            //一次請求的最大請求行數,上限為100。
            config.setMaxBatchRowsCount(100);    
            //預設情況下,擷取的columns最大版本數。
            config.setDefaultMaxVersions(1);
            //發送請求的總並發數。
            config.setConcurrency(16); 
            //記憶體分桶數。
            config.setBucketCount(4);      
            //將快取資料全部發送的時間間隔。
            config.setFlushInterval(10000);      
            //日誌記錄reader狀態的時間間隔。
            config.setLogInterval(10000);                   
        }
        //構造用於發送請求的executor。
        ThreadFactory threadFactory = new ThreadFactory() {
            private final AtomicInteger counter = new AtomicInteger(1);

            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, "reader-" + counter.getAndIncrement());
            }
        };
        executor = new ThreadPoolExecutor(4, 4, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue(1024), threadFactory, new ThreadPoolExecutor.CallerRunsPolicy());

        //構造reader的回呼函數。
        TableStoreCallback<PrimaryKeyWithTable, RowReadResult> callback = new TableStoreCallback<PrimaryKeyWithTable, RowReadResult>() {
            @Override
            public void onCompleted(PrimaryKeyWithTable req, RowReadResult res) {
                succeedRows.incrementAndGet();
            }

            @Override
            public void onFailed(PrimaryKeyWithTable req, Exception ex) {
                failedRows.incrementAndGet();
            }
        };
        TableStoreReader reader = new DefaultTableStoreReader(client, config, executor, callback);
        /**
         * 步驟二:構造查詢請求。
         */
        //向記憶體添加一列要查詢的資料。
        PrimaryKey pk1 = PrimaryKeyBuilder.createPrimaryKeyBuilder()
                .addPrimaryKeyColumn("pk1", PrimaryKeyValue.fromLong(0))
                .addPrimaryKeyColumn("pk2", PrimaryKeyValue.fromLong(0))
                .build();
        reader.addPrimaryKey("<TABLE_NAME1>", pk1);

        //向記憶體添加一列要查詢的資料,並擷取查詢結果Future。
        PrimaryKey pk2 = PrimaryKeyBuilder.createPrimaryKeyBuilder()
                .addPrimaryKeyColumn("pk1", PrimaryKeyValue.fromLong(0))
                .addPrimaryKeyColumn("pk2", PrimaryKeyValue.fromLong(0))
                .build();
        Future<ReaderResult> readerResult = reader.addPrimaryKeyWithFuture("<TABLE_NAME2>", pk2);
        /**
         * 步驟三:查詢資料。
         */
        //非同步將記憶體中的資料發送出去。
        reader.send();
        /**
         * 步驟四:擷取查詢結果。
         */
        //列印成功查詢和失敗查詢的資訊。
        for (RowReadResult success : readerResult.get().getSucceedRows()) {
            System.out.println(success.getRowResult());
        }
        for (RowReadResult fail : readerResult.get().getFailedRows()) {
            System.out.println(fail.getRowResult());
        }
        /**
         * 步驟五:關閉TableStoreReader。
         */
        reader.close();
        client.shutdown();
        executor.shutdown();
    }
}

相關文檔

  • 如果要使用索引技術加速資料查詢,您可以通過二級索引或者多元索引功能實現。更多資訊,請參見二級索引或者多元索引

  • 如果要可視化展示表中資料,您可以通過對接DataV或者Grafana工具實現。更多資訊,請參見資料視覺效果

  • 如果要下載表中資料到本地,您可以通過DataX、Table Store命令列CLI工具實現。更多資訊,請參見將Table Store資料下載到本地檔案

  • 如果要計算與分析表中資料,您可以通過Table StoreSQL查詢實現。更多資訊,請參見SQL查詢

    說明

    您還可以通過MaxCompute、Spark、Hive或者HadoopMR、Function Compute、Flink等計算引擎實現表中資料的計算與分析。具體操作,請參見計算與分析