In the serverless Spark engine of Data Lake Analytics (DLA), a Spark job must be described in the JSON format. The job information includes the job name, directory in which JAR packages are saved, and job configuration parameters. This topic describes how to configure a Spark job.
DLA is discontinued. AnalyticDB for MySQL Data Lakehouse Edition supports the existing features of DLA and provides more features and better performance. For more information about the configuration parameters of AnalyticDB for MySQL Spark, see Spark application configuration parameters.
Sample code of a Spark job
The following sample code shows how to write a Spark job that reads data from Object Storage Service (OSS). The job is described in the JSON format.
{
  "args": ["oss://${oss-buck-name}/data/test/test.csv"],
  "name": "spark-oss-test",
  "file": "oss://${oss-buck-name}/jars/test/spark-examples-0.0.1-SNAPSHOT.jar",
  "className": "com.aliyun.spark.oss.SparkReadOss",
  "conf": {
    "spark.driver.resourceSpec": "medium",
    "spark.executor.resourceSpec": "medium",
    "spark.executor.instances": 2,
    "spark.dla.connectors": "oss"
  }
}
The preceding code configures an offline Spark job. The parameters include the job name, the main JAR file, the entry class, the arguments that are passed to the entry class, and the Spark job configurations. These parameters are similar to those used in the spark-submit scripts defined by the Apache Spark community, because the serverless Spark engine of DLA uses the same parameter names and semantics as Apache Spark.
Job parameters
The following table describes the parameters that are used to configure a Spark job.
| Parameter | Required | Description |
| --- | --- | --- |
| args | No | The arguments that you want to pass to the Spark job. Separate multiple arguments with commas (,). |
| name | No | The name of the Spark job. |
| file | Required if the Spark job is written in Python, Java, or Scala | The directory in which the main file of the Spark job is stored. The main file can be a JAR package that includes the entry class or the entry executable file of Python. Note: The main file of the Spark job must be stored in OSS. |
| className | Required if the Spark job is written in Java or Scala | The entry class of the Java or Scala program. This parameter is not required if the Spark job is written in Python. |
| sqls | Required if the Spark job is a SQL job | A feature that is developed by the Alibaba Cloud DLA team and is not defined by the Apache Spark community. This parameter allows you to submit offline SQL jobs without the need to submit JAR packages or Python files. This parameter cannot be used together with the file, className, or args parameter. You can specify multiple SQL statements for a Spark job. Separate the statements with commas (,). The statements are executed in the specified order. For a sample configuration, see the example after this table. |
| jars | No | The JAR packages that are required for the Spark job. Separate multiple JAR packages with commas (,). When the Spark job runs, the JAR packages are added to the classpaths of the driver and executor Java virtual machines (JVMs). Note: The JAR packages that are required for the Spark job must be stored in OSS. |
| files | No | The files that are required for the Spark job. These files are downloaded to the working directories of the driver and executors. You can configure an alias for a file. For example, if the alias of the yy.txt file in the oss://bucket/xx/ directory is yy, you need to only enter ./yy in the code to access the file. If you do not configure an alias, you must use ./yy.txt to access the file. Separate multiple files with commas (,). Note: The files that are required for the Spark job must be stored in OSS. |
| archives | No | The packages that are required for the Spark job. These packages must be in the ZIP, TAR, or TAR.GZ format. The packages are decompressed to the directory in which the Spark process runs. You can configure an alias for a package. For example, if the alias of the yy.zip package in the oss://bucket/xx/ directory is yy and the package contains the zz.txt file, you need to only enter ./yy/zz.txt in the code to access the zz.txt file. If you do not configure an alias, you must use ./yy.zip/zz.txt to access the file. Separate multiple packages with commas (,). Note: The packages that are required for the Spark job must be stored in OSS. If a package fails to be decompressed, the job fails. |
| pyFiles | Optional if the Spark job is written in Python | The Python files that are required for a PySpark job. These files must be in the ZIP, PY, or EGG format. If multiple Python files are required, we recommend that you use files in the ZIP or EGG format. These files can be referenced in the Python code as modules. Separate multiple files with commas (,). Note: The Python files that are required for the PySpark job must be stored in OSS. |
| conf | No | The configuration parameters that are required for the Spark job, which are the same as those of an Apache Spark job. The parameters are specified as key-value pairs, as shown in the conf object of the preceding sample code. If you do not specify these parameters, the default settings that you configured when you created the virtual cluster are used. |
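For reference, the following snippet is a minimal sketch of a SQL job that uses the sqls parameter instead of the file and className parameters. The job name, SQL statements, and resource specifications are placeholder values that you would replace with your own.
{
  "name": "spark-sql-test",
  "sqls": ["SHOW DATABASES", "SELECT * FROM t1 LIMIT 10"],
  "conf": {
    "spark.driver.resourceSpec": "small",
    "spark.executor.resourceSpec": "small",
    "spark.executor.instances": 2
  }
}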
Configuration parameters
The configuration parameters of the serverless Spark engine of DLA are basically the same as those defined by the Apache Spark community. This section describes only the parameters whose usage differs and the parameters that are specific to the serverless Spark engine of DLA.
Differences
The serverless Spark engine of DLA uses different parameters to configure the resource specifications of the Spark driver and executors. These parameters are mapped to the parameters defined by the Apache Spark community.
| Parameter | Description | Parameter defined by the Apache Spark community |
| --- | --- | --- |
| spark.driver.resourceSpec | The resource specifications of the Spark driver. Valid values: small (1 CPU core and 4 GB of memory), medium (2 CPU cores and 8 GB of memory), large (4 CPU cores and 16 GB of memory), and xlarge (8 CPU cores and 32 GB of memory). | spark.driver.cores and spark.driver.memory |
| spark.executor.resourceSpec | The resource specifications of Spark executors. The valid values of this parameter are the same as those of the spark.driver.resourceSpec parameter. | spark.executor.cores and spark.executor.memory |
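As a quick illustration, the following conf fragment is a sketch that uses the DLA-specific resource specification parameters. The values are placeholders that you would adjust for your workload. Based on the mappings in the preceding table, setting spark.driver.resourceSpec to large corresponds to setting spark.driver.cores to 4 and spark.driver.memory to 16 GB in the Apache Spark community configuration.
"conf": {
  "spark.driver.resourceSpec": "large",
  "spark.executor.resourceSpec": "medium",
  "spark.executor.instances": 4
}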
Parameters supported by the serverless Spark engine of DLA
Parameter used to access Apache Spark web UI
| Parameter | Default value | Description |
| --- | --- | --- |
| spark.dla.job.log.oss.uri | N/A | The uniform resource identifier (URI) of the directory in which the logs generated by a Spark job and the Spark web UI event logs are saved. Only OSS directories are supported. If you do not specify this parameter, you cannot query job logs or access the Spark web UI after a job is complete. |
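For example, a conf fragment that saves job logs and Spark web UI event logs to an OSS directory might look like the following sketch. The bucket name and path are placeholders.
"conf": {
  "spark.dla.job.log.oss.uri": "oss://${oss-buck-name}/spark-logs/"
}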
Parameter used to grant permissions to a Resource Access Management (RAM) user
| Parameter | Default value | Description |
| --- | --- | --- |
| spark.dla.roleArn | N/A | The Alibaba Cloud Resource Name (ARN) of the RAM user that is granted the permissions to submit a job in the RAM console. This parameter is required only if you submit a job as a RAM user. |
Parameters for built-in data source connectors
| Parameter | Default value | Description |
| --- | --- | --- |
| spark.dla.connectors | N/A | The names of the built-in connectors in the serverless Spark engine of DLA. Separate multiple connector names with commas (,). Valid values: oss, hbase1.x, and tablestore. |
| spark.hadoop.job.oss.fileoutputcommitter.enable | false | One of the two parameters that are used to optimize the committer for Parquet files. For more information, see OSS. Important: The two parameters must be used together. Parquet files cannot be used with files in other formats. The spark.dla.connectors parameter must be set to oss. |
| spark.sql.parquet.output.committer.class | com.aliyun.hadoop.mapreduce.lib.output.OSSFileOutputCommitter | The other parameter that is used to optimize the committer for Parquet files. This parameter must be used together with the spark.hadoop.job.oss.fileoutputcommitter.enable parameter. For a sample configuration, see the example after this table. |
| spark.hadoop.io.compression.codec.snappy.native | false | Specifies whether a Snappy file is in the standard Snappy format. By default, Hadoop recognizes Snappy files that are edited in Hadoop. If this parameter is set to true, the standard Snappy library is used for decompression. Otherwise, the default Snappy library of Hadoop is used for decompression. |
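The following conf fragment is a sketch of how the built-in OSS connector and the Parquet committer parameters described in the preceding table might be combined. As noted in the table, the two committer parameters must be used together, and the spark.dla.connectors parameter must be set to oss.
"conf": {
  "spark.dla.connectors": "oss",
  "spark.hadoop.job.oss.fileoutputcommitter.enable": "true",
  "spark.sql.parquet.output.committer.class": "com.aliyun.hadoop.mapreduce.lib.output.OSSFileOutputCommitter"
}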
Parameters used to configure the network of data sources and connect to a data source
| Parameter | Default value | Description |
| --- | --- | --- |
| spark.dla.eni.enable | false | Specifies whether DLA can access a virtual private cloud (VPC). If this parameter is set to true, DLA can access the VPC. |
| spark.dla.eni.vswitch.id | N/A | The ID of the vSwitch that is associated with an elastic network interface (ENI). This ID is used to access the VPC. In most cases, if your Elastic Compute Service (ECS) instance can access a destination data source, you can set this parameter to the ID of the vSwitch that is connected to the ECS instance. |
| spark.dla.eni.security.group.id | N/A | The ID of a security group that is associated with an ENI. This ID is used to access a VPC. In most cases, if your ECS instance can access a destination data source, you can set this parameter to the ID of the security group to which the ECS instance is added. |
| spark.dla.eni.extra.hosts | N/A | The mappings between IP addresses and hostnames. This parameter enables the serverless Spark engine of DLA to correctly resolve the domain names of data sources. You must specify this parameter if you use DLA to access a Hive data source. Important: Separate an IP address and a hostname with a space. Separate multiple groups of IP addresses and hostnames with commas (,). Example: ip0 master0, ip1 master1. |
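For example, a job that accesses a data source over a VPC might include a conf fragment similar to the following sketch. The vSwitch ID, security group ID, and host mappings are placeholders for illustration.
"conf": {
  "spark.dla.eni.enable": "true",
  "spark.dla.eni.vswitch.id": "vsw-xxxxxxxxxxxxxxxxx",
  "spark.dla.eni.security.group.id": "sg-xxxxxxxxxxxxxxxxx",
  "spark.dla.eni.extra.hosts": "ip0 master0, ip1 master1"
}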
Parameter for Spark SQL
| Parameter | Default value | Description |
| --- | --- | --- |
| spark.sql.hive.metastore.version | 1.2.1 | The version of the Hive metastore. The serverless Spark engine of DLA supports more values for this parameter in addition to the values that are defined by the Apache Spark community. If this parameter is set to dla, you can use Spark SQL to access the metadata of DLA. |
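For example, to use Spark SQL to access the metadata of DLA, the parameter can be set as follows.
"conf": {
  "spark.sql.hive.metastore.version": "dla"
}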
Parameter for PySpark jobs
| Parameter | Default value | Description |
| --- | --- | --- |
| spark.kubernetes.pyspark.pythonVersion | 2 | The Python version that is used by the serverless Spark engine of DLA. Valid values: 2 and 3. A value of 2 indicates Python 2. A value of 3 indicates Python 3. |
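For example, a PySpark job that runs on Python 3 might include the following conf fragment.
"conf": {
  "spark.kubernetes.pyspark.pythonVersion": "3"
}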
Parameters related to job attempts
| Parameter | Default value | Description | Example |
| --- | --- | --- | --- |
| spark.dla.job.maxAttempts | 1 | The maximum number of attempts that can be made for a failed job. The default value is 1, which indicates that a failed job is not retried. Note: Valid values: 1 to 9999. If a job succeeds, the job does not need to be retried. If a job fails and this parameter is set to a value greater than 1, the failed job is automatically retried. | If the spark.dla.job.maxAttempts parameter is set to 3, a failed job can be attempted up to three times. |
| spark.dla.job.attemptFailuresValidityInterval | -1 | The validity interval for tracking job attempts. The default value is -1, which indicates that job attempt tracking is not enabled. Important: If the difference between the end time of a job attempt and the current time exceeds the value of this parameter, the attempt is not counted as a failed attempt. If this parameter is set to a small value, a failed job may be retried repeatedly. Therefore, we recommend that you do not specify this parameter. Supported units: ms (milliseconds, the default unit), m (minutes), h (hours), and d (days). | Assume that the spark.dla.job.attemptFailuresValidityInterval parameter is set to 30m and the current time is 12:40. The end time of JobAttempt0 is 12:00, the end time of JobAttempt1 is 12:30, and the end time of JobAttempt2 is 12:35. In this case, JobAttempt0 is not counted as a job attempt. The valid job attempts are JobAttempt1 and JobAttempt2, and the total number of job attempts is 2. |
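The scenario in the Example column can be expressed in a conf fragment similar to the following sketch, in which a failed job is attempted up to three times and only attempts that ended within the last 30 minutes are counted.
"conf": {
  "spark.dla.job.maxAttempts": "3",
  "spark.dla.job.attemptFailuresValidityInterval": "30m"
}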
Parameters related to resource configurations
| Parameter | Default value | Description |
| --- | --- | --- |
| spark.dla.driver.cpu-vcores-ratio | 1 | The ratio of vCPU cores to actual CPU cores that are used by the driver. For example, if the driver uses the medium resource specifications and this parameter is set to 2, the driver can run four vCPU cores in parallel. You can also set the spark.driver.cores parameter to 4 to achieve the same effect. |
| spark.dla.executor.cpu-vcores-ratio | 1 | The ratio of vCPU cores to actual CPU cores that are used by an executor. If the CPU utilization of a task is low, you can specify this parameter to increase the CPU utilization. For example, if an executor uses the medium resource specifications and this parameter is set to 2, the executor can run four vCPU cores in parallel, which means that four tasks can be scheduled in parallel. You can also set the spark.executor.cores parameter to 4 to achieve the same effect. |
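For example, the following conf fragment is a sketch that doubles the vCPU-to-CPU ratio for executors that use the medium resource specifications, as described in the preceding table.
"conf": {
  "spark.executor.resourceSpec": "medium",
  "spark.dla.executor.cpu-vcores-ratio": "2"
}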
Parameter related to monitoring
| Parameter | Default value | Description |
| --- | --- | --- |
| spark.dla.monitor.enabled | true | Specifies whether to enable monitoring for a job. The default value is true, which indicates that job monitoring is enabled. If this parameter is set to false, the monitoring data of the job is not collected. |