消费者订阅了某个主题后,云消息队列 RocketMQ 版会将该主题中的所有消息投递给消费者。若消费者只需要关注部分消息,可通过设置过滤条件在云消息队列 RocketMQ 版服务端进行过滤,只获取到需要关注的消息子集,避免接收到大量无效的消息。
应用场景
云消息队列 RocketMQ 版作为发布订阅模型的消息中间件广泛应用于上下游业务集成场景。在实际业务场景中,同一个主题下的消息往往会被多个不同的下游业务方处理,各下游的处理逻辑不同,只关注自身逻辑需要的消息子集。
使用云消息队列 RocketMQ 版的消息过滤功能,可以帮助消费者更高效地过滤自己需要的消息集合,避免大量无效消息投递给消费者,降低下游系统处理压力。
云消息队列 RocketMQ 版主要解决的单个业务域即同一个主题内不同消息子集的过滤问题,一般是基于同一业务下更具体的分类进行过滤匹配。如果是需要对不同业务域的消息进行拆分,建议使用不同主题处理不同业务域的消息。
功能概述
消息过滤定义
过滤的含义指的是将符合条件的消息投递给消费者,而不是将匹配到的消息过滤掉。
云消息队列 RocketMQ 版的消息过滤功能通过生产者和消费者对消息的属性、标签进行定义,并在云消息队列 RocketMQ 版服务端根据过滤条件进行筛选匹配,将符合条件的消息投递给消费者进行消费。
消息过滤原理
消息过滤主要通过以下几个关键流程实现:
生产者:生产者在初始化消息时预先为消息设置一些属性和标签,用于后续消费时指定过滤目标。
消费者:消费者在初始化及后续消费流程中通过调用订阅关系注册接口,向服务端上报需要订阅指定主题的哪些消息,即过滤条件。
服务端:消费者获取消息时会触发服务端的动态过滤计算,云消息队列 RocketMQ 版服务端根据消费者上报的过滤条件的表达式进行匹配,并将符合条件的消息投递给消费者。
消息过滤分类
云消息队列 RocketMQ 版支持Tag标签过滤和SQL属性过滤,这两种过滤方式对比如下:
对比项 | Tag标签过滤 | SQL属性过滤 |
过滤目标 | 消息的Tag标签。 | 消息的属性,包括用户自定义属性以及系统属性(Tag是一种系统属性)。 |
过滤能力 | 精准匹配。 | SQL语法匹配。 |
适用场景 | 简单过滤场景、计算逻辑简单轻量。 | 复杂过滤场景、计算逻辑较复杂。 |
订阅关系一致性
过滤表达式属于订阅关系的一部分,云消息队列 RocketMQ 版的领域模型规定,同一消费者分组内的多个消费者的订阅关系包括过滤表达式,必须保持一致,否则可能会导致部分消息消费不到。更多信息,请参见订阅关系(Subscription)。
Tag标签过滤
Tag标签过滤方式是云消息队列 RocketMQ 版提供的基础消息过滤能力,基于生产者为消息设置的Tag标签进行匹配。生产者在发送消息时,设置消息的Tag标签,消费者需指定已有的Tag标签来进行匹配订阅。
场景示例
以下图电商交易场景为例,从客户下单到收到商品这一过程会生产一系列消息:
订单消息
支付消息
物流消息
这些消息会发送到名称为Trade_Topic的Topic中,被各个不同的下游系统所订阅:
支付系统:只需订阅支付消息。
物流系统:只需订阅物流消息。
交易成功率分析系统:需订阅订单和支付消息。
实时计算系统:需要订阅所有和交易相关的消息。
过滤效果如下图所示:
Tag标签设置
Tag由生产者发送消息时设置,每条消息允许设置一个Tag标签。
Tag使用可见字符,建议长度不超过128字符。
Tag匹配时大小写敏感,例如,TAG A和tag a属于两个标签。
Tag标签过滤规则
Tag标签过滤为精准字符串匹配,过滤规则设置格式如下:
单Tag匹配:过滤表达式为目标Tag。表示只有消息标签为指定目标Tag的消息符合匹配条件,会被发送给消费者。
多Tag匹配:多个Tag之间为或的关系,不同Tag间使用两个竖线(||)隔开。例如,Tag1||Tag2||Tag3,表示标签为Tag1或Tag2或Tag3的消息都满足匹配条件,都会被发送给消费者进行消费。
全部匹配:使用星号(*)作为全匹配表达式。表示主题下的所有消息都将被发送给消费者进行消费。
使用示例
发送消息,设置Tag标签。
Message message = messageBuilder.setTopic("topic") //设置消息索引键,可根据关键字精确查找某条消息。 .setKeys("messageKey") //设置消息Tag,用于消费端根据指定Tag过滤消息。 //该示例表示消息的Tag设置为“TagA”。 .setTag("TagA") //消息体。 .setBody("messageBody".getBytes()) .build();
订阅消息,匹配单个Tag标签。
String topic = "Your Topic"; //只订阅消息标签为“TagA”的消息。 FilterExpression filterExpression = new FilterExpression("TagA", FilterExpressionType.TAG); pushConsumer.subscribe(topic, filterExpression);
订阅消息,匹配多个Tag标签。
String topic = "Your Topic"; //只订阅消息标签为“TagA”、“TagB”或“TagC”的消息。 FilterExpression filterExpression = new FilterExpression("TagA||TagB||TagC", FilterExpressionType.TAG); pushConsumer.subscribe(topic, filterExpression);
订阅消息,匹配Topic中的所有消息,不进行过滤。
String topic = "Your Topic"; //使用Tag标签过滤消息,订阅所有消息。 FilterExpression filterExpression = new FilterExpression("*", FilterExpressionType.TAG); pushConsumer.subscribe(topic, filterExpression);
SQL属性过滤
SQL属性过滤是云消息队列 RocketMQ 版提供的高级消息过滤方式,通过生产者为消息设置的属性(Key)及属性值(Value)进行匹配。生产者在发送消息时可设置多个属性,消费者订阅时可设置SQL语法的过滤表达式过滤多个属性。
Tag是一种系统属性,所以SQL过滤方式也兼容Tag标签过滤。在SQL语法中,Tag的属性名称为TAGS。
场景示例
以下图电商交易场景为例,从客户下单到收到商品这一过程会生产一系列消息,按照类型将消息分为订单消息和物流消息,其中给物流消息定义地域属性,按照地域分为杭州和上海:
订单消息
物流消息
物流消息且地域为杭州
物流消息且地域为上海
这些消息会发送到名称为Trade_Topic的Topic中,被各个不同的系统所订阅:
物流系统1:只需订阅物流消息且消息地域为杭州。
物流系统2:只需订阅物流消息且消息地域为杭州或上海。
订单跟踪系统:只需订阅订单消息。
实时计算系统:需要订阅所有和交易相关的消息。
过滤效果如下图所示:
消息属性设置
生产者发送消息时可以自定义消息属性,每个属性都是一个自定义的键值对(Key-Value)。
每条消息支持设置多个属性。
SQL属性过滤规则
SQL属性过滤使用SQL92语法作为过滤规则表达式,语法规范如下:
语法 | 说明 | 示例 |
IS NULL | 判断属性不存在。 |
|
IS NOT NULL | 判断属性存在。 |
|
| 用于比较数字,不能用于比较字符串,否则消费者客户端启动时会报错。 说明
|
|
BETWEEN xxx AND xxx | 用于比较数字,不能用于比较字符串,否则消费者客户端启动时会报错。等价于>= xxx AND <= xxx。表示属性值在两个数字之间。 |
|
NOT BETWEEN xxx AND xxx | 用于比较数字,不能用于比较字符串,否则消费者客户端启动会报错。等价于< xxx OR > xxx,表示属性值在两个值的区间之外。 |
|
IN (xxx, xxx) | 表示属性的值在某个集合内。集合的元素只能是字符串。 |
|
| 等于和不等于。可用于比较数字和字符串。 |
|
| 逻辑与、逻辑或。可用于组合任意简单的逻辑判断,需要将每个逻辑判断内容放入括号内。 |
|
由于SQL属性过滤是生产者定义消息属性,消费者设置SQL过滤条件,因此过滤条件的计算结果具有不确定性,服务端的处理方式如下:
异常情况处理:如果过滤条件的表达式计算抛出异常,消息默认被过滤,不会被投递给消费者。例如比较数字和非数字类型的值。
空值情况处理:如果过滤条件的表达式计算值为null或不是布尔类型(true和false),则消息默认被过滤,不会被投递给消费者。例如发送消息时未定义某个属性,在订阅时过滤条件中直接使用该属性,则过滤条件的表达式计算结果为null。
数值类型不符处理:如果消息自定义属性为浮点型,但过滤条件中使用整数进行判断,则消息默认被过滤,不会被投递给消费者。
使用示例
发送消息,同时设置消息Tag标签和自定义属性。
Message message = messageBuilder.setTopic("topic") //设置消息索引键,可根据关键字精确查找某条消息。 .setKeys("messageKey") //设置消息Tag,用于消费端根据指定Tag过滤消息。 //该示例表示消息的Tag设置为“messageTag”。 .setTag("messageTag") //消息也可以设置自定义的分类属性,例如环境标签、地域、逻辑分支。 //该示例表示为消息自定义一个属性,该属性为地域,属性值为杭州。 .addProperty("Region", "Hangzhou") //消息体。 .setBody("messageBody".getBytes()) .build();
订阅消息,根据单个自定义属性匹配消息。
String topic = "topic"; //只订阅地域属性为杭州的消息。 FilterExpression filterExpression = new FilterExpression("Region IS NOT NULL AND Region='Hangzhou'", FilterExpressionType.SQL92); simpleConsumer.subscribe(topic, filterExpression);
订阅消息,同时根据多个自定义属性匹配消息。
String topic = "topic"; //只订阅地域属性为杭州且价格属性大于30的消息。 FilterExpression filterExpression = new FilterExpression("Region IS NOT NULL AND price IS NOT NULL AND Region = 'Hangzhou' AND price > 30", FilterExpressionType.SQL92); simpleConsumer.subscribe(topic, filterExpression);
订阅消息,匹配Topic中的所有消息,不进行过滤。
String topic = "topic"; //订阅所有消息。 FilterExpression filterExpression = new FilterExpression("True", FilterExpressionType.SQL92); simpleConsumer.subscribe(topic, filterExpression);
使用建议
合理划分主题和Tag标签
从消息的过滤机制和主题的原理机制可以看出,业务消息的拆分可以基于主题进行筛选,也可以基于主题内消息的Tag标签及属性进行筛选。关于拆分方式的选择,应遵循以下原则:
消息类型是否一致:不同类型的消息,如顺序消息和普通消息需要使用不同的主题进行拆分,无法通过Tag标签进行分类。
业务域是否相同:不同业务域和部门的消息应该拆分不同的主题。例如物流消息和支付消息应该使用两个不同的主题;同样是一个主题内的物流消息,普通物流消息和加急物流消息则可以通过不同的Tag进行区分。
消息量级和重要性是否一致:如果消息的量级规模存在巨大差异,或者说消息的链路重要程度存在差异,则应该使用不同的主题进行隔离拆分。
消息过滤常见问题
多个消费者订阅同一个Topic下的不同Tag,出现消息丢失情况。
可能原因:若多个消费者是通过同一个消费者分组(Group ID)订阅的指定Topic,则所有消费者的过滤条件即订阅的Tag要一致,否则会出现订阅关系不一致,导致部分消息丢失。
消费者设置了过滤条件,消息消费数量是按照过滤前还是过滤后计算呢?
按照过滤后的消息数量计算。
消费者在线无消费消息,但Group有堆积。
采用SQL/TAG消费过滤的方式,未被过滤条件命中的消息会计算为消息堆积,堆积量的计算如下所示。
SQL消费方式:堆积量 = 已就绪的消息量 + 处理中的消息量 - 未被SQL命中的消息数量。
TAG消费方式:堆积量 =(已就绪的消息量 + 处理中的消息量)* TAG标签消息百分比。
TAG标签消息百分比 = 采样内TAG标签消息量 / 采样消息总量
相关文档
完整的消息收发示例代码,请参见SDK参考概述。