全部产品
Search
文档中心

云消息队列 Kafka 版:发布者最佳实践

更新时间:Aug 24, 2023

本文介绍云消息队列 Kafka 版发布者的最佳实践,帮助您降低发送消息的错误率。本文最佳实践基于Java客户端。对于其他语言的客户端,其基本概念与思想是相通的,但实现细节可能存在差异。

发送消息

发送消息的示例代码如下:

Future<RecordMetadata> metadataFuture = producer.send(new ProducerRecord<String, String>(
        topic,   //消息主题。
        null,   //分区编号。建议为null,由Producer分配。
        System.currentTimeMillis(),   //时间戳。
        String.valueOf(value.hashCode()),   //消息键。
        value   //消息值。
));

完整示例代码,请参见SDK概述

Key和Value

0.10.2版本的云消息队列 Kafka 版的消息有以下两个字段:

  • Key:消息的标识。

  • Value:消息内容。

为了便于追踪,请为消息设置一个唯一的Key。您可以通过Key追踪某消息,打印发送日志和消费日志,了解该消息的发送和消费情况。

如果消息发送量较大,建议不要设置Key,并使用黏性分区策略。黏性分区策略详情,请参见黏性分区策略

重要

在0.11.0以及之后的版本,云消息队列 Kafka 版开始支持headers,如果您需要使用headers,需要将服务端升级至2.2.0版本。

失败重试

分布式环境下,由于网络等原因偶尔发送失败是常见的。导致这种失败的原因可能是消息已经发送成功,但是ACK失败,也有可能是确实没发送成功。

云消息队列 Kafka 版是VIP网络架构,长时间不进行通信连接会被主动断开,因此,不是一直活跃的客户端会经常收到connection reset by peer错误,建议重试消息发送。

您可以根据业务需求,设置以下重试参数:

  • retries:消息发送失败时的重试次数。

  • retry.backoff.ms,消息发送失败的重试间隔,建议设置为1000,单位:毫秒。

异步发送

发送接口是异步的,如果您想接收发送的结果,可以调用metadataFuture.get(timeout, TimeUnit.MILLISECONDS)

线程安全

Producer是线程安全的,且可以往任何Topic发送消息。通常情况下,一个应用对应一个Producer。

Acks

Acks的说明如下:

  • acks=0:无需服务端的Response、性能较高、丢数据风险较大。

  • acks=1:服务端主节点写成功即返回Response、性能中等、丢数据风险中等、主节点宕机可能导致数据丢失。

  • acks=all:服务端主节点写成功且备节点同步成功才返回Response、性能较差、数据较为安全、主节点和备节点都宕机才会导致数据丢失。

为了提升发送性能, 建议设置为acks=1

提升发送性能(减少碎片化发送请求)

一般情况下,一个云消息队列 Kafka 版Topic会有多个分区。云消息队列 Kafka 版Producer客户端在向服务端发送消息时,需要先确认往哪个Topic的哪个分区发送。我们给同一个分区发送多条消息时,Producer客户端将相关消息打包成一个Batch,批量发送到服务端。Producer客户端在处理Batch时,是有额外开销的。一般情况下,小Batch会导致Producer客户端产生大量请求,造成请求队列在客户端和服务端的排队,并造成相关机器的CPU升高,从而整体推高了消息发送和消费延迟。一个合适的Batch大小,可以减少发送消息时客户端向服务端发起的请求次数,在整体上提高消息发送的吞吐和延迟。

Batch机制,云消息队列 Kafka 版Producer端主要通过两个参数进行控制:

  • batch.size : 发往每个分区(Partition)的消息缓存量(消息内容的字节数之和,不是条数)。达到设置的数值时,就会触发一次网络请求,然后Producer客户端把消息批量发往服务器。如果batch.size设置过小,有可能影响发送性能和稳定性。建议保持默认值16384。单位:字节。

  • linger.ms : 每条消息在缓存中的最长时间。若超过这个时间,Producer客户端就会忽略batch.size的限制,立即把消息发往服务器。建议根据业务场景, 设置linger.ms在100~1000之间。单位:毫秒。

因此,云消息队列 Kafka 版Producer客户端什么时候把消息批量发送至服务器是由batch.sizelinger.ms共同决定的。您可以根据具体业务需求进行调整。为了提升发送的性能,保障服务的稳定性, 建议您设置batch.size=16384linger.ms=1000

黏性分区策略

只有发送到相同分区的消息,才会被放到同一个Batch中,因此决定一个Batch如何形成的一个因素是云消息队列 Kafka 版Producer端设置的分区策略。云消息队列 Kafka 版Producer允许通过设置Partitioner的实现类来选择适合自己业务的分区。在消息指定Key的情况下,云消息队列 Kafka 版Producer的默认策略是对消息的Key进行哈希,然后根据哈希结果选择分区,保证相同Key的消息会发送到同一个分区。

