All Products
Search
Document Center

ApsaraMQ for MQTT:Use ApsaraMQ for MQTT SDK for Java to implement messaging between ApsaraMQ for MQTT clients and backend service applications

Last Updated:Jun 20, 2024

This topic describes how to use ApsaraMQ for MQTT SDK for Java to implement messaging between ApsaraMQ for MQTT clients and backend service applications.

Prerequisites

  • The required resources are created. For more information, see Create resources

  • An AccessKey pair is obtained. For more information, see Obtain an AccessKey pair.

  • The integrated development environment (IDE) is installed. For more information, see IntelliJ IDEA. You can use IntelliJ IDEA or Eclipse. In the examples of this topic, IntelliJ IDEA is used.

  • The Java Development Kit (JDK) is installed. For more information, see Java Downloads.

Background information

The following figure shows the process for implementing messaging between an ApsaraMQ for MQTT client and a backend service application. The ApsaraMQ for MQTT client can use a client SDK and the backend service application can use a cloud SDK to access ApsaraMQ for MQTT and implement two-way communication. 终端云端交互This topic describes how to use Apsara for MQTT SDK for Java to implement messaging between an ApsaraMQ for MQTT client and a backend service application over the Internet. For information about the sample code that is used to send and receive messages, see Demo projects or Demo.

Endpoints

Before you use a client SDK and a cloud SDK to access ApsaraMQ for MQTT, you must specify an endpoint of your ApsaraMQ for MQTT instance in the SDK code. This way, the ApsaraMQ for MQTT client and backend service application can use the endpoint to access the ApsaraMQ for MQTT broker.

  • Formats of endpoints for client SDKs

    When you use a client SDK to connect a client to ApsaraMQ for MQTT, specify the endpoint in one of the following formats:

    • Public Endpoint: ID of the ApsaraMQ for MQTT instance.mqtt.aliyuncs.com

    • VPC Endpoint: ID of the ApsaraMQ for MQTT instance-internal-vpc.mqtt.aliyuncs.com

    You can also view the endpoint for client SDKs on the Endpoints tab of the Instance Details page in the ApsaraMQ for MQTT console.

  • Formats of endpoints for cloud SDKs

    When you use a cloud SDK to connect a backend service application to ApsaraMQ for MQTT, specify the endpoint in one of the following formats:

    Important

    You can use a cloud SDK to connect to only ApsaraMQ for MQTT instances whose kernel version is V3.3.0 and that are deployed in regions in the Chinese mainland.

    • Public Endpoint: ID of the ApsaraMQ for MQTT instance-server-internet.mqtt.aliyuncs.com

    • VPC Endpoint: ID of the ApsaraMQ for MQTT instance-server-internal.mqtt.aliyuncs.com

Note

You can obtain the ID of the ApsaraMQ for MQTT instance in the Basic Information section of the Instance Details page in the ApsaraMQ for MQTT console.

Endpoints for client SDKs and cloud SDKs can be public endpoints or VPC endpoints. A public endpoint is an IP address for access from the Internet and is commonly used in IoT and mobile Internet scenarios. A private endpoint is an IP address for access from a virtual private cloud (VPC) and is commonly used by backend service applications to access ApsaraMQ for MQTT.

Important

When you use an endpoint to connect an SDK to ApsaraMQ for MQTT, use the domain name instead of the IP address. The IP address unexpectedly changes. ApsaraMQ for MQTT technical team is not responsible for faults and direct or indirect losses in the following scenarios:

  • The client or backend service application uses an IP address instead of a domain name to access ApsaraMQ for MQTT. The original IP address becomes invalid after the technical team of ApsaraMQ for MQTT updates the domain name resolution.

  • A firewall policy on IP addresses is set in the network on which your ApsaraMQ for MQTT client or backend service application runs. New IP addresses are blocked due to the firewall policy after the technical team of ApsaraMQ for MQTT updates the domain name resolution.

