全部產品
Search
文件中心

Container Service for Kubernetes:Spark on ACK概述

更新時間:Dec 23, 2025

Spark on ACK是ACK基於Spark on Kubernetes提供的解決方案,讓您能夠基於ACK提供的企業級容器應用管理能力,快速構建高效、靈活且可擴充的Spark巨量資料處理平台。

Spark on ACK介紹

Apache Spark是一種專門用於大規模資料處理的計算引擎,廣泛應用於資料分析和機器學習等情境。自 2.3 版本起,Spark支援將作業提交至 Kubernetes 叢集中(Running Spark on Kubernetes)。

Spark Operator是用於在Kubernetes叢集中運行Spark工作負載的Operator,支援以Kubernetes原生的方式自動化管理Spark作業的生命週期,包括作業配置、作業提交、作業重試等。

Spark on ACK解決方案對Spark Operator等相關組件進行了定製與最佳化,相容開源版本並進一步拓展了功能特性。通過與阿里雲產品生態的無縫整合,例如Log Service、Object Storage Service和可觀測性等,您可以基於Spark on ACK快速構建一個靈活、高效且可擴充的巨量資料處理平台。

功能優勢

  • 簡化開發與營運

    • 可移植性:支援將Spark應用及其依賴打包為標準化的容器鏡像,從而實現Spark作業在不同 Kubernetes叢集間的無縫遷移。

    • 可觀測性:支援通過Spark History Server組件查看作業運行狀態,並整合阿里雲Log ServiceSLS和可觀測監控Prometheus版,進一步提升作業的可觀測性。

    • 工作流程編排:通過工作流程編排引擎(例如Apache AirflowArgo Workflows)編排 Spark 作業,能夠實現資料處理流水線的自動化、高效調度與跨環境一致性部署,提升營運效率並降低遷移成本。

    • 多版本支援:支援在單個ACK叢集中同時運行多個不同版本的Spark作業。

  • 作業調度與資源管理

    • 作業隊列管理:與ack-kube-queue整合,提供靈活的作業隊列管理和資源配額管理,自動最佳化工作負載資源分派並提升叢集資源使用率。

    • 多種調度策略:複用ACK調度器已有的調度能力,支援多種批調度策略,包括Gang Scheduling、Capacity Scheduling等。

    • 多架構調度:支援混合使用x86和Arm架構的ECS資源,例如通過使用倚天Arm架構伺服器實現增效降本。

    • 多叢集調度:通過ACK One多叢集艦隊將Spark作業在多叢集中進行調度和分發,提升多叢集資源使用率。

    • 彈性算力供給:支援自訂彈性資源優先順序調度,混合使用多種彈性方案,包括節點自動調整節點即時彈性等;也支援使用ECI、ACS算力,無需保有雲端服務器執行個體,按需使用,靈活擴縮。

    • 在離線混部:與ack-koordinator整合,支援在離線混部,提高叢集資源使用率。

  • 效能與穩定性最佳化

    • Shuffle效能最佳化:通過配置Spark作業使用Celeborn作為Remote Shuffle Service,實現存算分離,提升Shuffle效能和穩定性。

    • 資料訪問加速:基於Fluid資料編排和訪問加速能力,加速Spark作業資料訪問,提升作業效能。

整體架構

在ACK叢集中部署Spark作業時,您可以通過Spark Operator快速提交作業,使用ACK與阿里雲產品整合帶來的可觀測、調度、資源彈性等能力。Spark on ACK的整體架構如下。

image
  • 用戶端:通過kubectl、Arena等命令列工具將Spark作業提交至ACK叢集。

  • 工作流程:通過Apache Airflow、Argo Workflows等工作流程架構來編排Spark作業並提交至ACK叢集。

  • 可觀測:通過Spark History Server、阿里雲Log ServiceSLS、阿里雲可觀測監控 Prometheus 版搭建可觀測體系,包括查看作業運行狀態、收集和分析作業日誌和監控指標等。

  • Spark Operator:自動化管理Spark作業的生命週期,包括作業組態管理、作業提交和重試等。

  • Remote Shuffle Service(RSS):使用Apache Celeborn作為 RSS,提高Spark作業在Shuffle時的效能和穩定性。

  • 緩衝:使用Fluid作為分布式緩衝系統實現資料接入和資料訪問加速。

  • 雲基礎設施:作業運行過程中將使用到阿里雲提供的基礎設施,包括計算資源(ECSECIACS)、儲存資源(雲端硬碟NASOSS)和網路資源(ENIVPCSLB)等。

