全部產品
Search
文件中心

DataWorks:建立EMR MR節點

更新時間:Sep 03, 2024

在EMR任務開發中,通過建立EMR(E-MapReduce) MR節點,可將大規模資料集分為多個Map任務平行處理,加速資料集的並行運算。本文將以建立EMR MR節點實現從OSS中讀取文本,並統計文本中的單詞數為例,為您展示EMR MR節點的作業開發流程。

前提條件

  • 登入EMR叢集至DataWorks,詳情請參見註冊EMR叢集至DataWorks

  • (可選,RAM帳號需要)進行任務開發的RAM帳號已被添加至對應工作空間中,並具有開發空間管理員(許可權較大,謹慎添加)角色許可權,新增成員的操作詳情請參見為工作空間增加空間成員

  • 已購買Serverless資源群組並完成資源群組配置,包括綁定工作空間、網路設定等,詳情請參見新增和使用Serverless資源群組

  • 資料開發(DataStudio)中已建立商務程序,操作詳情請參見建立商務程序

  • 使用EMR MR節點進行作業開發時,如果需要引用開原始碼資源,您需先將開原始碼作為資源上傳至EMR JAR資源節點中,詳情請參見建立和使用EMR資源

  • 使用EMR MR節點進行作業開發時,如果需要引用自訂函數時,您需要先將自訂函數作為資源上傳至EMR JAR資源節點中,建立註冊此函數,詳情請參見建立EMR函數

  • 如果您使用本文的作業開發樣本執行相關作業流程,則還需要建立好OSS的儲存空間Bucket。建立OSS的儲存空間Bucket,詳情請參見控制台建立儲存空間

使用限制

  • 僅支援使用Serverless資源群組(推薦)或獨享調度資源群組運行該類型任務。

  • DataLake或自訂叢集若要在DataWorks管理中繼資料,需先在叢集側配置EMR-HOOK。若未配置,則無法在DataWorks中即時展示中繼資料、產生審計日誌、展示血緣關係、開展EMR相關治理任務。配置EMR-HOOK,詳情請參見配置Hive的EMR-HOOK

準備初始資料及JAR資源套件

準備初始資料

建立樣本檔案input01.txt,檔案內容如下。

hadoop emr hadoop dw
hive hadoop
dw emr

上傳初始資料檔案

  1. 登入OSS管理主控台單擊左側導覽列的Bucket列表

  2. 單擊目標Bucket名稱,進入檔案管理頁面。

    本文樣本使用的Bucket為onaliyun-bucket-2

  3. 單擊建立目錄,建立初始資料及JAR資源的存放目錄。

    • 配置目錄名emr/datas/wordcount02/inputs,建立初始資料的存放目錄。

    • 配置目錄名emr/jars,建立JAR資源的存放目錄。

  4. 上傳初始資料檔案至初始資料的存放目錄。

    • 進入/emr/datas/wordcount02/inputs路徑,單擊上傳檔案

    • 待上傳檔案地區單擊掃描檔案,添加input01.txt檔案至Bucket,單擊上傳檔案

