本文介绍如何使用RabbitMQ指标和事件驱动自动伸缩工具Keda实现应用的弹性伸缩。
前提条件
功能介绍
随着企业业务的不断发展,对应用程序的要求也逐渐提高。消息队列RabbitMQ版是一款基于高可用分布式存储架构实现的AMQP 0-9-1协议的消息产品,具备高并发、分布式、灵活扩缩容等云消息服务优势。使用RabbitMQ和Keda可以监控队列长度和消息速率指标,根据需求进行弹性伸缩应用,帮助您优化资源使用和满足应用程序的负载需求。目前阿里云RabbitMQ只支持amqp协议传输数据,RabbitMQ社区版本支持多种协议。
本文举例如何配置ack-keda,实现将RabbitMQ指标转换为HPA可用指标,并实现容器自动伸缩。
步骤一:部署工作负载创建应用
登录容器服务管理控制台,在左侧导航栏选择集群。
在集群列表页面,单击目标集群名称,然后在左侧导航栏,选择 。
在无状态页面,单击使用YAML创建资源。
在创建页面,选择示例模板为自定义。使用如下内容创建名为sample-app的应用,然后单击创建。
apiVersion: apps/v1 kind: Deployment metadata: name: sample-app namespace: default labels: app: sample-app spec: replicas: 2 selector: matchLabels: app: sample-app template: metadata: labels: app: sample-app spec: containers: - name: consumer image: consumer # 修改为业务真实的RabbitMQ的消费者镜像。 resources: limits: cpu: "500m"
步骤二:部署基于RabbitMQ指标的弹性示例
登录云消息队列 RabbitMQ 版控制台,然后在左侧导航栏选择实例列表。
单击目标实例名称,进入实例详情页面。在接入点信息页签,查看并记录公网接入点的Endpoint。
在左侧导航栏中单击静态用户名密码,查看并记录用户名和密码。
在左侧导航栏中单击Vhost列表,查看并记录Vhost的值,例如amq-test。
执行如下指令,创建连接认证字符串。
echo -n "amqp://rabbitmq-username:rabbitmq-password@localhost:5672/vhost" | base64
其中rabbitmq-username和rabbitmq-password分别为记录的用户名和密码、localhost为记录的Endpoint、vhost为上一步记录的名称。
使用如下YAML创建Secret。
apiVersion: v1 kind: Secret metadata: name: keda-rabbitmq-secret data: host: YW1xWXpNmd4TVRBNEXN0 # 已创建的连接认证字符串。
使用如下YAML创建TriggerAuthentication对象,并将该YAML文件部署到Kubernetes集群中。
apiVersion: keda.sh/v1alpha1 kind: TriggerAuthentication metadata: name: keda-trigger-auth-rabbitmq-conn namespace: default spec: secretTargetRef: - parameter: host name: keda-rabbitmq-secret # keda-rabbitmq-secret为上一步创建的Secret。 key: host
执行如下命令,将YAML文件部署到Kubernetes集群中。
kubectl apply -f secret.yaml kubectl apply -f rabbitmq-trigger-auth.yaml
部署完成后,您可以在Keda中使用RabbitMQ触发器,并且可以通过引用TriggerAuthentication对象来连接RabbitMQ获取指标数据。
使用如下内容,创建YAML文件ScaledObject.yaml。
apiVersion: keda.sh/v1alpha1 kind: ScaledObject metadata: name: rabbitmq-scaledobject namespace: default spec: scaleTargetRef: name: rabbitmq-deployment maxReplicaCount: 10 minReplicaCount: 1 triggers: - type: rabbitmq metadata: protocol: amqp queueName: amq-test mode: QueueLength value: "20" metricName: custom-testqueue authenticationRef: name: keda-trigger-auth-rabbitmq-conn
参数
说明
scaleTargetRef
配置扩缩容的对象,这里配置步骤一:部署工作负载创建应用已创建的应用sample-app。
maxReplicaCount
最大副本数。
minReplicaCount
最小副本数。
protocol
keda组件与RabitMQ之间的通信协议。取值范围:auto、http、amqp。
queueName
待读取信息的队列名称。
value
触发扩容的阈值。
执行如下命令,创建资源。
// 下发伸缩配置。 kubectl apply -f ScaledObject.yaml scaledobject.keda.sh/rabbitmq-scaledobject created // 获取伸缩配置状态。 kubectl get ScaledObject NAME SCALETARGETKIND SCALETARGETNAME MIN MAX TRIGGERS AUTHENTICATION READY ACTIVE FALLBACK AGE rabbitmq-scaledobject apps/v1.Deployment sample-app 1 10 rabbitmq keda-trigger-auth-rabbitmq-conn True False False 17s // 检查HPA的生成情况。 kubectl get hpa NAME REFERENCE TARGETS MINPODS MAXPODS REPLICAS AGE keda-hpa-rabbitmq-scaledobject Deployment/sample-app 0/20 (avg) 1 10 2 2m35s
步骤三:生产及消费数据实现扩缩容
本次测试借助下面代码生产及消费数据。
)package main import ( "fmt" "log" "time" "github.com/streadway/amqp" ) const ( queueName = "queue-test" // 替换为待读取信息的队列名称。 numMsgs = 10000 pauseTime = 10 * time.Millisecond url = "amqp://Mjpt****:QT****@amqp-cn-zxux009.cn-beijing.amqp-0.net.mq.amqp.aliyuncs.com:5672/amq-test" // 访问的RabbitMQ的url,拼接方式amqp://guest:password@localhost:5672/vhost ) func main() { conn, err := amqp.Dial(url) failOnError(err, "Failed to connect to RabbitMQ") defer conn.Close() ch, err := conn.Channel() failOnError(err, "Failed to open a channel") defer ch.Close() q, err := ch.QueueDeclare( queueName, true, false, false, false, nil, ) failOnError(err, "Failed to declare a queue") go produce(ch, q) select {} } func produce(ch *amqp.Channel, q amqp.Queue) { for i := 0; i < numMsgs; i++ { msg := fmt.Sprintf("Message %d", i) err := ch.Publish( "", q.Name, false, false, amqp.Publishing{ ContentType: "text/plain", Body: []byte(msg), }, ) failOnError(err, "Failed to publish a message") log.Printf("Successed to publish a message: %s", msg) time.Sleep(pauseTime) } } func failOnError(err error, msg string) { if err != nil { log.Fatalf("%s: %s", msg, err) } }
运行Producer程序生产数据,然后执行如下命令,查看HPA详情。
kubectl get hpa
预期输出:
NAME REFERENCE TARGETS MINPODS MAXPODS REPLICAS AGE keda-hpa-rabbitmq-scaledobject Deployment/sample-app 443000m/20 (avg) 1 10 10 9m15s
预期输出表明,sample-app已经扩容到keda组件设置的最大值。
关闭Producer程序,运行Consumer程序,然后执行如下命令。
kubectl get hpa -w
预期输出:
NAME REFERENCE TARGETS MINPODS MAXPODS REPLICAS AGE keda-hpa-rabbitmq-scaledobject Deployment/sample-app 443000m/20 (avg) 1 10 10 9m15s keda-hpa-rabbitmq-scaledobject Deployment/sample-app 235000m/20 (avg) 1 10 10 9m51s keda-hpa-rabbitmq-scaledobject Deployment/sample-app 0/20 (avg) 1 10 10 10m keda-hpa-rabbitmq-scaledobject Deployment/sample-app 0/20 (avg) 1 10 1 15m
预期输出表明,在数据消费结束一段时间后,sample-app缩容至keda组件设置的最小值。
相关文档
关于如何利用Kubernetes事件驱动自动伸缩工具Keda,根据自定义的RocketMQ消息堆积指标,启动容器水平伸缩(HPA),请参见基于RocketMQ指标的容器水平伸缩。