本文介紹使用訊息模型時如何進行Queue管理。
擷取Queue執行個體
Queue是單個訊息佇列的抽象概念,對應TimelineStore下單個Identifier的所有訊息。擷取Queue執行個體時通過TimelineStore的介面建立。
TimelineIdentifier identifier = new TimelineIdentifier.Builder()
.addField("timeline_id", "group_1")
.build();
//單TimelineStore下單identifier對應的訊息佇列(Queue)。
TimelineQueue timelineQueue = timelineStore.createTimelineQueue(identifier);
Queue是單存放庫下單Identifier對應的訊息佇列的管理執行個體,主要有同步寫、非同步寫、批量寫、刪、同步改、非同步改、單行讀、範圍讀等介面。
Store
同步儲存訊息,兩個介面分別支援SequenceId的兩種實現方式自增列和手動設定,相關配置在TimelineSchema中。
timelineQueue.store(message); //自增列實現的SequenceId。
timelineQueue.store(sequenceId, message); //手動設定SequenceId。
StoreAsync
非同步儲存訊息,您可以自訂回調,對成功或者失敗做自訂處理。介面返回Future<TimelineEntry>。
TimelineCallback callback = new TimelineCallback() {
@Override
public void onCompleted(TimelineIdentifier i, TimelineMessage m, TimelineEntry t) {
// do something when succeed.
}
@Override
public void onFailed(TimelineIdentifier i, TimelineMessage m, Exception e) {
// do something when failed.
}
};
timelineQueue.storeAsync(message, callback); //自增列實現的SequenceId。
timelineQueue.storeAsync(sequenceId, message, callback); //手動設定SequenceId。
BatchStore
批量儲存訊息,支援無回調和有回調兩種方式。您可以自訂回調,對成功或者失敗做自訂處理。
timelineQueue.batchStore(message); //自增列實現的SequenceId。
timelineQueue.batchStore(sequenceId, message); //手動設定SequenceId。
timelineQueue.batchStore(message, callback); //自增列實現的SequenceId。
timelineQueue.batchStore(sequenceId, message, callback); //手動設定SequenceId。
Get
通過SequenceId讀取單行訊息。當訊息不存在時不拋錯,返回null。
timelineQueue.get(sequenceId);
GetLatestTimelineEntry
讀取最新一條訊息。當訊息不存在時不拋錯,返回null。
timelineQueue.getLatestTimelineEntry();
GetLatestSequenceId
擷取最新一條訊息的SequenceId。當訊息不存在時不拋錯,返回0。
timelineQueue.getLatestSequenceId();
Update
通過SequenceId同步更新訊息內容。
TimelineMessage message = new TimelineMessage().setField("text", "Timeline is fine.");
//update message with new field
message.setField("text", "new value");
timelineQueue.update(sequenceId, message);
UpdateAsync
通過SequenceId非同步更新訊息。您可以自訂回調,對成功或者失敗做自訂處理。介面返回Future<TimelineEntry>。
TimelineMessage oldMessage = new TimelineMessage().setField("text", "Timeline is fine.");
TimelineCallback callback = new TimelineCallback() {
@Override
public void onCompleted(TimelineIdentifier i, TimelineMessage m, TimelineEntry t) {
// do something when succeed.
}
@Override
public void onFailed(TimelineIdentifier i, TimelineMessage m, Exception e) {
// do something when failed.
}
};
TimelineMessage newMessage = oldMessage;
newMessage.setField("text", "new value");
timelineQueue.updateAsync(sequenceId, newMessage, callback);
Delete
根據SequenceId刪除單行訊息。
timelineQueue.delete(sequenceId);
Scan
根據Scan參數正序(或逆序)範圍讀取單個Queue下的訊息,返回Iterator<TimelineEntry>,通過迭代器遍曆。
ScanParameter scanParameter = new ScanParameter().scanBackward(Long.MAX_VALUE, 0);
timelineQueue.scan(scanParameter);