コンシューマーがトピックをサブスクライブすると、ApsaraMQ for RocketMQはトピック内のすべてのメッセージをコンシューマーに配信します。 コンシューマーがトピック内の特定のメッセージのみを受信するようにする場合は、フィルタリング条件を設定して、ApsaraMQ for RocketMQブローカーでメッセージをフィルタリングできます。 これは、コンシューマが多数の無効なメッセージを受信することを防止します。
シナリオ
ApsaraMQ for RocketMQは、パブリッシュサブスクライブ (pub/sub) モデルに基づいたミドルウェアサービスで、アップストリームとダウンストリームビジネスの統合に使用されます。 実際のビジネスシナリオでは、トピック内のメッセージは、異なる消費ロジックを持つ異なるダウンストリームアプリケーションによって処理されます。 各アプリケーションは、その消費ロジックを満たすメッセージのみを消費する必要があります。
ApsaraMQ for RocketMQが提供するメッセージフィルタリング機能を使用して、コンシューマーが必要とするメッセージを効率的にフィルタリングできます。 これにより、多数の無効なメッセージがコンシューマに配信されるのを防ぎ、ダウンストリームシステムの作業負荷を軽減します。
ApsaraMQ for RocketMQのメッセージフィルタリング機能は、トピック内のメッセージをフィルタリングする際の問題点を解決し、同じ事業分野で詳細な分類が必要なシナリオで使用されます。 異なる事業分野のメッセージを管理する場合は、異なるトピックを使用することを推奨します。
概要
メッセージフィルタリングとは何ですか?
ApsaraMQ for RocketMQのメッセージフィルタリング機能は、コンシューマーが設定した条件に基づいてメッセージをフィルタリングし、条件を満たすメッセージをコンシューマーに送信します。
メッセージ属性とタグがプロデューサーとコンシューマーに定義されると、メッセージはApsaraMQ for RocketMQブローカーのフィルタリング条件に基づいてフィルタリングされ、照合されます。 フィルタリング条件を満たすメッセージのみがコンシューマに配信されます。
メッセージフィルタリングはどのように機能しますか?
メッセージのフィルタリングには次の手順があります。
プロデューサ: プロデューサがメッセージを初期化する前に、プロデューサは、コンシューマによって設定されたフィルタリング条件に一致するために使用される属性およびタグをメッセージに添付します。
コンシューマー: メッセージの初期化と使用中に、コンシューマーはサブスクリプション登録操作を呼び出して、特定のトピックでサブスクライブするメッセージをApsaraMQ for RocketMQブローカーに通知します。 これは、フィルタリング条件の報告としても知られています。
ブローカー: コンシューマーがメッセージを消費すると、ApsaraMQ for RocketMQブローカーは、報告されたフィルタリング条件の式に基づいてメッセージを動的に照合し、フィルタリング条件を満たすメッセージをコンシューマーに配信します。
分類
ApsaraMQ for RocketMQは、タグベースのフィルタリング方法と属性ベースのSQLフィルタリング方法をサポートしています。 次の表にメソッドを示します。
項目 | タグベースのフィルタリング | 属性ベースのSQLフィルタリング |
Target | メッセージタグ。 | カスタム属性とシステム属性を含むメッセージ属性。 メッセージタグは、システム属性の一種です。 |
容量 | 完全一致。 | SQL構文ベースの一致。 |
シナリオ | シンプルで軽量なコンピューティングロジックを含むフィルタリングシナリオ。 | 複雑なコンピューティングロジックを含むフィルタリングシナリオ。 |
詳細については、「タグベースのフィルタリング」および「属性SQLベースのフィルタリング」をご参照ください。
サブスクリプションの一貫性
フィルタリング式は、サブスクリプションの一部を構成します。 ApsaraMQ for RocketMQのドメインモデルに基づいて、同じコンシューマーグループ内のコンシューマーのサブスクリプションのフィルタリング式は同じである必要があります。 そうしないと、特定のメッセージを消費できません。 詳細については、「サブスクリプション」をご参照ください。
タグベースのフィルタリング
タグベースのフィルタリングは、ApsaraMQ for RocketMQが提供する基本的なフィルタリング方法です。 このメソッドでは、プロデューサによって定義されたタグに基づいてメッセージが照合されます。 消費者はタグを使用して、消費するメッセージを指定します。
サンプルシナリオ
次の項目は、eコマースのトランザクションシナリオで生成されるメッセージについて説明します。
注文メッセージ
支払いメッセージ
物流メッセージ
メッセージは、次のダウンストリームシステムによってサブスクライブされているTrade_Topicという名前のトピックに送信されます。
支払いシステム: 支払いメッセージのみを購読します。
物流システム: 物流メッセージのみを購読します。
トランザクション成功率分析システム: 注文と支払いのメッセージを購読します。
リアルタイムコンピューティングシステム: すべてのメッセージをサブスクライブします。
次の図は、フィルタリング効果を示しています。
タグ設定
プロデューサがメッセージを送信する前に、プロデューサは各メッセージに1つのタグのみを添付できます。
各タグは文字列で構成されています。 各タグの長さは128文字を超えないようにすることを推奨します。
タグでは大文字と小文字が区別されます。 例えば、TAG Aとタグaは異なるタグです。
フィルタリングルール
タグベースのフィルタリングは、文字列に基づく正確なフィルタリングを実装します。 次のフィルタリングルールがサポートされています。
単一タグ一致: フィルター式で単一のタグを指定して、タグが添付されているメッセージのみを受信できます。
マルチタグ一致: フィルター式に複数のタグを指定して、タグの1つがアタッチされたメッセージを受信できます。 複数のタグは2本の縦棒 (| |) で区切ります。 たとえば、Tag1 | | Tag2 | | Tag3は、Tag1、Tag2、またはTag3が添付されたメッセージがすべてコンシューマに送信されることを指定します。
すべて一致: アスタリスク (*) をワイルドカード文字として使用して、すべてのタグを一致させることができます。 これは、トピック内のすべてのメッセージがコンシューマーに送信されることを指定します。
サンプルコード
メッセージ送信前にタグを指定する
Message message = messageBuilder.setTopic("topic") // The message key. You can use the key to search for the message. .setKeys("messageKey") // The message tag. Consumers can use the tag to filter messages. // In this example, the tag of the message is set to TagA. .setTag("TagA") // The message body. .setBody("messageBody".getBytes()) .build();
単一のタグに一致するメッセージを購読する
String topic = "Your Topic"; // Subscribe to messages to which TagA is attached. FilterExpression filterExpression = new FilterExpression("TagA", FilterExpressionType.TAG); pushConsumer.subscribe(topic, filterExpression);
複数のタグに一致するメッセージを購読する
String topic = "Your Topic"; // Subscribe to messages to which TagA, TagB, or TagC is attached. FilterExpression filterExpression = new FilterExpression("TagA||TagB||TagC", FilterExpressionType.TAG); pushConsumer.subscribe(topic, filterExpression);
トピック内のすべてのメッセージを購読する
String topic = "Your Topic"; // Subscribe to all messages. FilterExpression filterExpression = new FilterExpression("*", FilterExpressionType.TAG); pushConsumer.subscribe(topic, filterExpression);
属性ベースのSQLフィルタリング
属性ベースのSQLフィルタリングは、ApsaraMQ for RocketMQが提供する高度なフィルタリング方法です。 このメソッドでは、メッセージはプロデューサによって指定されたキーと値のペアに基づいて照合されます。 プロデューサがメッセージを送信するとき、プロデューサは、各メッセージに対して複数の属性を指定できます。 次に、コンシューマは、特定のメッセージを受信するためにSQL式の属性を指定できます。
タグはシステム属性の一種であるため、タグベースのフィルタリングも属性ベースのSQLフィルタリングの一種です。 SQL構文では、タグ属性はTAGSで表されます。
サンプルシナリオ
以下の項目では、電子商取引のトランザクションシナリオで生成されるメッセージについて説明します。 メッセージは、注文メッセージと物流メッセージとに分類される。 地域属性は、物流メッセージに対して指定される。 リージョン属性の値は、杭州と上海です。
注文メッセージ
物流メッセージ
リージョン属性値が杭州の物流メッセージ
リージョン属性値が上海の物流メッセージ
メッセージは、次のダウンストリームシステムによってサブスクライブされているTrade_Topicという名前のトピックに送信されます。
ロジスティクスシステム1: リージョン属性値が杭州であるロジスティクスメッセージのみをサブスクライブします。
物流システム2: すべての物流メッセージを購読します。
注文追跡システム: 注文メッセージのみを購読します。
リアルタイムコンピューティングシステム: すべてのメッセージをサブスクライブします。
次の図は、フィルタリング効果を示しています。
メッセージ属性の設定
プロデューサがメッセージを送信する前に、プロデューサは各メッセージのカスタム属性を指定することができる。 各属性は、カスタムのキーと値のペアです。
メッセージごとに複数の属性を指定できます。
フィルタリングルール
フィルター式を記述するときは、SQL-92の構文に従う必要があります。 次の表に、構文を示します。
構文 | 説明 | 例 |
IS NULL | 属性が存在しないことを指定します。 |
|
IS NOT NULL | 属性が存在することを指定します。 |
|
| 数値を比較します。 構文を使用して文字列を比較することはできません。 構文を使用して文字列を比較すると、コンシューマーの起動時にエラーが報告されます。 説明
|
|
xxxとxxxの間 | 数値を比較します。 構文を使用して文字列を比較することはできません。 構文を使用して文字列を比較すると、コンシューマーの起動時にエラーが報告されます。 構文は>= xxx AND <= xxxと同等で、属性の値が2つの数値の間、または2つの数値のいずれかに等しいことを指定します。 |
|
xxxとxxxの間ではありません | 数値を比較します。 構文を使用して文字列を比較することはできません。 構文を使用して文字列を比較すると、コンシューマーの起動時にエラーが報告されます。 構文は <xxx OR > xxxと同等で、属性の値が左側の数値より小さいか、右側の数値より大きいことを指定します。 |
|
IN (xxx, xxx) | 属性の値をセットに含めることを指定します。 セット内の要素は文字列のみです。 |
|
| 等しい演算子と等しくない演算子。 演算子を使用して、数値と文字列を比較できます。 |
|
| 論理AND演算子と論理OR演算子。 演算子を使用して、単純な論理関数を組み合わせることができます。 各論理関数は括弧で囲む必要があります。 |
|
SQL属性ベースのフィルタリングでは、プロデューサはカスタムメッセージ属性を指定し、コンシューマはメッセージを消費するためのSQLフィルタ式を定義します。 その結果、フィルタリング式の計算結果が不確定となる可能性がある。 この場合、ApsaraMQ for RocketMQブローカーは、次のロジックに基づいてメッセージを処理します。
例外処理: フィルター式の計算時に例外が報告された場合、ブローカーは受信したメッセージを自動的に除外し、メッセージをコンシューマーに配信しません。 たとえば、数値と非数値が比較されるときに例外が発生します。
null値の処理: フィルター式の計算結果がNULLの場合、または値がブール値でない場合、ブローカーは受信したメッセージを自動的に除外し、メッセージをコンシューマーに配信しません。 ブール値は、真値または偽値を表す。 例えば、コンシューマがメッセージを購読する場合、コンシューマは、プロデューサがフィルタ条件として指定しない属性を使用する。 この場合、フィルタ式の計算結果はNULLとなる。
一貫性のない数値の処理: カスタムメッセージ属性の値が浮動小数点数であるが、フィルター式で使用される属性値が整数である場合、ブローカーは受信したメッセージを自動的に除外し、メッセージをコンシューマーに配信しません。
サンプルコード
メッセージ送信前にタグとカスタム属性を指定する
Message message = messageBuilder.setTopic("topic") // The message key. You can use the key to search for the message. .setKeys("messageKey") // The message tag. Consumers can use the tag to filter messages. // In this example, the message tag is set to messageTag. .setTag("messageTag") // You can also specify custom attributes for the messages, such as environment, region, and logical branch. // In this example, the custom attribute is region and the attribute value is Hangzhou. .addProperty("Region", "Hangzhou") // The message body. .setBody("messageBody".getBytes()) .build();
単一の属性に一致するメッセージを購読する
String topic = "topic"; // Subscribe only to messages whose region attribute value is Hangzhou. FilterExpression filterExpression = new FilterExpression("Region IS NOT NULL AND Region='Hangzhou'", FilterExpressionType.SQL92); simpleConsumer.subscribe(topic, filterExpression);
複数の属性に一致するメッセージを購読する
String topic = "topic"; // Subscribe to messages whose region attribute value is Hangzhou and price attribute value is greater than 30. FilterExpression filterExpression = new FilterExpression("Region IS NOT NULL AND price IS NOT NULL AND Region = 'Hangzhou' AND price > 30", FilterExpressionType.SQL92); simpleConsumer.subscribe(topic, filterExpression);
トピック内のすべてのメッセージを購読する
String topic = "topic"; // Subscribe to all messages. FilterExpression filterExpression = new FilterExpression("True", FilterExpressionType.SQL92); simpleConsumer.subscribe(topic, filterExpression);
使用上の注意
トピックを適切に計画し、タグを指定する
トピック、タグ、属性を使用してメッセージを配布できます。 メッセージを配布するときは、次の原則に注意してください。
メッセージタイプ: 順序付けられたメッセージと通常のメッセージなど、異なるタイプのメッセージは、異なるトピックに配布する必要があります。 タグを使用してメッセージを分類しないでください。
ビジネスドメイン: 異なるビジネスドメインと部門は異なるトピックを使用する必要があります。 たとえば、物流メッセージと支払いメッセージのトピックは異なる必要があります。 物流メッセージは、タグを使用して通常のメッセージと緊急メッセージに分けることができます。
メッセージの量と重要性の一貫性: 量またはリンクの重要性が異なるメッセージは、異なるトピックに配布する必要があります。
よくある質問
複数のコンシューマーがトピック内の異なるタグをサブスクライブすると、メッセージが失われるのはなぜですか?
考えられる原因: コンシューマが同じコンシューマグループに属している場合、コンシューマがサブスクライブするタグは同じでなければなりません。 そうしないと、サブスクリプションの不整合が発生し、特定のメッセージが失われます。
フィルター条件を指定したときにコンシューマーが消費するメッセージの数を計算するにはどうすればよいですか。
コンシューマーが消費するメッセージの数は、フィルタリング条件を指定した後に計算されます。
オンライン消費者がメッセージを消費しなかったにもかかわらず、メッセージがグループに蓄積されるのはなぜですか?
SQLフィルタリングまたはタグベースのフィルタリングを使用してメッセージをフィルタリングすると、フィルタリング条件を満たさないメッセージが蓄積されます。 次の項目では、上記のシナリオでメッセージの蓄積数を計算する方法について説明します。
SQLフィルタリング: 累積メッセージ数=準備完了メッセージ数 + 機内メッセージ数-フィルタリング条件を満たさないメッセージ数
タグベースのフィルタリング: 累積メッセージ数=(準備完了メッセージ数 + 機内メッセージ数) × タグに一致するメッセージの割合
タグに一致するメッセージの割合=サンプル内のタグに一致するメッセージの数 /サンプリングされたメッセージの総数
関連ドキュメント
メッセージングの完全なサンプルコードについては、「概要」をご参照ください。