All Products
Search
Document Center

DataWorks:Create an EMR Spark node

最終更新日:Dec 05, 2024

Spark is a general-purpose big data analytics engine. Spark features high performance, ease of use, and widespread use. You can use Spark to perform complex memory analysis and build large, low-latency data analysis applications. DataWorks provides E-MapReduce (EMR) Spark nodes that you can use to develop and periodically schedule Spark tasks in DataWorks. This topic describes how to create an EMR Spark node and provides examples on how the features of an EMR Spark node work.

Prerequisites

  • An EMR cluster is registered to DataWorks. For more information, see Register an EMR cluster to DataWorks.

  • (Required if you use a RAM user to develop tasks) The RAM user is added to the DataWorks workspace as a member and is assigned the Development or Workspace Manager role. The Workspace Manager role has more permissions than necessary. Exercise caution when you assign the Workspace Manager role. For more information about how to add a member, see Add workspace members and assign roles to them.

  • A resource group is purchased and configured. The configurations include association with a workspace and network configuration. For more information, see Create and use a serverless resource group.

  • A workflow is created. Development operations in different types of compute engines are performed based on workflows in DataStudio. Therefore, before you create a node, you must create a workflow. For more information, see Create a workflow.

  • If you want to use a specific development environment to develop a task, you can create a custom image in the DataWorks console. For more information, see Manage images.

Limits

  • This type of node can be run only on a serverless resource group or an exclusive resource group for scheduling. We recommend that you use a serverless resource group.

  • If you want to manage metadata for a DataLake or custom cluster in DataWorks, you must configure EMR-HOOK in the cluster first. If you do not configure EMR-HOOK in the cluster, metadata cannot be displayed in real time, audit logs cannot be generated, and data lineages cannot be displayed in DataWorks. In addition, the related EMR governance tasks cannot be run. For more information about how to configure EMR-HOOK, see Use the Spark SQL extension feature to record data lineage and historical access information.

  • You cannot view data lineages of a Spark cluster that is created on the EMR on ACK page or data lineages of an EMR Serverless Spark cluster.

  • For Spark clusters that are created on the EMR on ACK page and EMR Serverless Spark clusters, you can use only the Object Storage Service (OSS) REF method to reference OSS resources and upload resources to OSS. You cannot upload resources to Hadoop Distributed File System (HDFS).

  • For DataLake and custom clusters, you can use the OSS REF method to reference OSS resources and upload resources to OSS or HDFS.

Preparations: Prepare Spark task code and obtain a JAR package

Before you use DataWorks to schedule an EMR Spark task, you must prepare Spark task code in EMR and compile the task code to generate a JAR package. For more information about preparation of Spark task code, see Overview.

Note

You must upload the obtained JAR package to the DataWorks console. This way, DataWorks can periodically schedule EMR Spark tasks.

Step 1: Create an EMR Spark node

  1. Go to the DataStudio page.

    Log on to the DataWorks console. In the top navigation bar, select the desired region. In the left-side navigation pane, choose Data Development and Governance > Data Development. On the page that appears, select the desired workspace from the drop-down list and click Go to Data Development.

  2. Create an EMR Spark node.

    1. Find the desired workflow, right-click the workflow name, and then choose Create Node > EMR > EMR Spark.

      Note

      Alternatively, you can move the pointer over the Create icon and choose Create Node > EMR > EMR Spark.

    2. In the Create Node dialog box, configure the Name, Engine Instance, Node Type, and Path parameters. Click Confirm. The configuration tab of the EMR Spark node appears.

      Note

      The node name can contain letters, digits, underscores (_), and periods (.).

Step 2: Develop a Spark task

You can use one of the following methods based on your business requirements to develop a Spark task on the configuration tab of the EMR Spark node:

Method 1: Upload and reference an EMR JAR resource

DataWorks allows you to upload a resource from your on-premises machine to DataStudio before you reference the resource. You must obtain and store the JAR package that is generated after the code of a Spark task is compiled in EMR. The method for storing a JAR package varies based on the size of the JAR package.

You can upload the JAR package to the DataWorks console as an EMR JAR resource and commit the resource. You can also store the JAR package in HDFS of EMR. For a Spark cluster that is created on the EMR on ACK page or an EMR Serverless Spark cluster, you cannot upload resources to HDFS.

