全部產品
Search
文件中心

Elasticsearch:通過ES-Hadoop實現Spark讀寫Elasticsearch資料

更新時間:Jun 30, 2024

Spark是一種通用的巨量資料計算架構,擁有Hadoop MapReduce所具有的計算優點,能夠通過記憶體快取資料為大型資料集提供快速的迭代功能。與MapReduce相比,減少了中間資料讀取磁碟的過程,進而提高了處理能力。本文介紹如何通過ES-Hadoop實現Hadoop的Spark服務讀寫Elasticsearch資料。

準備工作

  1. 建立Elasticsearch執行個體,並開啟自動建立索引功能。

    具體操作步驟請參見建立Elasticsearch執行個體配置YML參數。本文以6.7.0版本的執行個體為例。

    重要

    在生產環境中,建議關閉自動建立索引功能,提前建立好索引和Mapping。由於本文僅用於測試,因此開啟了自動建立索引功能。

  2. 建立與Elasticsearch執行個體在同一專用網路下的E-MapReduce(以下簡稱EMR)執行個體。

    執行個體配置如下:

    • 產品版本:EMR-3.29.0

    • 必選服務:Spark(2.4.5),其他服務保持預設

    具體操作步驟,請參見建立叢集

    重要

    Elasticsearch執行個體的私網訪問白名單預設為0.0.0.0/0,您可在安全配置頁面查看,如果未使用預設配置,您還需要在白名單中加入EMR叢集的內網IP地址:

  3. 準備Java環境,要求JDK版本為8.0及以上。

