如果您希望訊息被投遞後延遲一段時間被消費者消費,您可以使用雲訊息佇列 RabbitMQ 版的延時訊息。雲訊息佇列 RabbitMQ 版原生支援延時訊息,使用方式比開源RabbitMQ更簡單。
什麼是延時訊息
延時訊息是指在指定時間段之後才被消費者消費的訊息。
應用情境
延時訊息適用於以下情境:
對訊息生產和消費有時間視窗要求的情境。例如,在電商交易中逾時未支付關閉訂單的情境,在訂單建立時會發送一條延時訊息。這條訊息將會在30分鐘以後投遞給消費者,消費者收到此訊息後需要判斷對應的訂單是否已完成支付。如支付未完成,則關閉訂單。如已完成支付則忽略。
通過訊息觸發延時任務的情境。例如,在指定時間段之後向使用者發送提醒訊息。
延時時間設定規則
方案對比
雲訊息佇列 RabbitMQ 版無需進行代碼改造,即可實現開源RabbitMQ的所有延時訊息方案,詳情請參見下表:
專案 | 開源RabbitMQ | 雲訊息佇列 RabbitMQ 版 |
死信Exchange+Queue的訊息存活時間 | 支援 | 支援 |
死信Exchange+訊息的訊息存活時間 | 支援 | 支援 |
支援 | 支援 | |
不支援 | 支援 |
開源延時訊息外掛程式方案
為了減少與開源RabbitMQ的差別,雲訊息佇列 RabbitMQ 版也基於原生的延時訊息支援使用開源外掛程式式的方式來使用延時訊息,並免去外掛程式的安裝。具體使用流程如下:
聲明
x-delayed-message
類型的Exchange,並填寫該Exchange的擴充參數x-delayed-type以指定Exchange的路由類型。樣本如下:Map<String, Object> args = new HashMap<String, Object>(); args.put("x-delayed-type", "direct"); channel.exchangeDeclare("ExchangeName", "x-delayed-message", true, false, args);
參數說明如下:
參數
說明
x-delayed-type
Exchange的類型,指定路由規則。取值說明如下:
direct
fanout
topic
headers
ExchangeName
Exchange的名稱。
說明請確保聲明的Exchange已存在。具體步驟,請參見Exchange管理。
x-delayed-message
指定Exchange類型,以支援投遞延時訊息。
發送延時訊息。在訊息的Header屬性中增加一個鍵為x-delay,值為毫秒數的索引值對,並且指定發送的目標Exchange為上一步已聲明的Exchange。樣本如下:
byte[] messageBodyBytes = "delayed payload".getBytes("UTF-8"); Map<String, Object> headers = new HashMap<String, Object>(); headers.put("x-delay", 5000);//表示訊息延時5000毫秒。 AMQP.BasicProperties.Builder props = new AMQP.BasicProperties.Builder().headers(headers); channel.basicPublish("ExchangeName", "", props.build(), messageBodyBytes);
原生延時訊息方案
雲訊息佇列 RabbitMQ 版通過對訊息設定delay來實現延時效果。雲訊息佇列 RabbitMQ 版原生延時訊息的流轉過程如下:
生產者向Exchange發布設定了delay屬性的訊息。
Exchange將訊息路由至Queue。
在設定的delay時間到期後,消費者才能從Queue消費訊息。
原生延時訊息最佳實務
生產者用戶端
雲訊息佇列 RabbitMQ 版原生延時訊息的使用方式非常簡單。您只需要在生產者用戶端發布訊息時,通過delay為訊息設定一個延時時間。
發布延時訊息的Java範例程式碼如下:
Map<String, Object> headers = new HashMap<>(); headers.put("delay", "5000");//表示訊息延時5000毫秒。 AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().messageId(UUID.randomUUID().toString()).headers(headers).build();
更多語言範例程式碼,請參見AMQP Demos。
消費者用戶端
為保證延時訊息時效性,建議您在消費訊息時使用push模式的
basic.consume
方法,而不要使用pull模式的basic.get
方法。因為雲訊息佇列 RabbitMQ 版的訊息是分布式儲存的,如果您使用pull模式的basic.get
方法擷取訊息,並不能保證正好從儲存的節點擷取訊息。
常見問題
為什麼實際的延時時間大於設定的延時時間?
因為用戶端使用了pull模式的basic.get方法消費訊息。雲訊息佇列 RabbitMQ 版的訊息是叢集儲存的,使用pull模式的basic.get方法路由到一台Broker時,可能無法及時拉取儲存在其他Broker上的訊息。