AnalyticDB: Use PySpark to develop Spark applications

Last Updated: Jul 29, 2024

This topic describes how to develop AnalyticDB for MySQL Spark Python jobs and how to use Python virtual environments to package the runtime environment of those jobs.

Prerequisites

  • An AnalyticDB for MySQL Data Lakehouse Edition cluster is created.

  • The AnalyticDB for MySQL cluster resides in the same region as an Object Storage Service (OSS) bucket.

  • A job resource group is created for the AnalyticDB for MySQL Data Lakehouse Edition cluster. For more information, see Create a resource group.

  • A database account is created for the AnalyticDB for MySQL cluster.

Use PySpark

  1. Write the following sample code and save the code in a file named example.py:

    from pyspark.sql import SparkSession
    
    if __name__ == "__main__":
        spark = SparkSession.builder.getOrCreate()
        df = spark.sql("SELECT 1+1")
        df.printSchema()
        df.show()
    
  2. Upload the example.py file to OSS. For more information, see Upload objects.

  3. Go to the Spark editor.

    1. Log on to the AnalyticDB for MySQL console. In the upper-left corner of the console, select a region. In the left-side navigation pane, click Clusters. On the Data Lakehouse Edition tab, find the cluster that you want to manage and click the cluster ID.

    2. In the left-side navigation pane, choose Job Development > Spark JAR Development.

  4. In the upper part of the editor, select a job resource group and a Spark job type. In this example, the Batch type is selected.

  5. Run the following code in the editor:

    {
     "name": "Spark Python Test",
     "file": "oss://{your oss bucket path}/example.py",
     "conf": {
     "spark.driver.resourceSpec": "small",
     "spark.executor.instances": 1,
     "spark.executor.resourceSpec": "small"
     }
    }

    For information about the parameters, see Overview.

Use Python dependencies

Method

If you develop Python applications by using self-developed or third-party dependencies, you must upload the dependencies to OSS and configure the pyFiles parameter when you submit a Spark job.

Example

This section shows an example of how to calculate the after-tax incomes of employees by using a custom function. In this example, a file named staff.csv is uploaded to OSS. The staff.csv file contains the following data:

name,age,gender,salary
Lucky,25,male,100
Lucy,23,female,150
Martin,30,male,180
Rose,31,female,200
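
To make the expected result concrete, the following sketch computes the after-tax income for the sample rows in plain Python, without Spark. It assumes that "after-tax income" means the salary minus a flat 15% tax, which is the rate used in this example. The names TAX_RATE, after_tax, and after_tax_by_name are illustrative and do not appear in the job code.

```python
import csv
import io

# Sample rows copied from the staff.csv file shown above.
STAFF_CSV = """name,age,gender,salary
Lucky,25,male,100
Lucy,23,female,150
Martin,30,male,180
Rose,31,female,200
"""

TAX_RATE = 0.15  # assumption: flat 15% tax, as in this example


def after_tax(salary):
    """Return the salary that remains after the flat tax is deducted."""
    return (1 - TAX_RATE) * int(salary)


def after_tax_by_name(csv_text):
    """Map each employee name to the after-tax salary."""
    reader = csv.DictReader(io.StringIO(csv_text))
    return {row["name"]: after_tax(row["salary"]) for row in reader}


if __name__ == "__main__":
    for name, income in after_tax_by_name(STAFF_CSV).items():
        print(f"{name}: {income:.2f}")
```

You can compare these values with the "final salary" column that the Spark job prints.
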

  1. Prepare the dependencies and upload them to OSS.

    1. Create a folder named tools. Create a file named func.py in the folder.

      def tax(salary):
          """
          Convert the salary string to an int and deduct a 15% tax.
      
          :param salary: the salary of the staff member
          :return: the after-tax salary
          """
          return 0.85 * int(salary)
      
    2. Compress and upload the tools folder to OSS. In this example, the folder is compressed into the tools.tar.gz package.

      Note

      If multiple dependent Python files are required, we recommend that you compress the files into a .tar.gz package. You can then import the Python files as modules in your code.

  2. Write the following sample code and save the code in a file named example.py:

    from __future__ import print_function
    from pyspark.sql import SparkSession
    from pyspark.sql.functions import udf
    from pyspark.sql.types import FloatType
    
    import sys
    
    # import third party file
    from tools import func
    
    if __name__ == "__main__":
        # init pyspark context
        spark = SparkSession.builder.appName("Python Example").getOrCreate()
        # read the CSV file from OSS into a DataFrame
        csv_file = sys.argv[1]
        df = spark.read.csv(csv_file, mode="DROPMALFORMED", inferSchema=True, header=True)
        # print the schema and data to the console
        df.printSchema()
        df.show()
        # create a UDF that applies the tax function from the tools package
        taxCut = udf(lambda salary: func.tax(salary), FloatType())
        # deduct tax from the salary and show the result
        df.select("name", taxCut("salary").alias("final salary")).show()
        spark.stop()
    
  3. Upload the example.py file to OSS. For more information, see Upload objects.

  4. Go to the Spark editor.

    1. Log on to the AnalyticDB for MySQL console. In the upper-left corner of the console, select a region. In the left-side navigation pane, click Clusters. On the Data Lakehouse Edition tab, find the cluster that you want to manage and click the cluster ID.

    2. In the left-side navigation pane, choose Job Development > Spark JAR Development.

  5. In the upper part of the editor, select a job resource group and a Spark job type. In this example, the Batch type is selected.

  6. Run the following code in the editor:

    {
     "name": "Spark Python",
     "file": "oss://<bucket name>/example.py",
     "pyFiles": ["oss://<bucket name>/tools.tar.gz"],
     "args": [
     "oss://<bucket name>/staff.csv"
     ],
     "conf": {
     "spark.driver.resourceSpec": "small",
     "spark.executor.instances": 2,
     "spark.executor.resourceSpec": "small"
     }
    }

    Parameters:

    • file: the OSS path of the Python code.

    • pyFiles: the OSS path of the Python dependencies that are required for PySpark. The suffix of the path can be .tar or .tar.gz. Separate multiple packages with commas (,).

      Note

      All Python dependencies that are required for PySpark must be stored in OSS.

    • args: the arguments that are passed to the Python application. In this example, the OSS path of the staff.csv file is used, which example.py reads from sys.argv[1].

    For information about other parameters, see Overview.
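
The packaging step in this example can also be scripted. The following sketch uses the Python standard library to compress a folder into a .tar.gz package and to list its contents. It assumes that the package should contain the tools folder as its top-level entry, which is what compressing the folder itself would produce. The helper names package_folder and list_members are illustrative, the func.py content is a placeholder, and uploading the package to OSS is not covered.

```python
import os
import tarfile
import tempfile


def package_folder(folder, archive_path):
    """Compress a folder into a .tar.gz package, keeping the folder name as the top-level entry."""
    folder = os.path.abspath(folder)
    with tarfile.open(archive_path, "w:gz") as tar:
        # arcname stores "tools/..." in the package instead of the absolute path.
        tar.add(folder, arcname=os.path.basename(folder))
    return archive_path


def list_members(archive_path):
    """Return the sorted entry names stored in the package."""
    with tarfile.open(archive_path, "r:gz") as tar:
        return sorted(tar.getnames())


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as workdir:
        tools_dir = os.path.join(workdir, "tools")
        os.makedirs(tools_dir)
        # Placeholder file; in practice this is your own func.py.
        with open(os.path.join(tools_dir, "func.py"), "w") as f:
            f.write("# placeholder for tools/func.py\n")
        archive = package_folder(tools_dir, os.path.join(workdir, "tools.tar.gz"))
        print(list_members(archive))
```

Listing the entries before you upload the package is a quick way to confirm that tools/func.py sits where the statement from tools import func expects it.
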

Use virtual environments to package dependent environments

If your Python jobs have complex dependencies, you can use Python virtual environments to manage and isolate them. AnalyticDB for MySQL Spark allows you to package a local virtual environment and upload the package to OSS. For more information about virtual environments, see Python documentation.

Important

AnalyticDB for MySQL Spark uses glibc-devel 2.28. If the packaged virtual environment is not compatible with this version, PySpark jobs may fail to run.
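
The following sketch shows one way to check the glibc version of the machine on which you build the environment. This topic does not define what "compatible" means. The sketch assumes that the main risk is a build machine whose glibc is newer than 2.28, because compiled extension modules built against a newer glibc generally cannot be loaded by an older one. The helper names parse_version and built_for_newer_glibc are illustrative.

```python
import platform

TARGET_GLIBC = "2.28"  # version stated in the note above


def parse_version(text):
    """Turn a dotted version such as '2.28' into a tuple of ints."""
    return tuple(int(part) for part in text.split("."))


def built_for_newer_glibc(build_version, target_version=TARGET_GLIBC):
    """Return True if the build machine's glibc is newer than the target version."""
    return parse_version(build_version) > parse_version(target_version)


if __name__ == "__main__":
    lib, version = platform.libc_ver()
    if lib != "glibc" or not version:
        print("Could not detect glibc on this machine.")
    elif built_for_newer_glibc(version):
        print(f"Local glibc {version} is newer than {TARGET_GLIBC}; compiled packages may not load.")
    else:
        print(f"Local glibc {version} is not newer than {TARGET_GLIBC}.")
```

The check is only a hint. A job can still fail for other reasons, so test the packaged environment with a small job first.
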

Method

To package a Python environment by using a virtual environment, compress the environment, upload the package to OSS, and configure the following parameters when you submit a Spark job:

  • Specify the OSS path of the Python environment package:

    • If the Python environment package is small, configure the archives parameter.

    • If the Python environment package is large, configure the spark.executorEnv.ADB_SPARK_DOWNLOAD_FILES and spark.kubernetes.driverEnv.ADB_SPARK_DOWNLOAD_FILES parameters.

  • Specify the local path of the Python interpreter by configuring the spark.pyspark.python parameter.

Example

  1. Prepare a Linux operating system.

    A Linux operating system is required to package Python environments. You can use one of the following methods to prepare a Linux operating system. In this example, an Elastic Compute Service (ECS) instance is purchased.

    • Purchase an ECS instance that runs CentOS 7 or AnolisOS 8. For more information, see Create an instance on the Custom Launch tab.

    • Install CentOS 7, AnolisOS 8, or later on your on-premises device.

    • Use an official Docker image of CentOS or AnolisOS and package Python environments in the image.

  2. Package the Python environment and upload the package to OSS.

    Use the virtualenv or conda tool to package the dependent Python environments. You can customize the Python version during packaging. In this example, the virtualenv tool is used.

    # Create the venv directory in the current path with Python 3.7.
    # The --copies option is required: it copies files into venv instead of creating symlinks.
    virtualenv --copies --download --python python3.7 venv
    
    # activate the environment
    source venv/bin/activate
    
    # install third party modules
    pip install scikit-spark==0.4.0
    
    # check the result
    pip list
    
    # compress the environment
    tar -czvf venv.tar.gz venv

    Note

    For information about how to use the conda tool to package the dependent Python environments, see Managing environments.

  3. Go to the Spark editor.

    1. Log on to the AnalyticDB for MySQL console. In the upper-left corner of the console, select a region. In the left-side navigation pane, click Clusters. On the Data Lakehouse Edition tab, find the cluster that you want to manage and click the cluster ID.

    2. In the left-side navigation pane, choose Job Development > Spark JAR Development.

  4. In the upper part of the editor, select a job resource group and a Spark job type. In this example, the Batch type is selected.

  5. Run the following code in the editor:

    {
     "name": "venv example",
     "archives": [
     "oss://testBucketname/venv.tar.gz#PY3"
     ],
     "conf": {
     "spark.driver.resourceSpec": "small",
     "spark.executor.instances": 1,
     "spark.pyspark.python": "./PY3/venv/bin/python3",
     "spark.executor.resourceSpec": "small"
     },
     "file": "oss://testBucketname/example.py"
    }

    If the Python environment package is large, run the following code instead:

    {
     "name": "venv example",
     "conf": {
     "spark.executorEnv.ADB_SPARK_DOWNLOAD_FILES": "oss://testBucketname/venv_py36.tar.gz#PY3",
     "spark.kubernetes.driverEnv.ADB_SPARK_DOWNLOAD_FILES": "oss://testBucketname/venv_py36.tar.gz#PY3",
     "spark.driver.resourceSpec": "small",
     "spark.executor.instances": 1,
     "spark.pyspark.python": "./PY3/venv/bin/python3",
     "spark.executor.resourceSpec": "small"
     },
     "file": "oss://testBucketname/example.py"
    }

    Parameters:

    • archives: the OSS path of the Python environment package. In this example, the OSS path of the venv.tar.gz package is used.

    • spark.executorEnv.ADB_SPARK_DOWNLOAD_FILES: the OSS path of the Python environment package, which is a Spark executor parameter.

    • spark.kubernetes.driverEnv.ADB_SPARK_DOWNLOAD_FILES: the OSS path of the Python environment package, which is a Spark driver parameter.

    • spark.pyspark.python: the local path of the Python interpreter. In this example, ./PY3/venv/bin/python3 is used.

    For information about other parameters, see Overview.
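
The two job configurations above differ only in how the environment package is referenced. The following sketch generates either variant and derives the interpreter path from the package alias. It assumes, based on the example values, that the name after # (PY3) is the directory into which the package is unpacked and that the package contains a top-level venv directory. The function names interpreter_path and venv_job_config and the large_package flag are illustrative. This topic does not state a size threshold for "large", so the choice is left to the caller.

```python
import json
import posixpath


def interpreter_path(alias, venv_dir="venv", python_bin="python3"):
    """Relative path of the interpreter inside the unpacked package (assumed layout)."""
    return "./" + posixpath.join(alias, venv_dir, "bin", python_bin)


def venv_job_config(name, script_uri, archive_uri, alias="PY3", large_package=False):
    """Build a job configuration that references a packaged virtual environment."""
    conf = {
        "spark.driver.resourceSpec": "small",
        "spark.executor.instances": 1,
        "spark.executor.resourceSpec": "small",
        "spark.pyspark.python": interpreter_path(alias),
    }
    job = {"name": name, "file": script_uri, "conf": conf}
    archive_ref = f"{archive_uri}#{alias}"
    if large_package:
        # Large packages: reference the package through the driver and executor parameters.
        conf["spark.executorEnv.ADB_SPARK_DOWNLOAD_FILES"] = archive_ref
        conf["spark.kubernetes.driverEnv.ADB_SPARK_DOWNLOAD_FILES"] = archive_ref
    else:
        # Small packages: reference the package through the archives parameter.
        job["archives"] = [archive_ref]
    return job


if __name__ == "__main__":
    job = venv_job_config(
        "venv example",
        "oss://testBucketname/example.py",
        "oss://testBucketname/venv.tar.gz",
    )
    print(json.dumps(job, indent=1))
```

Generating the configuration keeps the alias and the spark.pyspark.python value in sync, which avoids a mismatch between the unpack directory and the interpreter path.
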