Prerequisites
An AnalyticDB for MySQL Data Lakehouse Edition cluster is created.
An Object Storage Service (OSS) bucket is created in the same region as the AnalyticDB for MySQL cluster.
A job resource group is created for the AnalyticDB for MySQL Data Lakehouse Edition cluster. For more information, see Create a resource group.
A database account is created for the AnalyticDB for MySQL cluster.
Use PySpark
Write the following sample code and save it in a file named example.py:
from pyspark.sql import SparkSession

if __name__ == "__main__":
    spark = SparkSession.builder.getOrCreate()
    df = spark.sql("SELECT 1+1")
    df.printSchema()
    df.show()
Upload the example.py file to OSS. For more information, see Upload objects.
Go to the Spark editor.
Log on to the AnalyticDB for MySQL console. In the upper-left corner of the console, select a region. In the left-side navigation pane, click Clusters. On the Clusters page, click an edition tab. Find the cluster that you want to manage and click the cluster ID.
In the left-side navigation pane, choose Job Development > Spark JAR Development.
In the upper part of the editor, select a job resource group and a Spark job type. In this example, the Batch type is selected.
Run the following code in the editor:
{
    "name": "Spark Python Test",
    "file": "oss://{your oss bucket path}/example.py",
    "conf": {
        "spark.driver.resourceSpec": "small",
        "spark.executor.instances": 1,
        "spark.executor.resourceSpec": "small"
    }
}
For information about the parameters, see Overview.
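If you submit similar jobs often, generating the job description from a small script can help avoid hand-editing mistakes. The following is a minimal sketch that uses only the keys shown in this topic. The build_batch_job function and its argument names are hypothetical and are not part of any AnalyticDB for MySQL SDK.

```python
import json


def build_batch_job(name, file, executor_instances=1, resource_spec="small",
                    py_files=None, args=None):
    """Hypothetical helper: return a job description dict with the keys used in this topic."""
    job = {"name": name, "file": file}
    if py_files:
        job["pyFiles"] = list(py_files)
    if args:
        job["args"] = list(args)
    job["conf"] = {
        "spark.driver.resourceSpec": resource_spec,
        "spark.executor.instances": executor_instances,
        "spark.executor.resourceSpec": resource_spec,
    }
    return job


if __name__ == "__main__":
    # Print JSON that can be pasted into the Spark editor.
    job = build_batch_job("Spark Python Test", "oss://<bucket name>/example.py")
    print(json.dumps(job, indent=4))
```

The printed JSON can be pasted into the editor in place of a hand-written job description.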
Use Python dependencies
Method
If your Python application uses self-developed or third-party dependencies, you must upload the dependencies to OSS and configure the pyFiles parameter when you submit the Spark job.
Example
This example calculates the after-tax incomes of employees by using a custom function. A file named staff.csv is uploaded to OSS. The staff.csv file contains the following data:
name,age,gender,salary
Lucky,25,male,100
Lucy,23,female,150
Martin,30,male,180
Rose,31,female,200
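Before you submit a Spark job, it can be useful to know what result to expect for the sample data. The following sketch computes the after-tax incomes locally in plain Python, without Spark. It assumes the 15% tax rate used in this example. The after_tax and after_tax_incomes names are hypothetical helpers introduced only for this check.

```python
import csv
import io

# Sample rows copied from the staff.csv file in this example.
STAFF_CSV = """name,age,gender,salary
Lucky,25,male,100
Lucy,23,female,150
Martin,30,male,180
Rose,31,female,200
"""

TAX_PERCENT = 15  # tax rate assumed from this example


def after_tax(salary):
    """Hypothetical helper: convert the salary to an integer and deduct the tax."""
    return int(salary) * (100 - TAX_PERCENT) / 100


def after_tax_incomes(csv_text):
    """Map each employee name to the after-tax income."""
    reader = csv.DictReader(io.StringIO(csv_text))
    return {row["name"]: after_tax(row["salary"]) for row in reader}


if __name__ == "__main__":
    for name, income in after_tax_incomes(STAFF_CSV).items():
        print(name, income)
```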
Package the dependencies and upload them to OSS.
Create a folder named tools. Create a file named func.py in the folder.
def tax(salary):
    """
    Convert the salary string to an integer and deduct a 15% tax.

    :param salary: the salary of a staff member
    :return: the after-tax salary
    """
    return 0.85 * int(salary)
Compress the tools folder and upload the package to OSS. In this example, the folder is compressed into the tools.tar.gz package.
Note
If multiple dependent Python files are required, we recommend that you compress the files into a .tar.gz package. You can then import the Python files as modules in your Python code.
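The compression step can also be scripted. The following sketch uses the tarfile module from the Python standard library to compress a folder into a .tar.gz package. The folder name stays as the top-level entry, which is assumed to be what an import such as "from tools import func" needs. The make_package and list_package names are hypothetical, and the func.py content written in the demo is only a placeholder.

```python
import os
import tarfile
import tempfile


def make_package(folder, archive_path):
    """Compress a folder into a .tar.gz archive with the folder name as the top-level entry."""
    top_level = os.path.basename(os.path.normpath(folder))
    with tarfile.open(archive_path, "w:gz") as tar:
        tar.add(folder, arcname=top_level)
    return archive_path


def list_package(archive_path):
    """Return the sorted entry names in the archive."""
    with tarfile.open(archive_path, "r:gz") as tar:
        return sorted(tar.getnames())


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as workdir:
        tools = os.path.join(workdir, "tools")
        os.makedirs(tools)
        # Placeholder file that stands in for the real func.py.
        with open(os.path.join(tools, "func.py"), "w") as f:
            f.write("# placeholder for func.py\n")
        archive = make_package(tools, os.path.join(workdir, "tools.tar.gz"))
        print(list_package(archive))
```

Listing the entries before you upload the package is a quick way to confirm that the folder layout matches the import statements in your job.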
Write the following sample code and save it in a file named example.py:
from __future__ import print_function
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import FloatType
import sys
from tools import func

if __name__ == "__main__":
    spark = SparkSession.builder.appName("Python Example").getOrCreate()
    # The OSS path of the CSV file is passed as the first job argument.
    csv_file = sys.argv[1]
    df = spark.read.csv(csv_file, mode="DROPMALFORMED", inferSchema=True, header=True)
    # Print the schema and the data.
    df.printSchema()
    df.show()
    # Wrap the custom function in a UDF that returns a float.
    taxCut = udf(lambda salary: func.tax(salary), FloatType())
    # Apply the UDF to the salary column and show the result.
    df.select("name", taxCut("salary").alias("final salary")).show()
    spark.stop()
Upload the example.py file to OSS. For more information, see Upload objects.
Go to the Spark editor.
Log on to the AnalyticDB for MySQL console. In the upper-left corner of the console, select a region. In the left-side navigation pane, click Clusters. On the Clusters page, click an edition tab. Find the cluster that you want to manage and click the cluster ID.
In the left-side navigation pane, choose Job Development > Spark JAR Development.
In the upper part of the editor, select a job resource group and a Spark job type. In this example, the Batch type is selected.
Run the following code in the editor:
{
    "name": "Spark Python",
    "file": "oss://<bucket name>/example.py",
    "pyFiles": ["oss://<bucket name>/tools.tar.gz"],
    "args": [
        "oss://<bucket name>/staff.csv"
    ],
    "conf": {
        "spark.driver.resourceSpec": "small",
        "spark.executor.instances": 2,
        "spark.executor.resourceSpec": "small"
    }
}
Parameters:
file: the OSS path of the Python code.
pyFiles: the OSS path of the Python dependencies that are required for PySpark. The suffix of the path can be .tar or .tar.gz. Separate multiple packages with commas (,).
Note
All Python dependencies that are required for PySpark must be stored in OSS.
args: the arguments that are passed to the Python application. In this example, the OSS path of the staff.csv file is used.
For information about other parameters, see Overview.
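The path rules described above can be checked before you submit a job. The following sketch validates a job description against only the rules stated in this topic: file and pyFiles entries are OSS paths, and pyFiles entries end with .tar or .tar.gz. The check_job function is a hypothetical helper, not a service API, and the service may apply additional checks.

```python
def check_job(job):
    """Hypothetical helper: return a list of problems found in a job description dict."""
    problems = []
    if not str(job.get("file", "")).startswith("oss://"):
        problems.append("file must be an OSS path")
    for path in job.get("pyFiles", []):
        if not path.startswith("oss://"):
            problems.append("pyFiles entry is not an OSS path: " + path)
        elif not path.endswith((".tar", ".tar.gz")):
            problems.append("pyFiles entry must end with .tar or .tar.gz: " + path)
    return problems


if __name__ == "__main__":
    job = {
        "name": "Spark Python",
        "file": "oss://<bucket name>/example.py",
        "pyFiles": ["oss://<bucket name>/tools.tar.gz"],
        "args": ["oss://<bucket name>/staff.csv"],
    }
    print(check_job(job) or "no problems found")
```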
Use virtual environments to package dependent environments
If your Python jobs have complex dependencies, you can use Python virtual environments to manage and isolate them. AnalyticDB for MySQL Spark allows you to package a local virtual environment and upload the package to OSS. For more information about virtual environments, see Python documentation.
Important
AnalyticDB for MySQL Spark uses glibc-devel 2.28. If the packaged virtual environment is not compatible with this version, PySpark jobs may fail.
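You can check the glibc version of the machine on which you build the environment. The following sketch uses platform.libc_ver() from the Python standard library and compares the result with 2.28. It assumes that an environment built against a glibc version no later than 2.28 is compatible, which follows general glibc backward compatibility and is not a guarantee stated in this topic. The function names are hypothetical.

```python
import platform

TARGET_GLIBC = (2, 28)  # version stated in this topic


def parse_version(text):
    """Turn a string such as '2.28' into (2, 28). Return None if the text is not numeric."""
    try:
        return tuple(int(part) for part in text.split("."))
    except ValueError:
        return None


def glibc_is_compatible(version_text, target=TARGET_GLIBC):
    """Assumption: a build machine whose glibc is no later than the target is safe."""
    version = parse_version(version_text)
    return version is not None and version <= target


if __name__ == "__main__":
    lib, version = platform.libc_ver()
    print(lib or "unknown libc", version or "unknown version")
    if lib == "glibc":
        print("no later than 2.28:", glibc_is_compatible(version))
```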
Method
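To package a Python environment as a virtual environment, compress the environment, upload the package to OSS, and configure the following settings when you submit a Spark job:
Specify the OSS path of the Python environment package. Configure the archives parameter, or configure the spark.executorEnv.ADB_SPARK_DOWNLOAD_FILES and spark.kubernetes.driverEnv.ADB_SPARK_DOWNLOAD_FILES parameters if the package is large.
Specify the local path of the Python interpreter by configuring the spark.pyspark.python parameter.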
Example
Prepare a Linux operating system.
A Linux operating system is required to package a Python environment as a virtual environment. You can use one of the following methods to prepare a Linux operating system. In this example, an Elastic Compute Service (ECS) instance is purchased.
Purchase an ECS instance that runs CentOS 7 or AnolisOS 8. For more information, see Create an instance on the Custom Launch tab.
Install CentOS 7, AnolisOS 8, or a later version on your local device.
Use an official Docker image of CentOS or AnolisOS and package the Python environment in the image.
Package the Python environment as a virtual environment and upload the package to OSS.
Use the virtualenv or conda tool to package the dependent Python environment. You can specify the Python version during packaging. In this example, the virtualenv tool is used.
# Create a directory named venv in the current path with Python 3.
# The --copies option is required.
virtualenv --copies --download --python python3.7 venv
# Activate the environment.
source venv/bin/activate
# Install third-party modules.
pip install scikit-spark==0.4.0
# Check the result.
pip list
# Compress the environment.
tar -czvf venv.tar.gz venv
Note
For information about how to use the conda tool to package the dependent Python environments, see Managing environments.
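Before you upload the package, you can confirm that it contains the interpreter that the job will reference. The following sketch opens the archive with the tarfile module and checks the venv/bin/python3 entry. It also flags a symbolic link with an absolute target, on the assumption that such a link would not resolve after the package is extracted on another machine, which is presumably why the --copies option is required. The function names are hypothetical, and the demo builds a tiny stand-in archive instead of a real environment.

```python
import io
import os
import tarfile
import tempfile


def check_interpreter(archive_path, interpreter="venv/bin/python3"):
    """Return 'ok', 'missing', or 'absolute symlink' for the interpreter entry."""
    with tarfile.open(archive_path, "r:gz") as tar:
        try:
            member = tar.getmember(interpreter)
        except KeyError:
            return "missing"
        if member.issym() and member.linkname.startswith("/"):
            return "absolute symlink"
        return "ok"


def make_demo_archive(path, symlink_target=None):
    """Write a stand-in archive with a single interpreter entry (for the demo only)."""
    with tarfile.open(path, "w:gz") as tar:
        info = tarfile.TarInfo("venv/bin/python3")
        if symlink_target is None:
            data = b"#!/bin/sh\n"
            info.size = len(data)
            tar.addfile(info, io.BytesIO(data))
        else:
            info.type = tarfile.SYMTYPE
            info.linkname = symlink_target
            tar.addfile(info)
    return path


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as workdir:
        archive = make_demo_archive(os.path.join(workdir, "venv.tar.gz"))
        print(check_interpreter(archive))
```

To check a real package, call check_interpreter with the path of the venv.tar.gz file that you created.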
Go to the Spark editor.
Log on to the AnalyticDB for MySQL console. In the upper-left corner of the console, select a region. In the left-side navigation pane, click Clusters. On the Clusters page, click an edition tab. Find the cluster that you want to manage and click the cluster ID.
In the left-side navigation pane, choose Job Development > Spark JAR Development.
In the upper part of the editor, select a job resource group and a Spark job type. In this example, the Batch type is selected.
Run the following code in the editor:
{
    "name": "venv example",
    "archives": [
        "oss://testBucketname/venv.tar.gz#PY3"
    ],
    "conf": {
        "spark.driver.resourceSpec": "small",
        "spark.executor.instances": 1,
        "spark.pyspark.python": "./PY3/venv/bin/python3",
        "spark.executor.resourceSpec": "small"
    },
    "file": "oss://testBucketname/example.py"
}
Alternatively, if the Python environment package is large, run the following code, which configures additional parameters:
{
    "name": "venv example",
    "conf": {
        "spark.executorEnv.ADB_SPARK_DOWNLOAD_FILES": "oss://testBucketname/venv_py36.tar.gz#PY3",
        "spark.kubernetes.driverEnv.ADB_SPARK_DOWNLOAD_FILES": "oss://testBucketname/venv_py36.tar.gz#PY3",
        "spark.driver.resourceSpec": "small",
        "spark.executor.instances": 1,
        "spark.pyspark.python": "./PY3/venv/bin/python3",
        "spark.executor.resourceSpec": "small"
    },
    "file": "oss://testBucketname/example.py"
}
Parameters:
archives: the OSS path of the Python environment package. In this example, the OSS path of the venv.tar.gz package is used.
spark.executorEnv.ADB_SPARK_DOWNLOAD_FILES: the OSS path of the Python environment package. This parameter applies to Spark executors.
spark.kubernetes.driverEnv.ADB_SPARK_DOWNLOAD_FILES: the OSS path of the Python environment package. This parameter applies to the Spark driver.
spark.pyspark.python: the local path of the Python interpreter.
For information about other parameters, see Overview.
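In the examples above, the suffix after # in the package path (PY3) matches the first directory in the spark.pyspark.python value (./PY3/venv/bin/python3). The following sketch derives the interpreter path from a package entry, assuming that the package is extracted into a directory named after that suffix, as Spark does for archives. The interpreter_path function is a hypothetical helper.

```python
def interpreter_path(archive_entry, interpreter="venv/bin/python3"):
    """Build a spark.pyspark.python value from an 'oss://...#ALIAS' package entry."""
    _path, sep, alias = archive_entry.partition("#")
    if not sep or not alias:
        raise ValueError("package entry has no #alias suffix: " + archive_entry)
    # Assumption: the package is extracted into ./<alias> in the working directory.
    return "./" + alias + "/" + interpreter


if __name__ == "__main__":
    print(interpreter_path("oss://testBucketname/venv.tar.gz#PY3"))
```

If you change the suffix or the name of the packaged folder, update spark.pyspark.python so that the two values stay in sync.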