全部產品
Search
文件中心

Container Service for Kubernetes:Spark作業使用Fluid加速資料訪問

更新時間:Feb 21, 2025

本文介紹如何使用Fluid加速資料訪問,通過JindoRuntime最佳化OSS資料訪問,從而提升資料密集型應用的效能。

前提條件

  • 已部署ack-spark-operator組件,請參見部署ack-spark-operator組件

    說明

    本教程在部署時使用了配置參數 spark.jobNamespaces=["spark"],如果您配置了不同的命名空間,在體驗本教程時需要相應地修改namespace欄位。

  • 已安裝雲原生AI套件並部署ack-fluid組件。具體操作,請參見安裝雲原生AI套件

  • 已產生測試資料並上傳至OSS。具體操作,請參見準備測試資料並上傳至OSS

Fluid簡介

Fluid是一個開源的Kubernetes原生的分布式資料集編排和加速引擎,主要服務於雲原生情境下的資料密集型應用,例如巨量資料應用、AI 應用等。Fluid的核心功能包括:

  • 資料集抽象原生支援:將資料密集型應用所需基礎支撐能力功能化,實現資料高效訪問並降低多維管理成本。

  • 可擴充的資料引擎外掛程式:提供統一的提供者,方便接入第三方儲存,通過不同的Runtime實現資料操作。

  • 自動化的資料操作:提供多種操作模式,與自動化營運體系相結合。

  • 資料彈性與調度:將資料緩衝技術和彈性擴縮容、資料親和性調度能力相結合,提高資料訪問效能。

  • 運行時平台無關:支援原生、邊緣、Serverless Kubernetes叢集、Kubernetes多叢集等多樣化環境,適用於混合雲情境。

關於Fluid的更多資訊,請參見資料加速Fluid概述

步驟一:建立Fluid專屬節點池

在ACK叢集中建立名為fluid的專屬節點池用於部署Fluid中JindoRuntime Worker pod。本教程中樣本節點池中共3個節點,執行個體規格均為巨量資料網路增強型執行個體ecs.d1ne.4xlarge,每個節點打上標籤fluid-cloudnative.github.io/node="true"和汙點fluid-cloudnative.github.io/node="true":NoSchedule,每個節點帶有8塊大小為5905 GB的高吞吐SATA HDD本地碟,分別被格式化和掛載至/mnt/disk1/mnt/disk2、... 和 /mnt/disk8。關於建立節點池的具體操作,請參見建立和管理節點池。關於節點池的選型,請參見Fluid資料緩衝最佳化策略最佳實務

