すべてのプロダクト
Search
ドキュメントセンター

ApsaraMQ for RocketMQ:メッセージフィルタリング

最終更新日:Jul 09, 2024

コンシューマーがトピックをサブスクライブすると、ApsaraMQ for RocketMQブローカーはトピック内のすべてのメッセージをコンシューマーに配信します。 コンシューマーがトピックからのメッセージのみを必要とする場合は、メッセージ属性とフィルター条件を設定できます。 ApsaraMQ for RocketMQブローカーは、フィルター条件に一致する属性を持つメッセージのみをコンシューマーに配信します。 このトピックでは、メッセージフィルタリング機能の動作メカニズム、シナリオ、および制限について説明します。 このトピックでは、メッセージフィルタリングの設定方法についても説明し、メッセージフィルタリングのサンプルコードも提供します。

説明

メッセージフィルタリング機能は、メッセージ属性とフィルタ条件を設定することによって実装されます。 メッセージ属性を設定して、プロデューサーがトピックに送信するメッセージを分類できます。 さらに、コンシューマーがトピックで指定された属性を持つメッセージをサブスクライブできるように、フィルター条件を設定する必要があります。 このようにして、ブローカは、プロデューサから送信されたメッセージをフィルタリングし、指定されたフィルタ条件を満たすメッセージのみをコンシューマに配信することができる。

コンシューマーがトピックをサブスクライブしているが、フィルター条件が指定されていない場合、プロデューサーから送信されたメッセージに属性が設定されているかどうかに関係なく、トピック内のすべてのメッセージがコンシューマーに配信されます。

次の表に、ApsaraMQ for RocketMQでサポートされているメッセージフィルタリング方法を示します。

移動方法

説明

シナリオ

インスタンスの制限

プロトコルの制限

タグを使用してメッセージをフィルタリングする (デフォルトの方法)

  • プロデューサーがメッセージを送信するときに、メッセージにタグを追加できます。

  • コンシューマーがサブスクライブするメッセージのタグを指定できます。

プロデューサから送信されたメッセージのタグが、コンシューマがサブスクライブするメッセージの指定されたタグと一致する場合、ブローカは、そのメッセージをコンシューマに配信する。

この方法は、単純なフィルタリングシナリオに適用されます。

メッセージに追加できるタグは1つだけです。 したがって、このメソッドは、単一の属性でメッセージを分類およびフィルタリングする必要がある場合に使用できます。

なし

なし

SQL式を使用したメッセージのフィルタリング

  • プロデューサーがメッセージを送信するときに、カスタムメッセージ属性を設定できます。

  • SQL式を使用して、顧客がサブスクライブするメッセージをフィルタリングするための条件を指定できます。

フィルタ条件を満たすメッセージが消費者に配信されます。

この方法は、複雑なフィルタリングシナリオに適用されます。

メッセージに複数のカスタム属性を設定できます。 この方法では、カスタムSQL式を使用して複数の属性でメッセージをフィルタリングできます。

Enterprise Platinum Editionインスタンスのみがこの方法をサポートしています。

TCPクライアントSDKの商用版のみがこの方法をサポートしています。

タグを使用したメッセージのフィルタリング

タグは、トピック内のメッセージを分類するラベルです。 ApsaraMQ for RocketMQプロデューサーがメッセージを送信するときに、メッセージにタグを追加できます。 消費者は、メッセージに添付されたタグに基づいてメッセージを購読できます。

シナリオ

次の図は、eコマースのトランザクションシナリオの例を示しています。 注文から製品の受け取りまでのプロセスで、一連のメッセージが生成されます。 関連するメッセージの例を次に示します。

  • 注文メッセージ

  • 支払いメッセージ

  • 物流メッセージ

