本文介紹如何使用AnalyticDB for MySQLSpark通過ENI網路讀取Elasticsearch資料來源。
前提條件
AnalyticDB for MySQL叢集的產品系列為湖倉版。
已在AnalyticDB for MySQL叢集中建立Job型資源群組。具體操作,請參見建立資源群組。
已建立資料庫帳號。
如果是通過阿里雲帳號訪問,只需建立高許可權帳號。具體操作,請參見建立高許可權帳號。
如果是通過RAM使用者訪問,需要建立高許可權帳號和普通帳號並且將RAM使用者綁定到普通帳號上。具體操作,請參見建立資料庫帳號和綁定或解除綁定RAM使用者與資料庫帳號。
AnalyticDB for MySQL叢集與OSS儲存空間位於相同地區。
AnalyticDB for MySQL叢集與Elasticsearch執行個體位於同一地區。具體操作,請參見建立Elasticsearch執行個體。
已將AnalyticDB for MySQL的IP地址添加至Elasticsearch執行個體的白名單中。具體操作,請參見配置執行個體公網或私網訪問白名單。
準備工作
在Elasticsearch控制台的基本資料頁面,擷取交換器ID。
在ECS管理主控台的安全性群組頁面,擷取Elasticsearch執行個體所屬的安全性群組ID。如未添加安全性群組,請參見建立安全性群組。
使用Scala串連Elasticsearch
下載與Elasticsearch執行個體版本對應的JAR包,下載連結,請參見Elasticsearch Spark。本文下載的樣本JAR包為Elasticsearch-spark-30_2.12-7.17.9.jar。
在pom.xml檔案的dependencies中添加依賴項。
<!-- https://mvnrepository.com/artifact/org.elasticsearch/elasticsearch-spark-30 --> <dependency> <groupId>org.elasticsearch</groupId> <artifactId>elasticsearch-spark-30_2.12</artifactId> <version>7.17.9</version> <scope>provided</scope> </dependency> <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core --> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_2.12</artifactId> <version>3.2.0</version> <scope>provided</scope> </dependency>重要請確保pom.xml檔案中Elasticsearch-spark-30_2.12的版本與Elasticsearch執行個體的版本一致,Spark-core_2.12的版本與AnalyticDB for MySQL Spark版本一致。
編寫如下樣本程式,並進行編譯打包,本文產生的JAR包名稱為
spark-example.jar。package org.example import org.apache.spark.sql.{SaveMode, SparkSession} object SparkEs { def main(args: Array[String]): Unit = { val spark = SparkSession.builder().getOrCreate(); // 產生一個dataframe val columns = Seq("language","users_count") val data = Seq(("Java", "20000"), ("Python", "100000"), ("Scala", "3000")) val writeDF = spark.createDataFrame(data).toDF(columns:_*) // 寫入資料 writeDF.write.format("es").mode(SaveMode.Overwrite) // Elasticsearch執行個體的私網地址 .option("es.nodes", "es-cn-nwy34drji0003****.elasticsearch.aliyuncs.com") // Elasticsearch執行個體的私網連接埠號碼 .option("es.port", "9200") // Elasticsearch執行個體的使用者名稱,固定寫為elastic .option("es.net.http.auth.user", "elastic") // Elasticsearch執行個體的密碼 .option("es.net.http.auth.pass", "password") // 串連Elasticsearch執行個體時,必須配置為true .option("es.nodes.wan.only", "true") // 串連Elasticsearch執行個體時,必須配置為false .option("es.nodes.discovery", "false") // Spark讀取的Elasticsearch執行個體的資料類型 .save("spark/_doc") // 讀取資料 spark.read.format("es") // Elasticsearch執行個體的私網地址 .option("es.nodes", "es-cn-nwy34drji0003****.elasticsearch.aliyuncs.com") // Elasticsearch執行個體的私網連接埠號碼 .option("es.port", "9200") // Elasticsearch執行個體的使用者名稱,固定寫為elastic .option("es.net.http.auth.user", "elastic") // Elasticsearch執行個體的密碼 .option("es.net.http.auth.pass", "password") // 串連Elasticsearch執行個體時,必須配置為true .option("es.nodes.wan.only", "true") // 串連Elasticsearch執行個體時,必須配置為false .option("es.nodes.discovery", "false") // Spark讀取的Elasticsearch執行個體的資料類型,格式為<index>/<type> .load("spark/_doc").show } }將步驟1中下載的JAR包和樣本程式
spark-example.jar上傳至OSS。具體操作,請參見上傳檔案。登入雲原生資料倉儲AnalyticDB MySQL控制台,在左上方選擇叢集所在地區。在左側導覽列,單擊集群清單,在湖倉版頁簽下,單擊目的地組群ID。
在左側導覽列,單擊。
在編輯器視窗上方,選擇Job型資源群組和Spark應用類型。本文以Batch類型為例。
在編輯器中執行以下作業內容。
{ "name": "ES-SPARK-EXAMPLE", "className": "com.aliyun.spark.ReadES", "conf": { "spark.driver.resourceSpec": "small", "spark.executor.instances": 1, "spark.executor.resourceSpec": "small", "spark.adb.eni.enabled": "true", "spark.adb.eni.vswitchId": "vsw-bp17jqw3lrrobn6y4****", "spark.adb.eni.securityGroupId": "sg-bp163uxgt4zandx1****" }, "file": "oss://testBucketName/spark-example.jar", "jars": "oss://testBucketName/Elasticsearch-spark-30_2.12-7.17.9.jar" }參數說明如下:
參數
說明
name
Spark作業名稱。
className
Java或者Scala程式入口類,Python不需要指定入口類。
conf
與開源Spark中的配置項基本一致,參數格式為
key:value形式,多個參數之間以英文逗號(,)分隔。與開源Spark用法不一致的配置參數及AnalyticDB for MySQL特有的配置參數,請參見Spark應用配置參數說明。spark.adb.eni.enabled
是否開啟ENI訪問。使用企業版、基礎版及湖倉版Spark訪問Elasticsearch資料來源時,需要開啟ENI訪問。
spark.adb.eni.vswitchId
Elasticsearch執行個體的交換器ID。擷取方法,請參見準備工作。
spark.adb.eni.securityGroupId
Elasticsearch執行個體的安全性群組ID。擷取方法,請參見準備工作。
file
樣本程式
spark-example.jar所在的OSS路徑。jars
Spark作業依賴的JAR包所在的OSS路徑。
單擊運行。
使用PySpark串連Elasticsearch
下載與Elasticsearch執行個體版本對應的JAR包,下載連結,請參見Elasticsearch Spark。本文下載的樣本JAR包為Elasticsearch-spark-30_2.12-7.17.9.jar。
在pom.xml檔案的dependencies中添加依賴項。
<!-- https://mvnrepository.com/artifact/org.elasticsearch/elasticsearch-spark-30 --> <dependency> <groupId>org.elasticsearch</groupId> <artifactId>elasticsearch-spark-30_2.12</artifactId> <version>7.17.9</version> <scope>provided</scope> </dependency> <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core --> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_2.12</artifactId> <version>3.2.0</version> <scope>provided</scope> </dependency>重要請確保pom.xml檔案中Elasticsearch-spark-30_2.12的版本與Elasticsearch執行個體的版本一致,Spark-core_2.12的版本與AnalyticDB for MySQL Spark版本一致。
編寫如下樣本程式,並將樣本程式儲存為
es-spark-example.py。from pyspark.sql import SparkSession if __name__ == '__main__': spark = SparkSession \ .builder \ .getOrCreate() # 產生DataFrame dept = [("Finance", 10), ("Marketing", 20), ("Sales", 30), ("IT", 40) ] deptColumns = ["dept_name", "dept_id"] deptDF = spark.createDataFrame(data=dept, schema=deptColumns) deptDF.printSchema() deptDF.show(truncate=False) # 寫入資料 deptDF.write.format('es').mode("overwrite") \ #Elasticsearch執行個體的私網地址 .option('es.nodes', 'es-cn-nwy34drji0003****.elasticsearch.aliyuncs.com') \ #Elasticsearch執行個體的私網連接埠號碼 .option('es.port', '9200') \ #Elasticsearch執行個體的使用者名稱,固定寫為elastic .option('es.net.http.auth.user', 'elastic') \ #Elasticsearch執行個體的密碼 .option('es.net.http.auth.pass', 'password') \ #串連Elasticsearch執行個體時,必須配置為true .option("es.nodes.wan.only", "true") \ #串連Elasticsearch執行個體時,必須配置為false .option("es.nodes.discovery", "false") \ #Spark讀取的Elasticsearch執行個體的資料類型,格式為<index>/<type> .save("spark/_doc") # 讀取資料 df = spark.read.format("es") \ #Elasticsearch執行個體的私網地址 .option('es.nodes', 'es-cn-nwy34drji0003****.elasticsearch.aliyuncs.com') \ #Elasticsearch執行個體的私網連接埠號碼 .option('es.port', '9200') \ #Elasticsearch執行個體的使用者名稱,固定寫為elastic .option('es.net.http.auth.user', 'elastic') \ #Elasticsearch執行個體的密碼 .option('es.net.http.auth.pass', 'password') \ #串連Elasticsearch執行個體時,必須配置為true .option("es.nodes.wan.only", "true") \ #串連Elasticsearch執行個體時,必須配置為false .option("es.nodes.discovery", "false") \ #Spark讀取的Elasticsearch執行個體的資料類型,格式為<index>/<type> .load("spark/_doc").show將步驟1中下載的JAR包和
es-spark-example.py程式上傳到OSS中。具體操作,請參見上傳檔案。登入雲原生資料倉儲AnalyticDB MySQL控制台,在左上方選擇叢集所在地區。在左側導覽列,單擊集群清單,在湖倉版頁簽下,單擊目的地組群ID。
在左側導覽列,單擊。
在編輯器視窗上方,選擇Job型資源群組和Spark應用類型。本文以Batch類型為例。
在編輯器中執行以下作業內容。
{ "name": "ES-SPARK-EXAMPLE", "conf": { "spark.driver.resourceSpec": "small", "spark.executor.instances": 1, "spark.executor.resourceSpec": "small", "spark.adb.eni.enabled": "true", "spark.adb.eni.vswitchId": "vsw-bp17jqw3lrrobn6y4****", "spark.adb.eni.securityGroupId": "sg-bp163uxgt4zandx1****" }, "file": "oss://testBucketName/es-spark-example.py", "jars": "oss://testBucketName/Elasticsearch-spark-30_2.12-7.17.9.jar" }參數說明如下:
參數
說明
name
Spark作業的名稱。
conf
與開源Spark中的配置項基本一致,參數格式為
key:value形式,多個參數之間以英文逗號(,)分隔。與開源Spark用法不一致的配置參數及AnalyticDB for MySQL特有的配置參數,請參見Spark應用配置參數說明。spark.adb.eni.enabled
是否開啟ENI訪問。使用企業版、基礎版及湖倉版Spark訪問Elasticsearch資料來源時,需要開啟ENI訪問。
spark.adb.eni.vswitchId
Elasticsearch執行個體的交換器ID。擷取方法,請參見準備工作。
spark.adb.eni.securityGroupId
Elasticsearch執行個體的安全性群組ID。擷取方法,請參見準備工作。
file
es-spark-example.py程式所在的OSS路徑。jars
Spark作業依賴的JAR包所在的OSS路徑。
單擊運行。