すべてのプロダクト
Search
ドキュメントセンター

AnalyticDB:AnalyticDB for MySQL SDK for Pythonを使用したSparkアプリケーションの開発

最終更新日:Jun 12, 2024

このトピックでは、AnalyticDB for MySQL SDK for Pythonを使用してSparkジョブを送信し、Sparkジョブのステータスとログを照会し、Sparkジョブを終了し、履歴Sparkジョブを照会する方法について説明します。

前提条件

  • Python 3.7以降がインストールされます。

  • AnalyticDB for MySQL Data Lakehouse Edition (V3.0) クラスターが作成されます。 詳細については、「Data Lakehouse Editionクラスターの作成」をご参照ください。

  • AnalyticDB for MySQL Data Lakehouse Edition (V3.0) クラスターのジョブリソースグループが作成されます。 詳細については、「リソースグループの作成」をご参照ください。

  • AnalyticDB for MySQL SDK for Pythonがインストールされています。 詳細については、「AnalyticDB For MySQL SDK for Python」をご参照ください。

  • 環境変数ALIBABA_CLOUD_ACCESS_KEY_IDおよびALIBABA_CLOUD_ACCESS_KEY_SECRETが設定されています。 詳細については、「Linux、macOS、およびWindowsでの環境変数の設定」をご参照ください。

  • Sparkジョブログを保存するパスが設定されています。

    説明

    次のいずれかの方法を使用して、ログパスを設定できます。

    • AnalyticDB for MySQLコンソールにログインし、Spark JAR Developmentページに移動します。 ページの右上隅にある [ログ設定] をクリックして、ログパスを設定します。

    • spark.app.log.rootPathパラメーターを使用して、Sparkジョブログを保存するObject Storage Service (OSS) パスを指定します。

サンプルコード

次のコードでは、AnalyticDB for MySQL SDK for Pythonを使用してSparkジョブを送信し、Sparkジョブのステータスとログを照会し、Sparkジョブを終了し、Sparkジョブの履歴を照会する方法の例を示します。

alibabacloud_adb20211201.modelsからの
は、SubmitSparkAppRequest、SubmitSparkAppResponse、GetSparkAppStateRequest、\をインポートします。
    GetSparkAppStateResponse、GetSparkAppLogResponse、GetSparkAppLogRequest、KillSparkAppRequest、\
    KillSparkAppResponse、ListSparkAppsRequest、ListSparkAppsResponse
alibabacloud_tea_openapi.modelsからのインポートConfig
からalibabacloud_adb20211201.clientインポートクライアント

osのインポート


def submit_spark_sql (クライアント: client, cluster_id, rg_name, sql):
    """
    Spark SQLジョブの送信

    : param client: Alibaba Cloudクライアント。
    : param cluster_id: AnalyticDB for MySQL Data Lakehouse Edition (V3.0) クラスターのID。
    : param rg_name: リソースグループの名前。
    : param sql: SQL
    : return: SparkジョブのID。
    : rtype: basestring
    : exception ClientException
    """

    # リクエストを初期化します。
    request = SubmitSparkAppRequest (
        dbcluster_id=cluster_id、
        resource_group_name=rg_name,
        data=sql、
        app_type="SQL" 、
        agent_source="Python SDK" 、
        agent_version="1.0.0"
    )

    # SQL文を送信して結果を取得します。
    response: SubmitSparkAppResponse = client.submit_spark_app (リクエスト)
    # SparkジョブのIDを取得します。
    print(response)
    return response.body.data.app_id


def submit_spark_jar (クライアント: クライアント、cluster_id: str、rg_name: str、json_conf: str):
    """
    Sparkジョブを送信する

    : param client: Alibaba Cloudクライアント。
    : param cluster_id: AnalyticDB for MySQL Data Lakehouse Edition (V3.0) クラスターのID。
    : param rg_name: リソースグループの名前。
    : param json_conf: JSON設定。
    : return: SparkジョブのID。
    : rtype: basestring
    : exception ClientException
    """

    # リクエストを初期化します。
    request = SubmitSparkAppRequest (
        dbcluster_id=cluster_id、
        resource_group_name=rg_name,
        data=json_conf、
        app_type="BATCH" 、
        agent_source="Python SDK" 、
        agent_version="1.0.0"
    )

    # SQL文を送信して結果を取得します。
    response: SubmitSparkAppResponse = client.submit_spark_app (リクエスト)
    # SparkジョブのIDを取得します。
    print(response)
    return response.body.data.app_id


def get_status (クライアント: クライアント、app_id):
    """
    Sparkジョブのステータスの照会

    : param client: Alibaba Cloudクライアント。
    : param app_id: SparkジョブのID。
    : return: Sparkジョブのステータス。
    : rtype: basestring
    : exception ClientException
    """

    # リクエストを初期化します。
    print(app_id)
    request = GetSparkAppStateRequest(app_id=app_id)
    # Sparkジョブのステータスを取得します。
    レスポンス: GetSparkAppStateResponse = client.get_spark_app_state (リクエスト)
    print(response)
    return response.body.data.state