計費說明

在ACK叢集中運行Spark作業時,相關組件的安裝是免費的。使用過程中,ACK叢集本身的費用(叢集管理費和相關雲產品費用)仍然正常收取,請參見計費概述

如果您同時使用了其他雲產品,例如通過Log ServiceSLS收集Spark作業產生的日誌、Spark作業讀寫OSS/NAS中的資料等,產生的雲產品費用由各雲產品收取。您可以參見下文的操作文檔瞭解。

開始使用

在ACK叢集中運行Spark作業的大致流程如下,包括基礎使用、可觀測性和高階配置,供您按需選擇和配置。

image

基礎使用

流程

說明

構建Spark容器鏡像

您可以選擇直接使用開源社區提供的Spark容器鏡像,或者基於開源容器鏡像進行定製並推送到您自己的鏡像倉庫中。下面是一個Dockerfile樣本,您可以按需修改此Dockerfile,例如替換Spark基礎鏡像、加入依賴Jar包等,然後構建鏡像並推送到鏡像倉庫中。

展開查看樣本Dockerfile

ARG SPARK_IMAGE=spark:3.5.4

FROM ${SPARK_IMAGE}

# Add dependency for Hadoop Aliyun OSS support
ADD --chown=spark:spark --chmod=644 https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aliyun/3.3.4/hadoop-aliyun-3.3.4.jar ${SPARK_HOME}/jars
ADD --chown=spark:spark --chmod=644 https://repo1.maven.org/maven2/com/aliyun/oss/aliyun-sdk-oss/3.17.4/aliyun-sdk-oss-3.17.4.jar ${SPARK_HOME}/jars
ADD --chown=spark:spark --chmod=644 https://repo1.maven.org/maven2/org/jdom/jdom2/2.0.6.1/jdom2-2.0.6.1.jar ${SPARK_HOME}/jars

# Add dependency for log4j-layout-template-json
ADD --chown=spark:spark --chmod=644 https://repo1.maven.org/maven2/org/apache/logging/log4j/log4j-layout-template-json/2.24.1/log4j-layout-template-json-2.24.1.jar ${SPARK_HOME}/jars

# Add dependency for Celeborn
ADD --chown=spark:spark --chmod=644 https://repo1.maven.org/maven2/org/apache/celeborn/celeborn-client-spark-3-shaded_2.12/0.5.3/celeborn-client-spark-3-shaded_2.12-0.5.3.jar ${SPARK_HOME}/jars

建立專屬命名空間

為Spark作業建立一個或多個專屬的命名空間(本教程使用spark),用於實現資源隔離和資源配額等。後續Spark作業都將運行在該命名空間中。建立命令如下。

kubectl create namespace spark

使用Spark Operator運行Spark作業

部署ack-spark-operator組件,並配置 spark.jobNamespaces=["spark"](只監聽spark命名空間中提交的Spark作業)。部署完成後,即可運行下述樣本Spark作業。

展開查看樣本Spark作業

apiVersion: sparkoperator.k8s.io/v1beta2
kind: SparkApplication
metadata:
  name: spark-pi
  namespace: spark # 需要確保此命名空間在 spark.jobNamespaces 指定的命名空間列表中。
spec:
  type: Scala
  mode: cluster
  # 需將 <SPARK_IMAGE> 替換成您自己的 Spark 容器鏡像。
  image: <SPARK_IMAGE>
  imagePullPolicy: IfNotPresent
  mainClass: org.apache.spark.examples.SparkPi
  mainApplicationFile: local:///opt/spark/examples/jars/spark-examples_2.12-3.5.4.jar
  arguments:
  - "5000"
  sparkVersion: 3.5.4
  driver:
    cores: 1
    coreLimit: 1200m
    memory: 512m
    template:
      spec:
        containers:
        - name: spark-kubernetes-driver
        serviceAccount: spark-operator-spark
  executor:
    instances: 1
    cores: 1
    coreLimit: 1200m
    memory: 512m
    template:
      spec:
        containers:
        - name: spark-kubernetes-executor
  restartPolicy:
    type: Never
詳細內容,請參見使用Spark Operator運行Spark作業

讀寫OSS資料

