すべてのプロダクト
Search
ドキュメントセンター

Container Service for Kubernetes:ApsaraMQ for RabbitMQのメトリックに基づく水平ポッド自動スケーリング

最終更新日:Dec 13, 2024

このトピックでは、ApsaraMQ for RabbitMQおよびイベント駆動型オートスケーラKEDAのメトリックに基づいて、水平ポッド自動スケーリングを設定する方法について説明します。

前提条件

  • ack-kedaコンポーネントが展開されます。 詳細については、「ACK KEDA」をご参照ください。

  • ApsaraMQ for RabbitMQインスタンスが作成されました。 詳細については、「手順2: リソースの作成」をご参照ください。

機能

企業のビジネスが発展するにつれて、メッセージングの需要も増加しています。 Message Queue for RabbitMQ は、高可用性分散ストレージに基づいて開発されたメッセージングサービスです。 Message Queue for RabbitMQは、AMQP (Advanced Message Queuing Protocol) 0-9-1をサポートし、高い同時実行性、分散アーキテクチャ、およびスケーラビリティを備えています。 RabbitMQおよびKEDAのMessage Queueを使用して、キューの長さとメッセージングレートのメトリックに基づいてアプリケーションをスケーリングできます。 これにより、リソース使用率を改善しながら、アプリケーションの変動する負荷を処理できます。 RabbitMQのメッセージキューは、データ送信のAMQPのみをサポートします。 オープンソースのRabbitMQは、データ送信用の複数のプロトコルをサポートします。

このトピックでは、ack-kedaを使用して、RabbitMQのMessage Queueのメトリックを、Horizontal Pod Autoscaler (HPA) でサポートされているメトリックに変換する方法の例を示します。

手順1: デプロイの作成とデプロイ

  1. ACKコンソールにログインします。 左側のナビゲーションウィンドウで、[クラスター] をクリックします。

  2. [クラスター] ページで、管理するクラスターの名前をクリックします。 左側のウィンドウで、[ワークロード] > [デプロイ] を選択します。

  3. [デプロイメント] ページで、右上隅の [YAMLから作成] をクリックします。

  4. [作成] ページで、[サンプルテンプレート][カスタム] に設定します。 次のコンテンツに基づいてsample-appという名前のデプロイを作成し、[作成] をクリックします。

    apiVersion: apps/v1
    kind: Deployment
    metadata:
      name: sample-app
      namespace: default
      labels:
        app: sample-app
    spec:
      replicas: 2
      selector:
        matchLabels:
          app: sample-app
      template:
        metadata:
          labels:
            app: sample-app
        spec:
          containers:
          - name: consumer
            image: consumer  # Replace with the actual image used by the consumer application of Message Queue for RabbitMQ. 
            resources:
              limits:
                cpu: "500m"

手順2: RabbitMQのMessage Queueのメトリックに基づいて水平ポッドの自動スケーリングを構成する

  1. ApsaraMQ for RabbitMQコンソールにログインします。 左側のナビゲーションウィンドウで、インスタンスリスト をクリックします。

  2. [インスタンス] ページで、使用するインスタンスの名前をクリックして、[インスタンスの詳細] ページに移動します。 [エンドポイント情報] タブで、[パブリックエンドポイント][エンドポイント] 列にエンドポイントを記録します。

  3. 左側のナビゲーションウィンドウで、[静的アカウント] をクリックし、インスタンスへのログインに使用されるユーザー名パスワードを記録します。静态用户名密码..png

  4. 左側のナビゲーションウィンドウで、[vhosts] をクリックし、仮想ホスト (vhost) の名前を記録します。 例: amq-test获取vhost..png

  5. 次のコマンドを実行して、接続文字列を作成します。

    echo -n "amqp://rabbitmq-username:rabbitmq-password@localhost:5672/vhost" | base64

    rabbitmq-usernamerabbitmq-passwordを記録したユーザー名とパスワードに置き換え、localhostを記録したパブリックエンドポイントに置き換え、vhostをvhostの名前に置き換えます。

  6. 次のYAMLコンテンツに基づいてシークレットを作成します。

    apiVersion: v1
    kind: Secret
    metadata:
      name: keda-rabbitmq-secret
    data:
      host: YW1xWXpNmd4TVRBNEXN0 # The connection string that you created.

  7. 次のYAMLコンテンツを使用して、TriggerAuthenticationオブジェクトを作成します。

    apiVersion: keda.sh/v1alpha1
    kind: TriggerAuthentication
    metadata:
      name: keda-trigger-auth-rabbitmq-conn
      namespace: default
    spec:
      secretTargetRef:
        - parameter: host
          name: keda-rabbitmq-secret # keda-rabbitmq-secret is the Secret that you created in the preceding step. 
          key: host

    次のコマンドを実行して、TriggerAuthenticationオブジェクトをクラスターにデプロイします。

    kubectl apply -f secret.yaml
    kubectl apply -f rabbitmq-trigger-auth.yaml

    TriggerAuthenticationオブジェクトのデプロイ後、KEDAでRabbitMQトリガーを使用できます。 TriggerAuthenticationオブジェクトを参照してMessage Queue for RabbitMQインスタンスに接続し、メトリクスを照会することもできます。

  8. ScaledObject.yamlという名前のファイルを作成し、次の内容をファイルにコピーします。

    apiVersion: keda.sh/v1alpha1
    kind: ScaledObject
    metadata:
      name: rabbitmq-scaledobject
      namespace: default
    spec:
      scaleTargetRef:
        name: rabbitmq-deployment
      maxReplicaCount: 10
      minReplicaCount: 1
      triggers:
      - type: rabbitmq
        metadata:
          protocol: amqp
          queueName: amq-test
          mode: QueueLength
          value: "20"
          metricName: custom-testqueue 
        authenticationRef:
          name: keda-trigger-auth-rabbitmq-conn

    パラメーター

    説明

    scaleTargetRef

    スケーリングするオブジェクト。 この例では、値はgsample-appgに設定されます。これは、手順1: Deploymentの作成と配置で作成したDeploymentの名前です。

    maxReplicaCount

    レプリケートされたポッドの最大数。

    minReplicaCount

    レプリケートされたポッドの最小数。

    プロトコル

    ack-kedaとApsaraMQ for RabbitMQインスタンスが通信に使用するプロトコル。 有効な値: auto、http、amqp。

    queueName

    データを読み取るキューの名前。

    スケールアウトしきい値。

  9. 次のコマンドを実行してScaledObjectを作成します。

    // Deploy a ScaledObject. 
    kubectl apply -f ScaledObject.yaml   
    
    scaledobject.keda.sh/rabbitmq-scaledobject created
    
    // Query the status of the ScaledObject. 
    kubectl get ScaledObject
    
    NAME                    SCALETARGETKIND      SCALETARGETNAME   MIN   MAX   TRIGGERS   AUTHENTICATION                    READY   ACTIVE   FALLBACK   AGE
    rabbitmq-scaledobject   apps/v1.Deployment   sample-app        1     10    rabbitmq   keda-trigger-auth-rabbitmq-conn   True    False    False      17s
    
    // Check whether the HPA is deployed to scale the application. 
    kubectl get hpa
    
    NAME                             REFERENCE               TARGETS      MINPODS   MAXPODS   REPLICAS   AGE
    keda-hpa-rabbitmq-scaledobject   Deployment/sample-app   0/20 (avg)   1         10        2          2m35s

