本文介紹了如何開發AnalyticDB for MySQL Spark Python作業,以及如何通過VirtualEnv技術打包Python作業的運行環境。
前提條件
叢集的產品系列為湖倉版。
叢集與OSS儲存空間位於相同地區。
已在湖倉版叢集中建立Job型資源群組。具體操作,請參見建立資源群組。
已建立AnalyticDB for MySQL叢集的資料庫帳號。
如果是通過阿里雲帳號訪問,只需建立高許可權帳號。具體操作,請參見建立高許可權帳號。
如果是通過RAM使用者訪問,需要建立高許可權帳號和普通帳號並且將RAM使用者綁定到普通帳號上。具體操作,請參見建立資料庫帳號和綁定或解除綁定RAM使用者與資料庫帳號。
PySpark的基本用法
編寫如下樣本程式,並將樣本程式儲存為
example.py
。from pyspark.sql import SparkSession if __name__ == "__main__": spark = SparkSession.builder.getOrCreate() df = spark.sql("SELECT 1+1") df.printSchema() df.show()
將
example.py
程式上傳到OSS中。具體操作,請參見控制台上傳檔案。進入Spark開發編輯器。
登入雲原生資料倉儲AnalyticDB MySQL控制台,在左上方選擇叢集所在地區。在左側導覽列,單擊集群清單。在集群清單上方,選擇產品系列,然後單擊目的地組群ID。
在左側導覽列,單擊作業開發>Spark Jar 開發。
在編輯器視窗上方,選擇Job型資源群組和Spark作業類型。本文以Batch類型為例。
在編輯器中執行以下作業內容。
{ "name": "Spark Python Test", "file": "oss://{your oss bucket path}/example.py", "conf": { "spark.driver.resourceSpec": "small", "spark.executor.instances": 1, "spark.executor.resourceSpec": "small" } }
參數說明請參見參數說明。
使用Python依賴
使用方法
如果您使用自行開發或第三方開發的依賴開發Python程式時,需將使用的依賴上傳至OSS中,並在提交Spark作業時配置pyFiles
參數。
樣本
本文樣本以引入自訂Function Compute員工的稅後收入為例。樣本將資料檔案staff.csv
上傳至OSS中。staff.csv
中的樣本資料如下:
name,age,gender,salary
Lucky,25,male,100
Lucy,23,female,150
Martin,30,male,180
Rose,31,female,200
開發依賴並上傳至OSS中。
建立名為
tools
的檔案夾,並在該檔案夾下建立名為func.py
的程式。def tax(salary): """ convert string to int and cut 15% tax from the salary :param salary: The salary of staff worker :return: """ return 0.15 * int(salary)
將
tools
檔案夾壓縮後上傳至OSS中。本文樣本為tools.tar.gz
。說明如果依賴多個Python檔案,建議您使用gz壓縮包進行壓縮。您可以在Python代碼中以module方式引用Python檔案。
編寫名為
example.py
的樣本程式。from __future__ import print_function from pyspark.sql import SparkSession from pyspark.sql.functions import udf from pyspark.sql.types import FloatType import sys # import third party file from tools import func if __name__ == "__main__": # init pyspark context spark = SparkSession.builder.appName("Python Example").getOrCreate() # read csv from oss to a dataframe, show the table cvs_file = sys.argv[1] df = spark.read.csv(cvs_file, mode="DROPMALFORMED", inferSchema=True, header=True) # print schema and data to the console df.printSchema() df.show() # create an udf taxCut = udf(lambda salary: func.tax(salary), FloatType()) # cut tax from salary and show result df.select("name", taxCut("salary").alias("final salary")).show() spark.stop()
將
example.py
程式上傳到OSS中。具體操作,請參見控制台上傳檔案。進入Spark開發編輯器。
登入雲原生資料倉儲AnalyticDB MySQL控制台,在左上方選擇叢集所在地區。在左側導覽列,單擊集群清單。在集群清單上方,選擇產品系列,然後單擊目的地組群ID。
在左側導覽列,單擊作業開發>Spark Jar 開發。
在編輯器視窗上方,選擇Job型資源群組和Spark作業類型。本文以Batch類型為例。
在編輯器中執行以下作業內容。
{ "name": "Spark Python", "file": "oss://<bucket name>/example.py", "pyFiles": ["oss://<bucket name>/tools.tar.gz"], "args": [ "oss://<bucket name>/staff.csv" ], "conf": { "spark.driver.resourceSpec": "small", "spark.executor.instances": 2, "spark.executor.resourceSpec": "small" } }
參數說明:
file:Python程式所在的OSS路徑。
pyFiles:PySpark依賴的Python檔案所在的OSS路徑,尾碼可以是tar或tar.gz。多個壓縮包使用英文逗號(,)分隔。
說明PySpark應用所依賴的所有Python檔案必須儲存在OSS中。
args:使用JAR包時需要使用的參數。本文為
staff.csv
樣本資料所在的OSS路徑。
更多參數,請參見參數說明。
使用Virtual Environments打包依賴環境
開發Python作業時,如果您遇到複雜的依賴環境,可以通過Python的Virtual Environments技術進行環境管理和隔離。AnalyticDB for MySQL Spark支援使用Virtual Environments將本地依賴的環境打包並上傳到OSS中。關於Virtual Environments的更多資訊,請參見Python官方社區文檔。
AnalyticDB for MySQL Spark使用的glibc-devel版本為2.28,若Virtual Environments不相容2.28版本,PySpark任務可能無法正常執行。
使用方法
使用Virtual Environments打包Python環境,需將壓縮包上傳至OSS中,並在提交Spark作業時配置相關參數,以指定Python環境壓縮包所在的OSS路徑和使用的Python解譯器的本地路徑。
指定Python環境壓縮包所在的OSS路徑:
若Python環境的壓縮包較小,您可配置
archives
參數。若Python環境的壓縮包較大,您可配置
spark.executorEnv.ADB_SPARK_DOWNLOAD_FILES
和spark.kubernetes.driverEnv.ADB_SPARK_DOWNLOAD_FILES
參數。
指定使用的Python解譯器的本地路徑:
spark.pyspark.python
參數。
樣本
準備Linux環境。
Virtual Environments需在Linux作業系統中打包Python環境,您可以通過以下三種方式準備Linux環境。本文以購買阿里雲ECS執行個體為例。
購買作業系統為Centos 7或AnolisOS 8的阿里雲ECS執行個體。具體操作,請參見自訂購買執行個體。
在本地安裝Centos 7或者AnolisOS 8以上版本的作業系統。
使用Centos或AnolisOS的官方Docker鏡像,在鏡像內部打包Python環境。
使用Virtual Environments打包Python運行環境,並將壓縮包上傳至OSS中。
使用Virtualenv或Conda打包專案依賴的Python環境,打包時可自訂Python的版本。此處以Virtualenv打包為例。
# Create directory venv at current path with python3 # MUST ADD --copies ! virtualenv --copies --download --python python3.7 venv # active environment source venv/bin/activate # install third party modules pip install scikit-spark==0.4.0 # check the result pip list # compress the environment tar -czvf venv.tar.gz venv
說明如果您想通過Conda打包專案依賴,請參見Conda管理虛擬環境。
進入Spark開發編輯器。
登入雲原生資料倉儲AnalyticDB MySQL控制台,在左上方選擇叢集所在地區。在左側導覽列,單擊集群清單。在集群清單上方,選擇產品系列,然後單擊目的地組群ID。
在左側導覽列,單擊作業開發>Spark Jar 開發。
在編輯器視窗上方,選擇Job型資源群組和Spark作業類型。本文以Batch類型為例。
在編輯器中執行以下作業內容。
{ "name": "venv example", "archives": [ "oss://testBucketname/venv.tar.gz#PY3" ], "conf": { "spark.driver.resourceSpec": "small", "spark.executor.instances": 1, "spark.pyspark.python": "./PY3/venv/bin/python3", "spark.executor.resourceSpec": "small" }, "file": "oss://testBucketname/example.py" }
或
說明Python環境的壓縮包過大時,可參考如下代碼。
{ "name": "venv example", "conf": { "spark.executorEnv.ADB_SPARK_DOWNLOAD_FILES": "oss://testBucketname/venv_py36.tar.gz#PY3", "spark.kubernetes.driverEnv.ADB_SPARK_DOWNLOAD_FILES": "oss://atestBucketname/venv_py36.tar.gz#PY3,", "spark.driver.resourceSpec": "small", "spark.executor.instances": 1, "spark.pyspark.python": "./PY3/venv/bin/python3", "spark.executor.resourceSpec": "small" }, "file": "oss://testBucketname/example.py" }
參數說明:
archives:Python環境壓縮包所在的OSS路徑。本文樣本為
venv.tar.gz
壓縮包所在的OSS路徑。spark.executorEnv.ADB_SPARK_DOWNLOAD_FILES:Spark Executor節點參數,用於指定Python環境壓縮包所在的OSS路徑。
spark.kubernetes.driverEnv.ADB_SPARK_DOWNLOAD_FILES:Spark Driver節點參數,用於指定Python環境壓縮包所在的OSS路徑。
spark.pyspark.python:指定要使用的Python解譯器的本地路徑。
其他參數,請參見參數說明。