Spark是一種通用的巨量資料計算架構,擁有Hadoop MapReduce所具有的計算優點,能夠通過記憶體快取資料為大型資料集提供快速的迭代功能。與MapReduce相比,減少了中間資料讀取磁碟的過程,進而提高了處理能力。本文介紹如何通過ES-Hadoop實現Hadoop的Spark服務讀寫Elasticsearch資料。
準備工作
建立Elasticsearch執行個體,並開啟自動建立索引功能。
具體操作步驟請參見建立Elasticsearch執行個體和配置YML參數。本文以6.7.0版本的執行個體為例。
重要在生產環境中,建議關閉自動建立索引功能,提前建立好索引和Mapping。由於本文僅用於測試,因此開啟了自動建立索引功能。
建立與Elasticsearch執行個體在同一專用網路下的E-MapReduce(以下簡稱EMR)執行個體。
執行個體配置如下:
產品版本:EMR-3.29.0
必選服務:Spark(2.4.5),其他服務保持預設
具體操作步驟,請參見建立叢集。
重要Elasticsearch執行個體的私網訪問白名單預設為0.0.0.0/0,您可在安全配置頁面查看,如果未使用預設配置,您還需要在白名單中加入EMR叢集的內網IP地址:
請參見查看叢集列表與詳情,擷取EMR叢集的內網IP地址。
請參見配置執行個體公網或私網訪問白名單,配置Elasticsearch執行個體的VPC私網訪問白名單。
準備Java環境,要求JDK版本為8.0及以上。
編寫並運行Spark任務
準備測試資料。
登入E-MapReduce控制台,擷取Master節點的IP地址,並通過SSH登入對應的ECS機器。
具體操作步驟,請參見登入叢集。
將測試資料寫入檔案中。
本文使用的JSON資料樣本如下,將該資料儲存在http_log.txt檔案中。
{"id": 1, "name": "zhangsan", "birth": "1990-01-01", "addr": "No.969, wenyixi Rd, yuhang, hangzhou"} {"id": 2, "name": "lisi", "birth": "1991-01-01", "addr": "No.556, xixi Rd, xihu, hangzhou"} {"id": 3, "name": "wangwu", "birth": "1992-01-01", "addr": "No.699 wangshang Rd, binjiang, hangzhou"}
執行以下命令,將測試資料上傳至EMR Master節點的tmp/hadoop-es檔案中。
hadoop fs -put http_log.txt /tmp/hadoop-es
配置pom依賴。
建立Java Maven工程,並將如下的pom依賴添加到Java工程的pom.xml檔案中。
<dependencies> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_2.12</artifactId> <version>2.4.5</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-sql_2.11</artifactId> <version>2.4.5</version> </dependency> <dependency> <groupId>org.elasticsearch</groupId> <artifactId>elasticsearch-spark-20_2.11</artifactId> <version>6.7.0</version> </dependency> </dependencies>
重要請確保pom依賴中版本與雲端服務對應版本保持一致,例如elasticsearch-spark-20_2.11版本與Elasticsearch版本一致;spark-core_2.12與HDFS版本一致。
編寫範例程式碼。
寫資料
以下範例程式碼用來將測試資料寫入Elasticsearch的company索引中。
import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; import org.apache.spark.SparkConf; import org.apache.spark.SparkContext; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.function.Function; import org.apache.spark.sql.Row; import org.apache.spark.sql.SparkSession; import org.elasticsearch.spark.rdd.api.java.JavaEsSpark; import org.spark_project.guava.collect.ImmutableMap; public class SparkWriteEs { public static void main(String[] args) { SparkConf conf = new SparkConf(); conf.setAppName("Es-write"); conf.set("es.nodes", "es-cn-n6w1o1x0w001c****.elasticsearch.aliyuncs.com"); conf.set("es.net.http.auth.user", "elastic"); conf.set("es.net.http.auth.pass", "xxxxxx"); conf.set("es.nodes.wan.only", "true"); conf.set("es.nodes.discovery","false"); conf.set("es.input.use.sliced.partitions","false"); SparkSession ss = new SparkSession(new SparkContext(conf)); final AtomicInteger employeesNo = new AtomicInteger(0); //以下的/tmp/hadoop-es/http_log.txt需要替換為您測試資料的路徑。 JavaRDD<Map<Object, ?>> javaRDD = ss.read().text("/tmp/hadoop-es/http_log.txt") .javaRDD().map((Function<Row, Map<Object, ?>>) row -> ImmutableMap.of("employees" employeesNo.getAndAdd(1), row.mkString())); JavaEsSpark.saveToEs(javaRDD, "company/_doc"); } }
讀資料
以下範例程式碼用來讀取上一步寫入Elasticsearch的資料,並進行列印。
import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaSparkContext; import org.elasticsearch.spark.rdd.api.java.JavaEsSpark; import java.util.Map; public class ReadES { public static void main(String[] args) { SparkConf conf = new SparkConf().setAppName("readEs").setMaster("local[*]") .set("es.nodes", "es-cn-n6w1o1x0w001c****.elasticsearch.aliyuncs.com") .set("es.port", "9200") .set("es.net.http.auth.user", "elastic") .set("es.net.http.auth.pass", "xxxxxx") .set("es.nodes.wan.only", "true") .set("es.nodes.discovery","false") .set("es.input.use.sliced.partitions","false") .set("es.resource", "company/_doc") .set("es.scroll.size","500"); JavaSparkContext sc = new JavaSparkContext(conf); JavaPairRDD<String, Map<String, Object>> rdd = JavaEsSpark.esRDD(sc); for ( Map<String, Object> item : rdd.values().collect()) { System.out.println(item); } sc.stop(); } }
表 1. 參數說明 參數
預設值
說明
es.nodes
localhost
指定Elasticsearch執行個體的訪問地址,建議使用私網地址,可在執行個體的基本資料頁面查看。更多資訊,請參見查看執行個體的基本資料。
es.port
9200
Elasticsearch執行個體的訪問連接埠號碼。
es.net.http.auth.user
elastic
Elasticsearch執行個體的訪問使用者名稱。
說明如果程式中指定elastic帳號訪問Elasticsearch服務,後續在修改elastic帳號對應密碼後需要一些時間來生效,在密碼生效期間會影響服務訪問,因此不建議通過elastic來訪問。建議在Kibana控制台中建立一個符合預期的Role角色使用者進行訪問,詳情請參見通過Elasticsearch X-Pack角色管理實現使用者權限管控。
es.net.http.auth.pass
/
對應使用者的密碼,在建立執行個體時指定。如果忘記可進行重設,具體操作步驟,請參見重設執行個體訪問密碼。
es.nodes.wan.only
false
開啟Elasticsearch叢集在雲上使用虛擬IP進行串連,是否進行節點嗅探:
true:設定
false:不設定
es.nodes.discovery
true
是否禁用節點發現:
true:禁用
false:不禁用
重要使用Elasticsearch,必須將此參數設定為false。
es.input.use.sliced.partitions
true
是否使用slice分區:
true:使用。設定為true,可能會導致索引在預讀階段的時間明顯變長,有時會遠遠超出查詢資料所耗費的時間。建議設定為false,以提高查詢效率。
false:不使用。
es.index.auto.create
true
通過Hadoop組件向Elasticsearch叢集寫入資料,是否自動建立不存在的index:
true:自動建立
false:不會自動建立
es.resource
/
指定要讀寫的index和type。
es.mapping.names
/
表欄位與Elasticsearch的索引欄位名映射。
更多的ES-Hadoop配置項說明,請參見官方配置說明。
將代碼打成Jar包,上傳至EMR用戶端機器(例如Gateway或EMR叢集主節點)。
在EMR用戶端機器上,運行如下命令執行Spark程式:
寫資料
cd /usr/lib/spark-current ./bin/spark-submit --master yarn --executor-cores 1 --class "SparkWriteEs" /usr/local/spark_es.jar
重要/usr/local/spark_es.jar需要替換為您Jar包上傳的路徑。
讀資料
cd /usr/lib/spark-current ./bin/spark-submit --master yarn --executor-cores 1 --class "ReadES" /usr/local/spark_es.jar
讀資料成功後,列印結果如下。
驗證結果
登入對應Elasticsearch執行個體的Kibana控制台。
具體操作步驟請參見登入Kibana控制台。
在左側導覽列,單擊Dev Tools。
在Console中,執行以下命令,查看通過Spark任務寫入的資料。
GET company/_search { "query": { "match_all": {} } }
查詢成功後,返回結果如下。
總結
本文以Elasticsearch和EMR為例,介紹了如何通過ES-Hadoop,實現Spark讀寫Elasticsearch資料。與其他EMR組件相比,ES-Hadoop與Spark的整合,不僅包括RDD,還包括Spark Streaming、scale、DataSet與Spark SQL等,您可以根據需求進行配置。詳細資料,請參見Apache Spark support。