全部產品
Search
文件中心

AnalyticDB:通過PySpark開發Spark應用

更新時間:Nov 20, 2024

本文介紹了如何開發AnalyticDB for MySQL Spark Python作業,以及如何通過VirtualEnv技術打包Python作業的運行環境。

前提條件

  • 叢集的產品系列為湖倉版

  • 叢集與OSS儲存空間位於相同地區。

  • 已在湖倉版叢集中建立Job型資源群組。具體操作,請參見建立資源群組

  • 已建立AnalyticDB for MySQL叢集的資料庫帳號。

PySpark的基本用法

  1. 編寫如下樣本程式,並將樣本程式儲存為example.py

    from pyspark.sql import SparkSession
    
    if __name__ == "__main__":
        spark = SparkSession.builder.getOrCreate()
        df = spark.sql("SELECT 1+1")
        df.printSchema()
        df.show()
    
  2. example.py程式上傳到OSS中。具體操作,請參見控制台上傳檔案

  3. 進入Spark開發編輯器。

    1. 登入雲原生資料倉儲AnalyticDB MySQL控制台,在左上方選擇叢集所在地區。在左側導覽列,單擊集群清單。在集群清單上方,選擇產品系列,然後單擊目的地組群ID。

    2. 在左側導覽列,單擊作業開發>Spark Jar 開發

  4. 在編輯器視窗上方,選擇Job型資源群組和Spark作業類型。本文以Batch類型為例。

  5. 在編輯器中執行以下作業內容。

    {
    
     "name": "Spark Python Test",
     "file": "oss://{your oss bucket path}/example.py",
     "conf": {
     "spark.driver.resourceSpec": "small",
     "spark.executor.instances": 1,
     "spark.executor.resourceSpec": "small"
     }
    }

    參數說明請參見參數說明

使用Python依賴

使用方法

如果您使用自行開發或第三方開發的依賴開發Python程式時,需將使用的依賴上傳至OSS中,並在提交Spark作業時配置pyFiles參數。

樣本

本文樣本以引入自訂Function Compute員工的稅後收入為例。樣本將資料檔案staff.csv上傳至OSS中。staff.csv中的樣本資料如下:

name,age,gender,salary
Lucky,25,male,100
Lucy,23,female,150
Martin,30,male,180
Rose,31,female,200
  1. 開發依賴並上傳至OSS中。

    1. 建立名為tools的檔案夾,並在該檔案夾下建立名為func.py的程式

      def tax(salary):
          """
          convert string to int and cut 15% tax from the salary
      
          :param salary: The salary of staff worker
          :return:
          """
          return 0.15 * int(salary)
      
    2. tools檔案夾壓縮後上傳至OSS中。本文樣本為tools.tar.gz

      說明

      如果依賴多個Python檔案,建議您使用gz壓縮包進行壓縮。您可以在Python代碼中以module方式引用Python檔案。

  2. 編寫名為example.py樣本程式。

    from __future__ import print_function
    from pyspark.sql import SparkSession
    from pyspark.sql.functions import udf
    from pyspark.sql.types import FloatType
    
    import sys
    
    # import third party file
    from tools import func
    
    if __name__ == "__main__":
        # init pyspark context
        spark = SparkSession.builder.appName("Python Example").getOrCreate()
        # read csv from oss to a dataframe, show the table
        cvs_file = sys.argv[1]
        df = spark.read.csv(cvs_file, mode="DROPMALFORMED", inferSchema=True, header=True)
        # print schema and data to the console
        df.printSchema()
        df.show()
        # create an udf
        taxCut = udf(lambda salary: func.tax(salary), FloatType())
        # cut tax from salary and show result
        df.select("name", taxCut("salary").alias("final salary")).show()
        spark.stop()
    
  3. example.py程式上傳到OSS中。具體操作,請參見控制台上傳檔案

  4. 進入Spark開發編輯器。

    1. 登入雲原生資料倉儲AnalyticDB MySQL控制台,在左上方選擇叢集所在地區。在左側導覽列,單擊集群清單。在集群清單上方,選擇產品系列,然後單擊目的地組群ID。

    2. 在左側導覽列,單擊作業開發>Spark Jar 開發

  5. 在編輯器視窗上方,選擇Job型資源群組和Spark作業類型。本文以Batch類型為例。

  6. 在編輯器中執行以下作業內容。

    {
     "name": "Spark Python",
     "file": "oss://<bucket name>/example.py",
     "pyFiles": ["oss://<bucket name>/tools.tar.gz"],
     "args": [
     "oss://<bucket name>/staff.csv"
     ],
     "conf": {
     "spark.driver.resourceSpec": "small",
     "spark.executor.instances": 2,
     "spark.executor.resourceSpec": "small"
     }
    }

    參數說明:

    • filePython程式所在的OSS路徑。

    • pyFilesPySpark依賴的Python檔案所在的OSS路徑,尾碼可以是tar或tar.gz。多個壓縮包使用英文逗號(,)分隔。

      說明

      PySpark應用所依賴的所有Python檔案必須儲存在OSS中。

    • args:使用JAR包時需要使用的參數。本文為staff.csv樣本資料所在的OSS路徑。

    更多參數,請參見參數說明

