すべてのプロダクト
Search
ドキュメントセンター

Container Service for Kubernetes:ApsaraMQ for RabbitMQ メトリックに基づく水平ポッド自動スケーリング

最終更新日:Nov 09, 2025

ack-keda コンポーネントは、イベントソースから定期的にデータを消費することで、イベント駆動のスケーリングを提供します。メッセージが蓄積されると、ack-keda は数秒以内にオフラインタスクのバッチのスケーリングをトリガーします。ApsaraMQ for RabbitMQ と Keda を使用して、キューの長さやメッセージレートなどのメトリックをモニターできます。このトピックでは、ApsaraMQ for RabbitMQ のキュー長メトリックと Keda イベント駆動オートスケーラーを使用してアプリケーションを自動的にスケーリングする方法について説明します。

前提条件

  • ack-keda コンポーネントがデプロイされていること。 詳細については、「イベント駆動のスケーリング」をご参照ください。

  • ApsaraMQ for RabbitMQ インスタンスと関連リソースが作成されていること。 詳細については、「リソースの作成」をご参照ください。

  • kubectl を使用して Kubernetes クラスターに接続していること。 詳細については、「kubectl を使用してクラスターに接続する」をご参照ください。

  • Go ランタイム環境がインストールされていること。

ステップ 1: ワークロードをデプロイしてアプリケーションを作成する

  1. ACK コンソールにログインします。 左側のナビゲーションウィンドウで、[クラスター] をクリックします。

  2. クラスター ページで、目的のクラスターを見つけてその名前をクリックします。 左側のペインで、[ワークロード] > [デプロイメント] を選択します。

  3. [デプロイメント] ページで、[YAML から作成] をクリックします。[テンプレート][カスタム] に設定します。次の YAML を使用して sample-app という名前のアプリケーションを作成し、[作成] をクリックします。

    apiVersion: apps/v1
    kind: Deployment
    metadata:
      name: sample-app
      namespace: default
      labels:
        app: sample-app
    spec:
      replicas: 2
      selector:
        matchLabels:
          app: sample-app
      template:
        metadata:
          labels:
            app: sample-app
        spec:
          containers:
          - name: consumer
            image: consumer  # これを実際の RabbitMQ コンシューマーのイメージに置き換えます。
            resources:
              limits:
                cpu: "500m"

ステップ 2: RabbitMQ メトリックに基づいてスケーリングの例をデプロイする

