All Products
Search
Document Center

ApsaraMQ for RabbitMQ:Use the SDK for Java to send and receive messages

Last Updated:Oct 28, 2024

This topic describes how to use ApsaraMQ for RabbitMQ to quickly send and receive messages.

Procedure

image

Step 1: (Optional) Grant permissions to a RAM user

By default, Resource Access Management (RAM) users do not have the permissions to perform operations on ApsaraMQ for RabbitMQ resources. If you use a RAM user, you must grant the required permissions to the RAM user before you can use ApsaraMQ for RabbitMQ. If you use an Alibaba Cloud account, you have all permissions on ApsaraMQ for RabbitMQ by default.

For information about how to grant permissions to a RAM user, see Grant permissions to a RAM user.

Step 2: Create resources

Create an instance

In ApsaraMQ for RabbitMQ, an instance is an independent resource entity that includes basic resources such as vhosts, exchanges, and queues.

  1. Log on to the ApsaraMQ for RabbitMQ console. In the left-side navigation pane, click Instances.

  2. In the top navigation bar, select a region. On the Instances page, click Create Instance.

  3. On the buy page, follow the on-screen instructions to complete the configurations, read and select the terms of service, and then click Buy Now.

  4. Follow the on-screen instructions to complete the payment.

    In the top navigation bar of the Instances page, select the region where the instance was created. On the page that appears, view the instance that you created.

    Note
    • After you purchase a Professional Edition or Enterprise Edition instance, the instance immediately enters the Running state.

    • After you purchase an Enterprise Platinum Edition instance, the instance enters the Deploying state. After the instance is deployed, the instance enters the Running state.

Obtain the endpoint of the instance

Before you send and receive messages, you must specify the endpoint that a producer or a consumer can use to access the ApsaraMQ for RabbitMQ instance.

  1. Log on to the ApsaraMQ for RabbitMQ console. In the left-side navigation pane, click Instances.

  2. In the top navigation bar of the Instances page, select the region where the instance that you want to manage resides. Then, in the instance list, click the name of the instance that you want to manage.

  3. On the Endpoint Information tab of the Instance Details page, move the pointer over the type of endpoint that you want to use. Then, click the 复制 icon next to the endpoint to copy the endpoint.

    Type

    Description

    Example

    Public endpoint

    You can access an instance over the Internet to read and write data. By default, pay-as-you-go instances support public endpoints. To use the public endpoint of a subscription instance, you must enable Internet access when you create the subscription instance.

    XXX.net.mq.amqp.aliyuncs.com

    VPC endpoint

    You can access an instance in a virtual private cloud (VPC) to read and write data. By default, pay-as-you-go and subscription instances support VPC endpoints.

    XXX.vpc.mq.amqp.aliyuncs.com

Create a vhost

A vhost is used to logically isolate resources. Each vhost manages its own exchanges, queues, and bindings. Applications can run on independent vhosts in a secure manner. This way, the business of an application is not affected by other applications. An instance can contain multiple vhosts, and a vhost can contain multiple exchanges and queues. You must specify a vhost before you connect a producer or consumer to ApsaraMQ for RabbitMQ.

  1. Log on to the ApsaraMQ for RabbitMQ console. In the left-side navigation pane, click Instances.

  2. In the top navigation bar of the Instances page, select the region where the instance that you want to manage resides. Then, in the instance list, click the name of the instance that you want to manage.

  3. In the left-side navigation pane, click vhosts.

  4. On the vhosts page, click Create vhost.

  5. In the Create vhost panel, enter a vhost name in the vhost Name field and click OK.

Create an exchange

