消費者訂閱了某個Topic後,雲訊息佇列 RocketMQ 版會將該Topic中的所有訊息投遞給消費端進行消費。若消費者只需要關注部分訊息,可通過設定過濾條件在雲訊息佇列 RocketMQ 版服務端完成訊息過濾,只消費需要關注的訊息。本文介紹訊息過濾的功能描述、應用情境、使用限制、配置方式及範例程式碼。
功能描述
訊息過濾功能指訊息生產者向Topic中發送訊息時,設定訊息屬性對訊息進行分類,消費者訂閱Topic時,根據訊息屬性設定過濾條件對訊息進行過濾,只有符合過濾條件的訊息才會被投遞到消費端進行消費。
消費者訂閱Topic時若未設定過濾條件,無論訊息發送時是否有設定過濾屬性,Topic中的所有訊息都將被投遞到消費端進行消費。
雲訊息佇列 RocketMQ 版支援的訊息過濾方式如下:
過濾方式 | 說明 | 情境 | 執行個體限制 | 協議限制 |
Tag過濾(預設過濾方式) |
訂閱的Tag和寄件者設定的訊息Tag一致,則訊息被投遞給消費端進行消費。 | 簡單過濾情境。 一條訊息支援設定一個Tag,僅需要對Topic中的訊息進行一級分類並過濾時使用此方式。 | 無。 | 無。 |
SQL屬性過濾 |
滿足過濾運算式的訊息被投遞給消費端進行消費。 | 複雜過濾情境。 一條訊息支援設定多個自訂屬性,可根據SQL文法自訂群組合多種類型的運算式對訊息進行多級分類並實現多維度過濾。 | 僅企業鉑金版執行個體支援該功能。 | 僅商業版TCP協議的SDK支援該功能。 |
Tag過濾
Tag,即訊息標籤,用於對某個Topic下的訊息進行分類。雲訊息佇列 RocketMQ 版的生產者在發送訊息時,指定訊息的Tag,消費者需根據已經指定的Tag來進行訂閱。
情境樣本
以下圖電商交易情境為例,從客戶下單到收到商品這一過程會生產一系列訊息,以如下訊息為例:
訂單訊息
支付訊息
物流訊息
這些訊息會發送到名稱為Trade_Topic的Topic中,被各個不同的系統所訂閱,以如下系統為例:
支付系統:只需訂閱支付訊息。
物流系統:只需訂閱物流訊息。
交易成功率分析系統:需訂閱訂單和支付訊息。
Realtime Compute系統:需要訂閱所有和交易相關的訊息。
過濾示意圖如下所示。
配置方式
雲訊息佇列 RocketMQ 版支援通過SDK配置Tag過濾功能,分別在訊息發送和訂閱代碼中設定訊息Tag和訂閱訊息Tag。SDK詳細資料,請參見SDK參考概述。訊息發送端和消費端的代碼配置方法如下:
發送訊息
發送訊息時,每條訊息必須指明Tag。
Message msg = new Message("MQ_TOPIC","TagA","Hello MQ".getBytes());
訂閱所有Tag
消費者如需訂閱某Topic下所有類型的訊息,Tag用星號(*)表示。
consumer.subscribe("MQ_TOPIC", "*", new MessageListener() { public Action consume(Message message, ConsumeContext context) { System.out.println(message.getMsgID()); return Action.CommitMessage; } });
訂閱單個Tag
消費者如需訂閱某Topic下某一種類型的訊息,請明確標明Tag。
consumer.subscribe("MQ_TOPIC", "TagA", new MessageListener() { public Action consume(Message message, ConsumeContext context) { System.out.println(message.getMsgID()); return Action.CommitMessage; } });
訂閱多個Tag
消費者如需訂閱某Topic下多種類型的訊息,請在多個Tag之間用兩個豎線(||)分隔。
consumer.subscribe("MQ_TOPIC", "TagA||TagB", new MessageListener() { public Action consume(Message message, ConsumeContext context) { System.out.println(message.getMsgID()); return Action.CommitMessage; } });
錯誤樣本
同一個消費者多次訂閱某個Topic下的Tag,以最後一次訂閱的Tag為準。
//如下錯誤碼中,Consumer只能訂閱到MQ_TOPIC下TagB的訊息,而不能訂閱TagA的訊息。 consumer.subscribe("MQ_TOPIC", "TagA", new MessageListener() { public Action consume(Message message, ConsumeContext context) { System.out.println(message.getMsgID()); return Action.CommitMessage; } }); consumer.subscribe("MQ_TOPIC", "TagB", new MessageListener() { public Action consume(Message message, ConsumeContext context) { System.out.println(message.getMsgID()); return Action.CommitMessage; } });
SQL屬性過濾
SQL屬性過濾是在訊息發送時設定訊息的自訂屬性,消費者訂閱時使用SQL文法設定過濾運算式,根據自訂屬性過濾訊息,雲訊息佇列 RocketMQ 版根據運算式的邏輯進行計算,將合格訊息投遞到消費端。
Tag屬於一種特殊的訊息屬性,所以SQL過濾方式也相容Tag過濾方法,支援通過Tag屬性過濾訊息。在SQL文法中,Tag的屬性值為TAGS。
使用限制
使用SQL屬性過濾訊息時,有以下限制:
只有企業鉑金版執行個體支援SQL屬性過濾,標準版執行個體不支援該功能。
只有TCP協議的用戶端支援SQL屬性過濾,HTTP協議的用戶端不支援該功能。
若服務端不支援SQL過濾時,用戶端使用SQL過濾訊息,則用戶端啟動會報錯或收不到訊息。
情境樣本
以下圖電商交易情境為例,從客戶下單到收到商品這一過程會生產一系列訊息,按照類型將訊息分為訂單訊息和物流訊息,其中給物流訊息定義地區屬性,按照地區分為杭州和上海:
訂單訊息
物流訊息
物流訊息且地區為杭州
物流訊息且地區為上海
這些訊息會發送到名稱為Trade_Topic的Topic中,被各個不同的系統所訂閱,以如下系統為例:
物流系統1:只需訂閱物流訊息且訊息地區為杭州。
物流系統2:只需訂閱物流訊息且訊息地區為杭州或上海。
訂單跟蹤系統:只需訂閱訂單訊息。
Realtime Compute系統:需要訂閱所有和交易相關的訊息。
過濾示意圖如下所示。
配置方式
雲訊息佇列 RocketMQ 版支援通過SDK配置SQL屬性過濾。發送端需要在發送訊息的代碼中設定訊息的自訂屬性;消費端需要在訂閱訊息代碼中設定SQL文法的過濾運算式。SDK詳細資料,請參見SDK參考概述。訊息發送端和消費端的代碼配置方法如下:
訊息發送端:
設定訊息的自訂屬性。
Message msg = new Message("topic", "tagA", "Hello MQ".getBytes()); // 設定自訂屬性A,屬性值為1。 msg.putUserProperties("A", "1");
訊息消費端
使用SQL文法設定過濾運算式,並根據自訂屬性過濾訊息。
重要使用屬性時,需要先判斷屬性是否存在。若屬性不存在則過濾運算式的計算結果為NULL,訊息不會被投遞到消費端。
// 訂閱自訂屬性A存在且屬性值為1的訊息。 consumer.subscribe("topic", MessageSelector.bySql("A IS NOT NULL AND TAGS IS NOT NULL AND A = '1'"), new MessageListener() { public Action consume(Message message, ConsumeContext context) { System.out.printf("Receive New Messages: %s %n", message); return Action.CommitMessage; } });
SQL文法如下:
文法 | 說明 | 樣本 |
IS NULL | 判斷屬性不存在。 |
|
IS NOT NULL | 判斷屬性存在。 |
|
| 用於比較數字,不能用於比較字串,否則Consumer用戶端啟動會報錯。 說明 可轉化為數位字串也被認為是數字。 |
|
BETWEEN xxx AND xxx | 用於比較數字,不能用於比較字串,否則Consumer用戶端啟動會報錯。等價於>= xxx AND <= xxx。表示屬性值在兩個數字之間。 |
|
NOT BETWEEN xxx AND xxx | 用於比較數字,不能用於比較字串,否則Consumer用戶端啟動會報錯。等價於< xxx OR > xxx,表示屬性值在兩個值的區間之外。 |
|
IN (xxx, xxx) | 表示屬性的值在某個集合內。集合的元素只能是字串。 |
|
| 等於和不等於。可用於比較數字和字串。 |
|
| 邏輯與和邏輯或。可用於組合任意簡單的邏輯判斷,需要將每個邏輯判斷內容放入括弧內。 |
|
由於SQL屬性過濾是發送端定義訊息屬性,消費端設定SQL過濾條件,因此過濾條件的計算結果具有不確定性,服務端的處理方式為:
如果過濾條件的運算式計算拋異常,訊息預設被過濾,不會被投遞給消費端。例如比較數字和非數字類型的值。
如果過濾條件的運算式計算值為null或不是布爾類型(true和false),則訊息預設被過濾,不會被投遞給消費端。例如發送訊息時未定義某個屬性,在訂閱時過濾條件中直接使用該屬性,則過濾條件的運算式計算結果為null。
如果訊息自訂屬性為浮點型,但過濾條件中使用整數進行判斷,則訊息預設被過濾,不會被投遞給消費端。
範例程式碼
發送訊息
同時設定訊息Tag和自訂屬性。
Producer producer = ONSFactory.createProducer(properties); // 設定Tag的值為tagA。 Message msg = new Message("topicA", "tagA", "Hello MQ".getBytes()); // 設定自訂屬性region為hangzhou。 msg.putUserProperties("region", "hangzhou"); // 設定自訂屬性price為50。 msg.putUserProperties("price", "50"); SendResult sendResult = producer.send(msg);
根據單個自訂屬性訂閱訊息。
Consumer consumer = ONSFactory.createConsumer(properties); // 只訂閱屬性region為hangzhou的訊息,若訊息中未定義屬性region或屬性值不是hangzhou,則訊息不會被投遞到消費端。 consumer.subscribe("topicA", MessageSelector.bySql("region IS NOT NULL AND region = 'hangzhou'"), new MessageListener() { public Action consume(Message message, ConsumeContext context) { System.out.printf("Receive New Messages: %s %n", message); return Action.CommitMessage; } });
預期結果:樣本中發送的訊息屬性符合訂閱的過濾條件,訊息被投遞到消費端。
同時根據Tag和自訂屬性訂閱訊息。
Consumer consumer = ONSFactory.createConsumer(properties); // 只訂閱Tag的值為tagA且屬性price大於30的訊息。 consumer.subscribe("topicA", MessageSelector.bySql("TAGS IS NOT NULL AND price IS NOT NULL AND TAGS = 'tagA' AND price > 30 "), new MessageListener() { public Action consume(Message message, ConsumeContext context) { System.out.printf("Receive New Messages: %s %n", message); return Action.CommitMessage; } });
預期結果:樣本中發送的訊息Tag和自訂屬性符合訂閱的過濾條件,訊息被投遞到消費端。
同時根據多個自訂屬性訂閱訊息。
Consumer consumer = ONSFactory.createConsumer(properties); // 只訂閱region為hangzhou且屬性price小於20的訊息。 consumer.subscribe("topicA", MessageSelector.bySql("region IS NOT NULL AND price IS NOT NULL AND region = 'hangzhou' AND price < 20"), new MessageListener() { public Action consume(Message message, ConsumeContext context) { System.out.printf("Receive New Messages: %s %n", message); return Action.CommitMessage; } });
預期結果:訊息不會被投遞到消費端。訂閱的過濾條件中price小於20,發送的訊息中price屬性值為50,不符合訂閱過濾條件。
訂閱Topic中的所有訊息,不進行過濾。
Consumer consumer = ONSFactory.createConsumer(properties); // 若需要訂閱Topic中的所有訊息,需要將SQL運算式的值設定為“TRUE”。 consumer.subscribe("topicA", MessageSelector.bySql("TRUE"), new MessageListener() { public Action consume(Message message, ConsumeContext context) { System.out.printf("Receive New Messages: %s %n", message); return Action.CommitMessage; } });
預期結果:Topic中的所有訊息都將被投遞到消費端進行消費。
錯誤樣本
訊息發送時未自訂某屬性,消費端在訂閱時未判斷該屬性是否存在直接使用,則訊息不會被投遞給消費端。
Consumer consumer = ONSFactory.createConsumer(properties); // 屬性product在發送訊息時未定義,過濾失敗,訊息不會被投遞至消費端。 consumer.subscribe("topicA", MessageSelector.bySql("product = 'MQ'"), new MessageListener() { public Action consume(Message message, ConsumeContext context) { System.out.printf("Receive New Messages: %s %n", message); return Action.CommitMessage; } });
更多資訊
同一個Group ID下的消費者執行個體與Topic的訂閱關係需保持一致,更多資訊,請參見訂閱關係一致。
合理使用Topic和Tag來過濾訊息可以讓業務更清晰,更多資訊,請參見Topic與Tag最佳實務。