本文為您介紹Spark-2.x依賴的配置以及Spark-2.x樣本說明。
配置Spark-2.x的依賴
通過MaxCompute提供的Spark用戶端提交應用時,需要在pom.xml檔案中添加以下依賴。pom.xml檔案請參見pom.xml。
<properties>
<spark.version>2.3.0</spark.version>
<cupid.sdk.version>3.3.8-public</cupid.sdk.version>
<scala.version>2.11.8</scala.version>
<scala.binary.version>2.11</scala.binary.version>
</properties>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-mllib_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.aliyun.odps</groupId>
<artifactId>cupid-sdk</artifactId>
<version>${cupid.sdk.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.aliyun.odps</groupId>
<artifactId>hadoop-fs-oss</artifactId>
<version>${cupid.sdk.version}</version>
</dependency>
<dependency>
<groupId>com.aliyun.odps</groupId>
<artifactId>odps-spark-datasource_${scala.binary.version}</artifactId>
<version>${cupid.sdk.version}</version>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-actors</artifactId>
<version>${scala.version}</version>
</dependency>
上述代碼中Scope的定義如下:
spark-core、spark-sql等所有Spark社區發布的包,設定Scope為provided。
odps-spark-datasource設定Scope為compile。
WordCount樣本(Scala)
程式碼範例
提交方式
cd /path/to/MaxCompute-Spark/spark-2.x mvn clean package # 環境變數spark-defaults.conf的配置請參見搭建開發環境。 cd $SPARK_HOME bin/spark-submit --master yarn-cluster --class \ com.aliyun.odps.spark.examples.WordCount \ /path/to/MaxCompute-Spark/spark-2.x/target/spark-examples_2.11-1.0.0-SNAPSHOT-shaded.jar
MaxCompute Table讀寫樣本(Scala)
程式碼範例
提交方式
cd /path/to/MaxCompute-Spark/spark-2.x mvn clean package # 環境變數spark-defaults.conf的配置請參見搭建開發環境。 cd $SPARK_HOME bin/spark-submit --master yarn-cluster --class com.aliyun.odps.spark.examples.sparksql.SparkSQL \ /path/to/MaxCompute-Spark/spark-2.x/target/spark-examples_2.11-1.0.0-SNAPSHOT-shaded.jar
GraphX PageRank樣本(Scala)
程式碼範例
提交方式
cd /path/to/MaxCompute-Spark/spark-2.x mvn clean package # 環境變數spark-defaults.conf的配置請參見搭建開發環境。 cd $SPARK_HOME bin/spark-submit --master yarn-cluster --class com.aliyun.odps.spark.examples.graphx.PageRank \ /path/to/MaxCompute-Spark/spark-2.x/target/spark-examples_2.11-1.0.0-SNAPSHOT-shaded.jar
Mllib Kmeans-ON-OSS樣本(Scala)
spark.hadoop.fs.oss.ststoken.roleArn
和spark.hadoop.fs.oss.endpoint
的填寫請參見Oss-Access文檔說明。
程式碼範例
提交方式
# 編輯代碼。 val modelOssDir = "oss://bucket/kmeans-model" // 填寫具體的OSS Bucket路徑。 val spark = SparkSession .builder() .config("spark.hadoop.fs.oss.credentials.provider", "org.apache.hadoop.fs.aliyun.oss.AliyunStsTokenCredentialsProvider") .config("spark.hadoop.fs.oss.ststoken.roleArn", "acs:ram::****:role/aliyunodpsdefaultrole") .config("spark.hadoop.fs.oss.endpoint", "oss-cn-hangzhou-zmf.aliyuncs.com") .appName("KmeansModelSaveToOss") .getOrCreate() cd /path/to/MaxCompute-Spark/spark-2.x mvn clean package # 環境變數spark-defaults.conf的配置請參見搭建開發環境。 cd $SPARK_HOME bin/spark-submit --master yarn-cluster --class com.aliyun.odps.spark.examples.mllib.KmeansModelSaveToOss \ /path/to/MaxCompute-Spark/spark-2.x/target/spark-examples_2.11-1.0.0-SNAPSHOT-shaded.jar
OSS UnstructuredData樣本(Scala)
spark.hadoop.fs.oss.ststoken.roleArn
和spark.hadoop.fs.oss.endpoint
的填寫請參見Oss-Access文檔說明。
程式碼範例
提交方式
# 編輯代碼。 val pathIn = "oss://bucket/inputdata/" // 填寫具體的OSS Bucket路徑。 val spark = SparkSession .builder() .config("spark.hadoop.fs.oss.credentials.provider", "org.apache.hadoop.fs.aliyun.oss.AliyunStsTokenCredentialsProvider") .config("spark.hadoop.fs.oss.ststoken.roleArn", "acs:ram::****:role/aliyunodpsdefaultrole") .config("spark.hadoop.fs.oss.endpoint", "oss-cn-hangzhou-zmf.aliyuncs.com") .appName("SparkUnstructuredDataCompute") .getOrCreate() cd /path/to/MaxCompute-Spark/spark-2.x mvn clean package # 環境變數spark-defaults.conf的配置請參見搭建開發環境。 cd $SPARK_HOME bin/spark-submit --master yarn-cluster --class com.aliyun.odps.spark.examples.oss.SparkUnstructuredDataCompute \ /path/to/MaxCompute-Spark/spark-2.x/target/spark-examples_2.11-1.0.0-SNAPSHOT-shaded.jar
SparkPi樣本(Scala)
程式碼範例
提交方式
cd /path/to/MaxCompute-Spark/spark-2.x mvn clean package # 環境變數spark-defaults.conf的配置請參見搭建開發環境。 cd $SPARK_HOME bin/spark-submit --master yarn-cluster --class com.aliyun.odps.spark.examples.SparkPi \ /path/to/MaxCompute-Spark/spark-2.x/target/spark-examples_2.11-1.0.0-SNAPSHOT-shaded.jar
支援Spark Streaming LogHub樣本(Scala)
程式碼範例
提交方式
# 環境變數spark-defaults.conf的配置請參見搭建開發環境。 cd $SPARK_HOME bin/spark-submit --master yarn-cluster --class com.aliyun.odps.spark.examples.streaming.loghub.LogHubStreamingDemo \ /path/to/MaxCompute-Spark/spark-2.x/target/spark-examples_2.11-1.0.0-SNAPSHOT-shaded.jar
支援Spark Streaming LogHub寫MaxCompute樣本(Scala)
程式碼範例
提交方式
# 環境變數spark-defaults.conf的配置請參見搭建開發環境。 cd $SPARK_HOME bin/spark-submit --master yarn-cluster --class com.aliyun.odps.spark.examples.streaming.loghub.LogHub2OdpsDemo \ /path/to/MaxCompute-Spark/spark-2.x/target/spark-examples_2.11-1.0.0-SNAPSHOT-shaded.jar
支援Spark Streaming DataHub樣本(Scala)
程式碼範例
提交方式
# 環境變數spark-defaults.conf的配置請參見搭建開發環境。 cd $SPARK_HOME bin/spark-submit --master yarn-cluster --class com.aliyun.odps.spark.examples.streaming.datahub.DataHubStreamingDemo \ /path/to/MaxCompute-Spark/spark-2.x/target/spark-examples_2.11-1.0.0-SNAPSHOT-shaded.jar
支援Spark Streaming DataHub寫MaxCompute樣本(Scala)
程式碼範例
提交方式
# 環境變數spark-defaults.conf的配置請參見搭建開發環境。 cd $SPARK_HOME bin/spark-submit --master yarn-cluster --class com.aliyun.odps.spark.examples.streaming.datahub.DataHub2OdpsDemo \ /path/to/MaxCompute-Spark/spark-2.x/target/spark-examples_2.11-1.0.0-SNAPSHOT-shaded.jar
支援Spark Streaming Kafka樣本(Scala)
程式碼範例
提交方式
# 環境變數spark-defaults.conf的配置請參見搭建開發環境。 cd $SPARK_HOME bin/spark-submit --master yarn-cluster --class com.aliyun.odps.spark.examples.streaming.kafka.KafkaStreamingDemo \ /path/to/MaxCompute-Spark/spark-2.x/target/spark-examples_2.11-1.0.0-SNAPSHOT-shaded.jar
更多資訊請參見MaxCompute-Spark。
支援Spark StructuredStreaming DataHub樣本(Scala)
程式碼範例
提交方式
# 環境變數spark-defaults.conf的配置請參見搭建開發環境。 cd $SPARK_HOME bin/spark-submit --master yarn-cluster --class com.aliyun.odps.spark.examples.structuredstreaming.datahub.DatahubStructuredStreamingDemo \ /path/to/MaxCompute-Spark/spark-2.x/target/spark-examples_2.11-1.0.0-SNAPSHOT-shaded.jar
支援Spark StructuredStreaming Kafka樣本(Scala)
程式碼範例
提交方式
# 環境變數spark-defaults.conf的配置請參見搭建開發環境。 cd $SPARK_HOME bin/spark-submit --master yarn-cluster --class com.aliyun.odps.spark.examples.structuredstreaming.kafka.KafkaStructuredStreamingDemo \ /path/to/MaxCompute-Spark/spark-2.x/target/spark-examples_2.11-1.0.0-SNAPSHOT-shaded.jar
支援Spark StructuredStreaming LogHub樣本(Scala)
程式碼範例
提交方式
# 環境變數spark-defaults.conf的配置請參見搭建開發環境。 cd $SPARK_HOME bin/spark-submit --master yarn-cluster --class com.aliyun.odps.spark.examples.structuredstreaming.loghub.LoghubStructuredStreamingDemo \ /path/to/MaxCompute-Spark/spark-2.x/target/spark-examples_2.11-1.0.0-SNAPSHOT-shaded.jar
MaxCompute Table讀寫PySpark樣本(Python)
程式碼範例
提交方式
# 環境變數spark-defaults.conf的配置請參見搭建開發環境。 cd $SPARK_HOME bin/spark-submit --master yarn-cluster --jars /path/to/odps-spark-datasource_2.11-3.3.8-public.jar \ /path/to/MaxCompute-Spark/spark-2.x/src/main/python/spark_sql.py
PySpark寫OSS樣本(Python)
程式碼範例
提交方式
# 環境變數spark-defaults.conf的配置請參見搭建開發環境。 # OSS相關配置請參見OSS Access文檔說明。 cd $SPARK_HOME bin/spark-submit --master yarn-cluster --jars /path/to/spark-examples_2.11-1.0.0-SNAPSHOT-shaded.jar \ /path/to/MaxCompute-Spark/spark-2.x/src/main/python/spark_oss.py # spark-examples_2.11-1.0.0-SNAPSHOT-shaded.jar可以通過Spark-2.x編譯得到。
Spark-SQL樣本(Java)
Spark-SQL Java範例程式碼請參見JavaSparkSQL.java。
從MaxCompute中讀取資料寫入HBase
通過IntelliJ IDEA工具編寫代碼,實現從MaxCompute中讀取資料寫入HBase。
程式碼範例
object McToHbase { def main(args: Array[String]) { val spark = SparkSession .builder() .appName("spark_sql_ddl") .config("spark.sql.catalogImplementation", "odps") .config("spark.hadoop.odps.end.point","http://service.cn.maxcompute.aliyun.com/api") .config("spark.hadoop.odps.runtime.end.point","http://service.cn.maxcompute.aliyun-inc.com/api") .getOrCreate() val sc = spark.sparkContext val config = HBaseConfiguration.create() val zkAddress = "" config.set(HConstants.ZOOKEEPER_QUORUM, zkAddress); val jobConf = new JobConf(config) jobConf.setOutputFormat(classOf[TableOutputFormat]) jobConf.set(TableOutputFormat.OUTPUT_TABLE,"test") try{ import spark._ spark.sql("select '7', 'long'").rdd.map(row => { val id = row(0).asInstanceOf[String] val name = row(1).asInstanceOf[String] val put = new Put(Bytes.toBytes(id)) put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("a"), Bytes.toBytes(name)) (new ImmutableBytesWritable, put) }).saveAsHadoopDataset(jobConf) } finally { sc.stop() } } }
提交方式:通過IntelliJ IDEA提交並運行範例程式碼。更多操作資訊,請參見Spark在MaxCompute的運行方式。
讀寫OSS檔案
通過IntelliJ IDEA工具或DataWorks,實現讀寫OSS檔案。
程式碼範例
樣本1:Local模式下的程式碼範例。
package com.aliyun.odps.spark.examples import java.io.ByteArrayInputStream import org.apache.spark.sql.SparkSession object SparkOSS { def main(args: Array[String]) { val spark = SparkSession .builder() .config("spark.master", "local[4]") // 需要設定spark.master為local[N]才能直接運行,N為並發數。 .config("spark.hadoop.fs.oss.accessKeyId", "") .config("spark.hadoop.fs.oss.accessKeySecret", "") .config("spark.hadoop.fs.oss.endpoint", "oss-cn-beijing.aliyuncs.com") .appName("SparkOSS") .getOrCreate() val sc = spark.sparkContext try { //OSS檔案的讀取。 val pathIn = "oss://spark-oss/workline.txt" val inputData = sc.textFile(pathIn, 5) //RDD寫入。 inputData.repartition(1).saveAsTextFile("oss://spark-oss/user/data3") } finally { sc.stop() } } }
說明執行該代碼前,請您務必檢查是否已添加了
hadoop-fs-oss
依賴,否則會報錯。樣本2:Local模式下的程式碼範例。
package com.aliyun.odps.spark.examples import java.io.ByteArrayInputStream import com.aliyun.oss.{OSSClientBuilder,OSSClient} import org.apache.spark.sql.SparkSession object SparkOSS { def main(args: Array[String]) { val spark = SparkSession .builder() .config("spark.master", "local[4]") // 需要設定spark.master為local[N]才能直接運行,N為並發數。 .config("spark.hadoop.fs.oss.accessKeyId", "") .config("spark.hadoop.fs.oss.accessKeySecret", "") .config("spark.hadoop.fs.oss.endpoint", "oss-cn-beijing.aliyuncs.com") .appName("SparkOSS") .getOrCreate() val sc = spark.sparkContext try { //OSS檔案的讀取。 val pathIn = "oss://spark-oss/workline.txt" val inputData = sc.textFile(pathIn, 5) val cnt = inputData.count inputData.count() println(s"count: $cnt") //OSS檔案的寫入。 // 阿里雲帳號AccessKey擁有所有API的存取權限,風險很高。強烈建議您建立並使用RAM使用者進行API訪問或日常營運,請登入RAM控制台建立RAM使用者 // 此處以把AccessKey 和 AccessKeySecret 儲存在環境變數為例說明。您也可以根據業務需要,儲存到設定檔裡 // 強烈建議不要把 AccessKey 和 AccessKeySecret 儲存到代碼裡,會存在密鑰泄漏風險 val ossClient = new OSSClientBuilder().build("oss-cn-beijing.aliyuncs.com", System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"), System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET")) val filePath="user/data" ossClient.putObject("spark-oss",filePath , new ByteArrayInputStream(cnt.toString.getBytes())) ossClient.shutdown() } finally { sc.stop() } } }
樣本3:Cluster模式下的程式碼範例。
package com.aliyun.odps.spark.examples import java.io.ByteArrayInputStream import com.aliyun.oss.{OSSClientBuilder,OSSClient} import org.apache.spark.sql.SparkSession object SparkOSS { def main(args: Array[String]) { val spark = SparkSession .builder() .appName("SparkOSS") .getOrCreate() val sc = spark.sparkContext try { //OSS檔案的讀取。 val pathIn = "oss://spark-oss/workline.txt" val inputData = sc.textFile(pathIn, 5) val cnt = inputData.count inputData.count() println(s"count: $cnt") // inputData.repartition(1).saveAsTextFile("oss://spark-oss/user/data3") //OSS檔案的寫入。 // 阿里雲帳號AccessKey擁有所有API的存取權限,風險很高。強烈建議您建立並使用RAM使用者進行API訪問或日常營運,請登入RAM控制台建立RAM使用者 // 此處以把AccessKey 和 AccessKeySecret 儲存在環境變數為例說明。您也可以根據業務需要,儲存到設定檔裡 // 強烈建議不要把 AccessKey 和 AccessKeySecret 儲存到代碼裡,會存在密鑰泄漏風險 val ossClient = new OSSClientBuilder().build("oss-cn-beijing.aliyuncs.com", System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"), System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET")) val filePath="user/data" ossClient.putObject("spark-oss",filePath , new ByteArrayInputStream(cnt.toString.getBytes())) ossClient.shutdown() } finally { sc.stop() } } }
提交方式:
Local模式下的代碼通過IntelliJ IDEA開發、測試並提交。更多操作資訊,請參見Spark在MaxCompute的運行方式。
在DataWorks上通過ODPS Spark節點提交並運行。詳情請參見開發ODPS Spark任務。
讀MaxCompute寫OSS
通過IntelliJ IDEA工具或DataWorks,實現讀取MaxCompute資料並寫入OSS。
程式碼範例
Local模式下的範例程式碼。
package com.aliyun.odps.spark.examples.userpakage import org.apache.spark.sql.{SaveMode, SparkSession} object SparkODPS2OSS { def main(args: Array[String]): Unit = { val spark = SparkSession .builder() .appName("Spark2OSS") .config("spark.master", "local[4]")// 需設定spark.master為local[N]才能直接運行,N為並發數 .config("spark.hadoop.odps.project.name", "") .config("spark.hadoop.odps.access.id", "") .config("spark.hadoop.odps.access.key", "") .config("spark.hadoop.odps.end.point", "http://service.cn.maxcompute.aliyun.com/api") .config("spark.sql.catalogImplementation", "odps") .config("spark.hadoop.fs.oss.accessKeyId","") .config("spark.hadoop.fs.oss.accessKeySecret","") .config("spark.hadoop.fs.oss.endpoint","oss-cn-beijing.aliyuncs.com") .getOrCreate() try{ //通過SparkSql查詢表 val data = spark.sql("select * from user_detail") //展示查詢資料 data.show(10) //將查詢到的資料存放區到一個OSS的檔案中 data.toDF().coalesce(1).write.mode(SaveMode.Overwrite).csv("oss://spark-oss/user/data3") }finally { spark.stop() } } }
Cluster模式下的範例程式碼。
package com.aliyun.odps.spark.examples.userpakage import org.apache.spark.sql.{SaveMode, SparkSession} object SparkODPS2OSS { def main(args: Array[String]): Unit = { val spark = SparkSession .builder() .appName("SparkODPS2OSS") .getOrCreate() try{ //通過SparkSql查詢表 val data = spark.sql("select * from user_detail") //展示查詢資料 data.show(10) //將查詢到的資料存放區到一個OSS的檔案中 data.toDF().coalesce(1).write.mode(SaveMode.Overwrite).csv("oss://spark-oss/user/data3") }finally { spark.stop() } } }
提交方式:
Local模式下的代碼通過IntelliJ IDEA開發、測試並提交。
在DataWorks上通過ODPS Spark節點提交並運行。詳情請參見開發ODPS Spark任務。
說明Spark開發環境的配置請參見Spark在MaxCompute的運行方式。