使用MapReduce讀取OSS檔案並產生JAR包

  1. 開啟已建立的IDEA專案,添加pom依賴。

            <dependency>
                <groupId>org.apache.hadoop</groupId>
                <artifactId>hadoop-mapreduce-client-common</artifactId>
                <version>2.8.5</version> <!--因為EMR-MR用的是2.8.5-->
            </dependency>
            <dependency>
                <groupId>org.apache.hadoop</groupId>
                <artifactId>hadoop-common</artifactId>
                <version>2.8.5</version>
            </dependency>
  2. 在MapReduce中讀寫OSS檔案,需要配置如下參數。

    重要

    風險提示: 阿里雲帳號AccessKey擁有所有API的存取權限,建議您使用RAM使用者進行API訪問或日常營運。強烈建議不要將AccessKey ID和AccessKey Secret儲存到工程代碼裡或者任何容易被泄露的地方,AccessKey泄露會威脅您帳號下所有資源的安全。以下程式碼範例僅供參考,請妥善保管好您的AccessKey資訊。

    conf.set("fs.oss.accessKeyId", "${accessKeyId}");
    conf.set("fs.oss.accessKeySecret", "${accessKeySecret}");
    conf.set("fs.oss.endpoint","${endpoint}");

    參數說明如下:

    • ${accessKeyId}:阿里雲帳號的AccessKey ID。

    • ${accessKeySecret}:阿里雲帳號的AccessKey Secret。

    • ${endpoint}:OSS對外服務的訪問網域名稱。由您叢集所在的地區決定,對應的OSS也需要是在叢集對應的地區,詳情請參見訪問網域名稱和資料中心

    以Java代碼為例,修改Hadoop官網WordCount樣本,即在代碼中添加AccessKey ID和AccessKey Secret的配置,以便作業有許可權訪問OSS檔案。

    package cn.apache.hadoop.onaliyun.examples;
    
    import java.io.IOException;
    import java.util.StringTokenizer;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.util.GenericOptionsParser;
    
    public class EmrWordCount {
        public static class TokenizerMapper
                extends Mapper<Object, Text, Text, IntWritable> {
            private final static IntWritable one = new IntWritable(1);
            private Text word = new Text();
    
            public void map(Object key, Text value, Context context
            ) throws IOException, InterruptedException {
                StringTokenizer itr = new StringTokenizer(value.toString());
                while (itr.hasMoreTokens()) {
                    word.set(itr.nextToken());
                    context.write(word, one);
                }
            }
        }
    
        public static class IntSumReducer
                extends Reducer<Text, IntWritable, Text, IntWritable> {
            private IntWritable result = new IntWritable();
    
            public void reduce(Text key, Iterable<IntWritable> values,
                               Context context
            ) throws IOException, InterruptedException {
                int sum = 0;
                for (IntWritable val : values) {
                    sum += val.get();
                }
                result.set(sum);
                context.write(key, result);
            }
        }
    
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
            if (otherArgs.length < 2) {
                System.err.println("Usage: wordcount <in> [<in>...] <out>");
                System.exit(2);
            }
            conf.set("fs.oss.accessKeyId", "${accessKeyId}"); // 
            conf.set("fs.oss.accessKeySecret", "${accessKeySecret}"); // 
            conf.set("fs.oss.endpoint", "${endpoint}"); //
            Job job = Job.getInstance(conf, "word count");
            job.setJarByClass(EmrWordCount.class);
            job.setMapperClass(TokenizerMapper.class);
            job.setCombinerClass(IntSumReducer.class);
            job.setReducerClass(IntSumReducer.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);
            for (int i = 0; i < otherArgs.length - 1; ++i) {
                FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
            }
            FileOutputFormat.setOutputPath(job,
                    new Path(otherArgs[otherArgs.length - 1]));
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }
    }
                                    
  3. 編輯完上述Java代碼後將該代碼產生JAR包。樣本產生的JAR包為onaliyun_mr_wordcount-1.0-SNAPSHOT.jar

步驟一:建立EMR MR節點

  1. 進入資料開發頁面。

    登入DataWorks控制台,切換至目標地區後,單擊左側導覽列的資料建模與開發 > 資料開發,在下拉框中選擇對應工作空間後單擊進入資料開發

  2. 建立EMR MR節點。

    1. 按右鍵目標商務程序,選擇建立節點 > EMR > EMR MR

      說明

      您也可以滑鼠移至上方至建立,選擇建立節點 > EMR > EMR MR

    2. 建立節點對話方塊中,輸入名稱,並選擇引擎執行個體節點類型路徑。單擊確認,進入EMR MR節點編輯頁面。

      說明

      節點名稱支援大小寫字母、中文、數字、底線(_)和小數點(.)。

步驟二:開發EMR MR任務

在EMR MR節點編輯頁面雙擊已建立的節點,進入任務開發頁面,您可以根據不同情境需求選擇適合您的操作方案:

方案一:直接引用OSS資源

當前節點可通過OSS REF的方式直接引用OSS資源,在運行EMR節點時,DataWorks會自動載入代碼中的OSS資源至本地使用。該方式常用於“需要在EMR任務中運行JAR依賴”、“EMR任務需依賴指令碼”等情境。引用格式如下:

ossref://{endpoint}/{bucket}/{object}
  • endpoint:OSS對外服務的訪問網域名稱。Endpoint為空白時,僅支援使用與當前訪問的EMR叢集同地區的OSS,即OSS的Bucket需要與EMR叢集所在地區相同。

  • Bucket:OSS用於儲存物件的容器,每一個Bucket有唯一的名稱,登入OSS管理主控台,可查看當前登入帳號下所有Bucket

  • object:儲存在Bucket中的一個具體的對象(檔案名稱或路徑)。

方案二:先上傳資源後引用EMR JAR資源

DataWorks也支援您從本地先上傳資源至DataStudio,再引用資源。若EMR MR節點依賴的資源較大,則無法通過DataWorks頁面上傳。您可將資源存放至HDFS上,然後在代碼中進行引用。

  1. 建立EMR JAR資源。

    詳情請參見建立和使用EMR資源。樣本將本文《準備初始資料及JAR資源套件》中產生的JAR包儲存在JAR資源的存放目錄emr/jars下。首次使用需要進行一鍵授權,然後單擊點擊上傳按鈕,上傳JAR資源。建立JAR資源

  2. 引用EMR JAR資源。

    1. 開啟建立的EMR MR節點,停留在代碼編輯頁面。

    2. EMR > 資源節點下,找到待引用資源(樣本為onaliyun_mr_wordcount-1.0-SNAPSHOT.jar),右鍵選擇引用資源引用資源

    3. 選擇引用後,當EMR MR節點的代碼編輯頁面出現如下引用成功提示時,表明已成功引用代碼資源。此時,需要執行下述命令。如下命令涉及的資源套件、Bucket名稱、路徑資訊等為本文樣本的內容,使用時,您需要替換為實際使用資訊。

      ##@resource_reference{"onaliyun_mr_wordcount-1.0-SNAPSHOT.jar"}
      onaliyun_mr_wordcount-1.0-SNAPSHOT.jar cn.apache.hadoop.onaliyun.examples.EmrWordCount oss://onaliyun-bucket-2/emr/datas/wordcount02/inputs oss://onaliyun-bucket-2/emr/datas/wordcount02/outputs
      說明

      EMR MR節點編輯代碼時不支援備註陳述式。

