ack-keda コンポーネントは、イベントソースから定期的にデータを消費することで、イベント駆動のスケーリングを提供します。メッセージが蓄積されると、ack-keda は数秒以内にオフラインタスクのバッチのスケーリングをトリガーします。ApsaraMQ for RabbitMQ と Keda を使用して、キューの長さやメッセージレートなどのメトリックをモニターできます。このトピックでは、ApsaraMQ for RabbitMQ のキュー長メトリックと Keda イベント駆動オートスケーラーを使用してアプリケーションを自動的にスケーリングする方法について説明します。
前提条件
ack-keda コンポーネントがデプロイされていること。 詳細については、「イベント駆動のスケーリング」をご参照ください。
ApsaraMQ for RabbitMQ インスタンスと関連リソースが作成されていること。 詳細については、「リソースの作成」をご参照ください。
kubectl を使用して Kubernetes クラスターに接続していること。 詳細については、「kubectl を使用してクラスターに接続する」をご参照ください。
Go ランタイム環境がインストールされていること。
ステップ 1: ワークロードをデプロイしてアプリケーションを作成する
ACK コンソールにログインします。 左側のナビゲーションウィンドウで、[クラスター] をクリックします。
クラスター ページで、目的のクラスターを見つけてその名前をクリックします。 左側のペインで、 を選択します。
[デプロイメント] ページで、[YAML から作成] をクリックします。[テンプレート] を [カスタム] に設定します。次の YAML を使用して sample-app という名前のアプリケーションを作成し、[作成] をクリックします。
apiVersion: apps/v1 kind: Deployment metadata: name: sample-app namespace: default labels: app: sample-app spec: replicas: 2 selector: matchLabels: app: sample-app template: metadata: labels: app: sample-app spec: containers: - name: consumer image: consumer # これを実際の RabbitMQ コンシューマーのイメージに置き換えます。 resources: limits: cpu: "500m"
ステップ 2: RabbitMQ メトリックに基づいてスケーリングの例をデプロイする
これらの手順では、ApsaraMQ for RabbitMQ インスタンスに関する情報を取得し、Horizontal Pod Autoscaler (HPA) を構成して、RabbitMQ メトリックに基づいてアプリケーションを自動的にスケーリングする方法を示します。
ApsaraMQ for RabbitMQ インスタンスの情報を取得します。
ApsaraMQ for RabbitMQ コンソールにログインします。 左側のナビゲーションウィンドウで、インスタンスリスト を選択します。
ターゲットインスタンスの名前をクリックして、[インスタンスの詳細] ページに移動します。[エンドポイント] タブで、[パブリックエンドポイント] の [エンドポイント] を表示して記録します。
パブリックエンドポイントを使用すると、外部からの攻撃や不正アクセスなどのセキュリティリスクが生じます。この例では、デモンストレーションのみを目的としてパブリックエンドポイントを使用しています。アプリケーションが Alibaba Cloud VPC 内で実行され、外部アクセスを必要としない場合は、パブリックアクセスを有効にしないでください。
左側のナビゲーションウィンドウで、[ユーザーと権限] をクリックします。[ユーザー名] と [パスワード] を確認し、記録します。
ユーザーを作成していない場合は、「ユーザーと権限」を参照して作成してください。
左側のナビゲーションウィンドウで、[Vhosts] をクリックします。[Vhost] の値 (例: amq-test) を表示して記録します。
次のコマンドを実行して、接続認証文字列を作成して記録します。
echo -n "amqp://rabbitmq-username:rabbitmq-password@localhost:5672/vhost" | base64rabbitmq-usernameとrabbitmq-passwordを、取得した [ユーザー名] と [パスワード] に置き換えます。localhostを記録したエンドポイントに置き換えます。vhostを前のステップで記録した名前に置き換えます。次の YAML を使用して Secret を作成します。
apiVersion: v1 kind: Secret metadata: name: keda-rabbitmq-secret data: host: YW1x****** # 作成した接続認証文字列。次の YAML を使用して TriggerAuthentication オブジェクトを作成し、YAML ファイルをクラスターにデプロイします。
apiVersion: keda.sh/v1alpha1 kind: TriggerAuthentication metadata: name: keda-trigger-auth-rabbitmq-conn namespace: default spec: secretTargetRef: - parameter: host name: keda-rabbitmq-secret # keda-rabbitmq-secret は、前のステップで作成した Secret です。 key: host次のコマンドを実行して、YAML ファイルをクラスターにデプロイします。
kubectl apply -f secret.yaml kubectl apply -f rabbitmq-trigger-auth.yamlデプロイが完了すると、Keda で RabbitMQ トリガーを使用できます。また、TriggerAuthentication オブジェクトを参照して RabbitMQ に接続し、メトリックデータを取得することもできます。
ScaledObject.yaml という名前の YAML ファイルを次の内容で作成します。
apiVersion: keda.sh/v1alpha1 kind: ScaledObject metadata: name: rabbitmq-scaledobject namespace: default spec: scaleTargetRef: name: sample-app maxReplicaCount: 10 minReplicaCount: 1 triggers: - type: rabbitmq metadata: protocol: amqp queueName: queue-test mode: QueueLength value: "20" metricName: queue-test authenticationRef: name: keda-trigger-auth-rabbitmq-connパラメーター
説明
scaleTargetRefスケーリングするオブジェクト。この例では、ステップ 1: ワークロードをデプロイしてアプリケーションを作成する で作成されたアプリケーションである sample-app に設定されています。
maxReplicaCountレプリカの最大数。
minReplicaCountレプリカの最小数。
protocolKeda コンポーネントと RabbitMQ 間の通信プロトコル。有効な値: auto、http、amqp。
queueNameメッセージを読み取るキューの名前。
valueスケールアウトをトリガーするしきい値。
metricNameカスタムメトリックの名前。これにより、HPA はスケーリングの決定に使用するメトリックを認識します。ここでは、キューの長さを取得するために使用される RabbitMQ キューの名前 queue-test です。
構成を適用し、ScaledObject および HPA リソースのステータスを確認します。
次のコマンドを実行してリソースを作成します。
kubectl apply -f ScaledObject.yaml次のコマンドを実行して、スケーリング設定のステータスを確認します。
kubectl get ScaledObject期待される出力:
NAME SCALETARGETKIND SCALETARGETNAME MIN MAX TRIGGERS AUTHENTICATION READY ACTIVE FALLBACK AGE rabbitmq-scaledobject apps/v1.Deployment sample-app 1 10 rabbitmq keda-trigger-auth-rabbitmq-conn True False False 17s次のコマンドを実行して HPA のステータスを確認します。
kubectl get hpa期待される出力:
NAME REFERENCE TARGETS MINPODS MAXPODS REPLICAS AGE keda-hpa-rabbitmq-scaledobject Deployment/sample-app 0/20 (avg) 1 10 2 2m35s
ステップ 3: データを生成および消費してスケーリングをトリガーする
ApsaraMQ for RabbitMQ キューの次のプロデューサーおよびコンシューマーコードは、キューの長さを使用してコンテナーをスケーリングする方法を示しています。
次のプロデューサーコードを使用して RabbitMQ メッセージを生成します。
package main import ( "fmt" "log" "time" "github.com/streadway/amqp" ) const ( queueName = "queue-test" // これをメッセージを読み取るキューの名前に置き換えます。 numMsgs = 10000 pauseTime = 10 * time.Millisecond url = "amqp://Mjpt****:QT****@amqp-cn-zxux009.cn-beijing.amqp-0.net.mq.amqp.aliyuncs.com:5672/amq-test" // RabbitMQ にアクセスするための URL。フォーマット: amqp://username:password@localhost:5672/vhost ) func main() { conn, err := amqp.Dial(url) failOnError(err, "Failed to connect to RabbitMQ") defer conn.Close() ch, err := conn.Channel() failOnError(err, "Failed to open a channel") defer ch.Close() q, err := ch.QueueDeclare( queueName, true, false, false, false, nil, ) failOnError(err, "Failed to declare a queue") go produce(ch, q) select {} } // メッセージを生成します。 func produce(ch *amqp.Channel, q amqp.Queue) { for i := 0; i < numMsgs; i++ { msg := fmt.Sprintf("Message %d", i) err := ch.Publish( "", q.Name, false, false, amqp.Publishing{ ContentType: "text/plain", Body: []byte(msg), }, ) failOnError(err, "Failed to publish a message") log.Printf("Successed to publish a message: %s", msg) time.Sleep(pauseTime) } } func failOnError(err error, msg string) { if err != nil { log.Fatalf("%s: %s", msg, err) } }Go 環境を初期化し、依存関係をダウンロードします。
go mod init producer go mod tidyHPA の詳細を確認します。
プロデューサーコードを実行して、キューのメッセージを生成します。
go run producer.go次のコマンドを実行して HPA の詳細を確認します。
kubectl get hpa期待される出力:
NAME REFERENCE TARGETS MINPODS MAXPODS REPLICAS AGE keda-hpa-rabbitmq-scaledobject Deployment/sample-app 443000m/20 (avg) 1 10 10 9m15s期待される出力は、sample-app が Keda コンポーネントで設定されたレプリカの最大数までスケールアウトしたことを示しています。
プロデューサープログラムを停止し、次のコンシューマープログラムを実行してキューからメッセージを消費します。
package main import ( "fmt" "log" "github.com/streadway/amqp" ) const ( queueName = "queue-test" url = "amqp://Mjpt****:QT****@amqp-cn-zxux009.cn-beijing.amqp-0.net.mq.amqp.aliyuncs.com:5672/amq-test" // RabbitMQ にアクセスするための URL。フォーマット: amqp://username:password@localhost:5672/vhost ) func main() { // RabbitMQ に接続します。 conn, err := amqp.Dial(url) failOnError(err, "Failed to connect to RabbitMQ") defer conn.Close() // チャンネルを作成します。 ch, err := conn.Channel() failOnError(err, "Failed to open a channel") defer ch.Close() // キューが存在することを確認するためにキューを宣言します。 _, err = ch.QueueDeclare( queueName, // キュー名 true, // 永続的 false, // 排他的 false, // 待機なし false, // 自動削除 nil, // 引数 ) failOnError(err, "Failed to declare a queue") // メッセージチャンネルを取得します。 msgs, err := ch.Consume( queueName, // キュー名 "", // コンシューマー名。不要な場合は空のままにします。 true, // 自動確認 false, // 排他的 false, // ローカルなし false, // 待機なし nil, // 引数 ) failOnError(err, "Failed to register a consumer") // 受信したメッセージを処理するための goroutine を作成します。 go func() { for msg := range msgs { log.Printf("Received a message: %s", msg.Body) // ここで受信したメッセージを処理できます (例: ビジネスロジックの実装)。 fmt.Printf("Processed message: %s\n", msg.Body) } }() // メインプログラムをブロックして実行を継続させます。 select {} } // シンプルなエラー処理関数。 func failOnError(err error, msg string) { if err != nil { log.Fatalf("%s: %s", msg, err) } }コンシューマーコードを実行してメッセージを消費します。
go run consumer.goHPA のスケールインをモニターするには、次のコマンドを実行します。
kubectl get hpa -w期待される出力:
NAME REFERENCE TARGETS MINPODS MAXPODS REPLICAS AGE keda-hpa-rabbitmq-scaledobject Deployment/sample-app 443000m/20 (avg) 1 10 10 9m15s keda-hpa-rabbitmq-scaledobject Deployment/sample-app 235000m/20 (avg) 1 10 10 9m51s keda-hpa-rabbitmq-scaledobject Deployment/sample-app 0/20 (avg) 1 10 10 10m keda-hpa-rabbitmq-scaledobject Deployment/sample-app 0/20 (avg) 1 10 1 15m期待される出力は、一定期間データ消費がない場合、sample-app が Keda コンポーネントで設定されたレプリカの最小数までスケールインすることを示しています。
参考
カスタム RocketMQ メッセージ蓄積メトリックに基づいて HPA を構成し、より柔軟なスケーリングを実現する方法については、「RocketMQ メトリックに基づく KEDA」をご参照ください。