本文展示如何提交PySpark作业以及使用自定义Virtualenv。
云原生数据湖分析(DLA)产品已退市,云原生数据仓库 AnalyticDB MySQL 版湖仓版支持DLA已有功能,并提供更多的功能和更好的性能。AnalyticDB for MySQL相关使用文档,请参见通过PySpark开发Spark应用。
PySpark基本使用方式
1.开发主程序文件
您可以建立如下内容的example.py
文件,示例中定义main函数可以允许PySpark找到程序的统一启动入口。
from __future__ import print_function
from pyspark.sql import SparkSession
# import third part file
from tools import func
if __name__ == "__main__":
# init pyspark context
spark = SparkSession\
.builder\
.appName("Python Example")\
.getOrCreate()
df = spark.sql("SELECT 2021")
# print schema and data to the console
df.printSchema()
df.show()
2.执行主程序文件
和Scala、Java程序开发的JAR包一样,您需要将
example.py
文件上传到OSS中,并在Spark的启动配置中使用file
来指定这个文件为启动文件。在DLA控制台的Serverless->作业管理页面,使用如下示例代码配置作业。
{ "name": "Spark Python", "file": "oss://{your bucket name}/example.py" "conf": { "spark.driver.resourceSpec": "small", "spark.executor.instances": 2, "spark.executor.resourceSpec": "small", "spark.kubernetes.pyspark.pythonVersion": "3" } }
重要您需要在配置时将
{your bucket name}
替换为您使用的OSS的Bucket名称。示例中使用Python 3执行文件,和社区版的Spark相同,通过
spark.kubernetes.pyspark.pythonVersion
配置使用的Python版本,默认为Python 2.7。
单击执行。
如何上传自行开发的或者第三方开发的Module
当开发Python程序时,往往会用到自行开发的或者由第三方开发的各种Module模块,这些模块可以上传并加载到PySpark的执行环境中,被主程序调用。
以计算员工的税后收入为例,步骤如下。
1. 准备测试数据
新建一个如下格式的CSV文件,命名为staff.csv
,并上传到OSS中。文件反映了每个员工的信息和收入情况。
name,age,gender,salary
Lucky,25,male,100
Lucy,23,female,150
Martin,30,male,180
Rose,31,female,200
如何将文件上传到OSS请参见简单上传。
2. 开发一个依赖方法
创建一个文件夹
tools
。在
tools
文件夹中创建一个文件func.py
,文件内容如下。def tax(salary): """ convert string to int then cut 15% tax from the salary return a float number :param salary: The salary of staff worker :return: """ return 0.15 * int(salary)
将
tools
文件夹压缩为tools.zip
后上传到OSS中。压缩包的生成方式如下。重要不同操作系统平台的ZIP压缩工具会略有区别,请保证解压后可以看到顶层目录是tools文件夹。
3. 开发主程序
开发一个Spark的Python程序,将测试中的CSV从OSS中读取出来,注册为一个DataFrame
。同时将依赖包中的tax
方法注册为一个Spark UDF
,然后使用该UDF
对刚刚生成的DataFrame
进行计算并打印结果。
示例代码如下, 您需要在配置时将{your bucket name}
替换为您使用的OSS的Bucket名称。
from __future__ import print_function
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import FloatType
# import third part file
from tools import func
if __name__ == "__main__":
# init pyspark context
spark = SparkSession\
.builder\
.appName("Python Example")\
.getOrCreate()
# read csv from oss to a dataframe, show the table
df = spark.read.csv('oss://{your bucket}/staff.csv', mode="DROPMALFORMED",inferSchema=True, header = True)
# print schema and data to the console
df.printSchema()
df.show()
# create an udf
taxCut = udf(lambda salary: func.tax(salary), FloatType())
# cut tax from salary and show result
df.select("name", taxCut("salary").alias("final salary")).show()
spark.stop()
将代码写入到example.py
中,并上传到OSS。
4. 提交任务
在DLA控制台的Serverless->作业管理页面,新建一个作业,并提交以下作业信息。
{
"name": "Spark Python",
"file": "oss://{your bucket name}/example.py",
"pyFiles": ["oss://{your bucket name}/tools.zip"],
"conf": {
"spark.driver.resourceSpec": "small",
"spark.executor.instances": 2,
"spark.executor.resourceSpec": "small",
"spark.dla.connectors": "oss",
"spark.kubernetes.pyspark.pythonVersion": "3"
}
}
代码中主要参数说明。
参数 | 说明 | 是否必选 |
conf | Spark任务用到的配置参数,需要的配置项如下。
| 否 |
更多参数说明请参见作业配置指南。
PySpark使用自定义Virtualenv
当需要复杂的第三方依赖包时,可以使用Virtualenv来将本地调试环境上传到云端的Spark集群中。这种方式可以将大量复杂的系统包,如Pandas、Numpy、PyMySQL等装入隔离环境,并迁移到相同的操作系统中。您可以选择如下两种方案。
Virtualenv的更多信息请参见Python官方社区venv说明。
自行生成Virtualenv压缩包
1.准备Linux环境
由于Virtualenv
需要相同的操作系统,当前上传到DLA Spark使用的压缩包必须在Linux环境下的进行安装。您可以采用如下方式准备Linux环境。
准备一台Centos7的电脑进行打包。
在阿里云以按量付费的方式新开一台Centos 7的ECS,使用完毕后关闭。
使用Centos 7的官方Docker镜像,在镜像内部打包。
2.在Linux环境下打包Python执行环境
常用的执行环境打包工具包括Virtualenv、Conda,您可以根据您的需要来选择对应的工具,并安装工具到您的Linux环境中。
当前Serverless Spark支持的Python版本为3.7及以下主版本。
Spark运行环境为Centos 7,请使用该环境打包Venv(推荐使用Docker中的环境打包)。
以下示例使用Virtualenv
生成一个执行环境压缩包venv.zip
,压缩包中包含了scikit-spark
的特定版本。
# create directory venv at current path with python3
# MUST ADD --copies !
virtualenv --copies --download --python Python3.7 venv
# active environment
source venv/bin/activate
# install third part modules
pip install scikit-spark==0.4.0
# check the result
pip list
# zip the environment
zip -r venv.zip venv
如何使用Conda生成执行环境,请参见Conda管理虚拟环境。
3.在Spark中使用Python执行环境
您可以在提交Spark作业时,使用如下的代码配置作业。其中spark.pyspark.python
的参数值表示上传的压缩文件中的运行包。更多参数说明,请参见作业参数说明。
{
"name": "venv example",
"archives": [
"oss://test/venv.zip#PY3"
],
"conf": {
"spark.driver.resourceSpec": "medium",
"spark.dla.connectors": "oss",
"spark.executor.instances": 1,
"spark.dla.job.log.oss.uri": "oss://test/spark-logs",
"spark.pyspark.python": "./PY3/venv/bin/python3",
"spark.executor.resourceSpec": "medium"
},
"file": "oss://test/example.py"
}
与Spark开源社区的语义相同,venv.zip#PY3
代表将压缩包解压到计算节点工作目录的PY3
文件夹下,继而可以从本地访问。如果不使用#
指定文件夹名称,则默认使用文件名称作为新建的文件夹名。
使用镜像工具生成Virtualenv压缩包
1.使用如下命令拉取镜像。
-sudo docker pull registry.cn-hangzhou.aliyuncs.com/dla_spark/dla-venv:0.1
2.将需要生成环境的requirements.txt
文件放置在/home/admin
文件夹中,并将此文件夹挂载到Docker中。
-sudo docker run -ti -v /home/admin:/tmp dla-venv:0.1 -p python3 -f /tmp/requirements.txt
requirements.txt
是Python的标准依赖包描述文件,更多信息请参见User Guide。
打包程序自动化执行,您可以看到如下日志。
adding: venv-20210611-095454/lib64/ (stored 0%)
adding: venv-20210611-095454/lib64/python3.6/ (stored 0%)
adding: venv-20210611-095454/lib64/python3.6/site-packages/ (stored 0%)
adding: venv-20210611-095454/pyvenv.cfg (deflated 30%)
venv-20210611-095454.zip
3.在/home/admin
文件夹下找到打包好的压缩文件venv-20210611-095454.zip
。如何使用压缩包请参见在Spark中使用Python执行环境。
4.(可选)关于Docker镜像的更多使用说明,您可以执行如下命令查看。
-sudo docker run -i dla-venv:0.1
Used to create venv package for Aliyun DLA
Docker with host machine volumes: https://docs.docker.com/storage/volumes/
Please copy requirements.txt to a folder and mount the folder to docker as /tmp path
Usage example: docker run -it -v /home/admin:/tmp dla-venv:0.1 -p python3 -f /tmp/requirements.txt
-p python version, could be python2 or python3
-f path to requirements.txt, default is /tmp/requirements.txt
常见问题
如果在使用镜像工具生成Virtualenv压缩包时自动打包失败,您可以通过执行如下命令启动Linux Centos 7环境,并以Root权限进入环境内部进行操作。
-sudo docker run -ti --entrypoint bash dla-venv:0.1
后续操作请参见自行生成Virtualenv压缩包。