本文展示如何提交PySpark作業以及使用自訂Virtualenv。
雲原生資料湖分析(DLA)產品已退市,AnalyticDB for MySQL湖倉版支援DLA已有功能,並提供更多的功能和更好的效能。AnalyticDB for MySQL相關使用文檔,請參見通過PySpark開發Spark應用。
PySpark基本使用方式
1.開發主程式檔案
您可以建立如下內容的example.py
檔案,樣本中定義main函數可以允許PySpark找到程式的統一啟動入口。
from __future__ import print_function
from pyspark.sql import SparkSession
# import third part file
from tools import func
if __name__ == "__main__":
# init pyspark context
spark = SparkSession\
.builder\
.appName("Python Example")\
.getOrCreate()
df = spark.sql("SELECT 2021")
# print schema and data to the console
df.printSchema()
df.show()
2.執行主程式檔案
和Scala、Java程式開發的JAR包一樣,您需要將
example.py
檔案上傳到OSS中,並在Spark的啟動配置中使用file
來指定這個檔案為開機檔案。在DLA控制台的Serverless->作業管理頁面,使用如下範例程式碼配置作業。
{ "name": "Spark Python", "file": "oss://{your bucket name}/example.py" "conf": { "spark.driver.resourceSpec": "small", "spark.executor.instances": 2, "spark.executor.resourceSpec": "small", "spark.kubernetes.pyspark.pythonVersion": "3" } }
重要您需要在配置時將
{your bucket name}
替換為您使用的OSS的Bucket名稱。樣本中使用Python 3執行檔案,和社區版的Spark相同,通過
spark.kubernetes.pyspark.pythonVersion
配置使用的Python版本,預設為Python 2.7。
單擊執行。
如何上傳自行開發的或者第三方開發的Module
當開發Python程式時,往往會用到自行開發的或者由第三方開發的各種Module模組,這些模組可以上傳並載入到PySpark的執行環境中,被主程式調用。
以計算員工的稅後收入為例,步驟如下。
1. 準備測試資料
建立一個如下格式的CSV檔案,命名為staff.csv
,並上傳到OSS中。檔案反映了每個員工的資訊和收入情況。
name,age,gender,salary
Lucky,25,male,100
Lucy,23,female,150
Martin,30,male,180
Rose,31,female,200
如何將檔案上傳到OSS請參見簡單上傳。
2. 開發一個依賴方法
建立一個檔案夾
tools
。在
tools
檔案夾中建立一個檔案func.py
,檔案內容如下。def tax(salary): """ convert string to int then cut 15% tax from the salary return a float number :param salary: The salary of staff worker :return: """ return 0.15 * int(salary)
將
tools
檔案夾壓縮為tools.zip
後上傳到OSS中。壓縮包的產生方式如下。重要不同作業系統平台的ZIP壓縮公用程式會略有區別,請保證解壓後可以看到頂層目錄是tools檔案夾。
3. 開發主程式
開發一個Spark的Python程式,將測試中的CSV從OSS中讀取出來,註冊為一個DataFrame
。同時將依賴包中的tax
方法註冊為一個Spark UDF
,然後使用該UDF
對剛剛產生的DataFrame
進行計算並列印結果。
範例程式碼如下, 您需要在配置時將{your bucket name}
替換為您使用的OSS的Bucket名稱。
from __future__ import print_function
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import FloatType
# import third part file
from tools import func
if __name__ == "__main__":
# init pyspark context
spark = SparkSession\
.builder\
.appName("Python Example")\
.getOrCreate()
# read csv from oss to a dataframe, show the table
df = spark.read.csv('oss://{your bucket}/staff.csv', mode="DROPMALFORMED",inferSchema=True, header = True)
# print schema and data to the console
df.printSchema()
df.show()
# create an udf
taxCut = udf(lambda salary: func.tax(salary), FloatType())
# cut tax from salary and show result
df.select("name", taxCut("salary").alias("final salary")).show()
spark.stop()
將代碼寫入到example.py
中,並上傳到OSS。
4. 提交任務
在DLA控制台的Serverless->作業管理頁面,建立一個作業,並提交以下作業資訊。
{
"name": "Spark Python",
"file": "oss://{your bucket name}/example.py",
"pyFiles": ["oss://{your bucket name}/tools.zip"],
"conf": {
"spark.driver.resourceSpec": "small",
"spark.executor.instances": 2,
"spark.executor.resourceSpec": "small",
"spark.dla.connectors": "oss",
"spark.kubernetes.pyspark.pythonVersion": "3"
}
}
代碼中主要參數說明。
參數 | 說明 | 是否必選 |
conf | Spark任務用到的配置參數,需要的配置項如下。
| 否 |
更多參數說明請參見作業配置指南。
PySpark使用自訂Virtualenv
當需要複雜的第三方依賴包時,可以使用Virtualenv來將本地調試環境上傳到雲端的Spark叢集中。這種方式可以將大量複雜的系統包,如Pandas、Numpy、PyMySQL等裝入隔離環境,並遷移到相同的作業系統中。您可以選擇如下兩種方案。
Virtualenv的更多資訊請參見Python官方社區venv說明。
自行產生Virtualenv壓縮包
1.準備Linux環境
由於Virtualenv
需要相同的作業系統,當前上傳到DLA Spark使用的壓縮包必須在Linux環境下的進行安裝。您可以採用如下方式準備Linux環境。
準備一台Centos7的電腦進行打包。
在阿里雲以隨用隨付的方式新開一台Centos 7的ECS,使用完畢後關閉。
使用Centos 7的官方Docker鏡像,在鏡像內部打包。
2.在Linux環境下打包Python執行環境
常用的執行環境打包工具包括Virtualenv、Conda,您可以根據您的需要來選擇對應的工具,並安裝工具到您的Linux環境中。
當前Serverless Spark支援的Python版本為3.7及以下主要版本。
Spark運行環境為Centos 7,請使用該環境打包Venv(推薦使用Docker中的環境打包)。
以下樣本使用Virtualenv
產生一個執行環境壓縮包venv.zip
,壓縮包中包含了scikit-spark
的特定版本。
# create directory venv at current path with python3
# MUST ADD --copies !
virtualenv --copies --download --python Python3.7 venv
# active environment
source venv/bin/activate
# install third part modules
pip install scikit-spark==0.4.0
# check the result
pip list
# zip the environment
zip -r venv.zip venv
如何使用Conda產生執行環境,請參見Conda管理虛擬環境。
3.在Spark中使用Python執行環境
您可以在提交Spark作業時,使用如下的代碼配置作業。其中spark.pyspark.python
的參數值表示上傳的壓縮檔中的運行包。更多參數說明,請參見作業參數說明。
{
"name": "venv example",
"archives": [
"oss://test/venv.zip#PY3"
],
"conf": {
"spark.driver.resourceSpec": "medium",
"spark.dla.connectors": "oss",
"spark.executor.instances": 1,
"spark.dla.job.log.oss.uri": "oss://test/spark-logs",
"spark.pyspark.python": "./PY3/venv/bin/python3",
"spark.executor.resourceSpec": "medium"
},
"file": "oss://test/example.py"
}
與Spark開源社區的語義相同,venv.zip#PY3
代表將壓縮包解壓到計算節點工作目錄的PY3
檔案夾下,繼而可以從本地訪問。如果不使用#
指定檔案夾名稱,則預設使用檔案名稱作為建立的檔案夾名。
使用鏡像工具產生Virtualenv壓縮包
1.使用如下命令拉取鏡像。
-sudo docker pull registry.cn-hangzhou.aliyuncs.com/dla_spark/dla-venv:0.1
2.將需要產生環境的requirements.txt
檔案放置在/home/admin
檔案夾中,並將此檔案夾掛載到Docker中。
-sudo docker run -ti -v /home/admin:/tmp dla-venv:0.1 -p python3 -f /tmp/requirements.txt
requirements.txt
是Python的標準依賴包描述檔案,更多資訊請參見User Guide。
打包程式自動化執行,您可以看到如下日誌。
adding: venv-20210611-095454/lib64/ (stored 0%)
adding: venv-20210611-095454/lib64/python3.6/ (stored 0%)
adding: venv-20210611-095454/lib64/python3.6/site-packages/ (stored 0%)
adding: venv-20210611-095454/pyvenv.cfg (deflated 30%)
venv-20210611-095454.zip
3.在/home/admin
檔案夾下找到打包好的壓縮檔venv-20210611-095454.zip
。如何使用壓縮包請參見在Spark中使用Python執行環境。
4.(可選)關於Docker鏡像的更多使用說明,您可以執行如下命令查看。
-sudo docker run -i dla-venv:0.1
Used to create venv package for Aliyun DLA
Docker with host machine volumes: https://docs.docker.com/storage/volumes/
Please copy requirements.txt to a folder and mount the folder to docker as /tmp path
Usage example: docker run -it -v /home/admin:/tmp dla-venv:0.1 -p python3 -f /tmp/requirements.txt
-p python version, could be python2 or python3
-f path to requirements.txt, default is /tmp/requirements.txt
常見問題
如果在使用鏡像工具產生Virtualenv壓縮包時自動打包失敗,您可以通過執行如下命令啟動Linux Centos 7環境,並以Root許可權進入環境內部進行操作。
-sudo docker run -ti --entrypoint bash dla-venv:0.1
後續操作請參見自行產生Virtualenv壓縮包。