ApsaraMQ for RocketMQ:Subscription consistency

Last updated: Dec 20, 2023

In ApsaraMQ for RocketMQ, subscription consistency means that all consumer instances identified by the same group ID must subscribe to the same topics and tags. Inconsistent subscriptions can cause errors in message consumption logic and repeated or missing consumption of messages.

What is subscription consistency?

In ApsaraMQ for RocketMQ, a consumer group is a group of consumer instances. For most distributed applications, multiple consumer instances are attached to a consumer group.

In ApsaraMQ for RocketMQ, a subscription refers to the subscription relationship between a consumer group and a specific topic, including the conditions that are used to filter messages in the subscribed topic.

To ensure that subscriptions are consistent, all consumer instances that are identified by the same group ID must be consistent in the following aspects:

  • The topics to which the consumer instances subscribe must be consistent.

    For example, if Consumer 1 subscribes to Topic A and Topic B, Consumer 2 in the same group must also subscribe to Topic A and Topic B. Consumer 2 cannot subscribe only to Topic A or Topic B, or subscribe to Topic A and Topic C.

  • The tags to which the consumer instances subscribe in the topic must be consistent, including the number and order of the tags.

    For example, if Consumer 1 subscribes to Tag1||Tag2 in Topic B, Consumer 2 in the same group must also subscribe to Tag1||Tag2 in Topic B. Consumer 2 cannot subscribe only to Tag1 or Tag2, or subscribe to Tag2||Tag1 in Topic B.

  • If the consumer instances in a consumer group subscribe to multiple topics, the message type of the topics must be consistent.

    For example, if Consumer 1 and Consumer 2 subscribe to Topic A and Topic B, the topics must be of the same message type, such as the normal message type or the ordered message type. For information about the message types that are supported by ApsaraMQ for RocketMQ, see Message types.
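The topic and tag rules above can be sketched as a simple equality check. This is a minimal illustration, not SDK code: the class `SubscriptionCheck` and the method `isConsistent` are hypothetical names, each consumer instance is modeled as a map from topic name to tag expression, and the message type rule is not modeled.

```java
import java.util.List;
import java.util.Map;

// Hypothetical helper for illustration; not part of any ApsaraMQ for RocketMQ SDK.
final class SubscriptionCheck {
    private SubscriptionCheck() {}

    // Each map models one consumer instance: topic name -> tag expression.
    // Returns true only if all instances declare the same topics and, per topic,
    // the same tag expression string (so "Tag1||Tag2" differs from "Tag2||Tag1").
    static boolean isConsistent(List<Map<String, String>> instances) {
        for (Map<String, String> instance : instances) {
            if (!instance.equals(instances.get(0))) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        Map<String, String> c1 = Map.of("TopicA", "*", "TopicB", "Tag1||Tag2");
        Map<String, String> c2 = Map.of("TopicB", "Tag1||Tag2", "TopicA", "*");
        Map<String, String> c3 = Map.of("TopicA", "*", "TopicB", "Tag2||Tag1");

        System.out.println(isConsistent(List.of(c1, c2))); // same topics and tags
        System.out.println(isConsistent(List.of(c1, c3))); // tag order differs
    }
}
```

The tag expression is compared as a string on purpose, because the rules above treat a different tag order as an inconsistency.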

The following figure shows an example of consistent subscriptions. Groups of different IDs subscribe to different topics. Each group contains consumer instances C1, C2, and C3. All consumer instances in the same group subscribe to the same topics and tags.

[Figure: Consistent subscriptions]

Important

ApsaraMQ for RocketMQ allows you to use the TCP and HTTP client SDKs to send and receive messages. In addition to ensuring the consistency of subscriptions of consumer instances in the same group, you must also ensure the consistency between the protocol version of the consumer group that subscribes to messages and the protocol version of the SDK. For example, if you use a TCP client SDK to send and receive messages, you must also use a consumer group that uses the TCP protocol to subscribe to messages. Otherwise, messages fail to be consumed.

How do I check whether the subscriptions of consumer instances in a group are consistent?

To check whether the subscriptions of consumer instances in a group are consistent, perform the following operations:

  1. Log on to the ApsaraMQ for RocketMQ console. On the Instances page, find the instance that you want to manage and click the name of the instance to go to the Instance Details page.

  2. In the left-side navigation pane, click Groups. On the page that appears, click the name of the consumer group that you want to manage.

  3. In the Subscriptions section of the Group Details page, check whether the subscriptions of the consumer group are consistent.

Consistent subscription 1: One tag in one topic

In the following figure, consumer instances C1, C2, and C3 belong to the same group. The consumer instances subscribe to Tag 1 in Topic A. This scenario meets the requirements of subscription consistency.

[Figure: Correct example 1]

Sample code of consistent subscription 1

The subscriptions of C1, C2, and C3 are consistent. Therefore, the code that is written for C1, C2, and C3 to subscribe to messages must be identical. Sample code:

    Properties properties = new Properties();
    properties.put(PropertyKeyConst.GROUP_ID, "GID_test_1");
    Consumer consumer = ONSFactory.createConsumer(properties);
    consumer.subscribe("TopicA", "Tag1", new MessageListener() {
        public Action consume(Message message, ConsumeContext context) {
            System.out.println(message.getMsgID());
            return Action.CommitMessage;
        }
    });                    

Consistent subscription 2: Multiple tags in one topic

In the following figure, consumer instances C1, C2, and C3 belong to the same group. All consumer instances subscribe to Tag1||Tag2 in Topic B, with the tags specified in the same order, so each instance receives the messages that have Tag 1 or Tag 2 in Topic B. This scenario meets the requirements of subscription consistency.

[Figure: Correct example 2]

Sample code of consistent subscription 2

The subscriptions of C1, C2, and C3 are consistent. Therefore, the code that is written for C1, C2, and C3 to subscribe to messages must be identical. Sample code:

    Properties properties = new Properties();
    properties.put(PropertyKeyConst.GROUP_ID, "GID_test_2");
    Consumer consumer = ONSFactory.createConsumer(properties);
    consumer.subscribe("TopicB", "Tag1||Tag2", new MessageListener() {
        public Action consume(Message message, ConsumeContext context) {
            System.out.println(message.getMsgID());
            return Action.CommitMessage;
        }
    });                    

Consistent subscription 3: Multiple tags in multiple topics

In the following figure, consumer instances C1, C2, and C3 belong to the same group. The consumer instances subscribe to Topic A and Topic B. No tags are specified for Topic A, whereas Tag1||Tag2 is specified for Topic B. In this case, every consumer instance subscribes to all messages in Topic A and to the messages with Tag 1 or Tag 2 in Topic B, with the tags specified in the same order. This scenario meets the requirements of subscription consistency.

[Figure: Correct example 3]

Sample code of consistent subscription 3

The subscriptions of C1, C2, and C3 are consistent. Therefore, the code that is written for C1, C2, and C3 to subscribe to messages must be identical. Sample code:

    Properties properties = new Properties();
    properties.put(PropertyKeyConst.GROUP_ID, "GID_test_3");
    Consumer consumer = ONSFactory.createConsumer(properties);
    consumer.subscribe("TopicA", "*", new MessageListener() {
        public Action consume(Message message, ConsumeContext context) {
            System.out.println(message.getMsgID());
            return Action.CommitMessage;
        }
    });     
    consumer.subscribe("TopicB", "Tag1||Tag2", new MessageListener() {
        public Action consume(Message message, ConsumeContext context) {
            System.out.println(message.getMsgID());
            return Action.CommitMessage;
        }
    });                   
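
Because the subscription code must be identical on every instance, one possible approach is to declare the topics and tag expressions of a group in a single shared place and have every instance apply that declaration. The following is a minimal sketch under that assumption. The class `GroupSubscriptions` and its methods are hypothetical names, not part of the SDK, and the subscribe action is passed in as a callback so that the sketch runs without a broker.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.BiConsumer;

// Hypothetical shared definition; the names are illustrative, not SDK API.
final class GroupSubscriptions {
    private GroupSubscriptions() {}

    // Single source of truth for group "GID_test_3": topic -> tag expression.
    static Map<String, String> forGidTest3() {
        Map<String, String> subscriptions = new LinkedHashMap<>();
        subscriptions.put("TopicA", "*");
        subscriptions.put("TopicB", "Tag1||Tag2");
        return subscriptions;
    }

    // Calls the given subscribe action once per topic, in declaration order.
    static void applyTo(BiConsumer<String, String> subscribe) {
        forGidTest3().forEach(subscribe);
    }

    public static void main(String[] args) {
        // Stand-in for a real subscribe call: print what would be subscribed.
        applyTo((topic, tags) -> System.out.println(topic + " -> " + tags));
    }
}
```

In a real consumer, the callback would wrap the consumer.subscribe(topic, tags, listener) call shown in the sample code above, so that C1, C2, and C3 cannot drift apart when a topic or tag is changed.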

Inconsistent subscriptions

If consumer instances do not receive messages as expected and the ApsaraMQ for RocketMQ console shows that the subscriptions of the consumer instances are inconsistent, the cause is typically one of the following issues on the consumer instances:

Inconsistent subscription 1: Consumer instances that belong to the same group subscribe to different topics

In the following figure, consumer instances C1, C2, and C3 belong to the same group. The consumer instances subscribe to different topics. C1 subscribes to Topic A. C2 subscribes to Topic B. C3 subscribes to Topic C. This scenario does not meet the requirements of subscription consistency.

[Figure: Incorrect example 1]

Sample code of inconsistent subscription 1

  • Consumer instance 1:

        Properties properties = new Properties();
        properties.put(PropertyKeyConst.GROUP_ID, "GID_test_1");
        Consumer consumer = ONSFactory.createConsumer(properties);
        consumer.subscribe("TopicA", "*", new MessageListener() {
            public Action consume(Message message, ConsumeContext context) {
                System.out.println(message.getMsgID());
                return Action.CommitMessage;
            }
        });
  • Consumer instance 2:

        Properties properties = new Properties();
        properties.put(PropertyKeyConst.GROUP_ID, "GID_test_1");
        Consumer consumer = ONSFactory.createConsumer(properties);
        consumer.subscribe("TopicB", "*", new MessageListener() {
            public Action consume(Message message, ConsumeContext context) {
                System.out.println(message.getMsgID());
                return Action.CommitMessage;
            }
        });
  • Consumer instance 3:

        Properties properties = new Properties();
        properties.put(PropertyKeyConst.GROUP_ID, "GID_test_1");
        Consumer consumer = ONSFactory.createConsumer(properties);
        consumer.subscribe("TopicC", "*", new MessageListener() {
            public Action consume(Message message, ConsumeContext context) {
                System.out.println(message.getMsgID());
                return Action.CommitMessage;
            }
        });

Inconsistent subscription 2: Consumer instances that belong to the same group subscribe to different tags in the same topic

In the following figure, consumer instances C1, C2, and C3 belong to the same group. The consumer instances subscribe to Topic A. However, C1 subscribes to Tag 1 in Topic A whereas C2 and C3 subscribe to Tag 2 in Topic A. The consumer instances subscribe to different tags in the same topic. This scenario does not meet the requirements of subscription consistency.

[Figure: Incorrect example 2]

Sample code of inconsistent subscription 2

  • Consumer instance 1:

        Properties properties = new Properties();
        properties.put(PropertyKeyConst.GROUP_ID, "GID_test_2");
        Consumer consumer = ONSFactory.createConsumer(properties);
        consumer.subscribe("TopicA", "Tag1", new MessageListener() {
            public Action consume(Message message, ConsumeContext context) {
                System.out.println(message.getMsgID());
                return Action.CommitMessage;
            }
        });
  • Consumer instance 2:

        Properties properties = new Properties();
        properties.put(PropertyKeyConst.GROUP_ID, "GID_test_2");
        Consumer consumer = ONSFactory.createConsumer(properties);
        consumer.subscribe("TopicA", "Tag2", new MessageListener() {
            public Action consume(Message message, ConsumeContext context) {
                System.out.println(message.getMsgID());
                return Action.CommitMessage;
            }
        });
  • Consumer instance 3:

        Properties properties = new Properties();
        properties.put(PropertyKeyConst.GROUP_ID, "GID_test_2");
        Consumer consumer = ONSFactory.createConsumer(properties);
        consumer.subscribe("TopicA", "Tag2", new MessageListener() {
            public Action consume(Message message, ConsumeContext context) {
                System.out.println(message.getMsgID());
                return Action.CommitMessage;
            }
        });

Inconsistent subscription 3: Consumer instances that belong to the same group subscribe to the same topic and the same tags of the topic, but the tags are specified in different orders

In the following figure, consumer instances C1, C2, and C3 belong to the same group. The consumer instances subscribe to Topic A and Topic B. No tags are specified for Topic A. Tag 1 and Tag 2 are specified for Topic B. However, C1 subscribes to Tag1||Tag2 in Topic B whereas C2 and C3 subscribe to Tag2||Tag1 in Topic B. This scenario does not meet the requirements of subscription consistency.

[Figure: Incorrect example 3]

Sample code of inconsistent subscription 3

  • Consumer instance 1:

        Properties properties = new Properties();
        properties.put(PropertyKeyConst.GROUP_ID, "GID_test_3");
        Consumer consumer = ONSFactory.createConsumer(properties);
        consumer.subscribe("TopicA", "*", new MessageListener() {
            public Action consume(Message message, ConsumeContext context) {
                System.out.println(message.getMsgID());
                return Action.CommitMessage;
            }
        });
        consumer.subscribe("TopicB", "Tag1||Tag2", new MessageListener() {
            public Action consume(Message message, ConsumeContext context) {
                System.out.println(message.getMsgID());
                return Action.CommitMessage;
            }
        });
  • Consumer instance 2:

        Properties properties = new Properties();
        properties.put(PropertyKeyConst.GROUP_ID, "GID_test_3");
        Consumer consumer = ONSFactory.createConsumer(properties);
        consumer.subscribe("TopicA", "*", new MessageListener() {
            public Action consume(Message message, ConsumeContext context) {
                System.out.println(message.getMsgID());
                return Action.CommitMessage;
            }
        });
        consumer.subscribe("TopicB", "Tag2||Tag1", new MessageListener() {
            public Action consume(Message message, ConsumeContext context) {
                System.out.println(message.getMsgID());
                return Action.CommitMessage;
            }
        });
  • Consumer instance 3:

        Properties properties = new Properties();
        properties.put(PropertyKeyConst.GROUP_ID, "GID_test_3");
        Consumer consumer = ONSFactory.createConsumer(properties);
        consumer.subscribe("TopicA", "*", new MessageListener() {
            public Action consume(Message message, ConsumeContext context) {
                System.out.println(message.getMsgID());
                return Action.CommitMessage;
            }
        });
        consumer.subscribe("TopicB", "Tag2||Tag1", new MessageListener() {
            public Action consume(Message message, ConsumeContext context) {
                System.out.println(message.getMsgID());
                return Action.CommitMessage;
            }
        });
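
The tag order case is easy to overlook in a code review because Tag1||Tag2 and Tag2||Tag1 name the same tags. As a rough sketch, a small helper could flag pairs of expressions that name the same tags but are not written identically. The class `TagExpressions` and its methods are hypothetical names for illustration. The helper only compares strings and does not reflect how the broker evaluates subscriptions.

```java
import java.util.Set;
import java.util.TreeSet;

// Hypothetical helper for illustration; not part of any ApsaraMQ for RocketMQ SDK.
final class TagExpressions {
    private TagExpressions() {}

    // Splits an expression such as "Tag1||Tag2" into its individual tags.
    static Set<String> tagSet(String expression) {
        Set<String> tags = new TreeSet<>();
        for (String tag : expression.split("\\|\\|")) {
            tags.add(tag.trim());
        }
        return tags;
    }

    // True when both expressions name the same tags but the strings differ,
    // for example because the tags are listed in a different order.
    static boolean sameTagsDifferentText(String a, String b) {
        return !a.equals(b) && tagSet(a).equals(tagSet(b));
    }

    public static void main(String[] args) {
        // C1 uses "Tag1||Tag2", whereas C2 and C3 use "Tag2||Tag1".
        System.out.println(sameTagsDifferentText("Tag1||Tag2", "Tag2||Tag1"));
        // Identical expressions are not flagged.
        System.out.println(sameTagsDifferentText("Tag1||Tag2", "Tag1||Tag2"));
    }
}
```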

FAQ

Can a consumer group subscribe to multiple topics?

Yes, a consumer group can subscribe to multiple topics. If a consumer group subscribes to a specific topic, a subscription is established. Make sure that all consumer instances in the consumer group subscribe to the topic.

Can I delete a subscription in the ApsaraMQ for RocketMQ console?

No, you cannot delete a subscription in the ApsaraMQ for RocketMQ console. You must delete a subscription by configuring the corresponding parameter in the code of the client.