全部產品
Search
文件中心

ApsaraMQ for RocketMQ:訂閱關係一致

更新時間:Dec 27, 2024

訂閱關係一致指的是同一個消費者分組Group ID下,所有Consumer執行個體所訂閱的Topic和Tag必須完全一致。如果訂閱關係不一致,可能導致訊息消費邏輯混亂,訊息被重複消費或遺漏。

什麼是訂閱關係一致?

雲訊息佇列 RocketMQ 版裡的一個消費者分組Group ID代表一個Consumer執行個體群組。對於大多數分布式應用來說,一個Group ID下通常會掛載多個Consumer執行個體。

雲訊息佇列 RocketMQ 版的訂閱關係指的是指定某個消費者分組Group ID對於某個主題Topic的訂閱,包括訂閱Topic時的過濾條件。

因此,訂閱關係一致性的範圍是同一Group ID下所有消費者對於指定Topic的訂閱都相同。其中包括:

  • 訂閱的Topic必須一致。

    例如:Consumer1訂閱TopicA和TopicB,Consumer2也必須訂閱TopicA和TopicB,不能只訂閱TopicA、只訂閱TopicB或訂閱TopicA和TopicC。

  • 訂閱的同一個Topic中的Tag必須一致,包括Tag的數量和Tag的順序。

    例如:Consumer1訂閱TopicB且Tag為Tag1||Tag2,Consumer2訂閱TopicB的Tag也必須是Tag1||Tag2,不能只訂閱Tag1、只訂閱Tag2或者訂閱Tag2||Tag1

  • 訂閱多個Topic時Topic的類型一致。

    例如,Consumer1和Consumer2都同時訂閱TopicA和TopicB,則這兩個Topic的類型必須一致,必須都是普通訊息或者都是順序訊息。雲訊息佇列 RocketMQ 版支援的訊息類型,請參見訊息類型列表

正確的訂閱關係如下,多個Group ID分別訂閱了不同的Topic,但是同一個Group ID下的多個Consumer執行個體C1、C2、C3訂閱的Topic和Tag都一致。正確訂閱關係

重要

雲訊息佇列 RocketMQ 版支援使用TCP協議和HTTP協議的SDK用戶端收發訊息,除了保證同一Group ID下的Consumer執行個體訂閱關係一致,還必須保證訂閱訊息的Group ID的協議版本和SDK的協議版本一致,例如,使用TCP協議的SDK收發訊息,訂閱訊息時也必須使用建立的TCP協議的Group ID,否則會導致訊息消費失敗。

如何查看訂閱關係是否一致?

操作入口如下:

  1. 登入雲訊息佇列 RocketMQ 版控制台,在实例列表中單擊目標執行個體名稱,進入实例详情頁面。

  2. 在左側導覽列單擊Group 管理,然後在Group列表單擊目標Group名稱。

  3. Group 详情頁面的订阅关系地區查看指定Group的訂閱關係是否一致。

正確訂閱關係一:訂閱一個Topic且訂閱一個Tag

如下圖所示,同一Group ID下的三個Consumer執行個體C1、C2和C3分別都訂閱了TopicA,且訂閱TopicA的Tag也都是Tag1,符合訂閱關係一致原則。

正確樣本1正確範例程式碼一

C1、C2、C3的訂閱關係一致,即C1、C2、C3訂閱訊息的代碼必須完全一致,程式碼範例如下:

    Properties properties = new Properties();
    properties.put(PropertyKeyConst.GROUP_ID, "GID_test_1");
    Consumer consumer = ONSFactory.createConsumer(properties);
    consumer.subscribe("TopicA", "Tag1", new MessageListener() {
        public Action consume(Message message, ConsumeContext context) {
            System.out.println(message.getMsgID());
            return Action.CommitMessage;
        }
    });                    

正確訂閱關係二:訂閱一個Topic且訂閱多個Tag

如下圖所示,同一Group ID下的三個Consumer執行個體C1、C2和C3分別都訂閱了TopicB,訂閱TopicB的Tag也都是Tag1Tag2,表示訂閱TopicB中所有Tag為Tag1Tag2的訊息,且順序一致都是Tag1||Tag2,符合訂閱關係一致性原則。

正確樣本2正確範例程式碼二