使用Virtual Environments打包依賴環境

開發Python作業時,如果您遇到複雜的依賴環境,可以通過Python的Virtual Environments技術進行環境管理和隔離。AnalyticDB for MySQL Spark支援使用Virtual Environments將本地依賴的環境打包並上傳到OSS中。關於Virtual Environments的更多資訊,請參見Python官方社區文檔

重要

AnalyticDB for MySQL Spark使用的glibc-devel版本為2.28,若Virtual Environments不相容2.28版本,PySpark任務可能無法正常執行。

使用方法

使用Virtual Environments打包Python環境,需將壓縮包上傳至OSS中,並在提交Spark作業時配置相關參數,以指定Python環境壓縮包所在的OSS路徑和使用的Python解譯器的本地路徑。

  • 指定Python環境壓縮包所在的OSS路徑:

    • 若Python環境的壓縮包較小,您可配置archives參數。

    • 若Python環境的壓縮包較大,您可配置spark.executorEnv.ADB_SPARK_DOWNLOAD_FILESspark.kubernetes.driverEnv.ADB_SPARK_DOWNLOAD_FILES參數。

  • 指定使用的Python解譯器的本地路徑:spark.pyspark.python參數。

樣本

  1. 準備Linux環境。

    Virtual Environments需在Linux作業系統中打包Python環境,您可以通過以下三種方式準備Linux環境。本文以購買阿里雲ECS執行個體為例。

    • 購買作業系統為Centos 7或AnolisOS 8的阿里雲ECS執行個體。具體操作,請參見自訂購買執行個體

    • 在本地安裝Centos 7或者AnolisOS 8以上版本的作業系統。

    • 使用Centos或AnolisOS的官方Docker鏡像,在鏡像內部打包Python環境。

  2. 使用Virtual Environments打包Python運行環境,並將壓縮包上傳至OSS中。

    使用Virtualenv或Conda打包專案依賴的Python環境,打包時可自訂Python的版本。此處以Virtualenv打包為例。

    # Create directory venv at current path with python3
    # MUST ADD --copies !
    virtualenv --copies --download --python python3.7 venv
    
    # active environment
    source venv/bin/activate
    
    # install third party modules
    pip install scikit-spark==0.4.0
    
    # check the result
    pip list
    
    # compress the environment
    tar -czvf venv.tar.gz venv
    說明

    如果您想通過Conda打包專案依賴,請參見Conda管理虛擬環境

  3. 進入Spark開發編輯器。

    1. 登入雲原生資料倉儲AnalyticDB MySQL控制台,在左上方選擇叢集所在地區。在左側導覽列,單擊集群清單。在集群清單上方,選擇產品系列,然後單擊目的地組群ID。

    2. 在左側導覽列,單擊作業開發>Spark Jar 開發

  4. 在編輯器視窗上方,選擇Job型資源群組和Spark作業類型。本文以Batch類型為例。

  5. 在編輯器中執行以下作業內容。

    {
     "name": "venv example",
     "archives": [
     "oss://testBucketname/venv.tar.gz#PY3"
     ],
     "conf": {
     "spark.driver.resourceSpec": "small",
     "spark.executor.instances": 1,
     "spark.pyspark.python": "./PY3/venv/bin/python3",
     "spark.executor.resourceSpec": "small"
     },
     "file": "oss://testBucketname/example.py"
    }

    說明

    Python環境的壓縮包過大時,可參考如下代碼。

    {
     "name": "venv example",
     "conf": {
     "spark.executorEnv.ADB_SPARK_DOWNLOAD_FILES": "oss://testBucketname/venv_py36.tar.gz#PY3",
     "spark.kubernetes.driverEnv.ADB_SPARK_DOWNLOAD_FILES": "oss://atestBucketname/venv_py36.tar.gz#PY3,",
     "spark.driver.resourceSpec": "small",
     "spark.executor.instances": 1,
     "spark.pyspark.python": "./PY3/venv/bin/python3",
     "spark.executor.resourceSpec": "small"
     },
     "file": "oss://testBucketname/example.py"
    }

    參數說明:

    • archives:Python環境壓縮包所在的OSS路徑。本文樣本為venv.tar.gz壓縮包所在的OSS路徑。

    • spark.executorEnv.ADB_SPARK_DOWNLOAD_FILES:Spark Executor節點參數,用於指定Python環境壓縮包所在的OSS路徑。

    • spark.kubernetes.driverEnv.ADB_SPARK_DOWNLOAD_FILES:Spark Driver節點參數,用於指定Python環境壓縮包所在的OSS路徑。

    • spark.pyspark.python:指定要使用的Python解譯器的本地路徑。

    其他參數,請參見參數說明