AnalyticDB: Develop Spark applications by using the Python SDK

Updated: Jul 06, 2024

This topic describes how to use the Python SDK to submit a Spark job, query the status and logs of a Spark job, terminate a Spark job, and query historical Spark jobs.

Prerequisites

  • A Python environment is installed. Python 3.7 or later is required.

  • An Enterprise Edition or Data Lakehouse Edition cluster is created. For more information, see Create an Enterprise Edition or Data Lakehouse Edition cluster.

  • A job resource group is created. For more information, see Create a resource group.

  • The Python SDK is installed. For more information, see AnalyticDB MySQL SDK for Python.

  • The ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET environment variables are configured. For more information, see Configure environment variables in Linux, macOS, and Windows.

  • The storage path of Spark logs is configured.

    Note

    You can configure the Spark log storage path in either of the following ways:

    • Go to the Spark Jar Development page in the AnalyticDB for MySQL console and click Log Settings in the upper-right corner to set the Spark log storage path.

    • Use the spark.app.log.rootPath configuration parameter to specify an OSS path for storing the execution logs of Spark jobs, as shown in the sketch below.
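
    For the second method, spark.app.log.rootPath is passed together with the other Spark configuration items of the job. The following minimal sketch is not taken from the original sample; the OSS path is a hypothetical placeholder that you must replace with your own bucket and directory.

    import json

    # Minimal sketch (not from the original sample): a batch job configuration that also
    # sets spark.app.log.rootPath. The OSS path below is a hypothetical placeholder.
    json_conf_with_logs = json.dumps({
        "file": "local:///tmp/spark-examples.jar",
        "className": "org.apache.spark.examples.SparkPi",
        "conf": {
            "spark.driver.resourceSpec": "medium",
            "spark.executor.instances": 2,
            "spark.executor.resourceSpec": "medium",
            "spark.app.log.rootPath": "oss://<your-bucket>/spark-logs/"
        }
    })
    # json_conf_with_logs can then be passed as the json_conf argument of submit_spark_jar() in the example below.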

Examples

The following sample code shows how to submit a Spark job, query the status and logs of a Spark job, terminate a Spark job, and query historical Spark jobs.

from alibabacloud_adb20211201.models import SubmitSparkAppRequest, SubmitSparkAppResponse, GetSparkAppStateRequest, \
    GetSparkAppStateResponse, GetSparkAppLogResponse, GetSparkAppLogRequest, KillSparkAppRequest, \
    KillSparkAppResponse, ListSparkAppsRequest, ListSparkAppsResponse
from alibabacloud_tea_openapi.models import Config
from alibabacloud_adb20211201.client import Client

import os


def submit_spark_sql(client: Client, cluster_id, rg_name, sql):
    """
    Submit a Spark SQL job.

    :param client:             Alibaba Cloud client
    :param cluster_id:         Cluster ID
    :param rg_name:            Resource group name
    :param sql:                Spark SQL statements to execute
    :return:                   Spark job ID
    :rtype:                    str
    :exception                 ClientException
    """

    # Initialize the request
    request = SubmitSparkAppRequest(
        dbcluster_id=cluster_id,
        resource_group_name=rg_name,
        data=sql,
        app_type="SQL",
        agent_source="Python SDK",
        agent_version="1.0.0"
    )

    # Submit the SQL job and obtain the response
    response: SubmitSparkAppResponse = client.submit_spark_app(request)
    # Obtain the Spark job ID
    print(response)
    return response.body.data.app_id


def submit_spark_jar(client: Client, cluster_id: str, rg_name: str, json_conf: str):
    """
    Submit a Spark batch job.

    :param client:             Alibaba Cloud client
    :param cluster_id:         Cluster ID
    :param rg_name:            Resource group name
    :param json_conf:          JSON configuration of the job
    :return:                   Spark job ID
    :rtype:                    str
    :exception                 ClientException
    """

    # Initialize the request
    request = SubmitSparkAppRequest(
        dbcluster_id=cluster_id,
        resource_group_name=rg_name,
        data=json_conf,
        app_type="BATCH",
        agent_source="Python SDK",
        agent_version="1.0.0"
    )

    # Submit the batch job and obtain the response
    response: SubmitSparkAppResponse = client.submit_spark_app(request)
    # Obtain the Spark job ID
    print(response)
    return response.body.data.app_id


def get_status(client: Client, app_id):
    """
    Query the status of a Spark job.

    :param client:             Alibaba Cloud client
    :param app_id:             Spark job ID
    :return:                   Status of the Spark job
    :rtype:                    str
    :exception                 ClientException
    """

    # Initialize the request
    print(app_id)
    request = GetSparkAppStateRequest(app_id=app_id)
    # Obtain the status of the Spark job
    response: GetSparkAppStateResponse = client.get_spark_app_state(request)
    print(response)
    return response.body.data.state


def get_log(client: Client, app_id):
    """
    Query the logs of a Spark job.

    :param client:             Alibaba Cloud client
    :param app_id:             Spark job ID
    :return:                   Log content of the Spark job
    :rtype:                    str
    :exception                 ClientException
    """

    # Initialize the request
    request = GetSparkAppLogRequest(app_id=app_id)

    # Obtain the logs of the Spark job
    response: GetSparkAppLogResponse = client.get_spark_app_log(request)
    print(response)
    return response.body.data.log_content


def kill_app(client: Client, app_id):
    """
    Terminate a Spark job.

    :param client:             Alibaba Cloud client
    :param app_id:             Spark job ID
    :return:                   Status of the Spark job
    :exception                 ClientException
    """

    # Initialize the request
    request = KillSparkAppRequest(app_id=app_id)

    # Terminate the Spark job and obtain its status
    response: KillSparkAppResponse = client.kill_spark_app(request)
    print(response)
    return response.body.data.state


def list_apps(client: Client, cluster_id: str, page_number: int, page_size: int):
    """
    Query historical Spark jobs.

    :param client:             Alibaba Cloud client
    :param cluster_id:         Cluster ID
    :param page_number:        Page number. The value must be a positive integer. Default value: 1
    :param page_size:          Number of entries per page
    :return:                   Details of the Spark jobs
    :exception                 ClientException
    """

    # Initialize the request
    request = ListSparkAppsRequest(
        dbcluster_id=cluster_id,
        page_number=page_number,
        page_size=page_size
    )

    # Obtain the details of the Spark jobs
    response: ListSparkAppsResponse = client.list_spark_apps(request)
    print("Page number:", response.body.data.page_number)
    for app_info in response.body.data.app_info_list:
        print(app_info.app_id)
        print(app_info.state)
        print(app_info.detail)


if __name__ == '__main__':
    # client config
    config = Config(
        # Obtain the AccessKey ID from the ALIBABA_CLOUD_ACCESS_KEY_ID environment variable
        access_key_id=os.environ['ALIBABA_CLOUD_ACCESS_KEY_ID'],
        # Obtain the AccessKey secret from the ALIBABA_CLOUD_ACCESS_KEY_SECRET environment variable
        access_key_secret=os.environ['ALIBABA_CLOUD_ACCESS_KEY_SECRET'],
        # Endpoint of the cluster. cn-hangzhou is the region ID of the cluster
        endpoint="adb.cn-hangzhou.aliyuncs.com"
    )

    # new client
    adb_client = Client(config)

    sql_str = """
        -- Here is just an example of SparkSQL. Modify the content and run your spark program.
        set spark.driver.resourceSpec=medium;
        set spark.executor.instances=2;
        set spark.executor.resourceSpec=medium;
        set spark.app.name=Spark SQL Test;
        -- Here are your sql statements
        show databases;
    """

    json_str = """
    {
        "comments": [
            "-- Here is just an example of SparkPi. Modify the content and run your spark program."
        ],
        "args": [
            "1000"
        ],
        "file": "local:///tmp/spark-examples.jar",
        "name": "SparkPi",
        "className": "org.apache.spark.examples.SparkPi",
        "conf": {
            "spark.driver.resourceSpec": "medium",
            "spark.executor.instances": 2,
            "spark.executor.resourceSpec": "medium"
        }
    }
    """
    """
    提交Spark SQL作業

    cluster_id:    叢集ID
    rg_name:       資源群組名稱
    """

    sql_app_id = submit_spark_sql(client=adb_client, cluster_id="amv-bp1wo70f0k3c****", rg_name="test", sql=sql_str)
    print(sql_app_id)

    """
    提交Spark作業

    cluster_id:    叢集ID
    rg_name:       資源群組名稱
    """

    json_app_id = submit_spark_jar(client=adb_client, cluster_id="amv-bp1wo70f0k3c****",
                                   rg_name="test", json_conf=json_str)
    print(json_app_id)

    # Query the status of the Spark jobs
    get_status(client=adb_client, app_id=sql_app_id)
    get_status(client=adb_client, app_id=json_app_id)

    """
    查詢Spark歷史作業
    cluster_id:      叢集ID     
    page_number:     頁碼,取值為正整數。預設值為1
    page_size:       每頁記錄數
    """

    list_apps(client=adb_client, cluster_id="amv-bp1wo70f0k3c****", page_size=10, page_number=1)

    # Terminate the Spark job
    kill_app(client=adb_client, app_id=json_app_id)
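
The sample code above submits the jobs and checks their status only once. If you want to wait for a job to reach a terminal state before reading its logs, you can append a simple polling loop to the __main__ section. The following is a minimal sketch that reuses get_status, get_log, adb_client, and sql_app_id from the example above; the terminal state names (COMPLETED, FATAL, KILLED) are assumptions and should be verified against the GetSparkAppState API documentation for your SDK version.

import time


def wait_for_terminal_state(client, app_id, interval=10, timeout=1800):
    """Poll the status of a Spark job until it reaches a terminal state or the timeout expires."""
    terminal_states = {"COMPLETED", "FATAL", "KILLED"}  # assumed terminal state names
    deadline = time.time() + timeout
    while time.time() < deadline:
        state = get_status(client=client, app_id=app_id)
        if state in terminal_states:
            return state
        time.sleep(interval)
    raise TimeoutError(f"Spark job {app_id} did not reach a terminal state within {timeout} seconds")


# Example usage, placed after submit_spark_sql() in the __main__ section above:
# final_state = wait_for_terminal_state(adb_client, sql_app_id)
# print(final_state)
# print(get_log(client=adb_client, app_id=sql_app_id))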