これらの手順では、ApsaraMQ for RabbitMQ インスタンスに関する情報を取得し、Horizontal Pod Autoscaler (HPA) を構成して、RabbitMQ メトリックに基づいてアプリケーションを自動的にスケーリングする方法を示します。

  1. ApsaraMQ for RabbitMQ インスタンスの情報を取得します。

    1. ApsaraMQ for RabbitMQ コンソールにログインします。 左側のナビゲーションウィンドウで、インスタンスリスト を選択します。

    2. ターゲットインスタンスの名前をクリックして、[インスタンスの詳細] ページに移動します。[エンドポイント] タブで、[パブリックエンドポイント][エンドポイント] を表示して記録します。

      パブリックエンドポイントを使用すると、外部からの攻撃や不正アクセスなどのセキュリティリスクが生じます。この例では、デモンストレーションのみを目的としてパブリックエンドポイントを使用しています。アプリケーションが Alibaba Cloud VPC 内で実行され、外部アクセスを必要としない場合は、パブリックアクセスを有効にしないでください。

    3. 左側のナビゲーションウィンドウで、[ユーザーと権限] をクリックします。[ユーザー名][パスワード] を確認し、記録します。

      ユーザーを作成していない場合は、「ユーザーと権限」を参照して作成してください。
    4. 左側のナビゲーションウィンドウで、[Vhosts] をクリックします。[Vhost] の値 (例: amq-test) を表示して記録します。

  2. 次のコマンドを実行して、接続認証文字列を作成して記録します。

    echo -n "amqp://rabbitmq-username:rabbitmq-password@localhost:5672/vhost" | base64

    rabbitmq-usernamerabbitmq-password を、取得した [ユーザー名][パスワード] に置き換えます。localhost を記録したエンドポイントに置き換えます。vhost を前のステップで記録した名前に置き換えます。

  3. 次の YAML を使用して Secret を作成します。

    apiVersion: v1
    kind: Secret
    metadata:
      name: keda-rabbitmq-secret
    data:
      host: YW1x****** # 作成した接続認証文字列。
  4. 次の YAML を使用して TriggerAuthentication オブジェクトを作成し、YAML ファイルをクラスターにデプロイします。

    apiVersion: keda.sh/v1alpha1
    kind: TriggerAuthentication
    metadata:
      name: keda-trigger-auth-rabbitmq-conn
      namespace: default
    spec:
      secretTargetRef:
        - parameter: host
          name: keda-rabbitmq-secret # keda-rabbitmq-secret は、前のステップで作成した Secret です。
          key: host

    次のコマンドを実行して、YAML ファイルをクラスターにデプロイします。

    kubectl apply -f secret.yaml
    kubectl apply -f rabbitmq-trigger-auth.yaml

    デプロイが完了すると、Keda で RabbitMQ トリガーを使用できます。また、TriggerAuthentication オブジェクトを参照して RabbitMQ に接続し、メトリックデータを取得することもできます。

  5. ScaledObject.yaml という名前の YAML ファイルを次の内容で作成します。

    apiVersion: keda.sh/v1alpha1
    kind: ScaledObject
    metadata:
      name: rabbitmq-scaledobject
      namespace: default
    spec:
      scaleTargetRef:
        name: sample-app
      maxReplicaCount: 10
      minReplicaCount: 1
      triggers:
      - type: rabbitmq
        metadata:
          protocol: amqp
          queueName: queue-test
          mode: QueueLength
          value: "20"
          metricName: queue-test 
        authenticationRef:
          name: keda-trigger-auth-rabbitmq-conn

    パラメーター

    説明

    scaleTargetRef

    スケーリングするオブジェクト。この例では、ステップ 1: ワークロードをデプロイしてアプリケーションを作成する で作成されたアプリケーションである sample-app に設定されています。

    maxReplicaCount

    レプリカの最大数。

    minReplicaCount

    レプリカの最小数。

    protocol

    Keda コンポーネントと RabbitMQ 間の通信プロトコル。有効な値: auto、http、amqp。

    queueName

    メッセージを読み取るキューの名前。

    value

    スケールアウトをトリガーするしきい値。

    metricName

    カスタムメトリックの名前。これにより、HPA はスケーリングの決定に使用するメトリックを認識します。ここでは、キューの長さを取得するために使用される RabbitMQ キューの名前 queue-test です。

  6. 構成を適用し、ScaledObject および HPA リソースのステータスを確認します。

    次のコマンドを実行してリソースを作成します。

    kubectl apply -f ScaledObject.yaml  

    次のコマンドを実行して、スケーリング設定のステータスを確認します。

    kubectl get ScaledObject

    期待される出力:

    NAME                    SCALETARGETKIND      SCALETARGETNAME   MIN   MAX   TRIGGERS   AUTHENTICATION                    READY   ACTIVE   FALLBACK   AGE
    rabbitmq-scaledobject   apps/v1.Deployment   sample-app        1     10   rabbitmq   keda-trigger-auth-rabbitmq-conn    True    False    False      17s

    次のコマンドを実行して HPA のステータスを確認します。

    kubectl get hpa

    期待される出力:

    NAME                             REFERENCE               TARGETS      MINPODS   MAXPODS   REPLICAS   AGE
    keda-hpa-rabbitmq-scaledobject   Deployment/sample-app   0/20 (avg)   1         10        2          2m35s

ステップ 3: データを生成および消費してスケーリングをトリガーする

