After a consumer subscribes to a topic, the ApsaraMQ for RocketMQ broker delivers all messages in the topic to the consumer. If a consumer wants to receive only specific messages from the topic, you can configure message attributes and filter conditions. This way, the ApsaraMQ for RocketMQ broker delivers only messages whose attributes match the filter conditions to the consumer. This topic describes the working mechanism, use scenarios, and limits of the message filtering feature. The topic also describes how to configure message filtering and provides sample code for message filtering.
Feature description
You can implement the message filtering feature by configuring message attributes and filter conditions. Message attributes are configured to classify messages sent by producers to topics, and filter conditions are configured to filter messages with specific attributes in topics. This allows the ApsaraMQ for RocketMQ broker to filter messages from producers and deliver only messages that meet the specified filter conditions to consumers.
If a consumer subscribes to a topic without specifying a filter condition, the consumer receives all messages in the topic, regardless of whether attributes are configured for the messages sent by the producers.
The following table describes the filtering methods that are supported by ApsaraMQ for RocketMQ.
Method | Description | Scenario | Instance limit | Protocol limit |
Tag-based filtering (default) | If the tag of a message sent by the producer matches the specified tag of the messages to which the consumer subscribes, the broker delivers the message to the consumer. | This method is suitable for simple filtering scenarios. You can add only one tag to a message. You can use this method to classify and filter messages by a single attribute. | None. | None. |
Attribute-based SQL filtering | Messages that meet the filter conditions are delivered to the consumer. | This method is suitable for complex filtering scenarios. You can configure multiple custom attributes for a message. This method allows you to use custom SQL expressions to filter messages by multiple attributes. | Only Enterprise Platinum Edition instances support this method. | Only ApsaraMQ for RocketMQ TCP client SDKs support this method. |
Tag-based filtering
A tag is a label that classifies messages in a topic. Before an ApsaraMQ for RocketMQ producer sends a message, you can add a tag to the message. Then, a consumer can subscribe to the message based on the added tag.
Sample scenario
In an e-commerce transaction scenario, the following messages are produced:
Order messages
Payment messages
Logistics messages
The messages are sent to a topic named Trade_Topic to which the following downstream systems subscribe:
Payment system: subscribes only to payment messages.
Logistics system: subscribes only to logistics messages.
Transaction success rate analysis system: subscribes to order and payment messages.
Real-time computing system: subscribes to all messages.
The following figure shows the filtering process.
Configuration method
ApsaraMQ for RocketMQ lets you filter messages by tag in client SDK code. The producer must add a tag to each message before sending it, and the consumer must specify the tags of the messages to which it wants to subscribe. For information about SDKs, see Overview. The following section describes how to write the code for the producer and consumer:
Send messages
Before you send messages, specify a tag for each message.
Message msg = new Message("MQ_TOPIC","TagA","Hello MQ".getBytes());
Subscribe to all messages
If a consumer wants to subscribe to all messages in a topic, use an asterisk (*) to specify all tags.
consumer.subscribe("MQ_TOPIC", "*", new MessageListener() {
    public Action consume(Message message, ConsumeContext context) {
        System.out.println(message.getMsgID());
        return Action.CommitMessage;
    }
});
Subscribe to a specific type of message
If a consumer wants to subscribe to a specific type of message in a topic, specify the corresponding tag.
consumer.subscribe("MQ_TOPIC", "TagA", new MessageListener() {
    public Action consume(Message message, ConsumeContext context) {
        System.out.println(message.getMsgID());
        return Action.CommitMessage;
    }
});
Subscribe to multiple types of messages
If a consumer wants to subscribe to multiple types of messages in a topic, specify the corresponding tags. Separate multiple tags with two vertical bars (||).
consumer.subscribe("MQ_TOPIC", "TagA||TagB", new MessageListener() {
    public Action consume(Message message, ConsumeContext context) {
        System.out.println(message.getMsgID());
        return Action.CommitMessage;
    }
});
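The subscription rules above can be applied to the Trade_Topic sample scenario. The following Java snippet is a minimal local sketch, not the broker's implementation: the tag names Order, Payment, and Logistics and the matches helper are assumptions made for this illustration. It only mimics how an expression such as * or TagA||TagB selects messages by tag.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.Map;

/** Local model of tag-based filtering; not the broker's actual implementation. */
class TagFilterSketch {

    /** Returns true if a message tag is selected by a subscription expression. */
    static boolean matches(String subscriptionExpression, String messageTag) {
        if ("*".equals(subscriptionExpression.trim())) {
            return true; // An asterisk subscribes to all tags.
        }
        // Multiple tags are separated by two vertical bars (||).
        return Arrays.stream(subscriptionExpression.split("\\|\\|"))
                .map(String::trim)
                .anyMatch(tag -> tag.equals(messageTag));
    }

    public static void main(String[] args) {
        // Hypothetical tag names for the three message types in Trade_Topic.
        Map<String, String> subscriptions = new LinkedHashMap<>();
        subscriptions.put("Payment system", "Payment");
        subscriptions.put("Logistics system", "Logistics");
        subscriptions.put("Transaction success rate analysis system", "Order||Payment");
        subscriptions.put("Real-time computing system", "*");

        for (String tag : new String[] {"Order", "Payment", "Logistics"}) {
            subscriptions.forEach((system, expression) ->
                    System.out.printf("%s message -> %s: %b%n",
                            tag, system, matches(expression, tag)));
        }
    }
}
```

In real consumer code, each expression in the map would be the second argument of consumer.subscribe for the corresponding downstream system.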
Incorrect example
If a consumer has multiple subscriptions to a topic, and each subscription has a different tag, the consumer receives only messages whose tag matches the tag specified in the most recent subscription.
// In the following code, the consumer can receive messages with TagB in MQ_TOPIC but cannot receive messages with TagA.
consumer.subscribe("MQ_TOPIC", "TagA", new MessageListener() {
    public Action consume(Message message, ConsumeContext context) {
        System.out.println(message.getMsgID());
        return Action.CommitMessage;
    }
});
consumer.subscribe("MQ_TOPIC", "TagB", new MessageListener() {
    public Action consume(Message message, ConsumeContext context) {
        System.out.println(message.getMsgID());
        return Action.CommitMessage;
    }
});
Attribute-based SQL filtering
To use attribute-based SQL filtering, configure custom attributes for messages before the producer sends them, and define a filter expression in SQL syntax that specifies the attributes of the messages to which a consumer wants to subscribe. ApsaraMQ for RocketMQ evaluates the filter expression against the custom attributes of each message and delivers only the matching messages to the consumer.
A tag is a special type of message attribute. The attribute-based SQL filtering method is compatible with the tag-based filtering method. You can use SQL expressions to specify the tags used for message filtering. In SQL syntax, the tag attribute is represented by TAGS.
Limits
When you use the attribute-based SQL filtering method to filter messages, take note of the following limits:
Only Enterprise Platinum Edition instances support this method.
Only TCP clients support this method.
If the broker does not support the attribute-based SQL filtering method and you define a filter expression for a consumer, an error is reported when the consumer is started or the consumer may not receive messages.
Sample scenario
The following items describe the messages that are generated in an e-commerce transaction scenario. The messages are classified into order messages and logistics messages. A region attribute is specified for logistics messages. The values of the region attribute are Hangzhou and Shanghai.
Order messages
Logistics messages
Logistics messages whose region attribute value is Hangzhou
Logistics messages whose region attribute value is Shanghai
The messages are sent to a topic named Trade_Topic to which the following downstream systems subscribe:
Logistics system 1: subscribes only to logistics messages whose region attribute value is Hangzhou.
Logistics system 2: subscribes to all logistics messages.
Order tracking system: subscribes only to order messages.
Real-time computing system: subscribes to all messages.
The following figure shows the filtering process.
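The four subscriptions in this scenario could be expressed as SQL filter expressions along the following lines. This is a sketch under assumptions: the tag names Order and Logistics and the class name are hypothetical, and each expression checks that an attribute exists before comparing it. The TRUE expression for subscribing to all messages follows the sample code later in this topic.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Candidate SQL filter expressions for the sample scenario; tag names are hypothetical. */
class TradeTopicFilters {

    static Map<String, String> expressions() {
        Map<String, String> filters = new LinkedHashMap<>();
        // Logistics messages whose region attribute value is Hangzhou.
        filters.put("Logistics system 1",
                "TAGS IS NOT NULL AND TAGS = 'Logistics' "
                        + "AND region IS NOT NULL AND region = 'Hangzhou'");
        // All logistics messages, regardless of region.
        filters.put("Logistics system 2", "TAGS IS NOT NULL AND TAGS = 'Logistics'");
        // Order messages only.
        filters.put("Order tracking system", "TAGS IS NOT NULL AND TAGS = 'Order'");
        // All messages in the topic.
        filters.put("Real-time computing system", "TRUE");
        return filters;
    }

    public static void main(String[] args) {
        // Each expression would be passed to MessageSelector.bySql(...) in consumer.subscribe(...).
        expressions().forEach((system, sql) -> System.out.println(system + ": " + sql));
    }
}
```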
Configuration method
ApsaraMQ for RocketMQ lets you filter messages by SQL expression in client SDK code. In the producer code, configure custom attributes for the messages that you send. In the consumer code, define a filter expression in SQL syntax when you subscribe to messages.
The following limits are imposed on message attributes:
Before a producer sends messages, the producer can specify custom attributes for each message. Each attribute is a custom key-value pair.
The key in an attribute can contain letters, digits, and underscores (_).
The key in an attribute must start with a letter or an underscore (_).
You can specify multiple attributes for each message.
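The key rules above can be checked on the client before a message is sent. The following sketch is an illustration only: the class name, the method name, and the regular expression are assumptions derived from the two naming rules listed above and are not part of the SDK. It also assumes that "letters" means ASCII letters.

```java
import java.util.regex.Pattern;

/** Client-side check of custom attribute keys; a hypothetical helper, not an SDK API. */
class AttributeKeyCheck {

    // Starts with a letter or underscore, followed by letters, digits, or underscores.
    // Assumption: "letters" means ASCII letters.
    private static final Pattern VALID_KEY = Pattern.compile("[A-Za-z_][A-Za-z0-9_]*");

    static boolean isValidKey(String key) {
        return key != null && VALID_KEY.matcher(key).matches();
    }

    public static void main(String[] args) {
        for (String key : new String[] {"region", "_price", "order_id2", "2fast", "unit-price"}) {
            System.out.println(key + " -> " + isValidKey(key));
        }
    }
}
```

A producer could call isValidKey before msg.putUserProperties to catch an invalid key early.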
For information about SDKs, see Overview. The following section describes how to define code for the producer and consumer:
Producer
Configure custom message attributes.
Message msg = new Message("topic", "tagA", "Hello MQ".getBytes());
// Configure Attribute A and set the attribute value to 1.
msg.putUserProperties("A", "1");
Consumer
Define a filter expression by using SQL syntax to filter messages based on the custom attribute.
To filter messages based on a custom attribute, you must define logic in the filter expression to check whether the message attribute exists. If the attribute does not exist, the calculated result of the filter expression is NULL and the message is not delivered to the consumer.
// Subscribe to messages that have Attribute A and whose attribute value is 1.
consumer.subscribe("topic", MessageSelector.bySql("A IS NOT NULL AND TAGS IS NOT NULL AND A = '1'"), new MessageListener() {
    public Action consume(Message message, ConsumeContext context) {
        System.out.printf("Receive New Messages: %s %n", message);
        return Action.CommitMessage;
    }
});
The following table describes different types of SQL syntax that can be used to define filter expressions.
Syntax | Description | Example |
IS NULL | Specifies that an attribute does not exist. | |
IS NOT NULL | Specifies that an attribute exists. | |
>, >=, <, <= | Compares numeric values. You cannot use the syntax to compare strings. Otherwise, an error is reported when the consumer is started. Strings that can be converted into numeric values are considered numeric values. | |
BETWEEN xxx AND xxx | Compares numeric values. You cannot use the syntax to compare strings. Otherwise, an error is reported when the consumer is started. The syntax is equivalent to >= xxx AND <= xxx, which specifies that the value of the attribute is between two numeric values or equal to one of the two numeric values. | |
NOT BETWEEN xxx AND xxx | Compares numeric values. You cannot use the syntax to compare strings. Otherwise, an error is reported when the consumer is started. The syntax is equivalent to < xxx OR > xxx, which specifies that the value of the attribute is less than the left-side numeric value or greater than the right-side numeric value. | |
IN (xxx, xxx) | Specifies that the value of the attribute is included in a set. The elements in the set can only be strings. | |
=, <> | The equal to operator and the not equal to operator. You can use the operators to compare numeric values and strings. | |
AND, OR | The logical AND operator and the logical OR operator. You can use the operators to combine simple logical functions. Each logical function must be enclosed in parentheses. | |
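For orientation, the following Java snippet collects one illustrative filter expression for each kind of syntax in the table above. These are sketches, not the examples from the original table: the attribute name a and all literal values are placeholders, and each expression checks that the attribute exists before it compares the value.

```java
import java.util.List;

/** Illustrative filter expressions; attribute name "a" and all literals are placeholders. */
class FilterExpressionExamples {

    static final List<String> EXAMPLES = List.of(
            "a IS NULL",                                    // Attribute a does not exist.
            "a IS NOT NULL",                                // Attribute a exists.
            "a IS NOT NULL AND a > 100",                    // Numeric comparison.
            "a IS NOT NULL AND (a BETWEEN 10 AND 100)",     // 10 <= a <= 100.
            "a IS NOT NULL AND (a NOT BETWEEN 10 AND 100)", // a < 10 or a > 100.
            "a IS NOT NULL AND (a IN ('abc', 'def'))",      // Membership in a set of strings.
            "a IS NOT NULL AND (a = 'abc' OR a <> 'def')"   // Equality and inequality.
    );

    public static void main(String[] args) {
        // Each string could be passed to MessageSelector.bySql(...) when subscribing.
        EXAMPLES.forEach(System.out::println);
    }
}
```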
You can implement attribute-based SQL filtering by configuring custom message attributes and defining an SQL filter expression. The filter expression may not generate valid results. The ApsaraMQ for RocketMQ broker processes messages based on the following logic:
If an error is reported during the calculation of the filter expression, the broker automatically filters out received messages and does not deliver the messages to the consumer. For example, an exception occurs when numeric and non-numeric values are compared.
If the calculated result of the filter expression is NULL or the value is not a Boolean value, the broker automatically filters out received messages and does not deliver the messages to the consumer. A Boolean value represents a truth value, which can be true or false. For example, when a consumer subscribes to a message, the consumer uses an attribute that is not specified by the producer as a filter condition. In this case, the calculation result of the filter expression is NULL.
If the values of the custom message attribute are floating-point numbers but the attribute values used in the filter expression are integers, the broker automatically filters out received messages and does not deliver the messages to the consumer.
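The first two rules above amount to one decision: a message is delivered only when the filter expression evaluates to true. The following Java sketch models that decision locally. It is not broker code: the class and method names are hypothetical, an evaluation error is modeled as a thrown exception, and a missing attribute is modeled as a null result.

```java
import java.util.function.Supplier;

/** Local model of how a filter result maps to a delivery decision; not broker code. */
class FilterOutcomeSketch {

    /**
     * Delivers only when the expression evaluates to TRUE.
     * A thrown exception, a NULL result, or FALSE all mean "do not deliver".
     */
    static boolean shouldDeliver(Supplier<Boolean> expression) {
        try {
            Boolean result = expression.get();
            return Boolean.TRUE.equals(result);
        } catch (RuntimeException e) {
            return false; // Evaluation error: the message is filtered out.
        }
    }

    public static void main(String[] args) {
        // price = "50" compared with 30: the comparison succeeds and is true.
        System.out.println(shouldDeliver(() -> Double.parseDouble("50") > 30));
        // Attribute not set by the producer: modeled as a NULL result.
        System.out.println(shouldDeliver(() -> null));
        // Non-numeric value in a numeric comparison: modeled as an exception.
        System.out.println(shouldDeliver(() -> Double.parseDouble("hangzhou") > 30));
    }
}
```

The sketch shows why a filter expression that references a possibly missing attribute should include an IS NOT NULL check: without it, the result is NULL rather than false, and the message is dropped either way.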
Sample code
Send messages
Configure a tag and custom attributes for the message.
Producer producer = ONSFactory.createProducer(properties);
// Set the value of Tag to tagA.
Message msg = new Message("topicA", "tagA", "Hello MQ".getBytes());
// Set the value of the custom attribute region to hangzhou.
msg.putUserProperties("region", "hangzhou");
// Set the value of the custom attribute price to 50.
msg.putUserProperties("price", "50");
SendResult sendResult = producer.send(msg);
Subscribe to messages based on a custom attribute.
Consumer consumer = ONSFactory.createConsumer(properties);
// Subscribe only to messages whose value of the custom attribute region is hangzhou.
// If you do not configure this attribute for a message or the attribute value of the message is not hangzhou, the message is not delivered to the consumer.
consumer.subscribe("topicA", MessageSelector.bySql("region IS NOT NULL AND region = 'hangzhou'"), new MessageListener() {
    public Action consume(Message message, ConsumeContext context) {
        System.out.printf("Receive New Messages: %s %n", message);
        return Action.CommitMessage;
    }
});
Expected results: The message sent in the example has the custom attribute region and the attribute value is hangzhou. The message meets the filter conditions and is delivered to the consumer.
Subscribe to messages based on the tag and custom attribute.
Consumer consumer = ONSFactory.createConsumer(properties);
// Subscribe only to messages that have tagA and whose value of the custom attribute price is greater than 30.
consumer.subscribe("topicA", MessageSelector.bySql("TAGS IS NOT NULL AND price IS NOT NULL AND TAGS = 'tagA' AND price > 30"), new MessageListener() {
    public Action consume(Message message, ConsumeContext context) {
        System.out.printf("Receive New Messages: %s %n", message);
        return Action.CommitMessage;
    }
});
Expected results: The message sent in the example has tagA and the custom attribute price, and the value of the custom attribute price is greater than 30. The message meets the filter conditions and is delivered to the consumer.
Subscribe to messages based on multiple custom attributes.
Consumer consumer = ONSFactory.createConsumer(properties);
// Subscribe only to messages whose value of the custom attribute region is hangzhou and whose value of the custom attribute price is less than 20.
consumer.subscribe("topicA", MessageSelector.bySql("region IS NOT NULL AND price IS NOT NULL AND region = 'hangzhou' AND price < 20"), new MessageListener() {
    public Action consume(Message message, ConsumeContext context) {
        System.out.printf("Receive New Messages: %s %n", message);
        return Action.CommitMessage;
    }
});
Expected results: The message is not delivered to the consumer because it does not meet the filter conditions. The consumer subscribes to messages whose value of the custom attribute price is less than 20, but the producer set the value of price to 50.
Subscribe to all messages in the topic.
Consumer consumer = ONSFactory.createConsumer(properties);
// To subscribe to all messages in the topic, set the value of the SQL expression to TRUE.
consumer.subscribe("topicA", MessageSelector.bySql("TRUE"), new MessageListener() {
    public Action consume(Message message, ConsumeContext context) {
        System.out.printf("Receive New Messages: %s %n", message);
        return Action.CommitMessage;
    }
});
Expected results: All messages in the topic are delivered to the consumer.
Incorrect example
In this example, the producer does not configure the custom attribute for the message before sending it, and the filter expression uses the attribute directly as a filter condition without first checking whether the attribute exists. As a result, the message is not delivered to the consumer.
Consumer consumer = ONSFactory.createConsumer(properties);
// The message attribute product is not configured during message sending. The filtering fails and the message is not delivered to the consumer.
consumer.subscribe("topicA", MessageSelector.bySql("product = 'MQ'"), new MessageListener() {
    public Action consume(Message message, ConsumeContext context) {
        System.out.printf("Receive New Messages: %s %n", message);
        return Action.CommitMessage;
    }
});
References
Consumer instances that use the same group ID must subscribe to the same topics. For more information, see Subscription consistency.
You can use topics and tags to classify messages for different services. For more information, see Best practices of topics and tags.