全部產品
Search
文件中心

:Python作業開發實踐

更新時間:Aug 29, 2024

Lindorm計算引擎通過HTTP RESTful API的方式提供Spark Python作業提交入口,您可以按照這種方式運行流批任務、機器學習和圖計算任務。本文介紹Lindorm計算引擎Python作業開發的詳細步驟。

前提條件

已開通Lindorm計算引擎,具體操作請參見開通與變更配置

Spark Python作業開發流程

  1. 準備Spark Python作業

  2. 打包Spark Python作業

  3. 上傳Spark Python作業

  4. 提交Spark Python作業

步驟一:準備Spark Python作業

  1. 下載Spark Python作業樣本壓縮包,下載連結為Spark Python作業樣本

  2. 解壓Spark Python作業樣本壓縮包,解壓後的目錄名稱為lindorm-spark-examples。開啟lindorm-spark-examples/python目錄,參考python目錄結構。

  3. 專案開發的根目錄以your_project為例,介紹專案的目錄結構。

    1. your_project目錄下建立__init__.py檔案,內容為空白。

    2. 改造入口檔案。

      1. 在入口檔案中編寫代碼,將your_project添加到sys.path中,代碼詳情請參見Spark Python作業樣本中的lindorm-spark-examples/python/your_project/main.py檔案的Notice1部分。

        # Notice1: You need to do the following step to complete the code modification:
        # Step1: Please add a "__init__.py" to your project directory, your project will act as a module of launcher.py
        # Step2: Please add current dir to sys.path, you should add the following code to your main file
        current_dir = os.path.abspath(os.path.dirname(__file__))
        sys.path.append(current_dir)
        print("current dir in your_project: %s" % current_dir)
        print("sys.path: %s \n" % str(sys.path))
      2. 在入口檔案中將入口邏輯封裝到main(argv)方法中。代碼詳情請參見Spark Python作業樣本中的lindorm-spark-examples/python/your_project/main.py檔案的Notice2部分。

        # Notice2: Move the code in `if __name__ == "__main__":` branch to a new defined main(argv) function,
        # so that launcher.py in parent directory just call main(sys.argv)
        def main(argv):
            print("Receive arguments: %s \n" % str(argv))
        
            print("current dir in main: %s \n" % os.path.abspath(os.path.dirname(__file__)))
            # Write your code here
        
        
        if __name__ == "__main__":
            main(sys.argv)
    3. 建立Spark Python作業啟動的入口檔案,用來調用main(argv)方法。在根目錄your_project建立同級目錄launcher.py,可以複製Spark Python作業樣本中的lindorm-spark-examples/python/launcher.py檔案。

步驟二:打包Spark Python作業

  1. 打包專案依賴的Python環境和第三方類庫。推薦使用Conda或者Virtualenv將依賴類庫打包為tar包,具體操作請參見Python Package Management

    重要
    • 使用Conda或者Virtualenv打的tar包通過spark.archives傳遞,可以是spark.archives支援的所有格式。詳細說明,請參見spark.archives

    • 請在Linux環境下完成該操作,以保證Lindorm計算引擎能正常識Python二進位檔案。

  2. 打包專案檔。將your_project檔案打包為.zip或者.egg格式檔案。

    • 執行以下命令將專案檔打包為.zip格式檔案。

      zip -r project.zip your_project
    • 將專案檔打包為.egg格式檔案,具體操作請參見Building Eggs

步驟三:上傳Spark Python作業

將以下檔案都上傳至OSS,具體操作請參見上傳檔案

  • 步驟二中打包的Python環境和第三方類庫(也就是tar包)。

  • 步驟二中打包的專案檔(也就是.zip或者.egg檔案)。

  • 步驟一中的launcher.py檔案。

步驟四:提交Spark Python作業

Lindorm計算引擎支援以下兩種方式提交並管理作業。

請求參數包括以下兩個部分:

  • Python作業環境參數說明,樣本如下:

    {"spark.archives":"oss://testBucketName/pyspark_conda_env.tar.gz#environment", "spark.kubernetes.driverEnv.PYSPARK_PYTHON":"./environment/bin/python","spark.submit.pyFiles":"oss://testBucketName/your_project.zip"}
    • 提交Python作業專案檔(也就是.zip.egg或者.py格式的檔案)時,請配置configs參數中的spark.submit.pyFiles

    • 提交Python環境和第三方類庫(也就是tar包)時,請配置configs參數中的spark.archivesspark.kubernetes.driverEnv.PYSPARK_PYTHON

      • 配置spark.archives參數時使用井號(#)指定targetDir

      • 配置spark.kubernetes.driverEnv.PYSPARK_PYTHON參數指定Python檔案路徑。

  • 如果將檔案上傳至OSS,需要在configs參數中配置以下資訊。

    表 1. 配置configs相關參數

    參數

    樣本值

    說明

    spark.hadoop.fs.oss.endpoint

    oss-cn-beijing-internal.aliyuncs.com

    儲存Python檔案的OSS地址。

    spark.hadoop.fs.oss.accessKeyId

    testAccessKey ID

    通過阿里雲控制台建立的Access Key ID和Access Key Secret,擷取方法請參見建立AccessKey

    spark.hadoop.fs.oss.accessKeySecret

    testAccessKey Secret

    spark.hadoop.fs.oss.impl

    固定值:org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem

    訪問OSS的類。

    說明

    更多參數請參見參數說明

Python作業開發樣本

  1. 下載並解壓Spark Python作業樣本

  2. 改造入口檔案,開啟Python目錄下的your_project/main.py檔案並配置相關代碼。

    1. your_project目錄添加到sys.path中。

      current_dir = os.path.abspath(os.path.dirname(__file__))
      sys.path.append(current_dir)
      print("current dir in your_project: %s" % current_dir)
      print("sys.path: %s \n" % str(sys.path))
    2. main.py檔案中添加入口邏輯,如下樣本初始化SparkSession。

      from pyspark.sql import SparkSession
      spark = SparkSession \
          .builder \
          .appName("PythonImportTest") \
          .getOrCreate()
      print(spark.conf)
      spark.stop()
  3. 在Python目錄下打包your_project檔案。

    zip -r your_project.zip your_project
  4. 在Linux環境下,使用Conda打包Python運行環境。

    conda create -y -n pyspark_conda_env -c conda-forge numpy conda-pack
    conda activate pyspark_conda_env
    conda pack -f -o pyspark_conda_env.tar.gz
  5. 將打包好的your_project.zippyspark_conda_env.tar.gz上傳至OSS,並將Python目錄下的launcher.py檔案上傳至OSS。

  6. 通過以下兩種方式提交作業。

作業診斷

Python作業提交成功後,可以在作業列表頁面查看作業健全狀態和SparkUI訪問地址,具體操作請參見查看作業。作業提交過程中如果有其他問題,請提交工單並將JobID和WebUI地址提供給工單處理人員。