全部產品
Search
文件中心

Tablestore:增量資料操作

更新時間:Jul 13, 2024

Table Store提供了ListStream和DescribeStream介面用於查看錶的Stream資訊,以及GetShardIterator和GetStreamRecord介面用於擷取Stream下Shard的起始迭代值和更新資訊。

列出所有的Stream(ListStream)

調用ListStream介面列出當前執行個體和表下的所有Stream資訊。

以下樣本用於列出某個表的所有Stream資訊。

private static void listStream(SyncClient client, String tableName) {
    ListStreamRequest listStreamRequest = new ListStreamRequest(tableName);
    ListStreamResponse result = client.listStream(listStreamRequest);
    System.out.println(result.getStreams());
}

查詢表Stream描述資訊(DescribeStream)

調用DescribeStream介面查詢Stream的建立時間(creationTime)、到期時間(expirationTime)、當前的狀態(status) 、包含shard的列表(shards)和下一個起始shard的id(如果還有尚未返回的shard)。

您可以擷取當前Stream的所有Shard資訊或者根據指定條件擷取滿足指定條件的Shard資訊。

擷取Stream的所有Shard資訊

以下樣本用於擷取當前Stream的所有Shard資訊。

private static void describeStream(SyncClient client, String streamId) {
    DescribeStreamRequest desRequest = new DescribeStreamRequest(streamId);
    DescribeStreamResponse desStream = client.describeStream(desRequest);
    // 列印Shard資訊。
    System.out.println("Shard Info: " + desStream.getShards());
    // 列印建立時間,單位為微秒。
    System.out.println("Creation Time: " + desStream.getCreationTime());
    // 列印到期時間,單位為小時。
    System.out.println("Expiration Time: " + desStream.getExpirationTime());
}

擷取Stream下滿足指定條件的Shard資訊

以下樣本用於根據開始shardID(InclusiveStartShardId)和每次返回的最大Shard數目擷取指定Shard資訊。

private static void describeStream(SyncClient client, String streamId, String shardId) {
    DescribeStreamRequest dsRequest = new DescribeStreamRequest(streamId);
    // Shard ID必須實際存在。您可以通過擷取Stream下的所有Shard資訊來擷取所需起始Shard ID。
    dsRequest.setInclusiveStartShardId(shardId);
    dsRequest.setShardLimit(10);
    DescribeStreamResponse dscStream = client.describeStream(dsRequest);
    // 列印Shard資訊。
    System.out.println(dscStream.getShards());
}			

擷取Shard的讀取迭代值(GetShardIterator)

調用GetShardIterator介面用於擷取Shard的讀取起始迭代值。

以下樣本用於擷取Shard的讀取起始迭代值。

private static void getShardIterator(SyncClient client, String streamId, String shardId) {
    GetShardIteratorRequest getShardIteratorRequest = new GetShardIteratorRequest(streamId, shardId);
    GetShardIteratorResponse shardIterator = client.getShardIterator(getShardIteratorRequest);
    System.out.println(shardIterator.getShardIterator());
}

擷取Shard的更新記錄(GetStreamRecord)

調用GetStreamRecord介面用於擷取Shard的每條更新記錄。

以下樣本用於擷取Shard的最初100條更新資訊。

private static void getShardIterator(SyncClient client, String shardIterator) {
    GetStreamRecordRequest streamRecordRequest = new GetStreamRecordRequest(shardIterator);
    streamRecordRequest.setLimit(100);
    GetStreamRecordResponse streamRecordResponse = client.getStreamRecord(streamRecordRequest);
    List<StreamRecord> records = streamRecordResponse.getRecords();
    for(int k=0;k<records.size();k++){
        System.out.println("record info:" +  records.get(k).toString());
    }
        System.out.println("next iterator:" + streamRecordResponse.getNextShardIterator());
}