全部產品
Search
文件中心

Elasticsearch:通過ES-Hadoop將HDFS中的資料寫入Elasticsearch

更新時間:Jun 30, 2024

ES-Hadoop是Elasticsearch推出的專門用於對接Hadoop生態的工具,可以讓資料在Elasticsearch和Hadoop之間雙向移動,無縫銜接Elasticsearch與Hadoop服務,充分使用Elasticsearch的快速搜尋及Hadoop批處理能力,實現互動式資料處理。對於一些較複雜的分析任務,需要通過MapReduce任務讀取HDFS上的JSON檔案,寫入Elasticsearch叢集。本文介紹如何通過ES-Hadoop,藉助MapReduce任務向Elasticsearch寫入資料。

操作流程

  1. 準備工作

    建立同一專用網路下的Elasticsearch和E-MapReduce(以下簡稱EMR)執行個體、開啟Elasticsearch執行個體的自動建立索引功能、準備測試資料和Java環境。

  2. 步驟一:上傳ES-Hadoop JAR包至HDFS

    下載ES-Hadoop安裝包,並上傳至EMR Master節點的HDFS目錄下。

  3. 步驟二:配置pom依賴

    建立Java Maven工程,並配置pom依賴。

  4. 步驟三:編寫並運行MapReduce任務

    編寫MapReduce寫資料到Elasticsearch的Java代碼,並打成Jar包上傳至EMR叢集,最後運行程式碼完成寫資料任務。

  5. 步驟四:驗證結果

    在Elasticsearch的Kibana控制台上,查看通過MapReduce寫入的資料。

準備工作

  1. 建立Elasticsearch執行個體,並開啟自動建立索引功能。

    具體操作步驟請參見建立Elasticsearch執行個體配置YML參數。本文以6.7.0版本的執行個體為例。

    重要

    在生產環境中,建議關閉自動建立索引功能,提前建立好索引和Mapping。由於本文僅用於測試,因此開啟了自動建立索引功能。

  2. 建立與Elasticsearch執行個體在同一專用網路下的EMR執行個體。

    執行個體配置如下:

    • 產品版本:EMR-3.29.0

    • 必選服務:HDFS(2.8.5),其他服務保持預設

    具體操作步驟請參見建立叢集

    重要

    Elasticsearch執行個體的私網訪問白名單預設為0.0.0.0/0,您可在安全配置頁面查看,如果未使用預設配置,您還需要在白名單中加入EMR叢集的內網IP地址:

  3. 準備JSON測試資料,將其寫入到map.json檔案中,並上傳至HDFS的/tmp/hadoop-es目錄下。

    本文使用的測試資料如下。

    {"id": 1, "name": "zhangsan", "birth": "1990-01-01", "addr": "No.969, wenyixi Rd, yuhang, hangzhou"}
    {"id": 2, "name": "lisi", "birth": "1991-01-01", "addr": "No.556, xixi Rd, xihu, hangzhou"}
    {"id": 3, "name": "wangwu", "birth": "1992-01-01", "addr": "No.699 wangshang Rd, binjiang, hangzhou"}
  4. 準備Java環境,要求JDK版本為1.8.0及以上。

步驟一:上傳ES-Hadoop JAR包至HDFS

  1. 下載ES-Hadoop安裝包,其版本需要與Elasticsearch執行個體保持一致。

    本文使用elasticsearch-hadoop-6.7.0.zip。

  2. 登入E-MapReduce控制台,擷取Master節點的IP地址,並通過SSH登入對應的ECS機器。

    具體操作步驟請參見登入叢集

  3. 將已下載的elasticsearch-hadoop-6.7.0.zip上傳至Master節點,並解壓獲得elasticsearch-hadoop-6.7.0.jar。

  4. 建立HDFS目錄,將elasticsearch-hadoop-6.7.0.jar上傳至該目錄下。

    hadoop fs -mkdir /tmp/hadoop-es
    hadoop fs -put elasticsearch-hadoop-6.7.0/dist/elasticsearch-hadoop-6.7.0.jar /tmp/hadoop-es

步驟二:配置pom依賴

建立Java Maven工程,並將如下的pom依賴添加到Java工程的pom.xml檔案中。

<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <version>2.4.1</version>
            <executions>
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>shade</goal>
                    </goals>
                    <configuration>
                        <transformers>
                            <transformer
                                    implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                <mainClass>WriteToEsWithMR</mainClass>
                            </transformer>
                        </transformers>
                    </configuration>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>

<dependencies>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-hdfs</artifactId>
        <version>2.8.5</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
        <version>2.8.5</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-common</artifactId>
        <version>2.8.5</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-auth</artifactId>
        <version>2.8.5</version>
    </dependency>

    <dependency>
        <groupId>org.elasticsearch</groupId>
        <artifactId>elasticsearch-hadoop-mr</artifactId>
        <version>6.7.0</version>
    </dependency>

    <dependency>
        <groupId>commons-httpclient</groupId>
        <artifactId>commons-httpclient</artifactId>
        <version>3.1</version>
    </dependency>
</dependencies>
重要

請確保pom依賴中版本與雲端服務對應版本保持一致,例如elasticsearch-hadoop-mr版本與Elasticsearch版本一致;hadoop-hdfs與HDFS版本一致。

