This topic describes how to develop Spark Python (PySpark) jobs in AnalyticDB for MySQL and how to use Python virtual environments to package the runtime environment of those jobs.
Prerequisites
An AnalyticDB for MySQL Data Lakehouse Edition cluster is created.
An Object Storage Service (OSS) bucket is created in the same region as the AnalyticDB for MySQL cluster.
A job resource group is created for the AnalyticDB for MySQL Data Lakehouse Edition cluster. For more information, see Create a resource group.
A database account is created for the AnalyticDB for MySQL cluster.
If you use an Alibaba Cloud account, you only need to create a privileged account. For more information, see the "Create a privileged account" section of the Create a database account topic.
If you use a Resource Access Management (RAM) user, you must create a privileged account and a standard account and associate the standard account with the RAM user. For more information, see Create a database account and Associate or disassociate a database account with or from a RAM user.
Use PySpark
Write the following sample code and save the code in a file named example.py:
from pyspark.sql import SparkSession

if __name__ == "__main__":
    spark = SparkSession.builder.getOrCreate()
    df = spark.sql("SELECT 1+1")
    df.printSchema()
    df.show()
Upload the example.py file to OSS. For more information, see Upload objects.
Go to the Spark editor.
Log on to the AnalyticDB for MySQL console. In the upper-left corner of the console, select a region. In the left-side navigation pane, click Clusters. On the Clusters page, click an edition tab. Find the cluster that you want to manage and click the cluster ID.
In the left-side navigation pane, choose Job Development > Spark JAR Development.
In the upper part of the editor, select a job resource group and a Spark job type. In this example, the Batch type is selected.
Run the following code in the editor:
{
    "name": "Spark Python Test",
    "file": "oss://{your oss bucket path}/example.py",
    "conf": {
        "spark.driver.resourceSpec": "small",
        "spark.executor.instances": 1,
        "spark.executor.resourceSpec": "small"
    }
}
For information about the parameters, see Overview.
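Because the job configuration is plain JSON, it can also be generated by a script instead of being edited by hand. The following is a minimal sketch under stated assumptions: build_pyspark_job is a hypothetical helper and is not part of any AnalyticDB for MySQL tooling, and my-bucket is a placeholder bucket name. The helper only assembles the fields that are used in the preceding configuration.

```python
import json


def build_pyspark_job(name, file_uri, executors=1, spec="small"):
    """Assemble a batch job configuration with the fields used in this topic.

    Hypothetical helper for illustration; it is not an AnalyticDB for MySQL API.
    """
    if not file_uri.startswith("oss://"):
        raise ValueError("file_uri must be an OSS path such as oss://<bucket>/example.py")
    return {
        "name": name,
        "file": file_uri,
        "conf": {
            "spark.driver.resourceSpec": spec,
            "spark.executor.instances": executors,
            "spark.executor.resourceSpec": spec,
        },
    }


if __name__ == "__main__":
    # "my-bucket" is a placeholder; replace it with your own bucket path.
    job = build_pyspark_job("Spark Python Test", "oss://my-bucket/example.py")
    # Paste the printed JSON into the Spark editor.
    print(json.dumps(job, indent=4))
```

Rejecting paths that do not start with oss:// catches one common mistake, a local file path, before the job is submitted.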
Use Python dependencies
Method
If you develop Python applications by using self-developed or third-party dependencies, you must upload the dependencies to OSS and configure the pyFiles parameter when you submit a Spark job.
Example
The following example shows how to calculate the after-tax incomes of employees by using a custom function. In this example, a file named staff.csv is uploaded to OSS. The staff.csv file contains the following data:
name,age,gender,salary
Lucky,25,male,100
Lucy,23,female,150
Martin,30,male,180
Rose,31,female,200
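Before you run the job on a cluster, it can help to know what results to expect. The following sketch computes them locally without Spark. It assumes that the after-tax income is the salary minus a flat 15% tax, as the description of this example states. TAX_RATE, after_tax, and expected_rows are hypothetical names that are introduced only for this illustration.

```python
import csv
import io

# Sample data copied from the staff.csv file in this topic.
STAFF_CSV = """name,age,gender,salary
Lucky,25,male,100
Lucy,23,female,150
Martin,30,male,180
Rose,31,female,200
"""

TAX_RATE = 0.15  # Assumption: a flat 15% tax is deducted from each salary.


def after_tax(salary):
    """Return the income that remains after the flat tax is deducted."""
    return (1 - TAX_RATE) * int(salary)


def expected_rows(csv_text):
    """Parse the CSV text and return (name, after-tax income) pairs."""
    reader = csv.DictReader(io.StringIO(csv_text))
    return [(row["name"], round(after_tax(row["salary"]), 2)) for row in reader]


if __name__ == "__main__":
    for name, income in expected_rows(STAFF_CSV):
        print(name, income)
```

The values that this sketch prints can be compared with the "final salary" column that the Spark job displays.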
Prepare the dependencies and upload them to OSS.
Create a folder named tools. Create a file named func.py in the folder.
def tax(salary):
    """
    Convert the salary string to an integer and deduct a 15% tax.

    :param salary: the salary of the staff member
    :return: the after-tax salary
    """
    return 0.85 * int(salary)
Compress and upload the tools folder to OSS. In this example, the folder is compressed into the tools.tar.gz package.
Note: If multiple dependent Python files are required, we recommend that you compress the files into a .gz package. You can reference Python files in Python code as modules.
Write the following sample code and save the code in a file named example.py:
from __future__ import print_function
import sys

from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import FloatType

# Import the custom module from the tools package that is passed in pyFiles
from tools import func

if __name__ == "__main__":
    # Initialize the Spark session
    spark = SparkSession.builder.appName("Python Example").getOrCreate()
    # Read the CSV file from OSS into a DataFrame
    csv_file = sys.argv[1]
    df = spark.read.csv(csv_file, mode="DROPMALFORMED", inferSchema=True, header=True)
    # Print the schema and data to the console
    df.printSchema()
    df.show()
    # Create a UDF that wraps the custom tax function
    taxCut = udf(lambda salary: func.tax(salary), FloatType())
    # Apply the UDF to the salary column and show the result
    df.select("name", taxCut("salary").alias("final salary")).show()
    spark.stop()
Upload the example.py file to OSS. For more information, see Upload objects.
Go to the Spark editor.
Log on to the AnalyticDB for MySQL console. In the upper-left corner of the console, select a region. In the left-side navigation pane, click Clusters. On the Clusters page, click an edition tab. Find the cluster that you want to manage and click the cluster ID.
In the left-side navigation pane, choose Job Development > Spark JAR Development.
In the upper part of the editor, select a job resource group and a Spark job type. In this example, the Batch type is selected.
Run the following code in the editor:
{
    "name": "Spark Python",
    "file": "oss://<bucket name>/example.py",
    "pyFiles": ["oss://<bucket name>/tools.tar.gz"],
    "args": [
        "oss://<bucket name>/staff.csv"
    ],
    "conf": {
        "spark.driver.resourceSpec": "small",
        "spark.executor.instances": 2,
        "spark.executor.resourceSpec": "small"
    }
}
Parameters:
file: the OSS path of the Python code.
pyFiles: the OSS path of the Python dependencies that are required for PySpark. The suffix of the path can be .tar or .tar.gz. Separate multiple packages with commas (,).
Note: All Python dependencies that are required for PySpark must be stored in OSS.
args: the arguments that are passed to the Python application. In this example, the OSS path of the staff.csv file is used, which example.py reads from sys.argv[1].
For information about other parameters, see Overview.
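The step that compresses the tools folder can also be scripted. The following is a minimal sketch that uses the tarfile module of the Python standard library to build tools.tar.gz and list its contents before you upload the package. pack_folder and list_archive are hypothetical helper names, and the placeholder func.py file exists only so that the sketch runs in any directory.

```python
import os
import tarfile
import tempfile


def pack_folder(folder, archive_path):
    """Compress a folder into a .tar.gz archive.

    The folder name is kept as the top-level entry so that
    "from tools import func" can still resolve the package.
    """
    top_level = os.path.basename(os.path.normpath(folder))
    with tarfile.open(archive_path, "w:gz") as tar:
        tar.add(folder, arcname=top_level)
    return archive_path


def list_archive(archive_path):
    """Return the member names that are stored in the archive."""
    with tarfile.open(archive_path, "r:gz") as tar:
        return tar.getnames()


if __name__ == "__main__":
    # Build a throwaway tools/func.py so that the sketch runs anywhere.
    workdir = tempfile.mkdtemp()
    tools_dir = os.path.join(workdir, "tools")
    os.makedirs(tools_dir)
    with open(os.path.join(tools_dir, "func.py"), "w") as f:
        f.write("# placeholder module\n")
    archive = pack_folder(tools_dir, os.path.join(workdir, "tools.tar.gz"))
    print(list_archive(archive))
```

Listing the archive before the upload makes it easy to confirm that func.py sits under a top-level tools entry and not under a longer path.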
Use the virtual environments technology to package dependent environments
If your Python jobs have complex dependencies, you can use Python virtual environments to manage and isolate those dependencies. AnalyticDB for MySQL Spark allows you to package a local virtual environment and upload the package to OSS. For more information about virtual environments, see Python documentation.
AnalyticDB for MySQL Spark uses glibc-devel 2.28. If the packaged virtual environment is not compatible with this version, PySpark jobs may fail to run.
Method
To use the virtual environments technology to package Python environments, you must compress and upload the Python environments to OSS and configure parameters to complete the following settings when you submit a Spark job:
Specify the OSS path of the Python environment package:
If the Python environment package is small, configure the archives parameter.
If the Python environment package is large, configure the spark.executorEnv.ADB_SPARK_DOWNLOAD_FILES and spark.kubernetes.driverEnv.ADB_SPARK_DOWNLOAD_FILES parameters.
Specify the local path of the Python interpreter by configuring the spark.pyspark.python parameter.
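The settings above can be summarized in a short sketch. build_venv_job is a hypothetical helper and not an AnalyticDB for MySQL API. This topic does not state a size threshold, so the caller decides whether the package counts as large. The interpreter path assumes that the package contains a top-level venv directory and is made available under the alias that follows the # character.

```python
import json


def build_venv_job(name, file_uri, package_uri, alias="PY3", large_package=False):
    """Assemble a job configuration that ships a packaged virtual environment.

    Hypothetical helper for illustration. large_package is chosen by the
    caller because this topic does not define a size threshold.
    """
    package = f"{package_uri}#{alias}"
    conf = {
        "spark.driver.resourceSpec": "small",
        "spark.executor.instances": 1,
        "spark.executor.resourceSpec": "small",
        # Assumption: the archive contains a top-level venv/ directory.
        "spark.pyspark.python": f"./{alias}/venv/bin/python3",
    }
    job = {"name": name, "file": file_uri, "conf": conf}
    if large_package:
        # Large packages: set the download parameters for executors and the driver.
        conf["spark.executorEnv.ADB_SPARK_DOWNLOAD_FILES"] = package
        conf["spark.kubernetes.driverEnv.ADB_SPARK_DOWNLOAD_FILES"] = package
    else:
        # Small packages: the archives parameter is sufficient.
        job["archives"] = [package]
    return job


if __name__ == "__main__":
    # "my-bucket" is a placeholder bucket name.
    job = build_venv_job(
        "venv example",
        "oss://my-bucket/example.py",
        "oss://my-bucket/venv.tar.gz",
    )
    print(json.dumps(job, indent=4))
```

Keeping both variants in one function ensures that the same package path and alias are used for the driver and the executors.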
Example
Prepare a Linux operating system.
A Linux operating system is required to package Python environments based on the virtual environments technology. You can use one of the following methods to prepare a Linux operating system. In this example, an Elastic Compute Service (ECS) instance is purchased.
Purchase an ECS instance that runs CentOS 7 or Anolis OS 8. For more information, see Create an instance on the Custom Launch tab.
Install CentOS 7, Anolis OS 8, or a later version on your local machine.
Use an official Docker image of CentOS or Anolis OS and package Python environments in the image.
Use the virtual environments technology to package Python environments and upload the package to OSS.
Use the virtualenv or conda tool to package the dependent Python environments. You can customize the Python version during packaging. In this example, the virtualenv tool is used.
# Create the venv directory in the current path with Python 3.
# The --copies option is required.
virtualenv --copies --download --python python3.7 venv
# Activate the environment
source venv/bin/activate
# Install third-party modules
pip install scikit-spark==0.4.0
# Check the result
pip list
# Compress the environment
tar -czvf venv.tar.gz venv
Note: For information about how to use the conda tool to package the dependent Python environments, see Managing environments.
Go to the Spark editor.
Log on to the AnalyticDB for MySQL console. In the upper-left corner of the console, select a region. In the left-side navigation pane, click Clusters. On the Clusters page, click an edition tab. Find the cluster that you want to manage and click the cluster ID.
In the left-side navigation pane, choose Job Development > Spark JAR Development.
In the upper part of the editor, select a job resource group and a Spark job type. In this example, the Batch type is selected.
Run the following code in the editor:
{
    "name": "venv example",
    "archives": [
        "oss://testBucketname/venv.tar.gz#PY3"
    ],
    "conf": {
        "spark.driver.resourceSpec": "small",
        "spark.executor.instances": 1,
        "spark.pyspark.python": "./PY3/venv/bin/python3",
        "spark.executor.resourceSpec": "small"
    },
    "file": "oss://testBucketname/example.py"
}
Alternatively, if the Python environment package is excessively large, run the following code, which configures additional parameters:
{
    "name": "venv example",
    "conf": {
        "spark.executorEnv.ADB_SPARK_DOWNLOAD_FILES": "oss://testBucketname/venv_py36.tar.gz#PY3",
        "spark.kubernetes.driverEnv.ADB_SPARK_DOWNLOAD_FILES": "oss://testBucketname/venv_py36.tar.gz#PY3",
        "spark.driver.resourceSpec": "small",
        "spark.executor.instances": 1,
        "spark.pyspark.python": "./PY3/venv/bin/python3",
        "spark.executor.resourceSpec": "small"
    },
    "file": "oss://testBucketname/example.py"
}
Parameters:
archives: the OSS path of the Python environment package. In this example, the OSS path of the venv.tar.gz package is used.
spark.executorEnv.ADB_SPARK_DOWNLOAD_FILES: the OSS path of the Python environment package, which is a Spark executor parameter.
spark.kubernetes.driverEnv.ADB_SPARK_DOWNLOAD_FILES: the OSS path of the Python environment package, which is a Spark driver parameter.
spark.pyspark.python: the local path of the Python interpreter. In this example, ./PY3/venv/bin/python3 is used.
For information about other parameters, see Overview.
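A job can fail if the spark.pyspark.python value does not match the layout of the package. The following sketch checks locally, before the upload, that the interpreter path exists inside the .tar.gz package. interpreter_in_archive and make_demo_archive are hypothetical helpers. The check assumes that the package contents are made available under the alias that follows the # character, which is consistent with the ./PY3/venv/bin/python3 path in the preceding examples.

```python
import io
import tarfile


def interpreter_in_archive(archive, python_conf, alias="PY3"):
    """Check that the spark.pyspark.python path exists inside the package.

    archive: a file path or a binary file object of the .tar.gz package.
    python_conf: the spark.pyspark.python value, such as ./PY3/venv/bin/python3.
    alias: the name that follows '#' in the package path (assumed layout).
    """
    prefix = f"./{alias}/"
    if not python_conf.startswith(prefix):
        return False
    member = python_conf[len(prefix):]
    source = {"name": archive} if isinstance(archive, str) else {"fileobj": archive}
    with tarfile.open(mode="r:gz", **source) as tar:
        # Normalize member names that were stored with a leading "./".
        names = {n[2:] if n.startswith("./") else n for n in tar.getnames()}
    return member in names


def make_demo_archive():
    """Build a tiny in-memory archive that mimics the layout of venv.tar.gz."""
    buf = io.BytesIO()
    with tarfile.open(fileobj=buf, mode="w:gz") as tar:
        data = b"#!/bin/sh\n"
        info = tarfile.TarInfo("venv/bin/python3")
        info.size = len(data)
        tar.addfile(info, io.BytesIO(data))
    buf.seek(0)
    return buf


if __name__ == "__main__":
    print(interpreter_in_archive(make_demo_archive(), "./PY3/venv/bin/python3"))
```

In practice, pass the path of the real venv.tar.gz file instead of the demo archive.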