全部產品
Search
文件中心

Container Service for Kubernetes:使用Spark Operator運行Spark作業

更新時間:Oct 09, 2024

Apache Spark是一種專門用於大規模資料處理的計算引擎,廣泛應用於資料分析和機器學習等情境。Spark Operator提供了一種在Kubernetes叢集中自動化部署Spark作業和管理其生命週期的能力。本文介紹如何在ACK叢集中使用Spark Operator運行Spark作業,協助資料工程師快速高效地運行和管理巨量資料處理作業。

前提條件

Spark Operator介紹

Spark Operator專為在Kubernetes叢集中運行Spark工作負載而設計,旨在自動化管理Spark作業的生命週期。通過SparkApplicationScheduledSparkApplication等CRD資源,您可以靈活提交和管理Spark作業。利用Kubernetes的自動擴充、健全狀態檢查和資源管理等特性,Spark Operator可以更有效地監控和最佳化Spark作業的運行。ACK基於社區組件kubeflow/spark-operator提供了ack-spark-operator組件,更多資訊,請參見Spark Operator | Kubeflow

使用優勢:

  • 簡化管理:通過Kubernetes的聲明式作業配置,自動化部署Spark作業並管理作業的生命週期。

  • 支援多租戶:可利用Kubernetes的命名空間機制和資源配額機制進行使用者粒度資源隔離和資源分派,並利用Kubernetes的節點選擇機制保證 Spark工作負載可以獲得專用的資源。

  • 彈性資源供給:利用ECIElastic Container Instance或彈性節點池等彈性資源,可在業務高峰期快速獲得大量彈性資源,平衡效能和成本。

適用情境:

  • 資料分析:資料科學家可以利用Spark進行互動式資料分析和資料清洗等。

  • 批量資料計算:運行定時批次工作,處理大規模資料集。

  • 即時資料處理:Spark Streaming庫提供了對即時資料進行串流的能力。

流程概述

本文將引導您完成以下步驟,協助您瞭解如何使用Spark Operator在ACK叢集上運行和管理Spark作業,從而有效地進行巨量資料處理。

  1. 部署ack-spark-operator組件:在ACK叢集中安裝Spark Operator,使其能夠管理和運行Spark作業。

  2. 提交Spark作業:建立並提交一個Spark作業的設定檔,實現對特定資料處理任務的執行。

  3. 查看Spark作業:監控作業的運行狀態,擷取詳細的執行資訊和日誌。

  4. 訪問Spark Web UI:通過Web介面更直觀地瞭解Spark作業執行情況。

  5. 更新Spark作業:根據需求調整作業配置,支援動態更新參數。

  6. 刪除Spark作業:清理已完成或不再需要的Spark作業,避免產生預期外的費用。

步驟一:部署ack-spark-operator組件

  1. 登入Container Service管理主控台,在左側導覽列選擇市場 > 應用市場

  2. 應用市場頁面單擊應用目錄頁簽,然後搜尋並選中ack-spark-operator

  3. ack-spark-operator頁面,單擊一鍵部署

  4. 建立面板中,選擇叢集和命名空間,然後單擊下一步

  5. 參數配置頁面,設定相應參數,然後單擊確定

    下表列出了部分配置參數的說明。完整的參數配置詳情,您可以在ack-spark-operator頁面中的配置項查看。

    參數

    描述

    樣本值

    controller.replicas

    控制器副本數量。

    1(預設值)

    webhook.replicas

    Webhook副本數量。

    1(預設值)

    spark.jobNamespaces

    可運行Spark任務的命名空間列表。包含Null 字元串表示允許所有命名空間。多個命名空間使用英文半形逗號(,)隔開。

    • ["default"](預設值)

    • [""](所有命名空間)

    • ["ns1","ns2","ns3"](多個命名空間)

    spark.serviceAccount.name

    Spark作業會在spark.jobNamespaces指定的每個命名空間中自動建立名為spark-operator-spark的ServiceAccount和RBAC資源並進行相關授權。您可以自訂ServiceAccount名稱,後續提交Spark作業時請指定自訂建立的ServiceAccount名稱。

    spark-operator-spark(預設值)

步驟二:提交Spark作業

您可以通過建立SparkApplication資訊清單檔,提交一個實際的Spark作業以進行資料處理。

  1. 建立如下SparkApplication資訊清單檔,並儲存為spark-pi.yaml

    apiVersion: sparkoperator.k8s.io/v1beta2
    kind: SparkApplication
    metadata:
      name: spark-pi
      namespace: default     # 需要確保命名空間在spark.jobNamespaces指定的命名空間列表中。
    spec:
      type: Scala
      mode: cluster
      image: registry-cn-hangzhou.ack.aliyuncs.com/ack-demo/spark:3.5.2
      imagePullPolicy: IfNotPresent
      mainClass: org.apache.spark.examples.SparkPi
      mainApplicationFile: local:///opt/spark/examples/jars/spark-examples_2.12-3.5.2.jar
      arguments:
      - "1000"
      sparkVersion: 3.5.2
      driver:
        cores: 1
        coreLimit: 1200m
        memory: 512m
        serviceAccount: spark-operator-spark   # 如果您自訂了ServiceAccount名稱,則需要進行相應修改。
      executor:
        instances: 1
        cores: 1
        coreLimit: 1200m
        memory: 512m
      restartPolicy:
        type: Never
  2. 執行以下命令,提交Spark作業。

    kubectl apply -f spark-pi.yaml

    預期輸出如下。

    sparkapplication.sparkoperator.k8s.io/spark-pi created

