本文主要介紹如何使用DLA Spark訪問使用者VPC中的Hive叢集。
雲原生資料湖分析(DLA)產品已退市,AnalyticDB for MySQL湖倉版支援DLA已有功能,並提供更多的功能和更好的效能。AnalyticDB for MySQL相關使用文檔,請參見訪問Hive資料來源。
前提條件
您已開通資料湖分析DLA(Data Lake Analytics)服務,如何開通,請參見開通雲原生資料湖分析服務。
您已登入雲原生資料庫分析DLA控制台,在雲原生資料湖分析DLA控制台上建立了Spark虛擬叢集。
您已開通Object Storage Service(Object Storage Service)服務。如何開通,請參見開通OSS服務
準備建立Spark計算節點所需要的交換器ID和安全性群組ID,可以選擇已有的交換器和安全性群組,也可以建立交換器和安全性群組。交換器和安全性群組需要滿足以下條件:
交換器需要與您的Hive服務叢集在同一VPC下。可使用您Hive叢集控制台上的交換器ID。
安全性群組需要與您的Hive服務叢集在同一VPC下。您可以前往ECS管理主控台-網路與安全-安全性群組按照Virtual Private CloudID搜尋該VPC下的安全性群組,任意選擇一個安全性群組ID即可。
如果您的Hive服務有白名單控制,需要您將交換器網段加入到您Hive服務的白名單中。
操作步驟
如果您的Hive中繼資料使用的是獨立的RDS且表資料存放在OSS中,則可以使用下列配置並跳過後續步驟,否則請您從第二步開始配置。
{ "name": "spark-on-hive", "className": "com.aliyun.spark.SparkHive", #串連Hive的測試代碼,按需修改名稱 "jars": [ "oss://path/to/mysql-connector-java-5.1.47.jar" ], "conf": { "spark.dla.eni.vswitch.id": "<交換器ID>", "spark.dla.eni.security.group.id": "<安全性群組ID>", "spark.dla.eni.enable": "true", "spark.driver.resourceSpec": "medium", "spark.dla.connectors": "oss", "spark.executor.instances": 1, "spark.sql.catalogImplementation": "hive", "spark.executor.resourceSpec": "medium", "spark.hadoop.javax.jdo.option.ConnectionDriverName": "com.mysql.jdbc.Driver", "spark.hadoop.javax.jdo.option.ConnectionUserName": "<hive_user_name>", #Hive RDS的使用者名稱 "spark.hadoop.javax.jdo.option.ConnectionPassword": "<your_pass_word>", #Hive RDS的密碼 "spark.hadoop.javax.jdo.option.ConnectionURL": "<jdbc串連>", #Hive RDS 的jdbc連結 "spark.dla.job.log.oss.uri": "<日誌目錄路徑>" }, "file": "<oss://主資源Jar包路徑>" }
說明jars中指定的Jar包是MySQL的jdbc連接器,可從官方Maven倉庫,並上傳到oss。
擷取需要在DLA Spark配置的Hive相關參數。
說明如果您無法在您的Hive服務所在的叢集中執行spark作業,可以跳過這步。
我們提供了工具來讀取你Hive服務所在的叢集的配置,您可以按照下面的地址下載
spark-examples-0.0.1-SNAPSHOT-shaded.jar
並上傳至OSS, 然後提交Spark作業到您的Hive
服務所在叢集上執行,即可在作業輸出中獲得訪問您Hive叢集所需的配置。wget https://dla003.oss-cn-hangzhou.aliyuncs.com/GetSparkConf/spark-examples-0.0.1-SNAPSHOT-shaded.jar
EMR叢集使用者將Jar包上傳至OSS後,可以通過以下命令提交作業到EMR叢集擷取配置作業:
--class com.aliyun.spark.util.GetConfForServerlessSpark --deploy-mode client ossref://{path/to}/spark-examples-0.0.1-SNAPSHOT-shaded.jar get hive hadoop
作業運行完畢後,可以通過SparkUI查看driver的stdout輸出或者從作業詳情中的提交日誌中查看輸出的配置。
雲Hbase-Spark使用者可以將Jar包上傳至資源管理目錄後,用以下命令提交擷取配置作業:
--class com.aliyun.spark.util.GetConfForServerlessSpark /{path/to}/spark-examples-0.0.1-SNAPSHOT-shaded.jar get hive hadoop
等待作業完成後,通過SparkUI的driver中的stdout查看輸出配置。
其他Hive叢集,如果您在叢集上未設定
HIVE_CONF_DIR
環境變數,則需要手動輸入HIVE_CONF_DIR
路徑。--class com.aliyun.spark.util.GetConfForServerlessSpark --deploy-mode client /{path/to}/spark-examples-0.0.1-SNAPSHOT-shaded.jar get --hive-conf-dir </path/to/your/hive/conf/dir> hive hadoop
編寫訪問Hive的SparkApplication。
以下範例程式碼可以首先根據使用者傳入的表名,在使用者
default namespace
建立一個表,該表只有一列字串類型的資料,內容為hello, dla-spark
,然後從該表讀出這一列資料,並列印到stdout:package com.aliyun.spark import org.apache.spark.sql.SparkSession object SparkHive { def main(args: Array[String]): Unit = { val sparkSession = SparkSession .builder() .appName("Spark HIVE TEST") .enableHiveSupport() .getOrCreate() val welcome = "hello, dla-spark" //Hive表名 val tableName = args(0) import sparkSession.implicits._ //將只有一行一列資料的DataFrame: df 存入到Hive, 表名為使用者傳進來的tableName, 列名為welcome_col val df = Seq(welcome).toDF("welcome_col") df.write.format("hive").mode("overwrite").saveAsTable(tableName) //從Hive中讀取表 tableName val dfFromHive = sparkSession.sql( s""" |select * from $tableName |""".stripMargin) dfFromHive.show(10) } }
將SparkApplication Jar包和依賴上傳至OSS中。
詳情請參見控制台上傳檔案。
說明OSS所在的region和Serverless Spark所在的region需要保持一致。
在DLA Spark中提交作業並進行計算。
訪問Hive,如果您叢集中的HDFS是以高可用部署(即您的叢集有一個以上Master節點/NameNode),詳情請參見建立和執行Spark作業和作業配置指南。
{ "args": [ "hello_dla" ], "name": "spark-on-hive", "className": "com.aliyun.spark.SparkHive", "conf": { "spark.sql.catalogImplementation":"hive", "spark.dla.eni.vswitch.id": "{您的交換器ID}", "spark.dla.eni.security.group.id": "{您的安全性群組ID}", "spark.dla.eni.enable": "true", "spark.driver.resourceSpec": "medium", "spark.executor.instances": 1, "spark.executor.resourceSpec": "medium", "spark.dla.job.log.oss.uri": "oss://<指定您存放SparkUI日誌的目錄/>", "spark.hadoop.hive.metastore.uris":"thrift://${ip}:${port},thrift://${ip}:${port}", "spark.hadoop.dfs.nameservices":"{您的nameservices名稱}", "spark.hadoop.dfs.client.failover.proxy.provider.${nameservices}":"{您的failover proxy provider實作類別全路徑名稱}", "spark.hadoop.dfs.ha.namenodes.${nameservices}":"{您的nameservices所屬namenode列表}", "spark.hadoop.dfs.namenode.rpc-address.${nameservices}.${nn1}":"namenode0所屬的ip:port", "spark.hadoop.dfs.namenode.rpc-address.${nameservices}.${nn2}":"namenode1所屬的ip:port" }, "file": "oss://{您的Jar包所屬的oss路徑}" }
參數說明如下:
參數
說明
備忘
spark.hadoop.hive.metastore.uris
配置訪問HiveMetaStore的Uri,對應${HIVE_CONF_DIR}/hive-site.xml中的hive.metastore.uris配置項。注意,一般該配置項的值都是網域名稱:連接埠的形式,使用者在serverless spark中配置參數的時候需要將它替換為對應IP+連接埠的形式。
網域名稱和IP的映射關係,一般可以登入叢集的master節點查看原生/etc/hosts,或者在master節點,直接使用ping+網域名稱的方式擷取,您也可以採用步驟2擷取對應的配置參數。
spark.dla.eni.vswitch.id
您的交換器ID。
無
spark.dla.eni.security.group.id
您的安全性群組ID。
無
spark.dla.eni.enable
控制開啟或關閉ENI。
無
spark.hadoop.dfs.nameservices
對應hdfs-site.xml中的dfs.nameservices
無
spark.dla.job.log.oss.uri
指定您存放SparkUI日誌的OSS目錄
無
spark.hadoop.dfs.client.failover.proxy.provider.${nameservices}
對應hdfs-site.xml中的dfs.client.failover.proxy.provider.${nameservices}
無
spark.hadoop.dfs.ha.namenodes.${nameservices}
對應hdfs-site.xml中的dfs.ha.namenodes.${nameservices}
無
spark.hadoop.dfs.namenode.rpc-address.${nameservices}.${nn1/nn2}
對應hdfs-site.xml中的dfs.namenode.rpc-address.${nameservices}.${nn1/nn2}
注意該配置項應該寫成IP:連接埠的形式,使用者可以通過使用者叢集master節點中的/etc/hosts檔案查看網域名稱和IP的對應關係或者在master節點,直接使用ping+網域名稱的方式擷取,您也可以採用步驟2擷取對應的配置參數。
作業運行成功後,單擊
,查看作業日誌。訪問Hive, 如果您叢集中的HDFS是以非高可用部署的(即只有一個Master節點/NameNode)。
{ "args": [ "hello_dla" ], "name": "spark-on-hive", "className": "com.aliyun.spark.SparkHive", "conf": { "spark.sql.catalogImplementation":"hive", "spark.dla.eni.vswitch.id": "{您的交換器ID}", "spark.dla.eni.security.group.id": "{您的安全性群組ID}", "spark.dla.eni.enable": "true", "spark.driver.resourceSpec": "medium", "spark.executor.instances": 1, "spark.executor.resourceSpec": "medium", "spark.dla.job.log.oss.uri": "oss://<指定您存放SparkUI日誌的目錄/>"," "spark.hadoop.hive.metastore.uris":"thrift://${ip}:${port},thrift://${ip}:${port}", "spark.dla.eni.extra.hosts":"${ip0} ${hostname_0} ${hostname_1} ${hostname_n}" }, "file": "oss://{您的Jar包所屬的oss路徑}" }
參數
說明
備忘
spark.hadoop.hive.metastore.uris
配置訪問HiveMetaStore的Uri,對應${HIVE_CONF_DIR}/hive-site.xml中的hive.metastore.uris配置項。注意,一般該配置項的值都是網域名稱+連接埠的形式,使用者在serverless spark中配置參數的時候需要將它替換為對應ip:連接埠的形式。
網域名稱和IP的映射關係,一般可以登入叢集的master節點查看原生/etc/hosts, 或者在master節點,直接使用ping + 網域名稱的方式擷取,使用者也可以採用步驟1擷取對應的配置參數。
spark.dla.job.log.oss.uri
指定您存放SparkUI日誌的OSS目錄
無
spark.dla.eni.vswitch.id
您的交換器ID
無
spark.dla.eni.security.group.id
您的安全性群組ID
無
spark.dla.eni.enable
控制開啟或關閉ENI
無
spark.dla.eni.extra.hosts
Spark解析Hive表位置時,需要額外傳入IP和Table Store節點host的映射關係,以便Spark能正確解析位置的網域名稱資訊。
重要IP和網域名稱之間用空格隔開。多個IP 和網域名稱用逗號隔開,如 "ip0 master0, ip1 master1"
該值可從使用者叢集${Hive_CONF_DIR}/core-site.xml的fs.defaultFS擷取。樣本使用者fs.defaultFs的值為: "hdfs://master-1:9000", 則需要配置spark.dla.eni.extra.hosts的值為:"${master-1的ip} master-1"。IP和網域名稱的對應關係,您可以登入自建叢集的master節點,從/etc/hosts中查看IP和網域名稱的對應關係。您也可以從步驟2中擷取相關參數。