これらのメッセージは、Trade_topicという名前のトピックに送信され、異なるシステムによってサブスクライブされます。 設定例:

  • 支払いシステム: 支払いに関連するメッセージのみを購読します。

  • 物流システム: 物流に関連するメッセージのみを購読します。

  • トランザクション成功率分析システム: 注文と支払いに関連するメッセージを購読します。

  • リアルタイムコンピューティングシステム: トランザクションに関連するすべてのメッセージをサブスクライブします。

次の図は、フィルタリングプロセスを示しています。 filtermessage

設定方法

ApsaraMQ for RocketMQでは、タグを使用してメッセージをフィルタリングするためのコードをクライアントSDKで定義できます。 プロデューサーがメッセージを送信するときにメッセージにタグを追加し、コンシューマーがサブスクライブするメッセージのタグを指定する必要があります。 クライアントSDKの詳細については、「」をご参照ください。 次の情報は、プロデューサーとコンシューマーのコードを定義する方法を示しています。

  • メッセージを送信する

    メッセージを送信する前に、各メッセージのタグを指定します。

        Message msg = new Message("MQ_TOPIC","TagA","Hello MQ".getBytes());                
  • すべてのメッセージを購読する

    トピック内のすべてのメッセージをサブスクライブするようにコンシューマーを構成する場合は、アスタリスク (*) を使用してすべてのタグを示します。

    consumer.subscribe("MQ_TOPIC", "*", new MessageListener() {
            public Action consume(Message message, ConsumeContext context) {
                System.out.println(message.getMsgID());
                return Action.CommitMessage;
            }
        });                
  • 特定のタイプのメッセージを購読する

    トピック内の特定のタイプのメッセージをサブスクライブするようにコンシューマーを構成する場合は、対応するタグを指定します。

    consumer.subscribe("MQ_TOPIC", "TagA", new MessageListener() {
            public Action consume(Message message, ConsumeContext context) {
                System.out.println(message.getMsgID());
                return Action.CommitMessage;
            }
        });                
  • 複数のタイプのメッセージを購読する

    トピック内の複数のタイプのメッセージをサブスクライブするようにコンシューマーを構成する場合は、対応するタグを指定し、2つの縦棒 (| |) で区切ります。

    consumer.subscribe("MQ_TOPIC", "TagA | | TagB", new MessageListener() {
            public Action consume(Message message, ConsumeContext context) {
                System.out.println(message.getMsgID());
                return Action.CommitMessage;
            }
        });                
  • エラーの例

    コンシューマーにトピックへの複数のサブスクリプションがあり、各サブスクリプションに異なるタグがある場合、コンシューマーは、タグが最新のサブスクリプションで指定されたタグと一致するメッセージのみを受信します。

        // In the following code, a consumer can receive only messages with TagB in MQ_TOPIC but cannot receive messages with TagA. 
        consumer.subscribe("MQ_TOPIC", "TagA", new MessageListener() {
            public Action consume(Message message, ConsumeContext context) {
                System.out.println(message.getMsgID());
                return Action.CommitMessage;
            }
        });
        consumer.subscribe("MQ_TOPIC", "TagB", new MessageListener() {
            public Action consume(Message message, ConsumeContext context) {
                System.out.println(message.getMsgID());
                return Action.CommitMessage;
            }
        });                

SQL式を使用したメッセージのフィルタリング

SQL式を使用してメッセージをフィルターするには、次の手順を実行します。プロデューサーがメッセージを送信するときにメッセージのカスタム属性を設定し、SQL構文を使用してフィルター式を定義し、コンシューマーがサブスクライブするメッセージの属性を指定します。 ApsaraMQ for RocketMQは、カスタム属性がフィルター式の計算結果と一致するメッセージを選択し、これらのメッセージをコンシューマーに配信します。

説明

タグは特殊なタイプのメッセージ属性に属します。 したがって、SQL式ベースのフィルタリング方法は、タグベースのフィルタリング方法と互換性があります。 SQL式を使用して、メッセージのフィルタリングに使用するタグを指定できます。 SQL構文では、タグ属性はTAGSで表されます。

