This topic describes how to submit a PySpark job and use a custom virtual environment in Data Lake Analytics (DLA).
DLA is discontinued. AnalyticDB for MySQL supports the features of DLA and provides additional features and enhanced performance. For more information about how to use PySpark and custom virtual environments in AnalyticDB for MySQL, see Use PySpark to develop Spark applications.
Use PySpark
1. Create a main program file
Create a file named example.py that contains the following sample code. The code includes a main function that serves as the entry point for PySpark to start the program.
from __future__ import print_function
from pyspark.sql import SparkSession

# import a third-party module
from tools import func

if __name__ == "__main__":
    # initialize the Spark session
    spark = SparkSession\
        .builder\
        .appName("Python Example")\
        .getOrCreate()

    df = spark.sql("SELECT 2021")
    # print the schema and data to the console
    df.printSchema()
    df.show()
2. Execute the main program file
Similar to a JAR file that is written in Scala or Java, the example.py file must be uploaded to Object Storage Service (OSS). In addition, you must specify the example.py file as the startup file by using the file parameter in your Spark startup configurations.
Log on to the DLA console. In the left-side navigation pane, choose Serverless Spark > Submit job. On the Parameter Configuration page, enter the following code in the code editor.
{ "name": "Spark Python", "file": "oss://{your bucket name}/example.py" "conf": { "spark.driver.resourceSpec": "small", "spark.executor.instances": 2, "spark.executor.resourceSpec": "small", "spark.kubernetes.pyspark.pythonVersion": "3" } }
Important: Replace {your bucket name} with the name of the OSS bucket to which the example.py file is uploaded. The preceding code is written in Python 3. The spark.kubernetes.pyspark.pythonVersion parameter specifies the Python version that Apache Spark uses to run the job; if you do not specify this parameter, Python 2.7 is used by default.
Click Execute.
Upload a self-developed or third-party module
When you develop a Python program, you can use self-developed or third-party modules. You can upload these modules and load them to the execution environment for the main program to call.
This section provides an example that shows how to calculate the after-tax income of each employee.
1. Prepare test data
Create a CSV file named staff.csv and upload the file to OSS. The file contains the following content, which indicates the basic information and income of each employee.
name,age,gender,salary
Lucky,25,male,100
Lucy,23,female,150
Martin,30,male,180
Rose,31,female,200
For more information about how to upload a file to OSS, see Simple upload.
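If you prefer to upload the file from the command line, the following is a minimal sketch that assumes the ossutil tool is installed and configured. Replace {your bucket name} with the name of your OSS bucket.
# upload the local staff.csv file to the OSS bucket
ossutil cp staff.csv oss://{your bucket name}/staff.csv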
2. Compile dependencies
Create a folder named tools.
Create a file named func.py in the tools folder. The file contains the following content:
def tax(salary):
    """
    convert string to int
    then cut 15% tax from the salary
    return a float number
    :param salary: The salary of staff worker
    :return:
    """
    return 0.15 * int(salary)
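Optionally, you can verify the module locally before you package it. The following is a minimal sketch that assumes Python 3 is installed and that the code is run from the directory that contains the tools folder:
# quick local check of the tax function
from tools import func

print(func.tax("100"))  # 15.0
print(func.tax(200))    # 30.0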
Compress the tools folder into a package named tools.zip and upload the package to OSS, as shown in the sample command below.
Important: The ZIP compression tool varies based on the operating system that you use. Make sure that the tools folder is the root directory after the tools.zip package is decompressed.
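The following command is a minimal sketch that assumes a Linux system with the zip utility installed. Run it from the directory that contains the tools folder so that tools is the root directory of the package.
# compress the tools folder into the tools.zip package
zip -r tools.zip tools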
3. Develop the main program
Develop a Spark program in Python. The program reads data from the staff.csv file in OSS and registers the data as a DataFrame. It also registers the tax method in the package that is created in Step 2 as a Spark UDF, and then applies the UDF to the DataFrame to generate the results.
In the following sample code, replace {your bucket} with the name of the OSS bucket to which the staff.csv file is uploaded.
from __future__ import print_function
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import FloatType

# import the third-party module
from tools import func

if __name__ == "__main__":
    # initialize the Spark session
    spark = SparkSession\
        .builder\
        .appName("Python Example")\
        .getOrCreate()

    # read the CSV file from OSS into a DataFrame and show the table
    df = spark.read.csv('oss://{your bucket}/staff.csv', mode="DROPMALFORMED",
                        inferSchema=True, header=True)
    # print the schema and data to the console
    df.printSchema()
    df.show()

    # create a UDF from the tax function
    taxCut = udf(lambda salary: func.tax(salary), FloatType())

    # apply the UDF to the salary column and show the result
    df.select("name", taxCut("salary").alias("final salary")).show()

    spark.stop()
Write the preceding code to the example.py file and upload the file to OSS.
4. Submit a job
In the left-side navigation pane of the DLA console, choose Serverless Spark > Submit job. On the Parameter Configuration page, create a job and enter the following code in the code editor.
{
"name": "Spark Python",
"file": "oss://{your bucket name}/example.py",
"pyFiles": ["oss://{your bucket name}/tools.zip"],
"conf": {
"spark.driver.resourceSpec": "small",
"spark.executor.instances": 2,
"spark.executor.resourceSpec": "small",
"spark.dla.connectors": "oss",
"spark.kubernetes.pyspark.pythonVersion": "3"
}
}
The following table describes the parameters in the code.
Parameter | Description | Required |
conf | The configuration parameters that are required for the Spark job. | No |
For more information about other parameters, see Configure a Spark job.
Use a custom virtual environment for PySpark jobs
If third-party dependencies are required, you can use a virtual environment to package your local debugging environment and upload it to the Spark cluster in the cloud. This way, a large number of complex packages, such as pandas, NumPy, and PyMySQL, can be installed in an isolated environment and migrated to a cluster that runs the same operating system. You can use one of the following methods to generate a virtual environment package:
Enable the system to automatically generate a virtual environment package.
Use an image tool to generate a virtual environment package.
For more information about virtual environments, see Virtual Environments and Packages.
Enable the system to automatically generate a virtual environment package
1. Prepare a Linux operating system
The operating system that you use to package a virtual environment must be the same as the operating system of the serverless Spark engine on which the virtual environment package is used. Therefore, the virtual environment package that you want to upload to the serverless Spark engine of DLA must be created in Linux. You can use one of the following methods to prepare a Linux operating system for packaging a virtual environment:
Prepare a computer that runs CentOS 7 to package the virtual environment.
Purchase an Elastic Compute Service (ECS) instance that runs CentOS 7 and select the pay-as-you-go billing method for the instance. After the virtual environment package is generated, release the ECS instance.
Use the Docker image of CentOS 7 to package the virtual environment.
2. Package a Python virtual environment in Linux
You can use a tool, such as virtualenv or conda, to package a virtual environment. Before you package a virtual environment, select a tool based on your business requirements and install the tool in Linux.
The serverless Spark engine of DLA supports Python 3.7 and earlier major Python versions.
The serverless Spark engine runs CentOS 7. We recommend that you use CentOS 7 in which Docker is installed to package a virtual environment.
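For example, if Docker is installed, you can start a disposable CentOS 7 container in which to build the package. The following command is only a sketch; the mounted path is an assumption, and you still need to install Python and the packaging tool inside the container.
# start a CentOS 7 container and mount the current directory to exchange the generated package
sudo docker run -ti -v $(pwd):/build centos:7 bash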
The following code demonstrates how to use virtualenv to generate a venv.zip file that contains a specific version of scikit-spark.
# create the venv directory at the current path with Python 3
# the --copies option MUST be added
virtualenv --copies --download --python python3.7 venv
# activate the environment
source venv/bin/activate
# install third-party modules
pip install scikit-spark==0.4.0
# check the result
pip list
# zip the environment
zip -r venv.zip venv
For more information about how to use conda to generate a virtual environment package, see Managing environments.
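For reference only, a conda-based flow might look like the following sketch. It assumes that conda is installed on the Linux machine and that the environment directory is zipped so that venv is the root directory of the package.
# create and activate a Python 3.7 environment named venv
conda create -n venv python=3.7 -y
conda activate venv
# install the required third-party modules
pip install scikit-spark==0.4.0
# zip the environment directory; the path below assumes the default conda environment location
cd "$(conda info --base)/envs"
zip -r venv.zip venv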
3. Run the virtual environment in the serverless Spark engine
Before you submit a Spark job, run the following code to configure the job. In the code, the spark.pyspark.python parameter specifies the Python interpreter file in the uploaded package that is used to run the job. For more information about the parameters, see Configure a Spark job.
{
"name": "venv example",
"archives": [
"oss://test/venv.zip#PY3"
],
"conf": {
"spark.driver.resourceSpec": "medium",
"spark.dla.connectors": "oss",
"spark.executor.instances": 1,
"spark.dla.job.log.oss.uri": "oss://test/spark-logs",
"spark.pyspark.python": "./PY3/venv/bin/python3",
"spark.executor.resourceSpec": "medium"
},
"file": "oss://test/example.py"
}
As defined by the Apache Spark community, venv.zip#PY3 indicates that the package is decompressed to the PY3 folder in the working directory of a compute node so that the data can be accessed locally. If you do not use the number sign (#) to specify a folder name, the name of the archive file is automatically used as the folder name.
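For example, if you omit the alias, the interpreter path must reference the archive file name as the folder name. The following configuration is only a sketch that mirrors the preceding job and illustrates this behavior:
{
  "name": "venv example without alias",
  "archives": [
    "oss://test/venv.zip"
  ],
  "conf": {
    "spark.driver.resourceSpec": "medium",
    "spark.dla.connectors": "oss",
    "spark.executor.instances": 1,
    "spark.pyspark.python": "./venv.zip/venv/bin/python3",
    "spark.executor.resourceSpec": "medium"
  },
  "file": "oss://test/example.py"
}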
Use an image tool to generate a virtual environment package
1. Run the following command to pull a Docker image from a registry:
sudo docker pull registry.cn-hangzhou.aliyuncs.com/dla_spark/dla-venv:0.1
2. Place the requirements.txt file in the /home/admin directory and mount the directory to the Docker container when you run the following command:
sudo docker run -ti -v /home/admin:/tmp dla-venv:0.1 -p python3 -f /tmp/requirements.txt
The requirements.txt file describes the required Python dependencies in the standard format. For more information, see User Guide.
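For reference, each line of a requirements.txt file lists one package, optionally pinned to a specific version. The packages and versions below are only examples:
pymysql==1.0.2
numpy==1.19.5
pandas==1.1.5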
The packaging program is automatically executed. You can view the following log information during the execution process.
adding: venv-20210611-095454/lib64/ (stored 0%)
adding: venv-20210611-095454/lib64/python3.6/ (stored 0%)
adding: venv-20210611-095454/lib64/python3.6/site-packages/ (stored 0%)
adding: venv-20210611-095454/pyvenv.cfg (deflated 30%)
venv-20210611-095454.zip
3. In the /home/admin directory, find the virtual environment package named venv-20210611-095454.zip. For more information about how to use the virtual environment package, see the Run the virtual environment in the serverless Spark engine section of this topic.
4. Optional. Run the following command to obtain more information about how to use the Docker image:
sudo docker run -i dla-venv:0.1
Used to create venv package for Aliyun DLA
Docker with host machine volumes: https://docs.docker.com/storage/volumes/
Please copy requirements.txt to a folder and mount the folder to docker as /tmp path
Usage example: docker run -it -v /home/admin:/tmp dla-venv:0.1 -p python3 -f /tmp/requirements.txt
-p python version, could be python2 or python3
-f path to requirements.txt, default is /tmp/requirements.txt
FAQ
If automatic packaging fails when you use an image tool to generate a virtual environment package, you can run the following command to start a CentOS 7 container and perform the operations manually as the root user.
sudo docker run -ti --entrypoint bash dla-venv:0.1
For more information about subsequent operations, see the Enable the system to automatically generate a virtual environment package section of this topic.