All Products
Search
Document Center

ApsaraMQ for MQTT:Export data from ApsaraMQ for MQTT to ApsaraMQ for RocketMQ

Last Updated:Nov 07, 2024

If you want to use specific features of ApsaraMQ for RocketMQ, such as ordered messages and transactional messages, in cloud applications, you can use data inbound or outbound rules to exchange data between ApsaraMQ for MQTT and ApsaraMQ for RocketMQ. This topic describes how to export data from ApsaraMQ for MQTT to ApsaraMQ for RocketMQ.

Background information

ApsaraMQ for MQTT supports cloud SDKs. You can connect cloud applications to ApsaraMQ for MQTT brokers to send and receive messages by using cloud SDKs. For information about how to use cloud SDKs, see Overview.

ApsaraMQ for MQTT also supports data exchange between ApsaraMQ for MQTT and other Alibaba Cloud services. Currently, you can exchange data only between ApsaraMQ for MQTT and ApsaraMQ for RocketMQ.

This topic describes how to export data from ApsaraMQ for MQTT to ApsaraMQ for RocketMQ over the Internet by using the SDK for Java.

In this scenario, you can use third-party open source SDKs for multiple programming languages to send and receive messages. For more information, see Download the SDK.

quick_start_data_outflow

Network access

ApsaraMQ for MQTT provides Public Endpoint and VPC Endpoint.

  • Public Endpoint is an IP address that is used to access ApsaraMQ for MQTT over the Internet. In most cases, public endpoints are used in the IoT and mobile Internet scenarios.

  • VPC Endpoint is an IP address that is used to access ApsaraMQ for MQTT in a private virtual cloud (VPC). In most cases, VPC endpoints are used by cloud applications to connect to ApsaraMQ for MQTT.

Important

If you want to use an endpoint to connect a client to ApsaraMQ for MQTT, use the domain name instead of the IP address because the IP address dynamically changes. The ApsaraMQ for MQTT technical team is not liable for faults and direct or indirect losses in the following scenarios:

  • You use an IP address to access your client to ApsaraMQ for MQTT. After the technical team of ApsaraMQ for MQTT updates the domain name resolution, the original IP address becomes invalid.

  • A firewall policy on IP addresses is set in the network in which your client is running. After the technical team of ApsaraMQ for MQTT updates the domain name resolution, new IP addresses are blocked due to the firewall policy.

Prerequisites

  • The integrated development environment (IDE) is installed. For more information, see IDE. You can use IntelliJ IDEA or Eclipse. In the example, IntelliJ IDEA is used.

  • The Java Development Kit (JDK) is installed. For more information, see JDK.

  • An ApsaraMQ for MQTT instance is created, and a topic and a group are created on the instance. For more information, see Create resources.

  • An ApsaraMQ for RocketMQ instance is created, and a topic and a group are created on the instance. For more information, see Step 2: Create resources.

Important
  • You can use the data outbound rules of ApsaraMQ for MQTT to export data only to ApsaraMQ for RocketMQ 4.x instances.

  • You cannot use the data outbound rules of ApsaraMQ for MQTT across regions. When you create a data outbound rule, make sure that the ApsaraMQ for MQTT and ApsaraMQ for RocketMQ resources reside in the same region.

1. Create a data outbound rule

  1. Log on to the ApsaraMQ for MQTT console. In the left-side navigation pane, click Instances.

  2. In the top navigation bar, select the region where the instance that you want to manage resides. On the Instances page, click the instance name to go to the Instance Details page.

  3. In the left-side navigation pane, click Rules. In the upper-left corner of the Rules page, click Create Rule.

  4. In the Create Rule wizard, perform the following steps:

    1. In the Configure Basic Information step, enter a rule ID and select Data Outbound for the Rule Type parameter.

      image

    2. In the Configure Rule Source step, select a created topic on the ApsaraMQ for MQTT instance.

      image

    3. In the Configure Rule Destination step, select a created ApsaraMQ for RocketMQ instance and a created topic on the instance.

      image

2. Prepare the test code

