全部产品
Search
文档中心

云消息队列 RocketMQ 版:消费重试

更新时间:Jun 12, 2024

消费者出现异常,云消息队列 RocketMQ 版会根据消费重试策略重新投递该消息进行故障恢复。本文介绍消费重试的应用场景、原理机制、版本兼容性和使用建议。

应用场景

云消息队列 RocketMQ 版的消费重试主要解决的是业务处理逻辑失败导致的消费完整性问题,是一种为业务兜底的策略,不应该被用作业务流程控制。

  • 以下场景建议使用消息重试

    • 业务处理失败,且失败原因跟当前的消息内容相关,比如该消息对应的事务状态还未获取到,预期一段时间后可执行成功。

    • 消费失败的原因不会导致连续性,即当前消息消费失败是一个小概率事件,不是常态化的失败,后面的消息大概率会消费成功。此时可以对当前消息进行重试,避免进程阻塞。

  • 以下场景不建议使用消息重试

    • 消费处理逻辑中使用消费失败来做条件判断的结果分流,是不合理的,因为处理逻辑已经预见了一定会大量出现该判断分支。

    • 消费处理中使用消费失败来做处理速率限流,是不合理的。限流的目的是将超出流量的消息暂时堆积在队列中达到削峰的作用,而不是让消息进入重试链路。

应用目的

消息中间件做异步解耦时的一个典型问题是如果下游服务处理消息事件失败,如何保证整个调用链路的完整性。云消息队列 RocketMQ 版作为金融级的可靠业务消息中间件,在消息投递处理机制的设计上天然支持可靠传输策略,通过完整的确认和重试机制保证每条消息都按照业务的预期被处理。

了解云消息队列 RocketMQ 版的消息确认机制以及消费重试策略可以帮助您分析如下问题:

  • 如何保证业务完整处理消息:了解消费重试策略,可以在设计实现消费者逻辑时保证每条消息处理的完整性,避免部分消息出现异常时被忽略,导致业务状态不一致。

  • 系统异常时处理中的消息状态如何恢复:帮助您了解当系统出现异常(宕机故障)等场景时,处理中的消息状态如何恢复,是否会出现状态不一致。

消费重试策略

消费重试策略指消费者在消费某条消息失败后,消息重试的间隔时间和最大重试次数。

消息重试的触发条件

  • 消费失败,包括消费者返回消息失败状态标识或抛出非预期异常。

  • 消息处理超时,包括在PushConsumer中排队超时。

消息重试主要行为

  • 重试过程状态机:控制消息在重试流程中的状态和变化逻辑。

  • 重试间隔:上一次消费失败或超时后,距下次消息可被重新消费的间隔时间。

  • 最大重试次数:消息可被重试消费的最大次数。

消息重试策略差异

根据消费者类型不同,消息重试策略的具体内部机制和设置方法有所不同,具体差异如下:

消费者类型

重试过程状态机

重试间隔

最大重试次数

PushConsumer

  • 已就绪

  • 处理中

  • 待重试

  • 提交

  • 死信

  • 丢弃

消费者分组创建时元数据控制。

  • 无序消息:阶梯间隔

  • 顺序消息:固定间隔时间

通过控制台或OpenAPI设置

修改最大重试次数

SimpleConsumer

  • 已就绪

  • 处理中

  • 提交

  • 死信

  • 丢弃

通过API修改获取消息时的不可见时间。

通过控制台或OpenAPI设置

修改最大重试次数

具体的重试策略,请参见下文PushConsumer消费重试策略SimpleConsumer消费重试策略

PushConsumer消费重试策略

重试状态机

