If you reset a consumer offset, the offset from which a consumer starts to consume messages is changed. If faults occur or wrong messages are consumed during message consumption, you can reset the consumer offset to roll back the consumption to a specific offset or an offset in a partition for reconsumption. You can also change the offset to the latest offset and temporarily leave the accumulated messages unhandled.
Background information
ApsaraMQ for RocketMQ uses consumer offsets to manage consumer progress. ApsaraMQ for RocketMQ does not delete a message immediately after the message is consumed. Instead, ApsaraMQ for RocketMQ maintains a record of the latest message that is consumed by a consumer group, which is also called a consumer offset.
If a client is restarted, the consumers can continue processing messages based on the consumer offset that is saved on the broker.
If exceptions occur during consumption or messages are not consumed from the expected offset, you can reset the consumer offset to adjust the consumer progress.
ApsaraMQ for RocketMQ allows you to reset consumer offsets by using one of the following methods:
Start Consumption from Latest Offset
When consumers consume messages in a specific topic, the consumers skip all accumulated messages and start consumption from the latest message that is sent after you reset the consumer offset.
Start Consumption from Offset Corresponding to Specified Point in Time
Consumers start consumption from the message that corresponds to the reset point in time, regardless of whether the message is consumed.
The valid time range is between the time when the first message that is stored in the topic is produced and the time when the last message that is stored in the topic is produced.
If you reset a consumer offset to a specific point in time, the broker changes the consumer offset to an offset that is closest to the point in time.
Scenarios
Consumer lag: If consumers cannot keep up with the rate at which messages are produced, a large number of messages can accumulate. If the accumulated messages are not mission-critical, you can set the consumer offset to a larger value to skip these messages and reduce downstream workload.
Business backtracking and corrective processing: If you want to re-consume messages that are incorrectly consumed due to business errors, you can set the consumer offset to a smaller value.
Usage notes
Resetting consumer offset in a topic changes only the consumer offset of the specified group. The consumer offsets of other groups that subscribe to the topic are not affected.
You cannot reset consumer offsets in broadcasting consumption mode.
ApsaraMQ for RocketMQ allows you to reset consumer offsets only for groups that are created by using the TCP protocol.
ApsaraMQ for RocketMQ allows you to reset consumer offsets only for messages that are visible to consumers.
You can reset consumer offsets only when consumers are connected to ApsaraMQ for RocketMQ brokers.
Procedure
Log on to the ApsaraMQ for RocketMQ console. In the left-side navigation pane, click Instances.
In the top navigation bar, select a region, such as China (Hangzhou). On the Instances page, click the name of the instance that you want to manage.
In the left-side navigation pane, click Groups. On the page that appears, click the TCP tab.
Find the group for which you want to reset the consumer offset, click More in the Actions column, and then select Reset Consumer Offset.
In the Reset Consumer Offset panel, select a reset method based on your business requirements and click OK.
WarningIf you select Start Consumption from Latest Offset, all accumulated messages of the group are cleared in the specified topic. This operation takes effect after approximately 2 to 3 minutes. Do not repeat the operation. Before the reset operation takes effect, all consumers in the application stop consuming messages. If your business is latency-sensitive, proceed with caution.
If you select Start Consumption from Latest Offset, the consumers of the group skip all accumulated messages in the topic and start consumption from the latest offset.
If you select Start Consumption from Offset Corresponding to Specified Point in Time, a time picker appears. After you select a point in time from the time picker, consumers consume messages that are sent after the selected point in time regardless of whether the messages are previously consumed.
In the Note message, click OK.
FAQ
What do I do if I fail to reset a consumer offset by using the preceding two methods?
Check whether the clustering consumption mode is used. You cannot reset consumer offsets in broadcasting consumption mode.
Check whether the current consumer is connected to the ApsaraMQ for RocketMQ broker. You can reset consumer offsets only for consumers that are connected to ApsaraMQ for RocketMQ brokers.
Check the SDK version. The issue may occur because the SDK version is outdated. We recommend that you upgrade the SDK to the latest version. For example, the version of the TCP client SDK for Java must be 1.8.0.Final or later. For more information, see Release notes.
Why are specific accumulated messages delivered after I reset the consumer offset to skip accumulated messages?
The consumer offset reset feature does not take effect for messages that are being retried. Therefore, a small number of messages are still delivered after you reset a consumer offset.
What is the earliest message that consumers can consume by resetting the consumer offset?
If you select Start Consumption from Offset Corresponding to Specified Point in Time, the earliest time that you can select is the time when the first message is stored in the topic. ApsaraMQ for RocketMQ 4.x instances allow you to store messages for up to three days. Messages that are stored for more than three days are automatically deleted.
Therefore, the earliest message that consumers can consume by resetting the consumer offset is the message that is stored in the topic three days before the current point in time.
References
ApsaraMQ for RocketMQ also allows you to reset consumer offsets by calling the following API operation: