背景信息
云消息队列 MQTT 版支持云端SDK,云上应用可直接通过云端SDK接入云消息队列 MQTT 版服务端进行消息收发。云端SDK使用,请参见云端开发概述。
同时云消息队列 MQTT 版支持和其他云产品进行互通,当前支持的云产品有云消息队列 RocketMQ 版。
本文以公网环境中的Java SDK为例说明如何将云消息队列 RocketMQ 版数据流入至云消息队列 MQTT 版。
网络访问
云消息队列 MQTT 版同时提供了
公网接入点和
VPC 接入点。
- 公网接入点为本地公网环境访问的IP地址,一般用于物联网和移动互联网场景中;
- VPC 接入点为云上私网访问的IP地址,一般用于云端应用接入云消息队列 MQTT 版。
重要 客户端使用接入点连接服务时务必使用域名接入,不得直接使用域名背后的IP地址直接连接,因为IP地址随时会变化。在以下使用情况中出现的问题
云消息队列 MQTT 版产品方概不负责:
- 客户端不使用域名接入而是使用IP地址接入,产品方更新了域名解析导致原有IP地址失效。
- 客户端网络对IP地址设置网络防火墙策略,产品方更新了域名解析后新IP地址被您的防火墙策略拦截。
1.创建数据流入规则
登录云消息队列 MQTT 版控制台,并在左侧导航栏单击实例列表。
在顶部菜单栏选择目标地域,然后在实例列表中单击实例名称进入实例详情页面。
在左侧导航栏单击规则管理,然后在页面左上角,单击创建规则。
在创建规则页面完成以下操作。
配置基本信息。输入规则ID,选择数据流入的规则类型。
配置规则源。选择已经创建好的云消息队列 RocketMQ 版的实例和Topic。
配置规则目标。选择已经创建好的云消息队列 MQTT 版的Topic。
2.准备测试代码
2.1下载示例代码
下载mqtt-java-demo,并解压该Demo工程包至您指定的文件夹。
在解压的Demo工程中找到lmq-java-demo文件夹,将此文件夹导入IntelliJ IDEA,并确认pom.xml中已包含以下依赖。
<dependencies>
<dependency>
<groupId>org.bouncycastle</groupId>
<artifactId>bcprov-jdk15on</artifactId>
<version>1.70</version>
</dependency>
<dependency>
<groupId>commons-codec</groupId>
<artifactId>commons-codec</artifactId>
<version>1.10</version>
</dependency>
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.2.2</version>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<version>4.5.2</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.83</version>
</dependency>
<dependency>
<groupId>com.aliyun.openservices</groupId>
<artifactId>ons-client</artifactId>
<version>1.8.5.Final</version>
</dependency>
<dependency>
<groupId>com.aliyun</groupId>
<artifactId>aliyun-java-sdk-onsmqtt</artifactId>
<version>1.0.3</version>
</dependency>
<dependency>
<groupId>com.aliyun</groupId>
<artifactId>aliyun-java-sdk-core</artifactId>
<version>4.5.0</version>
</dependency>
</dependencies>
配置访问凭证。
2.2收发消息代码
在RocketMQSendMessageToMQ4IoT.java
类中包含了发送RocketMQ消息和使用MQTT消费的代码,按代码注释说明填写云消息队列 RocketMQ 版和云消息队列 MQTT 版资源的参数。
测试中可以将其中发送P2P消息的相关代码注释掉,示例代码如下。
收发送消息代码示例
import com.aliyun.openservices.lmq.example.util.ConnectionOptionWrapper;
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.Producer;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import com.aliyun.openservices.ons.api.SendResult;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
public class RocketMQSendMessageToMQ4IoT {
public static void main(String[] args) throws Exception {
/**
* 初始化云消息队列 RocketMQ 版发送客户端,实际业务中一般部署在服务端应用中。
*/
Properties properties = new Properties();
/**
* 设置云消息队列 RocketMQ 版Group ID,在云消息队列 RocketMQ 版控制台创建。
*/
properties.setProperty(PropertyKeyConst.GROUP_ID, "GID-XXXXX");
/**
* AccessKey ID,阿里云身份验证,在阿里云RAM控制台创建。
* 阿里云账号AccessKey拥有所有API的访问权限,建议您使用RAM用户进行API访问或日常运维。
* 强烈建议不要把AccessKey ID和AccessKey Secret保存到工程代码里,否则可能导致AccessKey泄露,威胁您账号下所有资源的安全。
* 本示例以将AccessKey 和 AccessKeySecret 保存在环境变量为例说明。
*/
properties.put(PropertyKeyConst.AccessKey, System.getenv("MQTT_AK_ENV"));
/**
* AccessKey Secret,阿里云身份验证,在阿里云RAM控制台创建。仅在签名鉴权模式下需要设置。
*/
properties.put(PropertyKeyConst.SecretKey, System.getenv("MQTT_SK_ENV"));
/**
* 设置TCP接入点,该接入点为云消息队列 RocketMQ 版实例的接入点。进入云消息队列 RocketMQ 版控制台实例详情页面获取。
*/
properties.put(PropertyKeyConst.NAMESRV_ADDR, "XXXX");
/**
* 设置云消息队列 RocketMQ 版的Topic,在云消息队列 RocketMQ 版控制台创建。
* 云消息队列 RocketMQ 版和云消息队列 MQTT 版配合使用时,RocketMQ客户端仅操作一级Topic。
*/
final String parentTopic = "XXXXX";
Producer producer = ONSFactory.createProducer(properties);
producer.start();
//////////////////////////////////////////////////////////////////////////////////////////////////////////////////
/**
* 初始化云消息队列 MQTT 版接收客户端,实际业务中云消息队列 MQTT 版一般部署在移动终端环境。
*/
/**
* 您在控制台创建的云消息队列 MQTT 版的实例ID。
*/
String instanceId = "XXXXX";
/**
* 设置接入点,进入云消息队列 MQTT 版控制台实例详情页面获取。
*/
String endPoint = "XXXXXX.mqtt.aliyuncs.com";
/**
* AccessKey ID,阿里云身份验证,在阿里云RAM控制台创建。
* 阿里云账号AccessKey拥有所有API的访问权限,建议您使用RAM用户进行API访问或日常运维。
* 强烈建议不要把AccessKey ID和AccessKey Secret保存到工程代码里,否则可能导致AccessKey泄露,威胁您账号下所有资源的安全。
* 本示例以将AccessKey 和 AccessKeySecret 保存在环境变量为例说明。
*/
String accessKey = System.getenv("MQTT_AK_ENV");
/**
* AccessKey Secret,阿里云身份验证,在阿里云RAM控制台创建。仅在签名鉴权模式下需要设置。
*/
String secretKey = System.getenv("MQTT_SK_ENV");
/**
* MQTT客户端ID,由业务系统分配,需要保证每个TCP连接都不一样,保证全局唯一,如果不同的客户端对象(TCP连接)使用了相同的clientId会导致连接异常断开。
* clientId由两部分组成,格式为GroupID@@@DeviceID,其中GroupID在云消息队列 MQTT 版控制台创建,DeviceID由业务方自己设置,clientId总长度不得超过64个字符。
*/
String clientId = "GID_XXXX@@@XXXXX";
/**
* 云消息队列 MQTT 版支持子级Topic,用来做自定义的过滤,此处为示例,可以填写任何字符串。
* 需要注意的是,完整的Topic长度不得超过128个字符。
*/
final String subTopic = "/testMq4Iot";
final String mq4IotTopic = parentTopic + subTopic;
/**
* QoS参数代表传输质量,可选0,1,2。详细信息,请参见名词解释。
*/
final int qosLevel = 0;
ConnectionOptionWrapper connectionOptionWrapper = new ConnectionOptionWrapper(instanceId, accessKey, secretKey, clientId);
final MemoryPersistence memoryPersistence = new MemoryPersistence();
/**
* 客户端协议和端口。客户端使用的协议和端口必须匹配,如果是SSL加密则设置ssl://endpoint:8883。
*/
final MqttClient mqttClient = new MqttClient("tcp://" + endPoint + ":1883", clientId, memoryPersistence);
/**
* 设置客户端发送超时时间,防止无限阻塞。
*/
mqttClient.setTimeToWait(5000);
final ExecutorService executorService = new ThreadPoolExecutor(1, 1, 0, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
mqttClient.setCallback(new MqttCallbackExtended() {
@Override
public void connectComplete(boolean reconnect, String serverURI) {
/**
* 客户端连接成功后就需要尽快订阅需要的Topic。
*/
System.out.println("connect success");
executorService.submit(new Runnable() {
@Override
public void run() {
try {
final String topicFilter[] = {mq4IotTopic};
final int[] qos = {qosLevel};
mqttClient.subscribe(topicFilter, qos);
} catch (MqttException e) {
e.printStackTrace();
}
}
});
}
@Override
public void connectionLost(Throwable throwable) {
throwable.printStackTrace();
}
@Override
public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
/**
* 消费消息的回调接口,需要确保该接口不抛异常,该接口运行返回即代表消息消费成功。
* 消费消息需要保证在规定时间内完成,如果消费耗时超过服务端约定的超时时间,对于可靠传输的模式,服务端可能会重试推送,业务需要做好幂等去重处理。
*/
System.out.println(
"receive msg from topic " + s + " , body is " + new String(mqttMessage.getPayload()));
}
@Override
public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
System.out.println("send msg succeed topic is : " + iMqttDeliveryToken.getTopics()[0]);
}
});
mqttClient.connect(connectionOptionWrapper.getMqttConnectOptions());
for (int i = 0; i < 10; i++) {
/**
* 使用RocketMQ客户端发消息给MQTT客户端时,Topic指定为一级父Topic,Tag指定为MQ2MQTT。
*/
Message msg = new Message(parentTopic, "MQ2MQTT", "hello mq send mqtt msg".getBytes());
/**
* 使用RocketMQ客户端发消息给MQTT客户端时,可以通过MqttSecondTopic属性设置MQTT的子级Topic属性。
*/
msg.putUserProperties(PropertyKeyConst.MqttSecondTopic, subTopic);
SendResult result = producer.send(msg);
System.out.println(result);
// /**
// * 发送P2P消息,设置子级Topic。
// */
// msg.putUserProperties(PropertyKeyConst.MqttSecondTopic, "/p2p/" + clientId);
// result = producer.send(msg);
// System.out.println(result);
}
Thread.sleep(Long.MAX_VALUE);
}
}
3.结果验证
执行RocketMQSendMessageToMQ4IoT.java
类中的Main函数运行代码。可以根据下面操作验证消息的发送和消费情况。
代码验证
如下图所示,RocketMQ消息已经成功发送,MQTT客户端也已经成功消费。
控制台验证