全部產品
Search
文件中心

Data Lake Analytics - Deprecated:PySpark

更新時間:Jul 06, 2024

本文展示如何提交PySpark作業以及使用自訂Virtualenv。

重要

雲原生資料湖分析(DLA)產品已退市,AnalyticDB for MySQL湖倉版支援DLA已有功能,並提供更多的功能和更好的效能。AnalyticDB for MySQL相關使用文檔,請參見通過PySpark開發Spark應用

PySpark基本使用方式

1.開發主程式檔案

您可以建立如下內容的example.py檔案,樣本中定義main函數可以允許PySpark找到程式的統一啟動入口。

from __future__ import print_function
from pyspark.sql import SparkSession


# import third part file
from tools import func

if __name__ == "__main__":
    # init pyspark context
    spark = SparkSession\
        .builder\
        .appName("Python Example")\
        .getOrCreate()

    df = spark.sql("SELECT 2021")
    # print schema and data to the console
    df.printSchema()
    df.show()

2.執行主程式檔案

  1. 和Scala、Java程式開發的JAR包一樣,您需要將example.py檔案上傳到OSS中,並在Spark的啟動配置中使用file來指定這個檔案為開機檔案。

  2. 在DLA控制台的Serverless->作業管理頁面,使用如下範例程式碼配置作業。

    {
    
        "name": "Spark Python",
    
        "file": "oss://{your bucket name}/example.py"
    
        "conf": {
    
            "spark.driver.resourceSpec": "small",
    
            "spark.executor.instances": 2,
    
            "spark.executor.resourceSpec": "small",
    
            "spark.kubernetes.pyspark.pythonVersion": "3"
    
      }
    }

    重要
    • 您需要在配置時將{your bucket name}替換為您使用的OSS的Bucket名稱。

    • 樣本中使用Python 3執行檔案,和社區版的Spark相同,通過spark.kubernetes.pyspark.pythonVersion配置使用的Python版本,預設為Python 2.7。

  3. 單擊執行

如何上傳自行開發的或者第三方開發的Module

當開發Python程式時,往往會用到自行開發的或者由第三方開發的各種Module模組,這些模組可以上傳並載入到PySpark的執行環境中,被主程式調用。

以計算員工的稅後收入為例,步驟如下。

1. 準備測試資料

建立一個如下格式的CSV檔案,命名為staff.csv,並上傳到OSS中。檔案反映了每個員工的資訊和收入情況。

name,age,gender,salary
Lucky,25,male,100
Lucy,23,female,150
Martin,30,male,180
Rose,31,female,200
說明

如何將檔案上傳到OSS請參見簡單上傳

2. 開發一個依賴方法

  1. 建立一個檔案夾tools

  2. tools檔案夾中建立一個檔案func.py,檔案內容如下。

    def tax(salary):
        """
        convert string to int
        then cut 15% tax from the salary
        return a float number
    
        :param salary: The salary of staff worker
        :return:
        """
        return 0.15 * int(salary)
  3. tools檔案夾壓縮為tools.zip後上傳到OSS中。壓縮包的產生方式如下。p286346

    重要

    不同作業系統平台的ZIP壓縮公用程式會略有區別,請保證解壓後可以看到頂層目錄是tools檔案夾。

3. 開發主程式

開發一個Spark的Python程式,將測試中的CSV從OSS中讀取出來,註冊為一個DataFrame。同時將依賴包中的tax方法註冊為一個Spark UDF,然後使用該UDF對剛剛產生的DataFrame進行計算並列印結果。

範例程式碼如下, 您需要在配置時將{your bucket name}替換為您使用的OSS的Bucket名稱。

from __future__ import print_function
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import FloatType

# import third part file
from tools import func

if __name__ == "__main__":
    # init pyspark context
    spark = SparkSession\
        .builder\
        .appName("Python Example")\
        .getOrCreate()

    # read csv from oss to a dataframe, show the table
    df = spark.read.csv('oss://{your bucket}/staff.csv', mode="DROPMALFORMED",inferSchema=True, header = True)
    # print schema and data to the console
    df.printSchema()
    df.show()

    # create an udf
    taxCut = udf(lambda salary: func.tax(salary), FloatType())

    # cut tax from salary and show result
    df.select("name", taxCut("salary").alias("final salary")).show()
    spark.stop()

將代碼寫入到example.py中,並上傳到OSS。

4. 提交任務

在DLA控制台的Serverless->作業管理頁面,建立一個作業,並提交以下作業資訊。

{
    "name": "Spark Python",
    "file": "oss://{your bucket name}/example.py",
    "pyFiles": ["oss://{your bucket name}/tools.zip"],
    "conf": {
        "spark.driver.resourceSpec": "small",
        "spark.executor.instances": 2,
        "spark.executor.resourceSpec": "small",
        "spark.dla.connectors": "oss",
        "spark.kubernetes.pyspark.pythonVersion": "3"
    }
}

代碼中主要參數說明。

參數

說明

是否必選

conf