步驟二:建立Dataset

  1. 建立如下Secret資訊清單檔並儲存為fluid-oss-secret.yaml,用於儲存OSS訪問憑據。

    請將<ACCESS_KEY_ID><ACCESS_KEY_SECRET>替換為您的阿里雲AccessKey ID和AccessKey Secret。

    apiVersion: v1
    kind: Secret
    metadata:
      name: fluid-oss-secret
      namespace: spark
    stringData:
      OSS_ACCESS_KEY_ID: <ACCESS_KEY_ID>
      OSS_ACCESS_KEY_SECRET: <ACCESS_KEY_SECRET>
  1. 執行如下命令,建立Secret資源。

    kubectl create -f fluid-oss-secret.yaml

    預期輸出如下:

    secret/fluid-oss-secret created
  1. 建立如下DataSet資訊清單檔並儲存為spark-fluid-dataset.yaml

    apiVersion: data.fluid.io/v1alpha1
    kind: Dataset
    metadata:
      name: spark
      namespace: spark
    spec:
      mounts:
      - name: spark
        # 需要加速的OSS訪問路徑,需將<OSS_BUCKET>替換成您的OSS儲存桶名稱。
        mountPoint: oss://<OSS_BUCKET>/
        path: /
        options:
          # OSS訪問端點,需將<OSS_ENDPOINT>替換成您的OSS儲存桶訪問端點,
          # 例如,北京地區OSS內網訪問端點為oss-cn-beijing-internal.aliyuncs.com。
          fs.oss.endpoint: <OSS_ENDPOINT>
        encryptOptions:
        - name: fs.oss.accessKeyId
          valueFrom:
            secretKeyRef:
              name: fluid-oss-secret
              key: OSS_ACCESS_KEY_ID
        - name: fs.oss.accessKeySecret
          valueFrom:
            secretKeyRef:
              name: fluid-oss-secret
              key: OSS_ACCESS_KEY_SECRET
      # 資料將緩衝到滿足親和性的節點上。
      nodeAffinity:
        required:
          nodeSelectorTerms:
          - matchExpressions:
            - key: fluid-cloudnative.github.io/node
              operator: In
              values:
              - "true"
      # 節點汙點容忍。
      tolerations:
      - key: fluid-cloudnative.github.io/node
        operator: Equal
        value: "true"
        effect: NoSchedule

    其中各個欄位的說明如下:

    • mountPoint:需要加速的OSS路徑。

    • fs.oss.endpoint:OSS Bucket訪問端點,例如北京地區內網端點為oss-cn-beijing-internal.aliyuncs.com

    • encryptOptions :配置了從名為fluid-oss-secret的Secret中讀取OSS_ACCESS_KEY_IDOSS_ACCESS_KEY_SECRET 作為OSS訪問憑據。

  2. 執行如下命令,建立Dataset資源。

    kubectl create -f spark-fluid-dataset.yaml

    預期輸出:

    dataset.data.fluid.io/spark created
  3. 執行如下命令查看Dataset部署狀態。

    kubectl get -n spark dataset spark -o wide

    預期輸出:

    NAME    UFS TOTAL SIZE   CACHED   CACHE CAPACITY   CACHED PERCENTAGE   PHASE      HCFS URL   TOTAL FILES   CACHE HIT RATIO   AGE
    spark                                                                  NotBound                                              58m

    可以看到此時Dataset處於NotBound狀態。

步驟三:建立JindoRuntime

  1. 根據如下內容建立JindoRuntime資訊清單檔並儲存為 spark-fluid-jindoruntime.yaml

    apiVersion: data.fluid.io/v1alpha1
    kind: JindoRuntime
    metadata:
      # 必須和Dataset的名稱相同。
      name: spark
      namespace: spark
    spec:
      # Worker副本數量。
      replicas: 3
      tieredstore:
        levels:
        # 緩衝類型為HDD磁碟。
        - mediumtype: HDD
          # 資料集類型。
          volumeType: hostPath
          # 需要根據節點上的磁碟數量進行調整。
          path: /mnt/disk1,/mnt/disk2,/mnt/disk3,/mnt/disk4,/mnt/disk5,/mnt/disk6,/mnt/disk7,/mnt/disk8
          # 單個Worker能提供的緩衝容量。
          quotaList: 5500Gi,5500Gi,5500Gi,5500Gi,5500Gi,5500Gi,5500Gi,5500Gi
          high: "0.99"
          low: "0.95"
      worker:
        resources:
          requests:
            cpu: 14
            memory: 56Gi
          limits:
            cpu: 14
            memory: 56Gi

    其中各個欄位說明如下:

    • replicas:建立JindoFS叢集的Worker數量。

    • mediumtype:緩衝類型。

    • path:儲存路徑。

    • quota:緩衝系統最大容量。

    • high:儲存容量上限大小。

    • low:儲存容量下限大小。

  1. 執行如下命令建立JindoRuntime資源。

    kubectl create -f spark-fluid-jindoruntime.yaml

    預期輸出:

    jindoruntime.data.fluid.io/spark created
  2. 執行如下命令,查看JindoRuntime部署狀態。

    kubectl get -n spark jindoruntime spark

    預期輸出:

    NAME    MASTER PHASE   WORKER PHASE   FUSE PHASE   AGE
    spark   Ready          Ready          Ready        2m28s

    可以看到FUSE PHASE狀態為Ready,表示JindoRuntime部署成功。

  3. 執行如下命令,再次查看Dataset部署狀態:

    kubectl get -n spark dataset spark -o wide

    預期輸出:

    NAME    UFS TOTAL SIZE   CACHED   CACHE CAPACITY   CACHED PERCENTAGE   PHASE   HCFS URL                             TOTAL FILES     CACHE HIT RATIO   AGE
    spark   [Calculating]    0.00B    128.91TiB                            Bound   spark-jindofs-master-0.spark:19434   [Calculating]                     2m5

    可以看到Dataset狀態已經變為Bound,表示Dataset已經部署成功。

