すべてのプロダクト
Search
ドキュメントセンター

ApsaraMQ for RocketMQ:サブスクリプションの一貫性

最終更新日:Jul 09, 2024

ApsaraMQ for RocketMQでは、サブスクリプションの一貫性とは、同じグループIDで識別されるすべてのコンシューマーインスタンスが同じトピックとタグをサブスクライブする必要があることを意味します。 サブスクリプションの一貫性がないと、メッセージ消費ロジックにエラーが発生し、メッセージの消費が繰り返されるか失われる可能性があります。

サブスクリプションの一貫性とは

ApsaraMQ for RocketMQでは、コンシューマーグループはコンシューマーインスタンスのグループです。 ほとんどの分散アプリケーションでは、複数のコンシューマインスタンスがコンシューマグループにアタッチされます。

ApsaraMQ for RocketMQでは、サブスクリプションとは、サブスクライブされたトピックのメッセージをフィルタリングするために使用される条件など、コンシューマーグループと特定のトピックとの間のサブスクリプション関係を指します。

サブスクリプションの一貫性を確保するには、同じグループIDで識別されるすべてのコンシューマーインスタンスが次の点で一貫している必要があります。

  • コンシューマーインスタンスがサブスクライブするトピックは一貫している必要があります。

    例えば、消費者1がトピックAおよびトピックBに加入している場合、同じグループ内の消費者2もトピックAおよびトピックBに加入しなければならない。

  • 消費者インスタンスがトピックでサブスクライブするタグは、タグの数と順序を含めて一貫している必要があります。

    たとえば、コンシューマー1がトピックBのTag1 | | Tag2を購読している場合、同じグループのコンシューマー2もトピックBのTag1 | | Tag2を購読する必要があります。コンシューマー2はTag1またはTag2のみを購読することも、トピックBのTag2 | | Tag1を購読することもできません。

  • コンシューマグループ内のコンシューマインスタンスが複数のトピックをサブスクライブする場合、トピックのメッセージタイプは一貫している必要があります。

    たとえば、コンシューマ1およびコンシューマ2がトピックAおよびトピックBにサブスクライブする場合、トピックは、通常のメッセージタイプまたは順序付けられたメッセージタイプなど、同じメッセージタイプでなければなりません。 ApsaraMQ For RocketMQでサポートされているメッセージタイプについては、「メッセージタイプ」をご参照ください。

次の図は、一貫したサブスクリプションの例を示しています。 異なるIDのグループは、異なるトピックをサブスクライブします。 各グループは、消費者インスタンスC1、C2、及びC3を含む。 同じグループ内のすべてのコンシューマーインスタンスは、同じトピックとタグをサブスクライブします。正确订阅关系

重要

ApsaraMQ for RocketMQでは、TCPおよびHTTPクライアントSDKを使用してメッセージを送受信できます。 同じグループ内のコンシューマーインスタンスのサブスクリプションの一貫性を確保することに加えて、メッセージをサブスクライブするコンシューマーグループのプロトコルバージョンとSDKのプロトコルバージョンの一貫性も確保する必要があります。 たとえば、TCPクライアントSDKを使用してメッセージを送受信する場合は、TCPプロトコルを使用してメッセージをサブスクライブするコンシューマグループも使用する必要があります。 それ以外の場合、メッセージは消費されません。

グループ内のコンシューマーインスタンスのサブスクリプションが一貫しているかどうかを確認するにはどうすればよいですか。

グループ内のコンシューマーインスタンスのサブスクリプションが一貫しているかどうかを確認するには、次の操作を実行します。

  1. ApsaraMQ for RocketMQコンソールにログインします。 インスタンスリスト ページで、管理するインスタンスを見つけ、インスタンスの名前をクリックして インスタンスの詳細 ページに移動します。

  2. 左側のナビゲーションウィンドウで、グループ管理 をクリックします。 表示されるページで、管理するコンシューマーグループの名前をクリックします。

  3. グループ詳細 ページの サブスクリプション関係 セクションで、消費者グループのサブスクリプションに一貫性があるかどうかを確認します。

一貫したサブスクリプション1: 1つのトピックに1つのタグ

次の図では、コンシューマインスタンスC1、C2、およびC3が同じグループに属しています。 コンシューマーインスタンスは、トピックAのタグ1にサブスクライブします。このシナリオは、サブスクリプションの一貫性の要件を満たしています。

正确示例1一貫したサブスクリプションのサンプルコード1

C1、C2、およびC3のサブスクリプションは一貫しています。 したがって、C1、C2、およびC3がメッセージにサブスクライブするために書かれたコードは同一でなければなりません。 サンプルコード:

Properties properties = new Properties();
    properties.put(PropertyKeyConst.GROUP_ID、"GID_test_1");
    Consumer consumer = ONSFactory.createConsumer(properties);
    consumer.subscribe("TopicA", "Tag1", new MessageListener() {
        public Action consume(Message message, ConsumeContext context) {
            System.out.println(message.getMsgID());
            return Action.CommitMessage;
        }
    });                    

