This topic describes how to use the spark-submit CLI of Data Lake Analytics (DLA) and provides some examples of using the spark-submit CLI.
DLA is discontinued. AnalyticDB for MySQL Data Lakehouse Edition supports the existing features of DLA and provides more features and better performance. For more information about how to use AnalyticDB for MySQL to develop Spark applications, see Use spark-submit to develop Spark applications.
Download and install the spark-submit CLI
Download the installation package of the spark-submit CLI.
You can also run the following wget command to download this package, whose file name is dla-spark-toolkit.tar.gz:
wget https://dla003.oss-cn-hangzhou.aliyuncs.com/dla_spark_toolkit/dla-spark-toolkit.tar.gz
After the package is downloaded, decompress it:
tar zxvf dla-spark-toolkit.tar.gz
Note: To use the spark-submit CLI, make sure that JDK 8 or later is installed.
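For example, you can verify the installed JDK version before you use the CLI. The following check is a minimal sketch; the exact output format depends on your JDK distribution:
java -version
# The reported version must be 1.8 (JDK 8) or later.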
Configure parameters
You can configure common parameters in the conf/spark-defaults.conf file and override them on the command line. The following sample code shows the parameters that you can configure in the spark-defaults.conf file.
# cluster information
# AccessKeyId
#keyId =
# AccessKeySecret
#secretId =
# RegionId
#regionId =
# set vcName
#vcName =
# set ossKeyId, if not set will use --keyId value or keyId value
#ossKeyId =
# set ossSecretId if not set will use --secretId value or secretId value
#ossSecretId =
# set OssUploadPath, if you need upload local resource
#ossUploadPath =
##spark conf
# driver specifications : small(1c4g) | medium (2c8g) | large (4c16g) | xlarge (8c32g)
#spark.driver.resourceSpec =
# executor instance number
#spark.executor.instances =
# executor specifications : small(1c4g) | medium (2c8g) | large (4c16g) | xlarge (8c32g)
#spark.executor.resourceSpec =
# when use ram, role arn
#spark.dla.roleArn =
# config dla oss connectors
#spark.dla.connectors = oss
# config eni, if you want to use eni
#spark.dla.eni.enable = true
#spark.dla.eni.vswitch.id =
#spark.dla.eni.security.group.id =
# config log location, need an oss path to store logs
#spark.dla.job.log.oss.uri =
# config spark read dla table when use option -f or -e
#spark.sql.hive.metastore.version = dla
## any other user defined spark conf...
You must configure the keyId, secretId, regionId, and vcName parameters. The following table describes these parameters, and a sample configuration follows the table.
Parameter | Description |
keyId | The AccessKey ID of your Alibaba Cloud account. |
secretId | The AccessKey secret of your Alibaba Cloud account. |
vcName | The name of your Spark cluster. |
regionId | The ID of the region in which your Spark cluster resides. For more information about the mappings between regions, zones, and region IDs, see Regions and Zones. |
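The following snippet is a minimal sketch of the required entries in conf/spark-defaults.conf. All values are placeholders and must be replaced with your own AccessKey pair, region ID, and Spark cluster name:
# conf/spark-defaults.conf (placeholder values)
keyId = <yourAccessKeyId>
secretId = <yourAccessKeySecret>
regionId = cn-hangzhou
vcName = <yourSparkClusterName>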
Run the following command to view the help information of the spark-submit CLI:
cd /path/to/dla-spark-toolkit
./bin/spark-submit --help
After you run the preceding command, the following output is returned:
Info: Usage: spark-submit [options] <app jar> [app arguments]
Usage: spark-submit --list [PAGE_NUMBER] [PAGE_SIZE]
Usage: spark-submit --kill [JOB_ID]
Info:
Options:
--keyId Your ALIYUN_ACCESS_KEY_ID, is same as `keyId` in conf/spark-defaults.conf or --conf spark.dla.access.key.id=<value>, required
--secretId Your ALIYUN_ACCESS_KEY_SECRET, is same as `secretId` in conf/spark-defaults.conf or --conf spark.dla.access.secret.id=<value>, required
--regionId Your Cluster Region Id, is same as `regionId` in conf/spark-defaults.conf or --conf spark.dla.region.id=<value>, required
--vcName Your Virtual Cluster Name, is same as `vcName` in conf/spark-defaults.conf or --conf spark.dla.vc.name=<value>, required
--oss-keyId Your ALIYUN_ACCESS_KEY_ID to upload local resource to oss,
by default, the value will take from --keyId, is same as `ossKeyId` in conf/spark-defaults.conf or --conf spark.dla.oss.access.key.id=<value>
--oss-secretId Your ALIYUN_ACCESS_KEY_SECRET to upload local resource to oss,
default the value will take from --secretId, is same as `ossSecretId` in conf/spark-defaults.conf or --conf spark.dla.oss.access.secret.id=<value>
--oss-endpoint Oss endpoint where the resource will upload. default is http://oss-$regionId.aliyuncs.com,
is same as `ossEndpoint` in conf/spark-defaults.conf or --conf spark.dla.oss.endpoint=<value>
--oss-upload-path The user oss path where the resource will upload
If you want to upload a local jar package to the OSS directory,
you need to specify this parameter. It is same as `ossUploadPath` in conf/spark-defaults.conf or --conf spark.dla.oss.upload.path=<value>
--class CLASS_NAME Your application's main class (for Java / Scala apps).
--name NAME A name of your application.
--jars JARS Comma-separated list of jars to include on the driver
and executor classpaths.
--conf PROP=VALUE Arbitrary Spark configuration property, or you can set conf in conf/spark-defaults.conf
--help, -h Show this help message and exit.
--driver-resource-spec Indicates the resource specifications used by the driver:
small | medium | large | xlarge | 2xlarge
you can also set this value through --conf spark.driver.resourceSpec=<value>
--executor-resource-spec Indicates the resource specifications used by the executor:
small | medium | large | xlarge | 2xlarge
you can also set this value through --conf spark.executor.resourceSpec=<value>
--num-executors Number of executors to launch, you can also set this value through --conf spark.executor.instances=<value>
--driver-memory MEM Memory for driver (e.g. 1000M, 2G)
you can also set this value through --conf spark.driver.memory=<value>
--driver-cores NUM Number of cores used by the driver
you can also set this value through --conf spark.driver.cores=<value>
--driver-java-options Extra Java options to pass to the driver
you can also set this value through --conf spark.driver.extraJavaOptions=<value>
--executor-memory MEM Memory per executor (e.g. 1000M, 2G)
you can also set this value through --conf spark.executor.memory=<value>
--executor-cores NUM Number of cores per executor.
you can also set this value through --conf spark.executor.cores=<value>
--properties-file Spark default conf file location, only local files are supported, default conf/spark-defaults.conf
--py-files PY_FILES Comma-separated list of .zip, .egg, or .py files to place
on the PYTHONPATH for Python apps
--files FILES Comma-separated list of files to be placed in the working
directory of each executor. File paths of these files
in executors can be accessed via SparkFiles.get(fileName).
Specially, you can pass in a custom log output format file named `log4j.properties`
Note: The file name must be `log4j.properties` to take effect
--archives Comma separated list of archives to be extracted into the
working directory of each executor. Support file types: zip, tgz, tar, tar.gz
--status job_id If given, requests the status and details of the job specified
--verbose Print additional debug output
--version, -v Print out the dla-spark-toolkit version.
List Spark Job Only:
--list List Spark Job, should use specify --vcName and --regionId
--pagenumber, -pn Set page number which want to list (default: 1)
--pagesize, -ps Set page size which want to list (default: 1)
Get Job Log Only:
--get-log job_id Get job log
Kill Spark Job Only:
--kill job_id Specifies the jobid to be killed
Spark Offline SQL options:
-e <quoted-query-string> SQL from command line. By default, use SubmitSparkSQL API to submit SQL, support set command to set spark conf.
you can set --disable-submit-sql to submit an SQL job using the previous SubmitSparkJob API,
which requires the user to specified the --oss-upload-path
-f <filename> SQL from files. By default, use SubmitSparkSQL API to submit SQL, support set command to set spark conf.
you can set --disable-submit-sql to submit an SQL job using the previous SubmitSparkJob API,
which requires the user to specified the --oss-upload-path
-d,--define <key=value> Variable substitution to apply to spark sql
commands. e.g. -d A=B or --define A=B
--hivevar <key=value> Variable substitution to apply to spark sql
commands. e.g. --hivevar A=B
--hiveconf <property=value> Use value for given property, DLA spark toolkit will add `spark.hadoop.` prefix to property
--database <databasename> Specify the database to use
--enable-inner-endpoint It means that DLA pop SDK and OSS SDK will use the endpoint of Intranet to access DLA,
you can turn on this option when you are on Alibaba cloud's ECS machine.
Inner API options:
--api-retry-times Specifies the number of retries that the client fails to call the API, default 3.
--time-out-seconds Specifies the timeout for the API(time unit is second (s)), which is considered a call failure.
default 10s.
The spark-submit CLI automatically reads configurations from the conf/spark-defaults.conf file. If you specify a parameter both in the conf/spark-defaults.conf file and on the command line, the spark-submit CLI uses the value that is submitted on the command line.
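For example, the following hypothetical invocation assumes that spark.executor.instances is set to 1 in conf/spark-defaults.conf. The class name and JAR path are placeholders:
./bin/spark-submit \
--class org.example.MyApp \
--conf spark.executor.instances=2 \
oss://{bucket-name}/jars/my-app.jar
# spark.executor.instances resolves to 2 for this job because the command-line value
# takes precedence over the value in conf/spark-defaults.conf.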
Configure compatibility
To maintain compatibility with the open source spark-submit CLI, the following DLA-specific options can also be specified as Spark configuration properties, either with --conf on the command line or in the spark-defaults.conf file, as shown in the following mapping and the example after it.
--keyId #--conf spark.dla.access.key.id=<value>
--secretId #--conf spark.dla.access.secret.id=<value>
--regionId #--conf spark.dla.region.id=<value>
--vcName #--conf spark.dla.vc.name=<value>
--oss-keyId #--conf spark.dla.oss.access.key.id=<value>
--oss-secretId #--conf spark.dla.oss.access.secret.id=<value>
--oss-endpoint #--conf spark.dla.oss.endpoint=<value>
--oss-upload-path #--conf spark.dla.oss.upload.path=<value>
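For example, the following sketch passes the required credentials and cluster information as Spark configuration properties instead of the toolkit-specific options. The class name, JAR path, and all values are placeholders:
./bin/spark-submit \
--conf spark.dla.access.key.id=<yourAccessKeyId> \
--conf spark.dla.access.secret.id=<yourAccessKeySecret> \
--conf spark.dla.region.id=cn-hangzhou \
--conf spark.dla.vc.name=<yourSparkClusterName> \
--class org.example.MyApp \
oss://{bucket-name}/jars/my-app.jar
# Equivalent to passing --keyId, --secretId, --regionId, and --vcName on the command line.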
The following parameters are not supported or are meaningless for the serverless Spark engine of DLA. Therefore, the values of these parameters are ignored.
Useless options(these options will be ignored):
--deploy-mode
--master
--packages, please use `--jars` instead
--exclude-packages
--proxy-user
--repositories
--keytab
--principal
--queue
--total-executor-cores
--driver-library-path
--driver-class-path
--supervise
-S,--silent
-i <filename>
The driver and executors of the serverless Spark engine run in elastic containers, and elastic containers support only specific resource specifications. For more information about the resource specifications supported by the serverless Spark engine, see Overview. Therefore, the spark-submit CLI of DLA adjusts how the resource specification parameters in the spark-defaults.conf file are interpreted. The following table describes the adjusted parameters, which behave differently from their counterparts in the open source spark-submit CLI. An example follows the table.
Parameter | Description |
--driver-cores/--conf spark.driver.cores | Specifies the number of CPU cores that are used for the driver. The spark-submit CLI of DLA maps this value to the smallest supported specification that provides at least the specified number of cores. |
--driver-memory/--conf spark.driver.memory | Specifies the memory that is used for the driver. The spark-submit CLI of DLA maps this value to the smallest supported specification that provides at least the specified amount of memory. |
--executor-cores/--conf spark.executor.cores | Specifies the number of CPU cores that are used for each executor. The spark-submit CLI of DLA maps this value to the smallest supported specification that provides at least the specified number of cores. |
--executor-memory/--conf spark.executor.memory | Specifies the memory that is used for each executor. The spark-submit CLI of DLA maps this value to the smallest supported specification that provides at least the specified amount of memory. |
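For example, given the specifications listed in the spark-defaults.conf sample (small 1c4g, medium 2c8g, large 4c16g, xlarge 8c32g), a request is assumed to be rounded up to the smallest specification that satisfies it. The class name and JAR path are placeholders:
./bin/spark-submit \
--driver-cores 3 \
--executor-memory 6G \
--class org.example.MyApp \
oss://{bucket-name}/jars/my-app.jar
# Assumption: the driver is mapped to large (4c16g), the smallest specification with at least
# 3 cores, and each executor is mapped to medium (2c8g), the smallest specification with at
# least 6 GB of memory.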
The following table describes the parameters that are supported only by the spark-submit CLI of DLA.
Parameter | Description |
--driver-resource-spec | Specifies the resource specifications of the driver. The priority of this parameter is higher than that of the spark.driver.resourceSpec parameter. |
--executor-resource-spec | Specifies the resource specifications of each executor. The priority of this parameter is higher than that of the spark.executor.resourceSpec parameter. |
--api-retry-times | Specifies the number of times a failed command can be retried. Commands that submit jobs are an exception: a job submission is not an idempotent operation, and a job that fails to be submitted because of a network timeout may still run successfully in the background. To prevent a job from being submitted repeatedly, job submission commands are not retried when they fail. You can use --list to obtain the submitted jobs, or check the job list in the DLA console to verify whether a job was successfully submitted. |
--time-out-seconds | Specifies the default network timeout period (in seconds) of the spark-submit CLI. Default value: 10. If the network times out, the command may fail and be rerun. |
--enable-inner-endpoint | Specifies whether to access DLA POP SDK and OSS SDK over an internal network. You can specify this parameter if your services are deployed on an Elastic Compute Service (ECS) instance. After you specify this parameter, the spark-submit CLI accesses DLA POP SDK and OSS SDK over an internal network. This makes the network connection more stable. |
--list | Obtains the list of jobs. This parameter is usually used with the --pagenumber and --pagesize parameters. The --pagenumber parameter specifies the page number of the job list, and the --pagesize parameter specifies the number of jobs displayed on each page. By default, --pagenumber is set to 1 and --pagesize is set to 10, which indicates that 10 jobs on the first page are returned. |
--kill | Kills a job based on the ID of the job. |
--get-log | Obtains logs of a job based on the ID of the job. |
--status | Obtains the details of a job based on the ID of the job. |
Submit a job
On the Submit job page, you submit a Spark job by using configurations in the JSON format, as shown in the following example.
{ "name": "xxx", "file": "oss://{bucket-name}/jars/xxx.jar", "jars": "oss://{bucket-name}/jars/xxx.jar,oss://{bucket-name}/jars/xxx.jar" "className": "xxx.xxx.xxx.xxx.xxx", "args": [ "xxx", "xxx" ], "conf": { "spark.executor.instances": "1", "spark.driver.resourceSpec": "medium", "spark.executor.resourceSpec": "medium", "spark.dla.job.log.oss.uri": "oss://{bucket-name}/path/to/log/" } }
If you use the spark-submit CLI to submit a job, the job configurations are in the following format.
./bin/spark-submit \
--class xxx.xxx.xxx.xxx.xxx \
--verbose \
--name xxx \
--jars oss://{bucket-name}/jars/xxx.jar,oss://{bucket-name}/jars/xxx.jar \
--conf spark.driver.resourceSpec=medium \
--conf spark.executor.instances=1 \
--conf spark.executor.resourceSpec=medium \
oss://{bucket-name}/jars/xxx.jar \
args0 args1

## The main program file can be a JAR package that is specified by the --jars parameter or a file that is specified by the --py-files or --files parameter. The main program file can be saved in a local directory or an Object Storage Service (OSS) directory.
## You must specify an absolute path for a local file. When you use the spark-submit CLI, the local file is automatically uploaded to the specified OSS directory.
## You can use the --oss-upload-path parameter or the ossUploadPath parameter in the spark-defaults.conf file to specify the OSS directory.
## When a local file is being uploaded, the file content is verified by using MD5. If a file that has the same name and MD5 value as your local file exists in the specified OSS directory, the file upload is canceled.
## If you manually update the JAR package in the specified OSS directory, delete the MD5 file that corresponds to the JAR package.
## Format: --jars /path/to/local/directory/XXX.jar,/path/to/local/directory/XXX.jar
## Separate multiple file names with commas (,) and specify an absolute path for each file.
## The --jars, --py-files, and --files parameters also allow you to specify a local directory from which all files are uploaded. Files in subdirectories are not recursively uploaded.
## You must specify an absolute path for each directory. Example: --jars /path/to/local/directory/,/path/to/local/directory2/
## Separate multiple directories with commas (,) and specify an absolute path for each directory.

## View the program output. You can use the sparkUI value in the following output to access the Spark web UI of the job and view the job details to check whether the parameters submitted by the job meet your expectations.
Info: job status: starting
Info: job status: starting
Info: job status: starting
Info: job status: starting
Info: job status: starting
Info: job status: starting
Info: job status: starting
Info: job status: starting
Info: job status: starting
Info: job status: starting
Info: job status: starting
Info: job status: running
{
  "jobId": "",
  "jobName": "SparkPi",
  "status": "running",
  "detail": "",
  "sparkUI": "",
  "createTime": "2020-08-20 14:12:07",
  "updateTime": "2020-08-20 14:12:07",
  ...
}
Job Detail:
{
  "name": "SparkPi",
  "className": "org.apache.spark.examples.SparkPi",
  "conf": {
    "spark.driver.resourceSpec": "medium",
    "spark.executor.instances": "1",
    "spark.executor.resourceSpec": "medium"
  },
  "file": "",
  "sparkUI": "https://xxx"
}
Exit codes for jobs submitted by the spark-submit CLI:
255    # Indicates that a job fails to run.
0      # Indicates that a job succeeds.
143    # Indicates that a job is killed.
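In a shell script, you can branch on the exit code. The following is a minimal sketch; the class name and JAR path are placeholders:
./bin/spark-submit --class org.example.MyApp oss://{bucket-name}/jars/my-app.jar
case $? in
  0)   echo "The job succeeded." ;;
  143) echo "The job was killed." ;;
  255) echo "The job failed to run." ;;
  *)   echo "Unexpected exit code." ;;
esac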
Note: For more information about how to use an AccessKey pair to submit jobs as a Resource Access Management (RAM) user, see Grant permissions to a RAM user (detailed version).
If you use the spark-submit CLI to upload a JAR package from your local directory, you must authorize the RAM user to access OSS. You can attach the AliyunOSSFullAccess policy to the RAM user on the Users page of the RAM console.
Kill a Spark job
Run the following command to kill a Spark job:
./bin/spark-submit \
--kill <jobId>
The following result is returned:
## Display the result.
Info: kill job: jxxxxxxxx, response: null
Query Spark jobs
You can use the command line to query Spark jobs. For example, you can run the following command to query one job on the first page:
./bin/spark-submit \
--list --pagenumber 1 --pagesize 1
The following result is returned:
## Display the result.
{
"requestId": "",
"dataResult": {
"pageNumber": "1",
"pageSize": "1",
"totalCount": "251",
"jobList": [
{
"createTime": "2020-08-20 11:02:17",
"createTimeValue": "1597892537000",
"detail": "",
"driverResourceSpec": "large",
"executorInstances": "4",
"executorResourceSpec": "large",
"jobId": "",
"jobName": "",
"sparkUI": "",
"status": "running",
"submitTime": "2020-08-20 11:01:58",
"submitTimeValue": "1597892518000",
"updateTime": "2020-08-20 11:22:01",
"updateTimeValue": "1597893721000",
"vcName": ""
}
]
}
}
Obtain the parameters and Spark web UI of a submitted job
Run the following command to obtain the parameters and Spark web UI of a submitted job:
./bin/spark-submit --status <jobId>
The following result is returned:
## Display the result.
Info: job status: success
Info:
{
"jobId": "jxxxxxxxx",
"jobName": "drop database if exists `",
"status": "success",
"detail": "xxxxxx",
"sparkUI": "xxxxxxx",
"createTime": "2021-05-08 20:02:28",
"updateTime": "2021-05-08 20:04:05",
"submitTime": "2021-05-08 20:02:28",
"createTimeValue": "1620475348180",
"updateTimeValue": "1620475445000",
"submitTimeValue": "1620475348180",
"vcName": "release-test"
}
Info: Job Detail:
set spark.sql.hive.metastore.version=dla;
set spark.dla.connectors=oss;
set spark.executor.instances=1;
set spark.sql.hive.metastore.version = dla;
set spark.dla.eni.enable = true;
set spark.dla.eni.security.group.id = xxxx ;
set spark.dla.eni.vswitch.id = xxxxx;
drop database if exists `my_hdfs_db_1` CASCADE;
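The preceding job detail shows a SQL job in which set commands configure Spark before the SQL statement runs. As a hypothetical example, a similar job could be submitted with the -e option; the statements are illustrative only:
./bin/spark-submit \
-e "set spark.sql.hive.metastore.version=dla;
set spark.dla.connectors=oss;
drop database if exists \`my_hdfs_db_1\` CASCADE;"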
Obtain job logs
Run the following command to obtain the logs of a submitted job:
./bin/spark-submit --get-log <jobId>
The following result is returned:
## Display the result.
20/08/20 06:24:57 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
20/08/20 06:24:58 INFO SparkContext: Running Spark version 2.4.5
20/08/20 06:24:58 INFO SparkContext: Submitted application: Spark Pi
20/08/20 06:24:58 INFO SecurityManager: Changing view acls to: spark
20/08/20 06:24:58 INFO SecurityManager: Changing modify acls to: spark
20/08/20 06:24:58 INFO SecurityManager: Changing view acls groups to:
20/08/20 06:24:58 INFO SecurityManager: Changing modify acls groups to:
20/08/20 06:24:58 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(spark); groups with view permissions: Set(); users with modify permissions: Set(spark); groups with modify permissions: Set()
...