A JAR package is less than 200 MB in size

  1. Create an EMR JAR resource.

    You can upload the JAR package from your on-premises machine to the DataWorks console as an EMR JAR resource. This way, you can manage the JAR package in the DataWorks console in a visualized manner. After you create an EMR JAR resource, you must commit the resource. For more information, see Create and use an EMR resource.

    image.png

    Note

    The first time you create an EMR JAR resource, you must perform authorization as prompted first if you want the JAR package to be stored in OSS after the JAR package is uploaded.

  2. Reference the EMR JAR resource.

    1. Double-click the name of the created EMR Spark node to go to the configuration tab of the node.

    2. Find the desired EMR JAR resource under Resource in the EMR folder, right-click the resource name, and then select Insert Resource Path.

    3. Resource reference code is automatically added to the configuration tab of the EMR Spark node. Sample code:

      ##@resource_reference{"spark-examples_2.12-1.0.0-SNAPSHOT-shaded.jar"}
      spark-examples_2.12-1.0.0-SNAPSHOT-shaded.jar

      If the automatic addition of the preceding code is successful, the resource is referenced. spark-examples_2.12-1.0.0-SNAPSHOT-shaded.jar is the name of the JAR package that you uploaded.

    4. Rewrite the code of the EMR Spark node and add the spark-submit command. The following sample code is only for reference.

      Note

      You cannot add comments when you write code for an EMR Spark node. If you add comments, an error is reported when you run the EMR Spark node. You can refer to the following sample code to rewrite the code of an EMR Spark node.

      ##@resource_reference{"spark-examples_2.11-2.4.0.jar"}
      spark-submit --class org.apache.spark.examples.SparkPi --master yarn  spark-examples_2.11-2.4.0.jar 100

      Components:

      • org.apache.spark.examples.SparkPi: the main class of the task in the compiled JAR package.

      • spark-examples_2.11-2.4.0.jar: the name of the JAR package that you uploaded.

      • You can keep the settings of other parameters unchanged. You can also run the following command to view the help documentation for using the spark-submit command and modify the spark-submit command based on your business requirements.

        Note
        • If you want to use a parameter that is simplified by running the spark-submit command, such as --executor-memory 2G, in an EMR Spark node, you need to add the parameter to the code of the EMR Spark node.

        • You can use Spark nodes on YARN to submit jobs only if your nodes are in cluster mode.

        • If you commit a node by using spark-submit, we recommend that you set deploy-mode to cluster rather than client.

        spark-submit --help

        image.png

A JAR package is greater than or equal to 200 MB in size

  1. Store the JAR package in HDFS of EMR.

    You cannot upload the JAR package from your on-premises machine to the DataWorks console as a DataWorks resource. We recommend that you store the JAR package in HDFS of EMR and record the storage path of the JAR package. This way, you can reference the JAR package in this path when you use DataWorks to schedule Spark tasks.

  2. Reference the JAR package.

    You can reference the JAR package by specifying the storage path of the JAR package in the code of an EMR Spark node.

    1. Double-click the name of the created EMR Spark node to go to the configuration tab of the node.

    2. Write the spark-submit command. Example:

      spark-submit --master yarn
      --deploy-mode cluster
      --name SparkPi
      --driver-memory 4G
      --driver-cores 1
      --num-executors 5
      --executor-memory 4G
      --executor-cores 1
      --class org.apache.spark.examples.JavaSparkPi
      hdfs:///tmp/jars/spark-examples_2.11-2.4.8.jar 100

      Parameter description:

      • hdfs:///tmp/jars/spark-examples_2.11-2.4.8.jar: the storage path of the JAR package in HDFS.

      • org.apache.spark.examples.JavaSparkPi: the main class of the task in the compiled JAR package.

      • Other parameters are configured in the EMR cluster that is used. You can modify the parameters based on your business requirements. You can also run the following command to view the help documentation for using the spark-submit command and modify the spark-submit command based on your business requirements.

        Important
        • If you want to use a parameter that is simplified by running the spark-submit command, such as --executor-memory 2G, in an EMR Spark node, you need to add the parameter to the code of the EMR Spark node.

        • You can use Spark nodes on YARN to submit jobs only if your nodes are in cluster mode.

        • If you commit a node by using spark-submit, we recommend that you set deploy-mode to cluster rather than client.

        spark-submit --help

        image.png

Method 2: Reference an OSS resource

