ES-Hadoop是Elasticsearch推出的專門用於對接Hadoop生態的工具,可以讓資料在Elasticsearch和Hadoop之間雙向移動,無縫銜接Elasticsearch與Hadoop服務,充分使用Elasticsearch的快速搜尋及Hadoop批處理能力,實現互動式資料處理。對於一些較複雜的分析任務,需要通過MapReduce任務讀取HDFS上的JSON檔案,寫入Elasticsearch叢集。本文介紹如何通過ES-Hadoop,藉助MapReduce任務向Elasticsearch寫入資料。
操作流程
建立同一專用網路下的Elasticsearch和E-MapReduce(以下簡稱EMR)執行個體、開啟Elasticsearch執行個體的自動建立索引功能、準備測試資料和Java環境。
下載ES-Hadoop安裝包,並上傳至EMR Master節點的HDFS目錄下。
建立Java Maven工程,並配置pom依賴。
編寫MapReduce寫資料到Elasticsearch的Java代碼,並打成Jar包上傳至EMR叢集,最後運行程式碼完成寫資料任務。
在Elasticsearch的Kibana控制台上,查看通過MapReduce寫入的資料。
準備工作
建立Elasticsearch執行個體,並開啟自動建立索引功能。
具體操作步驟請參見建立Elasticsearch執行個體和配置YML參數。本文以6.7.0版本的執行個體為例。
重要在生產環境中,建議關閉自動建立索引功能,提前建立好索引和Mapping。由於本文僅用於測試,因此開啟了自動建立索引功能。
建立與Elasticsearch執行個體在同一專用網路下的EMR執行個體。
執行個體配置如下:
產品版本:EMR-3.29.0
必選服務:HDFS(2.8.5),其他服務保持預設
具體操作步驟請參見建立叢集。
重要Elasticsearch執行個體的私網訪問白名單預設為0.0.0.0/0,您可在安全配置頁面查看,如果未使用預設配置,您還需要在白名單中加入EMR叢集的內網IP地址:
請參見查看叢集列表與詳情,擷取EMR叢集的內網IP地址。
請參見配置執行個體公網或私網訪問白名單,配置Elasticsearch執行個體的VPC私網訪問白名單。
準備JSON測試資料,將其寫入到map.json檔案中,並上傳至HDFS的/tmp/hadoop-es目錄下。
本文使用的測試資料如下。
{"id": 1, "name": "zhangsan", "birth": "1990-01-01", "addr": "No.969, wenyixi Rd, yuhang, hangzhou"} {"id": 2, "name": "lisi", "birth": "1991-01-01", "addr": "No.556, xixi Rd, xihu, hangzhou"} {"id": 3, "name": "wangwu", "birth": "1992-01-01", "addr": "No.699 wangshang Rd, binjiang, hangzhou"}
準備Java環境,要求JDK版本為1.8.0及以上。
步驟一:上傳ES-Hadoop JAR包至HDFS
下載ES-Hadoop安裝包,其版本需要與Elasticsearch執行個體保持一致。
本文使用elasticsearch-hadoop-6.7.0.zip。
登入E-MapReduce控制台,擷取Master節點的IP地址,並通過SSH登入對應的ECS機器。
具體操作步驟請參見登入叢集。
將已下載的elasticsearch-hadoop-6.7.0.zip上傳至Master節點,並解壓獲得elasticsearch-hadoop-6.7.0.jar。
建立HDFS目錄,將elasticsearch-hadoop-6.7.0.jar上傳至該目錄下。
hadoop fs -mkdir /tmp/hadoop-es hadoop fs -put elasticsearch-hadoop-6.7.0/dist/elasticsearch-hadoop-6.7.0.jar /tmp/hadoop-es
步驟二:配置pom依賴
建立Java Maven工程,並將如下的pom依賴添加到Java工程的pom.xml檔案中。
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>2.4.1</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<transformers>
<transformer
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>WriteToEsWithMR</mainClass>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
<dependencies>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>2.8.5</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-jobclient</artifactId>
<version>2.8.5</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>2.8.5</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-auth</artifactId>
<version>2.8.5</version>
</dependency>
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch-hadoop-mr</artifactId>
<version>6.7.0</version>
</dependency>
<dependency>
<groupId>commons-httpclient</groupId>
<artifactId>commons-httpclient</artifactId>
<version>3.1</version>
</dependency>
</dependencies>
請確保pom依賴中版本與雲端服務對應版本保持一致,例如elasticsearch-hadoop-mr版本與Elasticsearch版本一致;hadoop-hdfs與HDFS版本一致。
步驟三:編寫並運行MapReduce任務
編寫範例程式碼。
以下代碼會讀取HDFS上/tmp/hadoop-es目錄下的JSON檔案,並將這些JSON檔案中的每一行作為一個文檔寫入Elasticsearch。寫入過程由EsOutputFormat在Map階段完成。
import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.util.GenericOptionsParser; import org.elasticsearch.hadoop.mr.EsOutputFormat; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; public class WriteToEsWithMR extends Configured implements Tool { public static class EsMapper extends Mapper<Object, Text, NullWritable, Text> { private Text doc = new Text(); @Override protected void map(Object key, Text value, Context context) throws IOException, InterruptedException { if (value.getLength() > 0) { doc.set(value); System.out.println(value); context.write(NullWritable.get(), doc); } } } public int run(String[] args) throws Exception { Configuration conf = new Configuration(); String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs(); conf.setBoolean("mapreduce.map.speculative", false); conf.setBoolean("mapreduce.reduce.speculative", false); conf.set("es.nodes", "es-cn-4591jumei000u****.elasticsearch.aliyuncs.com"); conf.set("es.port","9200"); conf.set("es.net.http.auth.user", "elastic"); conf.set("es.net.http.auth.pass", "xxxxxx"); conf.set("es.nodes.wan.only", "true"); conf.set("es.nodes.discovery","false"); conf.set("es.input.use.sliced.partitions","false"); conf.set("es.resource", "maptest/_doc"); conf.set("es.input.json", "true"); Job job = Job.getInstance(conf); job.setInputFormatClass(TextInputFormat.class); job.setOutputFormatClass(EsOutputFormat.class); job.setMapOutputKeyClass(NullWritable.class); job.setMapOutputValueClass(Text.class); job.setJarByClass(WriteToEsWithMR.class); job.setMapperClass(EsMapper.class); FileInputFormat.setInputPaths(job, new Path(otherArgs[0])); return job.waitForCompletion(true) ? 0 : 1; } public static void main(String[] args) throws Exception { int ret = ToolRunner.run(new WriteToEsWithMR(), args); System.exit(ret); } }
表 1. ES-Hadoop相關參數說明 參數
預設值
說明
es.nodes
localhost
指定Elasticsearch執行個體的訪問地址,建議使用私網地址,可在執行個體的基本資料頁面查看,詳情請參見查看執行個體的基本資料。
es.port
9200
Elasticsearch執行個體的訪問連接埠號碼。
es.net.http.auth.user
elastic
Elasticsearch執行個體的訪問使用者名稱。
說明如果程式中指定elastic帳號訪問Elasticsearch服務,後續在修改elastic帳號對應密碼後需要一些時間來生效,在密碼生效期間會影響服務訪問,因此不建議通過elastic來訪問。建議在Kibana控制台中建立一個符合預期的Role角色使用者進行訪問,詳情請參見通過Elasticsearch X-Pack角色管理實現使用者權限管控。
es.net.http.auth.pass
/
Elasticsearch執行個體的訪問密碼。
es.nodes.wan.only
false
開啟Elasticsearch叢集在雲上使用虛擬IP進行串連,是否進行節點嗅探:
true:設定
false:不設定
es.nodes.discovery
true
是否禁用節點發現:
true:禁用
false:不禁用
重要使用Elasticsearch,必須將此參數設定為false。
es.input.use.sliced.partitions
true
是否使用slice分區:
true:使用。設定為true,可能會導致索引在預讀階段的時間明顯變長,有時會遠遠超出查詢資料所耗費的時間。建議設定為false,以提高查詢效率。
false:不使用。
es.index.auto.create
true
通過Hadoop組件向Elasticsearch叢集寫入資料,是否自動建立不存在的index:
true:自動建立
false:不會自動建立
es.resource
/
指定要讀寫的index和type。
es.input.json
false
輸入是否已經是JSON格式:
true:是JSON格式
false:不是JSON格式
es.mapping.names
/
表欄位與Elasticsearch的索引欄位名映射。
es.read.metadata
false
操作Elasticsearch欄位涉及到_id之類的內部欄位,請開啟此屬性。
更多的ES-Hadoop配置項說明,請參見官方配置說明。
將代碼打成Jar包,上傳至EMR用戶端機器(例如Gateway或EMR叢集主節點)。
在EMR用戶端機器上,運行如下命令執行MapReduce程式。
hadoop jar es-mapreduce-1.0-SNAPSHOT.jar /tmp/hadoop-es/map.json
說明es-mapreduce-1.0-SNAPSHOT.jar需要替換為您已上傳的Jar包名稱。
步驟四:驗證結果
登入對應Elasticsearch執行個體的Kibana控制台。
具體操作步驟請參見登入Kibana控制台。
在左側導覽列,單擊Dev Tools。
在Console頁簽下,執行以下命令,查看通過MapReduce任務寫入的資料。
GET maptest/_search { "query": { "match_all": {} } }
查詢成功後,返回結果如下。
總結
本文以Elasticsearch和EMR為例,介紹了如何通過ES-Hadoop,藉助MapReduce任務向Elasticsearch寫入資料。相反,您也可以藉助MapReduce任務查詢Elasticsearch資料。查詢配置和寫入類似,詳細說明可參見官方Reading data from Elasticsearch說明。