PushConsumer消费消息时,消息的几个主要状态如下:Push消费状态机

  • Ready:已就绪状态。

    消息在云消息队列 RocketMQ 版服务端已就绪,可以被消费者消费。

  • Inflight:处理中状态。

    消息被消费者客户端获取,处于消费中还未返回消费结果的状态。

  • WaitingRetry:待重试状态,PushConsumer独有的状态。

    当消费者消息处理失败或消费超时,会触发消费重试逻辑判断。如果当前重试次数未达到最大次数,则该消息变为待重试状态,经过重试间隔后,消息将重新变为已就绪状态可被重新消费。多次重试之间,可通过重试间隔进行延长,防止无效高频的失败。

  • Commit:提交状态。

    消费成功的状态,消费者返回成功响应即可结束消息的状态机。

  • DLQ:死信状态。

    消费逻辑的最终兜底机制,消息重试失败且超过最大重试次数,若保存死信消息功能开启,该失败消息会被投递至死信Topic。您可以通过消费死信Topic的消息进行业务恢复。具体信息,请参见死信消息

  • Discard:丢弃。

    消息重试失败且超过最大重试次数,若保存死信消息功能未开启,该失败消息会被直接丢弃。

消息间隔时间

举例:某条消息的消费重试流程如上图所示,假设消息处于已就绪状态的时长为5 s,消费耗时为6 s。

每次重试消息状态都会经过已就绪->处理中->待重试的变化,消息的重试间隔指的是上一次消费失败或超时后,距下次消息可被重新消费的间隔时间。实际消息两次消费之间的间隔时间还包括消费耗时和已就绪状态的持续时间。例如:

  • 消息第一次消费时第0 s进入已就绪状态。

  • 受消费者处理速度的影响,到第5 s时才开始拉取消息消费,6 s后消息处理异常客户端返回消费失败。

  • 此时还不能进行消费重试,需要等待重试间隔后才能开始再次消费。

  • 等到第21 s时消息再次变为已就绪状态。

  • 5 s后客户端才再次开始重新消费消息。

因此,实际消息两次消费的间隔时间为:消费耗时+重试间隔+已就绪的持续时间=21 s。

重试间隔时间

  • 无序消息(非顺序消息):重试间隔为阶梯时间,具体时间如下:

    第几次重试

    重试间隔时间

    第几次重试

    重试间隔时间

    1

    10秒

    9

    7分钟

    2

    30秒

    10

    8分钟

    3

    1分钟

    11

    9分钟

    4

    2分钟

    12

    10分钟

    5

    3分钟

    13

    20分钟

    6

    4分钟

    14

    30分钟

    7

    5分钟

    15

    1小时

    8

    6分钟

    16

    2小时

    说明

    若重试次数超过16次,后面每次重试间隔都为2小时。

  • 顺序消息:重试间隔为固定时间,具体取值,请参见参数限制

最大重试次数

  • 默认值:16次。

  • 最大限制:不超过1000次。

PushConsumer的最大重试次数由消费者分组的元数据控制,修改方式,请参见修改最大重试次数

例如,最大重试次数为3次,则该消息最多可被投递4次,1次为原始消息,3次为重试投递次数。

使用示例

PushConsumer触发消息重试只需要返回消费失败的状态码即可,当出现非预期的异常时,也会被SDK捕获。

SimpleConsumer simpleConsumer = null;
        //消费示例:使用PushConsumer消费普通消息,如果消费失败返回错误,即可触发重试。
        MessageListener messageListener = new MessageListener() {
            @Override
            public ConsumeResult consume(MessageView messageView) {
                System.out.println(messageView);
                //返回消费失败,会自动重试,直至到达最大重试次数。
                return ConsumeResult.FAILURE;
            }
        };
            

查看消费重试日志

PushConsumer顺序消费的重试是在消费者客户端进行,服务端无法获取消费重试的详细日志,若消息轨迹中顺序消息的投递结果为失败时,您需要在消费者客户端日志中查看消息重试的最大次数、消费者客户端等信息。

消费者客户端日志查看路径,请参见日志配置

您可以通过搜索以下关键字在客户端日志中快速定位消费失败的相关内容:

Message listener raised an exception while consuming messages
Failed to consume fifo message finally, run out of attempt times

SimpleConsumer消费重试策略

重试状态机

