全部产品
Search
文档中心

云消息队列 MQTT 版:MQTT数据流入规则的实现

更新时间:Oct 15, 2024

如果您的云端应用需要使用云消息队列 RocketMQ 版产品的某些功能,例如顺序消息特性、事务消息特性等,您可以通过消息流入或流出规则将云消息队列 MQTT 版云消息队列 RocketMQ 版数据进行流转。本文介绍如何将云消息队列 RocketMQ 版产品的数据导入云消息队列 MQTT 版

背景信息

云消息队列 MQTT 版支持云端SDK,云上应用可直接通过云端SDK接入云消息队列 MQTT 版服务端进行消息收发。云端SDK使用,请参见云端开发概述

同时云消息队列 MQTT 版支持和其他云产品进行互通,当前支持的云产品有云消息队列 RocketMQ 版

本文以公网环境中的Java SDK为例说明如何将云消息队列 RocketMQ 版数据流入至云消息队列 MQTT 版

quick_start_data_inflow

网络访问

云消息队列 MQTT 版同时提供了公网接入点VPC 接入点
  • 公网接入点为本地公网环境访问的IP地址,一般用于物联网和移动互联网场景中;
  • VPC 接入点为云上私网访问的IP地址,一般用于云端应用接入云消息队列 MQTT 版
重要 客户端使用接入点连接服务时务必使用域名接入,不得直接使用域名背后的IP地址直接连接,因为IP地址随时会变化。在以下使用情况中出现的问题云消息队列 MQTT 版产品方概不负责:
  • 客户端不使用域名接入而是使用IP地址接入,产品方更新了域名解析导致原有IP地址失效。
  • 客户端网络对IP地址设置网络防火墙策略,产品方更新了域名解析后新IP地址被您的防火墙策略拦截。

前提条件

  • 安装IDE。您可以使用IntelliJ IDEA或者Eclipse,本文以IntelliJ IDEA为例。
  • 下载安装JDK
  • 已创建云消息队列 MQTT 版实例、Topic和Group ID,具体操作,请参见创建资源

  • 已创建云消息队列 RocketMQ 版实例、Topic和Group ID,具体操作,请参见步骤二:创建资源

重要
  • 云消息队列 MQTT 版数据流入规则仅支持云消息队列 RocketMQ 版4.x系列实例。

  • 云消息队列 MQTT 版数据流入规则不能跨地域使用,因此,云消息队列 MQTT 版云消息队列 RocketMQ 版的资源都必须创建在同一地域。

1.创建数据流入规则

  1. 登录云消息队列 MQTT 版控制台,并在左侧导航栏单击实例列表

  2. 在顶部菜单栏选择目标地域,然后在实例列表中单击实例名称进入实例详情页面。

  3. 在左侧导航栏单击规则管理,然后在页面左上角,单击创建规则

  4. 创建规则页面完成以下操作。

    1. 配置基本信息。输入规则ID,选择数据流入的规则类型。

      image

    2. 配置规则源。选择已经创建好的云消息队列 RocketMQ 版的实例和Topic。

      image

    3. 配置规则目标。选择已经创建好的云消息队列 MQTT 版的Topic。

      image

2.准备测试代码

2.1下载示例代码

  1. 下载mqtt-java-demo,并解压该Demo工程包至您指定的文件夹。

  2. 在解压的Demo工程中找到lmq-java-demo文件夹,将此文件夹导入IntelliJ IDEA,并确认pom.xml中已包含以下依赖。

    <dependencies>
        <dependency>
            <groupId>org.bouncycastle</groupId>
            <artifactId>bcprov-jdk15on</artifactId>
            <version>1.70</version>
        </dependency>
        <dependency>
            <groupId>commons-codec</groupId>
            <artifactId>commons-codec</artifactId>
            <version>1.10</version>
        </dependency>
        <dependency>
            <groupId>org.eclipse.paho</groupId>
            <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
            <version>1.2.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.httpcomponents</groupId>
            <artifactId>httpclient</artifactId>
            <version>4.5.2</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.83</version>
        </dependency>
        <dependency>
            <groupId>com.aliyun.openservices</groupId>
            <artifactId>ons-client</artifactId>
            <version>1.8.5.Final</version>
        </dependency>
        <dependency>
            <groupId>com.aliyun</groupId>
            <artifactId>aliyun-java-sdk-onsmqtt</artifactId>
            <version>1.0.3</version>
        </dependency>
        <dependency>
            <groupId>com.aliyun</groupId>
            <artifactId>aliyun-java-sdk-core</artifactId>
            <version>4.5.0</version>
        </dependency>
    </dependencies>
  3. 配置访问凭证。

    • 获取AccessKey信息。获取方式,请参见创建AccessKey

    • 配置环境变量。云消息队列 MQTT 版的AccessKey ID和AccessKey Secret的环境变量名称分别为MQTT_AK_ENVMQTT_SK_ENV。关于配置环境变量的方法,请参见配置访问凭证

