All Products
Search
Document Center

AnalyticDB:Use AnalyticDB for MySQL SDK for Python to develop Spark applications

Last Updated:May 30, 2024

This topic describes how to use AnalyticDB for MySQL SDK for Python to submit a Spark job, query the status and logs of a Spark job, terminate a Spark job, and query historical Spark jobs.

Prerequisites

  • Python 3.7 or later is installed.

  • An AnalyticDB for MySQL Data Lakehouse Edition (V3.0) cluster is created. For more information, see Create a Data Lakehouse Edition cluster.

  • A job resource group is created for the AnalyticDB for MySQL Data Lakehouse Edition (V3.0) cluster. For more information, see Create a resource group.

  • AnalyticDB for MySQL SDK for Python is installed. For more information, see AnalyticDB for MySQL SDK for Python.

  • The environment variables ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET are configured. For more information, see Configure environment variables in Linux, macOS, and Windows.

  • The path to store Spark job logs is configured.

    Note

    You can use one of the following methods to configure the log path:

    • Log on to the AnalyticDB for MySQL console and go to the Spark JAR Development page. In the upper-right corner of the page, click Log Settings to configure the log path.

    • Use the spark.app.log.rootPath parameter to specify an Object Storage Service (OSS) path to store Spark job logs.

Sample code

The following code provides examples on how to use AnalyticDB for MySQL SDK for Python to submit a Spark job, query the status and logs of a Spark job, terminate a Spark job, and query historical Spark jobs:

from alibabacloud_adb20211201.models import SubmitSparkAppRequest, SubmitSparkAppResponse, GetSparkAppStateRequest, \
    GetSparkAppStateResponse, GetSparkAppLogResponse, GetSparkAppLogRequest, KillSparkAppRequest, \
    KillSparkAppResponse, ListSparkAppsRequest, ListSparkAppsResponse
from alibabacloud_tea_openapi.models import Config
from alibabacloud_adb20211201.client import Client

import os


def submit_spark_sql(client: Client, cluster_id, rg_name, sql):
    """
    Submit a Spark SQL job

    :param client:             The Alibaba Cloud client.
    :param cluster_id:         The ID of the AnalyticDB for MySQL Data Lakehouse Edition (V3.0) cluster.
    :param rg_name:            The name of the resource group.
    :param sql:                SQL
    :return:                   The ID of the Spark job.
    :rtype:                    basestring
    :exception                 ClientException
    """

    # Initialize the request.
    request = SubmitSparkAppRequest(
        dbcluster_id=cluster_id,
        resource_group_name=rg_name,
        data=sql,
        app_type="SQL",
        agent_source="Python SDK",
        agent_version="1.0.0"
    )

    # Submit the SQL statements to obtain the results.
    response: SubmitSparkAppResponse = client.submit_spark_app(request)
    # Obtain the ID of the Spark job.
    print(response)
    return response.body.data.app_id


def submit_spark_jar(client: Client, cluster_id: str, rg_name: str, json_conf: str):
    """
    Submit a Spark job

    :param client:             The Alibaba Cloud client.
    :param cluster_id:         The ID of the AnalyticDB for MySQL Data Lakehouse Edition (V3.0) cluster.
    :param rg_name:            The name of the resource group.
    :param json_conf:          The JSON configurations.
    :return:                   The ID of the Spark job.
    :rtype:                    basestring
    :exception                 ClientException
    """

    # Initialize the request.
    request = SubmitSparkAppRequest(
        dbcluster_id=cluster_id,
        resource_group_name=rg_name,
        data=json_conf,
        app_type="BATCH",
        agent_source="Python SDK",
        agent_version="1.0.0"
    )

    # Submit the SQL statements to obtain the results.
    response: SubmitSparkAppResponse = client.submit_spark_app(request)
    # Obtain the ID of the Spark job.
    print(response)
    return response.body.data.app_id


def get_status(client: Client, app_id):
    """
    Query the status of a Spark job

    :param client:             The Alibaba Cloud client.
    :param app_id:             The ID of the Spark job.
    :return:                   The status of the Spark job.
    :rtype:                    basestring
    :exception                 ClientException
    """

    # Initialize the request.
    print(app_id)
    request = GetSparkAppStateRequest(app_id=app_id)
    # Obtain the status of the Spark job.
    response: GetSparkAppStateResponse = client.get_spark_app_state(request)
    print(response)
    return response.body.data.state


