全部產品
Search
文件中心

ApsaraMQ for RabbitMQ:執行個體限流最佳實務

更新時間:Oct 10, 2024

雲訊息佇列 RabbitMQ 版會對單一實例的TPS流量峰值進行限流,本文介紹雲訊息佇列 RabbitMQ 版執行個體的限流規則、限流後的行為以及限流最佳實務等。

限流後行為

雲訊息佇列 RabbitMQ 版執行個體的TPS流量峰值超過您所購買執行個體的TPS規格上限時,雲訊息佇列 RabbitMQ 版執行個體會被限流。

限流後的行為如下:

  • 雲訊息佇列 RabbitMQ 版服務端會返回錯誤碼資訊。具體請參見錯誤碼說明

  • 雲訊息佇列 RabbitMQ 版服務端關閉當前請求的Channel。代碼中可以捕獲異常重新開啟Channel。

限流Java範例程式碼如下:

private static final int MAX_RETRIES = 5; // 最大重試次數
private static final long WAIT_TIME_MS = 2000; // 每次重試的等待時間(以毫秒為單位)

private void doAnythingWithReopenChannels(Connection connection, Channel channel) {
    try {
        // ......
        // 在當前通道channel下執行的任何操作
        // 例如訊息發送、消費等
        // ......

    } catch (AlreadyClosedException e) {
        String message = e.getMessage();
        if (isChannelClosed(message)) {
            // 如果通道已經關閉,關閉並重新建立通道
            channel = createChannelWithRetry(connection); 
            // 在重連後可以繼續執行其它操作
            // ......
        } else {
            throw e;
        }
    }
}

private Channel createChannelWithRetry(Connection connection) {
    for (int attempt = 1; attempt <= MAX_RETRIES; attempt++) {
        try {
            return connection.createChannel();
        } catch (Exception e) {
            System.err.println("Failed to create channel. Attempt " + attempt + " of " + MAX_RETRIES);
            // 檢查錯誤, 若仍是被限流導致的關閉錯誤,則可以等待後繼續重試
            // 也可移除本部分重試邏輯
            if (attempt < MAX_RETRIES) {
                try {
                    Thread.sleep(WAIT_TIME_MS);
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt(); // 還原中斷狀態
                }
            } else {
                throw new RuntimeException("Exceeded maximum retries to create channel", e);
            }
        }
    }
    throw new RuntimeException("This line should never be reached"); // 理論上不會到達這裡
}

private boolean isChannelClosed(String errorMsg) {
    // 判斷是否包含channel.close報錯,該報錯代表通道已關閉。
    // 可能涵蓋530,541等錯誤資訊。
    if (errorMsg != null && errorMsg.contains("channel.close")) {
        System.out.println("[ChannelClosed] Error details: " + errorMsg);
        return true;
    }
    return false;
}

錯誤碼資訊:

  • 錯誤碼:reply-code=530

  • 錯誤資訊:reply-text=denied for too many requests

Java用戶端錯誤堆棧樣本:

Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>
(reply-code=530, reply-text=denied for too many requests, ReqId:5FB4C999314635F952FCBFF6, ErrorHelp[dstQueue=XXX_test_queue,
srcExchange=Producer.ExchangeName,bindingKey=XXX_test_bk, http://mrw.so/6rNqO8], class-id=50, method-id=20)
    at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:516)
    at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:346)
    at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:182)
    at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:114)
    at com.rabbitmq.client.impl.AMQConnection.readFrame(AMQConnection.java:672)
    at com.rabbitmq.client.impl.AMQConnection.access$300(AMQConnection.java:48)
    at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:599)
    at java.lang.Thread.run(Thread.java:748)

執行個體秒級TPS峰值查詢

通過查詢執行個體實際使用的秒級TPS峰值,您可以瞭解業務的流量波動情況和流量峰值,判斷執行個體規格是否滿足業務需求。

雲訊息佇列 RabbitMQ 版提供以下三種方式查詢執行個體的秒級TPS峰值:

查詢方式

說明

查詢時間層級

查詢資源層級

(推薦)通過CloudMonitor查詢執行個體TPS峰值並設定警示

優勢:

  • 查詢結果最大可顯示14天內的TPS峰值變化,可快速定位異常範圍。

  • 支援將執行個體TPS峰值作為監控指標設定警示。

  • 支援免費使用。

分鐘級TPS峰值

取值為1分鐘周期內,每秒鐘執行個體TPS的最大值。

執行個體層級TPS峰值

(推薦)通過執行個體詳情查詢執行個體TPS峰值

  • 優勢:

    • 支援查詢秒級TPS峰值,可精確異常範圍。

    • 支援查看具體API介面的TPS峰值。

    • 支援免費使用。

  • 不足:為避免顯示結果過多,只顯示10分鐘內的查詢結果。

秒級TPS峰值

  • 執行個體層級TPS峰值

  • 執行個體內某個API介面的TPS峰值

通過日誌查詢執行個體TPS峰值

  • 優勢:支援通過SLS分析語句查詢,適合複雜問題定位情境。

  • 不足:

    • 相較於前兩種查詢方式,操作較複雜,查詢結果不夠直觀。

    • 需要額外支付Log Service相關費用,具體計費資訊,請參見Log Service計費項目

秒級TPS峰值

執行個體層級TPS峰值

執行個體TPS計算規則

以下介面調用時,會被計算進TPS流量中,即調用一次介面,計算為一次TPS。

  • ConnectionOpenChannelOpen

  • QueueDeclareQueueDeleteQueueBindQueueUnbind

  • ExchangeDeclareExchangeDelete

  • ExchangeBindExchangeUnBind

  • SendMessageBasicConsumeBasicGetBasicAckBasicRejectBasicNackBasicRecover

  • 延時訊息是雲訊息佇列 RabbitMQ 版的進階特性訊息,發送延時訊息時,調用API介面的次數需要在普通訊息的基礎上乘以5倍,消費延時訊息時與普通訊息次數相同。

    樣本:1秒內發送2條延時訊息,消費3條延時訊息。則此時API調用TPS為:2×5+3=13次/秒。

  • 統計SendMessage介面的調用次數時,實際計算值為訊息經過路由後要儲存到的Queue的數量。

    例如,發送1條到Fanout類型Exchange的訊息,最後要儲存到10個Queue中,則SendMessage調用次數計算為10次。