After a consumer subscribes to a topic, the ApsaraMQ for RocketMQ broker delivers all messages in the topic to the consumer by default. If the consumer needs only some of the messages, you can configure message attributes and filter conditions. The broker then delivers only the messages whose attributes match the filter conditions. This topic describes the working mechanism, scenarios, and limits of the message filtering feature. It also describes how to configure message filtering and provides sample code.
Description
The message filtering feature is implemented by configuring message attributes and filter conditions. You can configure message attributes to classify the messages that producers send to topics. In addition, you need to configure filter conditions so that the consumers can subscribe to messages with specified attributes in the topics. This way, the broker can filter the messages sent from the producers and deliver only messages that meet the specified filter conditions to the consumers.
If a consumer subscribes to a topic but no filter condition is specified, all messages in the topic are delivered to the consumer regardless of whether attributes are configured for the messages when they are sent from the producers.
The following table describes the message filtering methods supported by ApsaraMQ for RocketMQ.
Method | Description | Scenario | Limits on instances | Limits on protocols |
Filter messages by using tags (default method) | If the tag of a message sent from a producer matches the specified tag of messages to which the consumer subscribes, the broker delivers the message to the consumer. | This method applies to simple filtering scenarios. You can add only one tag to a message. Therefore, this method can be used when you need to classify and filter messages by a single attribute. | None | None |
Filter messages by using SQL expressions | Messages that meet the filter conditions are delivered to the consumer. | This method applies to complex filtering scenarios. You can configure multiple custom attributes for a message. This method allows you to use custom SQL expressions to filter messages by multiple attributes. | Only Enterprise Platinum Edition instances support this method. | Only the commercial edition of the TCP client SDK supports this method. |
Filter messages by using tags
A tag is a label that classifies messages in a topic. You can add a tag to a message when the ApsaraMQ for RocketMQ producer sends the message. Consumers can subscribe to the message based on the tag attached to it.
Scenarios
The following figure shows an example in the e-commerce transaction scenario. A series of messages are generated in the process, from placing an order to receiving the product. The following information provides examples of related messages:
Order messages
Payment messages
Logistics messages
These messages are sent to the topic named Trade_Topic and subscribed to by different systems. For example:
Payment system: subscribes only to messages related to payment.
Logistics system: subscribes only to messages related to logistics.
Transaction success rate analysis system: subscribes to messages related to orders and payment.
Real-time computing system: subscribes to all messages related to transactions.
The following figure shows the filtering process.
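The routing in this scenario can be sketched in plain Java. The sketch below does not use the ApsaraMQ for RocketMQ SDK. The class TagFilterSketch, the method matches, and the tag names Order, Payment, and Logistics are hypothetical and only model how a tag expression such as * or Order||Payment might select messages.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical model of tag-based filtering; not part of any SDK.
class TagFilterSketch {

    // Returns true if a message carrying messageTag is selected by the
    // subscription expression ("*", a single tag, or tags joined by "||").
    static boolean matches(String expression, String messageTag) {
        if ("*".equals(expression)) {
            return true; // "*" selects every message in the topic
        }
        if (messageTag == null) {
            return false; // assumption of this sketch: untagged messages match only "*"
        }
        return Arrays.stream(expression.split("\\|\\|"))
                .map(String::trim)
                .anyMatch(messageTag::equals);
    }

    public static void main(String[] args) {
        // Assumed tag expressions for the four subscribing systems.
        Map<String, String> subscriptions = new LinkedHashMap<>();
        subscriptions.put("Payment system", "Payment");
        subscriptions.put("Logistics system", "Logistics");
        subscriptions.put("Transaction success rate analysis system", "Order||Payment");
        subscriptions.put("Real-time computing system", "*");

        // Print which system would receive each kind of message.
        for (String tag : new String[] {"Order", "Payment", "Logistics"}) {
            for (Map.Entry<String, String> s : subscriptions.entrySet()) {
                if (matches(s.getValue(), tag)) {
                    System.out.println(tag + " message -> " + s.getKey());
                }
            }
        }
    }
}
```

In this model, the real-time computing system receives every message, while the payment system receives only messages tagged Payment.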
Configuration method
ApsaraMQ for RocketMQ lets you filter messages by tag in client SDK code. Add a tag to each message when the producer sends it, and specify the tags of the messages to which the consumer wants to subscribe. For more information about client SDKs, see Overview. The following examples show the producer and consumer code:
Send messages
Specify a tag for each message before the message is sent.
Message msg = new Message("MQ_TOPIC","TagA","Hello MQ".getBytes());
Subscribe to all messages
If you want to configure a consumer to subscribe to all messages in a topic, use an asterisk (*) to indicate all tags.
consumer.subscribe("MQ_TOPIC", "*", new MessageListener() {
    public Action consume(Message message, ConsumeContext context) {
        System.out.println(message.getMsgID());
        return Action.CommitMessage;
    }
});
Subscribe to messages of a specific type
If you want to configure a consumer to subscribe to messages of a specific type in a topic, specify the corresponding tag.
consumer.subscribe("MQ_TOPIC", "TagA", new MessageListener() {
    public Action consume(Message message, ConsumeContext context) {
        System.out.println(message.getMsgID());
        return Action.CommitMessage;
    }
});
Subscribe to messages of multiple types
If you want to configure a consumer to subscribe to messages of multiple types in a topic, specify corresponding tags and separate them with two vertical bars (||).
consumer.subscribe("MQ_TOPIC", "TagA||TagB", new MessageListener() {
    public Action consume(Message message, ConsumeContext context) {
        System.out.println(message.getMsgID());
        return Action.CommitMessage;
    }
});
Error example
If a consumer has multiple subscriptions to a topic, and each subscription has a different tag, the consumer receives only messages whose tag matches the tag specified in the latest subscription.
// In the following code, a consumer can receive only messages with TagB in MQ_TOPIC but cannot receive messages with TagA.
consumer.subscribe("MQ_TOPIC", "TagA", new MessageListener() {
    public Action consume(Message message, ConsumeContext context) {
        System.out.println(message.getMsgID());
        return Action.CommitMessage;
    }
});
consumer.subscribe("MQ_TOPIC", "TagB", new MessageListener() {
    public Action consume(Message message, ConsumeContext context) {
        System.out.println(message.getMsgID());
        return Action.CommitMessage;
    }
});
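One way to picture this behavior is a subscription table keyed by topic, in which a second subscribe call for the same topic replaces the earlier entry. The sketch below is a plain-Java model under that assumption. SubscriptionTableSketch and its methods are hypothetical and are not meant to describe the SDK's actual internals. It also shows that a single subscription with TagA||TagB keeps both tags.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model: one tag expression is kept per topic.
class SubscriptionTableSketch {

    private final Map<String, String> tagExpressionByTopic = new HashMap<>();

    // A later call for the same topic overwrites the earlier expression.
    void subscribe(String topic, String tagExpression) {
        tagExpressionByTopic.put(topic, tagExpression);
    }

    // Returns the expression that is in effect for the topic, or null if none.
    String effectiveExpression(String topic) {
        return tagExpressionByTopic.get(topic);
    }

    public static void main(String[] args) {
        // Two subscriptions to the same topic: only the latest one survives.
        SubscriptionTableSketch wrong = new SubscriptionTableSketch();
        wrong.subscribe("MQ_TOPIC", "TagA");
        wrong.subscribe("MQ_TOPIC", "TagB");
        System.out.println(wrong.effectiveExpression("MQ_TOPIC")); // prints TagB

        // One subscription that lists both tags keeps both.
        SubscriptionTableSketch right = new SubscriptionTableSketch();
        right.subscribe("MQ_TOPIC", "TagA||TagB");
        System.out.println(right.effectiveExpression("MQ_TOPIC")); // prints TagA||TagB
    }
}
```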
Filter messages by using SQL expressions
To filter messages by using SQL expressions, configure custom attributes for messages when the producer sends them, and define a filter expression in SQL syntax that specifies the attributes of the messages to which a consumer wants to subscribe. ApsaraMQ for RocketMQ evaluates the filter expression against the custom attributes of each message and delivers the messages that match to the consumer.
A tag is a special type of message attribute. Therefore, SQL-expression-based filtering is compatible with tag-based filtering: you can use SQL expressions to specify the tags used for message filtering. In SQL expressions, the tag attribute is represented by TAGS.
Limits
When you use SQL expressions to filter messages, take note of the following limits:
Only Enterprise Platinum Edition instances support this feature. Standard Edition instances do not support this feature.
You can use only TCP clients to filter messages by using SQL expressions. HTTP clients do not support this feature.
If the broker does not allow you to filter messages by using SQL expressions but you define a filter expression for a consumer, an error is reported when the consumer is started, or the consumer cannot receive messages.
Scenarios
The following figure shows an example in the e-commerce transaction scenario. A series of messages are generated in the process, from placing an order to receiving the product. The messages are classified into order messages and logistics messages. A region attribute is configured for the logistics messages, and the values of the region attribute are Hangzhou and Shanghai.
Order messages
Logistics messages
Logistics messages whose value of the region attribute is Hangzhou
Logistics messages whose value of the region attribute is Shanghai
These messages are sent to the Trade_Topic topic and subscribed to by different systems. For example:
Logistics System 1: subscribes to only logistics messages whose value of the region attribute is Hangzhou.
Logistics System 2: subscribes to all logistics messages.
Order tracking system: subscribes to only order messages.
Real-time computing system: subscribes to all messages related to transactions.
The following figure shows the filtering process.
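The subscriptions in this scenario can be modeled in plain Java as one predicate per system. The sketch below does not call the SDK. The class SqlScenarioSketch, the stand-in type Msg, and the tag names Order and Logistics are hypothetical, and the SQL expressions in the comments are only one plausible way to write each subscription.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Predicate;

// Hypothetical plain-Java model of the scenario; it does not call the SDK.
class SqlScenarioSketch {

    // Minimal stand-in for a message: one tag plus custom attributes.
    static final class Msg {
        final String tag;
        final Map<String, String> attributes;

        Msg(String tag, Map<String, String> attributes) {
            this.tag = tag;
            this.attributes = attributes;
        }
    }

    // Each predicate mirrors the SQL expression in the comment above it.
    static Map<String, Predicate<Msg>> subscriptions() {
        Map<String, Predicate<Msg>> s = new LinkedHashMap<>();
        // TAGS = 'Logistics' AND region IS NOT NULL AND region = 'Hangzhou'
        s.put("Logistics System 1",
                m -> "Logistics".equals(m.tag) && "Hangzhou".equals(m.attributes.get("region")));
        // TAGS = 'Logistics'
        s.put("Logistics System 2", m -> "Logistics".equals(m.tag));
        // TAGS = 'Order'
        s.put("Order tracking system", m -> "Order".equals(m.tag));
        // TRUE
        s.put("Real-time computing system", m -> true);
        return s;
    }

    public static void main(String[] args) {
        Msg[] messages = {
            new Msg("Order", Collections.emptyMap()),
            new Msg("Logistics", Collections.singletonMap("region", "Hangzhou")),
            new Msg("Logistics", Collections.singletonMap("region", "Shanghai"))
        };
        // Print which system would receive each message.
        for (Msg m : messages) {
            for (Map.Entry<String, Predicate<Msg>> s : subscriptions().entrySet()) {
                if (s.getValue().test(m)) {
                    System.out.println(m.tag + " " + m.attributes + " -> " + s.getKey());
                }
            }
        }
    }
}
```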
Configuration method
ApsaraMQ for RocketMQ lets you filter messages by using SQL expressions in client SDK code. Configure custom message attributes in the producer code that sends the messages, and define a filter expression in SQL syntax in the consumer code that subscribes to the messages. For more information about client SDKs, see Overview. The following examples show the producer and consumer code:
Producer:
Configure custom message attributes.
Message msg = new Message("topic", "tagA", "Hello MQ".getBytes());
// Configure Attribute A and set the attribute value to 1.
msg.putUserProperties("A", "1");
Consumer:
Define a filter expression by using SQL syntax to filter messages based on the custom attribute.
Important: To filter messages based on a custom attribute, the filter expression must first check whether the attribute exists. If the attribute does not exist, the filter expression evaluates to NULL and the message is not delivered to the consumer.
// Subscribe to messages that have Attribute A with a value of 1.
consumer.subscribe("topic", MessageSelector.bySql("A IS NOT NULL AND TAGS IS NOT NULL AND A = '1'"), new MessageListener() {
    public Action consume(Message message, ConsumeContext context) {
        System.out.printf("Receive New Messages: %s %n", message);
        return Action.CommitMessage;
    }
});
The following table describes the SQL syntax that can be used to define the filter expression.
Syntax | Description | Example |
IS NULL | Specifies that an attribute does not exist. | region IS NULL |
IS NOT NULL | Specifies that an attribute exists. | region IS NOT NULL |
>, >=, <, <= | Compares numeric values. The syntax cannot be used to compare strings. If it is used to compare strings, an error is reported when the consumer is started. Note: Strings that can be converted into numeric values are also considered as numeric values. | price > 30 |
BETWEEN xxx AND xxx | Compares numeric values. The syntax cannot be used to compare strings. If it is used to compare strings, an error is reported when the consumer is started. The syntax is equivalent to >= xxx AND <= xxx. It means that the value of the attribute is between two numeric values or equal to either of the two numeric values. | price BETWEEN 10 AND 100 |
NOT BETWEEN xxx AND xxx | Compares numeric values. The syntax cannot be used to compare strings. If it is used to compare strings, an error is reported when the consumer is started. The syntax is equivalent to < xxx OR > xxx. It means that the value of the attribute is less than the left-side numeric value or greater than the right-side numeric value. | price NOT BETWEEN 10 AND 100 |
IN (xxx, xxx) | Indicates that the value of the attribute is included in a set. The elements in the set can only be strings. | region IN ('hangzhou', 'shanghai') |
=, <> | Compares numeric values and strings to check whether the attribute value is equal to the given numeric value or string or not. | region = 'hangzhou' |
AND, OR | The logical AND operator and the logical OR operator. They can be used to combine simple logical functions, and each logical function must be put in parentheses. | (region = 'hangzhou') AND (price > 30) |
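The equivalences stated in the table for BETWEEN, NOT BETWEEN, and IN can be written out as small Java helpers. This is only a sketch of the described semantics, not the broker's implementation. The class SqlOperatorSketch and its method names are hypothetical. Attribute values are passed as strings because the producer examples in this topic set them as strings.

```java
import java.util.Arrays;

// Hypothetical helpers that restate the operator semantics from the table.
class SqlOperatorSketch {

    // Strings that can be converted into numbers are treated as numbers.
    // Non-numeric strings make parseDouble throw NumberFormatException.
    static double toNumber(String value) {
        return Double.parseDouble(value);
    }

    // BETWEEN low AND high is equivalent to: value >= low AND value <= high.
    static boolean between(String value, double low, double high) {
        double v = toNumber(value);
        return v >= low && v <= high;
    }

    // NOT BETWEEN low AND high is equivalent to: value < low OR value > high.
    static boolean notBetween(String value, double low, double high) {
        double v = toNumber(value);
        return v < low || v > high;
    }

    // IN (...) checks membership in a set of strings.
    static boolean in(String value, String... set) {
        return Arrays.asList(set).contains(value);
    }

    public static void main(String[] args) {
        System.out.println(between("50", 10, 50));       // boundary value is included
        System.out.println(notBetween("50", 10, 50));    // so NOT BETWEEN excludes it
        System.out.println(in("hangzhou", "hangzhou", "shanghai"));
    }
}
```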
SQL-expression-based filtering is implemented by configuring custom message attributes and defining an SQL filter expression. The filter expression does not always produce a valid result. The ApsaraMQ for RocketMQ broker processes messages based on the following logic:
If an error is reported during the calculation of the filter expression, the broker filters out received messages by default and does not deliver the messages to the consumer. For example, an error occurs when numeric values and non-numeric values are compared.
If the calculated result of the filter expression is NULL or the value is not a Boolean value, the broker filters out received messages by default and does not deliver the messages to the consumer. A Boolean value represents a truth value, which can be true or false. Assume that you did not configure a custom attribute for a message that the producer sends, but this custom attribute is used as a filter condition in the SQL expression. In this case, the calculation result of the filter expression is NULL.
If the values of the custom message attribute are floating-point numbers but the attribute values used in the filter expression are integers, the broker filters out received messages by default and does not deliver the messages to the consumer.
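The first two rules above can be illustrated with a small Java model in which a filter expression yields true, false, or null (standing in for SQL NULL), and only a true result leads to delivery. The class FilterOutcomeSketch and its methods are hypothetical and do not represent broker code. The floating-point rule is not modeled here.

```java
import java.util.Collections;
import java.util.Map;
import java.util.function.Function;

// Hypothetical model of how evaluation outcomes map to delivery decisions.
class FilterOutcomeSketch {

    // Models the expression "price > threshold".
    // Returns null (SQL NULL) when the price attribute is missing.
    // Throws NumberFormatException when the value is not numeric.
    static Boolean priceGreaterThan(Map<String, String> attributes, double threshold) {
        String price = attributes.get("price");
        if (price == null) {
            return null;
        }
        return Double.parseDouble(price) > threshold;
    }

    // A message is delivered only if the expression evaluates to true.
    // NULL results and evaluation errors both filter the message out.
    static boolean shouldDeliver(Function<Map<String, String>, Boolean> expression,
                                 Map<String, String> attributes) {
        try {
            return Boolean.TRUE.equals(expression.apply(attributes));
        } catch (RuntimeException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        Function<Map<String, String>, Boolean> expr = a -> priceGreaterThan(a, 30);
        // Attribute present and numeric.
        System.out.println(shouldDeliver(expr, Collections.singletonMap("price", "50")));
        // Attribute missing: the expression is NULL.
        System.out.println(shouldDeliver(expr, Collections.<String, String>emptyMap()));
        // Attribute not numeric: evaluation fails.
        System.out.println(shouldDeliver(expr, Collections.singletonMap("price", "abc")));
    }
}
```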
Sample code
Send a message.
Configure a tag and custom attributes for the message.
Producer producer = ONSFactory.createProducer(properties);
// Set the value of Tag to tagA.
Message msg = new Message("topicA", "tagA", "Hello MQ".getBytes());
// Set the value of the custom attribute region to hangzhou.
msg.putUserProperties("region", "hangzhou");
// Set the value of the custom attribute price to 50.
msg.putUserProperties("price", "50");
SendResult sendResult = producer.send(msg);
Subscribe to messages based on a custom attribute.
Consumer consumer = ONSFactory.createConsumer(properties);
// Subscribe only to messages whose custom attribute region is hangzhou.
// If this attribute is not configured for a message, or its value is not hangzhou,
// the message is not delivered to the consumer.
consumer.subscribe("topicA", MessageSelector.bySql("region IS NOT NULL AND region = 'hangzhou'"), new MessageListener() {
    public Action consume(Message message, ConsumeContext context) {
        System.out.printf("Receive New Messages: %s %n", message);
        return Action.CommitMessage;
    }
});
Expected results: The message sent in the example has the custom attribute region and the attribute value is hangzhou. The message meets the filter conditions and is delivered to the consumer.
Subscribe to messages based on both the tag and custom attribute.
Consumer consumer = ONSFactory.createConsumer(properties);
// Subscribe only to messages that have tagA and whose custom attribute price is greater than 30.
consumer.subscribe("topicA", MessageSelector.bySql("TAGS IS NOT NULL AND price IS NOT NULL AND TAGS = 'tagA' AND price > 30 "), new MessageListener() {
    public Action consume(Message message, ConsumeContext context) {
        System.out.printf("Receive New Messages: %s %n", message);
        return Action.CommitMessage;
    }
});
Expected results: The message sent in the example has both tagA and the custom attribute price, and the value of the custom attribute price is greater than 30. The message meets the filter conditions and is delivered to the consumer.
Subscribe to messages based on multiple custom attributes.
Consumer consumer = ONSFactory.createConsumer(properties);
// Subscribe only to messages whose custom attribute region is hangzhou
// and whose custom attribute price is less than 20.
consumer.subscribe("topicA", MessageSelector.bySql("region IS NOT NULL AND price IS NOT NULL AND region = 'hangzhou' AND price < 20"), new MessageListener() {
    public Action consume(Message message, ConsumeContext context) {
        System.out.printf("Receive New Messages: %s %n", message);
        return Action.CommitMessage;
    }
});
Expected results: The message is not delivered to the consumer because it does not meet the filter conditions. The consumer subscribes to messages whose custom attribute price is less than 20, but the producer set the custom attribute price of the message to 50.
Subscribe to all messages in the topic.
Consumer consumer = ONSFactory.createConsumer(properties);
// Set the value of the SQL expression to TRUE to subscribe to all messages in the topic.
consumer.subscribe("topicA", MessageSelector.bySql("TRUE"), new MessageListener() {
    public Action consume(Message message, ConsumeContext context) {
        System.out.printf("Receive New Messages: %s %n", message);
        return Action.CommitMessage;
    }
});
Expected results: All messages in the topic are delivered to the consumer.
Error example
A custom attribute is not configured for a message when the producer sends the message, and no logic is defined in the filter expression to check whether the custom attribute exists. The custom attribute is directly used as a filter condition in the expression. In this case, the message is not delivered to the consumer.
Consumer consumer = ONSFactory.createConsumer(properties);
// The message attribute product is not configured during message sending.
// The filtering fails and the message will not be delivered to the consumer.
consumer.subscribe("topicA", MessageSelector.bySql("product = 'MQ'"), new MessageListener() {
    public Action consume(Message message, ConsumeContext context) {
        System.out.printf("Receive New Messages: %s %n", message);
        return Action.CommitMessage;
    }
});
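The expected results in this section can be cross-checked without a broker by restating each filter expression as a Java predicate and applying it to the sample message (tagA, region hangzhou, price 50). The sketch below is a hand-written model, not an SQL evaluator. The class SampleResultsSketch and its helpers are hypothetical, and the tag is stored under the key TAGS only for convenience.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Predicate;

// Hypothetical cross-check of the sample filters against the sample message.
class SampleResultsSketch {

    // The message sent by the producer in the sample code.
    static Map<String, String> sampleMessage() {
        Map<String, String> m = new HashMap<>();
        m.put("TAGS", "tagA");
        m.put("region", "hangzhou");
        m.put("price", "50");
        return m;
    }

    static double num(Map<String, String> m, String key) {
        return Double.parseDouble(m.get(key));
    }

    // Each SQL expression from this section, paired with an equivalent predicate.
    static Map<String, Predicate<Map<String, String>>> filters() {
        Map<String, Predicate<Map<String, String>>> f = new LinkedHashMap<>();
        f.put("region IS NOT NULL AND region = 'hangzhou'",
                m -> m.get("region") != null && "hangzhou".equals(m.get("region")));
        f.put("TAGS IS NOT NULL AND price IS NOT NULL AND TAGS = 'tagA' AND price > 30",
                m -> m.get("TAGS") != null && m.get("price") != null
                        && "tagA".equals(m.get("TAGS")) && num(m, "price") > 30);
        f.put("region IS NOT NULL AND price IS NOT NULL AND region = 'hangzhou' AND price < 20",
                m -> m.get("region") != null && m.get("price") != null
                        && "hangzhou".equals(m.get("region")) && num(m, "price") < 20);
        f.put("TRUE", m -> true);
        // A missing attribute makes the SQL result NULL; modeled here as "not delivered".
        f.put("product = 'MQ'", m -> "MQ".equals(m.get("product")));
        return f;
    }

    public static void main(String[] args) {
        Map<String, String> msg = sampleMessage();
        for (Map.Entry<String, Predicate<Map<String, String>>> e : filters().entrySet()) {
            System.out.println(e.getKey() + " -> delivered: " + e.getValue().test(msg));
        }
    }
}
```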
References
Consumer instances that use the same group ID must subscribe to the same topics. For more information, see Subscription consistency.
You can use topics and tags to classify messages for different services. For more information, see Best practices of topics and tags.