制限事項

SQL式を使用してメッセージをフィルタリングする場合、次の制限事項に注意してください。

  • Enterprise Platinum Editionインスタンスのみがこの機能をサポートしています。 Standard Editionインスタンスはこの機能をサポートしていません。

  • SQL式を使用してメッセージをフィルタリングするには、TCPクライアントのみを使用できます。 HTTPクライアントはこの機能をサポートしていません。

  • ブローカーがSQL式を使用してメッセージをフィルタリングすることを許可していないが、コンシューマーのフィルタ式を定義している場合、コンシューマーの起動時にエラーが報告されるか、コンシューマーがメッセージを受信できません。

シナリオ

次の図は、eコマースのトランザクションシナリオの例を示しています。 注文から製品の受け取りまでのプロセスで、一連のメッセージが生成されます。 メッセージは、注文メッセージと物流メッセージとに分類される。 リージョン属性は物流メッセージに設定されており、リージョン属性の値は杭州と上海です。

  • 注文メッセージ

  • 物流メッセージ

    • リージョン属性の値が杭州であるロジスティクスメッセージ

    • リージョン属性の値が上海である物流メッセージ

これらのメッセージは、Trade_Topicトピックに送信され、異なるシステムによってサブスクライブされます。 設定例:

  • ロジスティクスシステム1: リージョン属性の値が杭州であるロジスティクスメッセージのみをサブスクライブします。

  • 物流システム2: すべての物流メッセージを購読する。

  • 注文追跡システム: 注文メッセージのみを購読します。

  • リアルタイムコンピューティングシステム: トランザクションに関連するすべてのメッセージをサブスクライブします。

次の図は、フィルタリングプロセスを示しています。 Message filtering by using SQL expressions

設定方法

ApsaraMQ for RocketMQでは、クライアントSDKでコードを定義して、SQL式を使用してメッセージをフィルタリングできます。 プロデューサーがメッセージを送信するためのコードでカスタムメッセージ属性を構成し、コンシューマーがメッセージをサブスクライブするためのコードでSQL構文を使用してフィルター式を定義する必要があります。 クライアントSDKの詳細については、「」をご参照ください。 次の情報は、プロデューサーとコンシューマーのコードを定義する方法を示しています。

  • プロデューサー:

    カスタムメッセージ属性を設定します。

    Message msg = new Message("topic", "tagA", "Hello MQ".getBytes());
    // Configure Attribute A and set the attribute value to 1. 
    msg.putUserProperties("A", "1");
  • 消費者:

    カスタム属性に基づいてメッセージをフィルタリングするSQL構文を使用して、フィルタ式を定義します。

    重要

    カスタム属性に基づいてメッセージをフィルタリングするには、まずフィルタ式にロジックを定義して、メッセージ属性が存在するかどうかを確認する必要があります。 属性が存在しない場合、フィルター式の計算結果はNULLになり、メッセージはコンシューマーに配信されません。

    // Subscribe to messages with Attribute A and the attribute value is 1. 
    consumer.subscribe("topic", MessageSelector.bySql("A IS NOT NULL AND TAGS IS NOT NULL AND A = '1'"), new MessageListener() {
        public Action consume(Message message, ConsumeContext context) {
            System.out.printf("Receive New Messages: %s %n", message);
            return Action.CommitMessage;
        }
    });

次の表に、フィルター式の定義に使用できるさまざまな種類のSQL構文を示します。

構文

説明

IS NULL

属性が存在しないことを指定します。

a IS NULL: 属性aは存在しません。

IS NOT NULL

属性が存在することを指定します。

a IS NOT NULL: 属性が存在します。

  • >

  • >=

  • <

  • <=

数値を比較します。 構文を使用して文字列を比較することはできません。 文字列の比較に使用する場合、コンシューマーの起動時にエラーが報告されます。

説明

