Spark on MaxCompute作業可通過Local模式、Cluster模式執行,此外,您也可在DataWorks中運行Spark on MaxCompute離線作業(Cluster模式),以便與其它類型執行節點整合和調度。本文為您介紹如何通過DataWorks實現Spark on MaxCompute作業的配置與調度。
前提條件
已建立ODPS Spark節點,詳情請參見建立並管理MaxCompute節點。
使用限制
當ODPS Spark節點選擇Spark3.x版本時,若提交節點報錯,請提交工單聯絡技術支援人員升級獨享調度資源群組的版本。
背景資訊
Spark on MaxCompute是MaxCompute提供的相容開源Spark的計算服務。它在統一的計算資源和資料集許可權體系基礎上,提供Spark計算架構,支援您以熟悉的開發使用方式提交運行Spark作業,滿足更豐富的資料處理分析需求。在DataWorks中,您可通過ODPS Spark節點實現Spark on MaxCompute任務的調度運行,以及與其他作業的整合操作。
Spark on MaxCompute支援使用Java、Scala和Python語言進行開發,並通過Local、Cluster模式運行任務,在DataWorks中運行Spark on MaxCompute離線作業時採用Cluster模式執行。更多關於Spark on MaxCompute運行模式的介紹,詳情請參見運行模式。
準備工作
ODPS Spark節點支援使用Java/Scala
和Python
語言運行Spark on MaxCompute離線作業,不同語言開發步驟及配置介面存在差異,您可根據業務需要選擇使用。
Java/Scala
在ODPS Spark節點執行Java或Scala語言類型代碼前,您需先在本地開發好Spark on MaxCompute作業代碼,再通過DataWorks上傳為MaxCompute的資源。步驟如下:
準備開發環境。
根據所使用系統類別型,準備運行Spark on MaxCompute任務的開發環境,詳情請參見搭建Linux開發環境、搭建Windows開發環境。
開發Java/Scala代碼。
在ODPS Spark節點執行Java或Scala語言類型代碼前,需先在本地或已有環境開發好Spark on MaxCompute代碼,建議使用Spark on MaxCompute提供的專案樣本工程模板進行開發。
打包代碼並上傳至DataWorks。
代碼開發完成後,需將其打包,並通過DataWorks上傳為MaxCompute資源,詳情請參見建立並使用MaxCompute資源。
Python(使用預設Python環境實現)
DataWorks可通過將代碼線上寫入DataWorks Python資源的方式,實現PySpark作業開發,並通過ODPS Spark節點提交運行該代碼邏輯。DataWorks上建立Python資源,詳情請參見建立並使用MaxCompute資源;PySpark開發樣本,詳情請參見PySpark開發樣本。
該方式使用DataWorks提供的預設Python環境,可直接依賴的三方包有限,若預設環境無法滿足PySpark作業第三方依賴包的需求,可參考下文《開發語言:Python(使用自訂Python環境實現)》方式,自行準備Python環境執行任務。當然,您也可選擇對Python資源支援性更好的PyODPS 2節點和PyODPS 3節點。
Python(使用自訂Python環境實現)
若平台提供的預設Python環境無法滿足您的業務需求,則可根據如下步驟自訂Python環境,執行Spark on MaxCompute任務。
本地準備Python環境。
您可參考PySpark Python版本和依賴支援,根據業務需要配置可用的Python環境。
打包環境並上傳至DataWorks。
將Python環境壓縮為一個Zip包,並通過DataWorks上傳為MaxCompute資源,作為後續運行Spark on MaxCompute任務的執行環境。詳情請參見建立並使用MaxCompute資源。
配置項說明
DataWorks運行Spark on MaxCompute離線作業採用Cluster模式,在Cluster模式中,您需指定自訂程式入口main
。main
運行結束(即狀態為Success
或Fail
)時,對應的Spark作業便會結束。此外,spark-defaults.conf
中的配置需逐條加到ODPS Spark節點配置項中。例如,Executor
的數量、記憶體大小和spark.hadoop.odps.runtime.end.point
的配置。
您無需上傳spark-defaults.conf
檔案,而是需將spark-defaults.conf
檔案中的配置都逐條加到ODPS Spark節點的配置項中。
參數 | 描述 | 對應的spark-submit命令 |
spark版本 | 包括Spark1.x、Spark2.x、Spark3.x版本。 說明 當ODPS Spark節點選擇Spark3.x版本時,若提交節點報錯,請提交工單聯絡技術支援人員升級獨享調度資源群組的版本。 | — |
語言 | 此處選擇Java/Scala或Python。請根據實際Spark on MaxCompute開發語言進行選擇。 | — |
選擇主資源 | 指定任務所使用的主JAR資源檔或主Python資源。 此處的資源檔需提前上傳至DataWorks並已提交,詳情請參見建立並使用MaxCompute資源。 |
|
配置項 | 指定提交作業時的配置項。其中:
|
|
Main Class | 配置主類名稱。當開發語言為 |
|
參數 | 您可根據需要添加參數,多個參數之間用空格分隔。DataWorks支援使用調度參數,此處參數配置格式為${變數名}。配置完成後需在右側導覽列 處給變數賦值。 說明 調度參數支援的賦值格式請參見調度參數支援的格式。 |
|
選擇其他資源 | 您可根據需要,選擇使用如下資源。
此處的資源檔需提前上傳至DataWorks並已提交,詳情請參見建立並使用MaxCompute資源。 | 不同資源分別對應如下命令:
|
編輯代碼:簡單樣本
以下以一個簡單樣本為您介紹ODPS Spark節點的使用:判斷一個字串是否可以轉換為數字。
建立資源。
在資料開發頁面建立Python類型的資源,並命名為spark_is_number.py,詳情請參見建立並使用MaxCompute資源。代碼如下:
# -*- coding: utf-8 -*- import sys from pyspark.sql import SparkSession try: # for python 2 reload(sys) sys.setdefaultencoding('utf8') except: # python 3 not needed pass if __name__ == '__main__': spark = SparkSession.builder\ .appName("spark sql")\ .config("spark.sql.broadcastTimeout", 20 * 60)\ .config("spark.sql.crossJoin.enabled", True)\ .config("odps.exec.dynamic.partition.mode", "nonstrict")\ .config("spark.sql.catalogImplementation", "odps")\ .getOrCreate() def is_number(s): try: float(s) return True except ValueError: pass try: import unicodedata unicodedata.numeric(s) return True except (TypeError, ValueError): pass return False print(is_number('foo')) print(is_number('1')) print(is_number('1.3')) print(is_number('-1.37')) print(is_number('1e3'))
儲存並提交資源。
在已建立的ODPS Spark節點中,根據配置項說明配置節點參數和調度配置參數,並儲存提交節點。
配置項
說明
spark版本
Spark2.x
語言
Python
選擇主python資源
在下拉式清單中選擇上述已建立的python資源spark_is_number.py
進入開發環境的營運中心,執行補資料,具體操作請參見執行補資料並查看補資料執行個體(新版)。
說明由於資料開發中的ODPS Spark節點沒有運行入口,因此您需要在開發環境的營運中心執行Spark任務。
查看返回結果。
待補資料執行個體運行成功後,進入其作業記錄的tracking URL中查看運行結果,如下:
False True True True True
編輯代碼:進階樣本
更多情境的Spark on MaxCompute任務開發,請參考:
後續步驟
當您完成當前節點的任務開發後,通常您可進行以下操作。
調度配置:配置節點的周期性調度屬性。任務需要周期性調度運行時,您需要設定節點後續實際運行過程中的重跑屬性、調度依賴關係等,操作詳情請參見任務調度屬性配置概述。
任務調試:對當前節點的代碼進行測試回合,確認代碼邏輯符合預期,操作詳情請參見任務調試流程。
任務發布:完成所有開發相關操作後,您需要將所有任務節點進行發布,發布後節點即會根據調度配置結果進行周期性運行,操作詳情請參見發布任務。
Spark作業診斷:MaxCompute為Spark作業提供Logview工具以及Spark Web-UI,您可通過作業日誌檢查作業是否已正常提交並執行。