Spark作業訪問阿里雲OSS資料有多種方式,包括Hadoop Aliyun SDKHadoop AWS SDKJindoSDK等,根據選擇的SDK,您需要在Spark容器鏡像中包含相應的依賴並在Spark作業中配置Hadoop相關參數。

展開查看範例程式碼

參見Spark作業讀寫OSS資料將測試資料集上傳至OSS後,您可以運行如下樣本Spark作業。

apiVersion: sparkoperator.k8s.io/v1beta2
kind: SparkApplication
metadata:
  name: spark-pagerank
  namespace: spark
spec:
  type: Scala
  mode: cluster
  # 需將 <SPARK_IMAGE> 替換成您自己的 Spark 鏡像
  image: <SPARK_IMAGE>
  imagePullPolicy: IfNotPresent
  mainClass: org.apache.spark.examples.SparkPageRank
  mainApplicationFile: local:///opt/spark/examples/jars/spark-examples_2.12-3.5.4.jar
  arguments:
  # 指定輸入測試資料集,將 <OSS_BUCKET> 替換成 OSS Buckt 名稱
  - oss://<OSS_BUCKET>/data/pagerank_dataset.txt
  # 迭代次數
  - "10"
  sparkVersion: 3.5.4
  hadoopConf:
    # 配置 Spark 作業訪問 OSS
    fs.oss.impl: org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem
    # 將 <OSS_ENDPOINT> 替換成 OSS 訪問端點,例如北京地區 OSS 的內網訪問端點為 oss-cn-beijing-internal.aliyuncs.com
    fs.oss.endpoint: <OSS_ENDPOINT>
    fs.oss.credentials.provider: com.aliyun.oss.common.auth.EnvironmentVariableCredentialsProvider
  driver:
    cores: 1
    coreLimit: "1"
    memory: 4g
    template:
      spec:
        containers:
        - name: spark-kubernetes-driver
          envFrom:
          # 從指定 Secret 中讀取環境變數
          - secretRef:
              name: spark-oss-secret
        serviceAccount: spark-operator-spark
  executor:
    instances: 2
    cores: 1
    coreLimit: "1"
    memory: 4g
    template:
      spec:
        containers:
        - name: spark-kubernetes-executor
          envFrom:
          # 從指定 Secret 中讀取環境變數
          - secretRef:
              name: spark-oss-secret
  restartPolicy:
    type: Never
詳細內容,請參見Spark作業讀寫OSS資料

可觀測性

流程

說明

部署Spark History Server

spark命名空間中部署ack-spark-history-server,配置日誌儲存後端(支援PVC、OSS/OSS-HDFS、HDFS)等資訊,從指定的儲存系統中讀取Spark事件記錄並解析成Web UI方便使用者查看。下面的樣本配置展示了如何配置Spark History Server從指定的NAS檔案系統的/spark/event-logs路徑中讀取事件記錄。

展開查看樣本配置

# Spark 配置
sparkConf:
  spark.history.fs.logDirectory: file:///mnt/nas/spark/event-logs

# 環境變數
env:
- name: SPARK_DAEMON_MEMORY
  value: 7g

# 資料卷
volumes:
- name: nas
  persistentVolumeClaim:
    claimName: nas-pvc

# 資料卷掛載
volumeMounts:
- name: nas
  subPath: spark/event-logs
  mountPath: /mnt/nas/spark/event-logs

# 根據 Spark 作業數量和規模調整資源大小
resources:
  requests:
    cpu: 2
    memory: 8Gi
  limits:
    cpu: 2
    memory: 8Gi

接著,在提交Spark作業時掛載相同的NAS檔案系統並配置Spark將事件記錄寫入相同的路徑,後續您將可以從Spark History Server中查看該作業,下面是一個樣本作業。

展開查看樣本Spark作業

apiVersion: sparkoperator.k8s.io/v1beta2
kind: SparkApplication
metadata:
  name: spark-pi
  namespace: spark