Spark任務用到的配置參數,需要的配置項如下。

  • "spark.dla.connectors": "oss" :此任務需要有串連OSS的能力。

  • "spark.kubernetes.pyspark.pythonVersion": "3" :此任務需要使用Python 3來執行。

說明

更多參數說明請參見作業配置指南

PySpark使用自訂Virtualenv

當需要複雜的第三方依賴包時,可以使用Virtualenv來將本地調試環境上傳到雲端的Spark叢集中。這種方式可以將大量複雜的系統包,如Pandas、Numpy、PyMySQL等裝入隔離環境,並遷移到相同的作業系統中。您可以選擇如下兩種方案。

說明

Virtualenv的更多資訊請參見Python官方社區venv說明

自行產生Virtualenv壓縮包

1.準備Linux環境

由於Virtualenv需要相同的作業系統,當前上傳到DLA Spark使用的壓縮包必須在Linux環境下的進行安裝。您可以採用如下方式準備Linux環境。

  • 準備一台Centos7的電腦進行打包。

  • 在阿里雲以隨用隨付的方式新開一台Centos 7的ECS,使用完畢後關閉。

  • 使用Centos 7的官方Docker鏡像,在鏡像內部打包。

2.在Linux環境下打包Python執行環境

常用的執行環境打包工具包括VirtualenvConda,您可以根據您的需要來選擇對應的工具,並安裝工具到您的Linux環境中。

重要
  • 當前Serverless Spark支援的Python版本為3.7及以下主要版本。

  • Spark運行環境為Centos 7,請使用該環境打包Venv(推薦使用Docker中的環境打包)。

以下樣本使用Virtualenv產生一個執行環境壓縮包venv.zip,壓縮包中包含了scikit-spark的特定版本。

# create directory venv at current path with python3
# MUST ADD --copies !
virtualenv --copies --download --python Python3.7 venv

# active environment
source venv/bin/activate

# install third part modules
pip install scikit-spark==0.4.0

# check the result
pip list

# zip the environment
zip -r venv.zip venv
說明

如何使用Conda產生執行環境,請參見Conda管理虛擬環境

3.在Spark中使用Python執行環境

您可以在提交Spark作業時,使用如下的代碼配置作業。其中spark.pyspark.python的參數值表示上傳的壓縮檔中的運行包。更多參數說明,請參見作業參數說明

{
    "name": "venv example",
    "archives": [
        "oss://test/venv.zip#PY3"
    ],
    "conf": {
        "spark.driver.resourceSpec": "medium",
        "spark.dla.connectors": "oss",
        "spark.executor.instances": 1,
        "spark.dla.job.log.oss.uri": "oss://test/spark-logs",
        "spark.pyspark.python": "./PY3/venv/bin/python3",
        "spark.executor.resourceSpec": "medium"
    },
    "file": "oss://test/example.py"
}
說明

與Spark開源社區的語義相同,venv.zip#PY3代表將壓縮包解壓到計算節點工作目錄的PY3檔案夾下,繼而可以從本地訪問。如果不使用#指定檔案夾名稱,則預設使用檔案名稱作為建立的檔案夾名。

使用鏡像工具產生Virtualenv壓縮包

1.使用如下命令拉取鏡像。

-sudo docker pull registry.cn-hangzhou.aliyuncs.com/dla_spark/dla-venv:0.1

2.將需要產生環境的requirements.txt檔案放置在/home/admin檔案夾中,並將此檔案夾掛載到Docker中。

-sudo docker run -ti -v /home/admin:/tmp dla-venv:0.1 -p python3 -f /tmp/requirements.txt
說明

requirements.txt是Python的標準依賴包描述檔案,更多資訊請參見User Guide

打包程式自動化執行,您可以看到如下日誌。

adding: venv-20210611-095454/lib64/ (stored 0%)
  adding: venv-20210611-095454/lib64/python3.6/ (stored 0%)
  adding: venv-20210611-095454/lib64/python3.6/site-packages/ (stored 0%)
  adding: venv-20210611-095454/pyvenv.cfg (deflated 30%)
  venv-20210611-095454.zip

3.在/home/admin檔案夾下找到打包好的壓縮檔venv-20210611-095454.zip。如何使用壓縮包請參見在Spark中使用Python執行環境

4.(可選)關於Docker鏡像的更多使用說明,您可以執行如下命令查看。

-sudo docker run -i dla-venv:0.1

Used to create venv package for Aliyun DLA 
Docker with host machine volumes: https://docs.docker.com/storage/volumes/
Please copy requirements.txt to a folder and mount the folder to docker as /tmp path 
Usage example: docker run -it -v /home/admin:/tmp dla-venv:0.1 -p python3 -f /tmp/requirements.txt 
 -p python version, could be python2 or python3
 -f path to requirements.txt, default is /tmp/requirements.txt

常見問題

如果在使用鏡像工具產生Virtualenv壓縮包時自動打包失敗,您可以通過執行如下命令啟動Linux Centos 7環境,並以Root許可權進入環境內部進行操作。

-sudo docker run -ti --entrypoint bash dla-venv:0.1

後續操作請參見自行產生Virtualenv壓縮包