云消息队列 RabbitMQ 版会对单实例的TPS流量峰值进行限流,本文介绍云消息队列 RabbitMQ 版实例的限流规则、限流后的行为以及限流最佳实践等。
限流后行为
当云消息队列 RabbitMQ 版实例的TPS流量峰值超过您所购买实例的TPS规格上限时,云消息队列 RabbitMQ 版实例会被限流。
限流后的行为如下:
云消息队列 RabbitMQ 版服务端会返回错误码信息。具体请参见错误码说明。
云消息队列 RabbitMQ 版服务端关闭当前请求的Channel。代码中可以捕获异常重新开启Channel。
限流Java示例代码如下:
private static final int MAX_RETRIES = 5; // 最大重试次数
private static final long WAIT_TIME_MS = 2000; // 每次重试的等待时间(以毫秒为单位)
private void doAnythingWithReopenChannels(Connection connection, Channel channel) {
try {
// ......
// 在当前通道channel下执行的任何操作
// 例如消息发送、消费等
// ......
} catch (AlreadyClosedException e) {
String message = e.getMessage();
if (isChannelClosed(message)) {
// 如果通道已经关闭,关闭并重新创建通道
channel = createChannelWithRetry(connection);
// 在重连后可以继续执行其它操作
// ......
} else {
throw e;
}
}
}
private Channel createChannelWithRetry(Connection connection) {
for (int attempt = 1; attempt <= MAX_RETRIES; attempt++) {
try {
return connection.createChannel();
} catch (Exception e) {
System.err.println("Failed to create channel. Attempt " + attempt + " of " + MAX_RETRIES);
// 检查错误, 若仍是被限流导致的关闭错误,则可以等待后继续重试
// 也可移除本部分重试逻辑
if (attempt < MAX_RETRIES) {
try {
Thread.sleep(WAIT_TIME_MS);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt(); // 还原中断状态
}
} else {
throw new RuntimeException("Exceeded maximum retries to create channel", e);
}
}
}
throw new RuntimeException("This line should never be reached"); // 理论上不会到达这里
}
private boolean isChannelClosed(String errorMsg) {
// 判断是否包含channel.close报错,该报错代表通道已关闭。
// 可能涵盖530,541等错误信息。
if (errorMsg != null && errorMsg.contains("channel.close")) {
System.out.println("[ChannelClosed] Error details: " + errorMsg);
return true;
}
return false;
}
错误码信息:
错误码:reply-code=530
错误信息:reply-text=denied for too many requests
实例秒级TPS峰值查询
通过查询实例实际使用的秒级TPS峰值,您可以了解业务的流量波动情况和流量峰值,判断实例规格是否满足业务需求。
云消息队列 RabbitMQ 版提供以下三种方式查询实例的秒级TPS峰值:
查询方式 | 说明 | 查询时间级别 | 查询资源级别 |
优势:
| 分钟级TPS峰值 取值为1分钟周期内,每秒钟实例TPS的最大值。 | 实例级别TPS峰值 | |
(推荐)通过实例详情查询实例TPS峰值 |
| 秒级TPS峰值 |
|
| 秒级TPS峰值 | 实例级别TPS峰值 |
实例TPS计算规则
以下接口调用时,会被计算进TPS流量中,即调用一次接口,计算为一次TPS。
ConnectionOpen、ChannelOpen
QueueDeclare、QueueDelete、QueueBind、QueueUnbind
ExchangeDeclare、ExchangeDelete
ExchangeBind、ExchangeUnBind
SendMessage、BasicConsume、BasicGet、BasicAck、BasicReject、BasicNack、BasicRecover
延时消息是云消息队列 RabbitMQ 版的高级特性消息,发送延时消息时,调用API接口的次数需要在普通消息的基础上乘以5倍,消费延时消息时与普通消息次数相同。
示例:1秒内发送2条延时消息,消费3条延时消息。则此时API调用TPS为:2×5+3=13次/秒。
统计SendMessage接口的调用次数时,实际计算值为消息经过路由后要存储到的Queue的数量。
例如,发送1条到Fanout类型Exchange的消息,最后要保存到10个Queue中,则SendMessage调用次数计算为10次。