Use a client SDK to send messages

  1. Download a third-party open source SDK for Java. Download link: Eclipse Paho Java Client.

  2. Download the demo of the client SDK as a reference for code development. Download link: mqtt-java-demo.

  3. Decompress the demo project package to a specific folder.

  4. In IntelliJ IDEA, import the extracted files to create a project and check whether the pom.xml file contains the following dependencies:

    <dependencies>
            <dependency>
                <groupId>commons-codec</groupId>
                <artifactId>commons-codec</artifactId>
                <version>1.10</version>
            </dependency>
            <dependency>
                <groupId>org.eclipse.paho</groupId>
                <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
                <version>1.2.2</version>
            </dependency>
            <dependency>
                <groupId>org.apache.httpcomponents</groupId>
                <artifactId>httpclient</artifactId>
                <version>4.5.2</version>
            </dependency>
            <dependency>
                <groupId>com.alibaba</groupId>
                <artifactId>fastjson</artifactId>
                <version>1.2.83</version>
            </dependency>
            <dependency>
                <groupId>com.aliyun</groupId>
                <artifactId>aliyun-java-sdk-onsmqtt</artifactId>
                <version>1.0.3</version>
            </dependency>
            <dependency>
                <groupId>com.aliyun</groupId>
                <artifactId>aliyun-java-sdk-core</artifactId>
                <version>4.5.0</version>
            </dependency>
    </dependencies>
  5. In the MQ4IoTProducerDemo.java class, follow the description in the code annotations to configure the parameters. Most of the parameters are related to ApsaraMQ for MQTT resources that you created. For more information, see Create resources. Use the main() function to run the sample code to send messages.

    Sample code:

    Note Before you use the sample code to send and receive messages, you must configure environment variables to obtain the credentials that are used to access ApsaraMQ for MQTT. For information about how to configure environment variables, see Configure an access credential.

    The environment variable name of the AccessKey ID that is used to access ApsaraMQ for MQTT is MQTT_AK_ENV, and the environment variable name of the AccessKey secret that is used to access ApsaraMQ for MQTT is MQTT_SK_ENV.

    package com.aliyun.openservices.lmq.example.demo;
    
    import com.aliyun.openservices.lmq.example.util.ConnectionOptionWrapper;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;
    import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
    import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
    import org.eclipse.paho.client.mqttv3.MqttClient;
    import org.eclipse.paho.client.mqttv3.MqttException;
    import org.eclipse.paho.client.mqttv3.MqttMessage;
    import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
    
    
    public class MQ4IoTProducerDemo {
        public static void main(String[] args) throws Exception {
            /**
             * The ID of the ApsaraMQ for MQTT instance that you created. 
             */
            String instanceId = "XXXXX";
            /**
             * The endpoint for the client SDK. You can obtain the endpoint on the Endpoints tab of the Instance Details page in the ApsaraMQ for MQTT console. 
             * Use the domain name instead of the IP address as the endpoint. Otherwise, a client exception may occur. 
             */
            String endPoint = "XXXXX.mqtt.aliyuncs.com";
            /**
             * The AccessKey ID that you created in the Alibaba Cloud Resource Access Management (RAM) console for identity authentication. 
             * The AccessKey pair of an Alibaba Cloud account has permissions on all API operations. To avoid security risks, we recommend that you use a RAM user to call API operations or perform routine O&M. 
             * We strongly recommend that you do not save an AccessKey pair in the project code. Otherwise, the AccessKey pair may be leaked and all resources that are contained in your account may be exposed to potential security risks. 
             * In this example, the AccessKey pair is saved in the environment variables. 
             */
            String accessKey = System.getenv("MQTT_AK_ENV");
            /**
             * The AccessKey secret that you created in the Alibaba Cloud RAM console for identity authentication. The AccessKey secret is required only for signature authentication. 
             */
            String secretKey = System.getenv("MQTT_SK_ENV");
            /**
             * The globally unique ID that the system assigns to the ApsaraMQ for MQTT client. The client ID must vary based on TCP connections. If multiple TCP connections use the same client ID, exceptions occur and the connections are unexpectedly closed. 
             * The client ID consists of a group ID and a device ID and is in the GroupID@@@DeviceID format. The group ID is the ID of the group that you create in the ApsaraMQ for MQTT console. The device ID is a custom ID specified by you. The client ID cannot exceed 64 characters in length. 
             */
            String clientId = "GID_XXXXX@@@XXXXX";
            /**
             * The parent topic that you created in the ApsaraMQ for MQTT console. 
             * If you specify a topic that does not exist or a topic that the ApsaraMQ for MQTT client is not authorized to access, the ApsaraMQ for MQTT broker closes the connection. 
             */
            final String parentTopic = "XXXXX";
            /**
             * The subtopic. ApsaraMQ for MQTT allows you to use a subtopic to filter messages. You can specify a string as the name of the subtopic. 
             * The value of the mq4IotTopic parameter can be up to 128 characters in length. 
             */
            final String mq4IotTopic = parentTopic + "/" + "testMq4Iot";
            /**
             * The quality of service (QoS) level for message transmission. Valid values: 0, 1, and 2. For more information, see Terms. 
             */
            final int qosLevel = 0;
            ConnectionOptionWrapper connectionOptionWrapper = new ConnectionOptionWrapper(instanceId, accessKey, secretKey, clientId);
            final MemoryPersistence memoryPersistence = new MemoryPersistence();
             /**
             * The protocol and port that are used by the ApsaraMQ for MQTT client. The protocol and port that are used by the ApsaraMQ for MQTT client must match. If Secure Sockets Layer (SSL) encryption is used, the protocol and port must be specified as ssl://endpoint:8883. 
             */
            final MqttClient mqttClient = new MqttClient("tcp://" + endPoint + ":1883", clientId, memoryPersistence);
             /**
             * The timeout period during which the ApsaraMQ for MQTT client waits for a response. The timeout period prevents the ApsaraMQ for MQTT client from waiting for a response for an indefinite period of time. 
             */
            mqttClient.setTimeToWait(5000);
            final ExecutorService executorService = new ThreadPoolExecutor(1, 1, 0, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>());
            mqttClient.setCallback(new MqttCallbackExtended() {
                @Override
                public void connectComplete(boolean reconnect, String serverURI) {
                    /**
                     * The topic to which the consumer must subscribe at the earliest opportunity after the client connection is established. 
                     */
                    System.out.println("connect success");
                }
    
                @Override
                public void connectionLost(Throwable throwable) {
                    throwable.printStackTrace();
                }
    
                @Override
                public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
                     /**
                     * The callback that is invoked to consume messages that are published. Make sure that the callback does not throw exceptions. If a response is returned for the callback, the messages are consumed. 
                     * Messages must be consumed in the specified period of time. If a message is not consumed within the timeout period specified by the ApsaraMQ for MQTT broker, the broker may attempt to resend the message. Make sure that deduplication is performed to ensure idempotence for message consumption. 
                     */
                    System.out.println(
                        "receive msg from topic " + s + " , body is " + new String(mqttMessage.getPayload()));
                }
    
                @Override
                public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
                    System.out.println("send msg succeed topic is : " + iMqttDeliveryToken.getTopics()[0]);
                }
            });
            mqttClient.connect(connectionOptionWrapper.getMqttConnectOptions());
            for (int i = 0; i < 10; i++) {
                MqttMessage message = new MqttMessage("hello mq4Iot pub sub msg".getBytes());
                message.setQos(qosLevel);
                 /**
                 * The topic to which messages are sent. If the messages are normal messages, this topic must be the same as the topic to which the consumer subscribes or can be matched by using wildcards. 
                 */
                mqttClient.publish(mq4IotTopic, message);
                /**
                 * ApsaraMQ for MQTT supports point-to-point (P2P) messaging. If the producer knows that a specific message is to be sent to a specific client ID, the producer can use P2P messaging. 
                 * In P2P messaging, the consumer does not need to subscribe to the topic to which the producer sends messages. The logic is simplified at the consumer side. In P2P messaging, specify a topic in the format of {{parentTopic}}/p2p/{{targetClientId}}. 
                 */
                String receiverId = "xxx";
                final String p2pSendTopic = parentTopic + "/p2p/" + receiverId;
                message = new MqttMessage("hello mq4Iot p2p msg".getBytes());
                message.setQos(qosLevel);
                mqttClient.publish(p2pSendTopic, message);
            }
            Thread.sleep(Long.MAX_VALUE);
        }
    }