2.2收发消息代码

RocketMQSendMessageToMQ4IoT.java类中包含了发送RocketMQ消息和使用MQTT消费的代码,按代码注释说明填写云消息队列 RocketMQ 版云消息队列 MQTT 版资源的参数。

测试中可以将其中发送P2P消息的相关代码注释掉,示例代码如下。

收发送消息代码示例

import com.aliyun.openservices.lmq.example.util.ConnectionOptionWrapper;
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.Producer;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import com.aliyun.openservices.ons.api.SendResult;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

public class RocketMQSendMessageToMQ4IoT {
    public static void main(String[] args) throws Exception {
        /**
         * 初始化云消息队列 RocketMQ 版发送客户端,实际业务中一般部署在服务端应用中。
         */
        Properties properties = new Properties();
        /**
         * 设置云消息队列 RocketMQ 版Group ID,在云消息队列 RocketMQ 版控制台创建。
         */
        properties.setProperty(PropertyKeyConst.GROUP_ID, "GID-XXXXX");
        /**
         * AccessKey ID,阿里云身份验证,在阿里云RAM控制台创建。
         * 阿里云账号AccessKey拥有所有API的访问权限,建议您使用RAM用户进行API访问或日常运维。
         * 强烈建议不要把AccessKey ID和AccessKey Secret保存到工程代码里,否则可能导致AccessKey泄露,威胁您账号下所有资源的安全。
         * 本示例以将AccessKey 和 AccessKeySecret 保存在环境变量为例说明。
         */
        properties.put(PropertyKeyConst.AccessKey, System.getenv("MQTT_AK_ENV"));
        /**
         * AccessKey Secret,阿里云身份验证,在阿里云RAM控制台创建。仅在签名鉴权模式下需要设置。
         */
        properties.put(PropertyKeyConst.SecretKey, System.getenv("MQTT_SK_ENV"));
        /**
         * 设置TCP接入点,该接入点为云消息队列 RocketMQ 版实例的接入点。进入云消息队列 RocketMQ 版控制台实例详情页面获取。
         */
        properties.put(PropertyKeyConst.NAMESRV_ADDR, "XXXX");
        /**
         * 设置云消息队列 RocketMQ 版的Topic,在云消息队列 RocketMQ 版控制台创建。
         * 云消息队列 RocketMQ 版和云消息队列 MQTT 版配合使用时,RocketMQ客户端仅操作一级Topic。
         */
        final String parentTopic = "XXXXX";
        Producer producer = ONSFactory.createProducer(properties);
        producer.start();
        //////////////////////////////////////////////////////////////////////////////////////////////////////////////////
        /**
         * 初始化云消息队列 MQTT 版接收客户端,实际业务中云消息队列 MQTT 版一般部署在移动终端环境。
         */

        /**
         * 您在控制台创建的云消息队列 MQTT 版的实例ID。
         */
        String instanceId = "XXXXX";
        /**
         * 设置接入点,进入云消息队列 MQTT 版控制台实例详情页面获取。
         */
        String endPoint = "XXXXXX.mqtt.aliyuncs.com";
        /**
         * AccessKey ID,阿里云身份验证,在阿里云RAM控制台创建。
         * 阿里云账号AccessKey拥有所有API的访问权限,建议您使用RAM用户进行API访问或日常运维。
         * 强烈建议不要把AccessKey ID和AccessKey Secret保存到工程代码里,否则可能导致AccessKey泄露,威胁您账号下所有资源的安全。
         * 本示例以将AccessKey 和 AccessKeySecret 保存在环境变量为例说明。
         */
        String accessKey = System.getenv("MQTT_AK_ENV");
        /**
         * AccessKey Secret,阿里云身份验证,在阿里云RAM控制台创建。仅在签名鉴权模式下需要设置。
         */
        String secretKey = System.getenv("MQTT_SK_ENV");
        /**
         * MQTT客户端ID,由业务系统分配,需要保证每个TCP连接都不一样,保证全局唯一,如果不同的客户端对象(TCP连接)使用了相同的clientId会导致连接异常断开。
         * clientId由两部分组成,格式为GroupID@@@DeviceID,其中GroupID在云消息队列 MQTT 版控制台创建,DeviceID由业务方自己设置,clientId总长度不得超过64个字符。
         */
        String clientId = "GID_XXXX@@@XXXXX";
        /**
         * 云消息队列 MQTT 版支持子级Topic,用来做自定义的过滤,此处为示例,可以填写任何字符串。
         * 需要注意的是,完整的Topic长度不得超过128个字符。
         */
        final String subTopic = "/testMq4Iot";
        final String mq4IotTopic = parentTopic + subTopic;
        /**
         * QoS参数代表传输质量,可选0,1,2。详细信息,请参见名词解释。
         */
        final int qosLevel = 0;
        ConnectionOptionWrapper connectionOptionWrapper = new ConnectionOptionWrapper(instanceId, accessKey, secretKey, clientId);
        final MemoryPersistence memoryPersistence = new MemoryPersistence();
        /**
         * 客户端协议和端口。客户端使用的协议和端口必须匹配,如果是SSL加密则设置ssl://endpoint:8883。
         */
        final MqttClient mqttClient = new MqttClient("tcp://" + endPoint + ":1883", clientId, memoryPersistence);
        /**
         * 设置客户端发送超时时间,防止无限阻塞。
         */
        mqttClient.setTimeToWait(5000);
        final ExecutorService executorService = new ThreadPoolExecutor(1, 1, 0, TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<Runnable>());
        mqttClient.setCallback(new MqttCallbackExtended() {
            @Override
            public void connectComplete(boolean reconnect, String serverURI) {
                /**
                 * 客户端连接成功后就需要尽快订阅需要的Topic。
                 */
                System.out.println("connect success");
                executorService.submit(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            final String topicFilter[] = {mq4IotTopic};
                            final int[] qos = {qosLevel};
                            mqttClient.subscribe(topicFilter, qos);
                        } catch (MqttException e) {
                            e.printStackTrace();
                        }
                    }
                });
            }