ApsaraMQ for RabbitMQ キューの次のプロデューサーおよびコンシューマーコードは、キューの長さを使用してコンテナーをスケーリングする方法を示しています。

  1. 次のプロデューサーコードを使用して RabbitMQ メッセージを生成します。

    package main
    
    import (
    	"fmt"
    	"log"
    	"time"
    	"github.com/streadway/amqp"
    )
    
    const (
    	queueName = "queue-test"    // これをメッセージを読み取るキューの名前に置き換えます。
    	numMsgs   = 10000
    	pauseTime = 10 * time.Millisecond
            url = "amqp://Mjpt****:QT****@amqp-cn-zxux009.cn-beijing.amqp-0.net.mq.amqp.aliyuncs.com:5672/amq-test"   // RabbitMQ にアクセスするための URL。フォーマット: amqp://username:password@localhost:5672/vhost
    )
    
    func main() {
    	conn, err := amqp.Dial(url)
    	failOnError(err, "Failed to connect to RabbitMQ")
    	defer conn.Close()
    
    	ch, err := conn.Channel()
    	failOnError(err, "Failed to open a channel")
    	defer ch.Close()
    
    	q, err := ch.QueueDeclare(
    		queueName,
    		true,
    		false,
    		false,
    		false,
    		nil,
    	)
    	failOnError(err, "Failed to declare a queue")
    	go produce(ch, q)
    	select {}
    }
    // メッセージを生成します。
    func produce(ch *amqp.Channel, q amqp.Queue) {
    	for i := 0; i < numMsgs; i++ {
    		msg := fmt.Sprintf("Message %d", i)
    		err := ch.Publish(
    			"",
    			q.Name,
    			false,
    			false,
    			amqp.Publishing{
    				ContentType: "text/plain",
    				Body:        []byte(msg),
    			},
    		)
    		failOnError(err, "Failed to publish a message")
    		log.Printf("Successed to publish a message: %s", msg)
    		time.Sleep(pauseTime)
    	}
    }
    
    func failOnError(err error, msg string) {
    	if err != nil {
    		log.Fatalf("%s: %s", msg, err)
    	}
    }
  2. Go 環境を初期化し、依存関係をダウンロードします。

    go mod init producer
    go mod tidy
  3. HPA の詳細を確認します。

    プロデューサーコードを実行して、キューのメッセージを生成します。

    go run producer.go

    次のコマンドを実行して HPA の詳細を確認します。

    kubectl get hpa

    期待される出力:

    NAME                               REFERENCE               TARGETS           MINPODS   MAXPODS   REPLICAS   AGE
    keda-hpa-rabbitmq-scaledobject   Deployment/sample-app    443000m/20 (avg)   1         10        10         9m15s

    期待される出力は、sample-app が Keda コンポーネントで設定されたレプリカの最大数までスケールアウトしたことを示しています。

  4. プロデューサープログラムを停止し、次のコンシューマープログラムを実行してキューからメッセージを消費します。

    package main
    
    import (
    	"fmt"
    	"log"
    	"github.com/streadway/amqp"
    )
    
    const (
    	queueName = "queue-test"
    	url = "amqp://Mjpt****:QT****@amqp-cn-zxux009.cn-beijing.amqp-0.net.mq.amqp.aliyuncs.com:5672/amq-test"   // RabbitMQ にアクセスするための URL。フォーマット: amqp://username:password@localhost:5672/vhost
    )
    
    func main() {
    	// RabbitMQ に接続します。
    	conn, err := amqp.Dial(url)
    	failOnError(err, "Failed to connect to RabbitMQ")
    	defer conn.Close()
    
    	// チャンネルを作成します。
    	ch, err := conn.Channel()
    	failOnError(err, "Failed to open a channel")
    	defer ch.Close()
    
    	// キューが存在することを確認するためにキューを宣言します。
    	_, err = ch.QueueDeclare(
    		queueName, // キュー名
    		true,      // 永続的
    		false,     // 排他的
    		false,     // 待機なし
    		false,     // 自動削除
    		nil,       // 引数
    	)
    	failOnError(err, "Failed to declare a queue")
    
    	// メッセージチャンネルを取得します。
    	msgs, err := ch.Consume(
    		queueName, // キュー名
    		"",        // コンシューマー名。不要な場合は空のままにします。
    		true,      // 自動確認
    		false,     // 排他的
    		false,     // ローカルなし
    		false,     // 待機なし
    		nil,       // 引数
    	)
    	failOnError(err, "Failed to register a consumer")
    
    	// 受信したメッセージを処理するための goroutine を作成します。
    	go func() {
    		for msg := range msgs {
    			log.Printf("Received a message: %s", msg.Body)
    			// ここで受信したメッセージを処理できます (例: ビジネスロジックの実装)。
    			fmt.Printf("Processed message: %s\n", msg.Body)
    		}
    	}()
    
    	// メインプログラムをブロックして実行を継続させます。
    	select {}
    }
    
    // シンプルなエラー処理関数。
    func failOnError(err error, msg string) {
    	if err != nil {
    		log.Fatalf("%s: %s", msg, err)
    	}
    }

    コンシューマーコードを実行してメッセージを消費します。

    go run consumer.go

    HPA のスケールインをモニターするには、次のコマンドを実行します。

    kubectl get hpa -w

    期待される出力:

    NAME                               REFERENCE               TARGETS            MINPODS   MAXPODS   REPLICAS   AGE
    keda-hpa-rabbitmq-scaledobject   Deployment/sample-app   443000m/20 (avg)   1         10        10         9m15s
    keda-hpa-rabbitmq-scaledobject   Deployment/sample-app   235000m/20 (avg)   1         10        10         9m51s
    keda-hpa-rabbitmq-scaledobject   Deployment/sample-app   0/20 (avg)         1         10        10         10m
    keda-hpa-rabbitmq-scaledobject   Deployment/sample-app   0/20 (avg)         1         10        1          15m

    期待される出力は、一定期間データ消費がない場合、sample-app が Keda コンポーネントで設定されたレプリカの最小数までスケールインすることを示しています。

参考

カスタム RocketMQ メッセージ蓄積メトリックに基づいて HPA を構成し、より柔軟なスケーリングを実現する方法については、「RocketMQ メトリックに基づく KEDA」をご参照ください。