Lindorm計算引擎通過HTTP RESTful API的方式提供Spark Python作業提交入口,您可以按照這種方式運行流批任務、機器學習和圖計算任務。本文介紹Lindorm計算引擎Python作業開發的詳細步驟。
前提條件
已開通Lindorm計算引擎,具體操作請參見開通與變更配置。
Spark Python作業開發流程
步驟一:準備Spark Python作業
下載Spark Python作業樣本壓縮包,下載連結為Spark Python作業樣本。
解壓Spark Python作業樣本壓縮包,解壓後的目錄名稱為
lindorm-spark-examples
。開啟lindorm-spark-examples/python
目錄,參考python目錄結構。專案開發的根目錄以
your_project
為例,介紹專案的目錄結構。在
your_project
目錄下建立__init__.py
檔案,內容為空白。改造入口檔案。
在入口檔案中編寫代碼,將
your_project
添加到sys.path
中,代碼詳情請參見Spark Python作業樣本中的lindorm-spark-examples/python/your_project/main.py
檔案的Notice1部分。# Notice1: You need to do the following step to complete the code modification: # Step1: Please add a "__init__.py" to your project directory, your project will act as a module of launcher.py # Step2: Please add current dir to sys.path, you should add the following code to your main file current_dir = os.path.abspath(os.path.dirname(__file__)) sys.path.append(current_dir) print("current dir in your_project: %s" % current_dir) print("sys.path: %s \n" % str(sys.path))
在入口檔案中將入口邏輯封裝到
main(argv)
方法中。代碼詳情請參見Spark Python作業樣本中的lindorm-spark-examples/python/your_project/main.py
檔案的Notice2部分。# Notice2: Move the code in `if __name__ == "__main__":` branch to a new defined main(argv) function, # so that launcher.py in parent directory just call main(sys.argv) def main(argv): print("Receive arguments: %s \n" % str(argv)) print("current dir in main: %s \n" % os.path.abspath(os.path.dirname(__file__))) # Write your code here if __name__ == "__main__": main(sys.argv)
建立Spark Python作業啟動的入口檔案,用來調用
main(argv)
方法。在根目錄your_project
建立同級目錄launcher.py
,可以複製Spark Python作業樣本中的lindorm-spark-examples/python/launcher.py
檔案。
步驟二:打包Spark Python作業
打包專案依賴的Python環境和第三方類庫。推薦使用Conda或者Virtualenv將依賴類庫打包為tar包,具體操作請參見Python Package Management。
重要使用Conda或者Virtualenv打的tar包通過spark.archives傳遞,可以是spark.archives支援的所有格式。詳細說明,請參見spark.archives。
請在Linux環境下完成該操作,以保證Lindorm計算引擎能正常識Python二進位檔案。
打包專案檔。將
your_project
檔案打包為.zip
或者.egg
格式檔案。執行以下命令將專案檔打包為
.zip
格式檔案。zip -r project.zip your_project
將專案檔打包為
.egg
格式檔案,具體操作請參見Building Eggs。
步驟三:上傳Spark Python作業
步驟四:提交Spark Python作業
請求參數包括以下兩個部分:
Python作業環境參數說明,樣本如下:
{"spark.archives":"oss://testBucketName/pyspark_conda_env.tar.gz#environment", "spark.kubernetes.driverEnv.PYSPARK_PYTHON":"./environment/bin/python","spark.submit.pyFiles":"oss://testBucketName/your_project.zip"}
提交Python作業專案檔(也就是
.zip
、.egg
或者.py
格式的檔案)時,請配置configs參數中的spark.submit.pyFiles。提交Python環境和第三方類庫(也就是tar包)時,請配置configs參數中的spark.archives和spark.kubernetes.driverEnv.PYSPARK_PYTHON。
配置spark.archives參數時使用井號(#)指定targetDir。
配置spark.kubernetes.driverEnv.PYSPARK_PYTHON參數指定Python檔案路徑。
如果將檔案上傳至OSS,需要在configs參數中配置以下資訊。
表 1. 配置configs相關參數
參數
樣本值
說明
spark.hadoop.fs.oss.endpoint
oss-cn-beijing-internal.aliyuncs.com
儲存Python檔案的OSS地址。
spark.hadoop.fs.oss.accessKeyId
testAccessKey ID
通過阿里雲控制台建立的Access Key ID和Access Key Secret,擷取方法請參見建立AccessKey。
spark.hadoop.fs.oss.accessKeySecret
testAccessKey Secret
spark.hadoop.fs.oss.impl
固定值:org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem
訪問OSS的類。
說明更多參數請參見參數說明。
Python作業開發樣本
下載並解壓Spark Python作業樣本。
改造入口檔案,開啟Python目錄下的your_project/main.py檔案並配置相關代碼。
將your_project目錄添加到sys.path中。
current_dir = os.path.abspath(os.path.dirname(__file__)) sys.path.append(current_dir) print("current dir in your_project: %s" % current_dir) print("sys.path: %s \n" % str(sys.path))
在main.py檔案中添加入口邏輯,如下樣本初始化SparkSession。
from pyspark.sql import SparkSession spark = SparkSession \ .builder \ .appName("PythonImportTest") \ .getOrCreate() print(spark.conf) spark.stop()
在Python目錄下打包your_project檔案。
zip -r your_project.zip your_project
在Linux環境下,使用Conda打包Python運行環境。
conda create -y -n pyspark_conda_env -c conda-forge numpy conda-pack conda activate pyspark_conda_env conda pack -f -o pyspark_conda_env.tar.gz
將打包好的your_project.zip和pyspark_conda_env.tar.gz上傳至OSS,並將Python目錄下的launcher.py檔案上傳至OSS。
通過以下兩種方式提交作業。
作業診斷
Python作業提交成功後,可以在作業列表頁面查看作業健全狀態和SparkUI訪問地址,具體操作請參見查看作業。作業提交過程中如果有其他問題,請提交工單並將JobID和WebUI地址提供給工單處理人員。