Apache Airflow is a powerful workflow automation and scheduling tool that allows developers to orchestrate, schedule, and monitor data pipelines. E-MapReduce (EMR) Serverless Spark provides a serverless computing environment for processing large-scale data. This topic describes how to use the LivyOperator of Apache Airflow to automatically submit jobs to EMR Serverless Spark. This way, you can automate job scheduling and running to manage data processing jobs more efficiently.
Background information
Apache Livy can interact with Spark over a REST interface. This greatly simplifies the communication between Spark and application servers. For more information about Livy REST APIs, see REST API.
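To make the REST interaction concrete, the following is a minimal sketch of the JSON body that a Livy client such as the LivyOperator POSTs to the /batches endpoint to start a Spark batch job. The field names follow the Livy REST API; the file path and resource values are illustrative placeholders.

```python
import json

# Request body for POST /batches in the Livy REST API.
# The OSS path and resource sizes below are placeholders.
batch_request = {
    "file": "oss://<YourBucket>/jars/spark-examples_2.12-3.3.1.jar",
    "className": "org.apache.spark.examples.SparkPi",
    "args": ["1000"],
    "driverMemory": "1g",
    "driverCores": 1,
    "executorMemory": "1g",
    "executorCores": 2,
    "numExecutors": 1,
    "name": "SparkPi via Livy REST",
}

print(json.dumps(batch_request, indent=2))
```

The LivyOperator shown later in this topic builds an equivalent request from its keyword arguments, so you rarely need to construct this JSON yourself.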
Prerequisites
Airflow is installed and started. For more information, see Installation of Airflow.
A workspace is created. For more information, see Create a workspace.
Procedure
Step 1: Create a gateway and a token
Create a gateway.
Go to the Compute page.
Log on to the EMR console.
In the left-side navigation pane, choose EMR Serverless > Spark.
On the Spark page, click the name of the workspace that you want to manage.
In the left-side navigation pane of the EMR Serverless Spark page, click Compute.
On the Compute page, click Gateways.
On the Gateways tab, click Create Gateway.
On the Create Gateway page, specify a gateway name in the Name field and click Create. In this example, the gateway name is Livy-gateway.
Create a token.
On the Gateways tab, click Tokens in the Actions column of Livy-gateway.
On the Tokens tab, click Create Token.
In the Create Token dialog box, specify a token name in the Name field and click OK. In this example, the token name is Livy-token.
Copy the token.
Important: After you create a token, you must immediately copy the token. You cannot view the token later. If your token expires or is lost, reset the token or create a new token.
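Because the token cannot be viewed again after creation, it is good practice to keep it out of DAG files and connection definitions. A minimal sketch, assuming the copied token is stored in a hypothetical EMR_LIVY_TOKEN environment variable:

```python
import os

# Assumption: the token copied from the console is exported as the
# hypothetical EMR_LIVY_TOKEN environment variable. setdefault is used
# here only so this sketch runs standalone with a placeholder value.
os.environ.setdefault("EMR_LIVY_TOKEN", "6ac**********kfu")

token = os.environ["EMR_LIVY_TOKEN"]
# The header expected by the Livy gateway, as shown in the Extra JSON.
headers = {"x-acs-spark-livy-token": token}
print(headers)
```
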
Step 2: Configure Apache Airflow
Run the following command to install Apache Livy in the Apache Airflow environment:
pip install apache-airflow-providers-apache-livy
Add a connection.
Use the UI
In the Airflow web UI, find the default connection whose ID is livy_default and modify its properties, or create a new connection. For more information, see Creating a Connection with the UI.
You can modify or configure the following connection properties:
Host: Enter the value specified in the Endpoint parameter of the gateway.
Schema: Enter https.
Extra: Enter a JSON string that contains the x-acs-spark-livy-token key. Use the token that you copied in the previous step as the value:
{
  "x-acs-spark-livy-token": "6ac**********kfu"
}
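If you prefer to generate the Extra string rather than type it by hand, a small sketch using json.dumps avoids quoting mistakes when pasting into the UI field. The token value below is a placeholder.

```python
import json

# Build the Extra JSON string for the livy_default connection.
# The token is a placeholder; substitute the token you copied earlier.
extra = json.dumps({"x-acs-spark-livy-token": "6ac**********kfu"})
print(extra)
```
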
Use the CLI
Use the Airflow command-line interface (CLI) to run commands to establish a connection. For more information, see Creating a Connection.
airflow connections add 'livy_default' \
    --conn-json '{
        "conn_type": "livy",
        "host": "pre-emr-spark-livy-gateway-cn-hangzhou.data.aliyun.com/api/v1/workspace/w-xxxxxxx/livycompute/lc-xxxxxxx",
        "schema": "https",
        "extra": {
            "x-acs-spark-livy-token": "6ac**********kfu"
        }
    }'
Set host to the value of the Endpoint parameter of the gateway, and set x-acs-spark-livy-token to the token that you copied in the previous step.
Step 3: Sample DAGs
Apache Airflow provides Directed Acyclic Graphs (DAGs), which allow you to declare how jobs should run. The following is an example of how to use the LivyOperator of Apache Airflow to run a Spark job.
Run a Spark job whose JAR package is stored in Object Storage Service (OSS).
from datetime import timedelta, datetime
from airflow import DAG
from airflow.providers.apache.livy.operators.livy import LivyOperator
default_args = {
'owner': 'aliyun',
'depends_on_past': False,
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
}
# Initiate DAG
livy_operator_sparkpi_dag = DAG(
dag_id="livy_operator_sparkpi_dag",
default_args=default_args,
schedule_interval=None,
start_date=datetime(2024, 5, 20),
tags=['example', 'spark', 'livy'],
catchup=False
)
# define livy task with LivyOperator
# Replace the example values in the file based on your business requirements.
livy_sparkpi_submit_task = LivyOperator(
file="oss://<YourBucket>/jars/spark-examples_2.12-3.3.1.jar",
class_name="org.apache.spark.examples.SparkPi",
args=['1000'],
driver_memory="1g",
driver_cores=1,
executor_memory="1g",
executor_cores=2,
num_executors=1,
name="LivyOperator SparkPi",
task_id="livy_sparkpi_submit_task",
dag=livy_operator_sparkpi_dag,
)
livy_sparkpi_submit_task
The file parameter indicates the file path of your Spark job. In this example, the path of the spark-examples_2.12-3.3.1.jar package in OSS is used. Replace it with the actual file path. For more information, see Simple upload.
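If your job is a Python script instead of a JAR package, the LivyOperator call is similar: omit class_name and point file at the script. The following sketch shows only the parameter set, using a hypothetical OSS script path; in a DAG you would pass these as LivyOperator(**pyspark_task_params, dag=...).

```python
# Hypothetical parameters for submitting a PySpark script stored in OSS
# through LivyOperator. Compared with the JAR example above, class_name
# is omitted because a Python script has no main class.
pyspark_task_params = dict(
    file="oss://<YourBucket>/scripts/pi.py",  # hypothetical script path
    args=["1000"],
    driver_memory="1g",
    driver_cores=1,
    executor_memory="1g",
    executor_cores=2,
    num_executors=1,
    name="LivyOperator PySpark",
    task_id="livy_pyspark_submit_task",
)

print(sorted(pyspark_task_params))
```
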
Step 4: View the jobs that are submitted to EMR Serverless Spark
In the left-side navigation pane of the EMR Serverless Spark page, click Job Runs.
On the Development Job Runs tab of the Job Runs page, you can view all submitted jobs.
References
You can also use the EmrServerlessSparkStartJobRunOperator provided by EMR to submit jobs to EMR Serverless Spark from Apache Airflow. For more information, see Use Apache Airflow to submit a job to EMR Serverless Spark.