全部产品
Search
文档中心

云原生数据仓库AnalyticDB:通过PySpark开发Spark应用

更新时间:Jul 25, 2024

本文介绍了如何开发AnalyticDB for MySQL Spark Python作业,以及如何通过VirtualEnv技术打包Python作业的运行环境。

前提条件

  • 集群的产品系列为湖仓版

  • 集群与OSS存储空间位于相同地域。

  • 已在湖仓版集群中创建Job型资源组。具体操作,请参见新建资源组

  • 已创建AnalyticDB for MySQL集群的数据库账号。

PySpark的基本用法

  1. 编写如下示例程序,并将示例程序存储为example.py

    from pyspark.sql import SparkSession
    
    if __name__ == "__main__":
        spark = SparkSession.builder.getOrCreate()
        df = spark.sql("SELECT 1+1")
        df.printSchema()
        df.show()
    
  2. example.py程序上传到OSS中。具体操作,请参见控制台上传文件

  3. 进入Spark开发编辑器。

    1. 登录云原生数据仓库AnalyticDB MySQL控制台,在左上角选择集群所在地域。在左侧导航栏,单击集群列表,在湖仓版页签下,单击目标集群ID。

    2. 在左侧导航栏,单击作业开发>Spark Jar 开发

  4. 在编辑器窗口上方,选择Job型资源组和Spark作业类型。本文以Batch类型为例。

  5. 在编辑器中执行以下作业内容。

    {
    
     "name": "Spark Python Test",
     "file": "oss://{your oss bucket path}/example.py",
     "conf": {
     "spark.driver.resourceSpec": "small",
     "spark.executor.instances": 1,
     "spark.executor.resourceSpec": "small"
     }
    }

    参数说明请参见参数说明

使用Python依赖

使用方法

如果您使用自行开发或第三方开发的依赖开发Python程序时,需将使用的依赖上传至OSS中,并在提交Spark作业时配置pyFiles参数。

示例

本文示例以引入自定义函数计算员工的税后收入为例。示例将数据文件staff.csv上传至OSS中。staff.csv中的示例数据如下:

name,age,gender,salary
Lucky,25,male,100
Lucy,23,female,150
Martin,30,male,180
Rose,31,female,200
  1. 开发依赖并上传至OSS中。

    1. 创建名为tools的文件夹,并在该文件夹下创建名为func.py的程序

      def tax(salary):
          """
          convert string to int and cut 15% tax from the salary
      
          :param salary: The salary of staff worker
          :return:
          """
          return 0.15 * int(salary)
      
    2. tools文件夹压缩后上传至OSS中。本文示例为tools.tar.gz

      说明

      如果依赖多个Python文件,建议您使用gz压缩包进行压缩。您可以在Python代码中以module方式引用Python文件。

  2. 编写名为example.py示例程序。

    from __future__ import print_function
    from pyspark.sql import SparkSession
    from pyspark.sql.functions import udf
    from pyspark.sql.types import FloatType
    
    import sys
    
    # import third party file
    from tools import func
    
    if __name__ == "__main__":
        # init pyspark context
        spark = SparkSession.builder.appName("Python Example").getOrCreate()
        # read csv from oss to a dataframe, show the table
        cvs_file = sys.argv[1]
        df = spark.read.csv(cvs_file, mode="DROPMALFORMED", inferSchema=True, header=True)
        # print schema and data to the console
        df.printSchema()
        df.show()
        # create an udf
        taxCut = udf(lambda salary: func.tax(salary), FloatType())
        # cut tax from salary and show result
        df.select("name", taxCut("salary").alias("final salary")).show()
        spark.stop()
    
  3. example.py程序上传到OSS中。具体操作,请参见控制台上传文件

  4. 进入Spark开发编辑器。

    1. 登录云原生数据仓库AnalyticDB MySQL控制台,在左上角选择集群所在地域。在左侧导航栏,单击集群列表,在湖仓版页签下,单击目标集群ID。

    2. 在左侧导航栏,单击作业开发>Spark Jar 开发

  5. 在编辑器窗口上方,选择Job型资源组和Spark作业类型。本文以Batch类型为例。

  6. 在编辑器中执行以下作业内容。

    {
     "name": "Spark Python",
     "file": "oss://<bucket name>/example.py",
     "pyFiles": ["oss://<bucket name>/tools.tar.gz"],
     "args": [
     "oss://<bucket name>/staff.csv"
     ],
     "conf": {
     "spark.driver.resourceSpec": "small",
     "spark.executor.instances": 2,
     "spark.executor.resourceSpec": "small"
     }
    }

    参数说明:

    • filePython程序所在的OSS路径。

    • pyFilesPySpark依赖的Python文件所在的OSS路径,后缀可以是tar或tar.gz。多个压缩包使用英文逗号(,)分隔。

      说明

      PySpark应用所依赖的所有Python文件必须存储在OSS中。

    • args:使用JAR包时需要使用的参数。本文为staff.csv示例数据所在的OSS路径。

    更多参数,请参见参数说明

