全部產品
Search
文件中心

Tablestore:Queue管理

更新時間:Jul 20, 2024

本文介紹使用訊息模型時如何進行Queue管理。

擷取Queue執行個體

Queue是單個訊息佇列的抽象概念,對應TimelineStore下單個Identifier的所有訊息。擷取Queue執行個體時通過TimelineStore的介面建立。

TimelineIdentifier identifier = new TimelineIdentifier.Builder()
        .addField("timeline_id", "group_1")
        .build();

//單TimelineStore下單identifier對應的訊息佇列(Queue)。
TimelineQueue timelineQueue = timelineStore.createTimelineQueue(identifier);

Queue是單存放庫下單Identifier對應的訊息佇列的管理執行個體,主要有同步寫、非同步寫、批量寫、刪、同步改、非同步改、單行讀、範圍讀等介面。

Store

同步儲存訊息,兩個介面分別支援SequenceId的兩種實現方式自增列和手動設定,相關配置在TimelineSchema中。

timelineQueue.store(message); //自增列實現的SequenceId。
timelineQueue.store(sequenceId, message); //手動設定SequenceId。

StoreAsync

非同步儲存訊息,您可以自訂回調,對成功或者失敗做自訂處理。介面返回Future<TimelineEntry>。

TimelineCallback callback = new TimelineCallback() {
    @Override
    public void onCompleted(TimelineIdentifier i, TimelineMessage m, TimelineEntry t) {
        // do something when succeed.
    }

    @Override
    public void onFailed(TimelineIdentifier i, TimelineMessage m, Exception e) {
        // do something when failed.
    }
};

timelineQueue.storeAsync(message, callback); //自增列實現的SequenceId。
timelineQueue.storeAsync(sequenceId, message, callback); //手動設定SequenceId。

BatchStore

批量儲存訊息,支援無回調和有回調兩種方式。您可以自訂回調,對成功或者失敗做自訂處理。

timelineQueue.batchStore(message); //自增列實現的SequenceId。
timelineQueue.batchStore(sequenceId, message); //手動設定SequenceId。

timelineQueue.batchStore(message, callback); //自增列實現的SequenceId。
timelineQueue.batchStore(sequenceId, message, callback); //手動設定SequenceId。

Get

通過SequenceId讀取單行訊息。當訊息不存在時不拋錯,返回null。

timelineQueue.get(sequenceId);

GetLatestTimelineEntry

讀取最新一條訊息。當訊息不存在時不拋錯,返回null。

timelineQueue.getLatestTimelineEntry();

GetLatestSequenceId

擷取最新一條訊息的SequenceId。當訊息不存在時不拋錯,返回0。

timelineQueue.getLatestSequenceId();

Update

通過SequenceId同步更新訊息內容。

TimelineMessage message = new TimelineMessage().setField("text", "Timeline is fine.");

//update message with new field
message.setField("text", "new value");
timelineQueue.update(sequenceId, message);

UpdateAsync

通過SequenceId非同步更新訊息。您可以自訂回調,對成功或者失敗做自訂處理。介面返回Future<TimelineEntry>。

TimelineMessage oldMessage = new TimelineMessage().setField("text", "Timeline is fine.");
TimelineCallback callback = new TimelineCallback() {
    @Override
    public void onCompleted(TimelineIdentifier i, TimelineMessage m, TimelineEntry t) {
        // do something when succeed.
    }

    @Override
    public void onFailed(TimelineIdentifier i, TimelineMessage m, Exception e) {
        // do something when failed.
    }
};

TimelineMessage newMessage = oldMessage;
newMessage.setField("text", "new value");
timelineQueue.updateAsync(sequenceId, newMessage, callback);

Delete

根據SequenceId刪除單行訊息。

timelineQueue.delete(sequenceId);

Scan

根據Scan參數正序(或逆序)範圍讀取單個Queue下的訊息,返回Iterator<TimelineEntry>,通過迭代器遍曆。

ScanParameter scanParameter = new ScanParameter().scanBackward(Long.MAX_VALUE, 0);

timelineQueue.scan(scanParameter);