This topic describes best practices for ApsaraMQ for Kafka producers to help you reduce errors when messages are sent. The best practices in this topic are based on a Java client. Clients written in other programming languages use the same basic concepts, but their implementation details may differ.
Message sending
Sample code for sending a message:
Future<RecordMetadata> metadataFuture = producer.send(new ProducerRecord<String, String>(
        topic,                            // The topic of the message.
        null,                             // The partition number. We recommend that you set this parameter to null. This way, the producer automatically selects a partition.
        System.currentTimeMillis(),       // The timestamp of the message.
        String.valueOf(value.hashCode()), // The key of the message.
        value                             // The value of the message.
));
For more information about the complete sample code, see Overview.
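The preceding snippet assumes an initialized producer object and existing topic and value variables. The following is a minimal initialization sketch; the endpoint, topic, and message value are placeholders that you must replace with your own values.
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;

// A minimal sketch of producer initialization. Replace the placeholders
// with your own endpoint, topic, and message value.
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "your-kafka-endpoint:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
String topic = "your-topic";
String value = "your-message-value";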
Key and Value fields
ApsaraMQ for Kafka V0.10.2.2 supports the following message fields:
- Key: the identifier of a message.
- Value: the content of a message.
To perform message tracing, specify a unique key for each message. You can then use the key to query the sending and consumption logs of the message and track its details.
If you want to send a large number of messages, we recommend that you use the sticky partitioning strategy. For more information about the sticky partitioning strategy, see Sticky partitioning strategy.
Retry
In a distributed environment, a message may fail to be sent due to network issues. For example, the message may fail to reach the broker, or the message may be sent but the acknowledgment (ACK) from the broker is lost.
ApsaraMQ for Kafka uses a virtual IP address (VIP) network architecture in which connections that are idle for a period of time are closed. As a result, inactive producers or consumers may receive the "connection reset by peer" error message. If this error occurs, we recommend that you resend the message.
You can configure retry behavior by using the following parameters:
- retries: the number of retries allowed if a message fails to be sent.
- retry.backoff.ms: the interval between two consecutive retries if a message fails to be sent. We recommend that you set this parameter to 1000. Unit: milliseconds.
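The following is a minimal sketch of these retry settings, assuming the props object from the earlier initialization sketch. The retries value of 3 is an example, not a recommendation from this topic; choose a value based on your business requirements.
// A minimal sketch of the retry-related producer settings.
props.put(ProducerConfig.RETRIES_CONFIG, 3);             // Example value. The number of retries allowed for a failed send.
props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 1000); // Recommended retry interval in milliseconds.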
Asynchronous transmission
Messages are sent in asynchronous mode. To obtain the result of a send operation, you can call the metadataFuture.get(timeout, TimeUnit.MILLISECONDS) method. Note that this call blocks until the result is available or the timeout elapses.
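If you do not want to block the sending thread, you can register a callback instead of calling get(). The following is a minimal sketch; the error handling is a placeholder that you should adapt to your business logic.
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

// A minimal sketch of non-blocking result handling with a callback.
producer.send(new ProducerRecord<String, String>(topic, String.valueOf(value.hashCode()), value),
        new Callback() {
            @Override
            public void onCompletion(RecordMetadata metadata, Exception exception) {
                if (exception != null) {
                    // Placeholder error handling: log the failure and decide whether to resend.
                    exception.printStackTrace();
                } else {
                    System.out.println("Sent to partition " + metadata.partition()
                            + " at offset " + metadata.offset());
                }
            }
        });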
Thread safety
Producers are thread-safe and can send messages to all topics. In most cases, one application corresponds to one producer.
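As an illustration, the following sketch shows multiple threads sharing a single producer instance. The thread count and message contents are arbitrary placeholders.
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Because the producer is thread-safe, a single instance can be shared across threads.
ExecutorService pool = Executors.newFixedThreadPool(4); // Arbitrary thread count.
for (int i = 0; i < 4; i++) {
    final int id = i;
    pool.submit(() -> producer.send(
            new ProducerRecord<String, String>(topic, "key-" + id, "value-" + id)));
}
pool.shutdown();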
Acks
ACKs have the following settings:
- acks=0: No response is returned from the broker. In this mode, the performance is high, but the risk of data loss is also high.
- acks=1: A response is returned when data is written to the leader. In this mode, the performance and the risk of data loss are moderate. Data loss may occur if a failure occurs on the leader.
- acks=all: A response is returned when data is written to the leader and synchronized to the followers. In this mode, the performance is low, but the risk of data loss is also low. Data loss occurs only if the leader and the followers fail at the same time.
To improve message sending performance, we recommend that you set the acks parameter to 1.
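A one-line sketch of the recommended setting, assuming the props object from the earlier initialization sketch:
props.put(ProducerConfig.ACKS_CONFIG, "1"); // Recommended balance between performance and durability.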
Send messages in batches to improve message sending performance
An ApsaraMQ for Kafka topic has multiple partitions. Before the producer sends messages to the broker, it selects a partition of the topic to send the messages to. To send multiple messages to the same partition, the producer packages the messages into a batch and sends them to the broker in a single request. Batch processing incurs additional overhead. Small batches increase the number of requests that the producer generates. These requests are queued on both the producer and the broker and cause high CPU utilization, which increases the message sending and consumption latency. A suitable batch size reduces the number of requests that are sent from the producer to the broker, which increases throughput and decreases the message sending latency. You can control batching by using the following parameters:
- batch.size: the volume of messages that are cached for each partition. This parameter specifies the total number of bytes of all messages in a batch, rather than the number of messages. When the volume of cached messages reaches the specified size, a network request is triggered and the producer sends the messages to the broker in a batch. If batch.size is set to a small value, the message sending performance and stability may be affected. We recommend that you use the default value 16384. Unit: bytes.
- linger.ms: the maximum period of time that a message can be stored in the cache. If a message is stored in the cache for longer than this limit, the producer immediately sends the batch to the broker even if the batch.size limit is not reached. We recommend that you set the linger.ms parameter to a value that ranges from 100 to 1000 based on your business requirements. Unit: milliseconds.
The batch.size and linger.ms parameters together determine when the ApsaraMQ for Kafka producer sends messages to the broker in batches. You can configure these two parameters based on your business requirements. To improve message sending performance and ensure service stability, we recommend that you specify batch.size=16384 and linger.ms=1000, as shown in the sketch below.
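A minimal sketch of the recommended batching settings, assuming the props object from the earlier initialization sketch:
// Recommended batching settings from this topic.
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384); // Batch size in bytes (the default value).
props.put(ProducerConfig.LINGER_MS_CONFIG, 1000);   // Maximum wait time in milliseconds before a batch is sent.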
Sticky partitioning strategy
Only messages to be sent to the same partition can be included in the same batch. The partitioning strategy of the ApsaraMQ for Kafka producer is used to determine how to generate a batch. You can use the Partitioner class to select a suitable partition for the ApsaraMQ for Kafka producer based on your business requirements. For messages that have a key, the default partitioning strategy of the ApsaraMQ for Kafka producer is to hash the key of each message, and then select a partition based on the hash result. Messages that have the same key are sent to the same partition.
For messages that do not have a key, the default partitioning strategy of the ApsaraMQ for Kafka producer in versions earlier than 2.4 is to cycle through all partitions of a topic and send messages to each partition in a round-robin manner. This strategy may cause higher latency because a large number of small batches can be generated. Because the default strategy is inefficient for messages that do not have a key, the sticky partitioning strategy was introduced in ApsaraMQ for Kafka version 2.4.
The sticky partitioning strategy reduces the number of small batches that are generated when messages without a key are distributed among different partitions. With this strategy, the producer sends messages to a single partition until the current batch is full, and then randomly selects another partition for subsequent messages. In the short term, messages are sent to the same partition; over a longer period of time, messages are evenly distributed across all partitions. This strategy prevents partition skew and decreases latency, which improves the overall performance of the system.
If you use an ApsaraMQ for Kafka producer of V2.4 or later, the producer uses the sticky partitioning strategy by default. If your producer version is earlier than V2.4, you can implement a sticky partitioning strategy yourself and set the partitioner.class parameter to your implementation, as shown in the following example.
import java.util.List;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.utils.Utils;

public class MyStickyPartitioner implements Partitioner {

    // The time of the last partition change.
    private long lastPartitionChangeTimeMillis = 0L;
    // The partition that is currently in use.
    private int currentPartition = -1;
    // The interval at which partitions are changed. Specify the interval based on your business requirements.
    private long partitionChangeTimeGap = 100L;

    public void configure(Map<String, ?> configs) {}

    /**
     * Compute the partition for the given record.
     *
     * @param topic The topic name
     * @param key The key to partition on (or null if no key)
     * @param keyBytes The serialized key to partition on (or null if no key)
     * @param value The value to partition on (or null)
     * @param valueBytes The serialized value to partition on (or null)
     * @param cluster The current cluster metadata
     */
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        // Query the information about all partitions of the topic.
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        if (keyBytes == null) {
            List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
            int availablePartitionSize = availablePartitions.size();
            // Prefer the partitions that are currently available.
            if (availablePartitionSize > 0) {
                handlePartitionChange(availablePartitionSize);
                return availablePartitions.get(currentPartition).partition();
            } else {
                handlePartitionChange(numPartitions);
                return currentPartition;
            }
        } else {
            // For messages that have a key, select a partition based on the hash value of the key.
            return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
        }
    }

    private void handlePartitionChange(int partitionNum) {
        long currentTimeMillis = System.currentTimeMillis();
        // If the time since the last partition change exceeds the specified interval, or the current
        // partition index is out of range, randomly select a new partition. Otherwise, keep the current partition.
        if (currentTimeMillis - lastPartitionChangeTimeMillis >= partitionChangeTimeGap
                || currentPartition < 0 || currentPartition >= partitionNum) {
            lastPartitionChangeTimeMillis = currentTimeMillis;
            currentPartition = Utils.toPositive(ThreadLocalRandom.current().nextInt()) % partitionNum;
        }
    }

    public void close() {}
}
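To use this partitioner, register it in the producer configuration. A minimal sketch, assuming the props object from the earlier initialization sketch:
// Register the custom partitioner with the producer configuration.
props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, MyStickyPartitioner.class.getName());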
OOM
Based on its batch design, ApsaraMQ for Kafka caches messages and then sends them in batches. However, if too many messages are cached, an out of memory (OOM) error may occur.
- buffer.memory: the size of the memory pool that is allocated for message sending. If this parameter is set to an excessively small value, memory allocation may take a long time, which affects message sending performance and may cause message sending timeouts. We recommend that you set the buffer.memory parameter to a value that is greater than or equal to: value of the batch.size parameter × number of partitions × 2. Unit: bytes. A worked example follows this list.
- The default cache size that is specified by the buffer.memory parameter is 32 MB, which is sufficient for a single producer. Important: If you enable multiple producers on the same Java virtual machine (JVM), an OOM error may occur because each producer may use 32 MB of cache space.
- In most cases, you do not need to enable multiple producers to produce messages. To prevent OOM errors in special scenarios, configure the buffer.memory parameter.
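The following sketch illustrates the sizing rule above. It assumes the default batch.size of 16384 bytes and a hypothetical topic with 12 partitions; keeping at least the 32 MB default is a design choice, not a requirement from this topic.
// Worked example of the recommended buffer.memory lower bound.
int batchSize = 16384;   // The default batch.size, in bytes.
int partitionCount = 12; // Hypothetical number of partitions.
long minBufferMemory = (long) batchSize * partitionCount * 2; // 393,216 bytes.
// Use the larger of the computed lower bound and the 32 MB default.
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, Math.max(minBufferMemory, 33554432L));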
Partitionally ordered messages
In each partition, messages are ordered because the messages are stored in the order in which the messages are sent.
By default, to improve availability, ApsaraMQ for Kafka does not ensure the absolute order of messages in a single partition. During upgrades or downtime, failovers may cause a small number of messages to become out of order, because messages in a partition on which a failure occurs are moved to other partitions.
If your business requires messages to be stored in a partition in a strict order, select local storage when you create a topic.