The ack-keda component enables event-driven pod autoscaling by monitoring external metric sources such as message queues. When messages accumulate in a RabbitMQ queue, KEDA triggers a scale-out within seconds. When the queue drains, pods scale back in automatically.
This topic walks you through scaling a Deployment based on the queue length metric from ApsaraMQ for RabbitMQ.
How it works
KEDA connects your ApsaraMQ for RabbitMQ instance to the Kubernetes Horizontal Pod Autoscaler (HPA) through the following resource chain:
A Secret stores the RabbitMQ connection string.
A TriggerAuthentication references the Secret so KEDA can authenticate with RabbitMQ.
A ScaledObject defines which Deployment to scale, which queue to monitor, and the threshold that triggers scaling.
KEDA periodically polls the queue length and feeds the metric to HPA, which adjusts the replica count accordingly.
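The final step of this chain can be made concrete with a small calculation. The following Go sketch approximates how an average-value metric becomes a replica count: divide the queue length by the per-replica threshold, round up, and clamp the result between the minimum and maximum replica counts. The helper name desiredReplicas and its parameters are illustrative and are not part of KEDA or Kubernetes. The real HPA controller also applies a tolerance and stabilization rules that this sketch omits.

```go
package main

import (
	"fmt"
	"math"
)

// desiredReplicas is a hypothetical helper that approximates the HPA
// calculation for an average-value metric: ceil(queueLength / target),
// clamped to the [minReplicas, maxReplicas] range.
func desiredReplicas(queueLength, targetPerReplica float64, minReplicas, maxReplicas int) int {
	if targetPerReplica <= 0 {
		// An invalid target gives no basis for scaling; stay at the minimum.
		return minReplicas
	}
	n := int(math.Ceil(queueLength / targetPerReplica))
	if n < minReplicas {
		n = minReplicas
	}
	if n > maxReplicas {
		n = maxReplicas
	}
	return n
}

func main() {
	// Threshold 20 and bounds 1..10 match the ScaledObject used in this topic.
	for _, q := range []float64{0, 50, 4430} {
		fmt.Printf("queue length %.0f -> %d replicas\n", q, desiredReplicas(q, 20, 1, 10))
	}
}
```

With a threshold of 20 and bounds of 1 and 10, a backlog of 50 messages maps to 3 replicas, and any backlog above 200 messages is capped at 10 replicas.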
Prerequisites
Before you begin, make sure that you have:
The ack-keda component deployed in your cluster. For more information, see Event-driven scaling.
An ApsaraMQ for RabbitMQ instance and related resources. For more information, see Create resources.
kubectl connected to your cluster. For more information, see Connect to a cluster using kubectl.
A Go runtime environment installed.
Step 1: Deploy a sample workload
Create a Deployment that serves as the scaling target. In a production scenario, replace the container image with your actual RabbitMQ consumer application.
Log on to the Container Service Management Console. In the navigation pane on the left, click Clusters.
On the Clusters page, click the name of your cluster. In the navigation pane on the left, open the Deployments page.
On the Deployments page, click Create from YAML. Set Sample Template to Custom, paste the following YAML, and then click Create.
apiVersion: apps/v1
kind: Deployment
metadata:
  name: sample-app
  namespace: default
  labels:
    app: sample-app
spec:
  replicas: 2
  selector:
    matchLabels:
      app: sample-app
  template:
    metadata:
      labels:
        app: sample-app
    spec:
      containers:
      - name: consumer
        image: consumer # Replace with your actual RabbitMQ consumer image.
        resources:
          limits:
            cpu: "500m"
Step 2: Configure KEDA to scale based on RabbitMQ queue length
This step connects KEDA to your ApsaraMQ for RabbitMQ instance and defines the scaling rules.
Gather RabbitMQ connection details
Log on to the ApsaraMQ for RabbitMQ console. In the left navigation pane, click Instances.
Click the name of your instance. On the Instance Details page, click the Endpoint Information tab and record an endpoint.
Warning: A public endpoint exposes your instance to external access. This tutorial uses a public endpoint for demonstration only. If your ACK cluster and RabbitMQ instance are in the same VPC, use the VPC (internal) endpoint instead.
In the left navigation pane, click Users and Permissions. Record the Username and Password.
If no user exists, see Users and Permissions to create one.
In the left navigation pane, click Vhosts. Record the Vhost value (for example, amq-test).
Create a Secret and TriggerAuthentication
Build a Base64-encoded connection string by running the following command. Replace the placeholders with the values you recorded.
Placeholder            Description                          Example
<rabbitmq-username>    Username from the RabbitMQ console   Mjpt****
<rabbitmq-password>    Password from the RabbitMQ console   QT****
<endpoint>             Public or internal endpoint          amqp-cn-****.amqp-0.net.mq.amqp.aliyuncs.com
<vhost>                Virtual host name                    amq-test

echo -n "amqp://<rabbitmq-username>:<rabbitmq-password>@<endpoint>:5672/<vhost>" | base64

Save the following YAML as secret.yaml. Replace the host value with the Base64 string from the previous step.

apiVersion: v1
kind: Secret
metadata:
  name: keda-rabbitmq-secret
data:
  host: YW1x****** # Your Base64-encoded connection string.

Save the following YAML as trigger-auth.yaml. This TriggerAuthentication tells KEDA how to authenticate with RabbitMQ by referencing the Secret.

apiVersion: keda.sh/v1alpha1
kind: TriggerAuthentication
metadata:
  name: keda-trigger-auth-rabbitmq-conn
  namespace: default
spec:
  secretTargetRef:
  - parameter: host
    name: keda-rabbitmq-secret
    key: host

Apply both files:

kubectl apply -f secret.yaml
kubectl apply -f trigger-auth.yaml
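Because this topic already requires a Go runtime, the Base64 value for the Secret can also be produced in Go instead of with echo and base64. The following is a minimal sketch: the helper names connectionString and encodeHost and the sample credentials are hypothetical. It assumes port 5672 as in the command above. Unlike the shell command, it percent-encodes reserved characters such as @ in the username and password, so confirm that your client accepts percent-encoded credentials before relying on it.

```go
package main

import (
	"encoding/base64"
	"fmt"
	"net/url"
)

// connectionString is a hypothetical helper that assembles the AMQP URL
// in the form amqp://username:password@endpoint:5672/vhost.
// url.UserPassword percent-encodes reserved characters in the credentials.
func connectionString(username, password, endpoint, vhost string) string {
	u := url.URL{
		Scheme: "amqp",
		User:   url.UserPassword(username, password),
		Host:   endpoint + ":5672",
		Path:   "/" + vhost,
	}
	return u.String()
}

// encodeHost returns the Base64 form expected in the Secret's data.host field.
func encodeHost(conn string) string {
	return base64.StdEncoding.EncodeToString([]byte(conn))
}

func main() {
	// Sample values for illustration only; use the values you recorded.
	conn := connectionString("user", "p@ss", "mq.example.com", "amq-test")
	fmt.Println(conn)
	fmt.Println(encodeHost(conn))
}
```

The second printed line is the value to paste into the host field of secret.yaml.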
Create a ScaledObject
A ScaledObject links a Deployment to a KEDA trigger. The following configuration scales sample-app based on the queue-test queue length.
Save the following YAML as scaledobject.yaml:

apiVersion: keda.sh/v1alpha1
kind: ScaledObject
metadata:
  name: rabbitmq-scaledobject
  namespace: default
spec:
  scaleTargetRef:
    name: sample-app
  maxReplicaCount: 10
  minReplicaCount: 1
  triggers:
  - type: rabbitmq
    metadata:
      protocol: amqp
      queueName: queue-test
      mode: QueueLength
      value: "20"
      metricName: queue-test
    authenticationRef:
      name: keda-trigger-auth-rabbitmq-conn

Parameter             Description
scaleTargetRef.name   The Deployment to scale. Set this to the application created in Step 1.
maxReplicaCount       Maximum number of replicas.
minReplicaCount       Minimum number of replicas.
protocol              Protocol used to connect to RabbitMQ. Valid values: auto, http, amqp.
queueName             The RabbitMQ queue to monitor.
mode                  The metric to use for scaling. Set to QueueLength to scale based on the number of messages in the queue.
value                 The threshold that triggers a scale-out.
metricName            The custom metric name reported to HPA. Typically set to the queue name.

Apply the ScaledObject:
kubectl apply -f scaledobject.yaml
Verify the configuration
Check the ScaledObject status:

kubectl get ScaledObject

Expected output:

NAME                    SCALETARGETKIND      SCALETARGETNAME   MIN   MAX   TRIGGERS   AUTHENTICATION                    READY   ACTIVE   FALLBACK   AGE
rabbitmq-scaledobject   apps/v1.Deployment   sample-app        1     10    rabbitmq   keda-trigger-auth-rabbitmq-conn   True    False    False      17s

Check the HPA status:

kubectl get hpa

Expected output:

NAME                             REFERENCE               TARGETS      MINPODS   MAXPODS   REPLICAS   AGE
keda-hpa-rabbitmq-scaledobject   Deployment/sample-app   0/20 (avg)   1         10        2          2m35s

TARGETS: 0/20 (avg) shows the current queue length (0) against the threshold (20).
Step 3: Test scaling behavior
Produce and consume messages to observe scale-out and scale-in in action. The following Go programs connect directly to ApsaraMQ for RabbitMQ.
Produce messages to trigger scale-out
Create a file named producer.go with the following code. Replace <username>, <password>, <endpoint>, and <vhost> with the values you recorded earlier. The URL format is amqp://username:password@endpoint:5672/vhost.

package main

import (
	"fmt"
	"log"
	"time"

	"github.com/streadway/amqp"
)

const (
	queueName = "queue-test" // Must match the queueName in your ScaledObject.
	numMsgs   = 10000
	pauseTime = 10 * time.Millisecond
	url       = "amqp://<username>:<password>@<endpoint>:5672/<vhost>"
)

func main() {
	conn, err := amqp.Dial(url)
	failOnError(err, "Failed to connect to RabbitMQ")
	defer conn.Close()

	ch, err := conn.Channel()
	failOnError(err, "Failed to open a channel")
	defer ch.Close()

	q, err := ch.QueueDeclare(
		queueName,
		true,  // durable
		false, // auto-delete
		false, // exclusive
		false, // no-wait
		nil,   // arguments
	)
	failOnError(err, "Failed to declare a queue")

	go produce(ch, q)
	select {} // Block until the program is stopped manually.
}

// produce publishes numMsgs messages to the queue, pausing between sends.
func produce(ch *amqp.Channel, q amqp.Queue) {
	for i := 0; i < numMsgs; i++ {
		msg := fmt.Sprintf("Message %d", i)
		err := ch.Publish(
			"",     // exchange (default)
			q.Name, // routing key
			false,  // mandatory
			false,  // immediate
			amqp.Publishing{
				ContentType: "text/plain",
				Body:        []byte(msg),
			},
		)
		failOnError(err, "Failed to publish a message")
		log.Printf("Published message: %s", msg)
		time.Sleep(pauseTime)
	}
}

// failOnError logs the message and exits if err is not nil.
func failOnError(err error, msg string) {
	if err != nil {
		log.Fatalf("%s: %s", msg, err)
	}
}

Initialize the Go module and download dependencies:

go mod init producer
go mod tidy

Run the producer:

go run producer.go

In a separate terminal, watch the HPA scale out:

kubectl get hpa

Expected output:

NAME                             REFERENCE               TARGETS                MINPODS   MAXPODS   REPLICAS   AGE
keda-hpa-rabbitmq-scaledobject   Deployment/sample-app   443000m/20 (avg)       1         10        10         9m15s

The replica count increases to the maximum (10) as messages accumulate in the queue.
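The TARGETS value 443000m/20 (avg) uses Kubernetes milli-unit notation, where the m suffix means thousandths, so 443000m is an average of 443 messages per pod against a threshold of 20. The following Go sketch shows the conversion. The helper parseMilli is hypothetical and handles only plain numbers and the m suffix, not the full Kubernetes quantity syntax.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// parseMilli is a hypothetical helper that converts a value such as
// "443000m" (milli-units) or "20" (plain units) to a float64.
// It does not support other Kubernetes quantity suffixes.
func parseMilli(q string) (float64, error) {
	if strings.HasSuffix(q, "m") {
		v, err := strconv.ParseFloat(strings.TrimSuffix(q, "m"), 64)
		if err != nil {
			return 0, err
		}
		return v / 1000, nil
	}
	return strconv.ParseFloat(q, 64)
}

func main() {
	avg, err := parseMilli("443000m")
	if err != nil {
		panic(err)
	}
	// The metric is an average per pod, so multiplying by the current
	// replica count (10 in the output above) approximates the queue length.
	fmt.Printf("average per pod: %.0f, approximate queue length: %.0f\n", avg, avg*10)
}
```

With 10 replicas running, an average of 443 corresponds to roughly 4,430 messages waiting in the queue.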
Consume messages to observe scale-in
Stop the producer program. Create a file named consumer.go with the following code. Replace the url constant with your RabbitMQ connection string.

package main

import (
	"fmt"
	"log"

	"github.com/streadway/amqp"
)

const (
	queueName = "queue-test"
	url       = "amqp://<username>:<password>@<endpoint>:5672/<vhost>"
)

func main() {
	conn, err := amqp.Dial(url)
	failOnError(err, "Failed to connect to RabbitMQ")
	defer conn.Close()

	ch, err := conn.Channel()
	failOnError(err, "Failed to open a channel")
	defer ch.Close()

	_, err = ch.QueueDeclare(
		queueName,
		true,  // durable
		false, // auto-delete
		false, // exclusive
		false, // no-wait
		nil,   // arguments
	)
	failOnError(err, "Failed to declare a queue")

	msgs, err := ch.Consume(
		queueName,
		"",    // consumer tag
		true,  // auto-ack
		false, // exclusive
		false, // no-local
		false, // no-wait
		nil,   // arguments
	)
	failOnError(err, "Failed to register a consumer")

	go func() {
		for msg := range msgs {
			log.Printf("Received: %s", msg.Body)
			fmt.Printf("Processed: %s\n", msg.Body)
		}
	}()

	select {} // Block until the program is stopped manually.
}

// failOnError logs the message and exits if err is not nil.
func failOnError(err error, msg string) {
	if err != nil {
		log.Fatalf("%s: %s", msg, err)
	}
}

Run the consumer:

go run consumer.go

Monitor the HPA in real time:

kubectl get hpa -w

Expected output:

NAME                             REFERENCE               TARGETS                MINPODS   MAXPODS   REPLICAS   AGE
keda-hpa-rabbitmq-scaledobject   Deployment/sample-app   443000m/20 (avg)       1         10        10         9m15s
keda-hpa-rabbitmq-scaledobject   Deployment/sample-app   235000m/20 (avg)       1         10        10         9m51s
keda-hpa-rabbitmq-scaledobject   Deployment/sample-app   0/20 (avg)             1         10        10         10m
keda-hpa-rabbitmq-scaledobject   Deployment/sample-app   0/20 (avg)             1         10        1          15m

After the consumer drains the queue, the target metric drops to 0. The replica count does not drop immediately: in this example, the Deployment scales back to the minimum replica count (1) about five minutes after the queue becomes empty.
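In the output above, the replica count stays at 10 for about five minutes after the metric reaches 0. This matches the HPA scale-in stabilization window, which defaults to 300 seconds: when scaling in, the HPA acts on the highest replica recommendation seen within the window. The following Go sketch illustrates that rule. It assumes the default window and no overrides in the ScaledObject, and the type recommendation, the function stabilized, and the timestamps are illustrative rather than actual controller code.

```go
package main

import "fmt"

// recommendation is a hypothetical record of a replica count the
// autoscaler computed at a given time (seconds since the test started).
type recommendation struct {
	atSeconds int
	replicas  int
}

// stabilized returns the highest recommendation made within the window
// that ends at nowSeconds, which is the rule applied when scaling in.
func stabilized(history []recommendation, nowSeconds, windowSeconds int) int {
	best := 0
	for _, r := range history {
		inWindow := r.atSeconds <= nowSeconds && nowSeconds-r.atSeconds <= windowSeconds
		if inWindow && r.replicas > best {
			best = r.replicas
		}
	}
	return best
}

func main() {
	// Illustrative timeline: the last recommendation of 10 replicas is
	// made at 9m50s; from 10m onward the queue is empty.
	history := []recommendation{
		{atSeconds: 590, replicas: 10},
		{atSeconds: 600, replicas: 1},
		{atSeconds: 840, replicas: 1},
		{atSeconds: 900, replicas: 1},
	}
	for _, now := range []int{600, 840, 900} {
		fmt.Printf("t=%ds -> %d replicas\n", now, stabilized(history, now, 300))
	}
}
```

At 10 and 14 minutes, the earlier recommendation of 10 replicas is still inside the window, so the Deployment stays scaled out. At 15 minutes it has aged out, and the replica count drops to 1.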
Clean up
To remove the resources created in this tutorial, run the following commands:
kubectl delete -f scaledobject.yaml
kubectl delete -f trigger-auth.yaml
kubectl delete -f secret.yaml
kubectl delete deployment sample-app
References
KEDA based on RocketMQ metrics -- Configure HPA based on ApsaraMQ for RocketMQ message accumulation metrics.