Spark on MaxCompute作业可通过Local模式、Cluster模式执行,此外,您也可在DataWorks中运行Spark on MaxCompute离线作业(Cluster模式),以便与其它类型执行节点集成和调度。本文为您介绍如何通过DataWorks实现Spark on MaxCompute作业的配置与调度。
前提条件
已创建ODPS Spark节点,详情请参见创建并管理MaxCompute节点。
使用限制
当ODPS Spark节点选择Spark3.x版本时,若提交节点报错,请提交工单联系技术支持人员升级独享调度资源组的版本。
背景信息
Spark on MaxCompute是MaxCompute提供的兼容开源Spark的计算服务。它在统一的计算资源和数据集权限体系基础上,提供Spark计算框架,支持您以熟悉的开发使用方式提交运行Spark作业,满足更丰富的数据处理分析需求。在DataWorks中,您可通过ODPS Spark节点实现Spark on MaxCompute任务的调度运行,以及与其他作业的集成操作。
Spark on MaxCompute支持使用Java、Scala和Python语言进行开发,并通过Local、Cluster模式运行任务,在DataWorks中运行Spark on MaxCompute离线作业时采用Cluster模式执行。更多关于Spark on MaxCompute运行模式的介绍,详情请参见运行模式。
准备工作
ODPS Spark节点支持使用Java/Scala
和Python
语言运行Spark on MaxCompute离线作业,不同语言开发步骤及配置界面存在差异,您可根据业务需要选择使用。
Java/Scala
在ODPS Spark节点执行Java或Scala语言类型代码前,您需先在本地开发好Spark on MaxCompute作业代码,再通过DataWorks上传为MaxCompute的资源。步骤如下:
准备开发环境。
根据所使用系统类型,准备运行Spark on MaxCompute任务的开发环境,详情请参见搭建Linux开发环境、搭建Windows开发环境。
开发Java/Scala代码。
在ODPS Spark节点执行Java或Scala语言类型代码前,需先在本地或已有环境开发好Spark on MaxCompute代码,建议使用Spark on MaxCompute提供的项目示例工程模板进行开发。
打包代码并上传至DataWorks。
代码开发完成后,需将其打包,并通过DataWorks上传为MaxCompute资源,详情请参见创建并使用MaxCompute资源。
Python(使用默认Python环境实现)
DataWorks可通过将代码在线写入DataWorks Python资源的方式,实现PySpark作业开发,并通过ODPS Spark节点提交运行该代码逻辑。DataWorks上创建Python资源,详情请参见创建并使用MaxCompute资源;PySpark开发示例,详情请参见PySpark开发示例。
该方式使用DataWorks提供的默认Python环境,可直接依赖的三方包有限,若默认环境无法满足PySpark作业第三方依赖包的需求,可参考下文《开发语言:Python(使用自定义Python环境实现)》方式,自行准备Python环境执行任务。当然,您也可选择对Python资源支持性更好的PyODPS 2节点和PyODPS 3节点。
Python(使用自定义Python环境实现)
若平台提供的默认Python环境无法满足您的业务需求,则可根据如下步骤自定义Python环境,执行Spark on MaxCompute任务。
本地准备Python环境。
您可参考PySpark Python版本和依赖支持,根据业务需要配置可用的Python环境。
打包环境并上传至DataWorks。
将Python环境压缩为一个Zip包,并通过DataWorks上传为MaxCompute资源,作为后续运行Spark on MaxCompute任务的执行环境。详情请参见创建并使用MaxCompute资源。
配置项说明
DataWorks运行Spark on MaxCompute离线作业采用Cluster模式,在Cluster模式中,您需指定自定义程序入口main
。main
运行结束(即状态为Success
或Fail
)时,对应的Spark作业便会结束。此外,spark-defaults.conf
中的配置需逐条加到ODPS Spark节点配置项中。例如,Executor
的数量、内存大小和spark.hadoop.odps.runtime.end.point
的配置。
您无需上传spark-defaults.conf
文件,而是需将spark-defaults.conf
文件中的配置都逐条加到ODPS Spark节点的配置项中。
参数 | 描述 | 对应的spark-submit命令 |
spark版本 | 包括Spark1.x、Spark2.x、Spark3.x版本。 说明 当ODPS Spark节点选择Spark3.x版本时,若提交节点报错,请提交工单联系技术支持人员升级独享调度资源组的版本。 | — |
语言 | 此处选择Java/Scala或Python。请根据实际Spark on MaxCompute开发语言进行选择。 | — |
选择主资源 | 指定任务所使用的主JAR资源文件或主Python资源。 此处的资源文件需提前上传至DataWorks并已提交,详情请参见创建并使用MaxCompute资源。 |
|
配置项 | 指定提交作业时的配置项。其中:
|
|
Main Class | 配置主类名称。当开发语言为 |
|
参数 | 您可根据需要添加参数,多个参数之间用空格分隔。DataWorks支持使用调度参数,此处参数配置格式为${变量名}。配置完成后需在右侧导航栏 处给变量赋值。 说明 调度参数支持的赋值格式请参见调度参数支持的格式。 |
|
选择其他资源 | 您可根据需要,选择使用如下资源。
此处的资源文件需提前上传至DataWorks并已提交,详情请参见创建并使用MaxCompute资源。 | 不同资源分别对应如下命令:
|
编辑代码:简单示例
以下以一个简单示例为您介绍ODPS Spark节点的使用:判断一个字符串是否可以转换为数字。
创建资源。
在数据开发页面新建Python类型的资源,并命名为spark_is_number.py,详情请参见创建并使用MaxCompute资源。代码如下:
# -*- coding: utf-8 -*- import sys from pyspark.sql import SparkSession try: # for python 2 reload(sys) sys.setdefaultencoding('utf8') except: # python 3 not needed pass if __name__ == '__main__': spark = SparkSession.builder\ .appName("spark sql")\ .config("spark.sql.broadcastTimeout", 20 * 60)\ .config("spark.sql.crossJoin.enabled", True)\ .config("odps.exec.dynamic.partition.mode", "nonstrict")\ .config("spark.sql.catalogImplementation", "odps")\ .getOrCreate() def is_number(s): try: float(s) return True except ValueError: pass try: import unicodedata unicodedata.numeric(s) return True except (TypeError, ValueError): pass return False print(is_number('foo')) print(is_number('1')) print(is_number('1.3')) print(is_number('-1.37')) print(is_number('1e3'))
保存并提交资源。
在已创建的ODPS Spark节点中,根据配置项说明配置节点参数和调度配置参数,并保存提交节点。
配置项
说明
spark版本
Spark2.x
语言
Python
选择主python资源
在下拉列表中选择上述已创建的python资源spark_is_number.py
进入开发环境的运维中心,执行补数据,具体操作请参见执行补数据并查看补数据实例(新版)。
说明由于数据开发中的ODPS Spark节点没有运行入口,因此您需要在开发环境的运维中心执行Spark任务。
查看返回结果。
待补数据实例运行成功后,进入其运行日志的tracking URL中查看运行结果,如下:
False True True True True
编辑代码:进阶示例
更多场景的Spark on MaxCompute任务开发,请参考:
后续步骤
当您完成当前节点的任务开发后,通常您可进行以下操作。
调度配置:配置节点的周期性调度属性。任务需要周期性调度运行时,您需要设置节点后续实际运行过程中的重跑属性、调度依赖关系等,操作详情请参见任务调度属性配置概述。
任务调试:对当前节点的代码进行测试运行,确认代码逻辑符合预期,操作详情请参见任务调试流程。
任务发布:完成所有开发相关操作后,您需要将所有任务节点进行发布,发布后节点即会根据调度配置结果进行周期性运行,操作详情请参见发布任务。
Spark作业诊断:MaxCompute为Spark作业提供Logview工具以及Spark Web-UI,您可通过作业日志检查作业是否已正常提交并执行。