This topic provides answers to frequently asked questions about message accumulation and latency when you use the ApsaraMQ for RocketMQ SDK for Java client over TCP. After you understand how the ApsaraMQ for RocketMQ client consumes messages and why messages are accumulated, you can better plan resources and configure settings to deploy your business. You can also adjust the business logic during O&M at the earliest opportunity to avoid impacts on business operations due to message accumulation and latency.
Background information
Assume that the message consumption speed of the client cannot keep up with the message sending speed of the broker in the message processing process. The number of unprocessed messages gradually increases. These messages are named accumulated messages. Message accumulation causes message consumption latency. Take special note of message accumulation and latency in the following scenarios:
Messages are continuously accumulated because the upstream and downstream capabilities of the business system cannot match each other. In addition, message consumption cannot be automatically recovered.
The business system has high requirements on real-time message consumption and even cannot accept latency caused by short accumulation.
How does the client consume messages?
The following figure shows the consumption process on the ApsaraMQ for RocketMQ SDK client that uses TCP.
When messages are consumed in push mode on the SDK client, the consumption consists of the following two phases:
Phase 1: Pull messages. The SDK client pulls messages from the ApsaraMQ for RocketMQ broker in batches by using the long polling mechanism. The pulled messages are cached in the local buffer queue.
High throughput is implemented in most intranet environments. For example, assume that a machine has a specification of 4 cores and 8 GB of memory. The transactions per second (TPS) of the machine can reach tens of thousands if the machine has a single thread and a single partition. The TPS can reach hundreds of thousands if the machine has multiple partitions. Therefore, this phase is not a bottleneck that causes message accumulation although the SDK client pulls messages in batches.
Phase 2: Submit messages to the consumption thread. The SDK client submits the locally cached messages to the consumption thread and the thread uses the business consumption logic to process the messages.
The consumption capability of the client depends on the complexity of the business logic (consumption time) and the consumption concurrency. If the business processing logic is complex and it takes a long time to process a single message, the overall message throughput will not be high. This way, the local buffer queue on the client will reach the upper limit, and the client will stop pulling messages from the broker.
Based on the preceding client-side consumption mechanism, the major bottleneck that causes message accumulation lies in the consumption capability of the local client. This means that the consumption time and consumption concurrency determine whether messages will be accumulated. To avoid or solve message accumulation problems, you must properly control the consumption time and concurrency. The consumption time takes priority over the consumption concurrency. Therefore, set an appropriate consumption concurrency on the premise that the consumption time is reasonable.
Consumption time
The consumption logic that affects the consumption time includes CPU and in-memory computing and external I/O operations. Compared with external I/O operations, the internal computing time is almost negligible if complex recursion and loops are not defined in the code. External I/O operations typically include the following business logic:
Data reads and writes on external databases, such as MySQL databases.
Data reads and writes on external cache systems, such as Redis.
Downstream system calls, such as Dubbo calls and downstream HTTP interface calls.
You must sort out the logic and system capacity of such external calls to understand the expected time consumed by each call. This way, you can determine whether the time consumed by the I/O operations in the consumption logic is reasonable. In most cases, messages are accumulated because the consumption time is increased due to service exceptions or capacity limits in downstream systems.
Assume that data needs to be written to the database and the consumption time of a single message is 1 millisecond in the consumption logic of your business. No errors occur in most cases because the message volume is small. When the business side holds large promotions, the TPS of write operations on the database explodes and reaches the limit of the database capacity. As a result, the time of consuming a single message increases to 100 milliseconds. The consumption speed sharply dropped. You cannot solve this problem by only adjusting the consumption concurrency of the ApsaraMQ for RocketMQ SDK client. To fundamentally improve the consumption capability of the client, you must upgrade the database capacity.
Consumption concurrency
The following table describes the methods of calculating message consumption concurrency in ApsaraMQ for RocketMQ.
Message type | Consumption concurrency |
Normal message | Number of threads per node × Number of nodes |
Scheduled message and delayed message | |
Transactional message | |
Ordered message | Min(Threads per node × Number of nodes, Number of partitions) |
The consumption concurrency of a client is determined by the number of threads per node and the number of nodes. In most cases, you must first adjust the number of threads on a single node. If your hardware resources on the single node have reached the upper limit, you must add nodes to increase the consumption concurrency.
The consumption concurrency of ordered messages is also limited by the number of partitions in a topic. Contact Alibaba Cloud Customer Services to evaluate the number of partitions.
Set the consumption concurrency on a single node with caution. An excessively large number of threads will lead to large overheads of thread switching. Use the following model to calculate the optimal number of threads for a single node in an ideal environment:
The number of vCPUs of a single node is C.
The time consumed for thread switching is ignored, and I/O operations do not consume CPU resources.
The thread has enough messages that wait for processing, and the memory is sufficient.
In the logic, the CPU time is T1 and the external I/O operation time is T2.
Therefore, a single thread can achieve a TPS of 1/(T1 + T2). If the CPU utilization reaches the desired value 100%, you must set C × (T1 + T2)/T1 threads to make the single node reach its maximum consumption capability.
The maximum number of threads in this example is only the theoretical data obtained under the ideal environment. In actual application environment, we recommend that you gradually increase the number of threads, observe the effects, and then make adjustments.
How can I avoid message accumulation and latency?
To avoid unexpected message accumulation and latency, you must check and sort out the entire business logic in the early stage of design. You can sort out the performance baseline for normal business operations so that you can locate the blocking points when faults occur. The major task is to sort out the message consumption time and concurrency.
Sort out the message consumption time
Obtain the message consumption time by performing stress tests and analyze the code logic of time-consuming operations. For more information about how to query the consumption time, see Query the message consumption time. Take note of the following information when you sort out the message consumption time:
Check whether the computing complexity of the message consumption logic is too high, and whether the code has defects, such as infinite loops and recursion.
Check whether I/O operations, such as external calls and read/write storage, in the message consumption logic are required. In addition, check whether solutions such as local cache can be used to avoid these operations.
Check whether complex and time-consuming operations in the consumption logic can be asynchronously processed. If the operations can be asynchronously processed, check whether the asynchronous operations will cause confusing logic. For example, the consumption is completed but the asynchronous operation is not completed.
Set the message consumption concurrency
Gradually increase the number of threads on a single node. Then, you can observe the metrics of the node to obtain the optimal number of consumption threads and maximum message throughput for the node.
After you obtain the optimal number of threads and message throughput for a single node, calculate the number of required nodes. You can calculate it based on the peak traffic of the upstream and downstream links by using the formula: Number of nodes = Peak traffic/Message throughput of a single thread.
How do I resolve message accumulation and latency?
To avoid the impacts of message accumulation or latency on your business, you can set alert rules by using the monitoring and alerting feature provided by ApsaraMQ for RocketMQ. This way, the system sends you alerts on message accumulation. You can also use event tracking to monitor message accumulation and handle issues immediately after they occur. For more information about how to set alert rules, see Monitoring and alerting.
Set the threshold in an alert rule for message accumulation based on your business needs. If the threshold is too low, the alert may be frequently triggered. If the threshold is too high, you cannot receive the alert and troubleshoot at the earliest opportunity.
For more information about how to handle message accumulation alerts, see How can I handle accumulated messages?