数値に変換できる文字列も数値と見なされます。

  • a IS NOT NULL AND a > 100: 属性aが存在し、属性aの値が100より大きい。

  • a IS NOT NULL AND a > 'abc': エラー例。 abcは文字列です。 したがって、aとabcを比較することはできません。

xxxとxxxの間

数値を比較します。 構文を使用して文字列を比較することはできません。 文字列の比較に使用する場合、コンシューマーの起動時にエラーが報告されます。 構文は>= xxx AND <= xxxと同等です。 これは、属性の値が2つの数値の間、または2つの数値のいずれかに等しいことを意味します。

a IS NOT NULL AND (a BETWEEN 10 AND 100): 属性aが存在し、属性aの値は少なくとも10、最大で100である。

xxxとxxxの間ではありません

数値を比較します。 構文を使用して文字列を比較することはできません。 文字列の比較に使用する場合、コンシューマーの起動時にエラーが報告されます。 構文は <xxx OR > xxxと同じです。 属性の値が左側の数値より小さいか、右側の数値より大きいことを意味します。

a IS NOT NULL AND (a NOT BETWEEN 10 AND 100): 属性aが存在し、属性aの値が10未満または100より大きい。

IN (xxx, xxx)

属性の値がセットに含まれることを示します。 セット内の要素は文字列のみです。

a IS NOT NULL AND (a IN ('abc', 'def')): 属性aが存在し、属性aの値はabcまたはdefです。

  • =

  • <>

数値と文字列を比較して、属性値が指定された数値または文字列と等しいかどうかを確認します。

a IS NOT NULL AND (a = 'abc' OR a<>'def'): 属性aが存在し、属性aの値がabcであるか、属性aの値がdefでない。

  • AND

  • OR

論理AND演算子と論理OR演算子。 これらは単純な論理関数を組み合わせるために使用でき、各論理関数は括弧内に入れる必要があります。

(a IS NOT NULL AND a > 100) OR (b IS NULL): 属性aが存在し、属性aの値が100より大きい。 あるいは、属性bは存在しない。

SQL式ベースのフィルタリングは、カスタムメッセージ属性を構成し、SQLフィルタ式を定義することによって実装されます。 フィルター式は有効な結果を生成しない場合があります。 Apache RocketMQブローカーのMessage Queueは、次のロジックに基づいてメッセージを処理します。

  • フィルター式の計算中にエラーが報告された場合、ブローカーは受信したメッセージをデフォルトで除外し、メッセージをコンシューマーに配信しません。 たとえば、数値と非数値を比較するとエラーが発生します。

  • フィルター式の計算結果がNULLの場合、または値がブール値でない場合、ブローカーは受信したメッセージをデフォルトで除外し、メッセージをコンシューマーに配信しません。 ブール値は、真値または偽値を表す。 プロデューサーが送信するメッセージにカスタム属性を設定しなかったが、このカスタム属性がSQL式のフィルター条件として使用されているとします。 この場合、フィルタ式の計算結果はNULLとなる。

  • カスタムメッセージ属性の値が浮動小数点数であるが、フィルター式で使用される属性値が整数である場合、ブローカーは受信したメッセージをデフォルトで除外し、メッセージをコンシューマーに配信しません。

