Horizontal pod autoscaling based on the metrics of ApsaraMQ for RabbitMQ

Updated at: 2025-04-23 09:14

The ack-keda component provides event-driven scaling and periodically consumes data from event sources. When pending messages increase, ACK KEDA is triggered to scale a batch of jobs within seconds. You can use ApsaraMQ for RabbitMQ and ack-keda to monitor the queue length and messaging rate metrics. This topic describes how to configure horizontal pod autoscaling based on the message queue length metrics of ApsaraMQ for RabbitMQ and the event-driven autoscaler KEDA.

Prerequisites

Step 1: Create and deploy a Deployment

  1. Log on to the ACK console. In the left-side navigation pane, click Clusters.

  2. On the Clusters page, find the cluster that you want to manage and click its name. In the left-side pane, choose Workloads > Deployments.

  3. In the upper-left corner of the Deployments page, click Create from YAML.

  4. On the Create page, set Sample Template to Custom. Create a Deployment named sample-app based on the following content and click Create:

    apiVersion: apps/v1
    kind: Deployment
    metadata:
      name: sample-app
      namespace: default
      labels:
        app: sample-app
    spec:
      replicas: 2
      selector:
        matchLabels:
          app: sample-app
      template:
        metadata:
          labels:
            app: sample-app
        spec:
          containers:
          - name: consumer
            image: consumer  # Replace with the actual image used by the consumer application of ApsaraMQ for RabbitMQ. 
            resources:
              limits:
                cpu: "500m"

Step 2: Configure horizontal pod autoscaling based on the metrics of ApsaraMQ for RabbitMQ

This step shows how to obtain the information of the ApsaraMQ for RabbitMQ instance you created and configure Horizontal Pod Autoscaler (HPA) to automatically scale the application based on the metrics of ApsaraMQ for RabbitMQ.

  1. Obtain the information of the ApsaraMQ for RabbitMQ instance.

    1. Log on to the ApsaraMQ for RabbitMQ console. In the left-side navigation pane, click Instances.

    2. On the Instances page, click the name of the instance you created to go to the Instance Details page. On the Endpoint Information tab, record the endpoint in the Endpoint column of Public Endpoint.

      Note

      If you use a public endpoint when you create the ApsaraMQ for RabbitMQ instance, the instance may be vulnerable to malicious attacks and unauthorized access. In this example, a public endpoint is used only for demonstration. If your application is deployed in a virtual private cloud (VPC) and does not need to access the Internet, we recommend that you disable Internet access to enhance security.

    3. In the left-side navigation pane, click Static Accounts and record the username and password that are used to log on to the instance.静态用户名密码..png

    4. In the left-side navigation pane, click vhosts and record the name of the virtual host (vhost). Example: amq-test.获取vhost..png

  2. Run the following command to create a connection string:

    echo -n "amqp://rabbitmq-username:rabbitmq-password@localhost:5672/vhost" | base64

    Replace rabbitmq-username and rabbitmq-password with the username and password that you recorded, replace localhost with the public endpoint that you recorded, and replace vhost with the name of the vhost.

  3. Create a Secret based on the following YAML content:

    apiVersion: v1
    kind: Secret
    metadata:
      name: keda-rabbitmq-secret
    data:
      host: YW1x****** # The connection string that you created.
  4. Use the following YAML content to create a TriggerAuthentication object:

    apiVersion: keda.sh/v1alpha1
    kind: TriggerAuthentication
    metadata:
      name: keda-trigger-auth-rabbitmq-conn
      namespace: default
    spec:
      secretTargetRef:
        - parameter: host
          name: keda-rabbitmq-secret # keda-rabbitmq-secret is the Secret that you created in the preceding step. 
          key: host

    Run the following command to deploy the TriggerAuthentication object in the cluster:

    kubectl apply -f secret.yaml
    kubectl apply -f rabbitmq-trigger-auth.yaml

    After the TriggerAuthentication object is deployed, you can use RabbitMQ triggers in KEDA. You can also reference the TriggerAuthentication object to connect to the ApsaraMQ for RabbitMQ instance to query metrics.

  5. Create a file named ScaledObject.yaml and copy the following content to the file:

    apiVersion: keda.sh/v1alpha1
    kind: ScaledObject
    metadata:
      name: rabbitmq-scaledobject
      namespace: default
    spec:
      scaleTargetRef:
        name: rabbitmq-deployment
      maxReplicaCount: 10
      minReplicaCount: 1
      triggers:
      - type: rabbitmq
        metadata:
          protocol: amqp
          queueName: queue-test
          mode: QueueLength
          value: "20"
          metricName: queue-test 
        authenticationRef:
          name: keda-trigger-auth-rabbitmq-conn

    Parameter

    Description

    Parameter

    Description

    scaleTargetRef

    The object that you want to scale. In this example, the value is set to gsample-appg, which is the name of the Deployment that you created in Step 1: Create and deploy a Deployment.

    maxReplicaCount

    The maximum number of replicated pods.

    minReplicaCount

    The minimum number of replicated pods.

    protocol

    The protocol that is used by ack-keda and the ApsaraMQ for RabbitMQ instance to communicate. Valid values: auto, http, and amqp.

    queueName

    The name of the queue from which you want to read data.

    value

    The scale-out threshold.

    metricName

    The name of a custom metric based on which HPA scales the application. In this example, the queue-test metric is used, which indicates the queue length.

  6. Configure the application and check the status of the ScaledObject and HPA.

    Run the following command to create a ScaledObject:

    kubectl apply -f ScaledObject.yaml  

    Run the following command to check the status of the ScaledObject:

    kubectl get ScaledObject

    Expected output:

    NAME                    SCALETARGETKIND      SCALETARGETNAME   MIN   MAX   TRIGGERS   AUTHENTICATION                    READY   ACTIVE   FALLBACK   AGE
    rabbitmq-scaledobject   apps/v1.Deployment   sample-app        1     10   rabbitmq   keda-trigger-auth-rabbitmq-conn    True    False    False      17s

    Run the following command to check whether HPA is deployed:

    kubectl get hpa

    Expected output:

    NAME                             REFERENCE               TARGETS      MINPODS   MAXPODS   REPLICAS   AGE
    keda-hpa-rabbitmq-scaledobject   Deployment/sample-app   0/20 (avg)   1         10        2          2m35s

