ApsaraMQ for RabbitMQ: Send and receive messages by using SDKs integrated with the Spring framework

Last Updated: Nov 06, 2024

ApsaraMQ for RabbitMQ supports SDKs that are integrated with the Spring framework. This topic describes how to connect to an ApsaraMQ for RabbitMQ instance by using an SDK that is integrated with the Spring framework to send and receive messages.

Prerequisites

Demo project

To download the demo project, click SpringBootDemo.

Step 1: Configure parameters

Configure parameters in the application.properties or application.yml file. In the following example, parameters are configured in the application.properties file.

# The endpoint of the ApsaraMQ for RabbitMQ instance. You can obtain the endpoint on the Instance Details page in the ApsaraMQ for RabbitMQ console.  
spring.rabbitmq.host=XXXXXX.amqp.aliyuncs.com
# The port that is used to connect to the ApsaraMQ for RabbitMQ instance. 
spring.rabbitmq.port=5672
# The static username of the ApsaraMQ for RabbitMQ instance. You can obtain the static username on the Static Accounts page in the ApsaraMQ for RabbitMQ console.  
spring.rabbitmq.username=******
# The static password of the ApsaraMQ for RabbitMQ instance. You can obtain the static password on the Static Accounts page in the ApsaraMQ for RabbitMQ console.  
spring.rabbitmq.password=******
# The vhost of the ApsaraMQ for RabbitMQ instance. vhosts are logically isolated from each other. You can obtain the vhost name on the vhosts page in the ApsaraMQ for RabbitMQ console.  
spring.rabbitmq.virtual-host=test_vhost
# The mode that is used to acknowledge messages. 
# 1. none: After a consumer receives a message, the broker considers that the message is successfully processed, regardless of whether the consumption is successful. This mode is equivalent to the autoAck mode in RabbitMQ. 
# 2. auto: After a message is successfully consumed, the client proactively returns an acknowledgment (ACK) without the need to explicitly call the Channel.basicAck() method. If the message fails to be processed, the client returns a negative acknowledgment (NACK) or throws an exception. 
# 3. manual: After a message is successfully consumed, you must call the Channel.basicAck() method to manually return an ACK. 
spring.rabbitmq.listener.simple.acknowledge-mode=manual

# The Quality of Service (QoS) policy that is used to limit the maximum number of unacknowledged messages that a consumer can process at a time. ApsaraMQ for RabbitMQ brokers use min{prefetch, 100} as the value of the QoS policy. 
spring.rabbitmq.listener.simple.prefetch=10
# The minimum number of concurrent consumers on the listener of the ApsaraMQ for RabbitMQ instance. 
spring.rabbitmq.listener.simple.concurrency=2
# The maximum number of concurrent consumers on the listener of the ApsaraMQ for RabbitMQ instance. If the message consumption rate reaches a specific value, the client launches a number of consumers based on the value that you specify for this parameter to consume messages. 
spring.rabbitmq.listener.simple.max-concurrency=5
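
If you prefer the application.yml file, the same settings can be written as follows. This is a sketch that only restates the values from the preceding application.properties example. The masked username and password are quoted because an unquoted asterisk (*) has a special meaning in YAML.

```yaml
spring:
  rabbitmq:
    # Endpoint, port, static credentials, and vhost of the instance.
    host: XXXXXX.amqp.aliyuncs.com
    port: 5672
    username: "******"
    password: "******"
    virtual-host: test_vhost
    listener:
      simple:
        # Acknowledgment mode: none, auto, or manual.
        acknowledge-mode: manual
        # Maximum number of unacknowledged messages per consumer.
        prefetch: 10
        # Minimum and maximum number of concurrent consumers.
        concurrency: 2
        max-concurrency: 5
```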

The parameters described in the following table are optional.

Optional parameters

Parameter

Description

spring.rabbitmq.addresses

The addresses of the brokers to which the client connects, each in the host:port format. Separate multiple addresses with commas (,).

If you configure the spring.rabbitmq.host parameter and the spring.rabbitmq.addresses parameter at the same time, the value of the spring.rabbitmq.addresses parameter takes effect.

spring.rabbitmq.dynamic

Specify whether to create an AmqpAdmin bean. Default value: true.

spring.rabbitmq.connection-timeout

The timeout period of connections. Unit: milliseconds. The value 0 specifies that connections never time out.

spring.rabbitmq.requested-heartbeat

The timeout period of heartbeats. Unit: seconds. Default value: 60.

spring.rabbitmq.publisher-confirms

Specify whether to enable the confirmation mechanism for message publishing.

spring.rabbitmq.publisher-returns

Specify whether to enable the return mechanism for message publishing.

spring.rabbitmq.ssl.enabled

Specify whether to enable Secure Sockets Layer (SSL) support for connections.

spring.rabbitmq.ssl.key-store

The path of the key store in which the SSL certificate is stored.

spring.rabbitmq.ssl.key-store-password

The password that is used to access the key store.

spring.rabbitmq.ssl.trust-store

The path of the trust store in which the trusted SSL certificates are stored.

spring.rabbitmq.ssl.trust-store-password

The password that is used to access the trust store.

spring.rabbitmq.ssl.algorithm

The algorithm that is used by SSL. Example: TLSv1.2.

spring.rabbitmq.ssl.validate-server-certificate

Specify whether the client validates the certificate that is presented by the broker.

spring.rabbitmq.ssl.verify-hostname

Specify whether to enable hostname verification.

spring.rabbitmq.cache.channel.size

The number of channels in the cache.

spring.rabbitmq.cache.channel.checkout-timeout

The timeout period for obtaining a channel from the cache if the specified number of channels in the cache is reached.

Unit: milliseconds. The value 0 specifies that a new channel is created.

spring.rabbitmq.cache.connection.size

The number of connections in the cache. This parameter takes effect only if you set the spring.rabbitmq.cache.connection.mode parameter to CONNECTION.

spring.rabbitmq.cache.connection.mode

The cache mode of connections. Valid values:

  • CHANNEL

  • CONNECTION

We recommend that you set this parameter to CONNECTION.

spring.rabbitmq.listener.type

The type of the listener. Valid values:

  • simple

  • direct

Default value: simple.

spring.rabbitmq.listener.simple.auto-startup

Specify whether to automatically start the listener when the application is started. Default value: true.

spring.rabbitmq.listener.simple.acknowledge-mode

The mode that is used to acknowledge messages. Valid values:

  • none: After a consumer receives a message, the broker considers that the message is successfully processed, regardless of whether the consumption is successful. This mode is equivalent to the autoAck mode in RabbitMQ.

  • manual: After a message is successfully consumed, you must explicitly call the Basic.ack method to manually return an ACK.

  • auto: After a message is successfully consumed, the client proactively returns an ACK without the need to explicitly call the Basic.ack method. If the message fails to be processed, the client returns a NACK or throws an exception.

Default value: auto.

spring.rabbitmq.listener.simple.concurrency

The minimum number of consumers.

spring.rabbitmq.listener.simple.max-concurrency

The maximum number of consumers.

spring.rabbitmq.listener.simple.prefetch

The maximum number of unacknowledged messages that a consumer can process at a time. This parameter achieves the same result as setting QoS by calling the Basic.qos method. If transactions are used, the value of this parameter must be greater than or equal to the transaction size.

spring.rabbitmq.listener.simple.transaction-size

The number of messages that are processed in a single transaction.

spring.rabbitmq.listener.simple.default-requeue-rejected

Specify whether rejected messages are redelivered to the queue. Default value: true.

spring.rabbitmq.listener.simple.missing-queues-fatal

Specify whether to fail the listener if the queues that are declared by the listener are not available on the broker or whether to stop the listener if one or more queues are deleted in the runtime. Default value: true.

spring.rabbitmq.listener.simple.idle-event-interval

The interval at which idle listener events are published. Unit: milliseconds.

spring.rabbitmq.template.mandatory

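Specify whether to send messages with the mandatory flag. If the flag is set, a message that cannot be routed to a queue is returned to the publisher. Default value: false.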

spring.rabbitmq.template.receive-timeout

The timeout period for the receive() method.

spring.rabbitmq.template.reply-timeout

The timeout period for the sendAndReceive() method.
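
The prefetch comment in the preceding configuration states that brokers use min{prefetch, 100} as the QoS value. The following is a minimal sketch of that arithmetic, assuming nothing beyond the rule stated in this topic. The class and method names are hypothetical and are not part of any SDK, and values less than 1 are not covered.

```java
/** Illustrative helper. The class and method names are hypothetical. */
final class EffectivePrefetch {

    /** Broker-side upper limit taken from the min{prefetch, 100} rule in this topic. */
    static final int BROKER_CAP = 100;

    private EffectivePrefetch() {
    }

    /** Returns the smaller of the configured prefetch value and the broker-side limit. */
    static int of(int configuredPrefetch) {
        if (configuredPrefetch < 1) {
            // This sketch covers only positive prefetch values.
            throw new IllegalArgumentException("prefetch must be at least 1");
        }
        return Math.min(configuredPrefetch, BROKER_CAP);
    }
}
```

Under this rule, the value 10 from the example is applied unchanged, whereas a configured value of 250 would be treated as 100.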

Step 2: Use an SDK to send and receive messages

Create connections

Recommended cache mode

When you establish a connection between a client and a broker, we recommend that you specify the cache mode as CONNECTION.

In this mode, you can create multiple connections. The program caches a specific number of connections and each connection caches a specific number of channels.

ApsaraMQ for RabbitMQ adopts a distributed cluster architecture. In CONNECTION mode, you can create multiple connections to connect clients to multiple service nodes in a cluster to efficiently send and consume messages.

The following code provides an example on how to configure the cache mode:

// Specify CONNECTION as the cache mode. 
connectionFactory.setCacheMode(CachingConnectionFactory.CacheMode.CONNECTION);
// The maximum number of connections that can be cached in CONNECTION mode. 
connectionFactory.setConnectionCacheSize(10);
// The maximum number of channels that can be cached in CONNECTION mode. 
connectionFactory.setChannelCacheSize(64);

The following sample code provides an example on how to create connections:

import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitMQConfig {

    @Value("${spring.rabbitmq.host}")
    private String host;

    @Value("${spring.rabbitmq.port}")
    private int port;

    @Value("${spring.rabbitmq.username}")
    private String username;

    @Value("${spring.rabbitmq.password}")
    private String password;

    @Value("${spring.rabbitmq.virtual-host}")
    private String virtualHost;

    @Bean
    public ConnectionFactory connectionFactory() 
    {
        // Initialize ConnectionFactory. 
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        connectionFactory.setHost(host);
        connectionFactory.setPort(port);
        connectionFactory.setUsername(username);
        connectionFactory.setPassword(password);
        // The vhost. You can manually create the vhost in the ApsaraMQ for RabbitMQ console or use the following code to automatically create the vhost. 
        connectionFactory.setVirtualHost(virtualHost);
        // Specify whether to enable the automatic reconnection feature. Make sure that this feature is enabled so that the client can automatically reconnect to the broker after a connection is interrupted. 
        connectionFactory.getRabbitConnectionFactory().setAutomaticRecoveryEnabled(true);
        // The cache mode. We recommend that you set this parameter to CONNECTION. 
        connectionFactory.setCacheMode(CachingConnectionFactory.CacheMode.CONNECTION);
        // The maximum number of connections that can be cached in CONNECTION mode. 
        connectionFactory.setConnectionCacheSize(10);
        // The maximum number of channels that can be cached in CONNECTION mode. 
        connectionFactory.setChannelCacheSize(64);
        
        return connectionFactory;
    }

    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) 
    {
        // The RabbitMQ message template. The template encapsulates various message operations. 
        return new RabbitTemplate(connectionFactory);     
    }

}

Produce messages

Inject RabbitTemplate into the RabbitMQService class by using dependency injection and call the send() method to send a message.

import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.nio.charset.StandardCharsets;
import java.util.UUID;

@Service
public class RabbitMQService {
    
    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void sendMessage(String exchange, String routingKey, String content) {
        // Specify the message ID. 
        String msgId = UUID.randomUUID().toString();
        MessageProperties messageProperties = new MessageProperties();
        messageProperties.setMessageId(msgId);
        // Create the message. 
        Message message = new Message(content.getBytes(StandardCharsets.UTF_8), messageProperties);
        /*
         * Call the send() method to send the message.
         * exchange: the exchange name.
         * routingKey: the routing key.
         * message: the message content.
         * correlationData: used to correlate publisher confirms with the sent message. Pass null if you do not use publisher confirms.
         */
        rabbitTemplate.send(exchange, routingKey, message, null);
    }
}

Consume messages

Use the @RabbitListener annotation to consume messages.

import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.io.IOException;

@Component
public class MessageListener {

    /**
     * Receive a message.
     * Replace myQueue with the name of the queue that you created.
     *
     * @param message the received message.
     * @param channel the channel on which the message was delivered. The channel is used to return the ACK.
     * @throws IOException if the ACK cannot be returned.
     */
    @RabbitListener(queues = "myQueue")
    public void receiveFromMyQueue(Message message, Channel channel) throws IOException {
        // Use the logic for message consumption. 
        // ...
        // An ACK must be returned within the validity period of the ACK. Otherwise, the message is redelivered. The validity period of an ACK is the consumption timeout period. 
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }
}
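
Because a message is redelivered if its ACK is not returned in time, a listener may receive the same message more than once. One common way to handle this is to skip message IDs that were already processed. The following is a minimal sketch of that idea, not part of the demo project. The ProcessedMessageTracker class is hypothetical, it keeps IDs in memory without expiry, and it works only within a single application instance. A production system would more likely use a shared store with an expiration policy.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical in-memory tracker of message IDs that were already processed. */
final class ProcessedMessageTracker {

    // Thread-safe set, because a listener can run several concurrent consumers.
    private final Set<String> seen = ConcurrentHashMap.newKeySet();

    /**
     * Records the message ID.
     *
     * @return true if the ID is seen for the first time, false if it is a repeat.
     */
    boolean markIfNew(String messageId) {
        if (messageId == null) {
            throw new IllegalArgumentException("messageId must not be null");
        }
        return seen.add(messageId);
    }
}
```

In the listener, you could call markIfNew(message.getMessageProperties().getMessageId()) before you run the consumption logic, skip the logic if the method returns false, and still return an ACK so that the duplicate is not delivered again. This relies on the producer setting a message ID, as the preceding producer example does.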

The following table describes the optional parameters of the @RabbitListener annotation.

Optional parameters

Parameter

Description

ackMode

The custom mode that is used to acknowledge messages. The value of this parameter overrides the value of the spring.rabbitmq.listener.simple.acknowledge-mode parameter.

admin

A reference to the AmqpAdmin bean that is used to manage AMQP resources.

autoStartup

Specify whether to automatically start the listener when the application is started. The value of this parameter overrides the value of the spring.rabbitmq.listener.simple.auto-startup parameter.

bindings

The bindings between queues and exchanges.

concurrency

The number of concurrent threads on the listener.

errorHandler

The error handler that is used to process exceptions thrown by the listener.

exclusive

Specify whether to enable the exclusive mode for queues. If you enable the mode, only the specified consumer can consume messages from the queue. This parameter is not supported by ApsaraMQ for RabbitMQ.

queues

The queues on which the listener listens.

queuesToDeclare

The queues that must be explicitly declared.