Use a cloud SDK to receive messages

  1. Download a cloud SDK provided by ApsaraMQ for MQTT. For more information about the download URL, see Release notes.

  2. Download the demo of the cloud SDK as a reference for code development. Download link: mqtt-server-sdk-demo.

  3. Decompress the demo project package to a specific folder.

  4. In IntelliJ IDEA, import the extracted files to create a project and check whether the pom.xml file contains the following dependencies:

    <dependencies>
            <dependency>
                <groupId>com.alibaba.mqtt</groupId>
                <artifactId>server-sdk</artifactId>
                <version>1.0.0.Final</version>
            </dependency>
    
            <dependency>
                <groupId>com.alibaba</groupId>
                <artifactId>fastjson</artifactId>
                <version>1.2.83</version>
            </dependency>
    </dependencies>
  5. In the MQTTConsumerDemo.java class, follow the description in the code annotations to configure the parameters. Most of the parameters are related to ApsaraMQ for MQTT resources that you created. For more information, see Create resources. Use the main() function to run the sample code to receive messages.

    Sample code:

    Note Before you use the sample code to send and receive messages, you must configure environment variables to obtain the credentials that are used to access ApsaraMQ for MQTT. For information about how to configure environment variables, see Configure an access credential.

    The environment variable name of the AccessKey ID that is used to access ApsaraMQ for MQTT is MQTT_AK_ENV, and the environment variable name of the AccessKey secret that is used to access ApsaraMQ for MQTT is MQTT_SK_ENV.

    package com.aliyun.openservices.lmq.example;
    
    import com.alibaba.fastjson.JSONObject;
    import com.alibaba.mqtt.server.ServerConsumer;
    import com.alibaba.mqtt.server.callback.MessageListener;
    import com.alibaba.mqtt.server.config.ChannelConfig;
    import com.alibaba.mqtt.server.config.ConsumerConfig;
    import com.alibaba.mqtt.server.model.MessageProperties;
    
    public class MQTTConsumerDemo {
        public static void main(String[] args) throws Exception {
            /**
             * The endpoint for the cloud SDK. For information about the format of the endpoint, see the "Endpoints" section of this topic. 
             * Use a domain name instead of an IP address as the endpoint. Otherwise, a broker exception may occur. 
             */
            String domain = "post-cn-jaj3h8i****.mqtt.aliyuncs.com";
    
            /**
             * The port that is used by the cloud SDK. The protocol and port that are used by the cloud SDK must match. Set the value to 5672. 
             */
            int port = "5672";
    
            /**
             * The ID of the ApsaraMQ for MQTT instance that you created. 
             */
            String instanceId = "post-cn-jaj3h8i****";
    
            /**
             * The AccessKey ID that you created in the Alibaba Cloud RAM console for identity authentication. 
             * The AccessKey pair of an Alibaba Cloud account has permissions on all API operations. To avoid security risks, we recommend that you use a RAM user to call API operations or perform routine O&M. 
             * We strongly recommend that you do not save an AccessKey pair in the project code. Otherwise, the AccessKey pair may be leaked and all resources that are contained in your account may be exposed to potential security risks. 
             * In this example, the AccessKey pair is saved in the environment variables. 
             */
            String accessKey = System.getenv("MQTT_AK_ENV");
            /**
             * The AccessKey secret that you created in the Alibaba Cloud RAM console for identity authentication. The AccessKey secret is required only for signature authentication. 
             */
            String secretKey = System.getenv("MQTT_SK_ENV");
    
            /**
             * The parent topic that you created in the ApsaraMQ for MQTT console. 
             * In most cases, you use the cloud SDK to subscribe to messages in scenarios in which cloud applications collect and analyze messages. As a result, you cannot specify subtopics when you use the cloud SDK to subscribe to messages. 
             * If you specify a topic that does not exist or a topic that the ApsaraMQ for MQTT client is not authorized to access, the ApsaraMQ for MQTT broker closes the connection. 
             */
            String firstTopic = "firstTopic";
    
            ChannelConfig channelConfig = new ChannelConfig();
            channelConfig.setDomain(domain);
            channelConfig.setPort(port);
            channelConfig.setInstanceId(instanceId);
            channelConfig.setAccessKey(accessKey);
            channelConfig.setSecretKey(secretKey);
    
            ServerConsumer serverConsumer = new ServerConsumer(channelConfig, new ConsumerConfig());
            serverConsumer.start();
            serverConsumer.subscribeTopic(firstTopic, new MessageListener() {
                @Override
                public void process(String msgId, MessageProperties messageProperties, byte[] payload) {
                    System.out.println("Receive:" + msgId + "," + JSONObject.toJSONString(messageProperties) + "," + new String(payload));
                }
            });
        }
    
    }
    Note

    For information about the sample code used to send messages by using a cloud SDK, see MQTTProducerDemo.java.