Tablestore提供了單行讀取、批量讀取、範圍讀取、迭代讀取和並行讀取的查詢方式用於讀取資料表中資料。資料寫入到資料表後,您可以選擇所需資料查詢方式進行資料讀取。
查詢方式
Tablestore提供的資料讀取介面包括GetRow、BatchGetRow和GetRange。讀取資料時,請根據實際查詢情境使用相應查詢方式讀取資料。
當要讀取帶有自增主鍵列的表資料時,請確保已擷取到包含自增主鍵列值在內的完整主鍵。更多資訊,請參見主鍵列自增。如果未記錄自增主鍵列的值,您可以使用範圍讀取資料按照第一個主鍵列確定範圍讀取資料。
查詢方式 | 說明 | 適用情境 |
調用GetRow介面讀取一行資料。 | 適用於能確定完整主鍵且要讀取行數較少的情境。 | |
調用BatchGetRow介面一次請求讀取多行資料或者一次對多張表進行讀取。 BatchGetRow操作由多個GetRow子操作組成,構造子操作的過程與使用GetRow介面時相同。 | 適用於能確定完整主鍵,且要讀取行數較多或者要讀取多個表中資料的情境。 | |
調用GetRange介面讀取一個範圍內的資料。 GetRange操作支援按照確定範圍進行正序讀取和逆序讀取,可以設定要讀取的行數。如果範圍較大,已掃描的行數或者資料量超過一定限制,會停止掃描,並返回已擷取的行和下一個主鍵資訊。您可以根據返回的下一個主鍵資訊,繼續發起請求,擷取範圍內剩餘的行。 | 適用於能確定完整主鍵範圍或者主鍵首碼的情境。 重要 如果不能確定主鍵首碼,您也可以通過設定完整主鍵範圍均為虛擬點INF_MIN和INF_MAX進行全表資料掃描,但是執行此操作會消耗較多計算資源,請謹慎使用。 | |
通過createRangeIterator介面迭代讀取資料。 | 適用於能確定完整主鍵範圍或者主鍵首碼,且需要迭代讀取的情境。 | |
TableStoreReader是Table StoreJava SDK提供的工具類,封裝了BatchGetRow介面,可以實現並發查詢表中資料。同時支援多表查詢、查詢狀態統計、行層級回調和自訂配置功能。 | 適用於能確定完整主鍵,且要讀取行數較多或者要讀取多個表中資料的情境。 |
前提條件
已初始化OTSClient。具體操作,請參見初始化OTSClient。
已建立資料表並寫入資料。
讀取單行資料
調用GetRow介面讀取一行資料。讀取的結果可能有如下兩種:
如果該行存在,則返回該行的各主鍵列以及屬性列。
如果該行不存在,則返回中不包含行,並且不會報錯。
參數
參數 | 說明 |
tableName | 資料表名稱。 |
primaryKey | 行的主鍵。主鍵包括主鍵列名、主鍵類型和主索引值。 重要 設定的主鍵個數和類型必須和表的主鍵個數和類型一致。 |
columnsToGet | 讀取的列集合,列名可以是主鍵列或屬性列。
說明
|
maxVersions | 最多讀取的版本數。 重要 maxVersions與timeRange必須至少設定一個。
|
timeRange | 讀取版本號碼範圍或特定版本號碼的資料。更多資訊,請參見TimeRange。 重要 maxVersions與timeRange必須至少設定一個。
timestamp和 時間戳記的單位為毫秒,最小值為0,最大值為 |
filter | 使用過濾器,在服務端對讀取結果再進行一次過濾,只返回符合過濾器中條件的資料行。更多資訊,請參見過濾器。 說明 當columnsToGet和filter同時使用時,執行順序是先擷取columnsToGet指定的列,再在返回的列中進行條件過濾。 |
樣本
讀取資料時,您可以指定要讀取的資料版本、要讀取的列、過濾器、正則過濾等。
讀取最新版本資料和指定列
以下樣本用於讀取資料表中的一行資料,設定讀取最新版本的資料和讀取指定的列。
private static void getRow(SyncClient client, String pkValue) {
//構造主鍵。
PrimaryKeyBuilder primaryKeyBuilder = PrimaryKeyBuilder.createPrimaryKeyBuilder();
primaryKeyBuilder.addPrimaryKeyColumn("pk", PrimaryKeyValue.fromString(pkValue));
PrimaryKey primaryKey = primaryKeyBuilder.build();
//讀取一行資料,設定資料表名稱。
SingleRowQueryCriteria criteria = new SingleRowQueryCriteria("<TABLE_NAME>", primaryKey);
//設定讀取最新版本。
criteria.setMaxVersions(1);
GetRowResponse getRowResponse = client.getRow(new GetRowRequest(criteria));
Row row = getRowResponse.getRow();
System.out.println("讀取完畢,結果為: ");
System.out.println(row);
//設定讀取某些列。
criteria.addColumnsToGet("Col0");
getRowResponse = client.getRow(new GetRowRequest(criteria));
row = getRowResponse.getRow();
System.out.println("讀取完畢,結果為:");
System.out.println(row);
}
讀取資料時使用過濾器
以下樣本用於讀取資料表中的一行資料,設定讀取最新版本的資料以及根據Col0列的值過濾資料。
private static void getRow(SyncClient client, String pkValue) {
//構造主鍵。
PrimaryKeyBuilder primaryKeyBuilder = PrimaryKeyBuilder.createPrimaryKeyBuilder();
primaryKeyBuilder.addPrimaryKeyColumn("pk", PrimaryKeyValue.fromString(pkValue));
PrimaryKey primaryKey = primaryKeyBuilder.build();
//讀取一行資料,設定資料表名稱。
SingleRowQueryCriteria criteria = new SingleRowQueryCriteria("<TABLE_NAME>", primaryKey);
//設定讀取最新版本。
criteria.setMaxVersions(1);
//設定過濾器,當Col0列的值為0時,返回該行。
SingleColumnValueFilter singleColumnValueFilter = new SingleColumnValueFilter("Col0",
SingleColumnValueFilter.CompareOperator.EQUAL, ColumnValue.fromLong(0));
//如果Col0列不存在,則不返回該行。
singleColumnValueFilter.setPassIfMissing(false);
criteria.setFilter(singleColumnValueFilter);
GetRowResponse getRowResponse = client.getRow(new GetRowRequest(criteria));
Row row = getRowResponse.getRow();
System.out.println("讀取完畢,結果為: ");
System.out.println(row);
}
讀取資料時使用正則過濾
以下樣本用於讀取資料表一行中Col1列的資料,並對該列的資料執行正則過濾。
private static void getRow(SyncClient client, String pkValue) {
//設定資料表名稱。
SingleRowQueryCriteria criteria = new SingleRowQueryCriteria("<TABLE_NAME>");
//構造主鍵。
PrimaryKey primaryKey = PrimaryKeyBuilder.createPrimaryKeyBuilder()
.addPrimaryKeyColumn("pk", PrimaryKeyValue.fromString(pkValue))
.build();
criteria.setPrimaryKey(primaryKey);
// 設定讀取最新版本。
criteria.setMaxVersions(1);
// 設定過濾器,當cast<int>(regex(Col1)) > 100時,返回該行。
RegexRule regexRule = new RegexRule("t1:([0-9]+),", RegexRule.CastType.VT_INTEGER);
SingleColumnValueRegexFilter filter = new SingleColumnValueRegexFilter("Col1",
regexRule,SingleColumnValueRegexFilter.CompareOperator.GREATER_THAN, ColumnValue.fromLong(100));
criteria.setFilter(filter);
GetRowResponse getRowResponse = client.getRow(new GetRowRequest(criteria));
Row row = getRowResponse.getRow();
System.out.println("讀取完畢,結果為: ");
System.out.println(row);
}
批量讀取資料
調用BatchGetRow介面一次請求讀取多行資料,也支援一次對多張表進行讀取。BatchGetRow由多個GetRow子操作組成。構造子操作的過程與使用GetRow介面時相同。
BatchGetRow的各個子操作獨立執行,Tablestore會分別返回各個子操作的執行結果。
注意事項
批量讀取的所有行採用相同的參數條件,例如
ColumnsToGet=[colA]
,則要讀取的所有行都唯讀取colA列。由於批量讀取可能存在部分行失敗的情況,失敗行的錯誤資訊在返回的BatchGetRowResponse中,但並不拋出異常。因此調用BatchGetRow介面時,需要檢查傳回值,可通過BatchGetRowResponse的isAllSucceed方法判斷是否所有行都擷取成功;通過BatchGetRowResponse的getFailedRows方法擷取失敗行的資訊。
BatchGetRow操作單次支援讀取的最大行數為100行。
參數
更多資訊,請參見讀取單行資料參數。
樣本
以下樣本用於讀取10行,設定版本條件、要讀取的列、過濾器等。
private static void batchGetRow(SyncClient client) {
//設定資料表名稱。
MultiRowQueryCriteria multiRowQueryCriteria = new MultiRowQueryCriteria("<TABLE_NAME>");
//加入10個要讀取的行。
for (int i = 0; i < 10; i++) {
PrimaryKeyBuilder primaryKeyBuilder = PrimaryKeyBuilder.createPrimaryKeyBuilder();
primaryKeyBuilder.addPrimaryKeyColumn("pk", PrimaryKeyValue.fromString("pk" + i));
PrimaryKey primaryKey = primaryKeyBuilder.build();
multiRowQueryCriteria.addRow(primaryKey);
}
//添加條件。
multiRowQueryCriteria.setMaxVersions(1);
multiRowQueryCriteria.addColumnsToGet("Col0");
multiRowQueryCriteria.addColumnsToGet("Col1");
SingleColumnValueFilter singleColumnValueFilter = new SingleColumnValueFilter("Col0",
SingleColumnValueFilter.CompareOperator.EQUAL, ColumnValue.fromLong(0));
singleColumnValueFilter.setPassIfMissing(false);
multiRowQueryCriteria.setFilter(singleColumnValueFilter);
BatchGetRowRequest batchGetRowRequest = new BatchGetRowRequest();
//BatchGetRow支援讀取多個表的資料,一個multiRowQueryCriteria對應一個表的查詢條件,可以添加多個multiRowQueryCriteria。
batchGetRowRequest.addMultiRowQueryCriteria(multiRowQueryCriteria);
BatchGetRowResponse batchGetRowResponse = client.batchGetRow(batchGetRowRequest);
System.out.println("是否全部成功:" + batchGetRowResponse.isAllSucceed());
System.out.println("讀取完畢,結果為: ");
for (BatchGetRowResponse.RowResult rowResult : batchGetRowResponse.getSucceedRows()) {
System.out.println(rowResult.getRow());
}
if (!batchGetRowResponse.isAllSucceed()) {
for (BatchGetRowResponse.RowResult rowResult : batchGetRowResponse.getFailedRows()) {
System.out.println("失敗的行:" + batchGetRowRequest.getPrimaryKey(rowResult.getTableName(), rowResult.getIndex()));
System.out.println("失敗原因:" + rowResult.getError());
}
/**
* 可以通過createRequestForRetry方法再構造一個請求對失敗的行進行重試。此處只給出構造重試請求的部分。
* 推薦的重試方法是使用SDK的自訂重試策略功能,支援對batch操作的部分行錯誤進行重試。設定重試策略後,調用介面處無需增加重試代碼。
*/
BatchGetRowRequest retryRequest = batchGetRowRequest.createRequestForRetry(batchGetRowResponse.getFailedRows());
}
}
詳細代碼請參見BatchGetRow@GitHub。
範圍讀取資料
調用GetRange介面讀取一個範圍內的資料。
GetRange操作支援按照確定範圍進行正序讀取和逆序讀取,可以設定要讀取的行數。如果範圍較大,已掃描的行數或者資料量超過一定限制,會停止掃描,並返回已擷取的行和下一個主鍵資訊。您可以根據返回的下一個主鍵資訊,繼續發起請求,擷取範圍內剩餘的行。
Tablestore表中的行都是按照主鍵排序的,而主鍵是由全部主鍵列按照順序組成的,所以不能理解為Tablestore會按照某列主鍵排序,這是常見的誤區。
注意事項
GetRange操作遵循最左匹配原則,讀取資料時,依次比較第一主鍵列到第四主鍵列。例如表的主鍵包括PK1、PK2、PK3三個主鍵列,讀取資料時,優先比較PK1是否在開始主鍵與結束主鍵的範圍內,如果PK1在設定的主鍵範圍內,則不會再比較其他的主鍵,返回在PK1主鍵範圍內的資料;如果PK1在設定的主鍵邊界上,則繼續比較PK2是否在開始主鍵與結束主鍵的範圍內,以此類推。
GetRange操作可能在如下情況停止執行並返回資料。
掃描的行資料大小之和達到4 MB。
掃描的行數等於5000。
返回的行數等於最大返回行數。
當前剩餘的預留讀輸送量已全部使用,餘量不足以讀取下一條資料。
當使用GetRange掃描的資料量較大時,Tablestore每次請求僅會掃描一次(行數大於5000或者大小大於4 MB停止掃描),超過限制的資料不會繼續返回,需要通過翻頁繼續擷取後面的資料。
參數
參數 | 說明 |
tableName | 資料表名稱。 |
direction | 讀取方向。
假設同一表中有兩個主鍵A和B,A小於B,如果正序讀取 |
inclusiveStartPrimaryKey | 本次範圍讀取的起始主鍵和結束主鍵,起始主鍵和結束主鍵需要是有效主鍵或者是由INF_MIN和INF_MAX類型組成的虛擬點,虛擬點的列數必須與主鍵相同。 其中INF_MIN表示無限小,任何類型的值都比它大;INF_MAX表示無限大,任何類型的值都比它小。
資料表中的行按主鍵從小到大排序,讀取範圍是一個左閉右開的區間,正序讀取時,返回的是大於等於起始主鍵且小於結束主鍵的所有的行。 |
exclusiveEndPrimaryKey | |
limit | 資料的最大返回行數,此值必須大於 0。 Tablestore按照正序或者逆序返回指定的最大返回行數後即結束該操作的執行,即使該區間內仍有未返回的資料。此時可以通過返回結果中的nextStartPrimaryKey記錄本次讀取到的位置,用於下一次讀取。 |
columnsToGet | 讀取的列集合,列名可以是主鍵列或屬性列。
說明
|
maxVersions | 最多讀取的版本數。 重要 maxVersions與timeRange必須至少設定一個。
|
timeRange | 讀取版本號碼範圍或特定版本號碼的資料。更多資訊,請參見TimeRange。 重要 maxVersions與timeRange必須至少設定一個。
timestamp和 時間戳記的單位為毫秒,最小值為0,最大值為 |
filter | 使用過濾器,在服務端對讀取結果再進行一次過濾,只返回符合過濾器中條件的資料行。更多資訊,請參見過濾器。 說明 當columnsToGet和filter同時使用時,執行順序是先擷取columnsToGet指定的列,再在返回的列中進行條件過濾。 |
nextStartPrimaryKey | 根據返回結果中的nextStartPrimaryKey判斷資料是否全部讀取。
|
樣本
按照確定範圍讀取資料
以下樣本用於按照確定範圍進行正序讀取,判斷nextStartPrimaryKey是否為空白,讀取完範圍內的全部資料。
private static void getRange(SyncClient client, String startPkValue, String endPkValue) {
//設定資料表名稱。
RangeRowQueryCriteria rangeRowQueryCriteria = new RangeRowQueryCriteria("<TABLE_NAME>");
//設定起始主鍵。
PrimaryKeyBuilder primaryKeyBuilder = PrimaryKeyBuilder.createPrimaryKeyBuilder();
primaryKeyBuilder.addPrimaryKeyColumn("pk", PrimaryKeyValue.fromString(startPkValue));
rangeRowQueryCriteria.setInclusiveStartPrimaryKey(primaryKeyBuilder.build());
//設定結束主鍵。
primaryKeyBuilder = PrimaryKeyBuilder.createPrimaryKeyBuilder();
primaryKeyBuilder.addPrimaryKeyColumn("pk", PrimaryKeyValue.fromString(endPkValue));
rangeRowQueryCriteria.setExclusiveEndPrimaryKey(primaryKeyBuilder.build());
rangeRowQueryCriteria.setMaxVersions(1);
System.out.println("GetRange的結果為:");
while (true) {
GetRangeResponse getRangeResponse = client.getRange(new GetRangeRequest(rangeRowQueryCriteria));
for (Row row : getRangeResponse.getRows()) {
System.out.println(row);
}
//如果NextStartPrimaryKey不為null,則繼續讀取。
if (getRangeResponse.getNextStartPrimaryKey() != null) {
rangeRowQueryCriteria.setInclusiveStartPrimaryKey(getRangeResponse.getNextStartPrimaryKey());
} else {
break;
}
}
}
按照第一個主鍵列確定範圍讀取資料
以下樣本用於按照第一個主鍵列確定範圍、第二主鍵列從最小值(INF_MIN)到最大值(INF_MAX)進行正序讀取,判斷nextStartPrimaryKey是否為null,讀取完範圍內的全部資料。
private static void getRange(SyncClient client, String startPkValue, String endPkValue) {
//設定資料表名稱。
RangeRowQueryCriteria rangeRowQueryCriteria = new RangeRowQueryCriteria("<TABLE_NAME>");
//設定起始主鍵,以兩個主鍵為例。
PrimaryKeyBuilder primaryKeyBuilder = PrimaryKeyBuilder.createPrimaryKeyBuilder();
primaryKeyBuilder.addPrimaryKeyColumn("pk1", PrimaryKeyValue.fromString(startPkValue));//確定值。
primaryKeyBuilder.addPrimaryKeyColumn("pk2", PrimaryKeyValue.INF_MIN);//最小值。
rangeRowQueryCriteria.setInclusiveStartPrimaryKey(primaryKeyBuilder.build());
//設定結束主鍵。
primaryKeyBuilder = PrimaryKeyBuilder.createPrimaryKeyBuilder();
primaryKeyBuilder.addPrimaryKeyColumn("pk1", PrimaryKeyValue.fromString(endPkValue));//確定值。
primaryKeyBuilder.addPrimaryKeyColumn("pk2", PrimaryKeyValue.INF_MAX);//最大值。
rangeRowQueryCriteria.setExclusiveEndPrimaryKey(primaryKeyBuilder.build());
rangeRowQueryCriteria.setMaxVersions(1);
System.out.println("GetRange的結果為:");
while (true) {
GetRangeResponse getRangeResponse = client.getRange(new GetRangeRequest(rangeRowQueryCriteria));
for (Row row : getRangeResponse.getRows()) {
System.out.println(row);
}
//如果nextStartPrimaryKey不為null,則繼續讀取。
if (getRangeResponse.getNextStartPrimaryKey() != null) {
rangeRowQueryCriteria.setInclusiveStartPrimaryKey(getRangeResponse.getNextStartPrimaryKey());
} else {
break;
}
}
}
按照確定範圍讀取資料並對指定列使用正則過濾
以下樣本用於讀取主鍵範圍為["pk:2020-01-01.log", "pk:2021-01-01.log")
時Col1列的資料,並對該列的資料執行正則過濾。
private static void getRange(SyncClient client) {
//設定資料表名稱。
RangeRowQueryCriteria criteria = new RangeRowQueryCriteria("<TABLE_NAME>");
//設定主鍵範圍為["pk:2020-01-01.log", "pk:2021-01-01.log"),讀取範圍為左閉右開的區間。
PrimaryKey pk0 = PrimaryKeyBuilder.createPrimaryKeyBuilder()
.addPrimaryKeyColumn("pk", PrimaryKeyValue.fromString("2020-01-01.log"))
.build();
PrimaryKey pk1 = PrimaryKeyBuilder.createPrimaryKeyBuilder()
.addPrimaryKeyColumn("pk", PrimaryKeyValue.fromString("2021-01-01.log"))
.build();
criteria.setInclusiveStartPrimaryKey(pk0);
criteria.setExclusiveEndPrimaryKey(pk1);
//設定讀取最新版本。
criteria.setMaxVersions(1);
//設定過濾器,當cast<int>(regex(Col1)) > 100時,返回該行。
RegexRule regexRule = new RegexRule("t1:([0-9]+),", RegexRule.CastType.VT_INTEGER);
SingleColumnValueRegexFilter filter = new SingleColumnValueRegexFilter("Col1",
regexRule,SingleColumnValueRegexFilter.CompareOperator.GREATER_THAN,ColumnValue.fromLong(100));
criteria.setFilter(filter);
while (true) {
GetRangeResponse resp = client.getRange(new GetRangeRequest(criteria));
for (Row row : resp.getRows()) {
// do something
System.out.println(row);
}
if (resp.getNextStartPrimaryKey() != null) {
criteria.setInclusiveStartPrimaryKey(resp.getNextStartPrimaryKey());
} else {
break;
}
}
}
詳細代碼請參見GetRange@GitHub。
迭代讀取資料
以下樣本用於通過createRangeIterator介面迭代讀取exampletable表中的資料。
private static void getRangeByIterator(SyncClient client, String startPkValue, String endPkValue) {
//設定資料表名稱。
RangeIteratorParameter rangeIteratorParameter = new RangeIteratorParameter("<TABLE_NAME>");
//設定起始主鍵。
PrimaryKeyBuilder primaryKeyBuilder = PrimaryKeyBuilder.createPrimaryKeyBuilder();
primaryKeyBuilder.addPrimaryKeyColumn("pk", PrimaryKeyValue.fromString(startPkValue));
rangeIteratorParameter.setInclusiveStartPrimaryKey(primaryKeyBuilder.build());
//設定結束主鍵。
primaryKeyBuilder = PrimaryKeyBuilder.createPrimaryKeyBuilder();
primaryKeyBuilder.addPrimaryKeyColumn("pk", PrimaryKeyValue.fromString(endPkValue));
rangeIteratorParameter.setExclusiveEndPrimaryKey(primaryKeyBuilder.build());
rangeIteratorParameter.setMaxVersions(1);
Iterator<Row> iterator = client.createRangeIterator(rangeIteratorParameter);
System.out.println("使用Iterator進行GetRange的結果為:");
while (iterator.hasNext()) {
Row row = iterator.next();
System.out.println(row);
}
}
詳細代碼請參見GetRangeByIterator@GitHub。
並發讀取資料
TableStoreReader是Table StoreJava SDK提供的工具類,封裝了BatchGetRow介面,可以實現並發查詢表中資料。同時支援多表查詢、查詢狀態統計、行層級回調和自訂配置功能。
Table Store從Java SDK 5.16.1版本開始支援此功能,請確保使用了正確的SDK版本。關於Java SDK歷史迭代版本的更多資訊,請參見Java SDK歷史迭代版本。
快速上手
構造TableStoreReader。
String endpoint = "<ENDPOINT>"; String accessKeyId = System.getenv("OTS_AK_ENV"); String accessKeySecret = System.getenv("OTS_SK_ENV"); String instanceName = "<INSTANCE_NAME>"; AsyncClientInterface client = new AsyncClient(endpoint, accessKeyId, accessKeySecret, instanceName); TableStoreReaderConfig config = new TableStoreReaderConfig(); ThreadPoolExecutor executor = new ThreadPoolExecutor(4, 4, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue(1024)) TableStoreReader reader = new DefaultTableStoreReader(client, config, executor, null);
構造查詢請求。
將要查詢的資料緩衝到記憶體中,支援一次添加一條或多條資料。
PrimaryKey pk1 = PrimaryKeyBuilder.createPrimaryKeyBuilder() .addPrimaryKeyColumn("pk1", PrimaryKeyValue.fromLong(0)) .addPrimaryKeyColumn("pk2", PrimaryKeyValue.fromLong(0)) .build(); //新增一列查詢表中,pk1的屬性列。 Future<ReaderResult> readerResult = reader.addPrimaryKeyWithFuture("<TABLE_NAME1>", pk1); //也可以使用List一次添加多列。 List<PrimaryKey> primaryKeyList = new ArrayList<PrimaryKey>(); Future<ReaderResult> readerResult = reader.addPrimaryKeysWithFuture("<TABLE_NAME2>", primaryKeyList);
查詢資料。
將記憶體中緩衝的查詢發送出去。您可以通過同步阻塞方式或者非同步方式查詢資料。
同步阻塞方式
reader.flush();
非同步方式
reader.send();
擷取查詢結果。
//列印成功查詢和失敗查詢的資訊。 for (RowReadResult success : readerResult.get().getSucceedRows()) { System.out.println(success.getRowResult()); } for (RowReadResult fail : readerResult.get().getFailedRows()) { System.out.println(fail.getRowResult()); }
關閉TableStoreReader。
reader.close(); //根據實際需要關閉client和executor。 client.shutdown(); executor.shutdown();
配置項
您可以通過修改TableStoreReaderConfig
自訂配置TableStoreReader。
參數 | 說明 |
checkTableMeta | 是否在添加查詢行時檢查表的結構。預設值為true。 如果在添加查詢行時不需要檢查表結構,請設定此參數為false。 |
bucketCount | Reader記憶體中的緩衝分桶數。預設值為4。 |
bufferSize | 每個分桶的RingBuffer緩衝區大小,預設值為1024。 |
concurrency | 調用batchGetRow的最大並發度。預設值為10。 |
maxBatchRowsCount | 每次batchGetRow的最大請求行數。預設值為100,最大值為100。 |
defaultMaxVersions | 預設情況下,getRow查詢的最大版本數。預設值為1。 |
flushInterval | 自動flush緩衝中查詢請求的間隔。預設值為10000。單位為毫秒。 |
logInterval | 自動列印任務狀態的間隔。預設值為10000。單位為毫秒 |
指定查詢條件
您可以指定表層級的查詢參數,例如查詢的最大版本數,要查詢的列、時間範圍等。
//查詢表的col1列過去60秒的最多10個版本。
//設定資料表名稱。
RowQueryCriteria criteria = new RowQueryCriteria("<TABLE_NAME>");
//設定要返回的列。
criteria.addColumnsToGet("col1");
//設定返回的最大版本數。
criteria.setMaxVersions(10);
criteria.setTimeRange(new TimeRange(System.currentTimeMillis() - 60 * 1000, System.currentTimeMillis()));
reader.setRowQueryCriteria(criteria);
完整樣本
public class TableStoreReaderDemo {
private static final String endpoint = "<ENDPOINT>";
private static final String accessKeyId = System.getenv("OTS_AK_ENV");
private static final String accessKeySecret = System.getenv("OTS_SK_ENV");
private static final String instanceName = "<INSTANCE_NAME>";
private static AsyncClientInterface client;
private static ExecutorService executor;
private static AtomicLong succeedRows = new AtomicLong();
private static AtomicLong failedRows = new AtomicLong();
public static void main(String[] args) throws ExecutionException, InterruptedException {
/**
* 步驟一:構造TableStoreReader。
*/
//構造AsyncClient。
client = new AsyncClient(endpoint, accessKeyId, accessKeySecret, instanceName);
//構造reader配置類。
TableStoreReaderConfig config = new TableStoreReaderConfig();
{
//以下參數均有預設值,可不配置。
//向reader添加要查詢的資料前,會先檢查表的結構。
config.setCheckTableMeta(true);
//一次請求的最大請求行數,上限為100。
config.setMaxBatchRowsCount(100);
//預設情況下,擷取的columns最大版本數。
config.setDefaultMaxVersions(1);
//發送請求的總並發數。
config.setConcurrency(16);
//記憶體分桶數。
config.setBucketCount(4);
//將快取資料全部發送的時間間隔。
config.setFlushInterval(10000);
//日誌記錄reader狀態的時間間隔。
config.setLogInterval(10000);
}
//構造用於發送請求的executor。
ThreadFactory threadFactory = new ThreadFactory() {
private final AtomicInteger counter = new AtomicInteger(1);
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "reader-" + counter.getAndIncrement());
}
};
executor = new ThreadPoolExecutor(4, 4, 0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue(1024), threadFactory, new ThreadPoolExecutor.CallerRunsPolicy());
//構造reader的回呼函數。
TableStoreCallback<PrimaryKeyWithTable, RowReadResult> callback = new TableStoreCallback<PrimaryKeyWithTable, RowReadResult>() {
@Override
public void onCompleted(PrimaryKeyWithTable req, RowReadResult res) {
succeedRows.incrementAndGet();
}
@Override
public void onFailed(PrimaryKeyWithTable req, Exception ex) {
failedRows.incrementAndGet();
}
};
TableStoreReader reader = new DefaultTableStoreReader(client, config, executor, callback);
/**
* 步驟二:構造查詢請求。
*/
//向記憶體添加一列要查詢的資料。
PrimaryKey pk1 = PrimaryKeyBuilder.createPrimaryKeyBuilder()
.addPrimaryKeyColumn("pk1", PrimaryKeyValue.fromLong(0))
.addPrimaryKeyColumn("pk2", PrimaryKeyValue.fromLong(0))
.build();
reader.addPrimaryKey("<TABLE_NAME1>", pk1);
//向記憶體添加一列要查詢的資料,並擷取查詢結果Future。
PrimaryKey pk2 = PrimaryKeyBuilder.createPrimaryKeyBuilder()
.addPrimaryKeyColumn("pk1", PrimaryKeyValue.fromLong(0))
.addPrimaryKeyColumn("pk2", PrimaryKeyValue.fromLong(0))
.build();
Future<ReaderResult> readerResult = reader.addPrimaryKeyWithFuture("<TABLE_NAME2>", pk2);
/**
* 步驟三:查詢資料。
*/
//非同步將記憶體中的資料發送出去。
reader.send();
/**
* 步驟四:擷取查詢結果。
*/
//列印成功查詢和失敗查詢的資訊。
for (RowReadResult success : readerResult.get().getSucceedRows()) {
System.out.println(success.getRowResult());
}
for (RowReadResult fail : readerResult.get().getFailedRows()) {
System.out.println(fail.getRowResult());
}
/**
* 步驟五:關閉TableStoreReader。
*/
reader.close();
client.shutdown();
executor.shutdown();
}
}
相關文檔
如果要可視化展示表中資料,您可以通過對接DataV或者Grafana工具實現。更多資訊,請參見資料視覺效果。
如果要下載表中資料到本地,您可以通過DataX、Table Store命令列CLI工具實現。更多資訊,請參見將Table Store資料下載到本地檔案。
如果要計算與分析表中資料,您可以通過Table StoreSQL查詢實現。更多資訊,請參見SQL查詢。
說明您還可以通過MaxCompute、Spark、Hive或者HadoopMR、Function Compute、Flink等計算引擎實現表中資料的計算與分析。具體操作,請參見計算與分析。