def get_log(client: Client, app_id):
    """
    Query the logs of a Spark job

    :param client:             The Alibaba Cloud client.
    :param app_id:             The ID of the Spark job.
    :return:                   The logs of the Spark job.
    :rtype:                    basestring
    :exception                 ClientException
    """

    # Initialize the request.
    request = GetSparkAppLogRequest(app_id=app_id)

    # Obtain the logs of the Spark job.
    response: GetSparkAppLogResponse = client.get_spark_app_log(request)
    print(response)
    return response.body.data.log_content


def kill_app(client: Client, app_id):
    """
    Terminate a Spark job

    :param client:             The Alibaba Cloud client.
    :param app_id:             The ID of the Spark job.
    :return:                   The status of the Spark job.
    :exception                 ClientException
    """

    # Initialize the request.
    request = KillSparkAppRequest(app_id=app_id)

    # Obtain the status of the Spark job.
    response: KillSparkAppResponse = client.kill_spark_app(request)
    print(response)
    return response.body.data.state


def list_apps(client: Client, cluster_id: str, page_number: int, page_size: int):
    """
    Query historical Spark jobs

    :param client:             The Alibaba Cloud client.
    :param cluster_id:         The ID of the AnalyticDB for MySQL Data Lakehouse Edition (V3.0) cluster.
    :param page_number:        The page number. Pages start from page 1. Default value: 1.
    :param page_size           The number of entries per page.
    :return:                   The details of the Spark job.
    :exception                 ClientException
    """

    # Initialize the request.
    request = ListSparkAppsRequest(
        dbcluster_id=cluster_id,
        page_number=page_number,
        page_size=page_size
    )

    # Obtain the details of the Spark job.
    response: ListSparkAppsResponse = client.list_spark_apps(request)
    print("Total App Number:", response.body.data.page_number)
    for app_info in response.body.data.app_info_list:
        print(app_info.app_id)
        print(app_info.state)
        print(app_info.detail)


if __name__ == '__main__':
    # client config
    config = Config(
        # Obtain the AccessKey ID from the ALIBABA_CLOUD_ACCESS_KEY_ID environment variable.
        access_key_id=os.environ['ALIBABA_CLOUD_ACCESS_KEY_ID'],
        # Obtain the AccessKey secret from the ALIBABA_CLOUD_ACCESS_KEY_SECRET environment variable.
        access_key_secret=os.environ['ALIBABA_CLOUD_ACCESS_KEY_SECRET'],
        # Obtain the endpoint. cn-hangzhou indicates the ID of the region where the cluster resides.
        endpoint="adb.cn-hangzhou.aliyuncs.com"
    )

    # new client
    adb_client = Client(config)

    sql_str = """
        -- Here is just an example of SparkSQL. Modify the content and run your spark program.
        set spark.driver.resourceSpec=medium;
        set spark.executor.instances=2;
        set spark.executor.resourceSpec=medium;
        set spark.app.name=Spark SQL Test;
        -- Here are your sql statements
        show databases;
    """

    json_str = """
    {
        "comments": [
            "-- Here is just an example of SparkPi. Modify the content and run your spark program."
        ],
        "args": [
            "1000"
        ],
        "file": "local:///tmp/spark-examples.jar",
        "name": "SparkPi",
        "className": "org.apache.spark.examples.SparkPi",
        "conf": {
            "spark.driver.resourceSpec": "medium",
            "spark.executor.instances": 2,
            "spark.executor.resourceSpec": "medium"
        }
    }
    """
    """
    Submit a Spark SQL job

    cluster_id:    The ID of the AnalyticDB for MySQL Data Lakehouse Edition (V3.0) cluster.
    rg_name:       The name of the resource group.
    """

    sql_app_id = submit_spark_sql(client=adb_client, cluster_id="amv-bp1wo70f0k3c****", rg_name="test", sql=sql_str)
    print(sql_app_id)

    """
    Submit a Spark job

    cluster_id:    The ID of the AnalyticDB for MySQL Data Lakehouse Edition (V3.0) cluster.
    rg_name:       The name of the resource group.
    """

    json_app_id = submit_spark_jar(client=adb_client, cluster_id="amv-bp1wo70f0k3c****",
                                   rg_name="test", json_conf=json_str)
    print(json_app_id)

    # Query the status of the Spark job.
    get_status(client=adb_client, app_id=sql_app_id)
    get_status(client=adb_client, app_id=json_app_id)

    """
    Query historical Spark jobs
    cluster_id:      The ID of the AnalyticDB for MySQL Data Lakehouse Edition (V3.0) cluster.     
    page_number:     The page number. Pages start from page 1. Default value: 1.
    page_size:       The number of entries per page.
    """

    list_apps(client=adb_client, cluster_id="amv-bp1wo70f0k3c****", page_size=10, page_number=1)

    # Terminate the Spark job.
    kill_app(client=adb_client, app_id=json_app_id)