雲訊息佇列 RocketMQ 版支援訊息重試功能,即Consumer消費某條訊息失敗或消費逾時,雲訊息佇列 RocketMQ 版會根據訊息重試機制重新投遞訊息。本文介紹雲訊息佇列 RocketMQ 版分別在HTTP協議和TCP協議下的訊息重試策略。
注意事項
一條訊息無論重試多少次,這些重試訊息的Message ID都不會改變。
訊息重試只針對叢集消費模式生效;廣播消費模式不提供失敗重試特性,即消費失敗後,失敗訊息不再重試,繼續消費新的訊息。
訊息重試策略概述
雲訊息佇列 RocketMQ 版訊息收發過程中,若Consumer消費某條訊息失敗或消費逾時,則雲訊息佇列 RocketMQ 版會在稍候再試時間後,將訊息重新投遞給Consumer消費,若達到最大重試次數後訊息還沒有成功被消費,則訊息將被投遞至無效信件佇列。您可以通過消費無效信件佇列中的死信訊息來恢複業務異常。
訊息重試主要功能行為包括:
稍候再試:上一次消費失敗或逾時後,距下次訊息可被重新消費的間隔時間。
最大重試次數:訊息消費失敗後,可被雲訊息佇列 RocketMQ 版重複投遞的最大次數。
對於TCP協議和HTTP協議,訊息重試的稍候再試和最大重試次數有所不同,具體資訊請參見下文的TCP協議重試策略和HTTP協議重試策略。
TCP協議重試策略
重試狀態機器
Consumer消費訊息時,訊息的主要狀態變化如下:
Ready:已就緒狀態。
訊息在雲訊息佇列 RocketMQ 版服務端已就緒,可以被消費者消費。
Inflight:處理中狀態。
訊息被消費者用戶端擷取,處於消費中還未返回消費結果的狀態。
WaitingRetry:待重試狀態。
當消費者訊息處理失敗或消費逾時,會觸發消費重試邏輯判斷。如果當前重試次數未達到最大次數,則該訊息變為待重試狀態,經過稍候再試後,訊息將重新變為已就緒狀態可被重新消費。多次重試之間,可通過稍候再試進行延長,防止無效高頻的失敗。
Commit:提交狀態。
消費成功的狀態,消費者返回成功響應即可結束訊息的狀態機器。
DLQ:死信狀態。
消費邏輯的最終兜底機制,訊息重試失敗且超過最大重試次數,若儲存死信訊息功能開啟,該失敗訊息會被投遞至死信Topic。您可以通過消費死信Topic的訊息進行業務恢複。具體資訊,請參見無效信件佇列。
舉例:某條訊息的消費重試流程如上圖所示,假設訊息處於已就緒狀態的時間長度為5 s,消費耗時為6 s。
每次重試訊息狀態都會經過已就緒->處理中->待重試的變化,訊息的稍候再試指的是上一次消費失敗或逾時後,距下次訊息可被重新消費的間隔時間。實際訊息兩次消費之間的間隔時間還包括消費耗時和已就緒狀態的期間。例如:
訊息第一次消費時第0 s進入已就緒狀態。
受消費者處理速度的影響,到第5 s時才開始拉取訊息消費,6 s後訊息處理異常用戶端返回消費失敗。
此時還不能進行消費重試,需要等待稍候再試後才能開始再次消費。
等到第21 s時訊息再次變為已就緒狀態。
5 s後用戶端才再次開始重新消費訊息。
因此,實際訊息兩次消費的間隔時間為:消費耗時+稍候再試+已就緒的期間=21 s。
稍候再試和重試次數
協議 | 訊息類型 | 稍候再試 | 最大重試次數 |
TCP協議 | 順序訊息 | 間隔時間可通過自訂參數suspendTimeMillis取值進行配置。參數取值範圍:10~30000,單位:毫秒,預設值:1000毫秒,即1秒。 | 最大重試次數可通過自訂參數MaxReconsumeTimes取值進行配置。該參數取值無最大限制。若未設定參數值,預設最大重試次數為Integer.MAX。 |
無序訊息 | 間隔時間根據重試次數階梯變化,取值範圍:10秒~2小時。不支援自訂配置。
| 最大重試次數可通過自訂參數MaxReconsumeTimes取值進行配置。預設值為16次,該參數取值無最大限制,建議使用預設值。 |
表 1. TCP協議無序訊息稍候再試
第幾次重試 | 稍候再試 | 第幾次重試 | 稍候再試 |
1 | 10秒 | 9 | 7分鐘 |
2 | 30秒 | 10 | 8分鐘 |
3 | 1分鐘 | 11 | 9分鐘 |
4 | 2分鐘 | 12 | 10分鐘 |
5 | 3分鐘 | 13 | 20分鐘 |
6 | 4分鐘 | 14 | 30分鐘 |
7 | 5分鐘 | 15 | 1小時 |
8 | 6分鐘 | 16 | 2小時 |
配置方式
以下配置方式僅適用於TCP協議,HTTP協議不涉及。
訊息投遞失敗後需要重試
叢集消費模式下,訊息消費失敗後期望訊息重試,需要在訊息監聽器介面的實現中明確進行配置(三種方式任選一種):
方式1:返回Action.ReconsumeLater(推薦)
方式2:返回Null
方式3:拋出異常
範例程式碼
public class MessageListenerImpl implements MessageListener { @Override public Action consume(Message message, ConsumeContext context) { //訊息處理邏輯拋出異常,訊息將重試。 doConsumeMessage(message); //方式1:返回Action.ReconsumeLater,訊息將重試。 return Action.ReconsumeLater; //方式2:返回null,訊息將重試。 return null; //方式3:直接拋出異常,訊息將重試。 throw new RuntimeException("Consumer Message exception"); } }
消費投遞失敗後無需重試
叢集消費模式下,訊息失敗後期望訊息不重試,需要捕獲消費邏輯中可能拋出的異常,最終返回Action.CommitMessage,此後這條訊息將不會再重試。
範例程式碼
public class MessageListenerImpl implements MessageListener { @Override public Action consume(Message message, ConsumeContext context) { try { doConsumeMessage(message); } catch (Throwable e) { //捕獲消費邏輯中的所有異常,並返回Action.CommitMessage; return Action.CommitMessage; } //訊息處理正常,直接返回Action.CommitMessage; return Action.CommitMessage; } }
自訂訊息最大重試次數和稍候再試
說明自訂雲訊息佇列 RocketMQ 版的用戶端日誌配置,請升級TCP Java SDK到1.2.2或以上版本。更多資訊,請參見版本說明。
雲訊息佇列 RocketMQ 版允許Consumer執行個體啟動的時候設定最大重試次數和稍候再試,無序訊息稍候再試時間不支援自訂,以TCP協議無序訊息稍候再試為準。
配置方式如下:
Properties properties = new Properties(); //配置對應Group ID的最大訊息重試次數為20次,最大重試次數為字串類型。 properties.put(PropertyKeyConst.MaxReconsumeTimes,"20"); //配置對應Group ID的訊息稍候再試時間為3000毫秒,稍候再試時間為字串類型。 properties.put(PropertyKeyConst.SuspendTimeMillis,"3000"); Consumer consumer = ONSFactory.createConsumer(properties);
重要配置採用覆蓋的方式生效,即最後啟動的Consumer執行個體會覆蓋之前啟動的執行個體的配置。因此,請確保同一Group ID下的所有Consumer執行個體設定的最大重試次數和稍候再試相同,否則各執行個體間的配置將會互相覆蓋。
擷取訊息重試次數
Consumer收到訊息後,可按照以下方式擷取訊息的重試次數,訊息稍候再試時間一般不需要擷取。
public class MessageListenerImpl implements MessageListener { @Override public Action consume(Message message, ConsumeContext context) { //擷取訊息的重試次數。 System.out.println(message.getReconsumeTimes()); return Action.CommitMessage; } }
HTTP協議重試策略
協議 | 訊息類型 | 稍候再試 | 最大重試次數 | 配置方式 |
HTTP協議 | 順序訊息 | 1分鐘 | 288次 | 系統預設,不支援修改。 |
無序訊息 | 5分鐘 | 288次 | 系統預設,不支援修改。 |