云消息队列 RocketMQ 版支持消息重试功能,即Consumer消费某条消息失败或消费超时,云消息队列 RocketMQ 版会根据消息重试机制重新投递消息。本文介绍云消息队列 RocketMQ 版分别在HTTP协议和TCP协议下的消息重试策略。
注意事项
一条消息无论重试多少次,这些重试消息的Message ID都不会改变。
消息重试只针对集群消费模式生效;广播消费模式不提供失败重试特性,即消费失败后,失败消息不再重试,继续消费新的消息。
消息重试策略概述
云消息队列 RocketMQ 版消息收发过程中,若Consumer消费某条消息失败或消费超时,则云消息队列 RocketMQ 版会在重试间隔时间后,将消息重新投递给Consumer消费,若达到最大重试次数后消息还没有成功被消费,则消息将被投递至死信队列。您可以通过消费死信队列中的死信消息来恢复业务异常。
消息重试主要功能行为包括:
重试间隔:上一次消费失败或超时后,距下次消息可被重新消费的间隔时间。
最大重试次数:消息消费失败后,可被云消息队列 RocketMQ 版重复投递的最大次数。
对于TCP协议和HTTP协议,消息重试的重试间隔和最大重试次数有所不同,具体信息请参见下文的TCP协议重试策略和HTTP协议重试策略。
TCP协议重试策略
重试状态机
Consumer消费消息时,消息的主要状态变化如下:
Ready:已就绪状态。
消息在云消息队列 RocketMQ 版服务端已就绪,可以被消费者消费。
Inflight:处理中状态。
消息被消费者客户端获取,处于消费中还未返回消费结果的状态。
WaitingRetry:待重试状态。
当消费者消息处理失败或消费超时,会触发消费重试逻辑判断。如果当前重试次数未达到最大次数,则该消息变为待重试状态,经过重试间隔后,消息将重新变为已就绪状态可被重新消费。多次重试之间,可通过重试间隔进行延长,防止无效高频的失败。
Commit:提交状态。
消费成功的状态,消费者返回成功响应即可结束消息的状态机。
DLQ:死信状态。
消费逻辑的最终兜底机制,消息重试失败且超过最大重试次数,若保存死信消息功能开启,该失败消息会被投递至死信Topic。您可以通过消费死信Topic的消息进行业务恢复。具体信息,请参见死信队列。
举例:某条消息的消费重试流程如上图所示,假设消息处于已就绪状态的时长为5 s,消费耗时为6 s。
每次重试消息状态都会经过已就绪->处理中->待重试的变化,消息的重试间隔指的是上一次消费失败或超时后,距下次消息可被重新消费的间隔时间。实际消息两次消费之间的间隔时间还包括消费耗时和已就绪状态的持续时间。例如:
消息第一次消费时第0 s进入已就绪状态。
受消费者处理速度的影响,到第5 s时才开始拉取消息消费,6 s后消息处理异常客户端返回消费失败。
此时还不能进行消费重试,需要等待重试间隔后才能开始再次消费。
等到第21 s时消息再次变为已就绪状态。
5 s后客户端才再次开始重新消费消息。
因此,实际消息两次消费的间隔时间为:消费耗时+重试间隔+已就绪的持续时间=21 s。
重试间隔和重试次数
协议 | 消息类型 | 重试间隔 | 最大重试次数 |
TCP协议 | 顺序消息 | 间隔时间可通过自定义参数suspendTimeMillis取值进行配置。参数取值范围:10~30000,单位:毫秒,默认值:1000毫秒,即1秒。 | 最大重试次数可通过自定义参数MaxReconsumeTimes取值进行配置。该参数取值无最大限制。若未设置参数值,默认最大重试次数为Integer.MAX。 |
无序消息 | 间隔时间根据重试次数阶梯变化,取值范围:10秒~2小时。不支持自定义配置。
| 最大重试次数可通过自定义参数MaxReconsumeTimes取值进行配置。默认值为16次,该参数取值无最大限制,建议使用默认值。 |
表 1. TCP协议无序消息重试间隔
第几次重试 | 重试间隔 | 第几次重试 | 重试间隔 |
1 | 10秒 | 9 | 7分钟 |
2 | 30秒 | 10 | 8分钟 |
3 | 1分钟 | 11 | 9分钟 |
4 | 2分钟 | 12 | 10分钟 |
5 | 3分钟 | 13 | 20分钟 |
6 | 4分钟 | 14 | 30分钟 |
7 | 5分钟 | 15 | 1小时 |
8 | 6分钟 | 16 | 2小时 |
配置方式
以下配置方式仅适用于TCP协议,HTTP协议不涉及。
消息投递失败后需要重试
集群消费模式下,消息消费失败后期望消息重试,需要在消息监听器接口的实现中明确进行配置(三种方式任选一种):
方式1:返回Action.ReconsumeLater(推荐)
方式2:返回Null
方式3:抛出异常
示例代码
public class MessageListenerImpl implements MessageListener { @Override public Action consume(Message message, ConsumeContext context) { //消息处理逻辑抛出异常,消息将重试。 doConsumeMessage(message); //方式1:返回Action.ReconsumeLater,消息将重试。 return Action.ReconsumeLater; //方式2:返回null,消息将重试。 return null; //方式3:直接抛出异常,消息将重试。 throw new RuntimeException("Consumer Message exception"); } }
消费投递失败后无需重试
集群消费模式下,消息失败后期望消息不重试,需要捕获消费逻辑中可能抛出的异常,最终返回Action.CommitMessage,此后这条消息将不会再重试。
示例代码
public class MessageListenerImpl implements MessageListener { @Override public Action consume(Message message, ConsumeContext context) { try { doConsumeMessage(message); } catch (Throwable e) { //捕获消费逻辑中的所有异常,并返回Action.CommitMessage; return Action.CommitMessage; } //消息处理正常,直接返回Action.CommitMessage; return Action.CommitMessage; } }
自定义消息最大重试次数和重试间隔
说明自定义云消息队列 RocketMQ 版的客户端日志配置,请升级TCP Java SDK到1.2.2或以上版本。更多信息,请参见版本说明。
云消息队列 RocketMQ 版允许Consumer实例启动的时候设置最大重试次数和重试间隔,无序消息重试间隔时间不支持自定义,以TCP协议无序消息重试间隔为准。
配置方式如下:
Properties properties = new Properties(); //配置对应Group ID的最大消息重试次数为20次,最大重试次数为字符串类型。 properties.put(PropertyKeyConst.MaxReconsumeTimes,"20"); //配置对应Group ID的消息重试间隔时间为3000毫秒,重试间隔时间为字符串类型。 properties.put(PropertyKeyConst.SuspendTimeMillis,"3000"); Consumer consumer = ONSFactory.createConsumer(properties);
重要配置采用覆盖的方式生效,即最后启动的Consumer实例会覆盖之前启动的实例的配置。因此,请确保同一Group ID下的所有Consumer实例设置的最大重试次数和重试间隔相同,否则各实例间的配置将会互相覆盖。
获取消息重试次数
Consumer收到消息后,可按照以下方式获取消息的重试次数,消息重试间隔时间一般不需要获取。
public class MessageListenerImpl implements MessageListener { @Override public Action consume(Message message, ConsumeContext context) { //获取消息的重试次数。 System.out.println(message.getReconsumeTimes()); return Action.CommitMessage; } }
HTTP协议重试策略
协议 | 消息类型 | 重试间隔 | 最大重试次数 | 配置方式 |
HTTP协议 | 顺序消息 | 1分钟 | 288次 | 系统预设,不支持修改。 |
无序消息 | 5分钟 | 288次 | 系统预设,不支持修改。 |