An exchange is used to route a message that is sent by a producer to one or more queues or to discard the message based on the routing key. Different routing rules are used for different types of exchanges. For more information, see Exchange.

  1. Log on to the ApsaraMQ for RabbitMQ console. In the left-side navigation pane, click Instances.

  2. In the top navigation bar of the Instances page, select the region where the instance that you want to manage resides. Then, in the instance list, click the name of the instance that you want to manage.

  3. In the left-side navigation pane, click Exchanges.

  4. On the Exchanges page, click the drop-down arrow next to vhost. From the Change drop-down list, select the vhost in which you want to create an exchange. Then, click Create Exchange.

  5. In the Create Exchange panel, configure the Exchange Name, Type, and Internal parameters. Then, click OK. The following table describes the parameters.

    Parameter

    Description

    Exchange Name

    The exchange name. amq. is a reserved field and cannot be used as the prefix of an exchange name. For example, you cannot use amq.test as the name of an exchange.

    Type

    The exchange type. Valid values:

    • direct: An exchange of this type routes a message to the queue whose routing key is the same as the routing key of the message.

    • topic: An exchange of this type is similar to an exchange of the direct type. However, the routing rule for a topic exchange is less demanding than that for a direct exchange. A topic exchange routes a message to one or more bound queues based on the result of fuzzy match or multi-condition match between the routing key of the message and the routing keys that are used to bind the queues to the exchange.

    • fanout: An exchange of this type routes all received messages to all queues that are bound to the exchange. A fanout exchange works in the similar way to the broadcasting feature.

    • headers: An exchange of this type is similar to an exchange of the direct type. The only difference between a headers exchange and a direct exchange is that a headers exchange routes messages based on the headers attributes instead of routing keys. When you bind a headers exchange to a queue, you must configure binding attributes in the key-value format. When you send a message to a headers exchange, you must configure the headers attributes in the key-value format for the message. After a headers exchange receives a message, the exchange routes the message based on the matching result between the headers attributes of the message and the binding attributes of the bound queue.

    • x-delayed-message: You can declare an exchange of this type and configure the x-delay header attribute of a message to specify the period of time after which the message is sent. The period of time is in milliseconds. The message is sent to the corresponding queue after the period of time specified by the x-delay header attribute elapses. The exchange routes messages based on the routing rule that corresponds to the exchange type specified by x-delayed-type.

    • x-consistent-hash: An exchange of this type allows you to perform hash calculation on routing keys or header values and use consistent hashing to route a message to different queues.

    x-delayed-type

    If you set the Type parameter to x-delayed-message, you must configure this parameter to specify the routing rule for the exchange.

    Hash value

    If you set the Type parameter to x-consistent-hash, you must configure this parameter to specify the type of the input value for hash calculation. Valid values:

    • RoutingKey

    • Header Value: If you use headers as the input value for hash calculation, you must specify the value for the hash-header parameter.

    hash-header

    If you set the Type parameter to x-consistent-hash and the Hash value parameter to Header Value, you must configure this parameter as the input value for hash calculation.

    Internal

    Specifies whether the exchange is an internal exchange. Default value: No. Valid values:

    • Yes: The exchange is an internal exchange and is bound to another exchange.

    • No: The exchange is not an internal exchange and is bound to a queue.

Create a queue