The current node can reference an OSS resource by using the OSS REF method. When you run a task on the node, DataWorks automatically loads the OSS resource specified in the node code. This method is commonly used in scenarios in which JAR dependencies are required in EMR tasks or EMR tasks need to depend on scripts.

  1. Develop a JAR package.

    1. Prepare code dependencies.

      You can access the EMR cluster and view required code dependencies in the /usr/lib/emr/spark-current/jars/ path of the master node. The following information uses Spark 3.4.2 as an example. You must open an existing IntelliJ IDEA project, and add Project Object Model (POM) dependencies and reference plug-ins.

      Add POM dependencies

      <dependencies>
              <dependency>
                  <groupId>org.apache.spark</groupId>
                  <artifactId>spark-core_2.12</artifactId>
                  <version>3.4.2</version>
              </dependency>
              <!-- Apache Spark SQL -->
              <dependency>
                  <groupId>org.apache.spark</groupId>
                  <artifactId>spark-sql_2.12</artifactId>
                  <version>3.4.2</version>
              </dependency>
      </dependencies>

      Reference plug-ins

      <build>
              <sourceDirectory>src/main/scala</sourceDirectory>
              <testSourceDirectory>src/test/scala</testSourceDirectory>
              <plugins>
                  <plugin>
                      <groupId>org.apache.maven.plugins</groupId>
                      <artifactId>maven-compiler-plugin</artifactId>
                      <version>3.7.0</version>
                      <configuration>
                          <source>1.8</source>
                          <target>1.8</target>
                      </configuration>
                  </plugin>
                  <plugin>
                      <artifactId>maven-assembly-plugin</artifactId>
                      <configuration>
                          <descriptorRefs>
                              <descriptorRef>jar-with-dependencies</descriptorRef>
                          </descriptorRefs>
                      </configuration>
                      <executions>
                          <execution>
                              <id>make-assembly</id>
                              <phase>package</phase>
                              <goals>
                                  <goal>single</goal>
                              </goals>
                          </execution>
                      </executions>
                  </plugin>
                  <plugin>
                      <groupId>net.alchim31.maven</groupId>
                      <artifactId>scala-maven-plugin</artifactId>
                      <version>3.2.2</version>
                      <configuration>
                          <recompileMode>incremental</recompileMode>
                      </configuration>
                      <executions>
                          <execution>
                              <goals>
                                  <goal>compile</goal>
                                  <goal>testCompile</goal>
                              </goals>
                              <configuration>
                                  <args>
                                      <arg>-dependencyfile</arg>
                                      <arg>${project.build.directory}/.scala_dependencies</arg>
                                  </args>
                              </configuration>
                          </execution>
                      </executions>
                  </plugin>
              </plugins>
          </build>
    2. Write code. Sample code:

      package com.aliyun.emr.example.spark
      
      import org.apache.spark.sql.SparkSession
      
      object SparkMaxComputeDemo {
        def main(args: Array[String]): Unit = {
          // Create a Spark session.
          val spark = SparkSession.builder()
            .appName("HelloDataWorks")
            .getOrCreate()
      
          // Display the Spark version.
          println(s"Spark version: ${spark.version}")
        }
      }
    3. Package the code into a JAR file.

      After you write and save the preceding code, package the code into a JAR file. In this example, a file named SparkWorkOSS-1.0-SNAPSHOT-jar-with-dependencies.jar is generated.

  2. Upload the JAR file.

    1. Log on to the OSS console. In the top navigation bar, select a desired region. Then, in the left-side navigation pane, click Buckets.

    2. On the Buckets page, find the desired bucket and click the bucket name to go to the Objects page.

      In this example, the onaliyun-bucket-2 bucket is used.

    3. On the Objects page, click Create Directory to create a directory that is used to store the JAR file.

      In the Create Directory panel, set Directory Name to emr/jars and click OK.

    4. Upload the JAR file to the created directory.

      Go to the created directory. Click Upload Object. In the Files to Upload section, click Select Files and add the SparkWorkOSS-1.0-SNAPSHOT-jar-with-dependencies.jar file. Then, click Upload Object.

  3. Reference the JAR file.

    1. Write code that is used to reference the JAR file.

      On the configuration tab of the EMR Spark node, write code that is used to reference the JAR file.

      spark-submit --class com.aliyun.emr.example.spark.SparkMaxComputeDemo --master yarn ossref://onaliyun-bucket-2/emr/jars/SparkWorkOSS-1.0-SNAPSHOT-jar-with-dependencies.jar

      Parameter description:

      Parameter

      Description

      class

      The full name of the main class to be executed.

      master

      The running mode of the Spark application.

      ossref file path

      Format: ossref://{endpoint}/{bucket}/{object}

      • endpoint: the endpoint of OSS. If the endpoint parameter is left empty, only a resource in an OSS bucket that resides in the same region as the current EMR cluster can be referenced.

      • bucket: a container that is used to store objects in OSS. Each bucket has a unique name. You can log on to the OSS console to view all buckets within the current logon account.

      • object: a file name or path that is stored in a bucket.

    2. Run a task on the EMR Spark node.

      After you write code, click the image icon and select a created serverless resource group to run a task on the EMR Spark node. After the task finishes running, record application IDs that are displayed in the console, such as application_1730367929285_xxxx.

    3. View results.

      Create an EMR Shell node and run the yarn logs -applicationId application_1730367929285_xxxx command on the EMR Shell node to view running results.

      image

