本文介绍如何快速使用云消息队列 MQTT 版的Java SDK实现MQTT终端和云端服务的消息收发。
前提条件
背景信息
接入点说明
终端和云端服务与云消息队列 MQTT 版通信时,需要在各自的SDK代码中设置云消息队列 MQTT 版实例的接入点信息,通过接入点和云消息队列 MQTT 版服务端连接。
终端SDK接入点格式
使用终端SDK接入云消息队列 MQTT 版时,需要填写的接入点格式如下:
公网接入点:
MQTT实例ID.mqtt.aliyuncs.com
VPC 接入点:
MQTT实例ID-internal-vpc.mqtt.aliyuncs.com
终端SDK接入点也可以直接在云消息队列 MQTT 版控制台实例详情页面的接入点页签中查看。
云端SDK接入点格式
使用云端SDK接入云消息队列 MQTT 版时,需要填写的接入点格式如下:
重要仅实例地域属于中国内地的实例支持云端SDK接入。
公网接入点:
MQTT实例ID-server-internet.mqtt.aliyuncs.com
VPC 接入点:
MQTT实例ID-server-internal.mqtt.aliyuncs.com
MQTT实例ID可在云消息队列 MQTT 版控制台实例详情页面的基础信息区域查看。
终端SDK接入点和云端SDK接入点同时支持公网接入点和VPC 接入点。公网接入点为本地公网环境访问的IP地址,一般用于物联网和移动互联网场景中;VPC 接入点为云上私网访问的IP地址,一般用于云端应用接入云消息队列 MQTT 版。
SDK使用接入点连接服务时务必使用域名接入,不得直接使用域名背后的IP地址直接连接,因为IP地址随时会变化。在以下使用情况中出现的问题云消息队列 MQTT 版产品方概不负责:
终端或云端不使用域名接入而是使用IP地址接入,产品方更新了域名解析导致原有IP地址失效。
终端或云端网络侧对IP地址设置网络防火墙策略,产品方更新了域名解析后新IP地址被您的防火墙策略拦截。
调用终端SDK发送消息
- 下载第三方的开源Java SDK。下载地址为Eclipse Paho Java Client。
下载终端SDK的Demo示例作为您代码开发的参考。下载地址为mqtt-java-demo。
- 解压该Demo工程包至您指定的文件夹。
在IntelliJ IDEA中,导入解压后的文件以创建相应的工程,并确认pom.xml中已包含以下依赖。
<dependencies> <dependency> <groupId>commons-codec</groupId> <artifactId>commons-codec</artifactId> <version>1.10</version> </dependency> <dependency> <groupId>org.eclipse.paho</groupId> <artifactId>org.eclipse.paho.client.mqttv3</artifactId> <version>1.2.2</version> </dependency> <dependency> <groupId>org.apache.httpcomponents</groupId> <artifactId>httpclient</artifactId> <version>4.5.2</version> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.83</version> </dependency> <dependency> <groupId>com.aliyun</groupId> <artifactId>aliyun-java-sdk-onsmqtt</artifactId> <version>1.0.3</version> </dependency> <dependency> <groupId>com.aliyun</groupId> <artifactId>aliyun-java-sdk-core</artifactId> <version>4.5.0</version> </dependency> </dependencies>
在MQ4IoTProducerDemo.java类中,按代码注释说明填写相应参数,主要涉及您已在创建资源中所创建的MQTT资源信息。然后执行Main函数运行代码完成消息发送。
示例代码如下。
说明在使用示例代码前,需要配置环境变量,通过环境变量读取访问凭证。关于配置环境变量的方法,请参见配置访问凭证。
云消息队列 MQTT 版的AccessKey ID和AccessKey Secret的环境变量名称分别为MQTT_AK_ENV和MQTT_SK_ENV。
package com.aliyun.openservices.lmq.example.demo; import com.aliyun.openservices.lmq.example.util.ConnectionOptionWrapper; import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; import org.eclipse.paho.client.mqttv3.MqttCallbackExtended; import org.eclipse.paho.client.mqttv3.MqttClient; import org.eclipse.paho.client.mqttv3.MqttException; import org.eclipse.paho.client.mqttv3.MqttMessage; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; public class MQ4IoTProducerDemo { public static void main(String[] args) throws Exception { /** * 您创建的云消息队列 MQTT 版的实例ID。 */ String instanceId = "XXXXX"; /** * 设置终端SDK的接入点,进入云消息队列 MQTT 版控制台实例详情页面的接入点页签查看。 * 接入点地址必须填写分配的域名,不得使用IP地址直接连接,否则可能会导致客户端异常。 */ String endPoint = "XXXXX.mqtt.aliyuncs.com"; /** * AccessKey ID,阿里云身份验证,在阿里云RAM控制台创建。 * 阿里云账号AccessKey拥有所有API的访问权限,建议您使用RAM用户进行API访问或日常运维。 * 强烈建议不要把AccessKey ID和AccessKey Secret保存到工程代码里,否则可能导致AccessKey泄露,威胁您账号下所有资源的安全。 * 本示例以将AccessKey 和 AccessKeySecret 保存在环境变量为例说明。 */ String accessKey = System.getenv("MQTT_AK_ENV"); /** * AccessKey Secret,阿里云身份验证,在阿里云RAM控制台创建。仅在签名鉴权模式下需要设置。 */ String secretKey = System.getenv("MQTT_SK_ENV"); /** * MQTT客户端ID,由业务系统分配,需要保证每个TCP连接都不一样,保证全局唯一,如果不同的客户端对象(TCP连接)使用了相同的clientId会导致连接异常断开。 * clientId由两部分组成,格式为GroupID@@@DeviceID,其中GroupID在云消息队列 MQTT 版控制台创建,DeviceID由业务方自己设置,clientId总长度不得超过64个字符。 */ String clientId = "GID_XXXXX@@@XXXXX"; /** * 云消息队列 MQTT 版消息的一级Topic,需要在控制台创建才能使用。 * 如果使用了没有创建或者没有被授权的Topic会导致鉴权失败,服务端会断开客户端连接。 */ final String parentTopic = "XXXXX"; /** * 云消息队列 MQTT 版支持子级Topic,用来做自定义的过滤,此处为示例,可以填写任意字符串。 * 需要注意的是,完整的Topic长度不得超过128个字符。 */ final String mq4IotTopic = parentTopic + "/" + "testMq4Iot"; /** * QoS参数代表传输质量,可选0,1,2。详细信息,请参见名词解释。 */ final int qosLevel = 0; ConnectionOptionWrapper connectionOptionWrapper = new ConnectionOptionWrapper(instanceId, accessKey, secretKey, clientId); final MemoryPersistence memoryPersistence = new MemoryPersistence(); /** * 客户端协议和端口。客户端使用的协议和端口必须匹配,如果是SSL加密则设置ssl://endpoint:8883。 */ final MqttClient mqttClient = new MqttClient("tcp://" + endPoint + ":1883", clientId, memoryPersistence); /** * 设置客户端发送超时时间,防止无限阻塞。 */ mqttClient.setTimeToWait(5000); final ExecutorService executorService = new ThreadPoolExecutor(1, 1, 0, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); mqttClient.setCallback(new MqttCallbackExtended() { @Override public void connectComplete(boolean reconnect, String serverURI) { /** * 客户端连接成功后就需要尽快订阅需要的Topic。 */ System.out.println("connect success"); } @Override public void connectionLost(Throwable throwable) { throwable.printStackTrace(); } @Override public void messageArrived(String s, MqttMessage mqttMessage) throws Exception { /** * 消费消息的回调接口,需要确保该接口不抛异常,该接口运行返回即代表消息消费成功。 * 消费消息需要保证在规定时间内完成,如果消费耗时超过服务端约定的超时时间,对于可靠传输的模式,服务端可能会重试推送,业务需要做好幂等去重处理。 */ System.out.println( "receive msg from topic " + s + " , body is " + new String(mqttMessage.getPayload())); } @Override public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) { System.out.println("send msg succeed topic is : " + iMqttDeliveryToken.getTopics()[0]); } }); mqttClient.connect(connectionOptionWrapper.getMqttConnectOptions()); for (int i = 0; i < 10; i++) { MqttMessage message = new MqttMessage("hello mq4Iot pub sub msg".getBytes()); message.setQos(qosLevel); /** * 发送普通消息时,Topic必须和接收方订阅的Topic一致,或者符合通配符匹配规则。 */ mqttClient.publish(mq4IotTopic, message); /** * 云消息队列 MQTT 版支持点对点消息,即如果发送方明确知道该消息只需要给特定的一个设备接收,且知道对端的clientId,则可以直接发送点对点消息。 * 点对点消息不需要经过订阅关系匹配,可以简化订阅方的逻辑。点对点消息的Topic格式规范是 {{parentTopic}}/p2p/{{targetClientId}}。 */ String receiverId = "xxx"; final String p2pSendTopic = parentTopic + "/p2p/" + receiverId; message = new MqttMessage("hello mq4Iot p2p msg".getBytes()); message.setQos(qosLevel); mqttClient.publish(p2pSendTopic, message); } Thread.sleep(Long.MAX_VALUE); } }
调用云端SDK接收消息
下载云消息队列 MQTT 版提供的云端SDK。下载地址为云端SDK版本说明。
下载云端SDK的Demo示例做为您代码开发的参考。下载地址为mqtt-server-sdk-demo。
- 解压该Demo工程包至您指定的文件夹。
在IntelliJ IDEA中,导入解压后的文件以创建相应的工程,并确认pom.xml中已包含以下依赖。
<dependencies> <dependency> <groupId>com.alibaba.mqtt</groupId> <artifactId>server-sdk</artifactId> <version>1.0.0.Final</version> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.83</version> </dependency> </dependencies>
在MQTTConsumerDemo.java类中,按代码注释说明填写相应参数,主要涉及您已在创建资源中所创建好的MQTT资源信息。然后执行Main函数运行代码完成消息接收。
示例代码如下。
说明在使用示例代码前,需要配置环境变量,通过环境变量读取访问凭证。关于配置环境变量的方法,请参见配置访问凭证。
云消息队列 MQTT 版的AccessKey ID和AccessKey Secret的环境变量名称分别为MQTT_AK_ENV和MQTT_SK_ENV。
package com.aliyun.openservices.lmq.example; import com.alibaba.fastjson.JSONObject; import com.alibaba.mqtt.server.ServerConsumer; import com.alibaba.mqtt.server.callback.MessageListener; import com.alibaba.mqtt.server.config.ChannelConfig; import com.alibaba.mqtt.server.config.ConsumerConfig; import com.alibaba.mqtt.server.model.MessageProperties; public class MQTTConsumerDemo { public static void main(String[] args) throws Exception { /** * 设置云端SDK的接入点,请参见接入点说明中的云端SDK接入点格式。 * 接入点地址必须填写分配的域名,不得使用IP地址直接连接,否则可能会导致服务端异常。 */ String domain = "post-cn-jaj3h8i****.mqtt.aliyuncs.com"; /** * 使用的协议和端口必须匹配,该参数值固定为5672。 */ int port = "5672"; /** * 您创建的云消息队列 MQTT 版的实例ID。 */ String instanceId = "post-cn-jaj3h8i****"; /** * AccessKey ID,阿里云身份验证,在阿里云RAM控制台创建。 * 阿里云账号AccessKey拥有所有API的访问权限,建议您使用RAM用户进行API访问或日常运维。 * 强烈建议不要把AccessKey ID和AccessKey Secret保存到工程代码里,否则可能导致AccessKey泄露,威胁您账号下所有资源的安全。 * 本示例以将AccessKey 和 AccessKeySecret 保存在环境变量为例说明。 */ String accessKey = System.getenv("MQTT_AK_ENV"); /** * AccessKey Secret,阿里云身份验证,在阿里云RAM控制台创建。仅在签名鉴权模式下需要设置。 */ String secretKey = System.getenv("MQTT_SK_ENV"); /** * 云消息队列 MQTT 版消息的一级Topic,需要在控制台创建才能使用。 * 由于云端SDK订阅消息一般用于云上应用进行消息汇总和分析等场景,因此,云端SDK订阅消息不支持设置子级Topic。 * 如果使用了没有创建或者没有被授权的Topic会导致鉴权失败,服务端会断开客户端连接。 */ String firstTopic = "firstTopic"; ChannelConfig channelConfig = new ChannelConfig(); channelConfig.setDomain(domain); channelConfig.setPort(port); channelConfig.setInstanceId(instanceId); channelConfig.setAccessKey(accessKey); channelConfig.setSecretKey(secretKey); ServerConsumer serverConsumer = new ServerConsumer(channelConfig, new ConsumerConfig()); serverConsumer.start(); serverConsumer.subscribeTopic(firstTopic, new MessageListener() { @Override public void process(String msgId, MessageProperties messageProperties, byte[] payload) { System.out.println("Receive:" + msgId + "," + JSONObject.toJSONString(messageProperties) + "," + new String(payload)); } }); } }
说明云端SDK消息发送的示例代码,请参见MQTTProducerDemo.java。