基本原理
在用戶端上線和下線事件觸發時,MQTT伺服器會根據您配置的用戶端上下線通知規則,向後端其他雲產品推送一條上下線訊息。業務應用一般部署在阿里雲的伺服器上,業務應用通過向後端雲產品訂閱這條訊息來擷取所有用戶端的上下線動作。
該方式屬於非同步感知用戶端的狀態,且感知到的是上下線事件,而非線上狀態,雲端應用需要根據事件發生的時間序列分析出用戶端的狀態。
非同步上下線通知因為採用訊息解耦,狀態判斷更加複雜,且誤判可能性更大,但該方法可以基於事件分析多個用戶端的運行狀態軌跡。
應用情境
用戶端上下線通知主要的應用情境為業務應用需要在用戶端上線或者下線時觸發一些預定義的動作。
例如,用戶端線上狀態彙總。此情境中,MQTT用戶端產生上下線等狀態變更,雲Message QueueTT 版會根據您配置的用戶端狀態通知規則,將狀態變更封裝後轉寄到雲訊息佇列 RocketMQ 版訊息的方式來實現用戶端狀態資料的彙總和統計。
使用限制
限制項 | 限制值 | 說明 |
單一實例規則數量 | 100 | 如果預設限制不滿足,請聯絡雲Message QueueTT 版支援人員,DingTalk群號:35228338。 |
規則去重限制 | 同一個內部資源同種規則只能建立一個規則。 | 例如一個Group ID只能建立一個上下線通知規則,一個MQTT Topic只能建立一個資料流入規則和一個資料流出規則。 |
地區限制 | 不支援跨地區建立規則,規則的資料來源和資料目標所屬的執行個體必須處於同一地區。 | 例如,建立資料流出規則,資料來源雲Message QueueTT 版執行個體屬於華東1(杭州)地區,則資料目標雲訊息佇列 RocketMQ 版只能選擇華東1(杭州)地區的執行個體。 |
雲Message QueueTT 版執行個體版本 | | 新購的雲Message QueueTT 版執行個體預設為新版本執行個體,舊版執行個體已不支援購買。 |
雲訊息佇列 RocketMQ 版執行個體版本 | 僅4.0系列執行個體支援 | 雲Message QueueTT 版和雲訊息佇列 RocketMQ 版通過訊息流程入或訊息流程出規則進行資料互連時,雲訊息佇列 RocketMQ 版僅4.0系列執行個體支援訊息流程入或流出規則,5.0系列執行個體不支援。 |
資源地圖方式
同一個雲Message QueueTT 版Group ID下的所有用戶端的狀態變更通知都會轉寄到您配置的同一個其他阿里雲產品的資源裡。
操作流程
如上文所述,如果使用非同步上下線通知的方式,您需建立用戶端上下線事件通知規則,將上下線通知的訊息匯出至後端雲產品中。下文以使用雲訊息佇列 RocketMQ 版後端雲產品為例進行說明。
建立上下線通知規則。
您需關注哪些Group ID分組的裝置,就在雲Message QueueTT 版控制台建立規則時,選定相應的Group ID。建立規則的詳細步驟,請參見建立上下線通知規則。
業務應用訂閱該類通知訊息。
通過步驟1中建立的規則,即可收到關注的用戶端的上下線事件。雲訊息佇列 RocketMQ 版的接收程式,請參見訂閱訊息。範例程式碼詳細資料,請參見MQTTClientStatusNoticeProcessDemo.java。
事件類型放在雲訊息佇列 RocketMQ 版的Tag中,代表上線或下線。資料格式如下:
MQ Tag:connect/disconnect/tcpclean
其中:
connect事件代表用戶端上線動作。
disconnect事件代表用戶端主動中斷連線。按照MQTT協議,用戶端主動斷開TCP串連之前應該發送disconnect 報文,MQTT伺服器在收到disconnect 報文後觸發該類型訊息。如果某些用戶端SDK沒有按照協議發送disconnect 報文,MQTT伺服器相應無法收到該訊息。
tcpclean事件代表實際的TCP串連斷開。無論用戶端是否顯示發送過disconnect 報文,只要當前TCP串連斷開就會觸發tcpclean事件。
說明 tcpclean訊息代表用戶端網路層串連的真實斷開。對應的,disconnect訊息僅僅代表用戶端是主動發送了下線報文。受限於用戶端的實現,有時候用戶端異常退出會導致disconnect訊息並沒有正常發送。因此判斷用戶端下線請使用tcpclean事件。
資料內容為JSON類型,相關的Key說明如下:
樣本如下:
clientId:GID_XXX@@@YYYYY
time:1212121212
eventType:connect/disconnect/tcpclean
channelId:2b9b1281046046faafe5e0b458e4XXXX
clientIp:192.168.XX.XX:133XX
判斷用戶端當前是否線上不能僅僅根據收到的最後一條訊息的狀態,而需要結合上下線訊息的前後關聯來判斷。
具體判斷規則如下:
同一個clientId的用戶端,產生上下線事件的先後順序以時間為準,基本原則為時間戳記越大則越新。
同一個clientId的用戶端,可能存在多次閃斷,因此,當收到下線訊息時,一定要根據channelId欄位判斷是否是當前的TCP串連。簡而言之,下線訊息只能覆蓋channelId相同的下線訊息,如果下線訊息的channelId不一樣,儘管time較新,也不能覆蓋。一個channelId代表一個TCP串連,只會存在一個connect事件和一個close事件。
非同步上下線通知Java範例程式碼
package com.aliyun.openservices.lmq.example.demo;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.aliyun.openservices.ons.api.Action;
import com.aliyun.openservices.ons.api.ConsumeContext;
import com.aliyun.openservices.ons.api.Consumer;
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.MessageListener;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
public class MQTTClientStatusNoticeProcessDemo {
public static void main(String[] args) {
/**
* 初始化訊息佇列RocketMQ版接收用戶端,實際業務中一般部署在服務端應用中。
*/
Properties properties = new Properties();
/**
* 設定RocketMQ用戶端的Group ID,注意此處的groupId和MQTT執行個體中的groupId是兩個概念,請按照各自產品的說明申請填寫。
*/
properties.setProperty(PropertyKeyConst.GROUP_ID, "GID_XXXX");
/**
* 帳號AccessKey ID,從控制台擷取。
*/
properties.put(PropertyKeyConst.AccessKey, "XXXX");
/**
* 帳號AccessKey Secret,從控制台擷取,僅在Signature鑒權模式下需要設定。
*/
properties.put(PropertyKeyConst.SecretKey, "XXXX");
/**
* 設定TCP接入網域名稱。
*/
properties.put(PropertyKeyConst.NAMESRV_ADDR, "http://XXXX");
/**
* 使用RocketMQ消費端來處理MQTT用戶端的上下線通知時,訂閱的Topic為上下線通知Topic,請遵循控制台文檔提前建立。
*/
final String parentTopic = "GID_XXXX_MQTT";
/**
* 用戶端狀態資料,實際生產環境中建議使用資料庫或者Redis等外部持久化儲存來儲存該資訊,避免應用重啟丟失狀態,本Demo以單機記憶體版實現做示範。
*/
MqttClientStatusStore mqttClientStatusStore = new MemoryHashMapStoreImpl();
Consumer consumer = ONSFactory.createConsumer(properties);
/**
* 此處僅處理用戶端是否線上,因此只需要關注connect事件和tcpclean事件即可。
*/
consumer.subscribe(parentTopic, "connect||tcpclean", new MqttClientStatusNoticeListener(mqttClientStatusStore));
consumer.start();
String clientId = "GID_XXXXxXX@@@XXXXX";
while (true) {
System.out.println("ClientStatus :" + checkClientOnline(clientId, mqttClientStatusStore));
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
/**
* 處理上下線通知的邏輯。
* 實際部署過程中,消費上下線通知的應用可能部署多台機器,因此用戶端線上狀態的資料可以使用資料庫或者Redis等外部共用儲存來維護。
* 其次需要單獨做訊息等冪處理,以免重複接收訊息導致狀態機器判斷錯誤。
*/
static class MqttClientStatusNoticeListener implements MessageListener {
private MqttClientStatusStore mqttClientStatusStore;
public MqttClientStatusNoticeListener(
MqttClientStatusStore mqttClientStatusStore) {
this.mqttClientStatusStore = mqttClientStatusStore;
}
@Override
public Action consume(Message message, ConsumeContext context) {
try {
JSONObject msgBody = JSON.parseObject(new String(message.getBody()));
System.out.println(msgBody);
String eventType = msgBody.getString("eventType");
String clientId = msgBody.getString("clientId");
String channelId = msgBody.getString("channelId");
ClientStatusEvent event = new ClientStatusEvent();
event.setChannelId(channelId);
event.setClientIp(msgBody.getString("clientIp"));
event.setEventType(eventType);
event.setTime(msgBody.getLong("time"));
/**
* 首先儲存新的事件。
*/
mqttClientStatusStore.addEvent(clientId, channelId, eventType, event);
/**
* 讀取當前channel的事件列表。
*/
Set<ClientStatusEvent> events = mqttClientStatusStore.getEvent(clientId, channelId);
if (events == null || events.isEmpty()) {
return Action.CommitMessage;
}
/**
* 如果事件列表裡上線和下線事件都已經收到,則當前channel已經掉線,可以清理掉這個channel的資料。
*/
boolean findOnlineEvent = false;
boolean findOfflineEvent = false;
for (ClientStatusEvent clientStatusEvent : events) {
if (clientStatusEvent.isOnlineEvent()) {
findOnlineEvent = true;
} else {
findOfflineEvent = true;
}
}
if (findOnlineEvent && findOfflineEvent) {
mqttClientStatusStore.deleteEvent(clientId, channelId);
}
return Action.CommitMessage;
} catch (Throwable e) {
e.printStackTrace();
}
return Action.ReconsumeLater;
}
}
/**
* 根據狀態表判斷一個clientId是否有活躍的TCP串連。
* 1.如果沒有channel表,則一定不線上。
* 2.如果channel表非空,檢查一下channel資料中是否僅包含上線事件,如果有則代表有活躍串連線上。
* 如果全部的channel都有掉線斷開事件則一定是不線上。
*
* @param clientId
* @param mqttClientStatusStore
* @return
*/
public static boolean checkClientOnline(String clientId,
MqttClientStatusStore mqttClientStatusStore) {
Map<String, Set<ClientStatusEvent>> channelMap = mqttClientStatusStore.getEventsByClientId(clientId);
if (channelMap == null) {
return false;
}
for (Set<ClientStatusEvent> events : channelMap.values()) {
boolean findOnlineEvent = false;
boolean findOfflineEvent = false;
for (ClientStatusEvent event : events) {
if (event.isOnlineEvent()) {
findOnlineEvent = true;
} else {
findOfflineEvent = true;
}
}
if (findOnlineEvent & !findOfflineEvent) {
return true;
}
}
return false;
}
}