DataWorks: Create an EMR MR node

Last Updated: Nov 13, 2024

You can create an E-MapReduce (EMR) MR node to process a large dataset by using multiple parallel map tasks. EMR MR nodes help accelerate parallel computing on large datasets. This topic describes how to create an EMR MR node and develop a task on the node. In this example, the task is used to read data from an Object Storage Service (OSS) object in an OSS bucket and count the number of words in the OSS object.

Prerequisites

  • An EMR cluster is registered to DataWorks. For more information, see Register an EMR cluster to DataWorks.

  • (Required if you use a RAM user to develop tasks) The RAM user is added to the DataWorks workspace as a member and is assigned the Development or Workspace Manager role. The Workspace Manager role has more permissions than necessary. Exercise caution when you assign the Workspace Manager role. For more information about how to add a member, see Add workspace members and assign roles to them.

  • A serverless resource group is purchased and configured. The configurations include association with a workspace and network configuration. For more information, see Create and use a serverless resource group.

  • A workflow is created in DataStudio. For more information, see Create a workflow.

  • If you want to reference open source code in your EMR MR node, make sure that the open source code is uploaded as an EMR JAR resource. For more information, see Create and use an EMR resource.

  • If you want to reference user-defined functions (UDFs) in your EMR MR node, make sure that the UDFs are uploaded as EMR JAR resources and are registered with EMR. For more information about how to register a UDF, see Create an EMR function.

  • An OSS bucket is created. To use the sample code for task development in this topic, you must prepare an OSS bucket. For more information about how to create an OSS bucket, see Create buckets.

Limits

  • This type of node can be run only on a serverless resource group or an exclusive resource group for scheduling. We recommend that you use a serverless resource group.

  • If you want to manage metadata for a DataLake or custom cluster in DataWorks, you must configure EMR-HOOK in your cluster first. If you do not configure EMR-HOOK in your cluster, metadata cannot be displayed in real time, audit logs cannot be generated, and data lineages cannot be displayed in DataWorks. EMR governance tasks also cannot be run. For information about how to configure EMR-HOOK, see Use the Hive extension feature to record data lineage and historical access information.

Prepare initial data and a JAR resource package

Prepare initial data

Create a file that is named input01.txt and contains the following initial data:

hadoop emr hadoop dw
hive hadoop
dw emr
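As a sanity check, the word counts that the job should produce for this input can be computed locally before anything is uploaded. The following is a minimal sketch in plain Java with no Hadoop dependency. The class name LocalWordCount is hypothetical, and the sketch assumes the same whitespace tokenization (StringTokenizer) that the WordCount code in this topic uses.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.StringTokenizer;
import java.util.TreeMap;

// Hypothetical helper: counts words locally, split on whitespace.
final class LocalWordCount {
    static Map<String, Integer> count(List<String> lines) {
        // TreeMap keeps the words in sorted order.
        Map<String, Integer> counts = new TreeMap<>();
        for (String line : lines) {
            StringTokenizer tokens = new StringTokenizer(line);
            while (tokens.hasMoreTokens()) {
                counts.merge(tokens.nextToken(), 1, Integer::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        // The three lines of input01.txt.
        List<String> lines = Arrays.asList("hadoop emr hadoop dw", "hive hadoop", "dw emr");
        count(lines).forEach((word, n) -> System.out.println(word + "\t" + n));
    }
}
```

For the three lines above, this prints dw 2, emr 2, hadoop 3, and hive 1, one tab-separated pair per line.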

Upload the file that stores the initial data

  1. Log on to the OSS console. In the left-side navigation pane, click Buckets.

  2. On the Buckets page, find the desired bucket and click the bucket name to go to the Objects page.

    In this example, the onaliyun-bucket-2 bucket is used.

  3. On the Objects page, click Create Directory to create directories that are used to store initial data and JAR resources.

    • Set Directory Name to emr/datas/wordcount02/inputs to create a directory that is used to store initial data.

    • Set Directory Name to emr/jars to create a directory that is used to store JAR resources.

  4. Upload the file that stores the initial data to the emr/datas/wordcount02/inputs directory.

    • Go to the /emr/datas/wordcount02/inputs directory and click Upload Object.

    • In the Files to Upload section, click Select Files and upload the input01.txt file to the bucket.

Use the EMR MR node to read the OSS object and generate a JAR package

  1. Open an existing IntelliJ IDEA project and add Project Object Model (POM) dependencies.

            <dependency>
                <groupId>org.apache.hadoop</groupId>
                <artifactId>hadoop-mapreduce-client-common</artifactId>
                <version>2.8.5</version> <!--The version used by EMR MR is 2.8.5.-->
            </dependency>
            <dependency>
                <groupId>org.apache.hadoop</groupId>
                <artifactId>hadoop-common</artifactId>
                <version>2.8.5</version>
            </dependency>
  2. Configure the following parameters to read data from and write data to the OSS object.

    Important

    An Alibaba Cloud account has permissions to call all API operations. If the AccessKey pair of your Alibaba Cloud account is leaked, all resources in the account are exposed to high security risks. Do not store the AccessKey ID and AccessKey secret of your Alibaba Cloud account in project code or in any location that others can easily access. We recommend that you use a RAM user to call API operations or perform routine O&M. The following sample code is provided only for reference. Keep the AccessKey pair of your Alibaba Cloud account confidential.

    conf.set("fs.oss.accessKeyId", "${accessKeyId}");
    conf.set("fs.oss.accessKeySecret", "${accessKeySecret}");
    conf.set("fs.oss.endpoint","${endpoint}");

    Parameter descriptions:

    • ${accessKeyId}: the AccessKey ID of your Alibaba Cloud account.

    • ${accessKeySecret}: the AccessKey secret of your Alibaba Cloud account.

    • ${endpoint}: the endpoint of OSS. The endpoint is determined by the region where your EMR cluster resides. You must activate OSS in the region where your EMR cluster resides. For more information, see Regions, endpoints and open ports.
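    One way to keep the AccessKey pair out of the source code is to resolve the three settings from the environment at run time. The following is a minimal sketch, not a DataWorks or EMR feature. The class name OssSettings and the environment variable names are assumptions made for illustration, and whether such variables are available where the job's main method runs depends on how your cluster is set up.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper: builds the fs.oss.* settings from environment-style
// key/value pairs so that no AccessKey pair is hard-coded in the job source.
final class OssSettings {
    // Environment variable names are assumptions made for this sketch.
    static final String ENV_ACCESS_KEY_ID = "OSS_ACCESS_KEY_ID";
    static final String ENV_ACCESS_KEY_SECRET = "OSS_ACCESS_KEY_SECRET";
    static final String ENV_ENDPOINT = "OSS_ENDPOINT";

    static Map<String, String> fromEnv(Map<String, String> env) {
        Map<String, String> settings = new LinkedHashMap<>();
        settings.put("fs.oss.accessKeyId", require(env, ENV_ACCESS_KEY_ID));
        settings.put("fs.oss.accessKeySecret", require(env, ENV_ACCESS_KEY_SECRET));
        settings.put("fs.oss.endpoint", require(env, ENV_ENDPOINT));
        return settings;
    }

    // Fails fast instead of submitting a job with an empty credential.
    private static String require(Map<String, String> env, String name) {
        String value = env.get(name);
        if (value == null || value.trim().isEmpty()) {
            throw new IllegalStateException("Missing environment variable: " + name);
        }
        return value.trim();
    }
}
```

    In the job's main method, the result could then be applied to the Hadoop configuration with OssSettings.fromEnv(System.getenv()).forEach(conf::set) instead of three conf.set calls that contain literal values.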

    The Java code in this topic is adapted from the WordCount example on the official Hadoop website. The AccessKey ID, AccessKey secret, and endpoint settings are added to the code to grant the job the permissions to access OSS objects.

    package cn.apache.hadoop.onaliyun.examples;
    
    import java.io.IOException;
    import java.util.StringTokenizer;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.util.GenericOptionsParser;
    
    public class EmrWordCount {
        public static class TokenizerMapper
                extends Mapper<Object, Text, Text, IntWritable> {
            private final static IntWritable one = new IntWritable(1);
            private Text word = new Text();
    
            public void map(Object key, Text value, Context context
            ) throws IOException, InterruptedException {
                StringTokenizer itr = new StringTokenizer(value.toString());
                while (itr.hasMoreTokens()) {
                    word.set(itr.nextToken());
                    context.write(word, one);
                }
            }
        }
    
        public static class IntSumReducer
                extends Reducer<Text, IntWritable, Text, IntWritable> {
            private IntWritable result = new IntWritable();
    
            public void reduce(Text key, Iterable<IntWritable> values,
                               Context context
            ) throws IOException, InterruptedException {
                int sum = 0;
                for (IntWritable val : values) {
                    sum += val.get();
                }
                result.set(sum);
                context.write(key, result);
            }
        }
    
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
            if (otherArgs.length < 2) {
                System.err.println("Usage: wordcount <in> [<in>...] <out>");
                System.exit(2);
            }
            // OSS credentials and endpoint. Replace the placeholders with actual values.
            conf.set("fs.oss.accessKeyId", "${accessKeyId}");
            conf.set("fs.oss.accessKeySecret", "${accessKeySecret}");
            conf.set("fs.oss.endpoint", "${endpoint}");
            Job job = Job.getInstance(conf, "word count");
            job.setJarByClass(EmrWordCount.class);
            job.setMapperClass(TokenizerMapper.class);
            job.setCombinerClass(IntSumReducer.class);
            job.setReducerClass(IntSumReducer.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);
            for (int i = 0; i < otherArgs.length - 1; ++i) {
                FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
            }
            FileOutputFormat.setOutputPath(job,
                    new Path(otherArgs[otherArgs.length - 1]));
            System.exit(job.waitForCompletion(true) ?  0 : 1);
        }
    }
                                    
  3. After you write the preceding code, compile the code and package it into a JAR file. In this example, a package named onaliyun_mr_wordcount-1.0-SNAPSHOT.jar is generated.

Step 1: Create an EMR MR node

  1. Go to the DataStudio page.

    Log on to the DataWorks console. In the top navigation bar, select the desired region. In the left-side navigation pane, choose Data Development and Governance > Data Development. On the page that appears, select the desired workspace from the drop-down list and click Go to Data Development.

  2. Create an EMR MR node.

    1. Find the desired workflow, right-click the name of the workflow, and then choose Create Node > EMR > EMR MR.

      Note

      Alternatively, you can move the pointer over the Create icon and choose Create Node > EMR > EMR MR.

    2. In the Create Node dialog box, configure the Name, Engine Instance, Node Type, and Path parameters. Click Confirm. The configuration tab of the EMR MR node appears.

      Note

      The node name can contain only letters, digits, underscores (_), and periods (.).

Step 2: Develop an EMR MR task

You can use one of the following methods based on your business requirements to develop an MR task on the configuration tab of the EMR MR node:

Method 1: Reference an OSS resource

The current node can reference an OSS resource by using the OSS REF method. When you run a task on the node, DataWorks automatically loads the OSS resource specified in the node code. This method is commonly used in scenarios in which JAR dependencies are required in EMR tasks or EMR tasks need to depend on scripts. Reference format:

ossref://{endpoint}/{bucket}/{object}
  • endpoint: the endpoint of OSS. If the endpoint parameter is left empty, only a resource in an OSS bucket that resides in the same region as the current EMR cluster can be referenced.

  • bucket: a container that is used to store objects in OSS. Each bucket has a unique name. You can log on to the OSS console to view all buckets within the current logon account.

  • object: a file name or path that is stored in a bucket.
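The reference format above can be assembled with a small helper. The following is a minimal sketch. The class name OssRef is hypothetical, and the sketch covers only the case in which all three parts are given, because this topic does not show the exact form of a reference whose endpoint is left empty.

```java
// Hypothetical helper: builds a reference of the form
// ossref://{endpoint}/{bucket}/{object}.
final class OssRef {
    static String build(String endpoint, String bucket, String object) {
        requireNonEmpty("endpoint", endpoint);
        requireNonEmpty("bucket", bucket);
        requireNonEmpty("object", object);
        // Accept object paths written with a leading slash.
        String key = object.startsWith("/") ? object.substring(1) : object;
        requireNonEmpty("object", key);
        return "ossref://" + endpoint + "/" + bucket + "/" + key;
    }

    private static void requireNonEmpty(String name, String value) {
        if (value == null || value.isEmpty()) {
            throw new IllegalArgumentException(name + " must not be empty");
        }
    }
}
```

For example, OssRef.build("oss-cn-hangzhou.aliyuncs.com", "onaliyun-bucket-2", "emr/jars/onaliyun_mr_wordcount-1.0-SNAPSHOT.jar") returns ossref://oss-cn-hangzhou.aliyuncs.com/onaliyun-bucket-2/emr/jars/onaliyun_mr_wordcount-1.0-SNAPSHOT.jar. The endpoint in this call is a placeholder. Use the endpoint of the region where your bucket resides.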

Method 2: Upload and reference an EMR JAR resource

DataWorks allows you to upload a resource from your on-premises machine to DataStudio before you reference the resource. If the EMR MR node depends on large amounts of resources, the resources cannot be uploaded by using the DataWorks console. In this case, you can store the resources in Hadoop Distributed File System (HDFS) and reference the resources in the code of the EMR MR node.

  1. Create an EMR JAR resource.

    For more information about how to create an EMR JAR resource, see Create and use an EMR resource. In this example, the JAR package that is generated in the Prepare initial data and a JAR resource package section is stored in the emr/jars directory, which is used to store JAR resources. The first time you use an EMR JAR resource, click Authorize to authorize DataWorks to access the EMR JAR resource. Then, click Upload to upload the JAR resource.

  2. Reference the JAR package.

    1. Open the EMR MR node. The configuration tab of the node appears.

    2. Find the resource that you want to reference below Resource in the EMR folder, right-click the resource name, and then select Insert Resource Path. In this example, the resource is onaliyun_mr_wordcount-1.0-SNAPSHOT.jar.

    3. If a ##@resource_reference{...} statement such as the one in the following code appears on the configuration tab of the EMR MR node, the resource is referenced. Then, run the following code. Replace the resource package name, bucket name, and directory in the code with the actual values.

      ##@resource_reference{"onaliyun_mr_wordcount-1.0-SNAPSHOT.jar"}
      onaliyun_mr_wordcount-1.0-SNAPSHOT.jar cn.apache.hadoop.onaliyun.examples.EmrWordCount oss://onaliyun-bucket-2/emr/datas/wordcount02/inputs oss://onaliyun-bucket-2/emr/datas/wordcount02/outputs
      Note

      You cannot add comments when you write code for the EMR MR node.
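      The command line in the node code consists of four space-separated parts: the JAR name, the main class, the input path, and the output path. The following is a minimal sketch of a helper that assembles this line and rejects parts that would break the space-separated layout. The class name EmrMrNodeCode is hypothetical. The sketch does not generate the ##@resource_reference statement, which is inserted when you select Insert Resource Path.

```java
// Hypothetical helper: assembles the command line of the node code from its four parts.
final class EmrMrNodeCode {
    static String build(String jar, String mainClass, String input, String output) {
        String[] parts = {jar, mainClass, input, output};
        for (String part : parts) {
            // A part that contains whitespace would be read as two arguments.
            if (part == null || part.isEmpty()
                    || part.chars().anyMatch(Character::isWhitespace)) {
                throw new IllegalArgumentException(
                        "Each part must be non-empty and contain no whitespace: " + part);
            }
        }
        return String.join(" ", parts);
    }
}
```

      Calling build with the JAR name, class name, and the two oss:// paths used in this example reproduces the command line shown above.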

(Optional) Configure advanced parameters

You can configure advanced parameters on the Advanced Settings tab of the configuration tab of the current node. For more information about how to configure the parameters, see Spark Configuration. The following table describes the advanced parameters that can be configured for different types of EMR clusters.

DataLake cluster or custom cluster (created on the EMR on ECS page)

  • queue: The scheduling queue to which jobs are committed. Default value: default. For information about EMR YARN, see YARN schedulers.

  • priority: The priority. Default value: 1.

  • Others: You can add a custom parameter for the EMR MR node as an advanced parameter on the Advanced Settings tab in the DataWorks console. When you commit the code for the EMR MR node in DataWorks, DataWorks adds the custom parameter to a command in the -D key=value format.

Hadoop cluster (created on the EMR on ECS page)

  • queue: The scheduling queue to which jobs are committed. Default value: default. For information about EMR YARN, see YARN schedulers.

  • priority: The priority. Default value: 1.

  • USE_GATEWAY: Specifies whether to use a gateway cluster to commit jobs on the current node. Valid values:

    • true: Use a gateway cluster to commit jobs.

    • false (default): Use no gateway cluster to commit jobs. Jobs are automatically committed to the master node.

    Note

    If the EMR cluster to which the node belongs is not associated with a gateway cluster but the USE_GATEWAY parameter is set to true, jobs may fail to be committed.
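Custom advanced parameters are added to the command in the -D key=value format. In Hadoop, GenericOptionsParser, which the sample WordCount code calls, is the class that reads generic options of this form and sets them on the job configuration. The following is a simplified stand-in written in plain Java to illustrate the idea. It is not the Hadoop implementation, the class name DashDArgs is hypothetical, and where exactly DataWorks places the option in the command is not specified in this topic.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simplified illustration: separates "-D key=value" pairs from the other arguments.
final class DashDArgs {
    final Map<String, String> properties = new LinkedHashMap<>();
    final List<String> remaining = new ArrayList<>();

    static DashDArgs parse(String[] args) {
        DashDArgs parsed = new DashDArgs();
        for (int i = 0; i < args.length; i++) {
            boolean isPair = "-D".equals(args[i])
                    && i + 1 < args.length
                    && args[i + 1].contains("=");
            if (isPair) {
                // Split on the first '=' only, so that values may contain '='.
                String[] keyValue = args[++i].split("=", 2);
                parsed.properties.put(keyValue[0], keyValue[1]);
            } else {
                parsed.remaining.add(args[i]);
            }
        }
        return parsed;
    }
}
```

For the arguments -D mapreduce.job.reduces=2 in out, the sketch yields one property (mapreduce.job.reduces set to 2) and the remaining arguments in and out, which correspond to the input and output paths that the WordCount code reads.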

Run the MR task

  1. In the toolbar, click the Run with Parameters icon. In the Parameters dialog box, select the desired resource group from the Resource Group Name drop-down list and click Run.

    Note
    • If you want to access a data source over the Internet or a virtual private cloud (VPC), you must use the resource group for scheduling that is connected to the data source. For more information, see Network connectivity solutions.

    • If you want to change the resource group in subsequent operations, you can click the Run with Parameters icon to change the resource group in the Parameters dialog box.

  2. Click the Save icon in the top toolbar to save the code of the node.

  3. Optional. Perform smoke testing.

    You can perform smoke testing on the node in the development environment when you commit the node or after you commit the node. For more information, see Perform smoke testing.

Step 3: Configure scheduling properties

If you want the system to periodically run a task on the node, you can click Properties in the right-side navigation pane on the configuration tab of the node to configure task scheduling properties based on your business requirements. For more information, see Overview.

Note

You must configure the Rerun and Parent Nodes parameters on the Properties tab before you commit the task.

Step 4: Deploy the task

After a task on a node is configured, you must commit and deploy the task. After you commit and deploy the task, the system runs the task on a regular basis based on scheduling configurations.

  1. Click the Save icon in the top toolbar to save the task.

  2. Click the Submit icon in the top toolbar to commit the task.

    In the Submit dialog box, configure the Change description parameter. Then, determine whether to review task code after you commit the task based on your business requirements.

    Note
    • You must configure the Rerun and Parent Nodes parameters on the Properties tab before you commit the task.

    • You can use the code review feature to ensure the code quality of tasks and prevent task execution errors caused by invalid task code. If you enable the code review feature, the task code that is committed can be deployed only after the task code passes the code review. For more information, see Code review.

If you use a workspace in standard mode, you must deploy the task in the production environment after you commit the task. To deploy a task on a node, click Deploy in the upper-right corner of the configuration tab of the node. For more information, see Deploy nodes.

What to do next

After you commit and deploy the task, the task is periodically run based on the scheduling configurations. You can click Operation Center in the upper-right corner of the configuration tab of the corresponding node to go to Operation Center and view the scheduling status of the task. For more information, see View and manage auto triggered nodes.

View the results

  • Log on to the OSS console. Then, you can view the results in the emr/datas/wordcount02/outputs directory of the bucket to which the job writes its output.

  • View the statistical results in the DataWorks console.

    1. Create an EMR Hive node. For more information, see Create an EMR Hive node.

    2. On the EMR Hive node, create a Hive external table that is mounted to the output directory in OSS. Then, query the external table to read the job output. Sample code:

      CREATE EXTERNAL TABLE IF NOT EXISTS wordcount02_result_tb
      (
          `word` STRING COMMENT 'Word',
          `count` STRING COMMENT 'Count'   
      ) 
      ROW FORMAT delimited fields terminated by '\t'
      location 'oss://onaliyun-bucket-2/emr/datas/wordcount02/outputs/';
      
      SELECT * FROM wordcount02_result_tb;

      The query returns each word and its count.
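      The job writes plain text in which each line holds a word, a tab character, and the count. The tab is the default key/value separator of Hadoop text output, which is why the external table above uses '\t' as the field delimiter. The following is a minimal sketch that parses such lines outside Hive, for example after you download an output file from OSS. The class name WordCountOutput is hypothetical, and the sample lines in the main method are derived by hand from input01.txt.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper: parses "word<TAB>count" lines of the job output.
final class WordCountOutput {
    static Map<String, Integer> parse(List<String> lines) {
        Map<String, Integer> counts = new LinkedHashMap<>();
        for (String line : lines) {
            if (line.isEmpty()) {
                continue; // Skip blank lines, such as a trailing newline.
            }
            int tab = line.indexOf('\t');
            if (tab < 0) {
                throw new IllegalArgumentException("Expected word<TAB>count: " + line);
            }
            String word = line.substring(0, tab);
            int count = Integer.parseInt(line.substring(tab + 1).trim());
            counts.put(word, count);
        }
        return counts;
    }

    public static void main(String[] args) {
        List<String> sample = Arrays.asList("dw\t2", "emr\t2", "hadoop\t3", "hive\t1");
        System.out.println(parse(sample));
    }
}
```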