def get_log (クライアント: クライアント、app_id):
    """
    Sparkジョブのログを照会する

    : param client: Alibaba Cloudクライアント。
    : param app_id: SparkジョブのID。
    : return: Sparkジョブのログ。
    : rtype: basestring
    : exception ClientException
    """

    # リクエストを初期化します。
    request = GetSparkAppLogRequest(app_id=app_id)

    # Sparkジョブのログを取得します。
    レスポンス: GetSparkAppLogResponse = client.get_spark_app_log (リクエスト)
    print(response)
    return response.body.data.log_content


def kill_app (クライアント: クライアント、app_id):
    """
    Sparkジョブを終了する

    : param client: Alibaba Cloudクライアント。
    : param app_id: SparkジョブのID。
    : return: Sparkジョブのステータス。
    : exception ClientException
    """

    # リクエストを初期化します。
    request = KillSparkAppRequest(app_id=app_id)

    # Sparkジョブのステータスを取得します。
    応答: KillSparkAppResponse = client.kill_spark_app (リクエスト)
    print(response)
    return response.body.data.state


def list_apps (クライアント: クライアント、cluster_id: str、page_number: int、page_size: int):
    """
    履歴Sparkジョブの照会

    : param client: Alibaba Cloudクライアント。
    : param cluster_id: AnalyticDB for MySQL Data Lakehouse Edition (V3.0) クラスターのID。
    : param page_number: ページ番号。 ページは 1 ページ目から始まります。 デフォルト値は 1 です。
    : param page_sizeページあたりのエントリ数。
    : return: Sparkジョブの詳細。
    : exception ClientException
    """

    # リクエストを初期化します。
    request = ListSparkAppsRequest (
        dbcluster_id=cluster_id、
        page_number=page_number、
        page_size=page_size
    )

    # Sparkジョブの詳細を取得します。
    応答: ListSparkAppsResponse = client.list_spark_apps (リクエスト)
    print("Total App Number:", response.body.data.page_number)
    response.body.data.app_info_listのapp_infoの場合:
        print(app_info.app_id)
        プリント (app_info.state)
        プリント (app_info.detail)


if __name__ ='__main__':
    # クライアント設定
    config = Config (
        # ALIBABA_CLOUD_ACCESS_KEY_ID環境変数からAccessKey IDを取得します。
        access_key_id=os.environ['ALIBABA_CLOUD_ACCESS_KEY_ID '] 、
        # ALIBABA_CLOUD_ACCESS_KEY_SECRET環境変数からAccessKey secretを取得します。
        access_key_secret=os.environ['ALIBABA_CLOUD_ACCESS_KEY_SECRET '] 、
        # エンドポイントを取得します。 cn-hangzhouは、クラスターが存在するリージョンのIDを示します。
        endpoint="adb.cn-hangzhou.aliyuncs.com"
    )

    # 新しいクライアント
    adb_client=クライアント (設定)

    sql_str = """
        -- SparkSQLの例にすぎません。 コンテンツを変更し、sparkプログラムを実行します。
        spark.driver.resourceSpec=mediumを設定します。
        set spark.exe cutor.instances=2;
        set spark.exe cutor.resourceSpec=medium;
        se t spark.app.name=Spark SQLテスト;
        -- ここにあなたのsqlステートメントがあります
        show databases;
    """

    json_str = """
    {
        "comments": [
            "-これはSparkPiのほんの一例です。 コンテンツを変更し、スパークプログラムを実行します。
        ],
        "args": [
            「1000」
        ],
        "file": "local:/// tmp/spark-examples.jar" 、
        "name": "SparkPi" 、
        "className": "org.apache.spark.examples.SparkPi" 、
        "conf": {
            "spark.driver.resourceSpec": "medium" 、
            "spark.exe cutor.instances": 2、
            "spark.exe cutor.resourceSpec": "medium"
        }
    }
    """
    """
    Spark SQLジョブの送信

    cluster_id: AnalyticDB for MySQL Data Lakehouse Edition (V3.0) クラスターのID。
    rg_name: リソースグループの名前。
    """

    sql_app_id = submit_spark_sql(client=adb_client, cluster_id="amv-bp1wo70f0k3c ****", rg_name="test", sql=sql_str)
    print(sql_app_id)

    """
    Sparkジョブを送信する

    cluster_id: AnalyticDB for MySQL Data Lakehouse Edition (V3.0) クラスターのID。
    rg_name: リソースグループの名前。
    """

    json_app_id = submit_spark_jar(client=adb_client、cluster_id="amv-bp1wo70f0k3c ****" 、
                                   rg_name="test", json_conf=json_str)
    print(json_app_id)

    # Sparkジョブのステータスを照会します。
    get_status(client=adb_client, app_id=sql_app_id)
    get_status(client=adb_client, app_id=json_app_id)

    """
    履歴Sparkジョブの照会
    cluster_id: AnalyticDB for MySQL Data Lakehouse Edition (V3.0) クラスターのID。     
    page_number: ページ番号。 ページは 1 ページ目から始まります。 デフォルト値は 1 です。
    page_size: 1ページあたりのエントリ数。
    """

    list_apps(client=adb_client, cluster_id="amv-bp1wo70f0k3c ****", page_size=10, page_number=1)

    # Sparkジョブを終了します。
    kill_app(client=adb_client, app_id=json_app_id)