2.1 Download the sample code

  1. Download the mqtt-java-demo demo project and decompress the demo project package to a folder on your on-premises machine.

  2. In the decompressed demo project, find the lmq-java-demo folder, import the folder to IntelliJ IDEA, and then confirm whether the following dependencies are included in the pom.xml file:

    <dependencies>
        <dependency>
            <groupId>org.bouncycastle</groupId>
            <artifactId>bcprov-jdk15on</artifactId>
            <version>1.70</version>
        </dependency>
        <dependency>
            <groupId>commons-codec</groupId>
            <artifactId>commons-codec</artifactId>
            <version>1.10</version>
        </dependency>
        <dependency>
            <groupId>org.eclipse.paho</groupId>
            <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
            <version>1.2.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.httpcomponents</groupId>
            <artifactId>httpclient</artifactId>
            <version>4.5.2</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.83</version>
        </dependency>
        <dependency>
            <groupId>com.aliyun.openservices</groupId>
            <artifactId>ons-client</artifactId>
            <version>1.8.5.Final</version>
        </dependency>
        <dependency>
            <groupId>com.aliyun</groupId>
            <artifactId>aliyun-java-sdk-onsmqtt</artifactId>
            <version>1.0.3</version>
        </dependency>
        <dependency>
            <groupId>com.aliyun</groupId>
            <artifactId>aliyun-java-sdk-core</artifactId>
            <version>4.5.0</version>
        </dependency>
    </dependencies>
  3. Configure access credentials.

    • Obtain an AccessKey pair. For information about how to obtain an AccessKey pair, see Create an AccessKey pair.

    • Configure the environment variables. The environment variable name of the AccessKey ID that is used to access ApsaraMQ for MQTT is MQTT_AK_ENV, and the environment variable name of the AccessKey secret that is used to access ApsaraMQ for MQTT is MQTT_SK_ENV. For information about how to configure environment variables, see Configure an access credential.

2.2 Configure the code for messaging

The MQ4IoTSendMessageToRocketMQ.java class contains the code for sending messages by using ApsaraMQ for MQTT and receiving messages by using ApsaraMQ for RocketMQ. You must specify the parameters for ApsaraMQ for MQTT and ApsaraMQ for RocketMQ resources based on the comments in the code.

Sample code for messaging

import com.aliyun.openservices.lmq.example.util.ConnectionOptionWrapper;
import com.aliyun.openservices.ons.api.Action;
import com.aliyun.openservices.ons.api.ConsumeContext;
import com.aliyun.openservices.ons.api.Consumer;
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.MessageListener;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import java.util.Properties;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

