本文主要介绍云消息队列 Kafka 版订阅者的最佳实践,帮助您减少消费消息出错的可能性。
消费消息基本流程
云消息队列 Kafka 版订阅者在订阅消息时的基本流程为:Poll数据→执行消费逻辑→再次Poll数据,详情参见下图。
负载均衡
每个Group可以包含多个消费实例,即可以启动多个云消息队列 Kafka 版Consumer,并把参数group.id
设置成相同的值。属于同一个Group的消费实例会负载消费订阅的Topic。
例如Group A订阅了Topic A,并开启三个消费实例C1、C2、C3,则发送到Topic A的每条消息最终只会传给C1、C2、C3的某一个。云消息队列 Kafka 版默认会均匀地把消息传给各个消息实例,以做到消费负载均衡。
云消息队列 Kafka 版负载均衡消费的内部原理是,把订阅的Topic的分区,平均分配给各个消费实例。因此,消费实例的个数不要大于分区的数量,否则会有消费实例分配不到任何分区而处于空跑状态。这个负载均衡发生的时间,除了第一次启动上线之外,后续消费实例发生重启、增加、减少等变更时,都会触发一次负载均衡。
消费客户端(Consumer)频繁出现Rebalance
心跳超时会引发Rebalance,可以通过参数调整、提高消费速度等方法解决。更多信息,请参见为什么消费客户端频繁出现Rebalance?。
分区个数
分区个数主要影响的是消费者的并发数量。
对于同一个Group内的消费者来说,一个分区最多只能被一个消费者消费。因此,消费实例的个数不要大于分区的数量,否则会有消费实例分配不到任何分区而处于空跑状态。
控制台的默认分区个数是12,可以满足绝大部分场景的需求。您可以根据业务使用量进行增加。不建议分区数小于12,否则可能影响消费发送性能;也不建议超过100个,否则易引发消费端Rebalance。
分区增加后,将不能减少,请小幅度调整。
多个订阅
云消息队列 Kafka 版支持以下多个订阅方式:
Group订阅多个Topic。
一个Group可以订阅多个Topic,多个Topic的消息被Group中的Consumer均匀消费。例如Group A订阅了Topic A、Topic B、Topic C,则这三个Topic中的消息,被Group中的Consumer均匀消费。
Group订阅多个Topic的示例代码如下:
String topicStr = kafkaProperties.getProperty("topic"); String[] topics = topicStr.split(","); for (String topic: topics) { subscribedTopics.add(topic.trim()); } consumer.subscribe(subscribedTopics);
Topic被多个Group订阅。
一个Topic可以被多个Group订阅,且各个Group独立消费Topic下的所有消息。例如Group A订阅了Topic A,Group B也订阅了Topic A,则发送到Topic A的每条消息,不仅会传一份给Group A的消费实例,也会传一份给Group B的消费实例,且这两个过程相互独立,相互没有任何影响。
一个Group对应一个应用
建议一个Group对应一个应用,即不同的应用对应不同的代码。如果您需要将不同的代码写在同一个应用中,请准备多份不同的kafka.properties。例如kafka1.properties、kafka2.properties。
消费位点
每个Topic会有多个分区,每个分区会统计当前消息的总条数,这个称为最大位点MaxOffset。
云消息队列 Kafka 版Consumer会按顺序依次消费分区内的每条消息,记录已经消费了的消息条数,称为消费位点ConsumerOffset。
剩余的未消费的条数(也称为消息堆积量)=MaxOffset-ConsumerOffset。
消费位点提交
云消息队列 Kafka 版消费者有两个相关参数:
enable.auto.commit:是否采用自动提交位点机制。默认值为true,表示默认采用自动提交机制。
auto.commit.interval.ms: 自动提交位点时间间隔。默认值为1000,即1s。
这两个参数组合的结果就是,每次poll数据前会先检查上次提交位点的时间,如果距离当前时间已经超过参数auto.commit.interval.ms规定的时长,则客户端会启动位点提交动作。
因此,如果将enable.auto.commit设置为true,则需要在每次poll数据时,确保前一次poll出来的数据已经消费完毕,否则可能导致位点跳跃。
如果想自己控制位点提交,请把enable.auto.commit设为false,并调用commit(offsets)
函数自行控制位点提交。
消费位点重置
以下两种情况,会发生消费位点重置:
当服务端不存在曾经提交过的位点时(例如客户端第一次上线)。
当从非法位点拉取消息时(例如某个分区最大位点是10,但客户端却从11开始拉取消息)。
Java客户端可以通过auto.offset.reset来配置重置策略,主要有三种策略:
latest:从最大位点开始消费。
earliest:从最小位点开始消费。
none:不做任何操作,即不重置。
建议设置成latest,而不要设置成earliest,避免因位点非法时从头开始消费,从而造成大量重复。
如果是您自己管理位点,可以设置成none。
拉取大消息
消费过程是由客户端主动去服务端拉取消息的,在拉取大消息时,需要注意控制拉取速度,注意修改配置:
max.poll.records:每次Poll获取的最大消息数量。如果单条消息超过1 MB,建议设置为1。
fetch.max.bytes:设置比单条消息的大小略大一点。
max.partition.fetch.bytes:设置比单条消息的大小略大一点。
拉取大消息的核心是逐条拉取的。
消息重复和消费幂等
云消息队列 Kafka 版消费的语义是at least once, 也就是至少投递一次,保证消息不丢失,但是无法保证消息不重复。在出现网络问题、客户端重启时均有可能造成少量重复消息,此时应用消费端如果对消息重复比较敏感(例如订单交易类),则应该做消息幂等。
以数据库类应用为例,常用做法是:
发送消息时,传入key作为唯一流水号ID。
消费消息时,判断key是否已经消费过,如果已经被消费,则忽略,如果没消费过,则消费一次。
如果应用本身对少量消息重复不敏感,则不需要做此类幂等检查。
消费失败
云消息队列 Kafka 版是按分区逐条消息顺序向前推进消费的,如果消费端拿到某条消息后执行消费逻辑失败,例如应用服务器出现了脏数据,导致某条消息处理失败,等待人工干预,那么有以下两种处理方式:
失败后一直尝试再次执行消费逻辑。这种方式有可能造成消费线程阻塞在当前消息,无法向前推进,造成消息堆积。
云消息队列 Kafka 版没有处理失败消息的设计,实践中通常会打印失败的消息或者存储到某个服务(例如创建一个Topic专门用来放失败的消息),然后定时检查失败消息的情况,分析失败原因,根据情况处理。
消费延迟
云消息队列 Kafka 版的消费机制是由客户端主动去服务端拉取消息进行消费的。因此,如果客户端能够及时消费,则不会产生较大延迟。如果产生了较大延迟,请先关注是否有堆积,并注意提高消费速度。
消费阻塞以及堆积
消费端最常见的问题就是消费堆积,最常造成堆积的原因是:
消费速度跟不上生产速度,此时应该提高消费速度,详情请参见提高消费速度。
消费端产生了阻塞。
消费端拿到消息后,执行消费逻辑,通常会执行一些远程调用,如果这个时候同步等待结果,则有可能造成一直等待,消费进程无法向前推进。
消费端应该竭力避免堵塞消费线程,如果存在等待调用结果的情况,建议设置等待的超时时间,超时后作为消费失败进行处理。
提高消费速度
提高消费速度有以下两个办法:
增加Consumer实例个数。
可以在进程内直接增加(需要保证每个实例对应一个线程,否则没有太大意义),也可以部署多个消费实例进程;需要注意的是,实例个数超过分区数量后就不再能提高速度,将会有消费实例不工作。
增加消费线程。
增加Consumer实例本质上也是增加线程的方式来提升速度,因此更加重要的性能提升方式是增加消费线程,最基本的步骤如下:
定义一个线程池。
Poll数据。
把数据提交到线程池进行并发处理。
等并发结果返回成功后,再次poll数据执行。
消息过滤
云消息队列 Kafka 版自身没有消息过滤的语义。实践中可以采取以下两个办法:
如果过滤的种类不多,可以采取多个Topic的方式达到过滤的目的。
如果过滤的种类多,则最好在客户端业务层面自行过滤。
实践中请根据业务具体情况进行选择,也可以综合运用上面两种办法。
消息广播
云消息队列 Kafka 版没有消息广播的语义,可以通过创建不同的Group来模拟实现。
订阅关系
同一个Group内,各个消费实例订阅的Topic最好保持一致,避免给排查问题带来干扰。