應用情境
說明 定時訊息和延時訊息本質相同,都是服務端根據訊息設定的定時時間在某一固定時刻將訊息投遞給消費者消費。因此,下文統一用定時訊息描述。
在分布式定時調度觸發、任務逾時處理等情境,需要實現精準、可靠的定時事件觸發。使用雲訊息佇列 RocketMQ 版的定時訊息可以簡化定時調度任務的開發邏輯,實現高效能、可擴充、高可靠的定時觸發能力。
典型情境一:分布式定時調度
在分布式定時調度情境下,需要實現各類精度的定時任務,例如每天5點執行檔案清理,每隔2分鐘觸發一次訊息推送等需求。傳統基於資料庫的定時調度方案在分布式情境下,效能不高,實現複雜。基於雲訊息佇列 RocketMQ 版的定時訊息可以封裝出多種類型的定時觸發器。
典型情境二:任務逾時處理
以電商交易情境為例,訂單下單後暫未支付,此時不可以直接關閉訂單,而是需要等待一段時間後才能關閉訂單。使用雲訊息佇列 RocketMQ 版定時訊息可以實現逾時任務的檢查觸發。
功能原理
什麼是定時訊息
定時訊息是雲訊息佇列 RocketMQ 版提供的一種進階訊息類型,訊息被發送至服務端後,在指定時間後才能被消費者消費。通過設定一定的定時時間可以實現分布式情境的延時調度觸發效果。
定時時間設定原則
雲訊息佇列 RocketMQ 版定時訊息設定的定時時間是一個預期觸發的系統時間戳,延時時間也需要轉換成當前系統時間後的某一個時間戳記,而不是一段延時時間長度。
定時時間的格式為毫秒級的Unix時間戳記,您需要將要設定的時刻轉換成時間戳記形式。
定時時間必須設定在定時時間長度範圍內,超過範圍則定時不生效,服務端會立即投遞訊息。
定時訊息最大定時時間長度:
定時時間必須設定為目前時間之後,若設定到目前時間之前,則定時不生效,服務端會立即投遞訊息。
樣本如下:
定時訊息:例如,當前系統時間為2022-06-09 17:30:00,您希望訊息在下午19:20:00定時投遞,則定時時間為2022-06-09 19:20:00,轉換成時間戳記格式為1654773600000。
延時訊息:例如,當前系統時間為2022-06-09 17:30:00,您希望延時1個小時後投遞訊息,則您需要根據目前時間和延時時間長度換算成定時時刻,即訊息投遞時間為2022-06-09 18:30:00,轉換為時間戳記格式為1654770600000。
定時訊息生命週期
初始化
訊息被生產者構建並完成初始化,待發送到服務端的狀態。
定時中
訊息被發送到服務端,和普通訊息不同的是,服務端不會直接構建訊息索引,而是會將定時訊息單獨儲存在定時儲存系統中,等待定時時刻到達。
待消費
定時時刻到達後,服務端將訊息重新寫入普通儲存引擎,對下遊消費者可見,等待消費者消費的狀態。
消費中
訊息被消費者擷取,並按照消費者本地的商務邏輯進行處理的過程。
此時服務端會等待消費者完成消費並提交消費結果,如果一定時間後沒有收到消費者的響應,雲訊息佇列 RocketMQ 版會對訊息進行重試處理。具體資訊,請參見消費重試。
消費提交
消費者完成消費處理,並向服務端提交消費結果,服務端標記當前訊息已經被處理(包括消費成功和失敗)。
雲訊息佇列 RocketMQ 版預設支援保留所有訊息,此時訊息資料並不會立即被刪除,只是邏輯標記已消費。訊息在儲存時間到期或儲存空間不足被刪除前,消費者仍然可以回溯訊息重新消費。
訊息刪除
雲訊息佇列 RocketMQ 版按照訊息儲存機制滾動清理最早的訊息資料,將訊息從物理檔案中刪除。更多資訊,請參見訊息儲存和清理機制。
使用限制
訊息類型一致性
定時訊息僅支援在MessageType為Delay的主題內使用,即定時訊息只能發送至類型為定時訊息的主題中,發送的訊息的類型必須和主題的類型一致。
定時精度約束
雲訊息佇列 RocketMQ 版定時訊息的定時時間長度參數精確到毫秒級,但是預設精度為1000ms,即定時訊息為秒級精度。
雲訊息佇列 RocketMQ 版定時訊息的狀態支援持久化儲存,系統由於故障重啟後,仍支援按照原來設定的定時時間觸發訊息投遞。若儲存系統異常重啟,可能會導致定時訊息投遞出現一定延遲。
使用樣本
和普通訊息相比,定時消費發送時,必須設定定時觸發的目標時間戳記。
以Java語言為例,使用定時訊息樣本參考如下:
完整的訊息收發範例程式碼請參見RocketMQ 5.x系列SDK(推薦)。
範例程式碼
定時/延時訊息發送
import org.apache.rocketmq.client.apis.*;
import org.apache.rocketmq.client.apis.message.Message;
import org.apache.rocketmq.client.apis.producer.Producer;
import org.apache.rocketmq.client.apis.producer.SendReceipt;
public class ProducerExample {
public static void main(String[] args) throws ClientException {
/**
* 執行個體存取點,從控制台執行個體詳情頁的存取點頁簽中擷取。
* 如果是在阿里雲ECS內網訪問,建議填寫VPC存取點。
* 如果是在本地公網訪問,或者是線下IDC環境訪問,可以使用公網存取點。使用公網存取點訪問,必須開啟執行個體的公網訪問功能。
*/
String endpoints = "rmq-cn-xxx.{regionId}.rmq.aliyuncs.com:8080";
//訊息發送的目標Topic名稱,需要提前在控制台建立,如果不建立直接使用會返回報錯。
String topic = "Your Topic";
ClientServiceProvider provider = ClientServiceProvider.loadService();
ClientConfigurationBuilder builder = ClientConfiguration.newBuilder().setEndpoints(endpoints);
/**
* 如果是使用公網存取點訪問,configuration還需要設定執行個體的使用者名稱和密碼。執行個體使用者名稱和密碼在控制台存取控制的智能身份識別頁簽中擷取。
* 如果是在阿里雲ECS內網訪問,無需填寫該配置,服務端會根據內網VPC資訊智能擷取。
* 如果執行個體類型為Serverless執行個體,公網訪問必須設定執行個體的使用者名稱密碼,當開啟內網免身份識別時,內網訪問可以不設定使用者名稱和密碼。
*/
//builder.setCredentialProvider(new StaticSessionCredentialsProvider("Instance UserName", "Instance Password"));
ClientConfiguration configuration = builder.build();
Producer producer = provider.newProducerBuilder()
.setTopics(topic)
.setClientConfiguration(configuration)
.build();
//定時/延時訊息發送
//以下樣本表示:延遲時間為10分鐘之後的Unix時間戳記。
long deliverTimeStamp = System.currentTimeMillis() + 10L * 60 * 1000;
Message message = provider.newMessageBuilder()
.setTopic("topic")
//設定訊息索引鍵,可根據關鍵字精確尋找某條訊息。
.setKeys("messageKey")
//設定訊息Tag,用於消費端根據指定Tag過濾訊息。
.setTag("messageTag")
.setDeliveryTimestamp(deliverTimeStamp)
//訊息體
.setBody("messageBody".getBytes())
.build();
try {
//發送訊息,需要關注發送結果,並捕獲失敗等異常。
SendReceipt sendReceipt = producer.send(message);
System.out.println(sendReceipt.getMessageId());
} catch (ClientException e) {
e.printStackTrace();
}
}
}
PushConsumer消費
import org.apache.rocketmq.client.apis.*;
import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
import org.apache.rocketmq.client.apis.consumer.FilterExpression;
import org.apache.rocketmq.client.apis.consumer.FilterExpressionType;
import org.apache.rocketmq.client.apis.consumer.PushConsumer;
import org.apache.rocketmq.shaded.org.slf4j.Logger;
import org.apache.rocketmq.shaded.org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Collections;
public class PushConsumerExample {
private static final Logger LOGGER = LoggerFactory.getLogger(PushConsumerExample.class);
private PushConsumerExample() {
}
public static void main(String[] args) throws ClientException, IOException, InterruptedException {
/**
* 執行個體存取點,從控制台執行個體詳情頁的存取點頁簽中擷取。
* 如果是在阿里雲ECS內網訪問,建議填寫VPC存取點。
* 如果是在本地公網訪問,或者是線下IDC環境訪問,可以使用公網存取點。使用公網存取點訪問,必須開啟執行個體的公網訪問功能。
*/
String endpoints = "rmq-cn-xxx.{regionId}.rmq.aliyuncs.com:8080";
//指定需要訂閱哪個目標Topic,Topic需要提前在控制台建立,如果不建立直接使用會返回報錯。
String topic = "Your Topic";
//為消費者指定所屬的消費者分組,Group需要提前在控制台建立,如果不建立直接使用會返回報錯。
String consumerGroup = "Your ConsumerGroup";
final ClientServiceProvider provider = ClientServiceProvider.loadService();
ClientConfigurationBuilder builder = ClientConfiguration.newBuilder().setEndpoints(endpoints);
/**
* 如果是使用公網存取點訪問,configuration還需要設定執行個體的使用者名稱和密碼。使用者名稱和密碼在控制台存取控制的智能身份識別頁簽中擷取。
* 如果是在阿里雲ECS內網訪問,無需填寫該配置,服務端會根據內網VPC資訊智能擷取。
* 如果執行個體類型為Serverless執行個體,公網訪問必須設定執行個體的使用者名稱密碼,當開啟內網免身份識別時,內網訪問可以不設定使用者名稱和密碼。
*/
//builder.setCredentialProvider(new StaticSessionCredentialsProvider("Instance UserName", "Instance Password"));
ClientConfiguration clientConfiguration = builder.build();
//訂閱訊息的過濾規則,表示訂閱所有Tag的訊息。
String tag = "*";
FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG);
//初始化PushConsumer,需要綁定消費者分組ConsumerGroup、通訊參數以及訂閱關係。
PushConsumer pushConsumer = provider.newPushConsumerBuilder()
.setClientConfiguration(clientConfiguration)
//設定消費者分組。
.setConsumerGroup(consumerGroup)
//設定預綁定的訂閱關係。
.setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
//設定消費監聽器。
.setMessageListener(messageView -> {
//處理訊息並返回消費結果。
// LOGGER.info("Consume message={}", messageView);
System.out.println("Consume Message: " + messageView);
return ConsumeResult.SUCCESS;
})
.build();
Thread.sleep(Long.MAX_VALUE);
//如果不需要再使用PushConsumer,可關閉該進程。
//pushConsumer.close();
}
}
SimpleConsumer消費
import org.apache.rocketmq.client.apis.*;
import org.apache.rocketmq.client.apis.consumer.FilterExpression;
import org.apache.rocketmq.client.apis.consumer.FilterExpressionType;
import org.apache.rocketmq.client.apis.consumer.SimpleConsumer;
import org.apache.rocketmq.client.apis.message.MessageId;
import org.apache.rocketmq.client.apis.message.MessageView;
import org.apache.rocketmq.shaded.org.slf4j.Logger;
import org.apache.rocketmq.shaded.org.slf4j.LoggerFactory;
import java.io.IOException;
import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.function.Consumer;
public class SimpleConsumerExample {
private static final Logger LOGGER = LoggerFactory.getLogger(SimpleConsumerExample.class);
private SimpleConsumerExample() {
}
public static void main(String[] args) throws ClientException, IOException {
/**
* 執行個體存取點,從控制台執行個體詳情頁的存取點頁簽中擷取。
* 如果是在阿里雲ECS內網訪問,建議填寫VPC存取點。
* 如果是在本地公網訪問,或者是線下IDC環境訪問,可以使用公網存取點。使用公網存取點訪問,必須開啟執行個體的公網訪問功能。
*/
String endpoints = "rmq-cn-xxx.{regionId}.rmq.aliyuncs.com:8080";
//指定需要訂閱哪個目標Topic,Topic需要提前在控制台建立,如果不建立直接使用會返回報錯。
String topic = "Your Topic";
//為消費者指定所屬的消費者分組,Group需要提前在控制台建立,如果不建立直接使用會返回報錯。
String consumerGroup = "Your ConsumerGroup";
final ClientServiceProvider provider = ClientServiceProvider.loadService();
ClientConfigurationBuilder builder = ClientConfiguration.newBuilder().setEndpoints(endpoints);
/**
* 如果是使用公網存取點訪問,configuration還需要設定執行個體的使用者名稱和密碼。使用者名稱和密碼在控制台存取控制的智能身份識別頁簽中擷取。
* 如果是在阿里雲ECS內網訪問,無需填寫該配置,服務端會根據內網VPC資訊智能擷取。
* 如果執行個體類型為Serverless執行個體,公網訪問必須設定執行個體的使用者名稱密碼,當開啟內網免身份識別時,內網訪問可以不設定使用者名稱和密碼。
*/
//builder.setCredentialProvider(new StaticSessionCredentialsProvider("Instance UserName", "Instance Password"));
ClientConfiguration clientConfiguration = builder.build();
Duration awaitDuration = Duration.ofSeconds(10);
//訂閱訊息的過濾規則,表示訂閱所有Tag的訊息。
String tag = "*";
FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG);
//初始化SimpleConsumer,需要綁定消費者分組ConsumerGroup、通訊參數以及訂閱關係。
SimpleConsumer consumer = provider.newSimpleConsumerBuilder()
.setClientConfiguration(clientConfiguration)
//設定消費者分組。
.setConsumerGroup(consumerGroup)
//設定長輪詢逾時時間。
.setAwaitDuration(awaitDuration)
//設定預綁定的訂閱關係。
.setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
.build();
//設定本次拉取的最大訊息條數。
int maxMessageNum = 16;
//設定訊息的不可見時間。
Duration invisibleDuration = Duration.ofSeconds(10);
//SimpleConsumer需要用戶端一直主動迴圈擷取訊息,並進行消費處理。
//如果需要提高消費即時性,建議多線程並發拉取。
while (true) {
final List<MessageView> messages = consumer.receive(maxMessageNum, invisibleDuration);
messages.forEach(messageView -> {
// LOGGER.info("Received message: {}", messageView);
System.out.println("Received message: " + messageView);
});
for (MessageView message : messages) {
final MessageId messageId = message.getMessageId();
try {
//消費處理完成後,需要主動調用ACK向服務端提交消費結果。
consumer.ack(message);
System.out.println("Message is acknowledged successfully, messageId= " + messageId);
//LOGGER.info("Message is acknowledged successfully, messageId={}", messageId);
} catch (Throwable t) {
t.printStackTrace();
//LOGGER.error("Message is failed to be acknowledged, messageId={}", messageId, t);
}
}
}
// 如果不需要再使用SimpleConsumer,可關閉該進程。
// consumer.close();
}
}
使用建議
避免大量相同定時時刻的訊息
定時訊息的實現邏輯需要先經過定時儲存等待觸發,定時時間到達後才會被投遞給消費者。因此,如果將大量定時訊息的定時時間設定為同一時刻,則到達該時刻後會有大量訊息同時需要被處理,會造成系統壓力過大,導致訊息分發延遲,影響定時精度。
定時/延時訊息常見問題
定時訊息在定時時間到達前可以撤回或修改定時時間嗎?
不支援。
定時時間設定一個已過去的時間會怎麼樣?
定時不生效,訊息會被立即投遞。
定時訊息已發送成功為什麼控制台查詢不到?
定時訊息到達定時時間後才對消費者可見,並在控制台查詢到訊息軌跡。