ApsaraMQ for RocketMQ: How do I handle accumulated messages?

Last Updated: Sep 03, 2024

Symptoms

When I use my ApsaraMQ for RocketMQ instance, I receive a message accumulation alert. Then, I log on to the ApsaraMQ for RocketMQ console and find the following issues:

  • On the Group Details page, the value of Real-time Accumulated Messages of the group is higher than expected.

  • In the left-side navigation pane, I click Message Tracing. On the page that appears, I click Create Query Task. In the panel that appears, I click Query by Message ID and configure the parameters. Then, I find that some messages are published to the broker but are not delivered to consumers.

Causes

In ApsaraMQ for RocketMQ, after messages are sent to a broker, the client to which a group is bound pulls only some of the messages from the broker based on the consumer offset. In most cases, messages are not accumulated when a client pulls messages from a broker. However, if the consumption time is long or the consumption thread concurrency is low, the consumption capability of the client is insufficient. This may cause message accumulation. For information about the consumption mechanism and the causes of message accumulation, see Message accumulation and latency.
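
The relationship between consumption time, thread concurrency, and consumption capability can be illustrated with a rough back-of-the-envelope estimate. The sketch below is an approximation, not a formula from the ApsaraMQ for RocketMQ documentation: it assumes every consumption thread is always busy and that each message takes about the same time to process. The function names and the sample numbers are hypothetical.

```python
import math


def max_throughput(nodes: int, threads_per_node: int, avg_consume_seconds: float) -> float:
    """Rough upper bound on messages consumed per second.

    Assumes every thread is always busy and each message takes
    avg_consume_seconds to process (an idealized model).
    """
    if nodes <= 0 or threads_per_node <= 0 or avg_consume_seconds <= 0:
        raise ValueError("all arguments must be positive")
    return nodes * threads_per_node / avg_consume_seconds


def threads_needed(publish_rate: float, nodes: int, avg_consume_seconds: float) -> int:
    """Smallest per-node thread count whose upper bound covers publish_rate."""
    if publish_rate < 0 or nodes <= 0 or avg_consume_seconds <= 0:
        raise ValueError("invalid arguments")
    return math.ceil(publish_rate * avg_consume_seconds / nodes)


# Hypothetical numbers: 2 nodes, 20 threads each, 0.5 s per message.
print(max_throughput(2, 20, 0.5))   # 80.0 messages per second at most
# To keep up with 100 messages per second on the same 2 nodes:
print(threads_needed(100, 2, 0.5))  # 25 threads per node
```

If the publish rate stays above this bound, messages accumulate no matter how the client is tuned, so the options are to shorten the consumption time, add threads, or add nodes.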

Solutions

If messages are accumulated, perform the following operations to troubleshoot the issue:

  1. Check whether messages are accumulated on the ApsaraMQ for RocketMQ broker or the client.

    Check whether the ons.log file of the client contains the following information:

    the cached message count exceeds the threshold
    • If the preceding information is found, the buffer queue on the client is full and messages are accumulated on the client. In this case, go to Step 2.

    • If the preceding information is not found, messages are not accumulated on the client. In this case, contact Alibaba Cloud technical support.
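
The check in Step 1 can be automated with a small script. The following is a minimal sketch that only looks for the phrase quoted above; the location of ons.log depends on your client configuration, so the path is passed in as an argument rather than assumed. The function names are hypothetical.

```python
import sys

# Phrase quoted in Step 1; its presence indicates a full client-side buffer queue.
THRESHOLD_PHRASE = "the cached message count exceeds the threshold"


def count_threshold_warnings(lines) -> int:
    """Count the lines that contain the threshold phrase."""
    return sum(1 for line in lines if THRESHOLD_PHRASE in line)


def client_buffer_is_full(log_path: str) -> bool:
    """Return True if the log file contains at least one threshold warning."""
    with open(log_path, encoding="utf-8", errors="replace") as log_file:
        return count_threshold_warnings(log_file) > 0


if __name__ == "__main__" and len(sys.argv) > 1:
    # Usage: python check_ons_log.py <path to ons.log>
    if client_buffer_is_full(sys.argv[1]):
        print("Messages are accumulated on the client. Go to Step 2.")
    else:
        print("No client-side accumulation found in this log.")
```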

  2. Check whether the message consumption time is normal.

    • If the consumption time is abnormal, go to Step 3 to view the client stack information and troubleshoot business logic issues.

    • If the consumption time is normal, messages may be accumulated due to low consumption thread concurrency. In this case, increase the number of consumption threads or add nodes.

    You can view the consumption time by using one of the following methods:

  3. View the client stack information. You need to check only the stacks of the threads named ConsumeMessageThread, which run the message consumption logic. For information about how to interpret the thread states and adjust the business logic accordingly, see the official Java documentation.

    You can obtain client stack information by using one of the following methods:

    • Log on to the ApsaraMQ for RocketMQ console. In the Client Connection section of the Group Details page, click View Stack Information in the Actions column of the client that you want to view. Then, you can view the client stack information. For more information, see View consumer details.

    • Use the jstack tool to query the stack information.

      1. Obtain the host IP address of the consumer instance that has accumulated messages and log on to the host. For more information, see View the status of consumers.

      2. Run one of the following commands to find the ID of the Java client process, and note down the ID.

        ps -ef | grep java
        jps -lm
      3. Run the following command to view the stack information:

        jstack -l pid > /tmp/pid.jstack
      4. Run the following command to view the information about the thread named ConsumeMessageThread:

        cat /tmp/pid.jstack|grep ConsumeMessageThread -A 10 --color

    The following examples show common stack patterns:

    • Example 1: The consumption thread is idle and no messages are accumulated.

      When the consumption thread has no message to consume, the thread is in the WAITING state and waits to pull messages from the buffer queue on the client.

      Sample stack 1

    • Example 2: The consumption thread is stuck in a state such as lock contention, sleeping, or waiting.

      In this example, the consumption thread is blocked in a sleep() call, which slows consumption.

      Sample stack 2

    • Example 3: The consumption thread is stuck while calling an external system, such as a database.

      In this example, the consumption thread is blocked on an external HTTP call, which slows consumption.

      Sample stack 3
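
When a dump contains many threads, it can help to summarize the states of the consumption threads before reading individual stacks. The following is a minimal sketch that assumes the usual jstack text layout: a quoted thread name on the header line, followed by a `java.lang.Thread.State:` line. The function name and the prefix match on the thread name are assumptions made for illustration.

```python
import re
import sys
from collections import Counter

# Header line of a thread in a jstack dump, for example: "ConsumeMessageThread_1" #42 ...
THREAD_HEADER = re.compile(r'^"(?P<name>[^"]+)"')
# State line that follows the header, for example: java.lang.Thread.State: WAITING (parking)
THREAD_STATE = re.compile(r"java\.lang\.Thread\.State:\s+(?P<state>[A-Z_]+)")


def summarize_consume_threads(dump_text: str, prefix: str = "ConsumeMessageThread") -> Counter:
    """Count the thread states of all threads whose name starts with prefix."""
    states = Counter()
    in_consume_thread = False
    for line in dump_text.splitlines():
        header = THREAD_HEADER.match(line)
        if header:
            # A new thread section starts; remember whether it is a consumption thread.
            in_consume_thread = header.group("name").startswith(prefix)
            continue
        if in_consume_thread:
            state = THREAD_STATE.search(line)
            if state:
                states[state.group("state")] += 1
                in_consume_thread = False  # one state line per thread
    return states


if __name__ == "__main__" and len(sys.argv) > 1:
    # Usage: python summarize_jstack.py /tmp/pid.jstack
    with open(sys.argv[1], encoding="utf-8", errors="replace") as dump_file:
        for state, count in summarize_consume_threads(dump_file.read()).most_common():
            print(f"{state}: {count}")
```

As a rough guide, many threads in WAITING usually match the idle pattern in Example 1, TIMED_WAITING often points to sleep() or timed waits, and BLOCKED points to lock contention. Threads stuck in a blocking network read are typically reported as RUNNABLE, so the stack frames still need to be checked.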

  4. If the accumulated messages affect your business and can be skipped, you can reset the consumer offset. This way, consumption starts from the latest offset. For more information, see Reset consumer offsets.