SimpleConsumer消费消息时,消息的几个主要状态如下:PushConsumer状态机

  • Ready:已就绪状态。

    消息在云消息队列 RocketMQ 版服务端已就绪,可以被消费者消费。

  • Inflight:处理中状态。

    消息被消费者客户端获取,处于消费中还未返回消费结果的状态。

  • Commit:提交状态。

    消费成功的状态,消费者返回成功响应即可结束消息的状态机。

  • DLQ:死信状态。

    消费逻辑的最终兜底机制,消息重试失败且超过最大重试次数,若保存死信消息功能开启,该失败消息会被投递至死信Topic。您可以通过消费死信Topic的消息进行业务恢复。具体信息,请参见死信消息

  • Discard:丢弃。

    消息重试失败且超过最大重试次数,若保存死信消息功能未开启,该失败消息会被直接丢弃。

和PushConsumer消费重试策略不同的是,SimpleConsumer消费者的重试间隔是预分配的,每次获取消息,消费者会在调用API时设置一个不可见时间参数InvisibleDuration,即消息的最大处理时长。若消息消费失败触发重试,不需要设置下一次重试的时间间隔,直接复用不可见时间参数的取值。

simpleconsumer重试

由于不可见时间为预分配的,可能和实际业务中的消息处理时间差别较大,您可以通过API接口修改不可见时间。

例如,您预设消息处理耗时最多20 ms,但实际业务中20 ms内消息处理不完,您可以修改消息不可见时间,延长消息处理时间,避免消息触发重试机制。

修改消息不可见时间需要满足以下条件:

  • 消息处理未超时

  • 消息处理未提交消费状态

如下图所示,消息不可见时间修改后立即生效,即从调用API时刻开始,重新计算消息不可见时间。

修改不可见时间

消息重试间隔

消息重试间隔=不可见时间-消息实际处理时长

SimpleConsumer的消费重试间隔通过消息的不可见时间控制。例如,消息不可见时间为30 ms,实际消息处理用了10 ms就返回失败响应,则距下次消息重试还需要20 ms,此时的消息重试间隔即为20 ms;若直到30 ms消息还未处理完成且未返回结果,则消息超时,立即重试,此时重试间隔即为0 ms。

最大重试次数

  • 默认值:16次。

  • 最大限制:不超过1000次。

SimpleConsumer的最大重试次数由消费者分组创建的元数据控制,修改方式,请参见修改最大重试次数

例如,最大重试次数为3次,则该消息最多可被投递4次,1次为原始消息,3次为重试投递次数。

使用示例

SimpleConsumer触发消息重试只需要等待即可。

 //消费示例:使用SimpleConsumer消费普通消息,如果希望重试,只需要静默等待超时即可,服务端会自动重试。
        List<MessageView> messageViewList = null;
        try {
            messageViewList = simpleConsumer.receive(10, Duration.ofSeconds(30));
            messageViewList.forEach(messageView -> {
                System.out.println(messageView);
                //如果处理失败,希望服务端重试,只需要忽略即可,等待消息再次可见后即可重试获取。
            });
        } catch (ClientException e) {
            //如果遇到系统流控等原因造成拉取失败,需要重新发起获取消息请求。
            e.printStackTrace();
        }

修改最大重试次数

您可以通过以下方式修改PushConsumer和SimpleConsumer的消息消费的最大重试次数。

  • 通过OpenAPI方式修改:UpdateConsumerGroup - 更新消费者分组

  • 通过控制台方式修改:

    操作入口如下:

    1. 实例列表页面中单击目标实例名称。

    2. 在左侧导航栏单击Group 管理,然后在Group 管理页面单击创建 Group

    消息重试策略

使用建议

合理重试,避免因限流等诉求触发消费重试

上文应用场景中提到,消息重试适用业务处理失败且当前消费为小概率事件的场景,不适合在连续性失败的场景下使用,例如消费限流场景。

  • 错误示例:

    如果当前消费速度过高触发限流,则返回消费失败,等待下次重新消费。

  • 正确示例:

    如果当前消费速度过高触发限流,则延迟获取消息,稍后再消费。

消息重试常见问题

消息消费超时时间如何设置?

消费超时时间由消费者客户端设置,具体参数设置如下:

  • SimpleConsumer:超时时间最大可设置12小时;最小设置为10秒。

    代码示例如下:

    private long minInvisiableTimeMillsForRecv = Duration.ofSeconds(10).toMillis();
    private long maxInvisiableTimeMills = Duration.ofHours(12).toMillis();
  • PushConsumer:默认1分钟,不支持修改。