The ack-keda component provides event-driven scaling. ACK KEDA periodically retrieves data from event sources and, when the number of pending messages increases, triggers a scale-out within seconds. You can use ack-keda together with ApsaraMQ for RabbitMQ to monitor metrics such as the queue length and the messaging rate. This topic describes how to configure horizontal pod autoscaling based on the message queue length metric of ApsaraMQ for RabbitMQ by using the event-driven autoscaler KEDA.
Prerequisites
ack-keda is deployed. For more information, see ACK KEDA.
An ApsaraMQ for RabbitMQ instance is created. For more information, see Create resources.
A kubectl client is connected to your Kubernetes cluster. For more information, see Obtain the kubeconfig file of a cluster and use kubectl to connect to the cluster.
Go is installed.
Step 1: Create and deploy a Deployment
Log on to the ACK console. In the left-side navigation pane, click Clusters.
On the Clusters page, find the cluster that you want to manage and click its name. In the left-side pane, choose Workloads > Deployments.
In the upper-left corner of the Deployments page, click Create from YAML.
On the Create page, set Sample Template to Custom. Create a Deployment named sample-app based on the following content and click Create:
apiVersion: apps/v1
kind: Deployment
metadata:
  name: sample-app
  namespace: default
  labels:
    app: sample-app
spec:
  replicas: 2
  selector:
    matchLabels:
      app: sample-app
  template:
    metadata:
      labels:
        app: sample-app
    spec:
      containers:
      - name: consumer
        image: consumer # Replace with the actual image used by the consumer application of ApsaraMQ for RabbitMQ.
        resources:
          limits:
            cpu: "500m"
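If you prefer to use the command line instead of the console, you can save the same manifest to a local file and apply it with kubectl. The file name deployment.yaml below is only an example:

kubectl apply -f deployment.yaml
kubectl get deployment sample-app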
Step 2: Configure horizontal pod autoscaling based on the metrics of ApsaraMQ for RabbitMQ
This step shows how to obtain the information of the ApsaraMQ for RabbitMQ instance you created and configure Horizontal Pod Autoscaler (HPA) to automatically scale the application based on the metrics of ApsaraMQ for RabbitMQ.
Obtain the information of the ApsaraMQ for RabbitMQ instance.
Log on to the ApsaraMQ for RabbitMQ console. In the left-side navigation pane, click Instances.
On the Instances page, click the name of the instance you created to go to the Instance Details page. On the Endpoint Information tab, record the endpoint in the Endpoint column of Public Endpoint.
If you use a public endpoint when you create the ApsaraMQ for RabbitMQ instance, the instance may be vulnerable to malicious attacks and unauthorized access. In this example, a public endpoint is used only for demonstration. If your application is deployed in a virtual private cloud (VPC) and does not need to access the Internet, we recommend that you disable Internet access to enhance security.
In the left-side navigation pane, click Static Accounts and record the username and password that are used to log on to the instance.
In the left-side navigation pane, click vhosts and record the name of the virtual host (vhost). Example: amq-test.
Run the following command to create a connection string:
echo -n "amqp://rabbitmq-username:rabbitmq-password@localhost:5672/vhost" | base64
Replace rabbitmq-username and rabbitmq-password with the username and password that you recorded, replace localhost with the public endpoint that you recorded, and replace vhost with the name of the vhost.
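For example, assuming a hypothetical static account named demo-user with the password demo-pass, a public endpoint of amqp-cn-xxxx.mq.amqp.aliyuncs.com, and the vhost amq-test (all of these values are placeholders rather than values from this topic), the command would look like the following:

echo -n "amqp://demo-user:demo-pass@amqp-cn-xxxx.mq.amqp.aliyuncs.com:5672/amq-test" | base64

The Base64-encoded output is the value that you specify for the host field of the Secret in the next step.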
Create a file named secret.yaml based on the following YAML content:
apiVersion: v1
kind: Secret
metadata:
  name: keda-rabbitmq-secret
data:
  host: YW1x****** # The Base64-encoded connection string that you created.
Create a file named rabbitmq-trigger-auth.yaml with the following YAML content to define a TriggerAuthentication object:
apiVersion: keda.sh/v1alpha1
kind: TriggerAuthentication
metadata:
  name: keda-trigger-auth-rabbitmq-conn
  namespace: default
spec:
  secretTargetRef:
  - parameter: host
    name: keda-rabbitmq-secret # The Secret that you created in the preceding step.
    key: host
Run the following commands to deploy the Secret and the TriggerAuthentication object in the cluster:
kubectl apply -f secret.yaml
kubectl apply -f rabbitmq-trigger-auth.yaml
After the TriggerAuthentication object is deployed, you can use RabbitMQ triggers in KEDA. The triggers reference the TriggerAuthentication object to connect to the ApsaraMQ for RabbitMQ instance and query metrics.
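To confirm that both objects were created, you can optionally query them with kubectl. The commands below assume that the objects were created in the default namespace and that ack-keda registers the standard KEDA CRDs:

kubectl get secret keda-rabbitmq-secret -n default
kubectl get triggerauthentication keda-trigger-auth-rabbitmq-conn -n default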
Create a file named ScaledObject.yaml and copy the following content to the file:
apiVersion: keda.sh/v1alpha1
kind: ScaledObject
metadata:
  name: rabbitmq-scaledobject
  namespace: default
spec:
  scaleTargetRef:
    name: sample-app # The name of the Deployment that you created in Step 1.
  maxReplicaCount: 10
  minReplicaCount: 1
  triggers:
  - type: rabbitmq
    metadata:
      protocol: amqp
      queueName: queue-test
      mode: QueueLength
      value: "20"
      metricName: queue-test
    authenticationRef:
      name: keda-trigger-auth-rabbitmq-conn
Parameter          Description
scaleTargetRef     The object that you want to scale. In this example, the value is set to sample-app, which is the name of the Deployment that you created in Step 1: Create and deploy a Deployment.
maxReplicaCount    The maximum number of replicated pods.
minReplicaCount    The minimum number of replicated pods.
protocol           The protocol that ack-keda uses to communicate with the ApsaraMQ for RabbitMQ instance. Valid values: auto, http, and amqp.
queueName          The name of the queue from which you want to read data.
value              The scale-out threshold. See the worked example after this table.
metricName         The name of the custom metric based on which HPA scales the application. In this example, the queue-test metric is used, which indicates the queue length.
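The following is a rough illustration of how the QueueLength mode interacts with HPA. It reflects the general KEDA and HPA scaling calculation rather than an ACK-specific guarantee: with value set to "20", HPA targets an average of 20 pending messages per pod. If queue-test accumulates about 200 pending messages, HPA scales the Deployment toward ceil(200 / 20) = 10 replicas, capped by maxReplicaCount. When the queue drains, the Deployment is eventually scaled back toward minReplicaCount.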
Deploy the ScaledObject and check the status of the ScaledObject and the HPA.
Run the following command to create a ScaledObject:
kubectl apply -f ScaledObject.yaml
Run the following command to check the status of the ScaledObject:
kubectl get ScaledObject
Expected output:
NAME                    SCALETARGETKIND      SCALETARGETNAME   MIN   MAX   TRIGGERS   AUTHENTICATION                    READY   ACTIVE   FALLBACK   AGE
rabbitmq-scaledobject   apps/v1.Deployment   sample-app        1     10    rabbitmq   keda-trigger-auth-rabbitmq-conn   True    False    False      17s
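In general, READY set to True indicates that the scaler is configured correctly and can reach the ApsaraMQ for RabbitMQ instance with the credentials in the Secret, and ACTIVE set to False only means that no messages are currently pending in the queue, so no scale-out is triggered yet.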
Run the following command to check whether HPA is deployed:
kubectl get hpa
Expected output:
NAME                             REFERENCE               TARGETS      MINPODS   MAXPODS   REPLICAS   AGE
keda-hpa-rabbitmq-scaledobject   Deployment/sample-app   0/20 (avg)   1         10        2          2m35s
Step 3: Produce and consume data to trigger application scaling
This step shows how to use producer code and consumer code to enable HPA to scale the application based on the message queue length in ApsaraMQ for RabbitMQ.
Generate messages by using the following producer code:
package main

import (
    "fmt"
    "log"
    "time"

    "github.com/streadway/amqp"
)

const (
    queueName = "queue-test" // Replace with the name of the queue that you want to use.
    numMsgs   = 10000
    pauseTime = 10 * time.Millisecond
    url       = "amqp://Mjpt****:QT****@amqp-cn-zxux009.cn-beijing.amqp-0.net.mq.amqp.aliyuncs.com:5672/amq-test" // The URL of the ApsaraMQ for RabbitMQ instance. The URL must be in the amqp://guest:password@localhost:5672/vhost format.
)

func main() {
    // Connect to ApsaraMQ for RabbitMQ.
    conn, err := amqp.Dial(url)
    failOnError(err, "Failed to connect to RabbitMQ")
    defer conn.Close()

    // Create a channel.
    ch, err := conn.Channel()
    failOnError(err, "Failed to open a channel")
    defer ch.Close()

    // Declare the queue to ensure that it exists.
    q, err := ch.QueueDeclare(
        queueName, // The queue name.
        true,      // durable: persist the queue.
        false,     // autoDelete: do not delete the queue when it is no longer used.
        false,     // exclusive: do not restrict the queue to this connection.
        false,     // noWait: wait for the server to confirm the declaration.
        nil,       // Additional attributes.
    )
    failOnError(err, "Failed to declare a queue")

    go produce(ch, q)

    // Block the main program so that the producer goroutine keeps running.
    select {}
}

// Produce messages.
func produce(ch *amqp.Channel, q amqp.Queue) {
    for i := 0; i < numMsgs; i++ {
        msg := fmt.Sprintf("Message %d", i)
        err := ch.Publish(
            "",     // Use the default exchange.
            q.Name, // The routing key, which is the queue name.
            false,  // mandatory
            false,  // immediate
            amqp.Publishing{
                ContentType: "text/plain",
                Body:        []byte(msg),
            },
        )
        failOnError(err, "Failed to publish a message")
        log.Printf("Succeeded to publish a message: %s", msg)
        time.Sleep(pauseTime)
    }
}

// A simple error handling function.
func failOnError(err error, msg string) {
    if err != nil {
        log.Fatalf("%s: %s", msg, err)
    }
}
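The producer and consumer programs depend on the github.com/streadway/amqp client library. If you have not yet set up a Go module for these examples, you can initialize one and fetch the dependency first. The module name rabbitmq-keda-demo below is only an example:

go mod init rabbitmq-keda-demo
go get github.com/streadway/amqp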
Run the producer code to send messages.
go run producer.go
Run the following command to query information about HPA:
kubectl get hpa
Expected output:
NAME                             REFERENCE               TARGETS            MINPODS   MAXPODS   REPLICAS   AGE
keda-hpa-rabbitmq-scaledobject   Deployment/sample-app   443000m/20 (avg)   1         10        10         9m15s
The TARGETS column uses Kubernetes metric notation: 443000m indicates an average of approximately 443 pending messages per pod, which is far above the target value of 20. The output shows that the number of replicated pods for the sample-app application is scaled out to the maximum value specified in the ScaledObject.
Terminate the producer program and run the following consumer program to receive messages.
package main

import (
    "fmt"
    "log"

    "github.com/streadway/amqp"
)

const (
    queueName = "queue-test"
    url       = "amqp://Mjpt****:QT****@amqp-cn-zxux009.cn-beijing.amqp-0.net.mq.amqp.aliyuncs.com:5672/amq-test" // The URL of the ApsaraMQ for RabbitMQ instance. The URL must be in the amqp://guest:password@localhost:5672/vhost format.
)

func main() {
    // Connect to ApsaraMQ for RabbitMQ.
    conn, err := amqp.Dial(url)
    failOnError(err, "Failed to connect to RabbitMQ")
    defer conn.Close()

    // Create a channel.
    ch, err := conn.Channel()
    failOnError(err, "Failed to open a channel")
    defer ch.Close()

    // Declare the queue to ensure that it exists.
    _, err = ch.QueueDeclare(
        queueName, // The queue name.
        true,      // durable: persist the queue.
        false,     // autoDelete: do not delete the queue when it is no longer used.
        false,     // exclusive: do not restrict the queue to this connection.
        false,     // noWait: wait for the server to confirm the declaration.
        nil,       // Additional attributes.
    )
    failOnError(err, "Failed to declare a queue")

    // Obtain the message channel.
    msgs, err := ch.Consume(
        queueName, // The queue name.
        "",        // The consumer name. You can leave this parameter empty.
        true,      // autoAck: automatically acknowledge messages.
        false,     // exclusive: do not restrict the queue to this consumer.
        false,     // noLocal: not supported by RabbitMQ, keep false.
        false,     // noWait: wait for the server to confirm the consumer.
        nil,       // Additional parameters.
    )
    failOnError(err, "Failed to register a consumer")

    // Create a goroutine to process the received messages.
    go func() {
        for msg := range msgs {
            log.Printf("Received a message: %s", msg.Body)
            // Process the received messages based on the message consumption logic.
            fmt.Printf("Processed message: %s\n", msg.Body)
        }
    }()

    // Block the main program and keep the consumer running.
    select {}
}

// A simple error handling function.
func failOnError(err error, msg string) {
    if err != nil {
        log.Fatalf("%s: %s", msg, err)
    }
}
Run the consumer code to receive messages.
go run consumer.go
Run the following command to check whether HPA scales in the application:
kubectl get hpa -w
Expected output:
NAME                             REFERENCE               TARGETS            MINPODS   MAXPODS   REPLICAS   AGE
keda-hpa-rabbitmq-scaledobject   Deployment/sample-app   443000m/20 (avg)   1         10        10         9m15s
keda-hpa-rabbitmq-scaledobject   Deployment/sample-app   235000m/20 (avg)   1         10        10         9m51s
keda-hpa-rabbitmq-scaledobject   Deployment/sample-app   0/20 (avg)         1         10        10         10m
keda-hpa-rabbitmq-scaledobject   Deployment/sample-app   0/20 (avg)         1         10        1          15m
The output shows that after the messages are consumed and the metric drops back to 0, the number of replicated pods for the sample-app application is scaled in to the minimum value specified in the ScaledObject. The scale-in does not happen immediately because HPA applies a scale-down stabilization window (5 minutes by default) before it reduces the number of replicas.
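If you also want to watch the pods themselves scale in, you can watch them by label. The label below matches the Deployment manifest in Step 1:

kubectl get pods -l app=sample-app -w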
References
For more information about how to configure HPA to scale an application based on the message accumulation metrics of ApsaraMQ for RocketMQ, see Keda based on RocketMQ metrics.