ApsaraMQ for RabbitMQ: Step 3: Use SDKs to send and receive messages

Last Updated: Oct 28, 2024

This topic describes how to use an open source SDK to connect to an ApsaraMQ for RabbitMQ broker to send and receive messages. In this topic, the SDK for Java is used.

Before you start

Install the Java dependency

  1. Create a Java project in IntelliJ IDEA.

  2. Add the following dependency to the pom.xml file to import the Java dependency library:

    <dependency>
        <groupId>com.rabbitmq</groupId>
        <artifactId>amqp-client</artifactId>
        <version>5.5.0</version> <!-- All versions of open source RabbitMQ are supported. -->
    </dependency>

Produce messages

In the Java project that you created, create a producer program, configure the relevant parameters based on the description in SDK parameters, and then run the program.

Sample code:

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.UUID;
import java.util.concurrent.TimeoutException;

public class ProducerTest {
    public static void main(String[] args) throws IOException, TimeoutException {
        // The endpoint of the ApsaraMQ for RabbitMQ instance. 
        String hostName = "xxx.xxx.aliyuncs.com";
        // The static username and password of the ApsaraMQ for RabbitMQ instance. 
        String userName = "${UserName}";
        String passWord = "${PassWord}";
        // The vhost of the ApsaraMQ for RabbitMQ instance. 
        String virtualHost = "${VirtualHost}";

        // In production environments, we recommend that you create the connection in advance and reuse it when needed. This prevents frequent establishment and closing of connections and improves system performance and stability. 
        Connection connection = createConnection(hostName, userName, passWord, virtualHost);
        Channel channel = connection.createChannel();

        // The binding between the exchange and the queue. 
        String exchangeName = "${ExchangeName}";
        String queueName = "${QueueName}";
        String routingKey = "${RoutingKey}";
        // The exchange type. 
        String exchangeType = "${ExchangeType}";

        // Make sure that the exchange and the queue exist before you send messages. 
        // In production environments, we recommend that you create the exchange and the queue in the ApsaraMQ for RabbitMQ console in advance instead of declaring them in the code. Otherwise, throttling is triggered for the corresponding API operations. 
        channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
        channel.queueDeclare(queueName, true, false, false, new HashMap<String, Object>());
        channel.queueBind(queueName, exchangeName, routingKey);
        // Send 10 messages, each with a unique message ID. 
        for (int i = 0; i < 10; i++) {
            AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().messageId(UUID.randomUUID().toString()).build();
            channel.basicPublish(exchangeName, routingKey, true, props,
                    ("Sample message body-" + i).getBytes(StandardCharsets.UTF_8));
            System.out.println("[SendResult] Message sent successfully, messageId: " + props.getMessageId() + ", exchange: " + exchangeName + ", routingKey: " + routingKey);
        }
        connection.close();
    }

    public static Connection createConnection(String hostName, String userName, String passWord, String virtualHost) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(hostName);
        factory.setUsername(userName);
        factory.setPassword(passWord);
        // Enable automatic connection recovery. Set this parameter to false to disable it. 
        factory.setAutomaticRecoveryEnabled(true);
        // The interval between recovery attempts, in milliseconds. 
        factory.setNetworkRecoveryInterval(5000);
        factory.setVirtualHost(virtualHost);
        // The default port. Port 5672 is used for non-encrypted connections. Port 5671 is used for encrypted connections. 
        factory.setPort(5672);
        // The timeout period. Configure this parameter based on the network environment. 
        factory.setConnectionTimeout(30 * 1000);
        factory.setHandshakeTimeout(30 * 1000);
        factory.setShutdownTimeout(0);
        Connection connection = factory.newConnection();
        return connection;
    }
}
Note

Throttling can be triggered for an ApsaraMQ for RabbitMQ instance based on the peak transactions per second (TPS) of the instance. For more information, see Best practices for instance throttling.
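
One way to stay under the peak TPS of an instance is to pace sends on the client side. The following is a minimal sketch under stated assumptions, not part of the ApsaraMQ for RabbitMQ SDK: `PublishPacer` is a hypothetical helper name, and the limit of 100 sends per second is a placeholder that you would replace with a value suited to your instance. It uses only the Java standard library.

```java
import java.util.concurrent.TimeUnit;

/** Hypothetical helper that spaces out sends so they stay under a target rate. */
final class PublishPacer {
    private final long intervalNanos; // minimum gap between two sends
    private long nextSlotNanos;       // earliest time the next send may start

    PublishPacer(int maxPerSecond) {
        this(maxPerSecond, System.nanoTime());
    }

    // The start time is injectable so that the pacing logic can be checked without sleeping.
    PublishPacer(int maxPerSecond, long startNanos) {
        if (maxPerSecond <= 0) {
            throw new IllegalArgumentException("maxPerSecond must be positive");
        }
        this.intervalNanos = TimeUnit.SECONDS.toNanos(1) / maxPerSecond;
        this.nextSlotNanos = startNanos;
    }

    /** Reserves the next send slot and returns how long the caller must wait for it. */
    synchronized long reserve(long nowNanos) {
        long wait = nextSlotNanos - nowNanos;
        if (wait < 0) {
            // The caller is later than the free slot: send now and restart pacing from here.
            wait = 0;
            nextSlotNanos = nowNanos;
        }
        nextSlotNanos += intervalNanos;
        return wait;
    }

    /** Blocks until the caller may send. */
    void acquire() throws InterruptedException {
        long wait = reserve(System.nanoTime());
        if (wait > 0) {
            TimeUnit.NANOSECONDS.sleep(wait);
        }
    }
}
```

In the producer sample, you could create `new PublishPacer(100)` before the loop and call `acquire()` immediately before each `channel.basicPublish(...)` call.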

Subscribe to messages

In the Java project that you created, create a consumer program, configure the relevant parameters based on the description in SDK parameters, and then run the program.

Sample code:

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.concurrent.TimeoutException;

public class ConsumerTest {
    public static void main(String[] args) throws IOException, TimeoutException {
        // The endpoint of the ApsaraMQ for RabbitMQ instance. 
        String hostName = "xxx.xxx.aliyuncs.com";
        // The static username and password of the ApsaraMQ for RabbitMQ instance. 
        String userName = "${UserName}";
        String passWord = "${PassWord}";
        // The vhost of the ApsaraMQ for RabbitMQ instance. 
        String virtualHost = "${VirtualHost}";

        Connection connection = createConnection(hostName, userName, passWord, virtualHost);
        final Channel channel = connection.createChannel();

        // The queue to consume messages from. 
        String queueName = "${QueueName}";
        // Declare the ${QueueName} queue. Make sure that the queue exists in the ApsaraMQ for RabbitMQ console. 
        AMQP.Queue.DeclareOk queueDeclareOk = channel.queueDeclare(queueName, true, false, false, new HashMap<String, Object>());

        // Consume messages. The second argument (autoAck) is false, so each message must be acknowledged manually. 
        channel.basicConsume(queueName, false, "ConsumerTag", new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body)
            throws IOException {
                // Process the received message based on the message consumption logic. 
                System.out.println("Received: " + new String(body, StandardCharsets.UTF_8) + ", deliveryTag: " + envelope.getDeliveryTag() + ", messageId: " + properties.getMessageId());
                // Acknowledge only this message (multiple = false). 
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        });
    }

    public static Connection createConnection(String hostName, String userName, String passWord, String virtualHost) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(hostName);
        factory.setUsername(userName);
        factory.setPassword(passWord);
        // Enable automatic connection recovery. Set this parameter to false to disable it. 
        factory.setAutomaticRecoveryEnabled(true);
        // The interval between recovery attempts, in milliseconds. 
        factory.setNetworkRecoveryInterval(5000);
        factory.setVirtualHost(virtualHost);
        // The default port. Use port 5672 for non-encrypted connections and port 5671 for encrypted connections. 
        factory.setPort(5672);
        // The timeout period. Configure this parameter based on the network environment. 
        factory.setConnectionTimeout(300 * 1000);
        factory.setHandshakeTimeout(300 * 1000);
        factory.setShutdownTimeout(0);
        Connection connection = factory.newConnection();
        return connection;
    }
}
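
The producer sample recommends creating the connection in advance and reusing it. The following minimal sketch shows one way to do that: create the resource on first use and hand the same instance to every later caller. `SharedResource` is a hypothetical name, not part of amqp-client, and it is kept generic so that it compiles with the Java standard library alone. In a real project, `T` would presumably be `com.rabbitmq.client.Connection` and the factory would call `createConnection(...)`.

```java
import java.util.concurrent.Callable;

/** Hypothetical holder that creates a resource once and returns the same instance afterwards. */
final class SharedResource<T> {
    private final Callable<T> factory;
    private T instance;

    SharedResource(Callable<T> factory) {
        this.factory = factory;
    }

    /** Returns the cached instance, creating it on the first call. */
    synchronized T get() {
        if (instance == null) {
            try {
                instance = factory.call();
            } catch (Exception e) {
                // Wrap checked exceptions (for example, IOException from a connection factory).
                throw new IllegalStateException("Failed to create the shared resource", e);
            }
        }
        return instance;
    }
}
```

Assuming the samples above, usage could look like `new SharedResource<Connection>(() -> createConnection(hostName, userName, passWord, virtualHost))`, with each thread then calling `get().createChannel()` to obtain its own channel on the shared connection.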

SDK parameters

| Parameter | Example | Description |
| --- | --- | --- |
| hostName | XXX.net.mq.amqp.aliyuncs.com | The endpoint of the ApsaraMQ for RabbitMQ instance. For information about how to obtain the endpoint, see Step 2: Create resources. |
| Port | 5672 | The default port. Use port 5672 for non-encrypted connections and port 5671 for encrypted connections. |
| userName | MjoxODgwNzcwODY5MD**** | The static username that is used for permission authentication when you connect the client to the ApsaraMQ for RabbitMQ broker. You must create the static username in the ApsaraMQ for RabbitMQ console in advance. For more information, see Step 2: Create resources. |
| passWord | NDAxREVDQzI2MjA0OT**** | The static password that is used for permission authentication when you connect the client to the ApsaraMQ for RabbitMQ broker. You must create the static password in the ApsaraMQ for RabbitMQ console in advance. For more information, see Step 2: Create resources. |
| virtualHost | amqp_vhost | The vhost that you created on the ApsaraMQ for RabbitMQ instance. You must create the vhost in the ApsaraMQ for RabbitMQ console in advance. For more information, see Step 2: Create resources. |
| exchangeName | ExchangeTest | The exchange that you created on the ApsaraMQ for RabbitMQ instance. You must create the exchange in the ApsaraMQ for RabbitMQ console in advance. For more information, see Step 2: Create resources. |
| queueName | QueueTest | The queue that you created on the ApsaraMQ for RabbitMQ instance. You must create the queue in the ApsaraMQ for RabbitMQ console in advance. For more information, see Step 2: Create resources. |
| routingKey | RoutingKeyTest | The routing key that is used to bind the exchange to the queue in ApsaraMQ for RabbitMQ. You must create the binding in the ApsaraMQ for RabbitMQ console in advance. For more information, see Step 2: Create resources. |
| exchangeType | topic | The exchange type. ApsaraMQ for RabbitMQ supports the following types of exchanges: direct, topic, fanout, headers, x-delayed-message, and x-consistent-hash. For more information, see Exchange. |

Important

Make sure that the exchange type that you specify is the same as the exchange type that you selected when you created the exchange.
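
Because the sample programs ship with `${...}` placeholders, it can help to check the parameters before opening a connection. The following minimal sketch uses only the Java standard library. `SdkSettings` and its methods are hypothetical names, not part of any SDK. The port numbers and exchange types are taken from the parameter descriptions above.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

/** Hypothetical pre-flight checks for the SDK parameters described above. */
final class SdkSettings {
    // Exchange types listed in the exchangeType description.
    static final Set<String> EXCHANGE_TYPES = new HashSet<>(Arrays.asList(
            "direct", "topic", "fanout", "headers", "x-delayed-message", "x-consistent-hash"));

    private SdkSettings() {
    }

    /** Returns 5671 for encrypted connections and 5672 for non-encrypted connections. */
    static int portFor(boolean encrypted) {
        return encrypted ? 5671 : 5672;
    }

    /** Returns the value unchanged, or throws if it is empty or still a ${...} placeholder. */
    static String requireConfigured(String name, String value) {
        if (value == null || value.isEmpty() || value.startsWith("${")) {
            throw new IllegalArgumentException(name + " is not configured");
        }
        return value;
    }

    /** Returns the type unchanged, or throws if it is not one of the listed exchange types. */
    static String requireExchangeType(String type) {
        if (!EXCHANGE_TYPES.contains(type)) {
            throw new IllegalArgumentException("Unsupported exchange type: " + type);
        }
        return type;
    }
}
```

For example, `factory.setPort(SdkSettings.portFor(false))` selects port 5672, and `SdkSettings.requireConfigured("userName", userName)` fails fast if the placeholder was never replaced. This check cannot confirm that the type matches the exchange that you created in the console.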

References