本文主要介绍云消息队列 RocketMQ 版TCP协议的Java客户端使用过程中,经常会出现的消息堆积和消息延迟的问题。通过了解云消息队列 RocketMQ 版客户端的消费原理和消息堆积的主要原因,帮助您可以在业务部署前更好的规划资源和配置,或在运维过程中及时调整业务逻辑,避免因消息堆积和延迟影响业务运行。
背景信息
- 业务系统上下游能力不匹配造成的持续堆积,且无法自行恢复。
- 业务系统对消息的消费实时性要求较高,即使是短暂的堆积造成的消息延迟也无法接受。
客户端消费原理
- 阶段一:获取消息,SDK客户端通过长轮询批量拉取的方式从云消息队列 RocketMQ 版服务端获取消息,将拉取到的消息缓存到本地缓冲队列中。
SDK获取消息的方式为批量拉取,常见内网环境下都会有很高的吞吐量,例如:1个单线程单分区的低规格机器(4C8GB)可以达到几万TPS,如果是多个分区可以达到几十万TPS。所以这一阶段一般不会成为消息堆积的瓶颈。
- 阶段二:提交消费线程,SDK客户端将本地缓存的消息提交到消费线程中,使用业务消费逻辑进行处理。
此时客户端的消费能力就完全依赖于业务逻辑的复杂度(消费耗时)和消费逻辑并发度了。如果业务处理逻辑复杂,处理单条消息耗时都较长,则整体的消息吞吐量肯定不会高,此时就会导致客户端本地缓冲队列达到上限,停止从服务端拉取消息。
消费耗时
- 读写外部数据库,例如MySQL数据库读写。
- 读写外部缓存等系统,例如Redis读写。
- 下游系统调用,例如Dubbo调用或者下游HTTP接口调用。
例如:某业务消费逻辑中需要写一条数据到数据库,单次消费耗时为1 ms,平时消息量小未出现异常。业务侧进行大促活动时,写数据库TPS爆发式增长,并很快达到数据库容量限制,导致消费单条消息的耗时增加到100 ms,业务侧可以明显感受到消费速度大幅下跌。此时仅通过调整云消息队列 RocketMQ 版SDK的消费并发度并不能解决问题,需要对数据库容量进行升配才能从根本上提高客户端消费能力。
消费并发度
消息类型 | 消费并发度 |
---|---|
普通消息 | 单节点线程数*节点数量 |
定时和延时消息 | |
事务消息 | |
顺序消息 | Min(单节点线程数*节点数量,分区数) |
- 单机vCPU核数为C。
- 线程切换耗时忽略不计,I/O操作不消耗CPU。
- 线程有足够消息等待处理,且内存充足。
- 逻辑中CPU计算耗时为T1,外部I/O操作为T2。
如何避免消息堆积和延迟
为了避免在业务使用时出现非预期的消息堆积和延迟问题,您需要在前期设计阶段对整个业务逻辑进行完善的排查和梳理。整理出正常业务运行场景下的性能基线,才能在故障场景下迅速定位到阻塞点。其中最重要的就是梳理消息的消费耗时和消息消费的并发度。
- 梳理消息的消费耗时通过压测获取消息的消费耗时,并对耗时较高的操作的代码逻辑进行分析。查询消费耗时,请参见获取消息消费耗时。梳理消息的消费耗时需要关注以下信息:
- 消息消费逻辑的计算复杂度是否过高,代码是否存在无限循环和递归等缺陷。
- 消息消费逻辑中的I/O操作(如:外部调用、读写存储等)是否是必须的,能否用本地缓存等方案规避。
- 消费逻辑中的复杂耗时的操作是否可以做异步化处理,如果可以是否会造成逻辑错乱(消费完成但异步操作未完成)。
- 设置消息的消费并发度
- 逐步调大线程的单个节点的线程数,并观测节点的系统指标,得到单个节点最优的消费线程数和消息吞吐量。
- 得到单个节点的最优线程数和消息吞吐量后,根据上下游链路的流量峰值计算出需要设置的节点数,节点数=流量峰值/单线程消息吞吐量。
如何解决消息堆积和延迟问题
想要快速避免消息堆积和延迟给业务带来的影响,您可以通过云消息队列 RocketMQ 版提供的监控报警功能,设置告警规则提前预警消息堆积问题,或通过业务埋点,触发报警事件,及时监控到消息堆积问题并进行处理。设置报警规则,请参见监控报警。
若收到消息堆积报警,处理方法,请参见如何处理消息堆积。