使用Virtual Environments打包依赖环境

开发Python作业时,如果您遇到复杂的依赖环境,可以通过Python的Virtual Environments技术进行环境管理和隔离。AnalyticDB for MySQL Spark支持使用Virtual Environments将本地依赖的环境打包并上传到OSS中。关于Virtual Environments的更多信息,请参见Python官方社区文档

重要

AnalyticDB for MySQL Spark使用的glibc-devel版本为2.28,若Virtual Environments不兼容2.28版本,PySpark任务可能无法正常执行。

使用方法

使用Virtual Environments打包Python环境,需将压缩包上传至OSS中,并在提交Spark作业时配置相关参数,以指定Python环境压缩包所在的OSS路径和使用的Python解释器的本地路径。

  • 指定Python环境压缩包所在的OSS路径:

    • 若Python环境的压缩包较小,您可配置archives参数。

    • 若Python环境的压缩包较大,您可配置spark.executorEnv.ADB_SPARK_DOWNLOAD_FILESspark.kubernetes.driverEnv.ADB_SPARK_DOWNLOAD_FILES参数。

  • 指定使用的Python解释器的本地路径:spark.pyspark.python参数。

示例

  1. 准备Linux环境。

    Virtual Environments需在Linux操作系统中打包Python环境,您可以通过以下三种方式准备Linux环境。本文以购买阿里云ECS实例为例。

    • 购买操作系统为Centos 7或AnolisOS 8的阿里云ECS实例。具体操作,请参见自定义购买实例

    • 在本地安装Centos 7或者AnolisOS 8以上版本的操作系统。

    • 使用Centos或AnolisOS的官方Docker镜像,在镜像内部打包Python环境。

  2. 使用Virtual Environments打包Python运行环境,并将压缩包上传至OSS中。

    使用Virtualenv或Conda打包项目依赖的Python环境,打包时可自定义Python的版本。此处以Virtualenv打包为例。

    # Create directory venv at current path with python3
    # MUST ADD --copies !
    virtualenv --copies --download --python python3.7 venv
    
    # active environment
    source venv/bin/activate
    
    # install third party modules
    pip install scikit-spark==0.4.0
    
    # check the result
    pip list
    
    # compress the environment
    tar -czvf venv.tar.gz venv
    说明

    如果您想通过Conda打包项目依赖,请参见Conda管理虚拟环境

  3. 进入Spark开发编辑器。

    1. 登录云原生数据仓库AnalyticDB MySQL控制台,在左上角选择集群所在地域。在左侧导航栏,单击集群列表,在湖仓版页签下,单击目标集群ID。

    2. 在左侧导航栏,单击作业开发>Spark Jar 开发

  4. 在编辑器窗口上方,选择Job型资源组和Spark作业类型。本文以Batch类型为例。

  5. 在编辑器中执行以下作业内容。

    {
     "name": "venv example",
     "archives": [
     "oss://testBucketname/venv.tar.gz#PY3"
     ],
     "conf": {
     "spark.driver.resourceSpec": "small",
     "spark.executor.instances": 1,
     "spark.pyspark.python": "./PY3/venv/bin/python3",
     "spark.executor.resourceSpec": "small"
     },
     "file": "oss://testBucketname/example.py"
    }

    说明

    Python环境的压缩包过大时,可参考如下代码。

    {
     "name": "venv example",
     "conf": {
     "spark.executorEnv.ADB_SPARK_DOWNLOAD_FILES": "oss://testBucketname/venv_py36.tar.gz#PY3",
     "spark.kubernetes.driverEnv.ADB_SPARK_DOWNLOAD_FILES": "oss://atestBucketname/venv_py36.tar.gz#PY3,",
     "spark.driver.resourceSpec": "small",
     "spark.executor.instances": 1,
     "spark.pyspark.python": "./PY3/venv/bin/python3",
     "spark.executor.resourceSpec": "small"
     },
     "file": "oss://testBucketname/example.py"
    }

    参数说明:

    • archives:Python环境压缩包所在的OSS路径。本文示例为venv.tar.gz压缩包所在的OSS路径。

    • spark.executorEnv.ADB_SPARK_DOWNLOAD_FILES:Spark Executor节点参数,用于指定Python环境压缩包所在的OSS路径。

    • spark.kubernetes.driverEnv.ADB_SPARK_DOWNLOAD_FILES:Spark Driver节点参数,用于指定Python环境压缩包所在的OSS路径。

    • spark.pyspark.python:指定要使用的Python解释器的本地路径。

    其他参数,请参见参数说明