すべてのプロダクト
Search
ドキュメントセンター

E-MapReduce:Spark タスクの開始

最終更新日:Jan 11, 2025

E-MapReduce(EMR)コンソールで、または API オペレーションを呼び出すことによって、Spark タスクを送信できます。Alibaba Cloud は、API をカプセル化するために、さまざまな言語の SDK を提供しています。このトピックでは、Python に基づいて API オペレーションを呼び出して Spark タスクを送信する方法について説明します。

前提条件

  • アクセスキーペアが作成されていること。詳細については、アクセスキーペアの作成をご参照ください。

    説明

    Alibaba Cloud アカウントのアクセスキーペアの漏洩によるセキュリティリスクを防ぐために、RAM ユーザーを作成し、EMR Serverless Spark にアクセスするために必要な権限を RAM ユーザーに付与することをお勧めします。その後、RAM ユーザーのアクセスキーペアを使用して、目的の SDK を呼び出すことができます。

  • Python 3 が準備されていること。

  • ALIBABA_CLOUD_ACCESS_KEY_ID および ALIBABA_CLOUD_ACCESS_KEY_SECRET 環境変数が設定されていることを確認します。詳細については、Linux、macOS、および Windows での環境変数の設定をご参照ください。

Python 用 EMR Serverless Spark SDK のインストール

次のコマンドを実行して、Python 用 SDK をインストールします。

pip install alibabacloud_emr_serverless_spark20230808==1.0.0

次のサンプルコードは、参考のためにのみ提供されています。ビジネス要件に基づいてサンプルコードを変更できます。

EMR Serverless Spark のエンドポイントについては、エンドポイントをご参照ください。

# -*- coding: utf-8 -*-
import os

from typing import List
from alibabacloud_tea_openapi import models as open_api_models
from alibabacloud_emr_serverless_spark20230808.client import Client
from alibabacloud_emr_serverless_spark20230808.models import (
    StartJobRunRequest,
    Tag,
    JobDriver,
    JobDriverSparkSubmit,
)

from alibabacloud_tea_util import models as util_models
from alibabacloud_tea_util.client import Client as UtilClient


# ALIBABA_CLOUD_ACCESS_KEY_ID および ALIBABA_CLOUD_ACCESS_KEY_SECRET 環境変数が設定されていることを確認します。
# プロジェクトコードが漏洩した場合、アクセスキーペアが漏洩し、アカウント内のすべてのリソースのセキュリティが侵害される可能性があります。次のサンプルコードは、環境変数を使用してアクセスキーペアを取得し、アクセスキーペアを使用して API オペレーションを呼び出す方法の例を示しています。サンプルコードは参考のためにのみ提供されています。より高いセキュリティを提供する Security Token Service(STS)を使用することをお勧めします。
# エンドポイントの環境変数を、EMR Serverless Spark が利用可能なリージョンの ID に置き換えます。
def create_client() -> Client:
    config = open_api_models.Config(
        access_key_id=os.environ['ALIBABA_CLOUD_ACCESS_KEY_ID'],
        access_key_secret=os.environ['ALIBABA_CLOUD_ACCESS_KEY_SECRET']
    )
    config.endpoint = f'emr-serverless-spark.cn-hangzhou.aliyuncs.com'
    return Client(config)


def example_jar():
    print("Let's run a simple test...") # 簡単なテストを実行してみましょう...
    client = create_client()
    tags: List[Tag] = [Tag("environment", "production"), Tag("workflow", "true")]
    job_driver_spark_submit = JobDriverSparkSubmit(
        "oss://<YourBucket>/spark-resource/examples/jars/spark-examples_2.12-3.3.1.jar",
        ["1"],
        "--class org.apache.spark.examples.SparkPi --conf spark.executor.cores=4 --conf spark.executor.memory=20g --conf spark.driver.cores=4 --conf spark.driver.memory=8g --conf spark.executor.instances=1"
    )

    job_driver = JobDriver(job_driver_spark_submit)
    start_job_run_request = StartJobRunRequest(
        region_id="cn-hangzhou",
        resource_queue_id="root_queue",
        code_type="JAR",
        name="emr-spark-task",
        release_version="esr-2.1-native (Spark 3.3.1, Scala 2.12, Native Runtime)",
        tags=tags,
        job_driver=job_driver
    )
    runtime = util_models.RuntimeOptions()
    headers = {}
    try:
        response = client.start_job_run_with_options('w-ae42e9c92927****', start_job_run_request, headers,
                                                     runtime)
        print(response.body.to_map())
    except Exception as error:
        print(error.message)
        print(error.data.get("Recommend"))
        UtilClient.assert_as_string(error.message)

def example_sql():
    print("Let's run a simple test...") # 簡単なテストを実行してみましょう...
    client = create_client()
    tags: List[Tag] = [Tag("environment", "production"), Tag("workflow", "true")]
    job_driver_spark_submit = JobDriverSparkSubmit(
        "oss://<YourBucket>/spark-resource/examples/sql/show_db.sql",
        ["-f", "oss://<YourBucket>/spark-resource/examples/sql/show_db.sql"],
        "--class org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver --conf spark.executor.cores=4 --conf spark.executor.memory=20g --conf spark.driver.cores=4 --conf spark.driver.memory=8g --conf spark.executor.instances=1"
    )

    job_driver = JobDriver(job_driver_spark_submit)
    # configuration_overrides = StartJobRunRequestConfigurationOverrides([StartJobRunRequestConfigurationOverridesConfigurations("test", "test", "test")])
    start_job_run_request = StartJobRunRequest(
        region_id="cn-hangzhou",
        resource_queue_id="root_queue",
        code_type="SQL",
        name="airflow-sql-test",
        release_version="esr-2.1-native (Spark 3.3.1, Scala 2.12, Native Runtime)",
        tags=tags,
        job_driver=job_driver,
        # configuration_overrides=configuration_overrides
    )
    runtime = util_models.RuntimeOptions()
    headers = {}
    try:
        response = client.start_job_run_with_options('w-ae42e9c92927****', start_job_run_request, headers,
                                                     runtime)
        print(response.body.to_map())
    except Exception as error:
        print(error.message)
        print(error.data.get("Recommend"))
        UtilClient.assert_as_string(error.message)

def example_py():
    print("Let's run a simple test...") # 簡単なテストを実行してみましょう...
    client = create_client()
    tags: List[Tag] = [Tag("environment", "production"), Tag("workflow", "true")]
    job_driver_spark_submit = JobDriverSparkSubmit(
        "oss://<YourBucket>/spark-resource/examples/src/main/python/pi.py",
        ["50"],
        "--conf spark.executor.cores=4 --conf spark.executor.memory=20g --conf spark.driver.cores=4 --conf spark.driver.memory=8g --conf spark.executor.instances=1"
    )

    job_driver = JobDriver(job_driver_spark_submit)
    start_job_run_request = StartJobRunRequest(
        region_id="cn-hangzhou",
        resource_queue_id="root_queue",
        code_type="PYTHON",
        name="emr-spark-task",
        release_version="esr-2.1-native (Spark 3.3.1, Scala 2.12, Native Runtime)",
        tags=tags,
        job_driver=job_driver
    )
    runtime = util_models.RuntimeOptions()
    headers = {}
    try:
        response = client.start_job_run_with_options('w-ae42e9c92927****', start_job_run_request, headers,
                                                     runtime)
        print(response.body.to_map())
    except Exception as error:
        print(error.message)
        print(error.data.get("Recommend"))
        UtilClient.assert_as_string(error.message)


example_jar()
# example_sql()
# example_py()