Throttling can be triggered for an ApsaraMQ for RabbitMQ instance based on the peak transactions per second (TPS) of the instance. This topic describes the throttling rules for ApsaraMQ for RabbitMQ instances, the operations performed by the consumer and producer after throttling is triggered, and the best practices that you can use for throttling.
Operations performed by the consumer and producer after throttling is triggered
If the peak TPS of an ApsaraMQ for RabbitMQ instance exceeds the TPS threshold value that is specified when you purchase the ApsaraMQ for RabbitMQ instance, throttling is triggered for the instance.
The following operations are performed after throttling is triggered:
The ApsaraMQ for RabbitMQ broker returns an error code and an error message. For more information, see Error codes.
The ApsaraMQ for RabbitMQ broker closes the channel of the current request. You can detect exceptions in the code and reopen the channel.
Sample Java code for throttling:
private static final int MAX_RETRIES = 5; // The maximum number of retries.
private static final long WAIT_TIME_MS = 2000; // The wait time between two consecutive retries. Unit: milliseconds.
private void doAnythingWithReopenChannels(Connection connection, Channel channel) {
try {
// ......
// Any operation performed in the current channel.
// The operation can be message sending or message consumption.
// ......
} catch (AlreadyClosedException e) {
String message = e.getMessage();
if (isChannelClosed(message)) {
// If the channel is closed, close and re-create the channel.
channel = createChannelWithRetry(connection);
// Continue to perform other operations after reconnection.
// ......
} else {
throw e;
}
}
}
private Channel createChannelWithRetry(Connection connection) {
for (int attempt = 1; attempt <= MAX_RETRIES; attempt++) {
try {
return connection.createChannel();
} catch (Exception e) {
System.err.println("Failed to create channel. Attempt " + attempt + " of " + MAX_RETRIES);
// Check for errors. If the channel is still closed due to throttling, you can wait and try again.
// You can also remove the retry logic in this part.
if (attempt < MAX_RETRIES) {
try {
Thread.sleep(WAIT_TIME_MS);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt(); // Reset the thread to the interrupted state.
}
} else {
throw new RuntimeException("Exceeded maximum retries to create channel", e);
}
}
}
throw new RuntimeException("This line should never be reached"); // In theory, this step cannot be reached.
}
private boolean isChannelClosed(String errorMsg) {
// Determine whether the channel.close error is reported. The error indicates that the channel is closed.
// Error codes, such as 530 and 541, may be included.
if (errorMsg != null && errorMsg.contains("channel.close")) {
System.out.println("[ChannelClosed] Error details: " + errorMsg);
return true;
}
return false;
}
The following section describes the error code and error message:
Error code: reply-code=530
Error message: reply-text=denied for too many requests
Query the peak TPS of an instance
You can query the actual peak TPS of an instance to understand the traffic fluctuation and peak traffic of your service and determine whether the current instance specification meets your business requirements.
ApsaraMQ for RabbitMQ allows you to use the following methods to query the peak TPS of an instance:
Method | Description | Time granularity | Resource level |
(Recommend) Query the peak TPS of an instance and configure an alert rule by using CloudMonitor | Benefits:
| Peak TPS at the minute level The peak TPS of an instance during a 1-minute statistical period | Peak TPS of an instance |
(Recommend) Query the peak TPS of an instance on the Instance Details page |
| Peak TPS at the second level |
|
Query the peak TPS of an instance by using Simple Log Service |
| Peak TPS at the second level | Peak TPS of an instance |
Rules based on which the TPS of an instance is calculated
If you call one of the following operations once, one TPS is counted.
ConnectionOpen and ChannelOpen
QueueDeclare, QueueDelete, QueueBind, and QueueUnbind
ExchangeDeclare and ExchangeDelete
ExchangeBind and ExchangeUnBind
SendMessage, BasicConsume, BasicGet, BasicAck, BasicReject, BasicNack, and BasicRecover
Delayed messages are a type of featured message provided by ApsaraMQ for RabbitMQ. One API call to send a delayed message is counted as five API calls to send a normal message, while one API call to receive a delayed message is still counted as one API call to send a normal message.
For example, if you send two delayed messages and receive three delayed messages within a second, the total number of API calls is 13. The number of API calls is calculated by using the following formula: 2 × 5 + 3 = 13.
When you count the number of API calls to the SendMessage operation, the actual value is the number of queues to which messages are stored after the messages are routed.
For example, if you send one message to an exchange of the Fanout type and save the message to 10 queues, the number of API calls to the SendMessage operation is counted as 10.