            @Override
            public void connectionLost(Throwable throwable) {
                throwable.printStackTrace();
            }

            @Override
            public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
                /**
                 * 消费消息的回调接口,需要确保该接口不抛异常,该接口运行返回即代表消息消费成功。
                 * 消费消息需要保证在规定时间内完成,如果消费耗时超过服务端约定的超时时间,对于可靠传输的模式,服务端可能会重试推送,业务需要做好幂等去重处理。
                 */
                System.out.println(
                    "receive msg from topic " + s + " , body is " + new String(mqttMessage.getPayload()));
            }

            @Override
            public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
                System.out.println("send msg succeed topic is : " + iMqttDeliveryToken.getTopics()[0]);
            }
        });
        mqttClient.connect(connectionOptionWrapper.getMqttConnectOptions());
        for (int i = 0; i < 10; i++) {
            /**
             * 使用RocketMQ客户端发消息给MQTT客户端时,Topic指定为一级父Topic,Tag指定为MQ2MQTT。
             */
            Message msg = new Message(parentTopic, "MQ2MQTT", "hello mq send mqtt msg".getBytes());
            /**
             * 使用RocketMQ客户端发消息给MQTT客户端时,可以通过MqttSecondTopic属性设置MQTT的子级Topic属性。
             */
            msg.putUserProperties(PropertyKeyConst.MqttSecondTopic, subTopic);
            SendResult result = producer.send(msg);
            System.out.println(result);
//            /**
//             * 发送P2P消息,设置子级Topic。
//             */
//            msg.putUserProperties(PropertyKeyConst.MqttSecondTopic, "/p2p/" + clientId);
//            result = producer.send(msg);
//            System.out.println(result);
        }
        Thread.sleep(Long.MAX_VALUE);

    }

}

3.结果验证

执行RocketMQSendMessageToMQ4IoT.java类中的Main函数运行代码。可以根据下面操作验证消息的发送和消费情况。

代码验证

如下图所示,RocketMQ消息已经成功发送,MQTT客户端也已经成功消费。

image

控制台验证

  • 查询消息发送情况。在云消息队列 RocketMQ 版控制台消息查询页面,根据Topic和Message ID查询到消息已经成功发送,如下图所示。

    image

  • 查询消息消费情况。在云消息队列 MQTT 版控制台消息轨迹查询页面,根据Message ID查询消息已经被消费,如下图所示。

    image

更多信息