(Optional) Configure advanced parameters

You can configure Spark-specific parameters on the Advanced Settings tab of the configuration tab of the current node. For more information about how to configure the parameters, see Spark Configuration. The following table describes the advanced parameters that can be configured for different types of EMR clusters.

DataLake cluster or custom cluster: created on the EMR on ECS page

Advanced parameter

Description

queue

The scheduling queue to which jobs are committed. Default value: default.

If you have configured a workspace-level YARN queue when you register an EMR cluster to a DataWorks workspace, the following configurations apply:

  • If you select Yes for Global Settings Task Precedence, the YARN queue that is configured when you register the EMR cluster is used to run Spark tasks.

  • If you do not select Yes for Global Settings Task Precedence, the YARN queue that is configured for the EMR Spark node is used to run Spark tasks.

For information about EMR YARN, see YARN schedulers. For information about queue configuration when you register an EMR cluster, see Configure a global YARN queue.

priority

The priority. Default value: 1.

FLOW_SKIP_SQL_ANALYZE

The manner in which SQL statements are executed. Valid values:

  • true: Multiple SQL statements are executed at a time.

  • false (default): Only one SQL statement is executed at a time.

Note

This parameter is available only for testing in the development environment of a DataWorks workspace.

USE_GATEWAY

This parameter is not supported.

Others

  • You can also add a custom Spark parameter for the EMR Spark node on the Advanced Settings tab, such as spark.eventLog.enabled : false . When you commit the code of the EMR Spark node, DataWorks adds the custom parameter to the code in the --conf key=value format.

  • You can also configure global Spark parameters. For more information, see Configure global Spark parameters.

Hadoop cluster: created on the EMR on ECS page

Advanced parameter

Description

queue

The scheduling queue to which jobs are committed. Default value: default.

If you have configured a workspace-level YARN queue when you register an EMR cluster to a DataWorks workspace, the following configurations apply:

  • If you select Yes for Global Settings Task Precedence, the YARN queue that is configured when you register the EMR cluster is used to run Spark tasks.

  • If you do not select Yes for Global Settings Task Precedence, the YARN queue that is configured for the EMR Spark node is used to run Spark tasks.

For information about EMR YARN, see YARN schedulers. For information about queue configuration when you register an EMR cluster, see Configure a global YARN queue.

priority

The priority. Default value: 1.

FLOW_SKIP_SQL_ANALYZE

The manner in which SQL statements are executed. Valid values:

  • true: Multiple SQL statements are executed at a time.

  • false: Only one SQL statement is executed at a time.

Note

This parameter is available only for testing in the development environment of a DataWorks workspace.

USE_GATEWAY

Specifies whether to use a gateway cluster to commit jobs on the current node. Valid values:

  • true: Use a gateway cluster to commit jobs.

  • false: Use no gateway cluster to commit jobs. Jobs are automatically committed to the master node.

Note

If the EMR cluster to which the node belongs is not associated with a gateway cluster but the USE_GATEWAY parameter is set to true, jobs may fail to be committed.

Others

  • You can also add a custom Spark parameter for the EMR Spark node on the Advanced Settings tab, such as spark.eventLog.enabled : false . When you commit the code of the EMR Spark node, DataWorks adds the custom parameter to the code in the --conf key=value format.

  • You can also configure global Spark parameters. For more information, see Configure global Spark parameters.

Spark cluster: created on the EMR on ACK page

Advanced parameter

Description

queue

This parameter is not supported.

priority

This parameter is not supported.

FLOW_SKIP_SQL_ANALYZE

The manner in which SQL statements are executed. Valid values:

  • true: Multiple SQL statements are executed at a time.

  • false: Only one SQL statement is executed at a time.

Note