In ApsaraMQ for RabbitMQ, a queue is a message queue. All messages from ApsaraMQ for RabbitMQ are delivered to one or more queues.

  1. Log on to the ApsaraMQ for RabbitMQ console. In the left-side navigation pane, click Instances.

  2. In the top navigation bar of the Instances page, select the region where the instance that you want to manage resides. Then, in the instance list, click the name of the instance that you want to manage.

  3. In the left-side navigation pane, click Queues.

  4. On the Queues page, click the drop-down arrow next to vhost. From the Change drop-down list, select the vhost on which you want to create a queue. Then, click Create Queue.

  5. In the Create Queue panel, enter a queue name in the Queue Name field and configure the Auto Delete parameter. Click Advanced Settings and configure the queue parameters that are displayed. Then, click OK.

    Table 1. Parameters

    Parameter

    Description

    Usage notes

    Queue Name

    The queue name.

    • The name can contain letters, digits, hyphens (-), underscores (_), periods (.), number signs (#), forward slashes (/), and at signs (@).

    • The name must be 1 to 255 characters in length.

    • After a queue is created, you cannot change its name. If you want to change the name of a queue, delete the queue and create another queue.

    • amq. is a reserved field and cannot be used as the prefix of an exchange name. For example, you cannot use amq.test as the name of an exchange.

    Auto Delete

    Specifies whether the queue is automatically deleted after the last subscription from consumers to the queue is canceled.

    • true: The queue is automatically deleted after the last subscription from consumers to this queue is canceled.

    • false: The queue is not automatically deleted after the last subscription from consumers to this queue is canceled.

    Advanced Settings

    Other parameters for the queue, such as the dead-letter exchange, dead-letter routing key, and message time-to-live (TTL).

    • DeadLetterExchange: the exchange to which dead-letter messages are delivered.

    • DeadLetterRoutingKey: the routing key of dead-letter messages. A dead-letter exchange sends dead-letter messages to the queue whose routing key matches the routing key of the dead-letter messages.

    • MessageTTL: the message TTL, in milliseconds. A message that is not consumed within the specified message TTL is a dead-letter message and is sent to a dead-letter exchange. For more information, see Message TTL.

Create a binding

If you bind an exchange to a queue, messages in the exchange are routed to the queue based on the specified routing rule.

  1. On the Queues page, find the queue that you want to manage and click Details in the Actions column.

  2. On the Queue Details page, click the Bound as Destination tab. Then, click Add Binding.

  3. In the Add Binding panel, configure the Source Exchange and Routing Key parameters. Then, click Confirm.

    Note

    If the bound exchange is of the x-consistent-hash type, the routing key indicates the weight of the queue. The value of a weight must be an integer that ranges from 1 to 20.

Create a pair of username and password

When you access an ApsaraMQ for RabbitMQ broker from an open source RabbitMQ client, you must specify the username and password for authentication. You can access the ApsaraMQ for RabbitMQ broker only after the authentication is passed.

  1. Log on to the ApsaraMQ for RabbitMQ console. In the left-side navigation pane, click Instances.

  2. In the top navigation bar of the Instances page, select the region where the instance that you want to manage resides. Then, in the instance list, click the name of the instance that you want to manage.

  3. In the left-side navigation pane, click Static Accounts.

  4. On the Static Accounts page, click Create Username/Password.

  5. In the Create Username/Password panel, configure the AccessKey ID and AccessKey Secret parameters and click OK.

    Note

    You can obtain the values of the AccessKey ID and AccessKey Secret parameters in the RAM console. For more information, see Create an AccessKey pair.

    On the Static Accounts page, the created pair of static username and password appears. The password is masked.用户名密码

  6. In the Password column of the created pair of static username and password, click Display to view the password.

Step 3: Use an SDK to send and receive messages

Note

You must install IntelliJ IDEA, Java Development Kit, and Maven in advance. You can use IntelliJ IDEA or Eclipse. In the examples of this topic, IntelliJ IDEA Ultimate is used.

Install the Java dependency

  1. Create a Java project in IntelliJ IDEA.

  2. Add the following dependency to the pom.xml file to import the Java dependency library:

    <dependency>
        <groupId>com.rabbitmq</groupId>
        <artifactId>amqp-client</artifactId>
        <version>5.5.0</version> <!-- All versions of open source RabbitMQ are supported. -->
    </dependency>

Produce messages

In the Java project that you created, create a producer program, configure the relevant parameters based on the description in SDK parameters, and then run the program.

Sample code:

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.UUID;
import java.util.concurrent.TimeoutException;

public class ProducerTest {
    public static void main(String[] args) throws IOException, TimeoutException {
        // The endpoint of the ApsaraMQ for RabbitMQ instance. 
        String hostName = "xxx.xxx.aliyuncs.com";
        // The static username and password of the ApsaraMQ for RabbitMQ instance. 
        String userName = "${UserName}";
        String passWord = "${PassWord}";
        // The vhost of the ApsaraMQ for RabbitMQ instance. 
        String virtualHost = "${VirtualHost}";

        // In production environments, we recommend that you create the connection in advance and reuse it when needed. This prevents frequent establishment and closing of connections and improves system performance and stability. 
        Connection connection = createConnection(hostName, userName, passWord, virtualHost);
        Channel channel = connection.createChannel();

        // The binding between the exchange and the queue. 
        String exchangeName = "${ExchangeName}";
        String queueName = "${QueueName}";
        String bindingKey = "${BindingKey}";
        // The exchange type. 
        String exchangeType = "${ExchangeType}";

        // To ensure user experience, make sure that the exchange and the queue are created by using a suitable method. 
        // In production environments, we recommend that you create the exchange and the queue in the ApsaraMQ for RabbitMQ console in advance. Do not declare the exchange and the queue in the code. Otherwise, throttling is triggered for the corresponding API operation. 
        channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
        channel.queueDeclare(queueName, true, false, false, new HashMap<String, Object>());
        channel.queueBind(queueName, exchangeName, bindingKey);
        // Send a message. 
        for (int i = 0; i < 10; i++  ) {
            AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().messageId(UUID.randomUUID().toString()).build();
            channel.basicPublish(exchangeName, bindingKey, true, props,
                    ("Sample message body-" + i).getBytes(StandardCharsets.UTF_8));
            System.out.println("[SendResult] Message sent successfully, messageId: " + props.getMessageId() + ", exchange: " + exchangeName + ", routingKey: " + bindingKey);
        }
        channel.close();
        connection.close();
    }

    public static Connection createConnection(String hostName, String userName, String passWord, String virtualHost) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(hostName);
        factory.setUsername(userName);
        factory.setPassword(passWord);
        // Specify whether to enable automatic connection recovery. If you set this parameter to true, automatic connection recovery is enabled. If you set this parameter to false, automatic connection recovery is disabled. 
        factory.setAutomaticRecoveryEnabled(true);
        factory.setNetworkRecoveryInterval(5000);
        factory.setVirtualHost(virtualHost);
        // The default port. Port 5672 is used for non-encrypted connections. Port 5671 is used for encrypted connections. 
        factory.setPort(5672);
        // The timeout period. Configure this parameter based on the network environment. 
        factory.setConnectionTimeout(30 * 1000);
        factory.setHandshakeTimeout(30 * 1000);
        factory.setShutdownTimeout(0);
        Connection connection = factory.newConnection();
        return connection;
    }
}
Note