C1、C2、C3的訂閱關係一致,即C1、C2、C3訂閱訊息的代碼必須完全一致,程式碼範例如下:

    Properties properties = new Properties();
    properties.put(PropertyKeyConst.GROUP_ID, "GID_test_2");
    Consumer consumer = ONSFactory.createConsumer(properties);
    consumer.subscribe("TopicB", "Tag1||Tag2", new MessageListener() {
        public Action consume(Message message, ConsumeContext context) {
            System.out.println(message.getMsgID());
            return Action.CommitMessage;
        }
    });                    

正確訂閱關係三:訂閱多個Topic且訂閱多個Tag

如下圖所示,同一Group ID下的三個Consumer執行個體C1、C2和C3分別都訂閱了TopicA和TopicB,且訂閱的TopicA都未指定Tag,即訂閱TopicA中的所有訊息,訂閱的TopicB的Tag都是Tag1Tag2,表示訂閱TopicB中所有Tag為Tag1Tag2的訊息,且順序一致都是Tag1||Tag2,符合訂閱關係一致原則。

正確樣本3正確範例程式碼三

C1、C2、C3的訂閱關係一致,即C1、C2、C3訂閱訊息的代碼必須完全一致,程式碼範例如下:

    Properties properties = new Properties();
    properties.put(PropertyKeyConst.GROUP_ID, "GID_test_3");
    Consumer consumer = ONSFactory.createConsumer(properties);
    consumer.subscribe("TopicA", "*", new MessageListener() {
        public Action consume(Message message, ConsumeContext context) {
            System.out.println(message.getMsgID());
            return Action.CommitMessage;
        }
    });     
    consumer.subscribe("TopicB", "Tag1||Tag2", new MessageListener() {
        public Action consume(Message message, ConsumeContext context) {
            System.out.println(message.getMsgID());
            return Action.CommitMessage;
        }
    });                   

常見訂閱關係不一致情境

使用雲訊息佇列 RocketMQ 版收發訊息時,Consumer收到的訊息不符合預期並且在雲訊息佇列 RocketMQ 版控制台查看到訂閱關係不一致,則Consumer執行個體可能存在以下問題:

錯誤樣本一:同一Group ID下的Consumer執行個體訂閱的Topic不同

如下圖所示,同一Group ID下的三個Consumer執行個體C1、C2和C3分別訂閱了TopicA、TopicB和TopicC,訂閱的Topic不一致,不符合訂閱關係一致性原則。

錯誤樣本1

錯誤範例程式碼一

  • Consumer執行個體1-1:

  •     Properties properties = new Properties();
        properties.put(PropertyKeyConst.GROUP_ID, "GID_test_1");
        Consumer consumer = ONSFactory.createConsumer(properties);
        consumer.subscribe("TopicA", "*", new MessageListener() {
            public Action consume(Message message, ConsumeContext context) {
                System.out.println(message.getMsgID());
                return Action.CommitMessage;
            }
        });                    
  • Consumer執行個體1-2:

  •     Properties properties = new Properties();
        properties.put(PropertyKeyConst.GROUP_ID, "GID_test_1");
        Consumer consumer = ONSFactory.createConsumer(properties);
        consumer.subscribe("TopicC", "*", new MessageListener() {
            public Action consume(Message message, ConsumeContext context) {
                System.out.println(message.getMsgID());
                return Action.CommitMessage;
            }
        });                    
  • Consumer執行個體1-3:

  •     Properties properties = new Properties();
        properties.put(PropertyKeyConst.GROUP_ID, "GID_test_1");
        Consumer consumer = ONSFactory.createConsumer(properties);
        consumer.subscribe("TopicB", "*", new MessageListener() {
            public Action consume(Message message, ConsumeContext context) {
                System.out.println(message.getMsgID());
                return Action.CommitMessage;
            }
        });                    

錯誤樣本二:同一Group ID下的Consumer執行個體訂閱的Topic相同,但訂閱Topic的Tag不同

如下圖所示,同一Group ID下的三個Consumer執行個體C1、C2和C3分別都訂閱了TopicA,但是C1訂閱TopicA的Tag為Tag1,C2和C3訂閱的TopicA的Tag為Tag2,訂閱同一Topic的Tag不一致,不符合訂閱關係一致性原則。

錯誤樣本2

