Apache Airflow is a powerful workflow automation and scheduling tool that allows developers to orchestrate, schedule, and monitor the running of data pipelines. E-MapReduce (EMR) Serverless Spark provides a serverless computing environment for processing large-scale data processing jobs. This topic describes how to use Apache Airflow to enable automatic job submission to EMR Serverless Spark. This way, you can automate job scheduling and running to manage data processing jobs more efficiently.
Prerequisites
Airflow is installed and started. For more information, see Installation of Airflow.
A workspace is created. For more information, see Create a workspace.
Usage notes
You cannot call the EmrServerlessSparkStartJobRunOperator operation to query job logs. If you want to view job logs, you must go to the EMR Serverless Spark page and find the job run whose logs you want to view by job run ID. Then, you can check and analyze the job logs on the Logs tab of the job details page or on the Spark Jobs page in the Spark UI.
Procedure
Step 1: Configure Apache Airflow
Install the airflow-alibaba-provider plug-in on each node of Airflow.
The airflow-alibaba-provider plug-in is provided by EMR Serverless Spark. It contains the EmrServerlessSparkStartJobRunOperator component, which is used to submit jobs to EMR Serverless Spark.
pip install airflow_alibaba_provider-0.0.1-py3-none-any.whl
Add a connection.
Use the CLI
Use the Airflow command-line interface (CLI) to run commands to establish a connection. For more information, see Creating a Connection.
airflow connections add 'emr-serverless-spark-id' \ --conn-json '{ "conn_type": "emr_serverless_spark", "extra": { "auth_type": "AK", # The AccessKey pair is used for authentication. "access_key_id": "<yourAccesskeyId>", # The AccessKey ID of your Alibaba Cloud account. "access_key_secret": "<yourAccesskeyKey>", # The AccessKey secret of your Alibaba Cloud account. "region": "<yourRegion>" } }'
Use the UI
You can manually create a connection with the Airflow web UI. For more information, see Creating a Connection with the UI.
On the Add Connection page, configure the parameters.
The following table describes the parameters:
Parameter
Description
Connection Id
The connection ID. In this example, enter emr-serverless-spark-id.
Connection Type
The connection type. In this example, select Generic. If Generic is not available, you can also select Email.
Extra
The additional configuration. In this example, enter the following content:
{ "auth_type": "AK", # The AccessKey pair is used for authentication. "access_key_id": "<yourAccesskeyId>", # The AccessKey ID of your Alibaba Cloud account. "access_key_secret": "<yourAccesskeyKey>", # The AccessKey secret of your Alibaba Cloud account. "region": "<yourRegion>" }
Step 2: Configure DAGs
Apache Airflow provides Directed Acyclic Graphs (DAGs), which allow you to declare how jobs should run. The following are examples of how to call the EmrServerlessSparkStartJobRunOperator operation to run different types of Spark jobs in Apache Airflow.
Submit a JAR package
Use an Airflow task to submit a precompiled Spark JAR job to EMR Serverless Spark.
from __future__ import annotations
from datetime import datetime
from airflow.models.dag import DAG
from airflow_alibaba_provider.alibaba.cloud.operators.emr import EmrServerlessSparkStartJobRunOperator
# Ignore missing args provided by default_args
# mypy: disable-error-code="call-arg"
DAG_ID = "emr_spark_jar"
with DAG(
dag_id=DAG_ID,
start_date=datetime(2024, 5, 1),
default_args={},
max_active_runs=1,
catchup=False,
) as dag:
emr_spark_jar = EmrServerlessSparkStartJobRunOperator(
task_id="emr_spark_jar",
emr_serverless_spark_conn_id="emr-serverless-spark-id",
region="cn-hangzhou",
polling_interval=5,
workspace_id="w-7e2f1750c6b3****",
resource_queue_id="root_queue",
code_type="JAR",
name="airflow-emr-spark-jar",
entry_point="oss://<YourBucket>/spark-resource/examples/jars/spark-examples_2.12-3.3.1.jar",
entry_point_args=["1"],
spark_submit_parameters="--class org.apache.spark.examples.SparkPi --conf spark.executor.cores=4 --conf spark.executor.memory=20g --conf spark.driver.cores=4 --conf spark.driver.memory=8g --conf spark.executor.instances=1",
is_prod=True,
engine_release_version=None
)
emr_spark_jar
Submit an SQL file
Run SQL commands in Airflow DAGs.
from __future__ import annotations
from datetime import datetime
from airflow.models.dag import DAG
from airflow_alibaba_provider.alibaba.cloud.operators.emr import EmrServerlessSparkStartJobRunOperator
# Ignore missing args provided by default_args
# mypy: disable-error-code="call-arg"
ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
DAG_ID = "emr_spark_sql"
with DAG(
dag_id=DAG_ID,
start_date=datetime(2024, 5, 1),
default_args={},
max_active_runs=1,
catchup=False,
) as dag:
emr_spark_sql = EmrServerlessSparkStartJobRunOperator(
task_id="emr_spark_sql",
emr_serverless_spark_conn_id="emr-serverless-spark-id",
region="cn-hangzhou",
polling_interval=5,
workspace_id="w-7e2f1750c6b3****",
resource_queue_id="root_queue",
code_type="SQL",
name="airflow-emr-spark-sql",
entry_point=None,
entry_point_args=["-e","show tables;show tables;"],
spark_submit_parameters="--class org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver --conf spark.executor.cores=4 --conf spark.executor.memory=20g --conf spark.driver.cores=4 --conf spark.driver.memory=8g --conf spark.executor.instances=1",
is_prod=True,
engine_release_version=None,
)
emr_spark_sql
Submit an SQL file from OSS
Run the SQL script file obtained from OSS.
from __future__ import annotations
from datetime import datetime
from airflow.models.dag import DAG
from airflow_alibaba_provider.alibaba.cloud.operators.emr import EmrServerlessSparkStartJobRunOperator
# Ignore missing args provided by default_args
# mypy: disable-error-code="call-arg"
DAG_ID = "emr_spark_sql_2"
with DAG(
dag_id=DAG_ID,
start_date=datetime(2024, 5, 1),
default_args={},
max_active_runs=1,
catchup=False,
) as dag:
emr_spark_sql_2 = EmrServerlessSparkStartJobRunOperator(
task_id="emr_spark_sql_2",
emr_serverless_spark_conn_id="emr-serverless-spark-id",
region="cn-hangzhou",
polling_interval=5,
workspace_id="w-ae42e9c92927****",
resource_queue_id="root_queue",
code_type="SQL",
name="airflow-emr-spark-sql-2",
entry_point="",
entry_point_args=["-f", "oss://<YourBucket>/spark-resource/examples/sql/show_db.sql"],
spark_submit_parameters="--class org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver --conf spark.executor.cores=4 --conf spark.executor.memory=20g --conf spark.driver.cores=4 --conf spark.driver.memory=8g --conf spark.executor.instances=1",
is_prod=True,
engine_release_version=None
)
emr_spark_sql_2
Submit a Python script from OSS
Run the Python script file obtained from OSS.
from __future__ import annotations
from datetime import datetime
from airflow.models.dag import DAG
from airflow_alibaba_provider.alibaba.cloud.operators.emr import EmrServerlessSparkStartJobRunOperator
# Ignore missing args provided by default_args
# mypy: disable-error-code="call-arg"
DAG_ID = "emr_spark_python"
with DAG(
dag_id=DAG_ID,
start_date=datetime(2024, 5, 1),
default_args={},
max_active_runs=1,
catchup=False,
) as dag:
emr_spark_python = EmrServerlessSparkStartJobRunOperator(
task_id="emr_spark_python",
emr_serverless_spark_conn_id="emr-serverless-spark-id",
region="cn-hangzhou",
polling_interval=5,
workspace_id="w-ae42e9c92927****",
resource_queue_id="root_queue",
code_type="PYTHON",
name="airflow-emr-spark-python",
entry_point="oss://<YourBucket>/spark-resource/examples/src/main/python/pi.py",
entry_point_args=["1"],
spark_submit_parameters="--conf spark.executor.cores=4 --conf spark.executor.memory=20g --conf spark.driver.cores=4 --conf spark.driver.memory=8g --conf spark.executor.instances=1",
is_prod=True,
engine_release_version=None
)
emr_spark_python
The following table describes the parameters:
Parameter | Type | Description |
|
| The unique identifier of the Airflow task. |
|
| The ID of the connection between Airflow and EMR Serverless Spark. |
|
| The region in which the EMR Spark job is created. |
|
| The interval at which Airflow queries the state of the job. Unit: seconds. |
|
| The unique identifier of the workspace to which the EMR Spark job belongs. |
|
| The ID of the resource queue used by the EMR Spark job. |
|
| The job type. SQL, Python, and JAR jobs are supported. The meaning of the entry_point parameter varies based on the job type. |
|
| The name of the EMR Spark job. |
|
| The location of the file that is used to start the job. JAR, SQL, and Python files are supported. The meaning of this parameter varies based on |
|
| The parameters that are passed to the Spark application. |
|
| The additional parameters used for the |
|
| The environment in which the job runs. If this parameter is set to True, the job runs in the production environment. In this case, the |
|
| The version of the EMR Spark engine. Default value: esr-2.1-native, which indicates an engine that runs Spark 3.3.1 and Scala 2.12 in the native runtime. |