ApsaraMQ for RabbitMQ supports SDKs that are integrated with the Spring framework. This topic describes how to connect to an ApsaraMQ for RabbitMQ instance by using an SDK that is integrated with the Spring framework to send and receive messages.
Prerequisites
Resources are created in the ApsaraMQ for RabbitMQ console. The resources include an instance, a vhost, an exchange, and a queue. For more information, see Step 2: Create resources.
IntelliJ IDEA is installed. For more information, see IntelliJ IDEA.
JDK 1.8 or later is installed. For more information, see Java Downloads.
Maven 2.5 or later is installed. For more information, see Downloading Apache Maven.
Demo project
To download the demo project, click SpringBootDemo.
Step 1: Configure parameters
Configure parameters in the application.properties or application.yml file. In the following example, parameters are configured in the application.properties file.
# The endpoint of the ApsaraMQ for RabbitMQ instance. You can obtain the endpoint on the Instance Details page in the ApsaraMQ for RabbitMQ console.
spring.rabbitmq.host=XXXXXX.amqp.aliyuncs.com
# The port that is used to connect to the ApsaraMQ for RabbitMQ instance.
spring.rabbitmq.port=5672
# The static username of the ApsaraMQ for RabbitMQ instance. You can obtain the static username on the Static Accounts page in the ApsaraMQ for RabbitMQ console.
spring.rabbitmq.username=******
# The static password of the ApsaraMQ for RabbitMQ instance. You can obtain the static password on the Static Accounts page in the ApsaraMQ for RabbitMQ console.
spring.rabbitmq.password=******
# The vhost of the ApsaraMQ for RabbitMQ instance. vhosts are logically isolated from each other. You can obtain the vhost name on the vhosts page in the ApsaraMQ for RabbitMQ console.
spring.rabbitmq.virtual-host=test_vhost
# The mode that is used to acknowledge messages.
# 1. none: After a consumer receives a message, the broker considers that the message is successfully processed, regardless of whether the consumption is successful. This mode is equivalent to the autoAck mode in RabbitMQ.
# 2. auto: After the listener method returns without an error, the client automatically returns an acknowledgment (ACK). You do not need to explicitly call the Channel.basicAck() method. If the listener method throws an exception, the client returns a negative acknowledgment (NACK).
# 3. manual: After a message is successfully consumed, you must call the Channel.basicAck() method to manually return an ACK.
spring.rabbitmq.listener.simple.acknowledge-mode=manual
# The Quality of Service (QoS) policy that is used to limit the maximum number of unacknowledged messages that a consumer can process at a time. ApsaraMQ for RabbitMQ brokers use min{prefetch, 100} as the value of the QoS policy.
spring.rabbitmq.listener.simple.prefetch=10
# The minimum number of concurrent consumers on the listener of the ApsaraMQ for RabbitMQ instance.
spring.rabbitmq.listener.simple.concurrency=2
# The maximum number of concurrent consumers on the listener of the ApsaraMQ for RabbitMQ instance. If the message consumption rate reaches a specific value, the client launches a number of consumers based on the value that you specify for this parameter to consume messages.
spring.rabbitmq.listener.simple.max-concurrency=5
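As a rough illustration of how the prefetch and concurrency settings above interact, the following sketch applies the min{prefetch, 100} rule and estimates an upper bound on unacknowledged messages for one listener. The class and method names are hypothetical. The estimate assumes that each concurrent consumer has its own channel with its own prefetch limit, which is how Spring AMQP listener containers generally behave.

```java
// Hypothetical helper for illustration only; not part of any SDK.
class PrefetchEstimate {
    // Broker-side cap stated in the configuration comments above.
    static final int BROKER_PREFETCH_CAP = 100;

    // Effective QoS value applied by the broker: min{prefetch, 100}.
    static int effectivePrefetch(int configuredPrefetch) {
        return Math.min(configuredPrefetch, BROKER_PREFETCH_CAP);
    }

    // Assumed upper bound on unacknowledged messages held by one listener:
    // the effective prefetch multiplied by the number of concurrent consumers.
    static int maxUnacked(int configuredPrefetch, int consumers) {
        return effectivePrefetch(configuredPrefetch) * consumers;
    }

    public static void main(String[] args) {
        // Values from the sample configuration: prefetch=10, concurrency=2, max-concurrency=5.
        System.out.println(effectivePrefetch(10));  // 10
        System.out.println(maxUnacked(10, 2));      // 20
        System.out.println(maxUnacked(10, 5));      // 50
        // A configured value above the cap is reduced to the cap.
        System.out.println(effectivePrefetch(500)); // 100
    }
}
```

Under these assumptions, the sample configuration keeps between 20 and 50 messages in flight, depending on how many consumers the listener container has started.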
The parameters described in the following table are optional.
Step 2: Use an SDK to send and receive messages
Create connections
Recommended cache mode
When you establish a connection between a client and a broker, we recommend that you specify the cache mode as CONNECTION.
In this mode, you can create multiple connections. The program caches a specific number of connections and each connection caches a specific number of channels.
ApsaraMQ for RabbitMQ adopts a distributed cluster architecture. In CONNECTION mode, you can create multiple connections to connect clients to multiple service nodes in a cluster to efficiently send and consume messages.
The following code shows how to configure the cache mode:
// Specify CONNECTION as the cache mode.
connectionFactory.setCacheMode(CachingConnectionFactory.CacheMode.CONNECTION);
// The maximum number of connections that can be cached in CONNECTION mode.
connectionFactory.setConnectionCacheSize(10);
// The maximum number of channels that can be cached in CONNECTION mode.
connectionFactory.setChannelCacheSize(64);
The following sample code shows how to create connections:
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitMQConfig {
@Value("${spring.rabbitmq.host}")
private String host;
@Value("${spring.rabbitmq.port}")
private int port;
@Value("${spring.rabbitmq.username}")
private String username;
@Value("${spring.rabbitmq.password}")
private String password;
@Value("${spring.rabbitmq.virtual-host}")
private String virtualHost;
@Bean
public ConnectionFactory connectionFactory()
{
// Initialize ConnectionFactory.
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
connectionFactory.setHost(host);
connectionFactory.setPort(port);
connectionFactory.setUsername(username);
connectionFactory.setPassword(password);
// The vhost. You can manually create the vhost in the ApsaraMQ for RabbitMQ console or use the following code to automatically create the vhost.
connectionFactory.setVirtualHost(virtualHost);
// Enable automatic connection recovery. Make sure that this feature is enabled so that the client can reconnect to the broker after the connection is interrupted.
connectionFactory.getRabbitConnectionFactory().setAutomaticRecoveryEnabled(true);
// The cache mode. We recommend that you set this parameter to CONNECTION.
connectionFactory.setCacheMode(CachingConnectionFactory.CacheMode.CONNECTION);
// The maximum number of connections that can be cached in CONNECTION mode.
connectionFactory.setConnectionCacheSize(10);
// The maximum number of channels that can be cached in CONNECTION mode.
connectionFactory.setChannelCacheSize(64);
return connectionFactory;
}
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory)
{
// The RabbitMQ message template. The template encapsulates various message operations.
return new RabbitTemplate(connectionFactory);
}
}
Produce messages
Obtain RabbitTemplate through dependency injection in the RabbitMQService class and call the send() method to send a message.
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.nio.charset.StandardCharsets;
import java.util.UUID;
@Service
public class RabbitMQService {
@Autowired
private RabbitTemplate rabbitTemplate;
public void sendMessage(String exchange, String routingKey, String content) {
// Specify the message ID.
String msgId = UUID.randomUUID().toString();
MessageProperties messageProperties = new MessageProperties();
messageProperties.setMessageId(msgId);
// Create the message.
Message message = new Message(content.getBytes(StandardCharsets.UTF_8), messageProperties);
/*
* Call the send() method to send the message.
* exchange: the exchange name.
* routingKey: the routing key.
* message: the message content.
* correlationData: used to correlate confirmations with the message publisher.
*/
rabbitTemplate.send(exchange, routingKey, message, null);
}
}
Consume messages
Use the @RabbitListener annotation to consume messages.
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
@Component
public class MessageListener {
/**
* Receive a message.
* Replace myQueue with the name of the queue that you created.
* @param message the received message.
* @param channel the channel on which the message was received.
* @throws IOException if the acknowledgment cannot be sent.
*/
@RabbitListener(queues = "myQueue")
public void receiveFromMyQueue(Message message, Channel channel) throws IOException {
// Add your message consumption logic here.
...
// Return an ACK within the consumption timeout period. Otherwise, the message is redelivered.
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
}
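With acknowledge-mode=manual, the listener decides whether to acknowledge or reject each delivery. The following sketch shows one possible shape of that decision: decode the body, run the business logic, call basicAck on success, and call basicNack on failure. To keep the example self-contained, AckChannel is a hypothetical stand-in that mirrors only the basicAck and basicNack signatures of com.rabbitmq.client.Channel. ManualAckHandler and the choice to requeue on failure are also assumptions, not an approach prescribed by the SDK.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.function.Consumer;

// Hypothetical stand-in for the two com.rabbitmq.client.Channel methods used here.
interface AckChannel {
    void basicAck(long deliveryTag, boolean multiple) throws IOException;
    void basicNack(long deliveryTag, boolean multiple, boolean requeue) throws IOException;
}

// Hypothetical helper that wraps business logic with manual acknowledgment.
class ManualAckHandler {
    private final Consumer<String> businessLogic;

    ManualAckHandler(Consumer<String> businessLogic) {
        this.businessLogic = businessLogic;
    }

    // Returns true if the message was acknowledged, false if it was rejected.
    boolean handle(byte[] body, long deliveryTag, AckChannel channel) throws IOException {
        try {
            // The producer example encodes the content as UTF-8, so decode it the same way.
            String content = new String(body, StandardCharsets.UTF_8);
            businessLogic.accept(content);
            // Acknowledge only this delivery (multiple = false).
            channel.basicAck(deliveryTag, false);
            return true;
        } catch (RuntimeException e) {
            // Reject only this delivery and ask the broker to requeue it (an assumed policy).
            channel.basicNack(deliveryTag, false, true);
            return false;
        }
    }
}
```

In a real listener, the body and delivery tag would come from message.getBody() and message.getMessageProperties().getDeliveryTag(), and the Channel parameter of the listener method would take the place of the stand-in. Note that requeuing on every failure can cause a message that always fails to be redelivered repeatedly, so you may want to limit retries.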
The following table describes optional parameters for RabbitListener.