Throttling can be triggered for an ApsaraMQ for RabbitMQ instance based on the peak transactions per second (TPS) of the instance. For more information, see Best practices for instance throttling.

Subscribe to messages

In the Java project that you created, create a consumer program, configure the relevant parameters based on the description in SDK parameters, and then run the program.

Sample code:

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.concurrent.TimeoutException;

public class ConsumerTest {
    public static void main(String[] args) throws IOException, TimeoutException {
        // The endpoint of the ApsaraMQ for RabbitMQ instance. 
        String hostName = "xxx.xxx.aliyuncs.com";
        // The static username and password of the ApsaraMQ for RabbitMQ instance. 
        String userName = "${UserName}";
        String passWord = "${PassWord}";
        // The vhost of the ApsaraMQ for RabbitMQ instance. 
        String virtualHost = "${VirtualHost}";

        Connection connection = createConnection(hostName, userName, passWord, virtualHost);
        final Channel channel = connection.createChannel();

        // Declare the queue. 
        String queueName = "${QueueName}";
        // Create the ${QueueName} queue. Make sure that the queue exists in the ApsaraMQ for RabbitMQ console. 
        AMQP.Queue.DeclareOk queueDeclareOk = channel.queueDeclare(queueName, true, false, false, new HashMap<String, Object>());

        // Consume a message. 
        channel.basicConsume(queueName, false, "ConsumerTag", new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body)
            throws IOException {
                // Process the received message based on the message consumption logic. 
                System.out.println("Received: "  + new String(body, StandardCharsets.UTF_8) +  ", deliveryTag: "  + envelope.getDeliveryTag()  + ", messageId: " +  properties.getMessageId());
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        });
    }

    public static Connection createConnection(String hostName, String userName, String passWord, String virtualHost) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(hostName);
        factory.setUsername(userName);
        factory.setPassword(passWord);
        // Specify whether to enable automatic connection recovery. If you set this parameter to true, automatic connection recovery is enabled. If you set this parameter to false, automatic connection recovery is disabled. 
        factory.setAutomaticRecoveryEnabled(true);
        factory.setNetworkRecoveryInterval(5000);
        factory.setVirtualHost(virtualHost);
        // The default port. Use port 5672 for non-encrypted connections and port 5671 for encrypted connections. 
        factory.setPort(5672);
        factory.setConnectionTimeout(300 * 1000);
        factory.setHandshakeTimeout(300 * 1000);
        factory.setShutdownTimeout(0);
        Connection connection = factory.newConnection();
        return connection;
    };
}