public class MQ4IoTSendMessageToRocketMQ {
    public static void main(String[] args) throws Exception {
        /**
         * Initialize the ApsaraMQ for RocketMQ client as a receiver. In most business scenarios, the receiver is deployed on a backend application. 
         */
        Properties properties = new Properties();
        /**
         * The ID of the group that you created in the ApsaraMQ for RocketMQ console. 
         */
        properties.setProperty(PropertyKeyConst.GROUP_ID, "GID-XXXXX");
        
        /**
         * The AccessKey ID that you created in the Alibaba Cloud Resource Access Management (RAM) console for identity authentication. 
         * The AccessKey pair of an Alibaba Cloud account has permissions on all API operations. To prevent security risks, we recommend that you use a RAM user to call API operations or perform routine O&M. 
         * We strongly recommend that you do not save an AccessKey pair in the project code. Otherwise, the AccessKey pair may be leaked and all resources in your account may be exposed to potential security risks. 
         * In this example, the AccessKey ID and AccessKey secret are stored in environment variables. 
         */
        properties.put(PropertyKeyConst.AccessKey, System.getenv("MQTT_AK_ENV"));
        /**
         * The AccessKey secret that you created in the Alibaba Cloud RAM console for identity authentication. This parameter is required only if you use the signature authentication mode. 
         */
        properties.put(PropertyKeyConst.SecretKey, System.getenv("MQTT_SK_ENV"));
      
         /**
         * The TCP endpoint that you want to use to access the ApsaraMQ for RocketMQ instance. You can obtain the TCP endpoint on the Instance Details page in the ApsaraMQ for RocketMQ console. 
         */
        properties.put(PropertyKeyConst.NAMESRV_ADDR, "http://xxxxx.XXXXX.mq-internet.aliyuncs.com");
        /**
         * The topic that you created in the ApsaraMQ for RocketMQ console. 
         * When you exchange data between ApsaraMQ for RocketMQ and ApsaraMQ for MQTT, only parent topics can be used by the ApsaraMQ for RocketMQ client. 
         */
        final String parentTopic = "XXXXX";
        Consumer consumer = ONSFactory.createConsumer(properties);
        consumer.subscribe(parentTopic, "*", new MessageListener() {
            public Action consume(Message message, ConsumeContext consumeContext) {
                System.out.println("recv msg:" + message);
                return Action.CommitMessage;
            }
        });
        consumer.start();
        //////////////////////////////////////////////////////////////////////////////////////////////////////////////////
        /**
         * Initialize the ApsaraMQ for MQTT client as a sender. In most business scenarios, the sender is deployed on a mobile terminal. 
         */

        /**
         * The ID of the ApsaraMQ for MQTT instance that you created in the console. 
         */
        String instanceId = "XXXXX";
         /**
         * The endpoint that you want to use to access the ApsaraMQ for MQTT instance. You can obtain the endpoint on the Instance Details page in the ApsaraMQ for MQTT console. 
         */
        String endPoint = "XXXXXX.mqtt.aliyuncs.com";
        /**
         * The AccessKey ID that you created in the Alibaba Cloud RAM console for identity authentication. 
         * The AccessKey pair of an Alibaba Cloud account has permissions on all API operations. To prevent security risks, we recommend that you use a RAM user to call API operations or perform routine O&M. 
         * We strongly recommend that you do not save an AccessKey pair in the project code. Otherwise, the AccessKey pair may be leaked and all resources in your account may be exposed to potential security risks. 
         * In this example, the AccessKey ID and AccessKey secret are stored in environment variables. 
         */
        String accessKey = System.getenv("MQTT_AK_ENV");
        /**
         * The AccessKey secret that you created in the Alibaba Cloud RAM console for identity authentication. This parameter is required only if you use the signature authentication mode. 
         */
        String secretKey = System.getenv("MQTT_SK_ENV");
        /**
         * The globally unique ID that the system assigns to the ApsaraMQ for MQTT client. The client ID must vary based on the TCP connection. If multiple TCP connections use the same client ID, exceptions occur and the connections are unexpectedly closed. 
         * The client ID consists of a group ID and a device ID and is in the GroupID@@@DeviceID format. The group ID is the ID of the group that you created in the ApsaraMQ for MQTT console. The device ID is a custom ID that you specify. The client ID cannot exceed 64 characters in length. 
         */
        String clientId = "GID_XXXX@@@XXXXX";
       /**
         * ApsaraMQ for MQTT allows you to use a subtopic to filter messages. You can specify a string as the name of the subtopic. 
         * The value of the mq4IotTopic parameter can be up to 128 characters in length. 
         */
        final String mq4IotTopic = parentTopic + "/" + "testMq4Iot";
        /**
         * The quality of service (QoS) level for message transmission. Valid values: 0, 1, and 2. For more information, see Terms. 
         */
        final int qosLevel = 0;
        ConnectionOptionWrapper connectionOptionWrapper = new ConnectionOptionWrapper(instanceId, accessKey, secretKey, clientId);
        final MemoryPersistence memoryPersistence = new MemoryPersistence();
         /**
         * The protocol and port that are used by the ApsaraMQ for MQTT client. The protocol and port that are used by the ApsaraMQ for MQTT client must match. If Secure Sockets Layer (SSL) encryption is used, specify ssl://endpoint:8883 as the protocol and port. 
         */
        final MqttClient mqttClient = new MqttClient("tcp://" + endPoint + ":1883", clientId, memoryPersistence);
        /**
         * The timeout period during which the ApsaraMQ for MQTT client waits for a response. The timeout period prevents the ApsaraMQ for MQTT client from waiting for a response for an indefinite period of time. 
         */
        mqttClient.setTimeToWait(5000);
        mqttClient.setCallback(new MqttCallbackExtended() {
            @Override
            public void connectComplete(boolean reconnect, String serverURI) {
                /**
                 * The topic to which the consumer must subscribe at the earliest opportunity after the client connection is established. 
                 */
                System.out.println("connect success");
            }

            @Override
            public void connectionLost(Throwable throwable) {
                throwable.printStackTrace();
            }

            @Override
            public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
            }

            @Override
            public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
                System.out.println("send msg succeed topic is : " + iMqttDeliveryToken.getTopics()[0]);
            }
        });
        mqttClient.connect(connectionOptionWrapper.getMqttConnectOptions());
        for (int i = 0; i < 10; i++) {
            MqttMessage message = new MqttMessage("hello mq4Iot pub sub msg".getBytes());
            message.setQos(qosLevel);
            /**
             * The topic to which messages are sent. If the messages that are sent are normal messages, the topic must be the topic to which the consumer subscribes or the topic that can be matched by using wildcards. 
             */
            mqttClient.publish(mq4IotTopic, message);
        }
        Thread.sleep(Long.MAX_VALUE);

    }

}

3. Verify the result

You can execute the main function in the MQ4IoTSendMessageToRocketMQ.java class and then use one of the following methods to verify the sending and consumption of a message.

Use code

If code similar to the code in the following figure is displayed, the message is sent by ApsaraMQ for MQTT and consumed by ApsaraMQ for RocketMQ.

image

Use the console

  • Check whether the message is sent. Go to the instance details page in the ApsaraMQ for MQTT console and then click Message Trace Query in the left-side navigation pane. On the Message Trace Query page, check whether the message is sent by using the group ID and device ID.

    image

  • Check whether the message is consumed. Go to the instance details page in the ApsaraMQ for RocketMQ console and then click Message Query in the left-side navigation pane. On the Message Query page, check whether the message is forwarded to ApsaraMQ for RocketMQ based on the topic, as shown in the following figure.

    image

    Click Message Trace in the Actions column of the queried message. If information similar to the information in the following figure is displayed, the message is consumed.

    image

References