步驟三:查看Spark作業

您可以通過以下命令擷取Spark作業的運行狀態、相關Pod資訊及日誌。

  1. 執行以下命令,查看Spark作業運行狀態。

    kubectl get sparkapplication spark-pi

    預期輸出如下。

    NAME       STATUS      ATTEMPTS   START                  FINISH       AGE
    spark-pi   SUBMITTED   1          2024-06-04T03:17:11Z   <no value>   15s
  2. 執行以下命令,指定標籤sparkoperator.k8s.io/app-namespark-pi,查看Spark作業的Pod的運行狀態。

    kubectl get pod -l sparkoperator.k8s.io/app-name=spark-pi

    預期輸出如下。

    NAME                               READY   STATUS    RESTARTS   AGE
    spark-pi-7272428fc8f5f392-exec-1   1/1     Running   0          13s
    spark-pi-7272428fc8f5f392-exec-2   1/1     Running   0          13s
    spark-pi-driver                    1/1     Running   0          49s

    當Spark作業運行結束後,所有Executor Pod都將被Driver自動刪除。

  3. 執行以下命令,查看Spark作業詳細資料。

    kubectl describe sparkapplication spark-pi

    展開查看預期輸出

    具體輸出內容會根據當前作業運行狀態而有所不同。

    Name:         spark-pi
    Namespace:    default
    Labels:       <none>
    Annotations:  <none>
    API Version:  sparkoperator.k8s.io/v1beta2
    Kind:         SparkApplication
    Metadata:
      Creation Timestamp:  2024-06-04T03:16:59Z
      Generation:          1
      Resource Version:    1350200
      UID:                 1a1f9160-5dbb-XXXX-XXXX-be1c1fda4859
    Spec:
      Arguments:
        1000
      Driver:
        Core Limit:  1200m
        Cores:       1
        Memory:           512m
        Service Account:  spark
      Executor:
        Core Limit:  1200m
        Cores:       1
        Instances:   1
        Memory:               512m
      Image:                  registry-cn-hangzhou.ack.aliyuncs.com/ack-demo/spark:3.5.2
      Image Pull Policy:      IfNotPresent
      Main Application File:  local:///opt/spark/examples/jars/spark-examples_2.12-3.5.2.jar
      Main Class:             org.apache.spark.examples.SparkPi
      Mode:                   cluster
      Restart Policy:
        Type:         Never
      Spark Version:  3.5.2
      Type:           Scala
    Status:
      Application State:
        State:  COMPLETED
      Driver Info:
        Pod Name:             spark-pi-driver
        Web UI Address:       172.XX.XX.92:0
        Web UI Port:          4040
        Web UI Service Name:  spark-pi-ui-svc
      Execution Attempts:     1
      Executor State:
        spark-pi-26c5XXXXX1408337-exec-1:  COMPLETED
      Last Submission Attempt Time:        2024-06-04T03:17:11Z
      Spark Application Id:                spark-0042dead12XXXXXX43675f09552a946
      Submission Attempts:                 1
      Submission ID:                       117ee161-3951-XXXX-XXXX-e7d24626c877
      Termination Time:                    2024-06-04T03:17:55Z
    Events:
      Type    Reason                     Age   From            Message
      ----    ------                     ----  ----            -------
      Normal  SparkApplicationAdded      91s   spark-operator  SparkApplication spark-pi was added, enqueuing it for submission
      Normal  SparkApplicationSubmitted  79s   spark-operator  SparkApplication spark-pi was submitted successfully
      Normal  SparkDriverRunning         61s   spark-operator  Driver spark-pi-driver is running
      Normal  SparkExecutorPending       56s   spark-operator  Executor [spark-pi-26c5XXXXX1408337-exec-1] is pending
      Normal  SparkExecutorRunning       53s   spark-operator  Executor [spark-pi-26c5XXXXX1408337-exec-1] is running
      Normal  SparkDriverCompleted       35s   spark-operator  Driver spark-pi-driver completed
      Normal  SparkApplicationCompleted  35s   spark-operator  SparkApplication spark-pi completed
      Normal  SparkExecutorCompleted     35s   spark-operator  Executor [spark-pi-26c5XXXXX1408337-exec-1] completed
  4. 執行以下命令,查看Driver Pod作業記錄的最後20行。

    kubectl logs --tail=20 spark-pi-driver

    預期輸出如下:

    24/05/30 10:05:30 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
    24/05/30 10:05:30 INFO DAGScheduler: ResultStage 0 (reduce at SparkPi.scala:38) finished in 7.942 s
    24/05/30 10:05:30 INFO DAGScheduler: Job 0 is finished. Cancelling potential speculative or zombie tasks for this job
    24/05/30 10:05:30 INFO TaskSchedulerImpl: Killing all running tasks in stage 0: Stage finished
    24/05/30 10:05:30 INFO DAGScheduler: Job 0 finished: reduce at SparkPi.scala:38, took 8.043996 s
    Pi is roughly 3.1419522314195225
    24/05/30 10:05:30 INFO SparkContext: SparkContext is stopping with exitCode 0.
    24/05/30 10:05:30 INFO SparkUI: Stopped Spark web UI at http://spark-pi-1e18858fc8f56b14-driver-svc.default.svc:4040
    24/05/30 10:05:30 INFO KubernetesClusterSchedulerBackend: Shutting down all executors
    24/05/30 10:05:30 INFO KubernetesClusterSchedulerBackend$KubernetesDriverEndpoint: Asking each executor to shut down
    24/05/30 10:05:30 WARN ExecutorPodsWatchSnapshotSource: Kubernetes client has been closed.
    24/05/30 10:05:30 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
    24/05/30 10:05:30 INFO MemoryStore: MemoryStore cleared
    24/05/30 10:05:30 INFO BlockManager: BlockManager stopped
    24/05/30 10:05:30 INFO BlockManagerMaster: BlockManagerMaster stopped
    24/05/30 10:05:30 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
    24/05/30 10:05:30 INFO SparkContext: Successfully stopped SparkContext
    24/05/30 10:05:30 INFO ShutdownHookManager: Shutdown hook called
    24/05/30 10:05:30 INFO ShutdownHookManager: Deleting directory /var/data/spark-14ed60f1-82cd-4a33-b1b3-9e5d975c5b1e/spark-01120c89-5296-4c83-8a20-0799eef4e0ee
    24/05/30 10:05:30 INFO ShutdownHookManager: Deleting directory /tmp/spark-5f98ed73-576a-41be-855d-dabdcf7de189