編寫並運行Spark任務

  1. 準備測試資料。

    1. 登入E-MapReduce控制台,擷取Master節點的IP地址,並通過SSH登入對應的ECS機器。

      具體操作步驟,請參見登入叢集

    2. 將測試資料寫入檔案中。

      本文使用的JSON資料樣本如下,將該資料儲存在http_log.txt檔案中。

      {"id": 1, "name": "zhangsan", "birth": "1990-01-01", "addr": "No.969, wenyixi Rd, yuhang, hangzhou"}
      {"id": 2, "name": "lisi", "birth": "1991-01-01", "addr": "No.556, xixi Rd, xihu, hangzhou"}
      {"id": 3, "name": "wangwu", "birth": "1992-01-01", "addr": "No.699 wangshang Rd, binjiang, hangzhou"}
    3. 執行以下命令,將測試資料上傳至EMR Master節點的tmp/hadoop-es檔案中。

      hadoop fs -put http_log.txt /tmp/hadoop-es
  2. 配置pom依賴。

    建立Java Maven工程,並將如下的pom依賴添加到Java工程的pom.xml檔案中。

    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.12</artifactId>
            <version>2.4.5</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>2.4.5</version>
        </dependency>
        <dependency>
            <groupId>org.elasticsearch</groupId>
            <artifactId>elasticsearch-spark-20_2.11</artifactId>
            <version>6.7.0</version>
        </dependency>
    </dependencies>
    重要

    請確保pom依賴中版本與雲端服務對應版本保持一致,例如elasticsearch-spark-20_2.11版本與Elasticsearch版本一致;spark-core_2.12與HDFS版本一致。

  3. 編寫範例程式碼。

    1. 寫資料

      以下範例程式碼用來將測試資料寫入Elasticsearch的company索引中。

      import java.util.Map;
      import java.util.concurrent.atomic.AtomicInteger;
      import org.apache.spark.SparkConf;
      import org.apache.spark.SparkContext;
      import org.apache.spark.api.java.JavaRDD;
      import org.apache.spark.api.java.function.Function;
      import org.apache.spark.sql.Row;
      import org.apache.spark.sql.SparkSession;
      
      import org.elasticsearch.spark.rdd.api.java.JavaEsSpark;
      import org.spark_project.guava.collect.ImmutableMap;
      public class SparkWriteEs {
          public static void main(String[] args) {
              SparkConf conf = new SparkConf();
              conf.setAppName("Es-write");
              conf.set("es.nodes", "es-cn-n6w1o1x0w001c****.elasticsearch.aliyuncs.com");
              conf.set("es.net.http.auth.user", "elastic");
              conf.set("es.net.http.auth.pass", "xxxxxx");
              conf.set("es.nodes.wan.only", "true");
              conf.set("es.nodes.discovery","false");
              conf.set("es.input.use.sliced.partitions","false");
              SparkSession ss = new SparkSession(new SparkContext(conf));
              final AtomicInteger employeesNo = new AtomicInteger(0);
              //以下的/tmp/hadoop-es/http_log.txt需要替換為您測試資料的路徑。
              JavaRDD<Map<Object, ?>> javaRDD = ss.read().text("/tmp/hadoop-es/http_log.txt")
                      .javaRDD().map((Function<Row, Map<Object, ?>>) row -> ImmutableMap.of("employees"   employeesNo.getAndAdd(1), row.mkString()));
              JavaEsSpark.saveToEs(javaRDD, "company/_doc");
          }
      }
    2. 讀資料

      以下範例程式碼用來讀取上一步寫入Elasticsearch的資料,並進行列印。

      import org.apache.spark.SparkConf;
      import org.apache.spark.api.java.JavaPairRDD;
      import org.apache.spark.api.java.JavaSparkContext;
      import org.elasticsearch.spark.rdd.api.java.JavaEsSpark;
      import  java.util.Map;
      
      public class ReadES {
      
          public static void main(String[] args) {
      
              SparkConf  conf = new SparkConf().setAppName("readEs").setMaster("local[*]")
                      .set("es.nodes", "es-cn-n6w1o1x0w001c****.elasticsearch.aliyuncs.com")
                      .set("es.port", "9200")
                      .set("es.net.http.auth.user", "elastic")
                      .set("es.net.http.auth.pass", "xxxxxx")
                      .set("es.nodes.wan.only", "true")
                      .set("es.nodes.discovery","false")
                      .set("es.input.use.sliced.partitions","false")
                      .set("es.resource", "company/_doc")
                      .set("es.scroll.size","500");
      
              JavaSparkContext sc = new JavaSparkContext(conf);
      
              JavaPairRDD<String, Map<String, Object>> rdd = JavaEsSpark.esRDD(sc);
      
              for ( Map<String, Object> item : rdd.values().collect()) {
                  System.out.println(item);
              }
      
              sc.stop();
          }
      
      }
    表 1. 參數說明

    參數

    預設值

    說明

    es.nodes

    localhost

    指定Elasticsearch執行個體的訪問地址,建議使用私網地址,可在執行個體的基本資料頁面查看。更多資訊,請參見查看執行個體的基本資料

    es.port

    9200

    Elasticsearch執行個體的訪問連接埠號碼。

    es.net.http.auth.user

    elastic

    Elasticsearch執行個體的訪問使用者名稱。

    說明

    如果程式中指定elastic帳號訪問Elasticsearch服務,後續在修改elastic帳號對應密碼後需要一些時間來生效,在密碼生效期間會影響服務訪問,因此不建議通過elastic來訪問。建議在Kibana控制台中建立一個符合預期的Role角色使用者進行訪問,詳情請參見通過Elasticsearch X-Pack角色管理實現使用者權限管控

    es.net.http.auth.pass

    /

    對應使用者的密碼,在建立執行個體時指定。如果忘記可進行重設,具體操作步驟,請參見重設執行個體訪問密碼

    es.nodes.wan.only

    false

    開啟Elasticsearch叢集在雲上使用虛擬IP進行串連,是否進行節點嗅探:

    • true:設定

    • false:不設定

    es.nodes.discovery

    true

    是否禁用節點發現:

    • true:禁用

    • false:不禁用

    重要

    使用Elasticsearch,必須將此參數設定為false。

    es.input.use.sliced.partitions

    true

    是否使用slice分區:

    • true:使用。設定為true,可能會導致索引在預讀階段的時間明顯變長,有時會遠遠超出查詢資料所耗費的時間。建議設定為false,以提高查詢效率。

    • false:不使用。

    es.index.auto.create

    true

    通過Hadoop組件向Elasticsearch叢集寫入資料,是否自動建立不存在的index:

    • true:自動建立

    • false:不會自動建立

    es.resource

    /

    指定要讀寫的index和type。

    es.mapping.names

    /

    表欄位與Elasticsearch的索引欄位名映射。

    更多的ES-Hadoop配置項說明,請參見官方配置說明

  4. 將代碼打成Jar包,上傳至EMR用戶端機器(例如Gateway或EMR叢集主節點)。

  5. 在EMR用戶端機器上,運行如下命令執行Spark程式:

    • 寫資料

      cd /usr/lib/spark-current
      ./bin/spark-submit  --master yarn --executor-cores 1 --class "SparkWriteEs" /usr/local/spark_es.jar
      重要

      /usr/local/spark_es.jar需要替換為您Jar包上傳的路徑。

    • 讀資料

      cd /usr/lib/spark-current
      ./bin/spark-submit  --master yarn --executor-cores 1 --class "ReadES"  /usr/local/spark_es.jar

      讀資料成功後,列印結果如下。列印成功結果

驗證結果

  1. 登入對應Elasticsearch執行個體的Kibana控制台。

    具體操作步驟請參見登入Kibana控制台

  2. 在左側導覽列,單擊Dev Tools

  3. Console中,執行以下命令,查看通過Spark任務寫入的資料。

    GET company/_search
    {
      "query": {
        "match_all": {}
      }
    }

    查詢成功後,返回結果如下。查詢成功結果

總結

本文以Elasticsearch和EMR為例,介紹了如何通過ES-Hadoop,實現Spark讀寫Elasticsearch資料。與其他EMR組件相比,ES-Hadoop與Spark的整合,不僅包括RDD,還包括Spark Streaming、scale、DataSet與Spark SQL等,您可以根據需求進行配置。詳細資料,請參見Apache Spark support