Step 3: Produce and consume data to trigger application scaling

This step shows how to use producer code and consumer code to enable HPA to scale the application based on the message queue length in ApsaraMQ for RabbitMQ.

  1. Generate messages by using the following producer code:

    package main
    
    import (
    	"fmt"
    	"log"
    	"time"
    	"github.com/streadway/amqp"
    )
    
    const (
    	queueName = "queue-test"    // Replace with the name of the queue from which you want to read data. 
    	numMsgs   = 10000
    	pauseTime = 10 * time.Millisecond
            url = "amqp://Mjpt****:QT****@amqp-cn-zxux009.cn-beijing.amqp-0.net.mq.amqp.aliyuncs.com:5672/amq-test"   // The URL of the ApsaraMQ for RabbitMQ instance. The URL must be in the amqp://guest:password@localhost:5672/vhost format.
    )
    
    func main() {
    	conn, err := amqp.Dial(url)
    	failOnError(err, "Failed to connect to RabbitMQ")
    	defer conn.Close()
    
    	ch, err := conn.Channel()
    	failOnError(err, "Failed to open a channel")
    	defer ch.Close()
    
    	q, err := ch.QueueDeclare(
    		queueName,
    		true,
    		false,
    		false,
    		false,
    		nil,
    	)
    	failOnError(err, "Failed to declare a queue")
    	go produce(ch, q)
    	select {}
    }
    // Produce messages.
    func produce(ch *amqp.Channel, q amqp.Queue) {
    	for i := 0; i < numMsgs; i++ {
    		msg := fmt.Sprintf("Message %d", i)
    		err := ch.Publish(
    			"",
    			q.Name,
    			false,
    			false,
    			amqp.Publishing{
    				ContentType: "text/plain",
    				Body:        []byte(msg),
    			},
    		)
    		failOnError(err, "Failed to publish a message")
    		log.Printf("Successed to publish a message: %s", msg)
    		time.Sleep(pauseTime)
    	}
    }
    
    func failOnError(err error, msg string) {
    	if err != nil {
    		log.Fatalf("%s: %s", msg, err)
    	}
    }
  2. Run the following command to query information about HPA:

    Run the producer code to send messages.

    go run producer.go

    Run the following command to query information about HPA:

    kubectl get hpa

    Expected output:

    NAME                               REFERENCE               TARGETS           MINPODS   MAXPODS   REPLICAS   AGE
    keda-hpa-rabbitmq-scaledobject   Deployment/sample-app    443000m/20 (avg)   1         10        10         9m15s

    The output shows that the number of replicated pods for the sample-app application is scaled to the maximum value specified in KEDA.

  3. Terminate the producer program and run the following consumer program to receive messages.

    package main
    
    import (
    	"fmt"
    	"log"
    	"github.com/streadway/amqp"
    )
    
    const (
    	queueName = "queue-test"
    	url = "amqp://Mjpt****:QT****@amqp-cn-zxux009.cn-beijing.amqp-0.net.mq.amqp.aliyuncs.com:5672/amq-test"   // The URL of the ApsaraMQ for RabbitMQ instance. The URL must be in the amqp://guest:password@localhost:5672/vhost format.
    )
    
    func main() {
    	// Connect to ApsaraMQ for RabbitMQ.
    	conn, err := amqp.Dial(url)
    	failOnError(err, "Failed to connect to RabbitMQ")
    	defer conn.Close()
    
    	// Create a channel.
    	ch, err := conn.Channel()
    	failOnError(err, "Failed to open a channel")
    	defer ch.Close()
    
    	// Claim a queue to ensure that the queue exists.
    	_, err = ch.QueueDeclare(
    		queueName, // The queue name.
    		true,      // Specify whether to enable persistent storage.
    		false,     // Specify whether to enable an exclusive queue.
    		false,     // Specify whether to store messages in the queue.
    		false,     // Specify whether to enable auto deletion.
    		nil,       // Additional attributes.
    	)
    	failOnError(err, "Failed to declare a queue")
    
    	// Obtain the message channel.
    	msgs, err := ch.Consume(
    		queueName, // The queue name.
    		"",        // The consumer name. You can leave this parameter empty.
    		true,      // Enable auto confirmation.
    		false,     // Specify whether to enable an exclusive queue.
    		false,     // Specify whether to store messages in the queue.
    		false,     // Specify whether to enable a prioritized queue.
    		nil,       // Additional parameters.
    	)
    	failOnError(err, "Failed to register a consumer")
    
    	// Create a goroutine to process the received messages.
    	go func() {
    		for msg := range msgs {
    			log.Printf("Received a message: %s", msg.Body)
    			// Process the received messages based on the message consumption logic.
    			fmt.Printf("Processed message: %s\n", msg.Body)
    		}
    	}()
    
    	// Block the main program and keep the queue running.
    	select {}
    }
    
    // A simple error handling function.
    func failOnError(err error, msg string) {
    	if err != nil {
    		log.Fatalf("%s: %s", msg, err)
    	}
    }

    Run the consumer code to receive messages.

    go run consumer.go

    Run the following command to check whether HPA scales in the application:

    kubectl get hpa -w

    Expected output:

    NAME                               REFERENCE               TARGETS            MINPODS   MAXPODS   REPLICAS   AGE
    keda-hpa-rabbitmq-scaledobject   Deployment/sample-app   443000m/20 (avg)   1         10        10         9m15s
    keda-hpa-rabbitmq-scaledobject   Deployment/sample-app   235000m/20 (avg)   1         10        10         9m51s
    keda-hpa-rabbitmq-scaledobject   Deployment/sample-app   0/20 (avg)         1         10        10         10m
    keda-hpa-rabbitmq-scaledobject   Deployment/sample-app   0/20 (avg)         1         10        1          15m

    The output shows that the number of replicated pods for the sample-app application is scaled to the minimum value specified in KEDA at a point in time after data consumption ends.

References

For more information about how to configure HPA to scale an application based on the message accumulation metrics of ApsaraMQ for RocketMQ, see Keda based on RocketMQ metrics.

  • On this page (1, M)
  • Prerequisites
  • Step 1: Create and deploy a Deployment
  • Step 2: Configure horizontal pod autoscaling based on the metrics of ApsaraMQ for RabbitMQ
  • Step 3: Produce and consume data to trigger application scaling
  • References
Feedback