This topic describes the definition, model relationship, internal attributes, and behavior constraints for consumers in ApsaraMQ for RocketMQ. This topic also provides version compatibility information and usage notes for consumers.

Definition

A consumer is an entity that receives and processes messages in ApsaraMQ for RocketMQ.

Consumers are usually integrated in business systems. They obtain messages from ApsaraMQ for RocketMQ brokers and convert the messages into information that can be perceived and processed by business logic.

The following items determine consumer behavior:
  • Consumer identity: A consumer must be associated with a consumer group to obtain behavior settings and consumption status.
  • Consumer type: ApsaraMQ for RocketMQ provides a variety of consumer types for different development scenarios, including push consumers and simple consumers. For more information, see Consumer types.
  • Local settings for consumers: These settings specify how consumer clients run based on the consumer type. For example, you can configure the number of threads and concurrency settings on consumers to achieve different transmission effects.

Model relationship

The following figure shows how consumers are positioned in the domain model of ApsaraMQ for RocketMQ.Consumers
  1. Producers produce and send messages to the ApsaraMQ for RocketMQ broker.
  2. The ApsaraMQ for RocketMQ broker stores the messages in the queue that is specified by the topic in the order in which the messages are received.
  3. Consumers obtain and consume messages from the ApsaraMQ for RocketMQ broker based on the specified subscriptions.

Internal attributes

Consumer group name
  • Definition: the name of the consumer group associated with the current consumer. Consumers inherit their behavior from the consumer groups. For more information, see Consumer groups.
  • Values: Consumer groups are the logical resources of ApsaraMQ for RocketMQ. You must create consumer groups by using the console or calling API operations in advance. For more information about the limits on this operation, see Usage limits.
Client ID
  • Definition: the identity of a consumer client. This attribute is used to distinguish between different consumers. The value must be unique within a cluster.
  • Values: The client ID is automatically generated by the ApsaraMQ for RocketMQ SDK. It is mainly used for O&M purposes such as log viewing and problem locating. The client ID cannot be modified.

Communication parameters

Pre-bound subscription list
  • Definition: the subscription list of the specified consumer.

    The ApsaraMQ for RocketMQ broker can use the pre-bound subscription list to verify the permissions and validity of the subscribed topic during consumer initialization, instead of after the application is started.

  • Values: We recommend that you specify the subscription or the list of subscribed topics during consumer initialization. If the subscription is not specified or the subscribed topics are changed, ApsaraMQ for RocketMQ dynamically verifies the topics.
Message listener
  • Definition: the listener that a consumer uses to invoke the message consumption logic after ApsaraMQ for RocketMQ broker pushes a message to the consumer.
  • Values: The value of a message listener is configured on the consumer client.
  • Constraints: When you consume messages as a push consumer, you must configure the message listener on the consumer client. For more information about consumer types, see Consumer types.

Limits on consumption logic

In the domain model of ApsaraMQ for RocketMQ, consumers are managed in consumer groups. Consumers in the same consumer group share all messages that are delivered to the group. To ensure that messages are loaded and consumed as expected in a consumer group in ApsaraMQ for RocketMQ, the following consumption logic must be consistent for all consumers in the group:
  • Message delivery order
  • Consumption retry policy

Version compatibility

The message delivery order and consumption retry policy must be consistent for all consumers in a consumer group. The following items describe how the consistency is ensured in different ApsaraMQ for RocketMQ broker versions:
  • ApsaraMQ for RocketMQ 5.x brokers: Consumers obtain the message delivery order and consumption retry policy from the consumer group with which the consumers are associated. The consumption logic is the same for all consumers. You do not need to specify the message delivery order or consumption retry policy on the client.
  • ApsaraMQ for RocketMQ 3.x and 4.x brokers: The message delivery order and consumption retry policy are defined by API operations on your client. You must specify the message delivery order and consumption retry policy on the client to ensure that the consumption logic of all consumers in the consumer group is the same.

If you use an SDK of an earlier version to access a ApsaraMQ for RocketMQ 5.x broker, the consumption logic of consumers is determined by the configurations of API operations on the consumer client.

Usage notes

We recommend that you limit the number of consumers on individual processes.

The consumers of ApsaraMQ for RocketMQ support the non-blocking transmission mode at the communication protocol level. The non-blocking transmission mode has higher communication efficiency and supports concurrent access by multiple threads. Therefore, in most scenarios, only one consumer needs to be initialized for a consumer group in a single process. Avoid initializing multiple consumers with the same configurations during the development phase.

We recommend that you do not create and destroy consumers on a regular basis.

The consumers of ApsaraMQ for RocketMQ are underlying resources that can be reused, like the connection pool of a database. You do not need to create consumers each time you receive messages or destroy the consumers after you consume messages. If you regularly create and destroy consumers, a large number of short connection requests are generated on the broker. This imposes a high level of load on your system.

  • Correct example
    Consumer c = ConsumerBuilder.build();
      for (int i =0;i<n;i++)
        {
          Message m= c.receive();
          //process message
        }
    c.shutdown();
  • Incorrect example
    for (int i =0;i<n;i++)
      {
        Consumer c = ConsumerBuilder.build();
        Message m= c.receive();
        //process message
        c.shutdown();
      }