
ApsaraMQ for RabbitMQ: Configure automatic connection recovery for clients

Last Updated: Sep 11, 2024

A client may be disconnected from the broker because of a broker upgrade, a broker restart, or network jitter. This topic describes how to configure automatic connection recovery for Java, Python, and PHP clients and provides sample code.

Causes

A client considers its connection to the broker lost when one of the following events occurs:

  • An I/O exception is thrown.

  • A socket read operation times out.

  • Missing broker heartbeats are detected.
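The third event can be illustrated with a small sketch. The following Python code is a hypothetical helper, not part of any RabbitMQ client library, that shows one way a client might detect missing broker heartbeats. It assumes the convention described in the RabbitMQ documentation that the peer is treated as unreachable after two heartbeat intervals pass without any inbound traffic. The HeartbeatMonitor class, its parameters, and the injectable clock are illustrative names only.

```python
import time


class HeartbeatMonitor:
    """Hypothetical helper: tracks inbound traffic to detect missing broker heartbeats."""

    def __init__(self, heartbeat_interval, missed_limit=2, clock=time.monotonic):
        # heartbeat_interval: seconds between expected heartbeats (assumption).
        # missed_limit: number of silent intervals before the broker is treated as lost.
        self.heartbeat_interval = heartbeat_interval
        self.missed_limit = missed_limit
        self._clock = clock
        self._last_seen = clock()

    def on_frame_received(self):
        # Any inbound frame, heartbeat or data, shows that the broker is still reachable.
        self._last_seen = self._clock()

    def broker_unreachable(self):
        # True once missed_limit heartbeat intervals pass without inbound traffic.
        silence = self._clock() - self._last_seen
        return silence >= self.heartbeat_interval * self.missed_limit


# Usage with a fake clock so that the example runs instantly.
now = [0.0]
monitor = HeartbeatMonitor(heartbeat_interval=30, clock=lambda: now[0])
now[0] = 59.0
print(monitor.broker_unreachable())  # False
now[0] = 60.0
print(monitor.broker_unreachable())  # True
```

A real client would call on_frame_received from its I/O loop and, when broker_unreachable returns True, close the socket and start connection recovery.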

Solutions

Java

Important

In Java client 4.0.0 and later, automatic connection recovery and automatic topology recovery are enabled by default, so you do not need to configure them in the code.

You can also configure the following parameters in the code to explicitly enable automatic connection recovery and automatic topology recovery. Topology recovery re-declares queues, exchanges, and bindings and re-registers consumers.

  • factory.setAutomaticRecoveryEnabled(boolean): specifies whether to enable automatic connection recovery.

  • factory.setNetworkRecoveryInterval(long): specifies the interval, in milliseconds, between two consecutive recovery attempts. If a recovery attempt fails, the client waits for the specified interval before it tries to reconnect again. Default value: 5000 (5 seconds).

  • factory.setTopologyRecoveryEnabled(boolean): specifies whether to enable automatic topology recovery.
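The retry behavior that setNetworkRecoveryInterval controls can be summarized as "try to reconnect, and wait a fixed interval after each failed attempt." The following Python sketch models that behavior outside the Java client, which can be useful when you write recovery code by hand, as in the Python and PHP examples in this topic. The recover function, its connect argument, and the optional max_attempts limit are hypothetical; this is not how the Java client implements recovery internally.

```python
import time


def recover(connect, interval_ms=5000, max_attempts=None, sleep=time.sleep):
    """Hypothetical fixed-interval reconnect loop.

    Calls connect() until it returns a connection. After each failed attempt
    (an OSError such as ConnectionError), waits interval_ms milliseconds,
    which mirrors the meaning of setNetworkRecoveryInterval. Returns a tuple
    of (connection, number_of_attempts).
    """
    attempts = 0
    while True:
        attempts += 1
        try:
            return connect(), attempts
        except OSError:
            # Give up only if a limit was set and has been reached.
            if max_attempts is not None and attempts >= max_attempts:
                raise
            sleep(interval_ms / 1000.0)


# Usage: a fake connect() that fails twice and then succeeds.
calls = []


def flaky_connect():
    calls.append(1)
    if len(calls) < 3:
        raise ConnectionError("broker unavailable")
    return "connection"


waits = []
print(recover(flaky_connect, interval_ms=10000, sleep=waits.append))  # ('connection', 3)
print(waits)  # [10.0, 10.0]
```

Passing sleep as a parameter keeps the sketch testable; in production code, leave the default time.sleep.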

Python

Pika is the Python client library recommended by the open source RabbitMQ project. Pika does not provide a parameter for automatic connection recovery. To implement automatic connection recovery in Python, you must write callback functions that detect the disconnection and re-create the connection.
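The Pika sample in the Sample code section relies on an outer supervisor loop: run the consumer until its I/O loop stops and, if the loop stopped because of an unexpected disconnection, wait and build a fresh consumer. The following Python sketch shows only that control flow. FakeConsumer is a hypothetical stand-in for a Pika-based consumer and does not open any network connection.

```python
import time


class FakeConsumer:
    """Hypothetical stand-in for a Pika consumer: run() returns when its I/O loop stops."""

    def __init__(self, outcomes):
        # outcomes: shared list of booleans; True simulates an unexpected disconnection.
        self._outcomes = outcomes
        self.should_reconnect = False

    def run(self):
        self.should_reconnect = self._outcomes.pop(0)


def supervise(make_consumer, delay=1.0, sleep=time.sleep):
    """Rebuild the consumer after each unexpected disconnection; return the number of runs."""
    runs = 0
    while True:
        consumer = make_consumer()  # a new consumer means a new connection and channel
        consumer.run()
        runs += 1
        if not consumer.should_reconnect:
            return runs  # the consumer stopped on purpose
        sleep(delay)


outcomes = [True, True, False]  # two disconnections, then a clean stop
pauses = []
print(supervise(lambda: FakeConsumer(outcomes), sleep=pauses.append))  # 3
print(pauses)  # [1.0, 1.0]
```

Creating a new consumer object on every attempt, instead of reusing the old one, avoids carrying stale connection or channel state into the next connection.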

PHP

php-amqplib is a PHP library for publishing and consuming messages through brokers that are compatible with the Advanced Message Queuing Protocol (AMQP), such as RabbitMQ. php-amqplib does not provide a parameter for automatic connection recovery. To implement automatic connection recovery in PHP, you must write the reconnection logic yourself.

Note
  • To implement automatic connection recovery, use php-amqplib 3.6.1 or later.

  • If the client connects to RabbitMQ through AMQProxy, the reconnection code does not take effect when the client is disconnected because the connection is idle. Idle disconnections are likely when messages are sent and received infrequently. In this case, we recommend that you connect the client directly to the endpoint of the ApsaraMQ for RabbitMQ instance.

Sample code

Java

The following sample code shows how to enable automatic connection recovery and automatic topology recovery:

ConnectionFactory factory = new ConnectionFactory();
// The endpoint. You can view the endpoint of an instance on the Instance Details page in the ApsaraMQ for RabbitMQ console. 
factory.setHost("xxx.xxx.aliyuncs.com");
// Replace ${instanceId} with the ID of the ApsaraMQ for RabbitMQ instance. You can view the instance ID on the Instances page in the ApsaraMQ for RabbitMQ console. 
factory.setCredentialsProvider(new AliyunCredentialsProvider("${instanceId}"));
// The vhost name. Make sure that the vhost is created in the ApsaraMQ for RabbitMQ console. 
factory.setVirtualHost("${VhostName}");
// The default port. Use port 5672 for non-encrypted connections and port 5671 for encrypted connections. 
factory.setPort(5672);
// The timeout periods, in milliseconds. Set the values based on your network environment. 
factory.setConnectionTimeout(30 * 1000);
factory.setHandshakeTimeout(30 * 1000);
factory.setShutdownTimeout(0);
// Specifies whether to enable automatic connection recovery. 
factory.setAutomaticRecoveryEnabled(true);
// The recovery retry interval, in milliseconds. In this example, it is set to 10 seconds. 
factory.setNetworkRecoveryInterval(10000);
// Specifies whether to enable automatic topology recovery. 
factory.setTopologyRecoveryEnabled(true);
Connection connection = factory.newConnection();                      

Python

The following sample code shows how to enable automatic connection recovery on a consumer client by using Pika:

# -*- coding: utf-8 -*-

import logging
import time
import pika

LOG_FORMAT = ('%(levelname) -10s %(asctime)s %(name) -30s %(funcName) '
              '-35s %(lineno) -5d: %(message)s')
LOGGER = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO, format=LOG_FORMAT)

class Consumer(object):

    def __init__(self, amqp_url, queue):
        self.should_reconnect = False

        self._connection = None
        self._channel = None
        self._closing = False
        self._url = amqp_url

        self._queue = queue

    def connect(self):
        '''
        Create a connection and configure the following callbacks:
        on_open_callback: the callback that is invoked when a connection is created.
        on_open_error_callback: the callback that is invoked when a connection fails to be created.
        on_close_callback: the callback that is invoked when a connection is closed.
        '''
        return pika.SelectConnection(
            parameters=pika.URLParameters(self._url),
            on_open_callback=self.on_connection_open,
            on_open_error_callback=self.on_connection_open_error,
            on_close_callback=self.on_connection_closed)

    def on_connection_open(self, _unused_connection):
        '''
        The callback that is invoked when a connection is created.
        Create a channel and configure the following callback:
        on_channel_open: the callback that is invoked when a channel is created.
        '''
        self._connection.channel(on_open_callback=self.on_channel_open)

    def on_connection_open_error(self, _unused_connection, err):
        """
        The callback that is invoked when a connection fails to be created.
        Print the error message and recreate a connection.
        """
        LOGGER.error('Connection open failed: %s', err)
        self.reconnect()

    def on_connection_closed(self, _unused_connection, reason):
        """
        The callback that is invoked when a connection is closed.
        The following scenarios may occur:
        1. The connection is closed as expected and the client exits.
        2. The client is unexpectedly disconnected and tries to recreate a connection.
        """
        self._channel = None
        if self._closing:
            self._connection.ioloop.stop()
        else:
            LOGGER.warning('Connection closed, reconnect necessary: %s', reason)
            self.reconnect()

    def close_connection(self):
        """
        Close the connection.
        """
        if self._connection.is_closing or self._connection.is_closed:
            LOGGER.info('Connection is closing or already closed')
        else:
            LOGGER.info('Closing connection')
            self._connection.close()

    def reconnect(self):
        """
        Set the self.should_reconnect parameter to True and stop the I/O loop.
        """
        self.should_reconnect = True
        self.stop()

    def on_channel_open(self, channel):
        """
        The callback that is invoked when a channel is created.
        Configure the callback.
        on_channel_closed: the callback that is invoked when a channel is closed.
        Start message consumption from the queue.
        """
        self._channel = channel
        self._channel.add_on_close_callback(self.on_channel_closed)
        self.start_consuming()

    def on_channel_closed(self, channel, reason):
        """
        The callback that is invoked when a channel is closed.
        Log the channel information and close the connection.
        """
        LOGGER.warning('Channel %i was closed: %s', channel.channel_number, reason)
        self.close_connection()

    def start_consuming(self):
        """
        Start message consumption from the queue.
        """
        LOGGER.info('start consuming...')
        self._channel.basic_consume(
            self._queue, self.on_message)

    def on_message(self, _unused_channel, basic_deliver, properties, body):
        """
        Process a message and send an acknowledgment (ACK) to the broker.
        """
        LOGGER.info('Received message: %s', body.decode())
        # The consumption logic.
        self._channel.basic_ack(basic_deliver.delivery_tag)

    def run(self):
        """
        Create a connection and start the I/O loop.
        """
        self._connection = self.connect()
        self._connection.ioloop.start()

    def stop(self):
        """
        Stop the I/O loop.
        """
        if not self._closing:
            self._closing = True
            self._connection.ioloop.stop()
            LOGGER.info('Stopped')


class AutoRecoveryConsumer(object):

    def __init__(self, amqp_url, queue):
        self._amqp_url = amqp_url
        self._queue = queue
        self._consumer = Consumer(self._amqp_url, queue)

    def run(self):
        """
        Run the consumer in a loop until a KeyboardInterrupt exception is raised.
        Each call to Consumer.run() starts the I/O loop and consumes messages until the loop stops.
        If the loop stopped because of a disconnection, a new consumer is created, so the client
        keeps running and reconnects to the broker automatically.
        """
        while True:
            try:
                self._consumer.run()
            except KeyboardInterrupt:
                self._consumer.stop()
                break
            self._maybe_reconnect()

    def _maybe_reconnect(self):
        """
        Determine whether a reconnection is required. If it is, stop the current consumer, wait 1 second, and create a new consumer.
        """
        if self._consumer.should_reconnect:
            self._consumer.stop()
            time.sleep(1)
            self._consumer = Consumer(self._amqp_url, self._queue)


def main():
    username = 'MjoxODgwNzcwODY5MD****'
    password = 'NDAxREVDQzI2MjA0OT****'
    host = '1880770****.mq-amqp.cn-hangzhou-a.aliyuncs.com'
    port = 5672
    vhost = 'vhost_test'

    # amqp_url: amqp://<username>:<password>@<host>:<port>/<vhost>
    amqp_url = 'amqp://%s:%s@%s:%i/%s' % (username, password, host, port, vhost)
    consumer = AutoRecoveryConsumer(amqp_url, 'QueueTest')
    consumer.run()


if __name__ == '__main__':
    main()

PHP

The following sample code shows how to enable automatic connection recovery on a consumer client by using php-amqplib:

<?php

require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Exception\AMQPRuntimeException;
use PhpAmqpLib\Exception\AMQPConnectionClosedException;

const ONE_SECOND = 1;


/**
 * Create a connection.
 */
function connect() {
    $host = '1880770****.mq-amqp.cn-hangzhou-a.aliyuncs.com';
    $username = 'MjoxODgwNzcwODY5MD****';
    $password = 'NDAxREVDQzI2MjA0OT****';
    $port = 5672;
    $vhost = 'vhost_test';
    return new AMQPStreamConnection($host, $port, $username, $password, $vhost);
}

/**
 * Release a connection.
 */
function cleanup_connection($connection) {
    try {
        if($connection !== null) {
            $connection->close();
        }
    } catch (\Exception $e) {
        // The connection may already be broken. Ignore errors that are raised while closing it.
    }
}

$connection = null;

while(true){
    try {
        $connection = connect();
        start_consuming($connection);
    } catch (AMQPConnectionClosedException $e) {
        echo $e->getMessage() . PHP_EOL;
        cleanup_connection($connection);
        sleep(ONE_SECOND);
    } catch(AMQPRuntimeException $e) {
        echo $e->getMessage() . PHP_EOL;
        cleanup_connection($connection);
        sleep(ONE_SECOND);
    } catch(\RuntimeException $e) {
        echo 'Runtime exception ' . PHP_EOL;
        cleanup_connection($connection);
        sleep(ONE_SECOND);
    } catch(\ErrorException $e) {
        echo 'Error exception ' . PHP_EOL;
        cleanup_connection($connection);
        sleep(ONE_SECOND);
    }
}

/**
 * Start consumption.
 * @param AMQPStreamConnection $connection
 */
function start_consuming($connection) {
    $queue = 'queueTest';
    $consumerTag = 'consumer';
    $channel = $connection->channel();
    $channel->queue_declare($queue, false, true, false, false);
    $channel->basic_consume($queue, $consumerTag, false, false, false, false, 'process_message');
    while ($channel->is_consuming()) {
        $channel->wait();
    }
}


/**
 * Process the message.
 * @param \PhpAmqpLib\Message\AMQPMessage $message
 */
function process_message($message)
{
    // Process the business logic.
    echo "\n--------\n";
    echo $message->body;
    echo "\n--------\n";

    $message->ack();
}

Limits

  • Detecting a connection failure takes some time, and messages that are published during this period may be lost. To prevent message loss, use the publisher confirms mechanism and republish the messages that the broker has not confirmed.

  • Channel exceptions cannot trigger automatic connection recovery. In most cases, channel exceptions are application-level issues and need to be handled by application owners.

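  • Automatic connection recovery is triggered only by connection failures. If a channel is closed while its connection remains open, for example because of a channel exception, the channel is not recovered and the application must open a new channel.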
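The first limit can be mitigated on the publisher side by keeping every published message until the broker confirms it. The following Python sketch shows only the bookkeeping, not the network calls. It relies on the RabbitMQ publisher confirms behavior: on a channel in confirm mode, published messages are numbered from 1, and an ack with the multiple flag confirms every message up to and including the given delivery tag. The ConfirmTracker class and its method names are hypothetical; with a real client, you would call on_ack from the confirm callback and republish the messages returned by take_unconfirmed after the connection is recovered.

```python
from collections import OrderedDict


class ConfirmTracker:
    """Hypothetical helper: remembers published messages until the broker confirms them."""

    def __init__(self):
        self._next_tag = 1
        self._pending = OrderedDict()  # delivery tag -> message body

    def on_publish(self, body):
        # In confirm mode, each channel numbers published messages 1, 2, 3, ...
        tag = self._next_tag
        self._pending[tag] = body
        self._next_tag += 1
        return tag

    def on_ack(self, tag, multiple=False):
        if multiple:
            # Confirms every pending message up to and including tag.
            for pending_tag in [t for t in self._pending if t <= tag]:
                del self._pending[pending_tag]
        else:
            self._pending.pop(tag, None)

    def take_unconfirmed(self):
        # Call after reconnecting: the new channel starts numbering at 1 again,
        # so return the unconfirmed bodies for republishing and reset the counter.
        bodies = list(self._pending.values())
        self._pending.clear()
        self._next_tag = 1
        return bodies


tracker = ConfirmTracker()
for body in ["m1", "m2", "m3", "m4"]:
    tracker.on_publish(body)
tracker.on_ack(2, multiple=True)  # confirms m1 and m2
tracker.on_ack(4)                 # confirms m4 only
print(tracker.take_unconfirmed())  # ['m3']
```

This sketch does not handle negative acknowledgments (nacks). Because a message may have reached the broker even if its confirmation was lost, republishing gives at-least-once delivery, so consumers should be prepared to receive duplicates.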