This topic describes how to use the Python SDK to submit a Spark job, query the state and logs of a Spark job, kill a Spark job, and query historical Spark jobs.
Prerequisites
A Python environment is installed, and the Python version is 3.7 or later.
An Enterprise Edition or Data Lakehouse Edition cluster is created. For more information, see Create an Enterprise Edition or Data Lakehouse Edition cluster.
A job resource group is created. For more information, see Create a resource group.
The Python SDK is installed. For more information, see AnalyticDB MySQL SDK for Python.
The ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET environment variables are set. For more information, see Configure environment variables on Linux, macOS, and Windows.
The storage path of Spark logs is configured.
Note: You can use one of the following methods to configure the storage path of Spark logs:
In the AnalyticDB for MySQL console, go to the Spark Jar Development page and click Log Settings in the upper-right corner to set the storage path of Spark logs.
Use the spark.app.log.rootPath configuration parameter to specify an OSS path for storing the logs of Spark jobs.
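For example, in the JSON configuration of a batch job (like the json_str used in the sample code below), the parameter can be added to the conf section. This is a minimal sketch; the OSS path is a placeholder that you must replace with your own bucket and directory:

"conf": {
    "spark.driver.resourceSpec": "medium",
    "spark.executor.instances": 2,
    "spark.executor.resourceSpec": "medium",
    "spark.app.log.rootPath": "oss://<your-bucket>/<your-log-path>"
}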
Example
The following sample code shows how to submit a Spark job, query the state and logs of a Spark job, kill a Spark job, and query historical Spark jobs.
import os

from alibabacloud_adb20211201.client import Client
from alibabacloud_adb20211201.models import SubmitSparkAppRequest, SubmitSparkAppResponse, GetSparkAppStateRequest, \
    GetSparkAppStateResponse, GetSparkAppLogResponse, GetSparkAppLogRequest, KillSparkAppRequest, \
    KillSparkAppResponse, ListSparkAppsRequest, ListSparkAppsResponse
from alibabacloud_tea_openapi.models import Config


def submit_spark_sql(client: Client, cluster_id: str, rg_name: str, sql: str):
    """
    Submit a Spark SQL job.
    :param client: Alibaba Cloud client
    :param cluster_id: cluster ID
    :param rg_name: name of the resource group
    :param sql: SQL statements
    :return: Spark job ID
    :rtype: str
    :exception ClientException
    """
    # Build the request
    request = SubmitSparkAppRequest(
        dbcluster_id=cluster_id,
        resource_group_name=rg_name,
        data=sql,
        app_type="SQL",
        agent_source="Python SDK",
        agent_version="1.0.0"
    )
    # Submit the SQL statements and obtain the response
    response: SubmitSparkAppResponse = client.submit_spark_app(request)
    # The response contains the Spark job ID
    print(response)
    return response.body.data.app_id


def submit_spark_jar(client: Client, cluster_id: str, rg_name: str, json_conf: str):
    """
    Submit a Spark batch job.
    :param client: Alibaba Cloud client
    :param cluster_id: cluster ID
    :param rg_name: name of the resource group
    :param json_conf: JSON configuration of the job
    :return: Spark job ID
    :rtype: str
    :exception ClientException
    """
    # Build the request
    request = SubmitSparkAppRequest(
        dbcluster_id=cluster_id,
        resource_group_name=rg_name,
        data=json_conf,
        app_type="BATCH",
        agent_source="Python SDK",
        agent_version="1.0.0"
    )
    # Submit the job and obtain the response
    response: SubmitSparkAppResponse = client.submit_spark_app(request)
    # The response contains the Spark job ID
    print(response)
    return response.body.data.app_id


def get_status(client: Client, app_id: str):
    """
    Query the state of a Spark job.
    :param client: Alibaba Cloud client
    :param app_id: Spark job ID
    :return: state of the Spark job
    :rtype: str
    :exception ClientException
    """
    # Build the request
    request = GetSparkAppStateRequest(app_id=app_id)
    # Query the state of the Spark job
    response: GetSparkAppStateResponse = client.get_spark_app_state(request)
    print(response)
    return response.body.data.state


def get_log(client: Client, app_id: str):
    """
    Query the logs of a Spark job.
    :param client: Alibaba Cloud client
    :param app_id: Spark job ID
    :return: logs of the Spark job
    :rtype: str
    :exception ClientException
    """
    # Build the request
    request = GetSparkAppLogRequest(app_id=app_id)
    # Query the logs of the Spark job
    response: GetSparkAppLogResponse = client.get_spark_app_log(request)
    print(response)
    return response.body.data.log_content


def kill_app(client: Client, app_id: str):
    """
    Kill a Spark job.
    :param client: Alibaba Cloud client
    :param app_id: Spark job ID
    :return: state of the Spark job
    :exception ClientException
    """
    # Build the request
    request = KillSparkAppRequest(app_id=app_id)
    # Kill the job and obtain its state
    response: KillSparkAppResponse = client.kill_spark_app(request)
    print(response)
    return response.body.data.state


def list_apps(client: Client, cluster_id: str, page_number: int, page_size: int):
    """
    Query historical Spark jobs.
    :param client: Alibaba Cloud client
    :param cluster_id: cluster ID
    :param page_number: page number; a positive integer, default value: 1
    :param page_size: number of entries per page
    :return: details of the Spark jobs
    :exception ClientException
    """
    # Build the request
    request = ListSparkAppsRequest(
        dbcluster_id=cluster_id,
        page_number=page_number,
        page_size=page_size
    )
    # Query the details of the Spark jobs
    response: ListSparkAppsResponse = client.list_spark_apps(request)
    print("Page number:", response.body.data.page_number)
    for app_info in response.body.data.app_info_list:
        print(app_info.app_id)
        print(app_info.state)
        print(app_info.detail)


if __name__ == '__main__':
    # Client configuration
    config = Config(
        # Obtain the AccessKey ID from the ALIBABA_CLOUD_ACCESS_KEY_ID environment variable
        access_key_id=os.environ['ALIBABA_CLOUD_ACCESS_KEY_ID'],
        # Obtain the AccessKey secret from the ALIBABA_CLOUD_ACCESS_KEY_SECRET environment variable
        access_key_secret=os.environ['ALIBABA_CLOUD_ACCESS_KEY_SECRET'],
        # Endpoint of the service; cn-hangzhou is the region ID of the cluster
        endpoint="adb.cn-hangzhou.aliyuncs.com"
    )
    # Create the client
    adb_client = Client(config)

    sql_str = """
    -- Here is just an example of SparkSQL. Modify the content and run your spark program.
    set spark.driver.resourceSpec=medium;
    set spark.executor.instances=2;
    set spark.executor.resourceSpec=medium;
    set spark.app.name=Spark SQL Test;
    -- Here are your sql statements
    show databases;
    """

    json_str = """
    {
        "comments": [
            "-- Here is just an example of SparkPi. Modify the content and run your spark program."
        ],
        "args": [
            "1000"
        ],
        "file": "local:///tmp/spark-examples.jar",
        "name": "SparkPi",
        "className": "org.apache.spark.examples.SparkPi",
        "conf": {
            "spark.driver.resourceSpec": "medium",
            "spark.executor.instances": 2,
            "spark.executor.resourceSpec": "medium"
        }
    }
    """

    # Submit a Spark SQL job.
    # cluster_id: cluster ID; rg_name: name of the resource group
    sql_app_id = submit_spark_sql(client=adb_client, cluster_id="amv-bp1wo70f0k3c****", rg_name="test", sql=sql_str)
    print(sql_app_id)

    # Submit a Spark batch job.
    # cluster_id: cluster ID; rg_name: name of the resource group
    json_app_id = submit_spark_jar(client=adb_client, cluster_id="amv-bp1wo70f0k3c****",
                                   rg_name="test", json_conf=json_str)
    print(json_app_id)

    # Query the states of the Spark jobs
    get_status(client=adb_client, app_id=sql_app_id)
    get_status(client=adb_client, app_id=json_app_id)

    # Query the logs of the Spark jobs
    get_log(client=adb_client, app_id=sql_app_id)
    get_log(client=adb_client, app_id=json_app_id)

    # Query historical Spark jobs.
    # cluster_id: cluster ID; page_number: page number, a positive integer, default value: 1;
    # page_size: number of entries per page
    list_apps(client=adb_client, cluster_id="amv-bp1wo70f0k3c****", page_size=10, page_number=1)

    # Kill the Spark job
    kill_app(client=adb_client, app_id=json_app_id)
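Job submission is asynchronous: submit_spark_app returns as soon as the job is accepted, while the job keeps running on the cluster. The following is a minimal polling sketch built on the get_status function above; the terminal state names (COMPLETED, FATAL, KILLED) are an assumption about the states that GetSparkAppState can return, so adjust them to match your cluster:

import time

def wait_for_terminal_state(client: Client, app_id: str, interval_sec: int = 10, timeout_sec: int = 3600):
    # States after which a job can no longer change; assumed values, adjust as needed
    terminal_states = {"COMPLETED", "FATAL", "KILLED"}
    deadline = time.time() + timeout_sec
    while time.time() < deadline:
        state = get_status(client=client, app_id=app_id)
        if state in terminal_states:
            return state
        # Poll at a fixed interval to avoid flooding the API
        time.sleep(interval_sec)
    raise TimeoutError(f"Spark job {app_id} did not reach a terminal state within {timeout_sec} seconds")

For example, wait_for_terminal_state(client=adb_client, app_id=sql_app_id) blocks until the SQL job finishes. A fixed polling interval keeps the sketch simple; production code may prefer exponential backoff.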