步驟三:編寫並運行MapReduce任務

  1. 編寫範例程式碼。

    以下代碼會讀取HDFS上/tmp/hadoop-es目錄下的JSON檔案,並將這些JSON檔案中的每一行作為一個文檔寫入Elasticsearch。寫入過程由EsOutputFormat在Map階段完成。

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
    import org.apache.hadoop.util.GenericOptionsParser;
    import org.elasticsearch.hadoop.mr.EsOutputFormat;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;
    
    
    public class WriteToEsWithMR extends Configured implements Tool {
    
        public static class EsMapper extends Mapper<Object, Text, NullWritable, Text> {
            private Text doc = new Text();
    
            @Override
            protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {
                if (value.getLength() > 0) {
                    doc.set(value);
                    System.out.println(value);
                    context.write(NullWritable.get(), doc);
                }
            }
        }
        public int run(String[] args) throws Exception {
            Configuration conf = new Configuration();
            String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
    
            conf.setBoolean("mapreduce.map.speculative", false);
            conf.setBoolean("mapreduce.reduce.speculative", false);
            conf.set("es.nodes", "es-cn-4591jumei000u****.elasticsearch.aliyuncs.com");
            conf.set("es.port","9200");
            conf.set("es.net.http.auth.user", "elastic");
            conf.set("es.net.http.auth.pass", "xxxxxx");
            conf.set("es.nodes.wan.only", "true");
            conf.set("es.nodes.discovery","false");
            conf.set("es.input.use.sliced.partitions","false");
            conf.set("es.resource", "maptest/_doc");
            conf.set("es.input.json", "true");
    
            Job job = Job.getInstance(conf);
            job.setInputFormatClass(TextInputFormat.class);
            job.setOutputFormatClass(EsOutputFormat.class);
            job.setMapOutputKeyClass(NullWritable.class);
            job.setMapOutputValueClass(Text.class);
            job.setJarByClass(WriteToEsWithMR.class);
            job.setMapperClass(EsMapper.class);
    
            FileInputFormat.setInputPaths(job, new Path(otherArgs[0]));
    
            return job.waitForCompletion(true) ? 0 : 1;
        }
    
        public static void main(String[] args) throws Exception {
            int ret = ToolRunner.run(new WriteToEsWithMR(), args);
            System.exit(ret);
        }
    }
    表 1. ES-Hadoop相關參數說明

    參數

    預設值

    說明

    es.nodes

    localhost

    指定Elasticsearch執行個體的訪問地址,建議使用私網地址,可在執行個體的基本資料頁面查看,詳情請參見查看執行個體的基本資料

    es.port

    9200

    Elasticsearch執行個體的訪問連接埠號碼。

    es.net.http.auth.user

    elastic

    Elasticsearch執行個體的訪問使用者名稱。

    說明

    如果程式中指定elastic帳號訪問Elasticsearch服務,後續在修改elastic帳號對應密碼後需要一些時間來生效,在密碼生效期間會影響服務訪問,因此不建議通過elastic來訪問。建議在Kibana控制台中建立一個符合預期的Role角色使用者進行訪問,詳情請參見通過Elasticsearch X-Pack角色管理實現使用者權限管控

    es.net.http.auth.pass

    /

    Elasticsearch執行個體的訪問密碼。

    es.nodes.wan.only

    false

    開啟Elasticsearch叢集在雲上使用虛擬IP進行串連,是否進行節點嗅探:

    • true:設定

    • false:不設定

    es.nodes.discovery

    true

    是否禁用節點發現:

    • true:禁用

    • false:不禁用

    重要

    使用Elasticsearch,必須將此參數設定為false。

    es.input.use.sliced.partitions

    true

    是否使用slice分區:

    • true:使用。設定為true,可能會導致索引在預讀階段的時間明顯變長,有時會遠遠超出查詢資料所耗費的時間。建議設定為false,以提高查詢效率。

    • false:不使用。

    es.index.auto.create

    true

    通過Hadoop組件向Elasticsearch叢集寫入資料,是否自動建立不存在的index:

    • true:自動建立

    • false:不會自動建立

    es.resource

    /

    指定要讀寫的index和type。

    es.input.json

    false

    輸入是否已經是JSON格式:

    • true:是JSON格式

    • false:不是JSON格式

    es.mapping.names

    /

    表欄位與Elasticsearch的索引欄位名映射。

    es.read.metadata

    false

    操作Elasticsearch欄位涉及到_id之類的內部欄位,請開啟此屬性。

    更多的ES-Hadoop配置項說明,請參見官方配置說明

  2. 將代碼打成Jar包,上傳至EMR用戶端機器(例如Gateway或EMR叢集主節點)。

  3. 在EMR用戶端機器上,運行如下命令執行MapReduce程式。

    hadoop jar es-mapreduce-1.0-SNAPSHOT.jar /tmp/hadoop-es/map.json
    說明

    es-mapreduce-1.0-SNAPSHOT.jar需要替換為您已上傳的Jar包名稱。

步驟四:驗證結果

  1. 登入對應Elasticsearch執行個體的Kibana控制台。

    具體操作步驟請參見登入Kibana控制台

  2. 在左側導覽列,單擊Dev Tools

  3. Console頁簽下,執行以下命令,查看通過MapReduce任務寫入的資料。

    GET maptest/_search
    {
      "query": {
        "match_all": {}
      }
    }

    查詢成功後,返回結果如下。返回結果

總結

本文以Elasticsearch和EMR為例,介紹了如何通過ES-Hadoop,藉助MapReduce任務向Elasticsearch寫入資料。相反,您也可以藉助MapReduce任務查詢Elasticsearch資料。查詢配置和寫入類似,詳細說明可參見官方Reading data from Elasticsearch說明。