一貫したサブスクリプション2: 1つのトピックで複数のタグ

次の図では、コンシューマインスタンスC1、C2、およびC3が同じグループに属しています。 コンシューマーインスタンスは、トピックBのTag1 | | Tag2をサブスクライブします。このように、すべてのコンシューマーインスタンスは、トピックBのタグ1またはタグ2を含むメッセージを同じ順序でサブスクライブします。 このシナリオは、サブスクリプションの一貫性の要件を満たしています。

正确示例2一貫したサブスクリプション2のサンプルコード

C1、C2、およびC3のサブスクリプションは一貫しています。 したがって、C1、C2、およびC3がメッセージにサブスクライブするために書かれたコードは同一でなければなりません。 サンプルコード:

Properties properties = new Properties();
    properties.put(PropertyKeyConst.GROUP_ID、"GID_test_2");
    Consumer consumer = ONSFactory.createConsumer(properties);
    consumer.subscribe("TopicB", "Tag1 | | Tag2", new MessageListener() {
        public Action consume(Message message, ConsumeContext context) {
            System.out.println(message.getMsgID());
            return Action.CommitMessage;
        }
    });                    

一貫したサブスクリプション3: 複数のトピックの複数のタグ

次の図では、コンシューマインスタンスC1、C2、およびC3が同じグループに属しています。 コンシューマーインスタンスはトピックAとトピックBにサブスクライブします。トピックAのタグは指定されていませんが、トピックBのTag1 | | Tag2は指定されています。 この場合、コンシューマーインスタンスは、トピックAのすべてのメッセージと、トピックBのタグ1またはタグ2のメッセージを同じ順序でサブスクライブします。 このシナリオは、サブスクリプションの一貫性の要件を満たしています。

正确示例3一貫したサブスクリプション3のサンプルコード

C1、C2、およびC3のサブスクリプションは一貫しています。 したがって、C1、C2、およびC3がメッセージにサブスクライブするために書かれたコードは同一でなければなりません。 サンプルコード:

Properties properties = new Properties();
    properties.put(PropertyKeyConst.GROUP_ID、"GID_test_3");
    Consumer consumer = ONSFactory.createConsumer(properties);
    consumer.subscribe("TopicA", "*", new MessageListener() {
        public Action consume(Message message, ConsumeContext context) {
            System.out.println(message.getMsgID());
            return Action.CommitMessage;
        }
    });     
    consumer.subscribe("TopicB", "Tag1 | | Tag2", new MessageListener() {
        public Action consume(Message message, ConsumeContext context) {
            System.out.println(message.getMsgID());
            return Action.CommitMessage;
        }
    });                   

一貫性のないサブスクリプション

ApsaraMQ for RocketMQを使用してメッセージを送受信します。 コンシューマーインスタンスが期待どおりにメッセージを受信せず、ApsaraMQ for RocketMQコンソールに表示されるコンシューマーインスタンスのサブスクリプションに一貫性がありません。 これは、コンシューマーインスタンスで発生する次のいずれかの問題が原因で発生する可能性があります。

一貫性のないサブスクリプション1: 同じグループに属するコンシューマーインスタンスは、異なるトピックをサブスクライブします

次の図では、コンシューマインスタンスC1、C2、およびC3が同じグループに属しています。 コンシューマインスタンスは、異なるトピックをサブスクライブします。 C1はトピックAにサブスクライブし、C2はトピックBにサブスクライブし、C3はトピックCにサブスクライブします。このシナリオは、サブスクリプションの一貫性の要件を満たしていません。

错误示例1

一貫性のないサブスクリプションのサンプルコード1

  • 消費者インスタンス1:

  • Properties properties = new Properties();
        properties.put(PropertyKeyConst.GROUP_ID、"GID_test_1");
        Consumer consumer = ONSFactory.createConsumer(properties);
        consumer.subscribe("TopicA", "*", new MessageListener() {
            public Action consume(Message message, ConsumeContext context) {
                System.out.println(message.getMsgID());
                return Action.CommitMessage;
            }
        });                    
  • コンシューマインスタンス2:

  • Properties properties = new Properties();
        properties.put(PropertyKeyConst.GROUP_ID、"GID_test_1");
        Consumer consumer = ONSFactory.createConsumer(properties);
        consumer.subscribe("TopicC", "*", new MessageListener() {
            public Action consume(Message message, ConsumeContext context) {
                System.out.println(message.getMsgID());
                return Action.CommitMessage;
            }
        });                    
  • 消費者インスタンス3:

  • Properties properties = new Properties();
        properties.put(PropertyKeyConst.GROUP_ID、"GID_test_1");
        Consumer consumer = ONSFactory.createConsumer(properties);
        consumer.subscribe("TopicB", "*", new MessageListener() {
            public Action consume(Message message, ConsumeContext context) {
                System.out.println(message.getMsgID());
                return Action.CommitMessage;
            }
        });                    

一貫性のないサブスクリプション2: 同じグループに属するコンシューマーインスタンスは、同じトピック内の異なるタグをサブスクライブします

