全部产品
Search
文档中心

云消息队列 RabbitMQ 版:实例限流最佳实践

更新时间:Oct 09, 2024

云消息队列 RabbitMQ 版会对单实例的TPS流量峰值进行限流,本文介绍云消息队列 RabbitMQ 版实例的限流规则、限流后的行为以及限流最佳实践等。

限流后行为

云消息队列 RabbitMQ 版实例的TPS流量峰值超过您所购买实例的TPS规格上限时,云消息队列 RabbitMQ 版实例会被限流。

限流后的行为如下:

  • 云消息队列 RabbitMQ 版服务端会返回错误码信息。具体请参见错误码说明

  • 云消息队列 RabbitMQ 版服务端关闭当前请求的Channel。代码中可以捕获异常重新开启Channel。

限流Java示例代码如下:

private static final int MAX_RETRIES = 5; // 最大重试次数
private static final long WAIT_TIME_MS = 2000; // 每次重试的等待时间(以毫秒为单位)

private void doAnythingWithReopenChannels(Connection connection, Channel channel) {
    try {
        // ......
        // 在当前通道channel下执行的任何操作
        // 例如消息发送、消费等
        // ......

    } catch (AlreadyClosedException e) {
        String message = e.getMessage();
        if (isChannelClosed(message)) {
            // 如果通道已经关闭,关闭并重新创建通道
            channel = createChannelWithRetry(connection); 
            // 在重连后可以继续执行其它操作
            // ......
        } else {
            throw e;
        }
    }
}

private Channel createChannelWithRetry(Connection connection) {
    for (int attempt = 1; attempt <= MAX_RETRIES; attempt++) {
        try {
            return connection.createChannel();
        } catch (Exception e) {
            System.err.println("Failed to create channel. Attempt " + attempt + " of " + MAX_RETRIES);
            // 检查错误, 若仍是被限流导致的关闭错误,则可以等待后继续重试
            // 也可移除本部分重试逻辑
            if (attempt < MAX_RETRIES) {
                try {
                    Thread.sleep(WAIT_TIME_MS);
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt(); // 还原中断状态
                }
            } else {
                throw new RuntimeException("Exceeded maximum retries to create channel", e);
            }
        }
    }
    throw new RuntimeException("This line should never be reached"); // 理论上不会到达这里
}

private boolean isChannelClosed(String errorMsg) {
    // 判断是否包含channel.close报错,该报错代表通道已关闭。
    // 可能涵盖530,541等错误信息。
    if (errorMsg != null && errorMsg.contains("channel.close")) {
        System.out.println("[ChannelClosed] Error details: " + errorMsg);
        return true;
    }
    return false;
}

错误码信息:

  • 错误码:reply-code=530

  • 错误信息:reply-text=denied for too many requests

Java客户端错误堆栈示例:

Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>
(reply-code=530, reply-text=denied for too many requests, ReqId:5FB4C999314635F952FCBFF6, ErrorHelp[dstQueue=XXX_test_queue,
srcExchange=Producer.ExchangeName,bindingKey=XXX_test_bk, http://mrw.so/6rNqO8], class-id=50, method-id=20)
    at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:516)
    at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:346)
    at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:182)
    at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:114)
    at com.rabbitmq.client.impl.AMQConnection.readFrame(AMQConnection.java:672)
    at com.rabbitmq.client.impl.AMQConnection.access$300(AMQConnection.java:48)
    at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:599)
    at java.lang.Thread.run(Thread.java:748)

实例秒级TPS峰值查询

通过查询实例实际使用的秒级TPS峰值,您可以了解业务的流量波动情况和流量峰值,判断实例规格是否满足业务需求。

云消息队列 RabbitMQ 版提供以下三种方式查询实例的秒级TPS峰值:

查询方式

说明

查询时间级别

查询资源级别

(推荐)通过云监控查询实例TPS峰值并设置告警

优势:

  • 查询结果最大可显示14天内的TPS峰值变化,可快速定位异常范围。

  • 支持将实例TPS峰值作为监控指标设置告警。

  • 支持免费使用。

分钟级TPS峰值

取值为1分钟周期内,每秒钟实例TPS的最大值。

实例级别TPS峰值

(推荐)通过实例详情查询实例TPS峰值

  • 优势:

    • 支持查询秒级TPS峰值,可精确异常范围。

    • 支持查看具体API接口的TPS峰值。

    • 支持免费使用。

  • 不足:为避免显示结果过多,只显示10分钟内的查询结果。

秒级TPS峰值

  • 实例级别TPS峰值

  • 实例内某个API接口的TPS峰值

通过日志查询实例TPS峰值

  • 优势:支持通过SLS分析语句查询,适合复杂问题定位场景。

  • 不足:

    • 相较于前两种查询方式,操作较复杂,查询结果不够直观。

    • 需要额外支付日志服务相关费用,具体计费信息,请参见日志服务计费项

秒级TPS峰值

实例级别TPS峰值

实例TPS计算规则

以下接口调用时,会被计算进TPS流量中,即调用一次接口,计算为一次TPS。

  • ConnectionOpenChannelOpen

  • QueueDeclareQueueDeleteQueueBindQueueUnbind

  • ExchangeDeclareExchangeDelete

  • ExchangeBindExchangeUnBind

  • SendMessageBasicConsumeBasicGetBasicAckBasicRejectBasicNackBasicRecover

  • 延时消息是云消息队列 RabbitMQ 版的高级特性消息,发送延时消息时,调用API接口的次数需要在普通消息的基础上乘以5倍,消费延时消息时与普通消息次数相同。

    示例:1秒内发送2条延时消息,消费3条延时消息。则此时API调用TPS为:2×5+3=13次/秒。

  • 统计SendMessage接口的调用次数时,实际计算值为消息经过路由后要存储到的Queue的数量。

    例如,发送1条到Fanout类型Exchange的消息,最后要保存到10个Queue中,则SendMessage调用次数计算为10次。