このトピックでは、AnalyticDB for MySQL SDK for Pythonを使用してSparkジョブを送信し、Sparkジョブのステータスとログを照会し、Sparkジョブを終了し、履歴Sparkジョブを照会する方法について説明します。
前提条件
Python 3.7以降がインストールされます。
AnalyticDB for MySQL Data Lakehouse Edition (V3.0) クラスターが作成されます。 詳細については、「Data Lakehouse Editionクラスターの作成」をご参照ください。
AnalyticDB for MySQL Data Lakehouse Edition (V3.0) クラスターのジョブリソースグループが作成されます。 詳細については、「リソースグループの作成」をご参照ください。
AnalyticDB for MySQL SDK for Pythonがインストールされています。 詳細については、「AnalyticDB For MySQL SDK for Python」をご参照ください。
環境変数
ALIBABA_CLOUD_ACCESS_KEY_ID
およびALIBABA_CLOUD_ACCESS_KEY_SECRET
が設定されています。 詳細については、「Linux、macOS、およびWindowsでの環境変数の設定」をご参照ください。Sparkジョブログを保存するパスが設定されています。
説明次のいずれかの方法を使用して、ログパスを設定できます。
AnalyticDB for MySQLコンソールにログインし、Spark JAR Developmentページに移動します。 ページの右上隅にある [ログ設定] をクリックして、ログパスを設定します。
spark.app.log.rootPath
パラメーターを使用して、Sparkジョブログを保存するObject Storage Service (OSS) パスを指定します。
サンプルコード
次のコードでは、AnalyticDB for MySQL SDK for Pythonを使用してSparkジョブを送信し、Sparkジョブのステータスとログを照会し、Sparkジョブを終了し、Sparkジョブの履歴を照会する方法の例を示します。
alibabacloud_adb20211201.modelsからのは、SubmitSparkAppRequest、SubmitSparkAppResponse、GetSparkAppStateRequest、\をインポートします。
GetSparkAppStateResponse、GetSparkAppLogResponse、GetSparkAppLogRequest、KillSparkAppRequest、\
KillSparkAppResponse、ListSparkAppsRequest、ListSparkAppsResponse
alibabacloud_tea_openapi.modelsからのインポートConfig
からalibabacloud_adb20211201.clientインポートクライアント
osのインポート
def submit_spark_sql (クライアント: client, cluster_id, rg_name, sql):
"""
Spark SQLジョブの送信
: param client: Alibaba Cloudクライアント。
: param cluster_id: AnalyticDB for MySQL Data Lakehouse Edition (V3.0) クラスターのID。
: param rg_name: リソースグループの名前。
: param sql: SQL
: return: SparkジョブのID。
: rtype: basestring
: exception ClientException
"""
# リクエストを初期化します。
request = SubmitSparkAppRequest (
dbcluster_id=cluster_id、
resource_group_name=rg_name,
data=sql、
app_type="SQL" 、
agent_source="Python SDK" 、
agent_version="1.0.0"
)
# SQL文を送信して結果を取得します。
response: SubmitSparkAppResponse = client.submit_spark_app (リクエスト)
# SparkジョブのIDを取得します。
print(response)
return response.body.data.app_id
def submit_spark_jar (クライアント: クライアント、cluster_id: str、rg_name: str、json_conf: str):
"""
Sparkジョブを送信する
: param client: Alibaba Cloudクライアント。
: param cluster_id: AnalyticDB for MySQL Data Lakehouse Edition (V3.0) クラスターのID。
: param rg_name: リソースグループの名前。
: param json_conf: JSON設定。
: return: SparkジョブのID。
: rtype: basestring
: exception ClientException
"""
# リクエストを初期化します。
request = SubmitSparkAppRequest (
dbcluster_id=cluster_id、
resource_group_name=rg_name,
data=json_conf、
app_type="BATCH" 、
agent_source="Python SDK" 、
agent_version="1.0.0"
)
# SQL文を送信して結果を取得します。
response: SubmitSparkAppResponse = client.submit_spark_app (リクエスト)
# SparkジョブのIDを取得します。
print(response)
return response.body.data.app_id
def get_status (クライアント: クライアント、app_id):
"""
Sparkジョブのステータスの照会
: param client: Alibaba Cloudクライアント。
: param app_id: SparkジョブのID。
: return: Sparkジョブのステータス。
: rtype: basestring
: exception ClientException
"""
# リクエストを初期化します。
print(app_id)
request = GetSparkAppStateRequest(app_id=app_id)
# Sparkジョブのステータスを取得します。
レスポンス: GetSparkAppStateResponse = client.get_spark_app_state (リクエスト)
print(response)
return response.body.data.state
def get_log (クライアント: クライアント、app_id):
"""
Sparkジョブのログを照会する
: param client: Alibaba Cloudクライアント。
: param app_id: SparkジョブのID。
: return: Sparkジョブのログ。
: rtype: basestring
: exception ClientException
"""
# リクエストを初期化します。
request = GetSparkAppLogRequest(app_id=app_id)
# Sparkジョブのログを取得します。
レスポンス: GetSparkAppLogResponse = client.get_spark_app_log (リクエスト)
print(response)
return response.body.data.log_content
def kill_app (クライアント: クライアント、app_id):
"""
Sparkジョブを終了する
: param client: Alibaba Cloudクライアント。
: param app_id: SparkジョブのID。
: return: Sparkジョブのステータス。
: exception ClientException
"""
# リクエストを初期化します。
request = KillSparkAppRequest(app_id=app_id)
# Sparkジョブのステータスを取得します。
応答: KillSparkAppResponse = client.kill_spark_app (リクエスト)
print(response)
return response.body.data.state
def list_apps (クライアント: クライアント、cluster_id: str、page_number: int、page_size: int):
"""
履歴Sparkジョブの照会
: param client: Alibaba Cloudクライアント。
: param cluster_id: AnalyticDB for MySQL Data Lakehouse Edition (V3.0) クラスターのID。
: param page_number: ページ番号。 ページは 1 ページ目から始まります。 デフォルト値は 1 です。
: param page_sizeページあたりのエントリ数。
: return: Sparkジョブの詳細。
: exception ClientException
"""
# リクエストを初期化します。
request = ListSparkAppsRequest (
dbcluster_id=cluster_id、
page_number=page_number、
page_size=page_size
)
# Sparkジョブの詳細を取得します。
応答: ListSparkAppsResponse = client.list_spark_apps (リクエスト)
print("Total App Number:", response.body.data.page_number)
response.body.data.app_info_listのapp_infoの場合:
print(app_info.app_id)
プリント (app_info.state)
プリント (app_info.detail)
if __name__ ='__main__':
# クライアント設定
config = Config (
# ALIBABA_CLOUD_ACCESS_KEY_ID環境変数からAccessKey IDを取得します。
access_key_id=os.environ['ALIBABA_CLOUD_ACCESS_KEY_ID '] 、
# ALIBABA_CLOUD_ACCESS_KEY_SECRET環境変数からAccessKey secretを取得します。
access_key_secret=os.environ['ALIBABA_CLOUD_ACCESS_KEY_SECRET '] 、
# エンドポイントを取得します。 cn-hangzhouは、クラスターが存在するリージョンのIDを示します。
endpoint="adb.cn-hangzhou.aliyuncs.com"
)
# 新しいクライアント
adb_client=クライアント (設定)
sql_str = """
-- SparkSQLの例にすぎません。 コンテンツを変更し、sparkプログラムを実行します。
spark.driver.resourceSpec=mediumを設定します。
set spark.exe cutor.instances=2;
set spark.exe cutor.resourceSpec=medium;
se t spark.app.name=Spark SQLテスト;
-- ここにあなたのsqlステートメントがあります
show databases;
"""
json_str = """
{
"comments": [
"-これはSparkPiのほんの一例です。 コンテンツを変更し、スパークプログラムを実行します。
],
"args": [
「1000」
],
"file": "local:/// tmp/spark-examples.jar" 、
"name": "SparkPi" 、
"className": "org.apache.spark.examples.SparkPi" 、
"conf": {
"spark.driver.resourceSpec": "medium" 、
"spark.exe cutor.instances": 2、
"spark.exe cutor.resourceSpec": "medium"
}
}
"""
"""
Spark SQLジョブの送信
cluster_id: AnalyticDB for MySQL Data Lakehouse Edition (V3.0) クラスターのID。
rg_name: リソースグループの名前。
"""
sql_app_id = submit_spark_sql(client=adb_client, cluster_id="amv-bp1wo70f0k3c ****", rg_name="test", sql=sql_str)
print(sql_app_id)
"""
Sparkジョブを送信する
cluster_id: AnalyticDB for MySQL Data Lakehouse Edition (V3.0) クラスターのID。
rg_name: リソースグループの名前。
"""
json_app_id = submit_spark_jar(client=adb_client、cluster_id="amv-bp1wo70f0k3c ****" 、
rg_name="test", json_conf=json_str)
print(json_app_id)
# Sparkジョブのステータスを照会します。
get_status(client=adb_client, app_id=sql_app_id)
get_status(client=adb_client, app_id=json_app_id)
"""
履歴Sparkジョブの照会
cluster_id: AnalyticDB for MySQL Data Lakehouse Edition (V3.0) クラスターのID。
page_number: ページ番号。 ページは 1 ページ目から始まります。 デフォルト値は 1 です。
page_size: 1ページあたりのエントリ数。
"""
list_apps(client=adb_client, cluster_id="amv-bp1wo70f0k3c ****", page_size=10, page_number=1)
# Sparkジョブを終了します。
kill_app(client=adb_client, app_id=json_app_id)