(可選)配置進階參數

您可在節點進階設定處配置特有屬性參數。更多屬性參數設定,請參考Spark Configuration。不同類型EMR叢集可配置的進階參數存在部分差異,具體如下表。

DataLake叢集/自訂叢集:EMR on ECS

進階參數

配置說明

queue

提交作業的調度隊列,預設為default隊列。關於EMR YARN說明,詳情請參見隊列基礎配置

priority

優先順序,預設為1。

其他

您也可以直接在進階配置裡追加自訂MR任務參數。提交代碼時DataWorks會自動在命令中通過-D key=value語句加上新增的參數。

Hadoop叢集:EMR on ECS

進階參數

配置說明

queue

提交作業的調度隊列,預設為default隊列。關於EMR YARN說明,詳情請參見隊列基礎配置

priority

優先順序,預設為1。

USE_GATEWAY

設定本節點提交作業時,是否通過Gateway叢集提交。取值如下:

  • true:通過Gateway叢集提交。

  • false(預設值):不通過Gateway叢集提交,預設提交到header節點。

說明

如果本節點所在的叢集未關聯Gateway叢集,此處手動設定參數取值為true時,後續提交EMR作業時會失敗。

執行SQL任務

  1. 在工具列單擊進階運行表徵圖,在參數對話方塊選擇已建立的調度資源群組,單擊運行

    說明
    • 訪問公用網路或VPC網路環境的資料來源需要使用與資料來源測試連通性成功的調度資源群組。詳情請參見網路連通方案

    • 如果您後續執行任務需要修改使用的資源群組,您可單擊帶參運行進階運行表徵圖,選擇需要更換的調度資源群組。

  2. 單擊儲存表徵圖,儲存編寫的SQL語句。

  3. (可選)煙霧測試 (Smoke Test)。

    如果您希望在開發環境進行煙霧測試 (Smoke Test),可在執行節點提交或節點提交後執行煙霧測試 (Smoke Test),操作詳情請參見執行煙霧測試 (Smoke Test)

步驟三:配置節點調度

如您需要周期性執行建立的節點任務,可單擊節點編輯頁面右側的調度配置,根據業務需求配置該節點任務的調度資訊。配置詳情請參見任務調度屬性配置概述

說明

您需要設定節點的重跑屬性依賴的上遊節點,才可以提交節點。

步驟四:發布節點任務

節點任務配置完成後,需執行提交發佈動作,提交發布後節點即會根據調度配置內容進行周期性運行。

  1. 單擊工具列中的儲存表徵圖,儲存節點。

  2. 單擊工具列中的提交表徵圖,提交節點任務。

    提交時需在提交對話方塊中輸入變更描述,並根據需要選擇是否在節點提交後執行程式碼檢閱。

    說明
    • 您需設定節點的重跑屬性依賴的上遊節點,才可提交節點。

    • 程式碼檢閱可對任務的代碼品質進行把控,防止由於任務代碼有誤,未經審核直接發布上線後出現任務報錯。如進行程式碼檢閱,則提交的節點代碼必須通過評審人員的審核才可發布,詳情請參見程式碼檢閱

如您使用的是標準模式的工作空間,任務提交成功後,需單擊節點編輯頁面右上方的發布,將該任務發布至生產環境執行,操作請參見發布任務

後續步驟

任務提交發布後,會基於節點的配置周期性運行,您可單擊節點編輯介面右上方的營運,進入營運中心查看周期任務的調度運行情況。詳情請參見查看並管理周期任務

查看結果

  • 登入OSS管理主控台,您可以在目標Bucket的初始資料存放目錄下查看寫入結果。樣本路徑為emr/datas/wordcount02/outputs目標Bucket

  • 在DataWorks讀取統計結果。

    1. 建立EMR Hive節點,詳情請參見建立EMR Hive節點

    2. 在EMR Hive節點中建立掛載在OSS上的Hive外表,讀取表資料。程式碼範例如下。

      CREATE EXTERNAL TABLE IF NOT EXISTS wordcount02_result_tb
      (
          `word` STRING COMMENT '單詞',
          `cout` STRING COMMENT '計數'   
      ) 
      ROW FORMAT delimited fields terminated by '\t'
      location 'oss://onaliyun-bucket-2/emr/datas/wordcount02/outputs/';
      
      SELECT * FROM wordcount02_result_tb;

      運行結果如下圖。運行結果