在消息没有指定Key的情况下,云消息队列 Kafka 版2.4版本之前的默认策略是循环使用主题的所有分区,将消息以轮询的方式发送到每一个分区上。但是,这种默认策略Batch的效果会比较差,在实际使用中,可能会产生大量的小Batch,从而使得实际的延迟增加。鉴于该默认策略对无Key消息的分区效率低问题,云消息队列 Kafka 版在2.4版本引入了黏性分区策略(Sticky Partitioning Strategy)。

黏性分区策略主要解决无Key消息分散到不同分区,造成小Batch问题。其主要策略是如果一个分区的Batch完成后,就随机选择另一个分区,然后后续的消息尽可能地使用该分区。这种策略在短时间内看,会将消息发送到同一个分区,如果拉长整个运行时间,消息还是可以均匀地发布到各个分区上的。这样可以避免消息出现分区倾斜,同时还可以降低延迟,提升服务整体性能。

如果您使用的云消息队列 Kafka 版Producer客户端是2.4及以上版本,默认的分区策略就采用黏性分区策略。如果您使用的Producer客户端版本小于2.4,可以根据黏性分区策略原理,自行实现分区策略,然后通过参数partitioner.class设置指定的分区策略。

关于黏性分区策略实现,您可以参考如下Java版代码实现。该代码的实现逻辑主要是根据一定的时间间隔,切换一次分区。

public class MyStickyPartitioner implements Partitioner {

    // 记录上一次切换分区时间。
    private long lastPartitionChangeTimeMillis = 0L;
    // 记录当前分区。
    private int currentPartition = -1;
    // 分区切换时间间隔,可以根据实际业务选择切换分区的时间间隔。
    private long partitionChangeTimeGap = 100L;
    
    public void configure(Map<String, ?> configs) {}

    /**
     * Compute the partition for the given record.
     *
     * @param topic The topic name
     * @param key The key to partition on (or null if no key)
     * @param keyBytes serialized key to partition on (or null if no key)
     * @param value The value to partition on or null
     * @param valueBytes serialized value to partition on or null
     * @param cluster The current cluster metadata
     */
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {

        // 获取所有分区信息。
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();

        if (keyBytes == null) {
            List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
            int availablePartitionSize = availablePartitions.size();

            // 判断当前可用分区。
            if (availablePartitionSize > 0) {
                handlePartitionChange(availablePartitionSize);
                return availablePartitions.get(currentPartition).partition();
            } else {
                handlePartitionChange(numPartitions);
                return currentPartition;
            }
        } else {
            // 对于有key的消息,根据key的哈希值选择分区。
            return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
        }
    }

    private void handlePartitionChange(int partitionNum) {
        long currentTimeMillis = System.currentTimeMillis();

        // 如果超过分区切换时间间隔,则切换下一个分区,否则还是选择之前的分区。
        if (currentTimeMillis - lastPartitionChangeTimeMillis >= partitionChangeTimeGap
            || currentPartition < 0 || currentPartition >= partitionNum) {
            lastPartitionChangeTimeMillis = currentTimeMillis;
            currentPartition = Utils.toPositive(ThreadLocalRandom.current().nextInt()) % partitionNum;
        }
    }

    public void close() {}

}

OOM

结合云消息队列 Kafka 版的Batch设计思路,云消息队列 Kafka 版会缓存消息并打包发送,如果缓存太多,则有可能造成OOM(Out of Memory)。

  • buffer.memory : 发送的内存池大小。如果内存池设置过小,则有可能导致申请内存耗时过长,从而影响发送性能,甚至导致发送超时。建议buffer.memory ≧ batch.size * 分区数 * 2。单位:字节。

  • buffer.memory的默认数值是32 MB,对于单个Producer而言,可以保证足够的性能。

    重要

    如果您在同一个JVM中启动多个Producer,那么每个Producer都有可能占用32 MB缓存空间,此时便有可能触发OOM。

  • 在生产时,一般没有必要启动多个Producer;如有特殊情况需要,则需要考虑buffer.memory的大小,避免触发OOM。

分区顺序

单个分区(Partition)内,消息是按照发送顺序储存的,是基本有序的。

默认情况下,云消息队列 Kafka 版为了提升可用性,并不保证单个分区内绝对有序,在升级或者宕机时,会发生少量消息乱序(某个分区挂掉后把消息Failover到其它分区)。

如果业务要求分区保证严格有序,请在创建Topic时选择使用Local存储。