Broker upgrades, broker restarts, and network jitter can disconnect a client from the broker. Automatic connection recovery detects these failures and reconnects the client without manual intervention. This guide covers Java, Python, and PHP client configurations with sample code.
Recovery triggers
Automatic connection recovery is triggered when any of the following events occur:
An I/O exception is thrown.
A socket read operation times out.
Missed server heartbeats are detected.
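The three triggers can be read as a classification of connection-level failures. The following Python sketch is illustrative only: `HeartbeatMissed`, `ChannelError`, and `should_recover` are hypothetical names that do not belong to any client library, and real clients perform this check internally.

```python
import socket


class HeartbeatMissed(Exception):
    """Hypothetical marker for missed server heartbeats."""


class ChannelError(Exception):
    """Hypothetical application-level channel error (not a recovery trigger)."""


def should_recover(exc):
    """Return True if exc matches one of the three connection-level triggers."""
    # OSError covers general I/O failures; socket.timeout covers read timeouts.
    return isinstance(exc, (OSError, socket.timeout, HeartbeatMissed))
```

Under these assumptions, a reset connection or a read timeout would start recovery, whereas an application-level channel error would not.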
Configure recovery in Java
In Java client (amqp-client) 4.0.0 and later, automatic connection recovery and automatic topology recovery are enabled by default. No code configuration is needed.
To enable, disable, or tune automatic connection recovery and automatic topology recovery explicitly, use the following methods. Topology recovery includes actions on queues, exchanges, bindings, and consumers.
| amqp-client | Spring AMQP | Description |
|---|---|---|
| factory.setAutomaticRecoveryEnabled(boolean) | connectionFactory.getRabbitConnectionFactory().setAutomaticRecoveryEnabled(boolean) | Enable or disable automatic connection recovery. |
| factory.setNetworkRecoveryInterval(long) | connectionFactory.getRabbitConnectionFactory().setNetworkRecoveryInterval(long) | Set the interval between consecutive retries. If recovery fails, the client retries after the specified interval. Default: 5 seconds. |
| factory.setTopologyRecoveryEnabled(boolean) | connectionFactory.getRabbitConnectionFactory().setTopologyRecoveryEnabled(boolean) | Enable or disable automatic topology recovery. Topology recovery includes actions on queues, exchanges, bindings, and consumers. |
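The retry behavior described for setNetworkRecoveryInterval can be pictured as a fixed-interval loop. The sketch below is written in Python for brevity and is not the Java client's implementation; `recover`, `connect`, `max_attempts`, and the injectable `sleep` are hypothetical names introduced for illustration.

```python
import time


def recover(connect, interval_seconds=5.0, max_attempts=None, sleep=time.sleep):
    """Call connect() until it succeeds, waiting interval_seconds after each failure."""
    attempt = 0
    while True:
        attempt += 1
        try:
            return connect()
        except OSError:
            # Give up only if the caller set a limit (an assumption of this sketch).
            if max_attempts is not None and attempt >= max_attempts:
                raise
            sleep(interval_seconds)
```

Passing `sleep` as a parameter keeps the sketch testable without real waiting. With `interval_seconds=10.0`, the loop mirrors the 10-second interval used in the sample code in this guide.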
amqp-client sample code
The following consumer client enables automatic connection recovery and topology recovery:
ConnectionFactory factory = new ConnectionFactory();
// The endpoint. You can obtain the endpoint of an instance on the Instance Details page in the ApsaraMQ for RabbitMQ console.
factory.setHost("xxx.xxx.aliyuncs.com");
// Replace ${instanceId} with the ID of the ApsaraMQ for RabbitMQ instance. You can obtain the instance ID on the Instances page in the ApsaraMQ for RabbitMQ console.
factory.setCredentialsProvider(new AliyunCredentialsProvider("${instanceId}"));
// The virtual host name. Make sure that the virtual host is created in the ApsaraMQ for RabbitMQ console.
factory.setVirtualHost("${VhostName}");
// The default port. Use port 5672 for non-encrypted connections and port 5671 for encrypted connections.
factory.setPort(5672);
// The timeout period. Set the value based on the network environment.
factory.setConnectionTimeout(30 * 1000);
factory.setHandshakeTimeout(30 * 1000);
factory.setShutdownTimeout(0);
// Specify whether to enable automatic connection recovery.
factory.setAutomaticRecoveryEnabled(true);
// The retry interval. Set the value to 10 seconds.
factory.setNetworkRecoveryInterval(10000);
// Specify whether to enable automatic topology recovery.
factory.setTopologyRecoveryEnabled(true);
Connection connection = factory.newConnection();
Spring AMQP sample code
The following consumer client enables automatic connection recovery and topology recovery. For complete sample code, see SpringBootDemo.
// Initialize a ConnectionFactory object to connect to the RabbitMQ broker.
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
connectionFactory.setHost(host);
connectionFactory.setPort(port);
connectionFactory.setUsername(username);
connectionFactory.setPassword(password);
// The virtual host. You can create a virtual host in the ApsaraMQ for RabbitMQ console or specify the following parameter to automatically create one.
connectionFactory.setVirtualHost(virtualHost);
// Make sure that automatic reconnection is enabled so that the client can reconnect after the broker restarts.
connectionFactory.getRabbitConnectionFactory().setAutomaticRecoveryEnabled(true);
connectionFactory.getRabbitConnectionFactory().setNetworkRecoveryInterval(10000);
connectionFactory.getRabbitConnectionFactory().setTopologyRecoveryEnabled(true);
// The cache mode. Set this parameter to CONNECTION.
connectionFactory.setCacheMode(CachingConnectionFactory.CacheMode.CONNECTION);
// The maximum number of connections that can be cached in CONNECTION mode.
connectionFactory.setConnectionCacheSize(10);
// The maximum number of channels that can be cached in the CONNECTION mode.
connectionFactory.setChannelCacheSize(64);
return connectionFactory;
Configure recovery in Python
Pika is a Python client library recommended by open source RabbitMQ. Pika does not support automatic connection recovery through parameters. To implement automatic connection recovery, manually write a callback function.
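Because Pika leaves reconnection to the application, the application also decides how long to wait between attempts. As a possible variation on a fixed delay, the sketch below grows the delay up to a cap. `reconnect_delays` and its parameters are hypothetical and are not part of Pika.

```python
import itertools


def reconnect_delays(base=1.0, factor=2.0, cap=30.0):
    """Yield an endless sequence of delays that grows by factor up to cap."""
    delay = base
    while True:
        yield min(delay, cap)
        delay = min(delay * factor, cap)


# Example: the first six delays, in seconds.
first_six = list(itertools.islice(reconnect_delays(), 6))
```

A reconnect loop could call `next()` on the generator before each attempt and create a fresh generator after a successful connection.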
The following consumer client uses Pika to implement automatic connection recovery:
# -*- coding: utf-8 -*-
import logging
import time

import pika

LOG_FORMAT = ('%(levelname) -10s %(asctime)s %(name) -30s %(funcName) '
              '-35s %(lineno) -5d: %(message)s')
LOGGER = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO, format=LOG_FORMAT)


class Consumer(object):

    def __init__(self, amqp_url, queue):
        self.should_reconnect = False
        self._connection = None
        self._channel = None
        self._closing = False
        self._url = amqp_url
        self._queue = queue

    def connect(self):
        """
        Create a connection and configure the following callbacks:
        on_open_callback: the callback that is invoked when a connection is created.
        on_open_error_callback: the callback that is invoked when a connection fails to be created.
        on_close_callback: the callback that is invoked when a connection is closed.
        """
        return pika.SelectConnection(
            parameters=pika.URLParameters(self._url),
            on_open_callback=self.on_connection_open,
            on_open_error_callback=self.on_connection_open_error,
            on_close_callback=self.on_connection_closed)

    def on_connection_open(self, _unused_connection):
        """
        The callback that is invoked when a connection is created.
        Create a channel and configure the following callback:
        on_channel_open: the callback that is invoked when a channel is created.
        """
        self._connection.channel(on_open_callback=self.on_channel_open)

    def on_connection_open_error(self, _unused_connection, err):
        """
        The callback that is invoked when a connection fails to be created.
        Print the error message and recreate a connection.
        """
        LOGGER.error('Connection open failed: %s', err)
        self.reconnect()

    def on_connection_closed(self, _unused_connection, reason):
        """
        The callback that is invoked when a connection is closed.
        The following scenarios may occur:
        1. The connection is closed as expected and the client exits.
        2. The client is unexpectedly disconnected and tries to recreate a connection.
        """
        self._channel = None
        if self._closing:
            self._connection.ioloop.stop()
        else:
            LOGGER.warning('Connection closed, reconnect necessary: %s', reason)
            self.reconnect()

    def close_connection(self):
        """
        Close the connection.
        """
        if self._connection.is_closing or self._connection.is_closed:
            LOGGER.info('Connection is closing or already closed')
        else:
            LOGGER.info('Closing connection')
            self._connection.close()

    def reconnect(self):
        """
        Set the self.should_reconnect attribute to True and stop the I/O loop.
        """
        self.should_reconnect = True
        self.stop()

    def on_channel_open(self, channel):
        """
        The callback that is invoked when a channel is created.
        Configure the following callback:
        on_channel_closed: the callback that is invoked when a channel is closed.
        Then start message consumption from the queue.
        """
        self._channel = channel
        self._channel.add_on_close_callback(self.on_channel_closed)
        self.start_consuming()

    def on_channel_closed(self, channel, reason):
        """
        The callback that is invoked when a channel is closed.
        Print channel information and close the connection.
        """
        LOGGER.warning('Channel %i was closed: %s', channel, reason)
        self.close_connection()

    def start_consuming(self):
        """
        Start message consumption from the queue.
        """
        LOGGER.info('start consuming...')
        self._channel.basic_consume(
            self._queue, self.on_message)

    def on_message(self, _unused_channel, basic_deliver, properties, body):
        """
        Consume a message and send an acknowledgment (ACK).
        """
        LOGGER.info('Received message: %s', body.decode())
        # The consumption logic.
        self._channel.basic_ack(basic_deliver.delivery_tag)

    def run(self):
        """
        Create a connection and start the I/O loop.
        """
        self._connection = self.connect()
        self._connection.ioloop.start()

    def stop(self):
        """
        Stop the I/O loop.
        """
        if not self._closing:
            self._closing = True
            self._connection.ioloop.stop()
        LOGGER.info('Stopped')


class AutoRecoveryConsumer(object):

    def __init__(self, amqp_url, queue):
        self._amqp_url = amqp_url
        self._queue = queue
        self._consumer = Consumer(self._amqp_url, queue)

    def run(self):
        """
        Loop until a KeyboardInterrupt exception is raised.
        Each call to the consumer's run() method starts the I/O loop and processes messages from the queue.
        When the I/O loop stops, the outer loop checks whether a reconnection is required, which keeps the consumer running and reconnecting automatically.
        """
        while True:
            try:
                self._consumer.run()
            except KeyboardInterrupt:
                self._consumer.stop()
                break
            self._maybe_reconnect()

    def _maybe_reconnect(self):
        """
        Determine whether a reconnection is required. The interval between two consecutive reconnections is 1 second.
        """
        if self._consumer.should_reconnect:
            self._consumer.stop()
            time.sleep(1)
            self._consumer = Consumer(self._amqp_url, self._queue)


def main():
    username = 'MjoxODgwNzcwODY5MD****'
    password = 'NDAxREVDQzI2MjA0OT****'
    host = '1880770****.mq-amqp.cn-hangzhou-a.aliyuncs.com'
    port = 5672
    vhost = 'vhost_test'
    # amqp_url: amqp://<username>:<password>@<host>:<port>/<vhost>
    amqp_url = 'amqp://%s:%s@%s:%i/%s' % (username, password, host, port, vhost)
    consumer = AutoRecoveryConsumer(amqp_url, 'QueueTest')
    consumer.run()


if __name__ == '__main__':
    main()
Configure recovery in PHP
php-amqplib is a PHP library for publishing and consuming messages in message queues compatible with the Advanced Message Queuing Protocol (AMQP), such as RabbitMQ. The php-amqplib library does not support automatic connection recovery through parameters. To implement automatic connection recovery, manually write code.
The version of the php-amqplib library must be 3.6.1 or later.
If the client connects to RabbitMQ through AMQProxy, the automatic connection recovery code does not take effect when the client disconnects due to idleness. For scenarios with infrequent message traffic, connect the client directly to the RabbitMQ instance endpoint, without AMQProxy.
The following consumer client uses php-amqplib to implement automatic connection recovery:
<?php
require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Exception\AMQPRuntimeException;
use PhpAmqpLib\Exception\AMQPConnectionClosedException;

const ONE_SECOND = 1;

/**
 * Create a connection.
 */
function connect() {
    $host = '1880770****.mq-amqp.cn-hangzhou-a.aliyuncs.com';
    $username = 'NDAxREVDQzI2MjA0OT****';
    $password = 'NDAxREVDQzI2MjA0OT****';
    $port = 5672;
    $vhost = 'vhost_test';
    return new AMQPStreamConnection($host, $port, $username, $password, $vhost);
}

/**
 * Release a connection.
 */
function cleanup_connection($connection) {
    try {
        if ($connection !== null) {
            $connection->close();
        }
    } catch (\ErrorException $e) {
    }
}

$connection = null;
while (true) {
    try {
        $connection = connect();
        start_consuming($connection);
    } catch (AMQPConnectionClosedException $e) {
        echo $e->getMessage() . PHP_EOL;
        cleanup_connection($connection);
        sleep(ONE_SECOND);
    } catch (AMQPRuntimeException $e) {
        echo $e->getMessage() . PHP_EOL;
        cleanup_connection($connection);
        sleep(ONE_SECOND);
    } catch (\RuntimeException $e) {
        echo 'Runtime exception ' . PHP_EOL;
        cleanup_connection($connection);
        sleep(ONE_SECOND);
    } catch (\ErrorException $e) {
        echo 'Error exception ' . PHP_EOL;
        cleanup_connection($connection);
        sleep(ONE_SECOND);
    }
}

/**
 * Start consumption.
 * @param AMQPStreamConnection $connection
 */
function start_consuming($connection) {
    $queue = 'queueTest';
    $consumerTag = 'consumer';
    $channel = $connection->channel();
    $channel->queue_declare($queue, false, true, false, false);
    $channel->basic_consume($queue, $consumerTag, false, false, false, false, 'process_message');
    while ($channel->is_consuming()) {
        $channel->wait();
    }
}

/**
 * Process the message.
 * @param \PhpAmqpLib\Message\AMQPMessage $message
 */
function process_message($message)
{
    // Process the business logic.
    echo "\n--------\n";
    echo $message->body;
    echo "\n--------\n";
    $message->ack();
}
Limitations
Connection failure detection takes time. Use the Publisher Confirms mechanism to prevent message loss during the detection period.
Channel exceptions do not trigger automatic connection recovery. Channel exceptions are typically application-level issues that application owners must handle.
Automatic connection recovery does not automatically recover channels.
After a connection is lost, exclusive queues declared by the connection are deleted and related data is cleared. Message consumption from exclusive queues fails after the connection is automatically recovered.
Each consumer declared by a connection must have a unique consumer tag. If multiple consumers share the same consumer tag in a connection, only one consumer is recovered during automatic recovery. If you do not specify a consumer tag, the broker automatically assigns a unique one.
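The first limitation above recommends Publisher Confirms to cover the failure detection window. The bookkeeping behind that mechanism can be sketched without a broker: the publisher remembers each message until it is confirmed, and whatever is still pending after a reconnect is a candidate for republishing. `ConfirmTracker` and its methods are hypothetical names for this simulation and are not a client library API. The `multiple` flag mirrors the field of the same name in AMQP acknowledgments.

```python
class ConfirmTracker:
    """Track published messages until the broker confirms them (simulation only)."""

    def __init__(self):
        self._next_seq = 1
        self._unconfirmed = {}  # sequence number -> message body

    def published(self, body):
        """Record a published message and return its sequence number."""
        seq = self._next_seq
        self._unconfirmed[seq] = body
        self._next_seq += 1
        return seq

    def confirmed(self, seq, multiple=False):
        """Drop one confirmed message, or all messages up to seq if multiple is set."""
        if multiple:
            for s in [s for s in self._unconfirmed if s <= seq]:
                del self._unconfirmed[s]
        else:
            self._unconfirmed.pop(seq, None)

    def pending(self):
        """Return unconfirmed message bodies in publish order."""
        return [self._unconfirmed[s] for s in sorted(self._unconfirmed)]
```

In a real client, sequence numbers are scoped to a channel, so an application that follows this pattern would likely start a fresh tracker on the new channel and republish the messages returned by `pending()`.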