錯誤範例程式碼二

  • Consumer執行個體2-1:

     Properties properties = new Properties();
     properties.put(PropertyKeyConst.GROUP_ID, "GID_test_2");
     Consumer consumer = ONSFactory.createConsumer(properties);
     consumer.subscribe("TopicA", "Tag1", new MessageListener() {
     public Action consume(Message message, ConsumeContext context) {
     System.out.println(message.getMsgID());
     return Action.CommitMessage;
     }
     }); 
  • Consumer執行個體2-2:

     Properties properties = new Properties();
     properties.put(PropertyKeyConst.GROUP_ID, "GID_test_2");
     Consumer consumer = ONSFactory.createConsumer(properties);
     consumer.subscribe("TopicA", "Tag2", new MessageListener() {
     public Action consume(Message message, ConsumeContext context) {
     System.out.println(message.getMsgID());
     return Action.CommitMessage;
     }
     }); 
  • Consumer執行個體2-3:

     Properties properties = new Properties();
     properties.put(PropertyKeyConst.GROUP_ID, "GID_test_2");
     Consumer consumer = ONSFactory.createConsumer(properties);
     consumer.subscribe("TopicA", "Tag2", new MessageListener() {
     public Action consume(Message message, ConsumeContext context) {
     System.out.println(message.getMsgID());
     return Action.CommitMessage;
     }
     }); 

錯誤樣本三:同一Group ID下的Consumer執行個體訂閱的Topic及Topic的Tag都相同,但訂閱的Tag順序不同

如下圖所示,同一Group ID下的三個Consumer執行個體C1、C2和C3分別都訂閱了TopicA和TopicB,並且訂閱的TopicA都沒有指定Tag,訂閱TopicB的Tag都是Tag1Tag2,但是C1訂閱TopicB的Tag為Tag1||Tag2,C2和C3訂閱的Tag為Tag2||Tag1,順序不一致,不符合訂閱關係一致性原則。

錯誤樣本3

錯誤範例程式碼三

  • Consumer執行個體3-1:

     Properties properties = new Properties();
     properties.put(PropertyKeyConst.GROUP_ID, "GID_test_3");
     Consumer consumer = ONSFactory.createConsumer(properties);
     consumer.subscribe("TopicA", "*", new MessageListener() {
     public Action consume(Message message, ConsumeContext context) {
     System.out.println(message.getMsgID());
     return Action.CommitMessage;
     }
     }); 
     consumer.subscribe("TopicB", "Tag1||Tag2", new MessageListener() {
     public Action consume(Message message, ConsumeContext context) {
     System.out.println(message.getMsgID());
     return Action.CommitMessage;
     }
     }); 
  • Consumer執行個體3-2:

     Properties properties = new Properties();
     properties.put(PropertyKeyConst.GROUP_ID, "GID_test_3");
     Consumer consumer = ONSFactory.createConsumer(properties);
     consumer.subscribe("TopicA", "*", new MessageListener() {
     public Action consume(Message message, ConsumeContext context) {
     System.out.println(message.getMsgID());
     return Action.CommitMessage;
     }
     }); 
     consumer.subscribe("TopicB", "Tag2||Tag1", new MessageListener() {
     public Action consume(Message message, ConsumeContext context) {
     System.out.println(message.getMsgID());
     return Action.CommitMessage;
     }
     }); 
  • Consumer執行個體3-3:

     Properties properties = new Properties();
     properties.put(PropertyKeyConst.GROUP_ID, "GID_test_3");
     Consumer consumer = ONSFactory.createConsumer(properties);
     consumer.subscribe("TopicA", "*", new MessageListener() {
     public Action consume(Message message, ConsumeContext context) {
     System.out.println(message.getMsgID());
     return Action.CommitMessage;
     }
     }); 
     consumer.subscribe("TopicB", "Tag2||Tag1", new MessageListener() {
     public Action consume(Message message, ConsumeContext context) {
     System.out.println(message.getMsgID());
     return Action.CommitMessage;
     }
     }); 

訂閱關係常見問題

一個Group ID是否可以和多個Topic存在訂閱關係?

支援,一個Group可以訂閱多個Topic,該Group對於指定一個Topic的訂閱算1個訂閱關係,您需要確保每一個訂閱關係中,Group下所有Consumer對於指定Topic的訂閱都一致。

訂閱關係是否支援手動刪除?

訂閱關係需要在消費者用戶端代碼中設定。