SDK parameters

Parameter

Example

Description

hostName

XXX.net.mq.amqp.aliyuncs.com

The endpoint of the ApsaraMQ for RabbitMQ instance. For information about how to obtain the endpoint, see Step 2: Create resources.

Port

5672

The default port. Use port 5672 for non-encrypted connections and port 5671 for encrypted connections.

userName

MjoxODgwNzcwODY5MD****

The static username that is used for permission authentication when you connect the client to the ApsaraMQ for RabbitMQ broker.

You must create the static username in the ApsaraMQ for RabbitMQ console in advance.

For more information, see Step 2: Create resources.

passWord

NDAxREVDQzI2MjA0OT****

The static password that is used for permission authentication when you connect the client to the ApsaraMQ for RabbitMQ broker.

You must create the static password in the ApsaraMQ for RabbitMQ console in advance.

For more information, see Step 2: Create resources.

virtualHost

amqp_vhost

The vhost that you created on the ApsaraMQ for RabbitMQ instance. You must create the vhost in the ApsaraMQ for RabbitMQ console in advance.

For more information, see Step 2: Create resources.

exchangeName

ExchangeTest

The exchange that you created on the ApsaraMQ for RabbitMQ instance.

You must create the exchange in the ApsaraMQ for RabbitMQ console in advance.

For more information, see Step 2: Create resources.

queueName

QueueTest

The queue that you created on the ApsaraMQ for RabbitMQ instance.

You must create the queue in the ApsaraMQ for RabbitMQ console in advance.

For more information, see Step 2: Create resources.

bindingKey

BindingKeyTest

The binding key that is used to bind an ApsaraMQ for RabbitMQ exchange to the queue.

You must create the binding in the ApsaraMQ for RabbitMQ console in advance.

For more information, see Step 2: Create resources.

exchangeType

topic

The exchange type. ApsaraMQ for RabbitMQ supports the following types of exchanges. For more information, see Exchange.

  • direct

  • topic

  • fanout

  • headers

  • x-jms-topic

  • x-delayed-message

  • x-consistent-hash

Important

Make sure that the exchange type that you specify is the same as the exchange type that you selected when you created the exchange.

Step 4: Configure alerting

You can create alert rules by using CloudMonitor to monitor the usage and status of ApsaraMQ for RabbitMQ. If resource metrics meet specific alert conditions, alerts are triggered and CloudMonitor sends alert notifications. This way, you can identify and handle monitoring data exceptions at the earliest opportunity. For more information, see Monitoring and alerting. For information about metrics provided by Managed Service for Prometheus and Managed Service for Grafana, see Dashboard.