spec:
  type: Scala
  mode: cluster
  # 需將 <SPARK_IMAGE> 替換成您的 Spark 鏡像
  image: <SPARK_IMAGE>
  imagePullPolicy: IfNotPresent
  mainClass: org.apache.spark.examples.SparkPi
  mainApplicationFile: local:///opt/spark/examples/jars/spark-examples_2.12-3.5.4.jar
  arguments:
  - "5000"
  sparkVersion: 3.5.4
  sparkConf:
    # 事件記錄
    spark.eventLog.enabled: "true"
    spark.eventLog.dir: file:///mnt/nas/spark/event-logs
  driver:
    cores: 1
    coreLimit: 1200m
    memory: 512m
    template:
      spec:
        containers:
        - name: spark-kubernetes-driver
          volumeMounts:
          - name: nas
            subPath: spark/event-logs
            mountPath: /mnt/nas/spark/event-logs
        volumes:
        - name: nas
          persistentVolumeClaim:
            claimName: nas-pvc
        serviceAccount: spark-operator-spark
  executor:
    instances: 1
    cores: 1
    coreLimit: 1200m
    memory: 512m
    template:
      spec:
        containers:
        - name: spark-kubernetes-executor
  restartPolicy:
    type: Never
詳細內容,請參見使用Spark History Server查看Spark作業資訊

配置Log ServiceSLS收集Spark日誌

叢集中運行大量 Spark 作業時,建議使用阿里雲Log Service SLS 統一收集所有 Spark 作業日誌,以便查詢和分析Spark容器的 stdout、stderr 日誌。

展開查看樣本作業

