すべてのプロダクト
Search
ドキュメントセンター

:Spark Operatorを使用したSparkジョブの実行

最終更新日:Nov 11, 2024

Apache Sparkは、大規模なデータ処理用のコンピューティングエンジンです。 ビッグデータコンピューティングや機械学習のシナリオでワークロードを分析するために広く使用されています。 Spark Operatorは、Sparkジョブのデプロイを自動化し、KubernetesクラスターでSparkジョブのライフサイクルを管理する機能を提供します。 このトピックでは、Spark Operatorを使用してACKクラスターでSparkジョブを実行する方法について説明します。 これにより、データエンジニアはビッグデータ処理ジョブを迅速かつ効率的に実行および管理できます。

前提条件

Introduction to Spark Operator

Spark Operatorは、KubernetesクラスターでSparkジョブを実行し、Sparkジョブのライフサイクル管理を自動化するように設計されています。 SparkApplicationScheduledSparkApplicationなどのCustomResourceDefinitions (CRD) を使用して、Sparkジョブを送信および管理できます。 Spark Operatorは、自動スケーリング、ヘルスチェック、リソース管理などのKubernetes機能を活用することで、Sparkジョブの実行を効率的に監視および最適化できます。 ACKは、オープンソースのkubeflow/spark-operatorコンポーネントに基づいてack-spark-operatorコンポーネントを提供します。 詳細については、「Spark演算子 | Kubeflow」をご参照ください。

利点:

  • 簡易管理: Kubernetesの宣言型ジョブ設定を使用して、Sparkジョブのデプロイを自動化し、ライフサイクルを管理します。

  • マルチテナントのサポート: Kubernetesの名前空間とリソースクォータのメカニズムを使用して、ユーザー全体のリソースを割り当て、分離できます。 Kubernetesノード選択メカニズムを使用して、Sparkジョブが専用リソースを使用できるようにすることもできます。

  • Elasticリソースプロビジョニング: Elasticコンテナインスタンスまたはelasticノードプールを使用して、ピーク時に大量のelasticリソースを提供し、パフォーマンスとコストのバランスを取ります。

該当するシナリオ:

  • データ分析: データサイエンティストは、インタラクティブなデータ分析とデータクレンジングにSparkを使用できます。

  • バッチデータコンピューティング: スケジュールされたバッチジョブを実行して、多数のデータセットを処理できます。

  • リアルタイムデータ処理: Spark Streamingライブラリは、リアルタイムデータをストリーミングする機能を提供します。

手順の概要

このトピックでは、Spark Operatorを使用してACKクラスターでSparkジョブを実行および管理し、ビッグデータを効率的に処理する方法について説明します。

  1. ack-spark-operatorコンポーネントをインストールする: ACKクラスターにSpark Operatorをインストールして、Sparkジョブを管理および実行します。

  2. Sparkジョブの送信: Sparkジョブの構成ファイルを作成して送信し、データ処理タスクを実行します。

  3. Sparkジョブの表示: ジョブのステータスを監視し、詳細な情報とログを取得します。

  4. Spark web UIにアクセスする: webインターフェイスでSparkジョブの実行を表示します。

  5. Sparkジョブの更新: ビジネス要件に基づいてジョブ設定を変更し、パラメーターを動的に変更できます。

  6. Sparkジョブの削除: 完了した、またはコストを削減するために不要になったSparkジョブを削除できます。

