E-MapReduce(EMR)コンソールで、または API オペレーションを呼び出すことによって、Spark タスクを送信できます。Alibaba Cloud は、API をカプセル化するために、さまざまな言語の SDK を提供しています。このトピックでは、Python に基づいて API オペレーションを呼び出して Spark タスクを送信する方法について説明します。
前提条件
アクセスキーペアが作成されていること。詳細については、アクセスキーペアの作成をご参照ください。
説明Alibaba Cloud アカウントのアクセスキーペアの漏洩によるセキュリティリスクを防ぐために、RAM ユーザーを作成し、EMR Serverless Spark にアクセスするために必要な権限を RAM ユーザーに付与することをお勧めします。その後、RAM ユーザーのアクセスキーペアを使用して、目的の SDK を呼び出すことができます。
RAM ユーザーを作成し、RAM ユーザーのアクセスキーペアを取得する方法については、RAM ユーザーの作成およびアクセスキーペアの取得をご参照ください。
RAM ユーザーに権限を付与する方法については、RAM ユーザーへの権限の付与をご参照ください。
Python 3 が準備されていること。
ALIBABA_CLOUD_ACCESS_KEY_ID および ALIBABA_CLOUD_ACCESS_KEY_SECRET 環境変数が設定されていることを確認します。詳細については、Linux、macOS、および Windows での環境変数の設定をご参照ください。
Python 用 EMR Serverless Spark SDK のインストール
次のコマンドを実行して、Python 用 SDK をインストールします。
pip install alibabacloud_emr_serverless_spark20230808==1.0.0例
次のサンプルコードは、参考のためにのみ提供されています。ビジネス要件に基づいてサンプルコードを変更できます。
EMR Serverless Spark のエンドポイントについては、エンドポイントをご参照ください。
# -*- coding: utf-8 -*-
import os
from typing import List
from alibabacloud_tea_openapi import models as open_api_models
from alibabacloud_emr_serverless_spark20230808.client import Client
from alibabacloud_emr_serverless_spark20230808.models import (
StartJobRunRequest,
Tag,
JobDriver,
JobDriverSparkSubmit,
)
from alibabacloud_tea_util import models as util_models
from alibabacloud_tea_util.client import Client as UtilClient
# ALIBABA_CLOUD_ACCESS_KEY_ID および ALIBABA_CLOUD_ACCESS_KEY_SECRET 環境変数が設定されていることを確認します。
# プロジェクトコードが漏洩した場合、アクセスキーペアが漏洩し、アカウント内のすべてのリソースのセキュリティが侵害される可能性があります。次のサンプルコードは、環境変数を使用してアクセスキーペアを取得し、アクセスキーペアを使用して API オペレーションを呼び出す方法の例を示しています。サンプルコードは参考のためにのみ提供されています。より高いセキュリティを提供する Security Token Service(STS)を使用することをお勧めします。
# エンドポイントの環境変数を、EMR Serverless Spark が利用可能なリージョンの ID に置き換えます。
def create_client() -> Client:
config = open_api_models.Config(
access_key_id=os.environ['ALIBABA_CLOUD_ACCESS_KEY_ID'],
access_key_secret=os.environ['ALIBABA_CLOUD_ACCESS_KEY_SECRET']
)
config.endpoint = f'emr-serverless-spark.cn-hangzhou.aliyuncs.com'
return Client(config)
def example_jar():
print("Let's run a simple test...") # 簡単なテストを実行してみましょう...
client = create_client()
tags: List[Tag] = [Tag("environment", "production"), Tag("workflow", "true")]
job_driver_spark_submit = JobDriverSparkSubmit(
"oss://<YourBucket>/spark-resource/examples/jars/spark-examples_2.12-3.3.1.jar",
["1"],
"--class org.apache.spark.examples.SparkPi --conf spark.executor.cores=4 --conf spark.executor.memory=20g --conf spark.driver.cores=4 --conf spark.driver.memory=8g --conf spark.executor.instances=1"
)
job_driver = JobDriver(job_driver_spark_submit)
start_job_run_request = StartJobRunRequest(
region_id="cn-hangzhou",
resource_queue_id="root_queue",
code_type="JAR",
name="emr-spark-task",
release_version="esr-2.1-native (Spark 3.3.1, Scala 2.12, Native Runtime)",
tags=tags,
job_driver=job_driver
)
runtime = util_models.RuntimeOptions()
headers = {}
try:
response = client.start_job_run_with_options('w-ae42e9c92927****', start_job_run_request, headers,
runtime)
print(response.body.to_map())
except Exception as error:
print(error.message)
print(error.data.get("Recommend"))
UtilClient.assert_as_string(error.message)
def example_sql():
print("Let's run a simple test...") # 簡単なテストを実行してみましょう...
client = create_client()
tags: List[Tag] = [Tag("environment", "production"), Tag("workflow", "true")]
job_driver_spark_submit = JobDriverSparkSubmit(
"oss://<YourBucket>/spark-resource/examples/sql/show_db.sql",
["-f", "oss://<YourBucket>/spark-resource/examples/sql/show_db.sql"],
"--class org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver --conf spark.executor.cores=4 --conf spark.executor.memory=20g --conf spark.driver.cores=4 --conf spark.driver.memory=8g --conf spark.executor.instances=1"
)
job_driver = JobDriver(job_driver_spark_submit)
# configuration_overrides = StartJobRunRequestConfigurationOverrides([StartJobRunRequestConfigurationOverridesConfigurations("test", "test", "test")])
start_job_run_request = StartJobRunRequest(
region_id="cn-hangzhou",
resource_queue_id="root_queue",
code_type="SQL",
name="airflow-sql-test",
release_version="esr-2.1-native (Spark 3.3.1, Scala 2.12, Native Runtime)",
tags=tags,
job_driver=job_driver,
# configuration_overrides=configuration_overrides
)
runtime = util_models.RuntimeOptions()
headers = {}
try:
response = client.start_job_run_with_options('w-ae42e9c92927****', start_job_run_request, headers,
runtime)
print(response.body.to_map())
except Exception as error:
print(error.message)
print(error.data.get("Recommend"))
UtilClient.assert_as_string(error.message)
def example_py():
print("Let's run a simple test...") # 簡単なテストを実行してみましょう...
client = create_client()
tags: List[Tag] = [Tag("environment", "production"), Tag("workflow", "true")]
job_driver_spark_submit = JobDriverSparkSubmit(
"oss://<YourBucket>/spark-resource/examples/src/main/python/pi.py",
["50"],
"--conf spark.executor.cores=4 --conf spark.executor.memory=20g --conf spark.driver.cores=4 --conf spark.driver.memory=8g --conf spark.executor.instances=1"
)
job_driver = JobDriver(job_driver_spark_submit)
start_job_run_request = StartJobRunRequest(
region_id="cn-hangzhou",
resource_queue_id="root_queue",
code_type="PYTHON",
name="emr-spark-task",
release_version="esr-2.1-native (Spark 3.3.1, Scala 2.12, Native Runtime)",
tags=tags,
job_driver=job_driver
)
runtime = util_models.RuntimeOptions()
headers = {}
try:
response = client.start_job_run_with_options('w-ae42e9c92927****', start_job_run_request, headers,
runtime)
print(response.body.to_map())
except Exception as error:
print(error.message)
print(error.data.get("Recommend"))
UtilClient.assert_as_string(error.message)
example_jar()
# example_sql()
# example_py()