步驟四:訪問Spark Web UI

Spark作業提供了Web UI,可以監控Spark作業的執行狀態。通過使用kubectl port-forward命令,將連接埠轉寄到本地,來訪問該Web UI介面。Web UI服務僅在Spark作業運行期間(即Driver Pod處於Running狀態)可用,Spark作業結束後,Web UI將無法繼續訪問。

在部署ack-spark-operator組件時,controller.uiService.enable預設為true,會自動建立一個Service,您可以將其連接埠轉寄來訪問Web UI,但如果在部署組件時將controller.uiService.enable設定為false,則不會建立Service,也可以通過轉寄Pod的連接埠訪問Web UI。

重要

kubectl port-forward命令建立的連接埠轉寄僅適用於測試環境下的快速驗證,不適合在生產環境中使用,使用時請注意安全風險。

  1. 根據情況選擇通過Service或Pod來轉送連接埠以訪問Web UI。以下是相關的命令:

    • 執行以下命令,通過Service連接埠轉寄訪問Web UI。

      kubectl port-forward services/spark-pi-ui-svc 4040
    • 執行以下命令,通過Pod連接埠轉寄訪問Web UI。

      kubectl port-forward pods/spark-pi-driver 4040

      預期輸出如下。

      Forwarding from 127.0.0.1:4040 -> 4040
      Forwarding from [::1]:4040 -> 4040
  2. 通過http://127.0.0.1:4040訪問Web UI。

(可選)步驟五:更新Spark作業

如需修改Spark作業的參數,您可以更新Spark作業的資訊清單檔。

  1. 編輯資源資訊清單檔spark-pi.yaml,例如將作業參數arguments修改為10000executor數量修改為2

    apiVersion: sparkoperator.k8s.io/v1beta2
    kind: SparkApplication
    metadata:
      name: spark-pi
    spec:
      type: Scala
      mode: cluster
      image: registry-cn-hangzhou.ack.aliyuncs.com/ack-demo/spark:3.5.2
      imagePullPolicy: IfNotPresent
      mainClass: org.apache.spark.examples.SparkPi
      mainApplicationFile: local:///opt/spark/examples/jars/spark-examples_2.12-3.5.2.jar
      arguments:
      - "10000"
      sparkVersion: 3.5.2
      driver:
        cores: 1
        coreLimit: 1200m
        memory: 512m
        serviceAccount: spark
      executor:
        instances: 2
        cores: 1
        coreLimit: 1200m
        memory: 512m
      restartPolicy:
        type: Never
  2. 執行以下命令,更新Spark作業。

    kubectl apply -f spark-pi.yaml
  3. 執行以下命令,查看Spark作業狀態。

    kubectl get sparkapplication spark-pi

    Spark作業將再次開始運行。預期輸出如下。

    NAME       STATUS    ATTEMPTS   START                  FINISH       AGE
    spark-pi   RUNNING   1          2024-06-04T03:37:34Z   <no value>   20m

(可選)步驟六:刪除Spark作業

如果您已體驗完本教程,Spark作業已無需使用,您可以通過以下命令操作釋放相關資源。

執行以下命令,刪除上述步驟中建立的Spark作業。

kubectl delete -f spark-pi.yaml

您也可以執行以下命令。

kubectl delete sparkapplication spark-pi