サンプルコード

  • メッセージを送信します。

    メッセージのタグとカスタム属性を設定します。

    Producer producer = ONSFactory.createProducer(properties);
    // Set the value of Tag to tagA. 
    Message msg = new Message("topicA", "tagA", "Hello MQ".getBytes());
    // Set the value of the custom attribute region to hangzhou. 
    msg.putUserProperties("region", "hangzhou");
    // Set the value of the custom attribute price to 50. 
    msg.putUserProperties("price", "50");
    SendResult sendResult = producer.send(msg);
  • カスタム属性に基づいてメッセージをサブスクライブします。

    Consumer consumer = ONSFactory.createConsumer(properties);
    // Subscribe to only messages whose value of the custom attribute region is hangzhou. If this attribute is not configured for a message or the attribute value of the message is not hangzhou, the message will not be delivered to the consumer. 
    consumer.subscribe("topicA", MessageSelector.bySql("region IS NOT NULL AND region = 'hangzhou'"), new MessageListener() {
        public Action consume(Message message, ConsumeContext context) {
            System.out.printf("Receive New Messages: %s %n", message);
            return Action.CommitMessage;
        }
    });

    期待される結果: この例で送信されるメッセージにはカスタム属性領域があり、属性値はhangzhouです。 メッセージはフィルタ条件を満たし、消費者に配信されます。

  • タグとカスタム属性の両方に基づいてメッセージをサブスクライブします。

    Consumer consumer = ONSFactory.createConsumer(properties);
    // Subscribe to only messages with tagA and their values of the custom attribute price are greater than 30. 
    consumer.subscribe("topicA", MessageSelector.bySql("TAGS IS NOT NULL AND price IS NOT NULL AND TAGS = 'tagA' AND price > 30 "), new MessageListener() {
        public Action consume(Message message, ConsumeContext context) {
            System.out.printf("Receive New Messages: %s %n", message);
            return Action.CommitMessage;
        }
    });

    期待される結果: この例で送信されるメッセージにはtagAとカスタム属性価格の両方があり、カスタム属性価格の値は30を超えています。 メッセージはフィルタ条件を満たし、消費者に配信されます。

  • 複数のカスタム属性に基づいてメッセージを購読します。

    Consumer consumer = ONSFactory.createConsumer(properties);
    // Subscribe to only messages whose value of the custom attribute region is hangzhou and value of the custom attribute price is less than 20. 
    consumer.subscribe("topicA", MessageSelector.bySql("region IS NOT NULL AND price IS NOT NULL AND region = 'hangzhou' AND price < 20"), new MessageListener() {
        public Action consume(Message message, ConsumeContext context) {
            System.out.printf("Receive New Messages: %s %n", message);
            return Action.CommitMessage;
        }
    });

    期待される結果: メッセージがフィルター条件を満たしていないため、メッセージはコンシューマーに配信されません。 消費者は、カスタム属性価格の値が20未満のメッセージを購読しますが、作成者のメッセージに設定されたカスタム属性価格の値は50です。

  • トピック内のすべてのメッセージを購読します。

    Consumer consumer = ONSFactory.createConsumer(properties);
    // Set the value of the SQL expression to TRUE to subscribe to all messages in the topic. 
    consumer.subscribe("topicA", MessageSelector.bySql("TRUE"), new MessageListener() {
        public Action consume(Message message, ConsumeContext context) {
            System.out.printf("Receive New Messages: %s %n", message);
            return Action.CommitMessage;
        }
    });

    期待される結果: トピック内のすべてのメッセージがコンシューマーに配信されます。

  • エラーの例

    カスタム属性は、プロデューサーがメッセージを送信するときにメッセージに対して構成されておらず、カスタム属性が存在するかどうかをチェックするためのロジックはフィルター式に定義されていません。 カスタム属性は、式のフィルター条件として直接使用されます。 この場合、メッセージは消費者に配信されません。

    Consumer consumer = ONSFactory.createConsumer(properties);
    // The message attribute product is not configured during message sending. The filtering fails and the message will not be delivered to the consumer. 
    consumer.subscribe("topicA", MessageSelector.bySql("product = 'MQ'"), new MessageListener() {
        public Action consume(Message message, ConsumeContext context) {
            System.out.printf("Receive New Messages: %s %n", message);
            return Action.CommitMessage;
        }
    });               

関連ドキュメント

  • 同じグループIDを使用するコンシューマインスタンスは、同じトピックをサブスクライブする必要があります。 詳細については、「サブスクリプションの一貫性」をご参照ください。

  • トピックとタグを使用して、さまざまなサービスのメッセージを分類できます。 詳細については、「トピックとタグのベストプラクティス」をご参照ください。