This topic describes how to use AnalyticDB for MySQL SDK for Python to submit a Spark job, query the status and logs of a Spark job, terminate a Spark job, and query historical Spark jobs.
Prerequisites
Python 3.7 or later is installed.
An AnalyticDB for MySQL Data Lakehouse Edition (V3.0) cluster is created. For more information, see Create a Data Lakehouse Edition cluster.
A job resource group is created for the AnalyticDB for MySQL Data Lakehouse Edition (V3.0) cluster. For more information, see Create a resource group.
AnalyticDB for MySQL SDK for Python is installed. For more information, see AnalyticDB for MySQL SDK for Python.
The ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET environment variables are configured. For more information, see Configure environment variables in Linux, macOS, and Windows.
The path used to store Spark job logs is configured.
Note: You can use one of the following methods to configure the log path:
Log on to the AnalyticDB for MySQL console and go to the Spark JAR Development page. In the upper-right corner of the page, click Log Settings to configure the log path.
Use the spark.app.log.rootPath parameter to specify an Object Storage Service (OSS) path to store Spark job logs, as shown in the example below.
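For example, for a batch job the log path can be set by adding spark.app.log.rootPath to the conf section of the job configuration that appears later in this topic. This is only a sketch: the OSS bucket and directory below are placeholders that you must replace with an OSS path you have access to.
"conf": {
    "spark.driver.resourceSpec": "medium",
    "spark.executor.instances": 2,
    "spark.executor.resourceSpec": "medium",
    "spark.app.log.rootPath": "oss://<your-bucket>/<your-log-directory>/"
}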
Sample code
The following code provides examples on how to use AnalyticDB for MySQL SDK for Python to submit a Spark job, query the status and logs of a Spark job, terminate a Spark job, and query historical Spark jobs:
from alibabacloud_adb20211201.models import SubmitSparkAppRequest, SubmitSparkAppResponse, GetSparkAppStateRequest, \
    GetSparkAppStateResponse, GetSparkAppLogResponse, GetSparkAppLogRequest, KillSparkAppRequest, \
    KillSparkAppResponse, ListSparkAppsRequest, ListSparkAppsResponse
from alibabacloud_tea_openapi.models import Config
from alibabacloud_adb20211201.client import Client
import os


def submit_spark_sql(client: Client, cluster_id, rg_name, sql):
    """
    Submit a Spark SQL job
    :param client: The Alibaba Cloud client.
    :param cluster_id: The ID of the AnalyticDB for MySQL Data Lakehouse Edition (V3.0) cluster.
    :param rg_name: The name of the resource group.
    :param sql: The SQL statements to submit.
    :return: The ID of the Spark job.
    :rtype: str
    :exception ClientException
    """
    # Initialize the request.
    request = SubmitSparkAppRequest(
        dbcluster_id=cluster_id,
        resource_group_name=rg_name,
        data=sql,
        app_type="SQL",
        agent_source="Python SDK",
        agent_version="1.0.0"
    )
    # Submit the SQL statements and obtain the response.
    response: SubmitSparkAppResponse = client.submit_spark_app(request)
    # Obtain the ID of the Spark job.
    print(response)
    return response.body.data.app_id


def submit_spark_jar(client: Client, cluster_id: str, rg_name: str, json_conf: str):
    """
    Submit a Spark batch job
    :param client: The Alibaba Cloud client.
    :param cluster_id: The ID of the AnalyticDB for MySQL Data Lakehouse Edition (V3.0) cluster.
    :param rg_name: The name of the resource group.
    :param json_conf: The JSON configuration of the job.
    :return: The ID of the Spark job.
    :rtype: str
    :exception ClientException
    """
    # Initialize the request.
    request = SubmitSparkAppRequest(
        dbcluster_id=cluster_id,
        resource_group_name=rg_name,
        data=json_conf,
        app_type="BATCH",
        agent_source="Python SDK",
        agent_version="1.0.0"
    )
    # Submit the job configuration and obtain the response.
    response: SubmitSparkAppResponse = client.submit_spark_app(request)
    # Obtain the ID of the Spark job.
    print(response)
    return response.body.data.app_id


def get_status(client: Client, app_id):
    """
    Query the status of a Spark job
    :param client: The Alibaba Cloud client.
    :param app_id: The ID of the Spark job.
    :return: The status of the Spark job.
    :rtype: str
    :exception ClientException
    """
    # Initialize the request.
    print(app_id)
    request = GetSparkAppStateRequest(app_id=app_id)
    # Obtain the status of the Spark job.
    response: GetSparkAppStateResponse = client.get_spark_app_state(request)
    print(response)
    return response.body.data.state


def get_log(client: Client, app_id):
    """
    Query the logs of a Spark job
    :param client: The Alibaba Cloud client.
    :param app_id: The ID of the Spark job.
    :return: The logs of the Spark job.
    :rtype: str
    :exception ClientException
    """
    # Initialize the request.
    request = GetSparkAppLogRequest(app_id=app_id)
    # Obtain the logs of the Spark job.
    response: GetSparkAppLogResponse = client.get_spark_app_log(request)
    print(response)
    return response.body.data.log_content


def kill_app(client: Client, app_id):
    """
    Terminate a Spark job
    :param client: The Alibaba Cloud client.
    :param app_id: The ID of the Spark job.
    :return: The status of the Spark job.
    :exception ClientException
    """
    # Initialize the request.
    request = KillSparkAppRequest(app_id=app_id)
    # Obtain the status of the Spark job.
    response: KillSparkAppResponse = client.kill_spark_app(request)
    print(response)
    return response.body.data.state


def list_apps(client: Client, cluster_id: str, page_number: int, page_size: int):
    """
    Query historical Spark jobs
    :param client: The Alibaba Cloud client.
    :param cluster_id: The ID of the AnalyticDB for MySQL Data Lakehouse Edition (V3.0) cluster.
    :param page_number: The page number. Pages start from page 1. Default value: 1.
    :param page_size: The number of entries per page.
    :return: The details of the Spark jobs.
    :exception ClientException
    """
    # Initialize the request.
    request = ListSparkAppsRequest(
        dbcluster_id=cluster_id,
        page_number=page_number,
        page_size=page_size
    )
    # Obtain the details of the Spark jobs.
    response: ListSparkAppsResponse = client.list_spark_apps(request)
    print("Page number:", response.body.data.page_number)
    for app_info in response.body.data.app_info_list:
        print(app_info.app_id)
        print(app_info.state)
        print(app_info.detail)


if __name__ == '__main__':
    # Client configuration.
    config = Config(
        # Obtain the AccessKey ID from the ALIBABA_CLOUD_ACCESS_KEY_ID environment variable.
        access_key_id=os.environ['ALIBABA_CLOUD_ACCESS_KEY_ID'],
        # Obtain the AccessKey secret from the ALIBABA_CLOUD_ACCESS_KEY_SECRET environment variable.
        access_key_secret=os.environ['ALIBABA_CLOUD_ACCESS_KEY_SECRET'],
        # The endpoint. cn-hangzhou indicates the ID of the region where the cluster resides.
        endpoint="adb.cn-hangzhou.aliyuncs.com"
    )
    # Create the client.
    adb_client = Client(config)

    sql_str = """
    -- The following statements are only an example of Spark SQL. Modify the content to run your own Spark program.
    set spark.driver.resourceSpec=medium;
    set spark.executor.instances=2;
    set spark.executor.resourceSpec=medium;
    set spark.app.name=Spark SQL Test;
    -- Here are your SQL statements.
    show databases;
    """

    json_str = """
    {
        "comments": [
            "-- Here is just an example of SparkPi. Modify the content and run your spark program."
        ],
        "args": [
            "1000"
        ],
        "file": "local:///tmp/spark-examples.jar",
        "name": "SparkPi",
        "className": "org.apache.spark.examples.SparkPi",
        "conf": {
            "spark.driver.resourceSpec": "medium",
            "spark.executor.instances": 2,
            "spark.executor.resourceSpec": "medium"
        }
    }
    """

    """
    Submit a Spark SQL job
    cluster_id: The ID of the AnalyticDB for MySQL Data Lakehouse Edition (V3.0) cluster.
    rg_name: The name of the resource group.
    """
    sql_app_id = submit_spark_sql(client=adb_client, cluster_id="amv-bp1wo70f0k3c****", rg_name="test", sql=sql_str)
    print(sql_app_id)

    """
    Submit a Spark batch job
    cluster_id: The ID of the AnalyticDB for MySQL Data Lakehouse Edition (V3.0) cluster.
    rg_name: The name of the resource group.
    """
    json_app_id = submit_spark_jar(client=adb_client, cluster_id="amv-bp1wo70f0k3c****",
                                   rg_name="test", json_conf=json_str)
    print(json_app_id)

    # Query the status of the Spark jobs.
    get_status(client=adb_client, app_id=sql_app_id)
    get_status(client=adb_client, app_id=json_app_id)

    """
    Query historical Spark jobs
    cluster_id: The ID of the AnalyticDB for MySQL Data Lakehouse Edition (V3.0) cluster.
    page_number: The page number. Pages start from page 1. Default value: 1.
    page_size: The number of entries per page.
    """
    list_apps(client=adb_client, cluster_id="amv-bp1wo70f0k3c****", page_size=10, page_number=1)

    # Terminate the Spark job.
    kill_app(client=adb_client, app_id=json_app_id)
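In practice, you usually wait for a submitted job to reach a terminal state before you read its logs. The following sketch reuses the get_status and get_log functions defined above to poll a job and print its logs after it stops. The helper name wait_and_fetch_log, the polling interval, and the set of terminal state names are assumptions made for illustration; adjust them to the states that your cluster actually returns.
import time


def wait_and_fetch_log(client: Client, app_id: str, interval_seconds: int = 10, max_attempts: int = 60):
    """
    Poll the status of a Spark job and print its logs after the job stops.
    The terminal state names below are assumptions; adjust them as needed.
    """
    terminal_states = {"COMPLETED", "FAILED", "KILLED", "FATAL"}
    for _ in range(max_attempts):
        state = get_status(client=client, app_id=app_id)
        if state in terminal_states:
            # The job has stopped. Retrieve and print its logs.
            print(get_log(client=client, app_id=app_id))
            return state
        # The job is still running. Wait before the next poll.
        time.sleep(interval_seconds)
    return None


# Example usage, placed inside the __main__ block after a job is submitted:
# wait_and_fetch_log(client=adb_client, app_id=sql_app_id)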