(可選)步驟四:進行資料預熱

由於首次訪問無法命中資料緩衝,Fluid提供了DataLoad緩衝預熱操作,以提升首次資料訪問的效率。資料預熱通過預先將資料載入到緩衝中,提高資料訪問速度,從而最佳化資料處理效率和系統效能。

  1. 根據如下內容建立Dataload資訊清單檔並儲存為spark-fluid-dataload.yaml

    apiVersion: data.fluid.io/v1alpha1
    kind: DataLoad
    metadata:
      name: spark
      namespace: spark
    spec:
      dataset:
        name: spark
        namespace: spark
      loadMetadata: true
  2. 執行如下命令,建立Dataload資源。

    kubectl create -f spark-fluid-dataload.yaml

    預期輸出:

    dataload.data.fluid.io/spark created
  3. 執行如下命令,觀察資料預熱進度。

    kubectl get -n spark dataload spark -w

    預期輸出:

    NAME    DATASET   PHASE      AGE     DURATION
    spark   spark     Executing   20s   Unfinished
    spark   spark     Complete   9m31s   8m37s

    可以看到這次資料預熱總共耗時8m37s

  4. 執行如下命令,再次查看Dataset狀態。

    kubectl get -n spark dataset spark -o wide

    預期輸出:

    NAME    UFS TOTAL SIZE   CACHED      CACHE CAPACITY   CACHED PERCENTAGE   PHASE   HCFS URL                             TOTAL FILES     CACHE HIT RATIO   AGE
    spark   0.00B            326.85GiB   128.91TiB        0.0%                Bound   spark-jindofs-master-0.spark:19434   [Calculating]                     19m

    在前一次查看Dataset狀態時,緩衝的資料量為0.00B,經過資料預熱之後,緩衝的資料量已經變為326.85GiB

步驟五:運行樣本Spark作業

方式一:使用Posix檔案系統介面

  1. 根據如下內容建立SparkApplication資訊清單檔並儲存為spark-pagerank-fluid-posix.yaml

    apiVersion: sparkoperator.k8s.io/v1beta2
    kind: SparkApplication
    metadata:
      name: spark-pagerank-fluid-posix
      namespace: spark
    spec:
      type: Scala
      mode: cluster
      image: spark:3.5.4
      mainApplicationFile: local:///opt/spark/examples/jars/spark-examples_2.12-3.5.4.jar
      mainClass: org.apache.spark.examples.SparkPageRank
      arguments:
      # 用file://格式訪問本地檔案。
      - file:///mnt/fluid/data/pagerank_dataset.txt
      - "10"
      sparkVersion: 3.5.4
      driver:
        cores: 1
        coreLimit: 1200m
        memory: 512m
        volumeMounts:
        # 將dataset對應的PVC掛載至/mnt/fluid路徑。
        - name: spark
          mountPath: /mnt/fluid
        serviceAccount: spark-operator-spark
      executor:
        instances: 2
        cores: 1
        coreLimit: "1"
        memory: 4g
        volumeMounts:
        # 將dataset對應的PVC掛載至/mnt/fluid路徑。
        - name: spark
          mountPath: /mnt/fluid
      volumes:
      #  添加Fluid建立出的與dataset同名的PVC。
      - name: spark
        persistentVolumeClaim:
          claimName: spark
      restartPolicy:
        type: Never
    說明

    上述作業中使用到的Spark鏡像來自於社區,如果您遇到網路原因導致無法拉取鏡像,請同步社區鏡像或自行構建鏡像並推送到您自己的鏡像倉庫中。

  2. 執行如下命令,提交Spark作業。

    kubectl create -f spark-pagerank-fluid-posix.yaml

    預期輸出:

    sparkapplication.sparkoperator.k8s.io/spark-pagerank-fluid-posix created
  3. 執行如下命令,查看Spark作業狀態:

    kubectl get -n spark sparkapplication spark-pagerank-fluid-posix -w

    預期輸出:

    NAME                         STATUS    ATTEMPTS   START                  FINISH       AGE
    spark-pagerank-fluid-posix   RUNNING   1          2025-01-16T11:06:15Z   <no value>   87s
    spark-pagerank-fluid-posix   RUNNING   1          2025-01-16T11:06:15Z   <no value>   102s
    spark-pagerank-fluid-posix   RUNNING   1          2025-01-16T11:06:15Z   <no value>   102s
    spark-pagerank-fluid-posix   SUCCEEDING   1          2025-01-16T11:06:15Z   2025-01-16T11:07:59Z   104s
    spark-pagerank-fluid-posix   COMPLETED    1          2025-01-16T11:06:15Z   2025-01-16T11:07:59Z   104s

    可以看到作業已經成功運行完成。

