由於服務端升級、服務端重啟、網路抖動等原因,服務端和用戶端的網路連接可能會斷開。本文介紹使用Java、Python、PHP語言在用戶端設定自動回復的方法和範例程式碼,避免斷連對您的業務造成影響。
觸發原因
Connection的I/O拋出異常。
Socket讀取操作逾時。
檢測到服務端心跳丟失。
恢複方法
Java
4.0.0及以上版本Java用戶端預設開啟Connection和Topology自動回復,您無需在代碼中設定。
在用戶端開啟Connection和Topology(Queue、Exchange、Binding、Consumer)自動回復的方法如下:
factory.setAutomaticRecoveryEnabled(boolean)
:用於開啟或關閉Connection自動回復。factory.setNetworkRecoveryInterval(long)
:用於設定重試時間間隔。如果Connection自動回復異常,設定了Connection自動回復的用戶端將在一段固定時間間隔(預設為5秒)後重試。factory.setTopologyRecoveryEnabled(boolean)
:用於開啟Topology自動回復。Topology包括Queue、Exchange、Binding、Consumer。
Python
Pika是開源RabbitMQ官方推薦的Python用戶端庫。與Java用戶端提供的串連自動回復功能不同,Pika庫本身不直接支援通過配置來實現自動的串連恢複。因此,要在Python中實現這一功能,您需要通過編寫回呼函數來手動處理串連的恢複過程。
PHP
php-amqplib
是一個PHP庫,用於與AMQP協議相容的訊息佇列(如RabbitMQ)進行高效的訊息發布和消費。php-amqplib
庫本身不直接支援通過配置來實現自動的串連恢複。因此,要在PHP中實現這一功能,您需要手動處理串連的恢複過程。
設定自動重連時,使用的
php-amqplib
庫版本應為3.6.1及以上版本。當用戶端串連AMQProxy且串連因空閑斷開時,下文串連自動重連的代碼不生效。因此,在訊息收發頻率低的情境下(容易發生串連空閑斷開),建議直接連接RabbitMQ服務的存取點。
範例程式碼
Java
開啟Connection和Topology自動回復的用戶端範例程式碼如下:
ConnectionFactory factory = new ConnectionFactory();
// 設定存取點,在雲訊息佇列 RabbitMQ 版控制台執行個體詳情頁面擷取。
factory.setHost("xxx.xxx.aliyuncs.com");
// ${instanceId}為執行個體ID,從雲訊息佇列 RabbitMQ 版控制台執行個體詳情頁面擷取。
factory.setCredentialsProvider(new AliyunCredentialsProvider("${instanceId}"));
// 設定Vhost名稱,請確保已在訊息佇列AMQP版控制台上建立。
factory.setVirtualHost("${VhostName}");
// 預設連接埠,非加密連接埠5672,加密連接埠5671。
factory.setPort(5672);
// 基於網路環境設定合理的逾時時間。
factory.setConnectionTimeout(30 * 1000);
factory.setHandshakeTimeout(30 * 1000);
factory.setShutdownTimeout(0);
// 開啟Connection自動回復。
factory.setAutomaticRecoveryEnabled(true);
// 設定Connection重試時間間隔為10秒。
factory.setNetworkRecoveryInterval(10000);
// 開啟Topology自動回復。
factory.setTopologyRecoveryEnabled(true);
Connection connection = factory.newConnection();
Python
Python Pika串連自動回復的消費者用戶端範例程式碼如下:
# -*- coding: utf-8 -*-
import logging
import time
import pika
LOG_FORMAT = ('%(levelname) -10s %(asctime)s %(name) -30s %(funcName) '
'-35s %(lineno) -5d: %(message)s')
LOGGER = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO, format=LOG_FORMAT)
class Consumer(object):
def __init__(self, amqp_url, queue):
self.should_reconnect = False
self._connection = None
self._channel = None
self._closing = False
self._url = amqp_url
self._queue = queue
def connect(self):
'''
建立connection,並設定以下回調:
on_open_callback: 串連成功回調
on_open_error_callback: 建立串連失敗回調
on_close_callback: 串連關閉回調
'''
return pika.SelectConnection(
parameters=pika.URLParameters(self._url),
on_open_callback=self.on_connection_open,
on_open_error_callback=self.on_connection_open_error,
on_close_callback=self.on_connection_closed)
def on_connection_open(self, _unused_connection):
'''
串連成功回調
建立channel,並設定回調:
on_channel_open: channel建立成功回調
'''
self._connection.channel(on_open_callback=self.on_channel_open)
def on_connection_open_error(self, _unused_connection, err):
"""
建立串連失敗回調
列印錯誤資訊,並嘗試重新串連
"""
LOGGER.error('Connection open failed: %s', err)
self.reconnect()
def on_connection_closed(self, _unused_connection, reason):
"""
串連關閉回調
分以下兩種情況:
1. 正常關閉,直接退出
2. 串連異常斷開,嘗試重新串連
"""
self._channel = None
if self._closing:
self._connection.ioloop.stop()
else:
LOGGER.warning('Connection closed, reconnect necessary: %s', reason)
self.reconnect()
def close_connection(self):
"""
關閉串連
"""
if self._connection.is_closing or self._connection.is_closed:
LOGGER.info('Connection is closing or already closed')
else:
LOGGER.info('Closing connection')
self._connection.close()
def reconnect(self):
"""
修改should_reconnect為True,並停止io_loop
"""
self.should_reconnect = True
self.stop()
def on_channel_open(self, channel):
"""
channel建立成功回調
設定回調:
on_channel_closed: channel關閉回調
開始隊列消費
"""
self._channel = channel
self._channel.add_on_close_callback(self.on_channel_closed)
self.start_consuming()
def on_channel_closed(self, channel, reason):
"""
channel關閉回調
列印channel關閉資訊,並關閉串連
"""
LOGGER.warning('Channel %i was closed: %s', channel, reason)
self.close_connection()
def start_consuming(self):
"""
開始隊列消費
"""
LOGGER.info('start consuming...')
self._channel.basic_consume(
self._queue, self.on_message)
def on_message(self, _unused_channel, basic_deliver, properties, body):
"""
消費訊息並上傳ack
"""
LOGGER.info('Received message: %s', body.decode())
# 處理商務邏輯
self._channel.basic_ack(basic_deliver.delivery_tag)
def run(self):
"""
建立connection,並啟動io_loop
"""
self._connection = self.connect()
self._connection.ioloop.start()
def stop(self):
"""
停止io_loop
"""
if not self._closing:
self._closing = True
self._connection.ioloop.stop()
LOGGER.info('Stopped')
class AutoRecoveryConsumer(object):
def __init__(self, amqp_url, queue):
self._amqp_url = amqp_url
self._queue = queue
self._consumer = Consumer(self._amqp_url, queue)
def run(self):
"""
while True 迴圈,直到KeyboardInterrupt異常
在run方法中,會啟動io_loop監聽隊列並處理訊息,通過迴圈確保消費者持續運行並且能夠自動重連
"""
while True:
try:
self._consumer.run()
except KeyboardInterrupt:
self._consumer.stop()
break
self._maybe_reconnect()
def _maybe_reconnect(self):
"""
判斷是否需要重連,每次重連間隔1s
"""
if self._consumer.should_reconnect:
self._consumer.stop()
time.sleep(1)
self._consumer = Consumer(self._amqp_url, self._queue)
def main():
username = 'MjoxODgwNzcwODY5MD****'
password = 'NDAxREVDQzI2MjA0OT****'
host = '1880770****.mq-amqp.cn-hangzhou-a.aliyuncs.com'
port = 5672
vhost = 'vhost_test'
# amqp_url: amqp://<username>:<password>@<host>:<port>/<vhost>
amqp_url = 'amqp://%s:%s@%s:%i/%s' % (username, password, host, port, vhost)
consumer = AutoRecoveryConsumer(amqp_url, 'QueueTest')
consumer.run()
if __name__ == '__main__':
main()
PHP
PHP php-amqplib串連自動回復的消費者用戶端範例程式碼如下:
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Exception\AMQPRuntimeException;
use PhpAmqpLib\Exception\AMQPConnectionClosedException;
const ONE_SECOND = 1;
/**
* 建立串連
*/
function connect() {
$host = '1880770****.mq-amqp.cn-hangzhou-a.aliyuncs.com';
$username = 'NDAxREVDQzI2MjA0OT****';
$password = 'NDAxREVDQzI2MjA0OT****';
$port = 5672;
$vhost = 'vhost_test';
return new AMQPStreamConnection($host, $port, $username, $password, $vhost);
}
/**
* 清理串連
*/
function cleanup_connection($connection) {
try {
if($connection !== null) {
$connection->close();
}
} catch (\ErrorException $e) {
}
}
$connection = null;
while(true){
try {
$connection = connect();
start_consuming($connection);
} catch (AMQPConnectionClosedException $e) {
echo $e->getMessage() . PHP_EOL;
cleanup_connection($connection);
sleep(ONE_SECOND);
} catch(AMQPRuntimeException $e) {
echo $e->getMessage() . PHP_EOL;
cleanup_connection($connection);
sleep(ONE_SECOND);
} catch(\RuntimeException $e) {
echo 'Runtime exception ' . PHP_EOL;
cleanup_connection($connection);
sleep(ONE_SECOND);
} catch(\ErrorException $e) {
echo 'Error exception ' . PHP_EOL;
cleanup_connection($connection);
sleep(ONE_SECOND);
}
}
/**
* 啟動消費
* @param AMQPStreamConnection $connection
*/
function start_consuming($connection) {
$queue = 'queueTest';
$consumerTag = 'consumer';
$channel = $connection->channel();
$channel->queue_declare($queue, false, true, false, false);
$channel->basic_consume($queue, $consumerTag, false, false, false, false, 'process_message');
while ($channel->is_consuming()) {
$channel->wait();
}
}
/**
* 處理訊息
* @param \PhpAmqpLib\Message\AMQPMessage $message
*/
function process_message($message)
{
// 處理商務邏輯
echo "\n--------\n";
echo $message->body;
echo "\n--------\n";
$message->ack();
}
恢複限制
Connection斷開需要一定的時間檢測。要確保這段時間內發送的訊息不丟失,需使用Publisher Confirms實現可靠發送。
Channel異常導致Connection斷開時,不會觸發Connection自動回復。Channel異常通常為應用層級的問題,需要使用方自行處理。
Connection自動回復不會使Channel也自動回復。