ステップ3: アプリケーションのスケーリングをトリガーするためのデータの生成と消費

  1. 次のコードは、データの生成と使用に使用されます。

    )package main
    
    import (
    	"fmt"
    	"log"
    	"time"
    	"github.com/streadway/amqp"
    )
    
    const (
    	queueName = "queue-test"    // Replace with the name of the queue from which you want to read data. 
    	numMsgs   = 10000
    	pauseTime = 10 * time.Millisecond
      url = "amqp://Mjpt****:QT****@amqp-cn-zxux009.cn-beijing.amqp-0.net.mq.amqp.aliyuncs.com:5672/amq-test"   // The URL of the Message Queue for RabbitMQ instance. The URL must be in the amqp://guest:password@localhost:5672/vhost format.
    )
    
    func main() {
    	conn, err := amqp.Dial(url)
    	failOnError(err, "Failed to connect to RabbitMQ")
    	defer conn.Close()
    
    	ch, err := conn.Channel()
    	failOnError(err, "Failed to open a channel")
    	defer ch.Close()
    
    	q, err := ch.QueueDeclare(
    		queueName,
    		true,
    		false,
    		false,
    		false,
    		nil,
    	)
    	failOnError(err, "Failed to declare a queue")
    	go produce(ch, q)
    	select {}
    }
    
    func produce(ch *amqp.Channel, q amqp.Queue) {
    	for i := 0; i < numMsgs; i++ {
    		msg := fmt.Sprintf("Message %d", i)
    		err := ch.Publish(
    			"",
    			q.Name,
    			false,
    			false,
    			amqp.Publishing{
    				ContentType: "text/plain",
    				Body:        []byte(msg),
    			},
    		)
    		failOnError(err, "Failed to publish a message")
    		log.Printf("Successed to publish a message: %s", msg)
    		time.Sleep(pauseTime)
    	}
    }
    
    func failOnError(err error, msg string) {
    	if err != nil {
    		log.Fatalf("%s: %s", msg, err)
    	}
    }
  2. プロデューサープログラムを実行してデータを生成し、次のコマンドを実行してHPAに関する情報を照会します。

    kubectl get hpa

    予想される出力

    NAME                               REFERENCE               TARGETS           MINPODS   MAXPODS   REPLICAS   AGE
    keda-hpa-rabbitmq-scaledobject   Deployment/sample-app   443000m/20 (avg)   1         10        10         9m15s

    出力は、sample-appアプリケーションのレプリケートされたポッドの数がKEDAで指定された最大値にスケーリングされていることを示しています。

  3. プロデューサープログラムを停止し、コンシューマープログラムを実行します。 次に、次のコマンドを実行します。

    kubectl get hpa -w

    予想される出力

    NAME                               REFERENCE               TARGETS            MINPODS   MAXPODS   REPLICAS   AGE
    keda-hpa-rabbitmq-scaledobject   Deployment/sample-app   443000m/20 (avg)   1         10        10         9m15s
    keda-hpa-rabbitmq-scaledobject   Deployment/sample-app   235000m/20 (avg)   1         10        10         9m51s
    keda-hpa-rabbitmq-scaledobject   Deployment/sample-app   0/20 (avg)         1         10        10         10m
    keda-hpa-rabbitmq-scaledobject   Deployment/sample-app   0/20 (avg)         1         10        1          15m

    出力は、sample-appアプリケーションのレプリケートされたポッドの数が、データ消費が終了した時点でKEDAで指定された最小値にスケーリングされていることを示しています。

関連ドキュメント

ApsaraMQ For RocketMQおよびイベント駆動型オートスケーラーKEDAのメトリックに基づいて水平ポッドの自動スケーリングを設定する方法の詳細については、「ApsaraMQ for RocketMQメトリックに基づくKEDA」をご参照ください。