Spark on MaxCompute目前已支援訪問湖倉一體外部資料源,若您想將資料處理作業的環境從Spark更換為MaxCompute,無需再遷移Spark作業資料到MaxCompute,可直接進行訪問,從而降低使用成本。本文為您介紹使用MaxCompute訪問外部資料源的樣本。
訪問基於Hadoop外部資料源的外部項目
MaxCompute SQL訪問外部項目表
-- hadoop_external_project 為外部項目,映射的是EMR的Hive資料庫 -- 訪問非分區表 SELECT * from hadoop_external_project.testtbl; -- 訪問分區表 SELECT * from hadoop_external_project.testtbl_par where b='20220914';Spark on MaxCompute訪問外部項目表
-- 配置項 -- 當前預設關閉對於外表和外部project的支援,需要手動開啟 spark.sql.odps.enableExternalTable=true spark.sql.odps.enableExternalProject=true; -- 指定spark版本 spark.hadoop.odps.spark.version=spark-2.4.5-odps0.34.0; -- 代碼 import org.apache.spark.sql.SparkSession object external_Project_ReadTableHadoop { def main(args: Array[String]): Unit = { val spark = SparkSession .builder() .appName("external_TableL-on-MaxCompute") // 預設300秒,廣播join時廣播等待的時間。 .config("spark.sql.broadcastTimeout", 20 * 60) //在strict 模式下,使用者必須指定至少一個靜態分區,在非strict 模式下,所有分區都允許是動態。 .config("odps.exec.dynamic.partition.mode", "nonstrict") .config("oss.endpoint","oss-cn-shanghai-internal.aliyuncs.com") .getOrCreate() // 訪問外部項目ext_dlf_0713 ; print("=====show tables in hadoop_external_project6=====") spark.sql("show tables in hadoop_external_project6").show() // 讀外部項目非分區表 print("===============hadoop_external_project6.testtbl;================") spark.sql("desc extended hadoop_external_project6.testtbl").show() print("===============hadoop_external_project6.testtbl;================") spark.sql("SELECT * from hadoop_external_project6.testtbl").show() // 讀外部項目分區表 print("===============hadoop_external_project6.testtbl_par;================") spark.sql("desc extended hadoop_external_project6.testtbl_par").show() print("===============hadoop_external_project6.testtbl;================") spark.sql("SELECT * from hadoop_external_project6.testtbl_par where b='20220914'").show() } }
訪問基於資料湖構建和Object Storage Service的外部項目
MaxCompute SQL訪問外部項目表
-- ext_dlf_0713為外部項目,映射的是DLF的資料庫 -- 訪問非分區表 SELECT * from ext_dlf_0713.tbl_oss1;Spark on MaxCompute訪問外部項目表
-- 配置項 -- 當前預設關閉對於外表和外部project的支援,需要手動開啟 spark.sql.odps.enableExternalTable=true; spark.sql.odps.enableExternalProject=true; -- 指定spark版本 spark.hadoop.odps.spark.version=spark-2.4.5-odps0.34.0; -- 如果是EMR產生的OSS location,增加該參數 spark.hadoop.odps.oss.location.uri.style=emr; -- spark 訪問oss時需要指定oss.endpoint spark.hadoop.odps.oss.endpoint=oss-cn-shanghai-internal.aliyuncs.com; -- 指定region spark.hadoop.odps.region.id=cn-shanghai; spark.hadoop.odps.oss.region.default=cn-shanghai; -- 代碼 import org.apache.spark.sql.{SaveMode, SparkSession} object external_Project_ReadTable { def main(args: Array[String]): Unit = { val spark = SparkSession .builder() .appName("external_TableL-on-MaxCompute") // 預設300秒,廣播join時廣播等待的時間。 .config("spark.sql.broadcastTimeout", 20 * 60) //在strict 模式下,使用者必須指定至少一個靜態分區,在非strict 模式下,所有分區都允許是動態。 .config("odps.exec.dynamic.partition.mode", "nonstrict") .config("oss.endpoint","oss-cn-shanghai-internal.aliyuncs.com") .getOrCreate() // 訪問外部項目ext_dlf_0713 ; print("=====show tables in ext_dlf_0713=====") spark.sql("show tables in ext_dlf_0713").show() // 讀外部項目非分區表 print("===============ext_dlf_0713.tbl_oss1;================") spark.sql("desc extended ext_dlf_0713.tbl_oss1").show() print("===============ext_dlf_0713.tbl_oss1;================") spark.sql("SELECT * from ext_dlf_0713.tbl_oss1").show() } }