ApsaraMQ for RocketMQ: Consumer progress management

Last Updated: Oct 11, 2023

ApsaraMQ for RocketMQ uses consumer offsets to manage the progress of consumers. This topic describes the consumer progress management mechanism of ApsaraMQ for RocketMQ.

Background information

In ApsaraMQ for RocketMQ, messages can be produced before or after a consumer subscribes to them. How does a consumer know where to start consuming messages, and how are consumed messages marked? ApsaraMQ for RocketMQ answers these questions with its consumer progress management mechanism.

The consumer progress management mechanism of ApsaraMQ for RocketMQ solves the following problems:

  • Where does a client start to consume messages after it is launched?

  • How is a consumed message marked to ensure that it is not processed multiple times?

  • Can a message be consumed again by the same client if a service exception occurs?

Working mechanism

Offset

In ApsaraMQ for RocketMQ, messages are stored in multiple queues of a specific topic in the order that they arrive at the broker. Each message is assigned a unique Long-type coordinate, which is also known as the offset of the message.

In theory, a message queue can store an unlimited number of messages. Therefore, the value of an offset ranges from 0 to Long.MAX_VALUE. You can locate a message based on its topic, queue, and offset. The following figure shows the relationship between these concepts.

[Figure: Message offset]

In ApsaraMQ for RocketMQ, the offset of the earliest message in a queue is called the minimum offset (MinOffset), and the offset of the latest message is called the maximum offset (MaxOffset). Although a message queue can theoretically hold an unlimited number of messages, the physical machines on which they are stored have limited space. Therefore, ApsaraMQ for RocketMQ dynamically deletes the earliest stored messages from a queue, and the MinOffset and MaxOffset values of the queue increase constantly.

[Figure: Consumer offset update]
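
The way MinOffset and MaxOffset move can be illustrated with a minimal sketch. The MessageQueue class, its capacity parameter, and the in-memory deque below are hypothetical and only model the behavior described above; they are not the broker's actual storage implementation.

```python
from collections import deque


class MessageQueue:
    """Toy in-memory queue (hypothetical); models offsets only."""

    def __init__(self, capacity):
        self.capacity = capacity   # assumed retention limit, counted in messages
        self.messages = deque()    # retained (offset, body) pairs, oldest first
        self.next_offset = 0       # offset assigned to the next stored message

    def append(self, body):
        """Store a message and return the offset assigned to it."""
        offset = self.next_offset
        self.messages.append((offset, body))
        self.next_offset += 1
        # Space is limited, so the earliest stored message is deleted.
        if len(self.messages) > self.capacity:
            self.messages.popleft()
        return offset

    def min_offset(self):
        """Offset of the earliest retained message, or None if empty."""
        return self.messages[0][0] if self.messages else None

    def max_offset(self):
        """Offset of the latest message, or None if empty."""
        return self.messages[-1][0] if self.messages else None


queue = MessageQueue(capacity=3)
for body in ["a", "b", "c", "d", "e"]:
    queue.append(body)

print(queue.min_offset(), queue.max_offset())  # prints "2 4"
```

After five messages are stored in a queue that retains three, the two earliest messages are gone, so both offsets have moved forward.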

Consumer offset

ApsaraMQ for RocketMQ follows the publish-subscribe pattern. Multiple consumer groups can subscribe to the same queue. If a message were deleted as soon as one consumer consumed it, consumers in other groups could no longer consume the message.

To prevent this from happening, ApsaraMQ for RocketMQ uses consumer offsets to manage the message consumption progress of different consumers. ApsaraMQ for RocketMQ does not delete a message immediately after the message is consumed. Instead, ApsaraMQ for RocketMQ maintains a record of the latest message that is consumed by a consumer group, which is also called a consumer offset.

If a client is restarted, the consumer continues processing messages based on the consumer offset that is saved in the broker. If the consumer offset expires and is deleted, the MinOffset value of the queue that is saved in the broker is used as the consumer offset.

Note

Consumer offsets are saved to and restored from ApsaraMQ for RocketMQ brokers and are not related to a specific consumer. Therefore, ApsaraMQ for RocketMQ can restore consumer progress across different consumers.
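
As a rough illustration of why progress survives a restart or a change of consumer, the sketch below keeps one offset per consumer group, topic, and queue on the broker side. The Broker class and its commit, expire, and restore methods are hypothetical names chosen for this example and do not correspond to an actual client or broker API. The sketch covers only the restart case; the first-time consumption case is out of scope here.

```python
class Broker:
    """Toy broker (hypothetical) that stores consumer offsets per group."""

    def __init__(self):
        # (group, topic, queue_id) -> consumer offset.
        # The key contains no consumer instance ID.
        self.consumer_offsets = {}

    def commit(self, group, topic, queue_id, offset):
        """Record the consumption progress of a consumer group."""
        self.consumer_offsets[(group, topic, queue_id)] = offset

    def expire(self, group, topic, queue_id):
        """Model a saved offset that expires and is deleted."""
        self.consumer_offsets.pop((group, topic, queue_id), None)

    def restore(self, group, topic, queue_id, min_offset):
        """Return the saved offset, or MinOffset if the record is gone."""
        return self.consumer_offsets.get((group, topic, queue_id), min_offset)


broker = Broker()

# One consumer instance in group-a commits its progress, then stops.
broker.commit("group-a", "orders", 0, offset=57)

# Any instance in the same group resumes from the saved offset.
resumed = broker.restore("group-a", "orders", 0, min_offset=12)
print(resumed)  # prints "57"

# If the saved offset expires, the queue's MinOffset is used instead.
broker.expire("group-a", "orders", 0)
fallback = broker.restore("group-a", "orders", 0, min_offset=12)
print(fallback)  # prints "12"
```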

The following figure shows the relationships between the minimum offset, maximum offset, and a consumer offset in a message queue.

[Figure: Consumer progress]

  • The consumer offset is always smaller than or equal to the maximum offset.

    • If messages are produced and consumed at the same rate and no unconsumed messages exist in the queue, the consumer offset is the same as the maximum offset.

    • If messages are consumed slower than they are produced, unconsumed messages exist in the queue. In this case, the consumer offset is smaller than the maximum offset, and the difference is the number of unconsumed messages.

  • Typically, the consumer offset is larger than or equal to the minimum offset. If the consumer offset is smaller than the minimum offset, the consumer is unable to consume messages. In this case, the broker restores the correct consumer offset for the consumer.
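
The relationships listed above can be written as two small helper functions. Both function names are hypothetical. The source says only that the broker restores a correct offset when the consumer offset is smaller than the minimum offset; correcting it to the minimum offset is an assumption made for this sketch.

```python
def unconsumed_count(consumer_offset, max_offset):
    """Number of unconsumed messages: MaxOffset minus the consumer offset."""
    if consumer_offset > max_offset:
        raise ValueError("consumer offset cannot exceed the maximum offset")
    return max_offset - consumer_offset


def corrected_offset(consumer_offset, min_offset):
    """Assumed correction: never start below the minimum offset."""
    return max(consumer_offset, min_offset)


print(unconsumed_count(consumer_offset=10, max_offset=10))  # prints "0"
print(unconsumed_count(consumer_offset=7, max_offset=10))   # prints "3"
print(corrected_offset(consumer_offset=1, min_offset=4))    # prints "4"
print(corrected_offset(consumer_offset=6, min_offset=4))    # prints "6"
```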

Initial consumer offset

An initial consumer offset is the consumer offset that is saved in a broker when a consumer group starts to consume a message queue for the first time.

When a consumer obtains messages from a queue for the first time, ApsaraMQ for RocketMQ uses the maximum offset of the queue at that moment as the initial consumer offset. In other words, the consumer starts consumption from the latest message in the queue.
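
A minimal sketch of this rule follows, assuming a plain dictionary stands in for the offsets saved on the broker. The resolve_start_offset function and its parameters are hypothetical.

```python
def resolve_start_offset(saved_offsets, group, queue_id, max_offset):
    """Return the consumer offset for a group, initializing it on first use.

    saved_offsets is a hypothetical stand-in for the offsets saved on the
    broker, keyed by (group, queue_id).
    """
    key = (group, queue_id)
    if key not in saved_offsets:
        # First consumption by this group: start from the maximum offset.
        saved_offsets[key] = max_offset
    return saved_offsets[key]


saved = {}

# First fetch: the queue's maximum offset becomes the initial consumer offset.
first = resolve_start_offset(saved, "group-a", 0, max_offset=41)
print(first)  # prints "41"

# Later fetches reuse the saved offset even though the queue has grown.
later = resolve_start_offset(saved, "group-a", 0, max_offset=50)
print(later)  # prints "41"
```

Messages that were stored before the first fetch are skipped unless the consumer offset is reset to an earlier position.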

Reset a consumer offset

If the initial or current consumer offset is not aligned with the state of your business, you can reset the consumer offset to adjust your consumer progress.

Scenarios

  • Improper initial consumer offset: The initial consumer offset is the maximum offset of the queue, and the client starts consumption from the latest message. If you need to consume earlier messages, you can reset the consumer offset to that of an earlier message.

  • Consumer lag: A large number of messages can accumulate if the consumer is unable to keep up with the speed at which messages are generated. If the accumulated messages are not mission-critical, you can set the consumer offset to a larger value to skip these messages and alleviate downstream burden.

  • Business backtracking and corrective processing: If you want to re-consume messages that have been incorrectly consumed due to business errors, you can set the consumer offset to a smaller value.

The consumer offset reset feature

The consumer offset reset feature of ApsaraMQ for RocketMQ provides the following capabilities:

  • Reset a consumer offset to the latest offset

    Consumers in the specified consumer group skip all accumulated messages in the specified topic and start consumption from the latest offset.

  • Reset a consumer offset to a specific point in time

    • Consumers start consumption from the message that corresponds to the reset point in time, regardless of whether the message is consumed.

    • The point in time that you specify must fall between the time when the first message was sent to the topic and the time when the latest message was sent to the topic.

    • If you reset a consumer offset to a specific point in time, the broker adjusts the consumer offset to an offset that is closest to the time point.
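
The time-based reset can be sketched as a binary search over the store timestamps of the messages in a queue. The offset_for_timestamp function and the (offset, timestamp) pairs are hypothetical, and the sketch assumes that "closest" means the first message stored at or after the requested time; the broker may resolve the offset differently.

```python
from bisect import bisect_left


def offset_for_timestamp(messages, target_ms):
    """Map a point in time to an offset.

    messages: list of (offset, store_timestamp_ms) pairs ordered by offset,
    with non-decreasing timestamps (an assumption of this sketch).
    """
    if not messages:
        raise ValueError("the queue holds no messages")
    timestamps = [ts for _, ts in messages]
    # The time must lie between the first and the latest message.
    if not timestamps[0] <= target_ms <= timestamps[-1]:
        raise ValueError("point in time is outside the valid range")
    # Assumed rule: pick the first message stored at or after target_ms.
    index = bisect_left(timestamps, target_ms)
    return messages[index][0]


messages = [(100, 1000), (101, 2000), (102, 3000), (103, 4000)]

print(offset_for_timestamp(messages, 2000))  # prints "101"
print(offset_for_timestamp(messages, 2500))  # prints "102"
```

A time that falls between two messages resolves to the later one under this assumption, so no message stored after the requested time is skipped.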

Setting methods

  • Console operations:

    1. Log on to the ApsaraMQ for RocketMQ console. In the left-side navigation pane, select Instances.

    2. On the Instances page, select the instance that you want to manage. In the left-side navigation pane of the Instance Details page that appears, click Groups.

    3. On the Groups page, click the group that you want to manage. On the Group Details page that appears, reset the consumer offset.

  • API operation: ResetConsumeOffset

Limits

  • After you reset a consumer offset, the consumer starts to consume messages from the new offset. In backtracking scenarios, the consumer starts with historical messages that are mostly cold data. Reading such data is referred to as a cold read and may place an undue burden on your system. Evaluate the risks and benefits before you reset a consumer offset. We recommend that you implement strict control policies for this permission to prevent abuse and frequent resets.

  • ApsaraMQ for RocketMQ allows you to reset the consumer offset only for visible messages. You cannot reset the offset for messages that are in the scheduling or retry pending states. For more information, see Scheduled and delayed messages and Consumption retry.

Version compatibility

Brokers have different definitions for the initial consumer offset in different versions of ApsaraMQ for RocketMQ:

  • In 4.x and 3.x versions, the initial consumer offset depends on the message status of the queue.

  • In 5.x versions, the initial consumer offset is the maximum offset of the queue at the time when the consumer starts receiving messages.

Therefore, if you upgrade from an earlier version, you must pay attention to the initial consumer offset when you launch your client.

Usage notes

Strictly control the reset permissions

Resetting the consumer offset places additional load on the system and may affect message reads and writes. Therefore, we recommend that you evaluate the risks and benefits before you perform this operation.