This parameter is available only for testing in the development environment of a DataWorks workspace.

USE_GATEWAY

This parameter is not supported.

Others

  • You can also add a custom Spark parameter for the EMR Spark node on the Advanced Settings tab, such as spark.eventLog.enabled : false . When you commit the code of the EMR Spark node, DataWorks adds the custom parameter to the code in the --conf key=value format.

  • You can also configure global Spark parameters. For more information, see Configure global Spark parameters.

EMR Serverless Spark cluster

For more information about parameter settings, see the Step 3: Submit a Spark job section of the "Use the spark-submit CLI to submit a Spark job" topic.

Advanced parameter

Description

queue

The scheduling queue to which jobs are committed. Default value: dev_queue.

priority

The priority. Default value: 1.

FLOW_SKIP_SQL_ANALYZE

The manner in which SQL statements are executed. Valid values:

  • true: Multiple SQL statements are executed at a time.

  • false: Only one SQL statement is executed at a time.

Note

This parameter is available only for testing in the development environment of a DataWorks workspace.

USE_GATEWAY

This parameter is not supported.

SERVERLESS_RELEASE_VERSION

The version of the Spark engine. By default, the value specified by the Default Engine Version parameter on the Register EMR Cluster page is used. To go to the Register EMR Cluster page, you can perform the following operations: Go to the SettingCenter page. In the left-side navigation pane, click Cluster Management. On the Cluster Management page, click Register Cluster and select E-MapReduce in the Select Cluster Type dialog box. You can configure this parameter to specify different engine versions for different types of tasks.

SERVERLESS_QUEUE_NAME

The resource queue. By default, the value specified by the Default Resource Queue parameter on the Register EMR Cluster page is used. You can add queues to meet resource isolation and management requirements. For more information, see Manage resource queues.

Others

  • You can also add a custom Spark parameter for the EMR Spark node on the Advanced Settings tab, such as spark.eventLog.enabled : false . When you commit the code of the EMR Spark node, DataWorks adds the custom parameter to the code in the --conf key=value format.

  • You can also configure global Spark parameters. For more information, see Configure global Spark parameters.

Execute SQL statements

  1. Click the 高级运行 icon in the top toolbar. In the Parameters dialog box, select a created resource group for scheduling and click Run.

    Note
    • If you want to access a data source over the Internet or a virtual private cloud (VPC), you must use the resource group for scheduling that is connected to the data source. For more information, see Network connectivity solutions.

    • If you want to change the resource group in subsequent operations, you can click the 高级运行 (Run with Parameters) icon to change the resource group in the Parameters dialog box.

    • If you use an EMR Spark node to query data, a maximum of 10,000 data records can be returned, and the total size of the returned data records cannot exceed 10 MB.

  2. Click the 保存 icon in the top toolbar to save SQL statements.

  3. Optional. Perform smoke testing.

    You can perform smoke testing on the node in the development environment when you commit the node or after you commit the node. For more information, see Perform smoke testing.

Step 3: Configure scheduling properties

If you want the system to periodically run a task on the node, you can click Properties in the right-side navigation pane on the configuration tab of the node to configure task scheduling properties based on your business requirements. For more information, see Overview.

Note

You must configure the Rerun and Parent Nodes parameters on the Properties tab before you commit the task.

Step 4: Deploy the task

After a task on a node is configured, you must commit and deploy the task. After you commit and deploy the task, the system runs the task on a regular basis based on scheduling configurations.

  1. Click the 保存 icon in the top toolbar to save the task.

  2. Click the 提交 icon in the top toolbar to commit the task.

    In the Submit dialog box, configure the Change description parameter. Then, determine whether to review task code after you commit the task based on your business requirements.

    Note
    • You must configure the Rerun and Parent Nodes parameters on the Properties tab before you commit the task.

    • You can use the code review feature to ensure the code quality of tasks and prevent task execution errors caused by invalid task code. If you enable the code review feature, the task code that is committed can be deployed only after the task code passes the code review. For more information, see Code review.

If you use a workspace in standard mode, you must deploy the task in the production environment after you commit the task. To deploy a task on a node, click Deploy in the upper-right corner of the configuration tab of the node. For more information, see Deploy nodes.

What to do next

After you commit and deploy the task, the task is periodically run based on the scheduling configurations. You can click Operation Center in the upper-right corner of the configuration tab of the corresponding node to go to Operation Center and view the scheduling status of the task. For more information, see View and manage auto triggered tasks.

References