云消息队列 MQTT 版提供同步查询和异步上下线事件通知两种方式,来获取MQTT客户端在线状态。本文介绍这两种方式的基本原理、应用场景、具体差异以及实现方式。
基本原理
云消息队列 MQTT 版服务端(下文简称为MQTT服务端)提供以下方式获取客户端在线状态:
- 同步查询
该方式相对简单,即通过开放的接入点地址调用HTTP/HTTPS方式的OpenAPI查询某个特定客户端的当前实时状态,适用于对单个或多个客户端的状态判断。
- 异步上下线事件通知
该方式使用消息通知,在客户端上线和下线事件触发时,MQTT服务端支持将上下线消息直接推送到部署在阿里云服务器上的云端应用。
该方式属于异步感知客户端的状态,且感知到的是上下线事件,而非在线状态,云端应用需要根据事件发生的时间序列分析出客户端的状态。
应用场景
两种获取MQTT客户端在线状态的方式分别应用于以下场景:
- 同步查询
- 主业务流程中需要根据客户端是否在线来决定后续运行逻辑。
- 运维过程需要判断特定客户端当前是否在线。
- 异步上下线事件通知
- 服务端需要在客户端上线或者下线时触发一些预定义的动作。
- 服务端需要对客户端的上下线数据进行统计分析,并根据客户端的在线状态推送消息。
同步查询与异步事件通知的差异
两种查询方式的区别如下:
- 同步查询是查询当前客户端的实时状态,理论上比异步通知的方式更精确。
- 异步上下线通知因为采用消息解耦,状态判断更加复杂,且误判可能性更大,但该方法可以基于事件分析多个客户端的运行状态轨迹。异步通知虽然存在一定复杂度和误判,但更加适合大规模的客户端的状态统计。
实现方式
- 同步查询
同步查询方式可以通过调用以下API接口获取客户端状态:
- 异步上下线事件通知
异步上下线事件通知方式下,云端服务可通过调用云消息队列 MQTT 版提供的云端SDK获取客户端上下线消息。SDK下载,请参见版本说明。
获取客户端上下线事件的示例代码如下:说明 在使用示例代码前,需要配置环境变量,通过环境变量读取访问凭证。关于配置环境变量的方法,请参见配置访问凭证。云消息队列 MQTT 版的AccessKey ID和AccessKey Secret的环境变量名称分别为MQTT_AK_ENV和MQTT_SK_ENV。
package com.aliyun.openservices.lmq.example; import com.alibaba.fastjson.JSONObject; import com.alibaba.mqtt.server.ServerConsumer; import com.alibaba.mqtt.server.callback.StatusListener; import com.alibaba.mqtt.server.config.ChannelConfig; import com.alibaba.mqtt.server.config.ConsumerConfig; import com.alibaba.mqtt.server.model.StatusNotice; public class MQTTClientStatusNoticeProcessDemo { public static void main(String[] args) throws Exception { /** * 您创建的云消息队列 MQTT 版的实例接入点。 * 获取云端SDK的接入点,请联系云消息队列 MQTT 版技术支持,钉钉群号:35228338。。 * 接入点地址必须填写分配的域名,不得使用IP地址直接连接,否则可能会导致服务端异常。 */ String domain = "domain"; /** * 使用的协议和端口必须匹配,该参数值固定为5672。 */ int port = 5672; /** * 您创建的云消息队列 MQTT 版的实例ID。 */ String instanceId = "instanceId"; /** * AccessKey ID,阿里云身份验证,在阿里云RAM控制台创建。 * 阿里云账号AccessKey拥有所有API的访问权限,建议您使用RAM用户进行API访问或日常运维。 * 强烈建议不要把AccessKey ID和AccessKey Secret保存到工程代码里,否则可能导致AccessKey泄露,威胁您账号下所有资源的安全。 * 本示例以将AccessKey 和 AccessKeySecret 保存在环境变量为例说明。 */ String accessKey = System.getenv("MQTT_AK_ENV"); /** * AccessKey Secret,阿里云身份验证,在阿里云RAM控制台创建。仅在签名鉴权模式下需要设置。 */ String secretKey = System.getenv("MQTT_SK_ENV"); /** * 您在云消息队列 MQTT 版控制台创建的Group的ID。 * */ String mqttGroupId = "mqttGroupId"; ChannelConfig channelConfig = new ChannelConfig(); channelConfig.setDomain(domain); channelConfig.setPort(port); channelConfig.setInstanceId(instanceId); channelConfig.setAccessKey(accessKey); channelConfig.setSecretKey(secretKey); ServerConsumer serverConsumer = new ServerConsumer(channelConfig, new ConsumerConfig()); serverConsumer.start(); serverConsumer.subscribeStatus(mqttGroupId, new StatusListener() { @Override public void process(StatusNotice statusNotice) { System.out.println(JSONObject.toJSONString(statusNotice)); } }); } }