AnalyticDB for MySQLSpark支援訪問OSS-HDFS資料來源,本文介紹如何使用Spark來操作OSS-HDFS資料。
前提條件
叢集的產品系列為湖倉版。
叢集與OSS儲存空間位於相同地區。
已建立Job型資源群組。具體操作,請參見建立資源群組。
已建立AnalyticDB for MySQL叢集的資料庫帳號。
如果是通過阿里雲帳號訪問,只需建立高許可權帳號。具體操作,請參見建立高許可權帳號。
如果是通過RAM使用者訪問,需要建立高許可權帳號和普通帳號並且將RAM使用者綁定到普通帳號上。具體操作,請參見建立資料庫帳號和綁定或解除綁定RAM使用者與資料庫帳號。
開啟OSS-HDFS服務。具體操作,請參見開通OSS-HDFS服務。
Spark Jar模式讀寫OSS-HDFS資料來源
編寫訪問OSS-HDFS的樣本程式(即Spark作業依賴的JAR包),進行編譯打包後產生的JAR包名稱為
oss_hdfs_demo.jar
。範例程式碼如下:package com.aliyun.spark import org.apache.spark.sql.SparkSession object SparkHDFS { def main(args: Array[String]): Unit = { val sparkSession = SparkSession .builder() .appName("Spark HDFS TEST") .getOrCreate() val welcome = "hello, adb-spark" //hdfs目錄用於存放內容 val hdfsPath = args(0); //將welcome字串存入指定的hdfs目錄 sparkSession.sparkContext.parallelize(Seq(welcome)).saveAsTextFile(hdfsPath) //從指定的hdfs目錄中讀取內容,並列印 sparkSession.sparkContext.textFile(hdfsPath).collect.foreach(println) } }
將
oss_hdfs_demo.jar
包上傳到OSS-HDFS。具體操作,請參見通過Hadoop Shell命令訪問。登入雲原生資料倉儲AnalyticDB MySQL控制台,在左上方選擇叢集所在地區。在左側導覽列,單擊集群清單,在湖倉版頁簽下,單擊目的地組群ID。
在左側導覽列,單擊
。在編輯器視窗上方,選擇Job型資源群組和Spark應用類型。本文以Batch類型為例。
在編輯器中輸入以下Spark代碼。讀取OSS中的檔案並列印出來行數和第一行內容。
{ "args": ["oss://testBucketName/data/oss_hdfs"], "file": "oss://testBucketName/data/oss_hdfs_demo.jar", "name": "spark-on-hdfs", "className": "com.aliyun.spark.SparkHDFS", "conf": { "spark.driver.resourceSpec": "medium", "spark.executor.instances": 1, "spark.executor.resourceSpec": "medium", "spark.adb.connectors": "jindo" } }
參數說明:
參數名稱
參數說明
args
Spark JAR作業啟動並執行參數。本文範例程式碼需要在args傳入讀寫的OSS-HDFS路徑。
本文樣本為:
oss://testBucketName/data/oss_hdfs
。file
JAR包所屬的OSS-HDFS路徑。
本文樣本為:
oss://testBucketName/data/oss_hdfs_demo.jar
。name
Spark應用的名稱。
spark.adb.connectors
本文讀取OSS-HDFS資料使用的連接器為:
jindo
。conf
與開源Spark中的配置項基本一致,參數格式為
key: value
形式,多個參數之間以英文逗號(,)分隔。與開源Spark用法不一致的配置參數及AnalyticDB for MySQL特有的配置參數,請參見Spark應用配置參數說明。單擊運行,執行完成後,您可以在Spark Jar開發頁面應用列表頁簽中的日誌查看資料。詳情請參見Spark開發編輯器。
Spark SQL模式讀寫OSS-HDFS資料來源
在OSS-HDFS上建立庫路徑和表路徑。具體操作,請參見通過Hadoop Shell命令訪問。本文樣本為:
庫路徑:
oss://{bucket}/jindo_test
;表路徑:oss://{bucket}/jindo_test/tbl
。編寫訪問OSS-HDFS的Spark SQL。
SET spark.driver.resourceSpec=small; SET spark.executor.instances=1; SET spark.executor.resourceSpec=small; SET spark.adb.connectors=jindo; CREATE DATABASE IF NOT EXISTS jindo_test LOCATION 'oss://{bucket}/jindo_test'; USE jindo_test; CREATE TABLE IF NOT EXISTS tbl(id int, name string) LOCATION 'oss://{bucket}/jindo_test/tbl'; INSERT INTO tbl values(1, 'aaa'); SELECT * FROM tbl;
單擊立即執行。