次の図では、コンシューマインスタンスC1、C2、およびC3が同じグループに属しています。 しかし、C1は、トピックAのタグ1に加入し、C2およびC3は、トピックAのタグ2に加入する。消費者インスタンスは、同じトピックの異なるタグに加入する。 このシナリオは、サブスクリプションの一貫性の要件を満たしていません。

错误示例2

一貫性のないサブスクリプション2のサンプルコード

  • 消費者インスタンス1:

    Properties properties = new Properties();
     properties.put(PropertyKeyConst.GROUP_ID、"GID_test_2");
     Consumer consumer = ONSFactory.createConsumer(properties);
     consumer.subscribe("TopicA", "Tag1", new MessageListener() {
     public Action consume(Message message, ConsumeContext context) {
     System.out.println(message.getMsgID());
     return Action.CommitMessage;
     }
     }); 
  • コンシューマインスタンス2:

    Properties properties = new Properties();
     properties.put(PropertyKeyConst.GROUP_ID、"GID_test_2");
     Consumer consumer = ONSFactory.createConsumer(properties);
     consumer.subscribe("TopicA", "Tag2", new MessageListener() {
     public Action consume(Message message, ConsumeContext context) {
     System.out.println(message.getMsgID());
     return Action.CommitMessage;
     }
     }); 
  • 消費者インスタンス3:

    Properties properties = new Properties();
     properties.put(PropertyKeyConst.GROUP_ID、"GID_test_2");
     Consumer consumer = ONSFactory.createConsumer(properties);
     consumer.subscribe("TopicA", "Tag2", new MessageListener() {
     public Action consume(Message message, ConsumeContext context) {
     System.out.println(message.getMsgID());
     return Action.CommitMessage;
     }
     }); 

一貫性のないサブスクリプション3: 同じグループに属するコンシューマーインスタンスは、同じトピックとトピックの同じタグをサブスクライブしますが、タグは異なる順序で指定されます

次の図では、コンシューマインスタンスC1、C2、およびC3が同じグループに属しています。 トピックAにはタグが指定されていません。トピックBにはタグ1タグ2が指定されています。ただし、C1はトピックBのTag1 | | Tag2にサブスクライブしていますが、C2とC3はトピックBのTag2 | | | Tag1にサブスクライブしています。このシナリオは、サブスクリプションの一貫性の要件を満たしていません。

错误示例3

一貫性のないサブスクリプション3のサンプルコード

  • 消費者インスタンス1:

    Properties properties = new Properties();
     properties.put(PropertyKeyConst.GROUP_ID、"GID_test_3");
     Consumer consumer = ONSFactory.createConsumer(properties);
     consumer.subscribe("TopicA", "*", new MessageListener() {
     public Action consume(Message message, ConsumeContext context) {
     System.out.println(message.getMsgID());
     return Action.CommitMessage;
     }
     }); 
     consumer.subscribe("TopicB", "Tag1 | | Tag2", new MessageListener() {
     public Action consume(Message message, ConsumeContext context) {
     System.out.println(message.getMsgID());
     return Action.CommitMessage;
     }
     }); 
  • コンシューマインスタンス2:

    Properties properties = new Properties();
     properties.put(PropertyKeyConst.GROUP_ID、"GID_test_3");
     Consumer consumer = ONSFactory.createConsumer(properties);
     consumer.subscribe("TopicA", "*", new MessageListener() {
     public Action consume(Message message, ConsumeContext context) {
     System.out.println(message.getMsgID());
     return Action.CommitMessage;
     }
     }); 
     consumer.subscribe("TopicB", "Tag2 | | Tag1", new MessageListener() {
     public Action consume(Message message, ConsumeContext context) {
     System.out.println(message.getMsgID());
     return Action.CommitMessage;
     }
     }); 
  • コンシューマーインスタンス3:

    Properties properties = new Properties();
     properties.put(PropertyKeyConst.GROUP_ID、"GID_test_3");
     Consumer consumer = ONSFactory.createConsumer(properties);
     consumer.subscribe("TopicA", "*", new MessageListener() {
     public Action consume(Message message, ConsumeContext context) {
     System.out.println(message.getMsgID());
     return Action.CommitMessage;
     }
     }); 
     consumer.subscribe("TopicB", "Tag2 | | Tag1", new MessageListener() {
     public Action consume(Message message, ConsumeContext context) {
     System.out.println(message.getMsgID());
     return Action.CommitMessage;
     }
     }); 

よくある質問

消費者グループは複数のトピックを購読できますか?

はい、消費者グループは複数のトピックを購読できます。 消費者グループが特定のトピックをサブスクライブする場合、サブスクリプションが確立されます。 コンシューマーグループ内のすべてのコンシューマーインスタンスがトピックをサブスクライブしていることを確認します。

ApsaraMQ for RocketMQコンソールでサブスクリプションを削除できますか?

ApsaraMQ for RocketMQコンソールでサブスクリプションを削除することはできません。 クライアントのコードで対応するパラメーターを設定して、サブスクリプションを削除する必要があります。