此代碼在Spark作業中使用SLS收集Spark容器中位於/opt/spark/logs/*.log路徑中的日誌。

apiVersion: sparkoperator.k8s.io/v1beta2
kind: SparkApplication
metadata:
  name: spark-pi
  namespace: spark
spec:
  type: Scala
  mode: cluster
  # 需將 <SPARK_IMAGE> 替換成步驟一種構建得到的 Spark 鏡像
  image: <SPARK_IMAGE>
  imagePullPolicy: IfNotPresent
  mainClass: org.apache.spark.examples.SparkPi
  mainApplicationFile: local:///opt/spark/examples/jars/spark-examples_2.12-3.5.4.jar
  arguments:
  - "5000"
  sparkVersion: 3.5.4
  # 從指定的 ConfigMap 中讀取日誌設定檔 log4j2.properties
  sparkConfigMap: spark-log-conf
  sparkConf:
    # 事件記錄
    spark.eventLog.enabled: "true"
    spark.eventLog.dir: file:///mnt/nas/spark/event-logs
  driver:
    cores: 1
    coreLimit: 1200m
    memory: 512m
    template:
      spec:
        containers:
        - name: spark-kubernetes-driver
          volumeMounts:
          - name: nas
            subPath: spark/event-logs
            mountPath: /mnt/nas/spark/event-logs
        serviceAccount: spark-operator-spark
        volumes:
        - name: nas
          persistentVolumeClaim:
            claimName: nas-pvc
  executor:
    instances: 1
    cores: 1
    coreLimit: 1200m
    memory: 512m
    template:
      spec:
        containers:
        - name: spark-kubernetes-executor
  restartPolicy:
    type: Never
詳細內容,請參見使用Log Service收集Spark作業日誌

效能最佳化

流程

說明

通過RSS提升Shuffle效能

Shuffle是分散式運算中的重要操作,其過程通常伴隨著大量磁碟IO、資料序列化和網路IO,容易引發OOM和資料擷取失敗(Fetch失敗)等問題。為了最佳化Shuffle的效能和穩定性,提升計算服務品質,您可以在Spark作業配置中使用Apache Celeborn作為Remote Shuffle Service(RSS)。

展開查看範例程式碼

參見Spark作業使用Celeborn作為RSS在叢集中部署ack-celeborn組件後,您可以基於下方代碼提交Spark作業,使用Celeborn作為RSS。

apiVersion: sparkoperator.k8s.io/v1beta2
kind: SparkApplication
metadata:
  name: spark-pagerank
  namespace: spark
spec:
  type: Scala
  mode: cluster
  # 需將 <SPARK_IMAGE> 替換成您的 Spark 鏡像
  image: <SPARK_IMAGE>
  imagePullPolicy: IfNotPresent
  mainClass: org.apache.spark.examples.SparkPageRank
  mainApplicationFile: local:///opt/spark/examples/jars/spark-examples_2.12-3.5.4.jar
  arguments:
  # 指定輸入測試資料集,將 <OSS_BUCKET> 替換成 OSS Buckt 名稱
  - oss://<OSS_BUCKET>/data/pagerank_dataset.txt
  # 迭代次數
  - "10"
  sparkVersion: 3.5.4
  hadoopConf:
    # 配置 Spark 作業訪問 OSS
    fs.oss.impl: org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem
    # 將 <OSS_ENDPOINT> 替換成 OSS 訪問端點,例如北京地區 OSS 的內網訪問端點為 oss-cn-beijing-internal.aliyuncs.com
    fs.oss.endpoint: <OSS_ENDPOINT>
    fs.oss.credentials.provider: com.aliyun.oss.common.auth.EnvironmentVariableCredentialsProvider
  # 從指定的 ConfigMap 中讀取日誌設定檔 log4j2.properties
  sparkConfigMap: spark-log-conf
  sparkConf:
    # 事件記錄
    spark.eventLog.enabled: "true"
    spark.eventLog.dir: file:///mnt/nas/spark/event-logs
    
    # Celeborn 相關配置
    spark.shuffle.manager: org.apache.spark.shuffle.celeborn.SparkShuffleManager
    spark.serializer: org.apache.spark.serializer.KryoSerializer
    # 需要根據 Celeborn master 副本數量進行配置
    spark.celeborn.master.endpoints: celeborn-master-0.celeborn-master-svc.celeborn.svc.cluster.local,celeborn-master-1.celeborn-master-svc.celeborn.svc.cluster.local,celeborn-master-2.celeborn-master-svc.celeborn.svc.cluster.local
    spark.celeborn.client.spark.shuffle.writer: hash
    spark.celeborn.client.push.replicate.enabled: "false"
    spark.sql.adaptive.localShuffleReader.enabled: "false"
    spark.sql.adaptive.enabled: "true"
    spark.sql.adaptive.skewJoin.enabled: "true"
    spark.shuffle.sort.io.plugin.class: org.apache.spark.shuffle.celeborn.CelebornShuffleDataIO
    spark.dynamicAllocation.shuffleTracking.enabled: "false"
    spark.executor.userClassPathFirst: "false"
  driver:
    cores: 1
    coreLimit: "1"
    memory: 4g
    template:
      spec:
        containers:
        - name: spark-kubernetes-driver
          envFrom:
          # 從指定 Secret 中讀取環境變數
          - secretRef:
              name: spark-oss-secret
          volumeMounts:
          - name: nas
            subPath: spark/event-logs
            mountPath: /mnt/nas/spark/event-logs
        volumes:
        - name: nas
          persistentVolumeClaim:
            claimName: nas-pvc
        serviceAccount: spark-operator-spark
  executor:
    instances: 2
    cores: 1
    coreLimit: "1"
    memory: 4g
    template:
      spec:
        containers:
        - name: spark-kubernetes-executor
          envFrom:
          # 從指定 Secret 中讀取環境變數
          - secretRef:
              name: spark-oss-secret
  restartPolicy:
    type: Never
詳細內容,請參見Spark作業使用Celeborn作為RSS

定義彈性資源調度優先順序

使用ECI Pod並配置合適的調度策略,可以按需建立並按資源實際用量付費,有效減少叢集資源閑置帶來的成本浪費。在ECS和ECI資源混用的情境下,還可以指定調度優先順序。

您無需在SparkApplication 中修改調度相關配置,ACK調度器會根據配置的彈性策略自動完成Pod調度。您可以按需靈活地定製多種彈性資源(例如ECS和ECI)的混用。

展開查看樣本彈性策略

以下樣本自訂了彈性策略:對於spark命名空間中由Spark Operator啟動的Pod,優先使用ECS資源並且最多調度10個Pod,在ECS資源不足時再使用ECI資源並且最多調度10個Pod。

apiVersion: scheduling.alibabacloud.com/v1alpha1
kind: ResourcePolicy
metadata:
  name: spark
  namespace: spark
spec:
  # 通過標籤選取器指定調度策略應用的 Pod
  selector:
    # 例如,指定調度策略應用於通過 Spark Operator 方式提交的作業 Pod
    sparkoperator.k8s.io/launched-by-spark-operator: "true"
  strategy: prefer
  # 調度單元配置
  # 擴容時,將按照調度單元的順序進行擴容;縮容時,將按照調度單元的逆序進行縮容。
  units:
  # 第一個調度單元使用 ecs 資源,最多能調度 10 個 pod 到 ECS 執行個體
  - resource: ecs
    max: 10
    # 調度器會將標籤資訊更新到 Pod 上
    podLabels:
      # 這是一條特殊標籤,不會更新到 Pod 上
      k8s.aliyun.com/resource-policy-wait-for-ecs-scaling: "true"
    # 使用 ECS 資源時可以通過節點選取器指定可以調度的節點
    nodeSelector:
      # 例如,選擇隨用隨付類型的 ECS 節點
      node.alibabacloud.com/instance-charge-type: PostPaid
  # 第二個調度單元使用 ECI 資源,最多能調度 10 個 pod 到 ECI 執行個體
  - resource: eci
    max: 10
  # 在統計 Pod 數量時忽略 ResourcePolicy 建立之前已經調度的 Pod
  ignorePreviousPod: false
  # 在統計 Pod 數量時忽略處於 Terminating 狀態的 Pod
  ignoreTerminatingPod: true
  # 搶佔策略
  # BeforeNextUnit 表示調度器將在每個 Unit 調度失敗時嘗試搶佔
  # AfterAllUnits 表示 ResourcePolicy 只在最後一個 Unit 調度失敗時嘗試搶佔
  preemptPolicy: AfterAllUnits
  # Pod 在何種情況下被允許使用後續 Unit 中的資源
  whenTryNextUnits:
    # 當滿足以下兩種情形之一時,允許使用後續 Unit 中的資源
    # 1. 當前 Unit 的 Max 已設定,且該 Unit 中的 Pod 數量大於或等於設定的 Max 值時;
    # 2. 當前 Unit 的 Max 未設定,且該 Unit 的 PodLabels 中包含標籤 k8s.aliyun.com/resource-policy-wait-for-ecs-scaling: "true",且等待逾時之後。
    policy: TimeoutOrExceedMax
    # 當 policy 為 TimeoutOrExceedMax 時,若當前 Unit 資源不足以調度 Pod,則在當前 Unit 中等待,等待時間最大為 timeout,
    # 該策略可以與自動調整以及 ECI 配合,達到優先嘗試節點池自動調整,並且在逾時後自動使用 eci 的效果。
    timeout: 30s
詳細內容,請參見使用ECI彈性資源運行Spark作業

配置動態資源分派

動態資源分派(Dynamic Resource Allocation,簡稱DRA)可根據工作負載的大小動態調整作業所使用的計算資源。您可以為Spark作業啟用動態資源分派,避免因資源不足導致作業執行時間過長或因資源過剩導致資源浪費。

展開查看樣本作業

此樣本作業結合Celeborn RSS進一步配置了動態資源分派。

apiVersion: sparkoperator.k8s.io/v1beta2
kind: SparkApplication
metadata:
  name: spark-pagerank
  namespace: spark
spec:
  type: Scala
  mode: cluster
  # 需將 <SPARK_IMAGE> 替換成建您的 Spark 鏡像
  image: <SPARK_IMAGE>
  imagePullPolicy: IfNotPresent
  mainClass: org.apache.spark.examples.SparkPageRank
  mainApplicationFile: local:///opt/spark/examples/jars/spark-examples_2.12-3.5.4.jar
  arguments:
  # 指定輸入測試資料集,將 <OSS_BUCKET> 替換成 OSS Buckt 名稱
  - oss://<OSS_BUCKET>/data/pagerank_dataset.txt
  # 迭代次數
  - "10"
  sparkVersion: 3.5.4
  hadoopConf:
    # 配置 Spark 作業訪問 OSS
    fs.oss.impl: org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem
    # 將 <OSS_ENDPOINT> 替換成 OSS 訪問端點,例如北京地區 OSS 的內網訪問端點為 oss-cn-beijing-internal.aliyuncs.com
    fs.oss.endpoint: <OSS_ENDPOINT>
    fs.oss.credentials.provider: com.aliyun.oss.common.auth.EnvironmentVariableCredentialsProvider
  # 從指定的 ConfigMap 中讀取日誌設定檔 log4j2.properties
  sparkConfigMap: spark-log-conf
  sparkConf:
    # ====================
    # 事件記錄
    # ====================
    spark.eventLog.enabled: "true"
    spark.eventLog.dir: file:///mnt/nas/spark/event-logs

    # ====================
    # Celeborn
    # Ref: https://github.com/apache/celeborn/blob/main/README.md#spark-configuration
    # ====================
    # Shuffle manager class name changed in 0.3.0:
    #    before 0.3.0: `org.apache.spark.shuffle.celeborn.RssShuffleManager`
    #    since 0.3.0: `org.apache.spark.shuffle.celeborn.SparkShuffleManager`
    spark.shuffle.manager: org.apache.spark.shuffle.celeborn.SparkShuffleManager
    # Must use kryo serializer because java serializer do not support relocation
    spark.serializer: org.apache.spark.serializer.KryoSerializer
    # 需要根據 Celeborn master 副本數量進行配置。
    spark.celeborn.master.endpoints: celeborn-master-0.celeborn-master-svc.celeborn.svc.cluster.local,celeborn-master-1.celeborn-master-svc.celeborn.svc.cluster.local,celeborn-master-2.celeborn-master-svc.celeborn.svc.cluster.local
    # options: hash, sort
    # Hash shuffle writer use (partition count) * (celeborn.push.buffer.max.size) * (spark.executor.cores) memory.
    # Sort shuffle writer uses less memory than hash shuffle writer, if your shuffle partition count is large, try to use sort hash writer.
    spark.celeborn.client.spark.shuffle.writer: hash
    # We recommend setting `spark.celeborn.client.push.replicate.enabled` to true to enable server-side data replication
    # If you have only one worker, this setting must be false 
    # If your Celeborn is using HDFS, it's recommended to set this setting to false
    spark.celeborn.client.push.replicate.enabled: "false"
    # Support for Spark AQE only tested under Spark 3
    spark.sql.adaptive.localShuffleReader.enabled: "false"
    # we recommend enabling aqe support to gain better performance
    spark.sql.adaptive.enabled: "true"
    spark.sql.adaptive.skewJoin.enabled: "true"
    # 當 Spark 版本 >= 3.5.0 時,配置該選項以支援動態資源分派
    spark.shuffle.sort.io.plugin.class: org.apache.spark.shuffle.celeborn.CelebornShuffleDataIO
    spark.executor.userClassPathFirst: "false"

    # ====================
    # 動態資源分派
    # Ref: https://spark.apache.org/docs/latest/job-scheduling.html#dynamic-resource-allocation
    # ====================
    # 啟用動態資源分派
    spark.dynamicAllocation.enabled: "true"
    # 啟用 shuffle 檔案跟蹤,不依賴 ESS 即可實現動態資源分派。
    # 在使用 Celeborn 作為 RSS 時,當 Spark 版本 >= 3.4.0 時,強烈建議關閉該選項。
    spark.dynamicAllocation.shuffleTracking.enabled: "false"
    # Executor 數量的初始值。
    spark.dynamicAllocation.initialExecutors: "3"
    # Executor 數量的最小值。
    spark.dynamicAllocation.minExecutors: "0"
    # Executor 數量的最大值。
    spark.dynamicAllocation.maxExecutors: "10"
    # Executor 空閑逾時時間,超過該時間將會被釋放掉。
    spark.dynamicAllocation.executorIdleTimeout: 60s
    # 緩衝了資料區塊的 Executor 空閑逾時時間,超過該時間將會被釋放掉,預設為 infinity,即不會釋放。
    # spark.dynamicAllocation.cachedExecutorIdleTimeout:
    # 當存在待調度任務超過該時間後,將會申請更多的 executor。
    spark.dynamicAllocation.schedulerBacklogTimeout: 1s
    # 每間隔該時間後,將會開始下一批次申請 Executor。 
    spark.dynamicAllocation.sustainedSchedulerBacklogTimeout: 1s
  driver:
    cores: 1
    coreLimit: "1"
    memory: 4g
    template:
      spec:
        containers:
        - name: spark-kubernetes-driver
          envFrom:
          # 從指定 Secret 中讀取環境變數
          - secretRef:
              name: spark-oss-secret
          volumeMounts:
          - name: nas
            subPath: spark/event-logs
            mountPath: /mnt/nas/spark/event-logs
        volumes:
        - name: nas
          persistentVolumeClaim:
            claimName: nas-pvc
        serviceAccount: spark-operator-spark
  executor:
    cores: 1
    coreLimit: "1"
    memory: 4g
    template:
      spec:
        containers:
        - name: spark-kubernetes-executor
          envFrom:
          # 從指定 Secret 中讀取環境變數
          - secretRef:
              name: spark-oss-secret
  restartPolicy:
    type: Never
詳細內容,請參見Spark作業配置動態資源分派

使用Fluid加速資料訪問

如果您的資料位元於線下IDC或在資料訪問時遇到效能瓶頸,可使用Fluid提供的資料接入和分布式緩衝編排能力加速資料訪問。

詳細內容,請參見Spark作業使用Fluid加速資料訪問

相關文檔