ステップ1: ack-spark-operatorコンポーネントのインストール

  1. ACKコンソールにログインします。 左側のナビゲーションウィンドウで、[Marketplace] > [Marketplace] を選択します。

  2. [マーケットプレイス] ページで、[アプリカタログ] タブをクリックします。 [ack-spark-operator] を見つけてクリックします。

  3. ack-spark-operatorページで、[デプロイ] をクリックします。

  4. [デプロイ] パネルで、クラスターと名前空間を選択し、[次へ] をクリックします。

  5. [パラメーター] ステップでパラメーターを設定し、[OK] をクリックします。

    下表に一部のパラメーターを示します。 パラメーター設定は、ack-spark-operatorページの [パラメーター] セクションにあります。

    パラメーター

    説明

    controller.replicas

    コントローラーレプリカの数。

    デフォルト値は 1 です。

    webhook. レプリカ

    webhookレプリカの数。

    デフォルト値は 1 です。

    spark.jobNamespaces

    Sparkジョブを実行できる名前空間。 このパラメーターを空のままにすると、すべての名前空間でSparkジョブを実行できます。 複数の名前空間は、コンマ (,) で区切ります。

    • デフォルト値: ["Default"]

    • ["]: すべての名前空間。

    • ["ns1","ns2","ns3"]: 1つ以上の名前空間を指定します。

    spark.serviceAccount.name

    Sparkジョブは、spark-operator-sparkという名前のServiceAccountと、spark.jobNamespacesで指定された各名前空間に対応するロールベースのアクセス制御 (RBAC) リソースを自動的に作成します。 ServiceAccountのカスタム名を指定し、Sparkジョブの送信時にカスタム名を指定できます。

    デフォルト値: spark-operator-spark

ステップ2: Sparkジョブを送信する

SparkApplication YAMLファイルを作成して、データ処理用のSparkジョブを送信できます。

  1. spark-pi. YAMLという名前のSparkApplication yamlファイルを作成します。

    apiVersion: sparkoperator.k8s.io/v1beta2
    kind: SparkApplication
    metadata:
      name: spark-pi
      namespace: default     # Make sure that the namespace is in the namespace list specified by spark.jobNamespaces. 
    spec:
      type: Scala
      mode: cluster
      image: registry-cn-hangzhou.ack.aliyuncs.com/ack-demo/spark:3.5.2
      imagePullPolicy: IfNotPresent
      mainClass: org.apache.spark.examples.SparkPi
      mainApplicationFile: local:///opt/spark/examples/jars/spark-examples_2.12-3.5.2.jar
      arguments:
      - "1000"
      sparkVersion: 3.5.2
      driver:
        cores: 1
        coreLimit: 1200m
        memory: 512m
        serviceAccount: spark-operator-spark   # Replace spark-operator-spark with the custom name that you specified. 
      executor:
        instances: 1
        cores: 1
        coreLimit: 1200m
        memory: 512m
      restartPolicy:
        type: Never
  2. 次のコマンドを実行して、Sparkジョブを送信します。

    kubectl apply -f spark-pi.yaml

    期待される出力:

    sparkapplication.sparkoperator.k8s.io/spark-pi created

ステップ3: Sparkジョブを表示する

次のコマンドを実行して、Sparkジョブのステータス、ポッド情報、およびログを照会できます。

  1. 次のコマンドを実行して、Sparkジョブのステータスを表示します。

    kubectl get sparkapplication spark-pi

    期待される出力:

    NAME       STATUS      ATTEMPTS   START                  FINISH       AGE
    spark-pi   SUBMITTED   1          2024-06-04T03:17:11Z   <no value>   15s
  2. 次のコマンドを実行し、sparkoperatorを設定します。 Kubernetes、 io/app-nameラベルをspark-piに設定して、Sparkジョブを実行するポッドのステータスを確認します。

    kubectl get pod -l sparkoperator.k8s.io/app-name=spark-pi

    期待される出力:

    NAME                               READY   STATUS    RESTARTS   AGE
    spark-pi-7272428fc8f5f392-exec-1   1/1     Running   0          13s
    spark-pi-7272428fc8f5f392-exec-2   1/1     Running   0          13s
    spark-pi-driver                    1/1     Running   0          49s

    Sparkジョブが完了すると、すべてのエグゼキューターポッドがドライバによって自動的に削除されます。

  3. 次のコマンドを実行して、Sparkジョブの詳細を表示します。

    kubectl describe sparkapplication spark-pi

    期待出力の表示

    出力は、現在のジョブのステータスによって異なります。

    Name:         spark-pi
    Namespace:    default
    Labels:       <none>
    Annotations:  <none>
    API Version:  sparkoperator.k8s.io/v1beta2
    Kind:         SparkApplication
    Metadata:
      Creation Timestamp:  2024-06-04T03:16:59Z
      Generation:          1
      Resource Version:    1350200
      UID:                 1a1f9160-5dbb-XXXX-XXXX-be1c1fda4859
    Spec:
      Arguments:
        1000
      Driver:
        Core Limit:  1200m
        Cores:       1
        Memory:           512m
        Service Account:  spark
      Executor:
        Core Limit:  1200m
        Cores:       1
        Instances:   1
        Memory:               512m
      Image:                  registry-cn-hangzhou.ack.aliyuncs.com/ack-demo/spark:3.5.2
      Image Pull Policy:      IfNotPresent
      Main Application File:  local:///opt/spark/examples/jars/spark-examples_2.12-3.5.2.jar
      Main Class:             org.apache.spark.examples.SparkPi
      Mode:                   cluster
      Restart Policy:
        Type:         Never
      Spark Version:  3.5.2
      Type:           Scala
    Status:
      Application State:
        State:  COMPLETED
      Driver Info:
        Pod Name:             spark-pi-driver
        Web UI Address:       172.XX.XX.92:0
        Web UI Port:          4040
        Web UI Service Name:  spark-pi-ui-svc
      Execution Attempts:     1
      Executor State:
        spark-pi-26c5XXXXX1408337-exec-1:  COMPLETED
      Last Submission Attempt Time:        2024-06-04T03:17:11Z
      Spark Application Id:                spark-0042dead12XXXXXX43675f09552a946
      Submission Attempts:                 1
      Submission ID:                       117ee161-3951-XXXX-XXXX-e7d24626c877
      Termination Time:                    2024-06-04T03:17:55Z
    Events:
      Type    Reason                     Age   From            Message
      ----    ------                     ----  ----            -------
      Normal  SparkApplicationAdded      91s   spark-operator  SparkApplication spark-pi was added, enqueuing it for submission
      Normal  SparkApplicationSubmitted  79s   spark-operator  SparkApplication spark-pi was submitted successfully
      Normal  SparkDriverRunning         61s   spark-operator  Driver spark-pi-driver is running
      Normal  SparkExecutorPending       56s   spark-operator  Executor [spark-pi-26c5XXXXX1408337-exec-1] is pending
      Normal  SparkExecutorRunning       53s   spark-operator  Executor [spark-pi-26c5XXXXX1408337-exec-1] is running
      Normal  SparkDriverCompleted       35s   spark-operator  Driver spark-pi-driver completed
      Normal  SparkApplicationCompleted  35s   spark-operator  SparkApplication spark-pi completed
      Normal  SparkExecutorCompleted     35s   spark-operator  Executor [spark-pi-26c5XXXXX1408337-exec-1] completed
  4. 次のコマンドを実行して、ドライバポッドの最後の20のログエントリを表示します。

    kubectl logs -- tail=20 spark-pi-driver

    期待される出力:

    24/05/30 10:05:30 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
    24/05/30 10:05:30 INFO DAGScheduler: ResultStage 0 (reduce at SparkPi.scala:38) finished in 7.942 s
    24/05/30 10:05:30 INFO DAGScheduler: Job 0 is finished. Cancelling potential speculative or zombie tasks for this job
    24/05/30 10:05:30 INFO TaskSchedulerImpl: Killing all running tasks in stage 0: Stage finished
    24/05/30 10:05:30 INFO DAGScheduler: Job 0 finished: reduce at SparkPi.scala:38, took 8.043996 s
    Pi is roughly 3.1419522314195225
    24/05/30 10:05:30 INFO SparkContext: SparkContext is stopping with exitCode 0.
    24/05/30 10:05:30 INFO SparkUI: Stopped Spark web UI at http://spark-pi-1e18858fc8f56b14-driver-svc.default.svc:4040
    24/05/30 10:05:30 INFO KubernetesClusterSchedulerBackend: Shutting down all executors
    24/05/30 10:05:30 INFO KubernetesClusterSchedulerBackend$KubernetesDriverEndpoint: Asking each executor to shut down
    24/05/30 10:05:30 WARN ExecutorPodsWatchSnapshotSource: Kubernetes client has been closed.
    24/05/30 10:05:30 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
    24/05/30 10:05:30 INFO MemoryStore: MemoryStore cleared
    24/05/30 10:05:30 INFO BlockManager: BlockManager stopped
    24/05/30 10:05:30 INFO BlockManagerMaster: BlockManagerMaster stopped
    24/05/30 10:05:30 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
    24/05/30 10:05:30 INFO SparkContext: Successfully stopped SparkContext
    24/05/30 10:05:30 INFO ShutdownHookManager: Shutdown hook called
    24/05/30 10:05:30 INFO ShutdownHookManager: Deleting directory /var/data/spark-14ed60f1-82cd-4a33-b1b3-9e5d975c5b1e/spark-01120c89-5296-4c83-8a20-0799eef4e0ee
    24/05/30 10:05:30 INFO ShutdownHookManager: Deleting directory /tmp/spark-5f98ed73-576a-41be-855d-dabdcf7de189

    の削除

ステップ4: Spark web UIにアクセスする

Sparkジョブは、Sparkジョブの実行を監視するweb UIを提供します。 Spark web UIにアクセスするために、kubectl port-forwardコマンドを実行して、クラスター内のポートをローカルポートにマップします。 Spark web UIサービスは、Sparkジョブが実行中であるか、ドライバーポッドが [実行中] 状態の場合にのみ使用できます。 Sparkジョブが完了すると、web UIにアクセスできなくなります。

ack-spark-operatorコンポーネントをデプロイすると、controller.uiService.enableが自動的にtrueに設定され、サービスが自動的に作成されます。 サービスのポートをローカルポートにマップして、web UIにアクセスできます。 コンポーネントのデプロイ時にcontroller.uiService.enablefalseに設定した場合、サービスは作成されません。 この場合、ポッドのポートをマッピングすることで、web UIにアクセスできます。

重要

kubectl port-forwardコマンドで指定されたローカルポートは、テスト環境にのみ適しており、運用環境には適していません。 この方法を使用するときは注意してください。

  1. ビジネス要件に基づいて、サービスのポートまたはポッドのポートをローカルポートにマップできます。 次のセクションでは、関連するコマンドについて説明します。

    • 次のコマンドを実行して、サービスのポートをマッピングしてweb UIにアクセスします。

      kubectl port-forward services/spark-pi-ui-svc 4040
    • 次のコマンドを実行して、ポッドのポートをマッピングしてweb UIにアクセスします。

      kubectl port-forward pods/spark-pi-driver 4040

      期待される出力:

      Forwarding from 127.0.0.1:4040 -> 4040
      Forwarding from [::1]:4040 -> 4040

      からの転送

  2. http:// 127.0.0.1:4040からweb UIにアクセスします。

(オプション) 手順5: Sparkジョブの更新

Sparkジョブのパラメーターを変更するには、SparkジョブのYAMLファイルを更新します。

  1. spark-pi. YAMLという名前のyamlファイルを変更します。 たとえば、argumentsパラメーターを10000に、executorパラメーターを2に設定します。

    apiVersion: sparkoperator.k8s.io/v1beta2
    kind: SparkApplication
    metadata:
      name: spark-pi
    spec:
      type: Scala
      mode: cluster
      image: registry-cn-hangzhou.ack.aliyuncs.com/ack-demo/spark:3.5.2
      imagePullPolicy: IfNotPresent
      mainClass: org.apache.spark.examples.SparkPi
      mainApplicationFile: local:///opt/spark/examples/jars/spark-examples_2.12-3.5.2.jar
      arguments:
      - "10000"
      sparkVersion: 3.5.2
      driver:
        cores: 1
        coreLimit: 1200m
        memory: 512m
        serviceAccount: spark
      executor:
        instances: 2
        cores: 1
        coreLimit: 1200m
        memory: 512m
      restartPolicy:
        type: Never
  2. 次のコマンドを実行して、Sparkジョブを更新します。

    kubectl apply -f spark-pi.yaml
  3. 次のコマンドを実行して、Sparkジョブのステータスを表示します。

    kubectl get sparkapplication spark-pi

    Sparkジョブが再び実行されます。 期待される出力:

    NAME       STATUS    ATTEMPTS   START                  FINISH       AGE
    spark-pi   RUNNING   1          2024-06-04T03:37:34Z   <no value>   20m

(オプション) 手順6: Sparkジョブを削除する

このトピックのすべての手順を実行した後、Sparkジョブが不要になった場合は、次のコマンドを使用して関連するリソースを解放できます。

次のコマンドを実行して、前の手順で作成したSparkジョブを削除します。

kubectl delete -f spark-pi.yaml

次のコマンドを実行することもできます。

kubectl delete sparkapplication spark-pi