
E-MapReduce: Use the LivyOperator of Apache Airflow to submit a job

Last Updated: Jul 15, 2024

Apache Airflow is a powerful workflow automation and scheduling tool that allows developers to orchestrate, schedule, and monitor data pipelines. E-MapReduce (EMR) Serverless Spark provides a serverless computing environment for large-scale data processing jobs. This topic describes how to use the LivyOperator of Apache Airflow to automatically submit jobs to EMR Serverless Spark. This way, you can automate job scheduling and running and manage data processing jobs more efficiently.

Background information

Apache Livy enables interaction with Spark over a REST interface, which greatly simplifies communication between Spark and application servers. For more information about the Livy REST API, see REST API.
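
For reference, submitting a batch job through the Livy REST API is a single HTTP POST request to the /batches endpoint, and LivyOperator issues this kind of request on your behalf. The following minimal sketch, which assumes a hypothetical Livy endpoint and the Python requests library, shows the shape of such a request:

import requests

# Hypothetical Livy endpoint; replace it with the address of your Livy server or gateway.
LIVY_ENDPOINT = "http://livy-host:8998"

# POST /batches submits a batch job. The request body mirrors spark-submit options.
response = requests.post(
    f"{LIVY_ENDPOINT}/batches",
    json={
        "file": "oss://<YourBucket>/jars/spark-examples_2.12-3.3.1.jar",
        "className": "org.apache.spark.examples.SparkPi",
        "args": ["1000"],
    },
)

# The response contains the batch ID and state, for example {"id": 0, "state": "starting", ...}.
print(response.json())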

Prerequisites

  • A workspace is created in EMR Serverless Spark. The procedure in this topic is performed in an existing workspace.

  • Apache Airflow is installed and running.

Procedure

Step 1: Create a gateway and a token

  1. Create a gateway.

    1. Go to the Compute page.

      1. Log on to the EMR console.

      2. In the left-side navigation pane, choose EMR Serverless > Spark.

      3. On the Spark page, click the name of the workspace that you want to manage.

      4. In the left-side navigation pane of the EMR Serverless Spark page, click Compute.

    2. On the Compute page, click Gateways.

    3. On the Gateways tab, click Create Gateway.

    4. On the Create Gateway page, specify a gateway name in the Name field and click Create. In this example, the gateway name is Livy-gateway.

  2. Create a token.

    1. On the Gateways tab, find Livy-gateway and click Tokens in the Actions column.

    2. On the Tokens tab, click Create Token.

    3. In the Create Token dialog box, specify a token name in the Name field and click OK. In this example, the token name is Livy-token.

    4. Copy the token.

      Important

      After you create a token, you must immediately copy the token. You cannot view the token later. If your token expires or is lost, reset the token or create a new token.
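
The gateway endpoint and the token are all that Apache Airflow needs to reach EMR Serverless Spark over the Livy protocol. Before you configure Apache Airflow, you can optionally check that they work. The following minimal sketch assumes that the gateway exposes the standard Livy GET /batches endpoint; the endpoint and token values are placeholders that you must replace with your own:

import requests

# Placeholder values. Use the Endpoint value of your gateway (prefixed with https://) and the token that you copied.
GATEWAY_ENDPOINT = "https://<gateway-endpoint>"
LIVY_TOKEN = "6ac**********kfu"

# Assuming the gateway supports the standard Livy GET /batches request, this call lists batch jobs.
# The token is passed in the x-acs-spark-livy-token request header, the same header that Airflow uses later.
response = requests.get(
    f"{GATEWAY_ENDPOINT}/batches",
    headers={"x-acs-spark-livy-token": LIVY_TOKEN},
)
print(response.status_code, response.json())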

Step 2: Configure Apache Airflow

  1. Run the following command to install Apache Livy in the Apache Airflow environment:

    pip install apache-airflow-providers-apache-livy
  2. Add a connection.

    Use the UI

    In the Airflow web UI, find the default connection whose ID is livy_default and modify its properties, or create a new connection. For more information, see Creating a Connection with the UI.

    You can modify or configure the following connection properties:

    • Host: Enter the value specified in the Endpoint parameter of the gateway.

    • Schema: Enter https.

    • Extra: Enter a JSON string. Set x-acs-spark-livy-token to the token that you copied in the previous step.

      {
        "x-acs-spark-livy-token": "6ac**********kfu"
      }

    Use the CLI

    Use the Airflow command-line interface (CLI) to run the following command to establish a connection. Set host to the endpoint of the gateway and x-acs-spark-livy-token to the token that you copied in the previous step. For more information, see Creating a Connection.

    airflow connections add 'livy_default' \
        --conn-json '{
            "conn_type": "livy",
            "host": "pre-emr-spark-livy-gateway-cn-hangzhou.data.aliyun.com/api/v1/workspace/w-xxxxxxx/livycompute/lc-xxxxxxx",
            "schema": "https",
            "extra": {
                "x-acs-spark-livy-token": "6ac**********kfu"
            }
        }'
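
    To confirm that the connection is stored as expected, you can read it back from Python. The following minimal sketch assumes the livy_default connection ID used above and Airflow's BaseHook.get_connection helper:

    from airflow.hooks.base import BaseHook

    # Read back the connection that you registered above. An exception is raised if it does not exist.
    conn = BaseHook.get_connection("livy_default")

    print(conn.conn_type)                                   # livy
    print(conn.host)                                        # the gateway endpoint
    print(conn.schema)                                      # https
    print(conn.extra_dejson.get("x-acs-spark-livy-token"))  # the token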

Step 3: Sample DAGs

Apache Airflow provides Directed Acyclic Graphs (DAGs), which allow you to declare how jobs should run. The following is an example of how to use the LivyOperator of Apache Airflow to run a Spark job.

Submit the spark-examples JAR package that is stored in Object Storage Service (OSS) to run the SparkPi job.

from datetime import timedelta, datetime
from airflow import DAG
from airflow.providers.apache.livy.operators.livy import LivyOperator

default_args = {
    'owner': 'aliyun',
    'depends_on_past': False,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

# Instantiate the DAG
livy_operator_sparkpi_dag = DAG(
    dag_id="livy_operator_sparkpi_dag",
    default_args=default_args,
    schedule_interval=None,
    start_date=datetime(2024, 5, 20),
    tags=['example', 'spark', 'livy'],
    catchup=False
)

# Define the Livy task with LivyOperator
# Replace the example values based on your business requirements.
# By default, LivyOperator uses the livy_default connection that you configured in Step 2.
livy_sparkpi_submit_task = LivyOperator(
    file="oss://<YourBucket>/jars/spark-examples_2.12-3.3.1.jar",
    class_name="org.apache.spark.examples.SparkPi",
    args=['1000'],
    driver_memory="1g",
    driver_cores=1,
    executor_memory="1g",
    executor_cores=2,
    num_executors=1,
    name="LivyOperator SparkPi",
    task_id="livy_sparkpi_submit_task",
    dag=livy_operator_sparkpi_dag,
)

livy_sparkpi_submit_task
Note

The file parameter specifies the path of your Spark job file. In this example, the OSS path of the spark-examples_2.12-3.3.1.jar package is used. Replace it with your actual file path. For more information, see Simple upload.
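
LivyOperator can also submit a Python script instead of a JAR package. The following variant is only a sketch: the OSS path and script name are placeholders, and the task is attached to the livy_operator_sparkpi_dag DAG that is defined in the preceding example.

# Variant: submit a PySpark script stored in OSS instead of a JAR package.
# The OSS path and script name are placeholders; replace them with your own.
livy_pyspark_submit_task = LivyOperator(
    file="oss://<YourBucket>/scripts/your_pyspark_job.py",
    driver_memory="1g",
    driver_cores=1,
    executor_memory="1g",
    executor_cores=2,
    num_executors=1,
    name="LivyOperator PySpark",
    task_id="livy_pyspark_submit_task",
    dag=livy_operator_sparkpi_dag,
)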

Step 4: View the jobs that are submitted to EMR Serverless Spark

  1. In the left-side navigation pane of the EMR Serverless Spark page, click Job Runs.

  2. On the Development Job Runs tab of the Job Runs page, you can view all submitted jobs.


References

You can also use the EmrServerlessSparkStartJobRunOperator provided by EMR to submit jobs to EMR Serverless Spark from Apache Airflow. For more information, see Use Apache Airflow to submit a job to EMR Serverless Spark.