After a consumer subscribes to a topic, ApsaraMQ for RocketMQ delivers all messages in the topic to the consumer by default. If you want the consumer to receive only specific messages in the topic, you can configure filtering conditions so that the ApsaraMQ for RocketMQ broker filters messages before delivery. This prevents the consumer from receiving a large number of messages that it does not need.
Scenarios
ApsaraMQ for RocketMQ is a middleware service that is based on the publish-subscribe (pub/sub) model and used to integrate upstream and downstream business. In actual business scenarios, messages in a topic are processed by different downstream applications that have different consumption logic. Each application needs to consume only messages that meet its consumption logic.
You can use the message filtering feature provided by ApsaraMQ for RocketMQ to efficiently select the messages that a consumer requires. This prevents a large number of irrelevant messages from being delivered to the consumer and reduces the workload on downstream systems.
Message filtering is designed for fine-grained categorization of messages that belong to the same line of business. To separate messages that belong to different lines of business, we recommend that you use different topics.
Overview
What is message filtering?
The message filtering feature of ApsaraMQ for RocketMQ filters messages based on consumer-configured conditions and sends the messages that meet the conditions to the consumers.
Producers attach tags and attributes to messages, and consumers define filtering conditions based on those tags and attributes. The ApsaraMQ for RocketMQ broker evaluates the conditions and delivers only the messages that meet them to the consumers.
How does message filtering work?
Message filtering involves the following steps:
Producer: When a producer initializes a message, it attaches the tags and attributes that consumers' filtering conditions are matched against.
Consumer: During initialization and subsequent consumption, a consumer calls the subscription registration operation to tell the ApsaraMQ for RocketMQ broker which messages in a specific topic it wants to receive. This is also known as reporting the filtering conditions.
Broker: When a consumer consumes messages, the ApsaraMQ for RocketMQ broker dynamically matches messages based on the expressions of the reported filtering conditions and delivers the messages that meet the filtering conditions to the consumer.
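The three steps can be modeled in a few lines of plain Java. This is a minimal in-memory sketch that assumes a message is just a tag plus a map of attributes and that a reported filtering condition is a predicate. The names `FilterFlowSketch`, `SimpleMessage`, and `deliver` are hypothetical and are not part of the ApsaraMQ for RocketMQ client API; the real broker evaluates reported filter expressions, not Java lambdas.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

class FilterFlowSketch {

    // Hypothetical message model: one tag plus custom key-value attributes.
    static final class SimpleMessage {
        final String tag;
        final Map<String, String> attributes;

        SimpleMessage(String tag, Map<String, String> attributes) {
            this.tag = tag;
            this.attributes = attributes;
        }
    }

    // Step 3 (broker): keep only the messages that satisfy the reported condition.
    static List<SimpleMessage> deliver(List<SimpleMessage> topic, Predicate<SimpleMessage> condition) {
        List<SimpleMessage> delivered = new ArrayList<>();
        for (SimpleMessage m : topic) {
            if (condition.test(m)) {
                delivered.add(m);
            }
        }
        return delivered;
    }

    public static void main(String[] args) {
        // Step 1 (producer): attach a tag and attributes when building each message.
        List<SimpleMessage> topic = List.of(
                new SimpleMessage("Order", Map.of("Region", "Hangzhou")),
                new SimpleMessage("Pay", Map.of("Region", "Shanghai")),
                new SimpleMessage("Logistics", Map.of("Region", "Hangzhou")));

        // Step 2 (consumer): report a filtering condition, here "the tag is Pay".
        Predicate<SimpleMessage> condition = m -> "Pay".equals(m.tag);

        System.out.println(deliver(topic, condition).size()); // prints 1
    }
}
```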
Classification
ApsaraMQ for RocketMQ supports two filtering methods: tag-based filtering and attribute-based SQL filtering. The following table compares the two methods.
| Item | Tag-based filtering | Attribute-based SQL filtering |
| --- | --- | --- |
| Filtering target | Message tags. | Message attributes, including custom attributes and system attributes. Message tags are a type of system attribute. |
| Matching capability | Exact match. | SQL syntax-based match. |
| Applicable scenario | Filtering that involves simple, lightweight computing logic. | Filtering that involves complex computing logic. |
For more information, see Tag-based filtering and Attribute-based SQL filtering.
Subscription consistency
Filtering expressions are part of a subscription. In the ApsaraMQ for RocketMQ domain model, all consumers in the same consumer group must use the same filtering expressions in their subscriptions. Otherwise, some messages cannot be consumed. For more information, see Subscriptions.
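Why inconsistent subscriptions lose messages can be illustrated with a deliberately simplified model. The sketch assumes that the messages of one consumer group are spread round-robin across its consumers without regard to each consumer's own filter, and that a consumer which receives a message its filter rejects simply skips it. This is not how the broker actually assigns messages; it only reproduces the effect described above. `InconsistentSubscriptionSketch` and `countSkipped` are hypothetical names.

```java
import java.util.List;
import java.util.Set;

class InconsistentSubscriptionSketch {

    // Counts messages that land on a consumer whose own filter rejects them.
    static int countSkipped(List<String> messageTags, List<Set<String>> consumerFilters) {
        int skipped = 0;
        for (int i = 0; i < messageTags.size(); i++) {
            // Simplifying assumption: message i is handed to consumer i % n,
            // regardless of what that consumer subscribed to.
            Set<String> filter = consumerFilters.get(i % consumerFilters.size());
            if (!filter.contains(messageTags.get(i))) {
                skipped++; // No other consumer in the group receives this message.
            }
        }
        return skipped;
    }

    public static void main(String[] args) {
        List<String> tags = List.of("TagA", "TagA", "TagB", "TagB");
        // Consistent: both consumers subscribe to TagA||TagB.
        System.out.println(countSkipped(tags, List.of(Set.of("TagA", "TagB"), Set.of("TagA", "TagB")))); // prints 0
        // Inconsistent: consumer 0 subscribes to TagA only, consumer 1 to TagB only.
        System.out.println(countSkipped(tags, List.of(Set.of("TagA"), Set.of("TagB")))); // prints 2
    }
}
```

In this model, the only way to guarantee that no message is skipped is to give every consumer in the group the same filter, which matches the consistency requirement.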
Tag-based filtering
Tag-based filtering is a basic filtering method provided by ApsaraMQ for RocketMQ. In this method, messages are matched based on the tags that are defined by producers. Consumers use the tags to specify the messages that they want to consume.
Sample scenario
The following items describe messages that are produced in an e-commerce transaction scenario:
Order messages
Payment messages
Logistics messages
The messages are sent to a topic named Trade_Topic, which is subscribed to by the following downstream systems:
Payment system: subscribes only to payment messages.
Logistics system: subscribes only to logistics messages.
Transaction success rate analysis system: subscribes to order and payment messages.
Real-time computing system: subscribes to all messages.
The following figure shows the filtering effect.
Tag setting
Before a producer sends messages, the producer can attach only one tag to each message.
Each tag is a character string. We recommend that each tag not exceed 128 characters in length.
Tags are case-sensitive. For example, TAG A and tag a are different tags.
Filtering rules
Tag-based filtering performs exact string matching. The following filtering rules are supported:
Single-tag match: You can specify a single tag in the filter expression to receive only messages to which the tag is attached.
Multi-tag match: You can specify multiple tags in the filter expression to receive messages to which one of the tags is attached. Separate multiple tags with two vertical bars (||). For example, Tag1||Tag2||Tag3 specifies that messages to which Tag1, Tag2, or Tag3 is attached are all sent to the consumer.
All match: You can use an asterisk (*) as a wildcard character to match all tags. This specifies that all messages in the topic are sent to the consumer.
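The three rules can be sketched as a small matcher. This is an illustrative reimplementation of the rules stated above (exact, case-sensitive comparison, `||` as the separator, and `*` as the wildcard), not the broker's code. The class and method names are hypothetical, and trimming whitespace around each tag is an assumption.

```java
import java.util.Arrays;
import java.util.Set;
import java.util.stream.Collectors;

class TagExpressionMatcher {

    // Returns true if a message carrying messageTag should be delivered for the given expression.
    static boolean matches(String expression, String messageTag) {
        String expr = expression.trim();
        if (expr.equals("*")) {
            return true; // All match: every message in the topic is delivered.
        }
        if (messageTag == null) {
            return false; // In this sketch, a message without a tag is matched only by "*".
        }
        // Split on the literal "||" separator; "\\|\\|" escapes the regex metacharacters.
        Set<String> wanted = Arrays.stream(expr.split("\\|\\|"))
                .map(String::trim)
                .collect(Collectors.toSet());
        // Set.contains relies on String.equals, so the comparison is exact and case-sensitive.
        return wanted.contains(messageTag);
    }

    public static void main(String[] args) {
        System.out.println(matches("TagA", "TagA"));             // true: single-tag match
        System.out.println(matches("Tag1||Tag2||Tag3", "Tag2")); // true: multi-tag match
        System.out.println(matches("TagA", "taga"));             // false: tags are case-sensitive
        System.out.println(matches("*", "AnyTag"));              // true: all match
    }
}
```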
Sample code
Specify a tag before sending a message
```java
Message message = messageBuilder.setTopic("topic")
        // The message key. You can use the key to search for the message.
        .setKeys("messageKey")
        // The message tag. Consumers can use the tag to filter messages.
        // In this example, the tag of the message is set to TagA.
        .setTag("TagA")
        // The message body.
        .setBody("messageBody".getBytes())
        .build();
```
Subscribe to messages that match a single tag
```java
String topic = "Your Topic";
// Subscribe to messages to which TagA is attached.
FilterExpression filterExpression = new FilterExpression("TagA", FilterExpressionType.TAG);
pushConsumer.subscribe(topic, filterExpression);
```
Subscribe to messages that match multiple tags
```java
String topic = "Your Topic";
// Subscribe to messages to which TagA, TagB, or TagC is attached.
FilterExpression filterExpression = new FilterExpression("TagA||TagB||TagC", FilterExpressionType.TAG);
pushConsumer.subscribe(topic, filterExpression);
```
Subscribe to all messages in the topic
```java
String topic = "Your Topic";
// Subscribe to all messages.
FilterExpression filterExpression = new FilterExpression("*", FilterExpressionType.TAG);
pushConsumer.subscribe(topic, filterExpression);
```
Attribute-based SQL filtering
Attribute-based SQL filtering is an advanced filtering method provided by ApsaraMQ for RocketMQ. In this method, messages are matched against the key-value pairs (attributes) that producers specify. When producers send messages, they can specify multiple attributes for each message. Consumers can then reference these attributes in SQL expressions to receive specific messages.
Because tags are a type of system attribute, tag-based filtering can be regarded as a special case of attribute-based SQL filtering. In SQL syntax, the tag attribute is represented by TAGS.
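Because tags are exposed to SQL expressions as TAGS, a tag filter expression can in principle be rewritten as an SQL92 expression. The helper below is a hypothetical sketch of that translation: it maps `*` to `TRUE`, a single tag to `TAGS = '...'`, and several tags to `TAGS IN (...)`. It assumes SQL-92 style string literals in which an embedded single quote is doubled. Verify the generated expressions against your broker before relying on them.

```java
import java.util.Arrays;
import java.util.stream.Collectors;

class TagToSqlSketch {

    // Wraps a tag in single quotes, doubling any embedded quote (SQL-92 string literal rule).
    static String quote(String tag) {
        return "'" + tag.replace("'", "''") + "'";
    }

    // Translates a tag filter expression such as "TagA||TagB" into an equivalent SQL92 expression.
    static String toSql(String tagExpression) {
        String expr = tagExpression.trim();
        if (expr.equals("*")) {
            return "TRUE"; // Matches every message, like the "*" tag expression.
        }
        String[] tags = Arrays.stream(expr.split("\\|\\|"))
                .map(String::trim)
                .toArray(String[]::new);
        if (tags.length == 1) {
            return "TAGS = " + quote(tags[0]);
        }
        return Arrays.stream(tags)
                .map(TagToSqlSketch::quote)
                .collect(Collectors.joining(", ", "TAGS IN (", ")"));
    }

    public static void main(String[] args) {
        System.out.println(toSql("TagA"));             // TAGS = 'TagA'
        System.out.println(toSql("TagA||TagB||TagC")); // TAGS IN ('TagA', 'TagB', 'TagC')
        System.out.println(toSql("*"));                // TRUE
    }
}
```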
Sample scenario
The following items describe messages that are generated in an e-commerce transaction scenario. The messages are classified into order messages and logistics messages. A region attribute is specified for the logistics messages. The values of the region attribute are Hangzhou and Shanghai.
Order messages
Logistics messages
  Logistics messages whose region attribute value is Hangzhou
  Logistics messages whose region attribute value is Shanghai
The messages are sent to a topic named Trade_Topic, which is subscribed to by the following downstream systems:
Logistics system 1: subscribes only to logistics messages whose region attribute value is Hangzhou.
Logistics system 2: subscribes to all logistics messages.
Order tracking system: subscribes only to order messages.
Real-time computing system: subscribes to all messages.
The following figure shows the filtering effect.
Message attribute setting
Before a producer sends messages, the producer can specify custom attributes for each message. Each attribute is a custom key-value pair.
Multiple attributes can be specified for each message.
Filtering rules
Filter expressions must follow the SQL-92 syntax. The following table describes the supported syntax.
| Syntax | Description |
| --- | --- |
| IS NULL | Specifies that an attribute does not exist. |
| IS NOT NULL | Specifies that an attribute exists. |
| >, >=, <, <= | Compare numeric values. You cannot use these operators to compare strings. If you do, an error is reported when the consumer is started. |
| BETWEEN xxx AND xxx | Compares numeric values. You cannot use the syntax to compare strings. If you do, an error is reported when the consumer is started. The syntax is equivalent to >= xxx AND <= xxx, which specifies that the attribute value is between the two numeric values or equal to one of them. |
| NOT BETWEEN xxx AND xxx | Compares numeric values. You cannot use the syntax to compare strings. If you do, an error is reported when the consumer is started. The syntax is equivalent to < xxx OR > xxx, which specifies that the attribute value is less than the left-side numeric value or greater than the right-side numeric value. |
| IN (xxx, xxx) | Specifies that the attribute value is included in a set. The elements in the set can only be strings. |
| =, <> | The equal to operator and the not equal to operator. You can use them to compare numeric values and strings. |
| AND, OR | The logical AND operator and the logical OR operator. You can use them to combine simple logical expressions. Each expression must be enclosed in parentheses. |
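The equivalences stated for BETWEEN and NOT BETWEEN can be checked with a small sketch. It models an attribute value as a `long`, which is an assumption since the table does not specify the supported numeric types, and shows that both bounds of BETWEEN are inclusive and that NOT BETWEEN is its exact complement. `BetweenSemantics` is a hypothetical name.

```java
class BetweenSemantics {

    // a BETWEEN low AND high is equivalent to a >= low AND a <= high (both bounds inclusive).
    static boolean between(long value, long low, long high) {
        return value >= low && value <= high;
    }

    // a NOT BETWEEN low AND high is equivalent to a < low OR a > high.
    static boolean notBetween(long value, long low, long high) {
        return value < low || value > high;
    }

    public static void main(String[] args) {
        // For each v, the two results are always opposite; 10 and 11 are inside the range.
        for (long v = 9; v <= 12; v++) {
            System.out.println(v + " " + between(v, 10, 11) + " " + notBetween(v, 10, 11));
        }
    }
}
```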
In attribute-based SQL filtering, producers set custom message attributes, and consumers independently define the SQL filter expressions that reference those attributes. As a result, a filter expression may not produce a definite result for every message. In such cases, the ApsaraMQ for RocketMQ broker processes messages based on the following logic:
Exception handling: If an exception occurs while a filter expression is evaluated, for example when a numeric value is compared with a non-numeric value, the broker filters out the message and does not deliver it to the consumer.
Handling of null values: If the filter expression evaluates to NULL or to a value that is not a Boolean value (true or false), the broker filters out the message and does not deliver it to the consumer. For example, if a consumer's filter condition references an attribute that the producer did not set, the expression evaluates to NULL.
Handling of inconsistent numeric values: If a custom message attribute has a floating-point value but the filter expression compares it with an integer, the broker filters out the message and does not deliver it to the consumer.
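Taken together, these rules mean that a message is delivered only when its filter expression evaluates to Boolean true. The sketch below models an evaluated expression as a `Supplier<Object>`, an assumption made for illustration since the broker exposes no such interface, and uses a hypothetical `price > 30` check to trigger each case. `FilterOutcomeSketch` and its methods are hypothetical names.

```java
import java.util.Map;
import java.util.function.Supplier;

class FilterOutcomeSketch {

    // Delivers only when the expression yields Boolean.TRUE; exceptions, null, and non-Boolean results are filtered out.
    static boolean shouldDeliver(Supplier<Object> evaluation) {
        try {
            Object result = evaluation.get();
            return Boolean.TRUE.equals(result);
        } catch (RuntimeException e) {
            return false; // Exception handling: an evaluation error filters the message out.
        }
    }

    // Hypothetical evaluation of "price > 30" against a message's attributes.
    static Supplier<Object> priceGreaterThan30(Map<String, String> attributes) {
        return () -> {
            String price = attributes.get("price");
            if (price == null) {
                return null; // The producer never set the attribute, so the result is NULL.
            }
            return Long.parseLong(price) > 30; // Throws NumberFormatException for non-integer values.
        };
    }

    public static void main(String[] args) {
        System.out.println(shouldDeliver(priceGreaterThan30(Map.of("price", "45"))));   // true
        System.out.println(shouldDeliver(priceGreaterThan30(Map.of())));                // false: NULL result
        System.out.println(shouldDeliver(priceGreaterThan30(Map.of("price", "abc"))));  // false: exception
        System.out.println(shouldDeliver(priceGreaterThan30(Map.of("price", "30.5")))); // false: integer parse fails
        System.out.println(shouldDeliver(() -> "yes"));                                 // false: not a Boolean
    }
}
```

In this sketch, a floating-point value such as "30.5" fails the integer parse and is filtered out, which happens to give the same outcome as the inconsistent-numeric-values rule.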
Sample code
Specify a tag and custom attributes before sending a message
```java
Message message = messageBuilder.setTopic("topic")
        // The message key. You can use the key to search for the message.
        .setKeys("messageKey")
        // The message tag. Consumers can use the tag to filter messages.
        // In this example, the message tag is set to messageTag.
        .setTag("messageTag")
        // You can also specify custom attributes for the message, such as environment, region, and logical branch.
        // In this example, the custom attribute is Region and the attribute value is Hangzhou.
        .addProperty("Region", "Hangzhou")
        // The message body.
        .setBody("messageBody".getBytes())
        .build();
```
Subscribe to messages that match a single attribute
```java
String topic = "topic";
// Subscribe only to messages whose Region attribute value is Hangzhou.
FilterExpression filterExpression = new FilterExpression("Region IS NOT NULL AND Region='Hangzhou'", FilterExpressionType.SQL92);
simpleConsumer.subscribe(topic, filterExpression);
```
Subscribe to messages that match multiple attributes
```java
String topic = "topic";
// Subscribe to messages whose Region attribute value is Hangzhou and whose price attribute value is greater than 30.
FilterExpression filterExpression = new FilterExpression("Region IS NOT NULL AND price IS NOT NULL AND Region = 'Hangzhou' AND price > 30", FilterExpressionType.SQL92);
simpleConsumer.subscribe(topic, filterExpression);
```
Subscribe to all messages in the topic
```java
String topic = "topic";
// Subscribe to all messages.
FilterExpression filterExpression = new FilterExpression("True", FilterExpressionType.SQL92);
simpleConsumer.subscribe(topic, filterExpression);
```
Usage notes
Properly plan topics and specify tags
You can use topics, tags, and attributes to distribute messages. Take note of the following principles when you distribute messages:
Message type: Messages of different types, such as ordered messages and normal messages, must be sent to different topics. Do not use tags to distinguish message types.
Business domain: Different business domains and departments must use different topics. For example, the topics for logistics messages and payment messages must be different. Logistics messages can be divided into ordinary messages and urgent messages by using tags.
Message volume and importance: Messages that differ significantly in volume or in the importance of their business link must be sent to different topics.
FAQ
Why are messages lost when multiple consumers subscribe to different tags in a topic?
Possible cause: If the consumers belong to the same consumer group, the tags to which the consumers subscribe must be the same. Otherwise, subscription inconsistency occurs and specific messages are lost.
How do I calculate the number of messages that are consumed by a consumer when filtering conditions are specified?
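The number of messages consumed by a consumer is counted after the filtering conditions are applied, so it covers only the messages that meet the conditions and are delivered to the consumer.
Why does a consumer group show accumulated messages even though its online consumers have not consumed any messages?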
If you use SQL filtering or tag-based filtering to filter messages, messages that do not meet the filtering conditions are accumulated. The following items describe how to calculate the number of accumulated messages in the preceding scenarios:
SQL filtering: Number of accumulated messages = Number of ready messages + Number of inflight messages - Number of messages that do not meet the filtering conditions
Tag-based filtering: Number of accumulated messages = (Number of ready messages + Number of inflight messages) × Percentage of messages that match the tags
Percentage of messages that match the tags = Number of messages that match the tags in the sample/Total number of sampled messages
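The two formulas can be expressed as a short calculation. The figures in `main` (800 ready messages, 200 inflight messages, 600 non-matching messages, and 40 matches in a 100-message sample) are hypothetical and chosen only to make the arithmetic easy to follow. `AccumulationEstimate` is a hypothetical name.

```java
class AccumulationEstimate {

    // SQL filtering: ready + inflight - messages that do not meet the filtering conditions.
    static long sqlAccumulated(long ready, long inflight, long notMatching) {
        return ready + inflight - notMatching;
    }

    // Tag-based filtering: (ready + inflight) x (matchedInSample / sampledTotal).
    // Multiplying before dividing avoids rounding the percentage first.
    static double tagAccumulated(long ready, long inflight, long matchedInSample, long sampledTotal) {
        return (double) (ready + inflight) * matchedInSample / sampledTotal;
    }

    public static void main(String[] args) {
        // Hypothetical figures: 800 ready and 200 inflight messages, 600 of which do not match.
        System.out.println(sqlAccumulated(800, 200, 600)); // 400
        // 40 of 100 sampled messages match the tags, so the percentage is 40%.
        System.out.println(tagAccumulated(800, 200, 40, 100)); // 400.0
    }
}
```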
References
For information about the complete sample code for messaging, see Overview.