Lindorm计算引擎通过HTTP RESTful API的方式提供Spark Python作业提交入口,您可以按照这种方式运行流批任务、机器学习和图计算任务。本文介绍Lindorm计算引擎Python作业开发的详细步骤。
前提条件
已开通Lindorm计算引擎,具体操作请参见开通与变配。
Spark Python作业开发流程
步骤一:准备Spark Python作业
下载Spark Python作业示例压缩包,下载链接为Spark Python作业示例。
解压Spark Python作业示例压缩包,解压后的目录名称为
lindorm-spark-examples
。打开lindorm-spark-examples/python
目录,参考python目录结构。项目开发的根目录以
your_project
为例,介绍项目的目录结构。在
your_project
目录下新建__init__.py
文件,内容为空。改造入口文件。
在入口文件中编写代码,将
your_project
添加到sys.path
中,代码详情请参见Spark Python作业示例中的lindorm-spark-examples/python/your_project/main.py
文件的Notice1部分。# Notice1: You need to do the following step to complete the code modification: # Step1: Please add a "__init__.py" to your project directory, your project will act as a module of launcher.py # Step2: Please add current dir to sys.path, you should add the following code to your main file current_dir = os.path.abspath(os.path.dirname(__file__)) sys.path.append(current_dir) print("current dir in your_project: %s" % current_dir) print("sys.path: %s \n" % str(sys.path))
在入口文件中将入口逻辑封装到
main(argv)
方法中。代码详情请参见Spark Python作业示例中的lindorm-spark-examples/python/your_project/main.py
文件的Notice2部分。# Notice2: Move the code in `if __name__ == "__main__":` branch to a new defined main(argv) function, # so that launcher.py in parent directory just call main(sys.argv) def main(argv): print("Receive arguments: %s \n" % str(argv)) print("current dir in main: %s \n" % os.path.abspath(os.path.dirname(__file__))) # Write your code here if __name__ == "__main__": main(sys.argv)
创建Spark Python作业启动的入口文件,用来调用
main(argv)
方法。在根目录your_project
创建同级目录launcher.py
,可以复制Spark Python作业示例中的lindorm-spark-examples/python/launcher.py
文件。
步骤二:打包Spark Python作业
打包项目依赖的Python环境和第三方类库。推荐使用Conda或者Virtualenv将依赖类库打包为tar包,具体操作请参见Python Package Management。
重要使用Conda或者Virtualenv打的tar包通过spark.archives传递,可以是spark.archives支持的所有格式。详细说明,请参见spark.archives。
请在Linux环境下完成该操作,以保证Lindorm计算引擎能正常识Python二进制文件。
打包项目文件。将
your_project
文件打包为.zip
或者.egg
格式文件。执行以下命令将项目文件打包为
.zip
格式文件。zip -r project.zip your_project
将项目文件打包为
.egg
格式文件,具体操作请参见Building Eggs。
步骤三:上传Spark Python作业
步骤四:提交Spark Python作业
请求参数包括以下两个部分:
Python作业环境参数说明,示例如下:
{"spark.archives":"oss://testBucketName/pyspark_conda_env.tar.gz#environment", "spark.kubernetes.driverEnv.PYSPARK_PYTHON":"./environment/bin/python","spark.submit.pyFiles":"oss://testBucketName/your_project.zip"}
提交Python作业项目文件(也就是
.zip
、.egg
或者.py
格式的文件)时,请配置configs参数中的spark.submit.pyFiles。提交Python环境和第三方类库(也就是tar包)时,请配置configs参数中的spark.archives和spark.kubernetes.driverEnv.PYSPARK_PYTHON。
配置spark.archives参数时使用井号(#)指定targetDir。
配置spark.kubernetes.driverEnv.PYSPARK_PYTHON参数指定Python文件路径。
如果将文件上传至OSS,需要在configs参数中配置以下信息。
表 1. 配置configs相关参数
参数
示例值
说明
spark.hadoop.fs.oss.endpoint
oss-cn-beijing-internal.aliyuncs.com
存储Python文件的OSS地址。
spark.hadoop.fs.oss.accessKeyId
testAccessKey ID
通过阿里云控制台创建的Access Key ID和Access Key Secret,获取方法请参见创建AccessKey。
spark.hadoop.fs.oss.accessKeySecret
testAccessKey Secret
spark.hadoop.fs.oss.impl
固定值:org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem
访问OSS的类。
说明更多参数请参见参数说明。
Python作业开发示例
下载并解压Spark Python作业示例。
改造入口文件,打开Python目录下的your_project/main.py文件并配置相关代码。
将your_project目录添加到sys.path中。
current_dir = os.path.abspath(os.path.dirname(__file__)) sys.path.append(current_dir) print("current dir in your_project: %s" % current_dir) print("sys.path: %s \n" % str(sys.path))
在main.py文件中添加入口逻辑,如下示例初始化SparkSession。
from pyspark.sql import SparkSession spark = SparkSession \ .builder \ .appName("PythonImportTest") \ .getOrCreate() print(spark.conf) spark.stop()
在Python目录下打包your_project文件。
zip -r your_project.zip your_project
在Linux环境下,使用Conda打包Python运行环境。
conda create -y -n pyspark_conda_env -c conda-forge numpy conda-pack conda activate pyspark_conda_env conda pack -f -o pyspark_conda_env.tar.gz
将打包好的your_project.zip和pyspark_conda_env.tar.gz上传至OSS,并将Python目录下的launcher.py文件上传至OSS。
通过以下两种方式提交作业。
作业诊断
Python作业提交成功后,可以在作业列表页面查看作业运行状况和SparkUI访问地址,具体操作请参见查看作业。作业提交过程中如果有其他问题,请提交工单并将JobID和WebUI地址提供给工单处理人员。