Lindorm Distributed Processing System (LDPS) provides a RESTful API that allows you to submit Python-based Spark jobs to run stream processing, batch processing, machine learning, and graph computing tasks. This topic describes how to create a Python-based Spark job and submit it to LDPS for execution.
Prerequisites
LDPS is activated for your Lindorm instance. For more information, see Activate LDPS and modify the configurations.
Procedure
Step 1: Define a Python-based Spark job
Click Sample Spark job to download the demos provided by LDPS.
Extract the files from the downloaded package. The demos of sample jobs are contained in the lindorm-spark-examples directory. Go to the lindorm-spark-examples/python directory and view the files in the python folder. In this example, the root directory of the project is your_project. Perform the following steps to create a Python-based Spark job:
Create an empty file named __init__.py in the your_project directory.
Modify the main.py file.
Open the lindorm-spark-examples/python/your_project/main.py file and modify the code in the main.py file to add the path of the your_project directory to sys.path. For more information, see the Notice1 section in the main.py file.
# Notice1: You need to do the following steps to complete the code modification:
# Step1: Add an "__init__.py" file to your project directory, so that your project acts as a module of launcher.py.
# Step2: Add the current directory to sys.path by adding the following code to your main file.
import os   # os and sys must be imported at the top of main.py for this snippet to work
import sys

current_dir = os.path.abspath(os.path.dirname(__file__))
sys.path.append(current_dir)
print("current dir in your_project: %s" % current_dir)
print("sys.path: %s \n" % str(sys.path))
Encapsulate the entry logic into the main(argv) method in the main.py file. For more information, see the Notice2 section in the lindorm-spark-examples/python/your_project/main.py file.
# Notice2: Move the code in the `if __name__ == "__main__":` branch to a newly defined main(argv) function,
# so that launcher.py in the parent directory can simply call main(sys.argv).
def main(argv):
    print("Receive arguments: %s \n" % str(argv))
    print("current dir in main: %s \n" % os.path.abspath(os.path.dirname(__file__)))
    # Write your code here

if __name__ == "__main__":
    main(sys.argv)
Create an entry file that calls the main(argv) method to start the Python-based Spark job. In the demo, the launcher.py file in the lindorm-spark-examples/python directory is the entry file. You can use the code in the launcher.py file to start the Python-based Spark job. A minimal sketch of such an entry file follows.
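The following is a minimal sketch of an entry file, based on the project layout described above. The launcher.py shipped with the demo may differ in detail.
# launcher.py: a minimal sketch of an entry file that starts the job by calling
# main(argv) in your_project/main.py.
import sys

from your_project import main as project_main  # importable because your_project contains __init__.py

if __name__ == "__main__":
    # Forward all command-line arguments to the job's entry method.
    project_main.main(sys.argv)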
Step 2: Package the Python-based Spark job
Package the Python runtime files and third-party libraries on which the project depends. We recommend that you use Conda or Virtualenv to package the dependencies into a tar package. For more information, see Python Package Management.
Important:
You can configure the spark.archives parameter to specify the packages generated by Conda or Virtualenv as the files to be decompressed and distributed. Therefore, the job can be packaged into files of all types supported by this parameter. For more information, see spark.archives.
You must complete this operation in Linux. This ensures that LDPS can recognize the Python binary files.
Package the files of the project. Package the files of the your_project project into a ZIP or EGG file.
You can run the following command to package the files of the your_project project into a ZIP file:
zip -r project.zip your_project
For information about how to package the files of the your_project project into an EGG file, see Building Eggs. A minimal setup.py sketch for building an EGG is shown below.
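The following is a minimal sketch of a setup.py that builds an EGG with setuptools. The project name and version are placeholder values, and the packaging options depend on your project.
# setup.py: a minimal sketch for packaging your_project as an EGG file with setuptools.
# Run the following command from the directory that contains the your_project folder:
#     python setup.py bdist_egg
from setuptools import setup, find_packages

setup(
    name="your_project",       # placeholder project name
    version="0.1.0",           # placeholder version
    packages=find_packages(),  # finds your_project because it contains __init__.py
)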
Step 3: Upload the files of the Python-based Spark job
Upload the following files to an Object Storage Service (OSS) bucket: the ZIP or EGG file of the project, the tar package that contains the Python runtime environment, and the launcher.py entry file. For more information, see Simple upload.
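If you prefer to upload the files programmatically instead of in the console, the following is a minimal sketch that uses the OSS Python SDK (oss2). The endpoint, bucket name, and AccessKey values are placeholders.
# A sketch of uploading the job files with the OSS Python SDK (oss2).
# The endpoint, bucket name, and AccessKey values are placeholders.
import oss2

auth = oss2.Auth("<yourAccessKeyId>", "<yourAccessKeySecret>")
bucket = oss2.Bucket(auth, "https://oss-cn-beijing.aliyuncs.com", "testBucketName")

for local_file in ["your_project.zip", "pyspark_conda_env.tar.gz", "launcher.py"]:
    # The object key becomes part of the oss:// path that you reference in the job configuration.
    bucket.put_object_from_file(local_file, local_file)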
Step 4: Submit the Python-based Spark job
LDPS allows you to submit and manage jobs by using the following two methods:
Submit jobs in the Lindorm console. For more information, see Manage jobs in the Lindorm console.
Submit jobs by using Data Management (DMS). For more information, see Use DMS to manage jobs.
You must configure the following two types of request parameters when you submit a job:
Parameters that are used to specify the runtime environment of the Python-based job. Example:
{"spark.archives":"oss://testBucketName/pyspark_conda_env.tar.gz#environment", "spark.kubernetes.driverEnv.PYSPARK_PYTHON":"./environment/bin/python","spark.submit.pyFiles":"oss://testBucketName/your_project.zip"}
To submit the files of the project, configure the spark.submit.pyFiles parameter in the value of the configs parameter. You can specify the OSS path of the ZIP or EGG file that contains the files of the project, or the OSS path of the launcher.py file of the project.
To submit the tar package that contains the Python runtime files and third-party libraries, configure the spark.archives parameter and the spark.kubernetes.driverEnv.PYSPARK_PYTHON parameter in the value of the configs parameter.
When you configure the spark.archives parameter, append a number sign (#) followed by the target directory (targetDir) to which the package is decompressed, for example, pyspark_conda_env.tar.gz#environment.
The spark.kubernetes.driverEnv.PYSPARK_PYTHON parameter specifies the path of the Python interpreter in the decompressed runtime environment, for example, ./environment/bin/python.
Parameters that are required to access the files that you uploaded to OSS. You must include these parameters in the value of the configs parameter.
Table 1. Parameters in configs
Parameter | Example | Description
spark.hadoop.fs.oss.endpoint | oss-cn-beijing-internal.aliyuncs.com | The endpoint of the OSS bucket in which the files are stored.
spark.hadoop.fs.oss.accessKeyId | testAccessKeyId | The AccessKey ID that is used for identity authorization. You can create or obtain AccessKey pairs in the Alibaba Cloud Management Console. For more information, see Obtain an AccessKey pair.
spark.hadoop.fs.oss.accessKeySecret | testAccessKeySecret | The AccessKey secret that is used for identity authorization.
spark.hadoop.fs.oss.impl | org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem | The class that is used to access OSS. Set the value to org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem.
Note: For more information about the parameters, see Parameters.
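For reference, the following sketch shows a single configs value, expressed as a Python dict, that combines the runtime environment parameters and the OSS access parameters described above. The bucket name, endpoint, and AccessKey values are placeholders.
# A combined configs value that merges the runtime environment parameters
# with the OSS access parameters from Table 1. All bucket, endpoint, and
# AccessKey values are placeholders.
configs = {
    "spark.archives": "oss://testBucketName/pyspark_conda_env.tar.gz#environment",
    "spark.kubernetes.driverEnv.PYSPARK_PYTHON": "./environment/bin/python",
    "spark.submit.pyFiles": "oss://testBucketName/your_project.zip",
    "spark.hadoop.fs.oss.endpoint": "oss-cn-beijing-internal.aliyuncs.com",
    "spark.hadoop.fs.oss.accessKeyId": "<yourAccessKeyId>",
    "spark.hadoop.fs.oss.accessKeySecret": "<yourAccessKeySecret>",
    "spark.hadoop.fs.oss.impl": "org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem",
}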
Examples
Click Sample Spark job to download the package that contains the demos, and then extract the files from the downloaded package.
Open the your_project/main.py file and modify the entry point of the Python project.
Add the path of the your_project directory to sys.path.
current_dir = os.path.abspath(os.path.dirname(__file__))
sys.path.append(current_dir)
print("current dir in your_project: %s" % current_dir)
print("sys.path: %s \n" % str(sys.path))
Specify the entry logic in the main.py file. The following sample code shows how to initialize a SparkSession. A complete minimal main.py that combines these pieces is shown after this procedure.
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("PythonImportTest") \
    .getOrCreate()
print(spark.conf)
spark.stop()
Package the files of the your_project project in the python directory into a .zip file.
zip -r your_project.zip your_project
In Linux, use Conda to package the files of the Python runtime environment.
conda create -y -n pyspark_conda_env -c conda-forge numpy conda-pack
conda activate pyspark_conda_env
conda pack -f -o pyspark_conda_env.tar.gz
Upload the your_project.zip file, pyspark_conda_env.tar.gz file, and launcher.py file to OSS.
Submit the job by using one of the following methods:
Submit the job in the Lindorm console. For more information, see Manage jobs in the Lindorm console.
Submit the job by using DMS. For more information, see Use DMS to manage jobs.
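Putting the preceding steps together, the following is a minimal sketch of a complete main.py for the example. Replace the body of main(argv) with your own job logic.
# main.py: a minimal sketch that combines the sys.path setup, the main(argv) entry
# point, and the SparkSession logic from the example above.
import os
import sys

from pyspark.sql import SparkSession

# Add the your_project directory to sys.path so that its modules can be imported.
current_dir = os.path.abspath(os.path.dirname(__file__))
sys.path.append(current_dir)

def main(argv):
    print("Receive arguments: %s \n" % str(argv))
    spark = SparkSession \
        .builder \
        .appName("PythonImportTest") \
        .getOrCreate()
    print(spark.conf)
    # Write your code here
    spark.stop()

if __name__ == "__main__":
    main(sys.argv)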
Job diagnostics
After the job is submitted, you can view the status of the job and the address of the Spark web user interface (UI) on the Jobs page. For more information, see View the details of a Spark job. If you have any problems when you submit the job, submit a ticket and provide the job ID and the address of the Spark web UI.