方式二:使用HCFS檔案系統介面

  1. 執行如下命令,查看Dataset的HCFS訪問URL。

    kubectl get -n spark dataset spark -o wide

    預期輸出:

    NAME    UFS TOTAL SIZE   CACHED      CACHE CAPACITY   CACHED PERCENTAGE   PHASE   HCFS URL                             TOTAL FILES     CACHE HIT RATIO   AGE
    spark   0.00B            326.85GiB   128.91TiB        0.0%                Bound   spark-jindofs-master-0.spark:19434   [Calculating]                     30m

    可以看到,Dataset 的 HCFS URL 為 spark-jindofs-master-0.spark:19434,在配置 Spark 作業時需要將參數 fs.jindofsx.namespace.rpc.address 配置成該值。

  2. 根據如下內容建立SparkApplication資訊清單檔並儲存為spark-pagerank-fluid-hcfs.yaml

    apiVersion: sparkoperator.k8s.io/v1beta2
    kind: SparkApplication
    metadata:
      name: spark-pagerank-fluid-hcfs
      namespace: spark
    spec:
      type: Scala
      mode: cluster
      # 需將 <SPARK_IMAGE> 替換成您自己的Spark鏡像,該鏡像中需要包含JindoSDK依賴。
      image: <SPARK_IMAGE>
      mainApplicationFile: local:///opt/spark/examples/jars/spark-examples_2.12-3.5.4.jar
      mainClass: org.apache.spark.examples.SparkPageRank
      arguments:
      # 從以下三種方式中選擇一種,需將<OSS_BUCKET>替換成您的 OSS 儲存桶名稱。
      # 方式一:使用 oss:// 格式訪問 OSS 資料。
      - oss://<OSS_BUCKET>/data/pagerank_dataset.txt
      # 方式二:使用 s3:// 格式訪問OSS資料。
      # - s3://<OSS_BUCKET>/data/pagerank_dataset.txt
      # 方式三:使用 s3a:// 格式訪問OSS資料。
      # - s3a://<OSS_BUCKET>/data/pagerank_dataset.txt
      # 迭代次數
      - "10"
      sparkVersion: 3.5.4
      hadoopConf:
        #===================
        # 訪問 OSS 相關配置
        #===================
        # 支援使用oss://格式訪問OSS資料。
        fs.oss.impl: com.aliyun.jindodata.oss.JindoOssFileSystem
        # OSS訪問端點,需將<OSS_ENDPOINT>替換成您的OSS訪問端點。
        # 例如,北京地區OSS內網訪問端點為oss-cn-beijing-internal.aliyuncs.com。
        fs.oss.endpoint: <OSS_ENDPOINT>
        # 從環境變數中讀取OSS訪問憑據。
        fs.oss.credentials.provider: com.aliyun.jindodata.oss.auth.EnvironmentVariableCredentialsProvider
    
        # 支援使用s3://格式訪問OSS資料。
        fs.s3.impl: com.aliyun.jindodata.s3.JindoS3FileSystem
        # OSS 訪問端點,需將<OSS_ENDPOINT>替換成您的OSS訪問端點。
        # 例如,北京地區OSS內網訪問端點為oss-cn-beijing-internal.aliyuncs.com。
        fs.s3.endpoint: <OSS_ENDPOINT>
        # 從環境變數中讀取OSS訪問憑據。
        fs.s3.credentials.provider: com.aliyun.jindodata.oss.auth.EnvironmentVariableCredentialsProvider
    
        # 支援使用s3a://格式訪問OSS資料。
        fs.s3a.impl: com.aliyun.jindodata.s3.JindoS3FileSystem
        # OSS 訪問端點,需將<OSS_ENDPOINT> 替換成您的 OSS 訪問端點。
        # 例如,北京地區OSS內網訪問端點為oss-cn-beijing-internal.aliyuncs.com。
        fs.s3a.endpoint: <OSS_ENDPOINT>
        # 從環境變數中讀取OSS訪問憑據。
        fs.s3a.credentials.provider: com.aliyun.jindodata.oss.auth.EnvironmentVariableCredentialsProvider
    
        #===================
        # JindoFS 相關配置
        #===================
        fs.xengine: jindofsx
        # Dataset的HCFS URL。
        fs.jindofsx.namespace.rpc.address: spark-jindofs-master-0.spark:19434
        fs.jindofsx.data.cache.enable: "true"
      driver:
        cores: 1
        coreLimit: 1200m
        memory: 512m
        envFrom:
        - secretRef:
            name: spark-oss-secret
        serviceAccount: spark-operator-spark
      executor:
        instances: 2
        cores: 2
        coreLimit: "2"
        memory: 8g
        envFrom:
        - secretRef:
            name: spark-oss-secret
      restartPolicy:
        type: Never
    說明

    上述樣本作業中使用到的Spark鏡像需要包含JindoSDK依賴,您可以參考如下Dockerfile自行構建鏡像並推送到自己的鏡像倉庫中:

    ARG SPARK_IMAGE=spark:3.5.4
    
    FROM ${SPARK_IMAGE}
    
    # Add dependency for JindoSDK support
    ADD --chown=spark:spark --chmod=644 https://jindodata-binary.oss-cn-shanghai.aliyuncs.com/mvn-repo/com/aliyun/jindodata/jindo-core/6.4.0/jindo-core-6.4.0.jar ${SPARK_HOME}/jars
    ADD --chown=spark:spark --chmod=644 https://jindodata-binary.oss-cn-shanghai.aliyuncs.com/mvn-repo/com/aliyun/jindodata/jindo-sdk/6.4.0/jindo-sdk-6.4.0.jar ${SPARK_HOME}/jars
  3. 執行如下命令提交Spark作業。

    kubectl create -f spark-pagerank-fluid-hcfs.yaml

    預期輸出:

    sparkapplication.sparkoperator.k8s.io/spark-pagerank-fluid-hcfs create
  4. 執行如下命令查看Spark作業狀態。

    kubectl get -n spark sparkapplication spark-pagerank-fluid-hcfs -w

    預期輸出:

    NAME                        STATUS    ATTEMPTS   START                  FINISH       AGE
    spark-pagerank-fluid-hcfs   RUNNING   1          2025-01-16T11:21:16Z   <no value>   9s
    spark-pagerank-fluid-hcfs   RUNNING   1          2025-01-16T11:21:16Z   <no value>   15s
    spark-pagerank-fluid-hcfs   RUNNING   1          2025-01-16T11:21:16Z   <no value>   77s
    spark-pagerank-fluid-hcfs   RUNNING   1          2025-01-16T11:21:16Z   <no value>   77s
    spark-pagerank-fluid-hcfs   SUCCEEDING   1          2025-01-16T11:21:16Z   2025-01-16T11:22:34Z   78s
    spark-pagerank-fluid-hcfs   COMPLETED    1          2025-01-16T11:21:16Z   2025-01-16T11:22:34Z   78s

(可選)步驟六:環境清理

如果體驗教程後無需使用相關資源,請參見以下操作釋放建立的測試資源。

kubectl delete -f spark-pagerank-fluid-posix.yaml
kubectl delete -f spark-pagerank-fluid-hcfs.yaml
kubectl delete -f spark-fluid-dataload.yaml
kubectl delete -f spark-fluid-jindoruntime.yaml
kubectl delete -f spark-fluid-dataset.yaml
kubectl delete -f fluid-oss-secret.yaml