全部產品
Search
文件中心

MaxCompute:Spark訪問湖倉一體外部資料源

更新時間:Jul 12, 2024

Spark on MaxCompute目前已支援訪問湖倉一體外部資料源,若您想將資料處理作業的環境從Spark更換為MaxCompute,無需再遷移Spark作業資料到MaxCompute,可直接進行訪問,從而降低使用成本。本文為您介紹使用MaxCompute訪問外部資料源的樣本。

訪問基於Hadoop外部資料源的外部項目

  • MaxCompute SQL訪問外部項目表

    -- hadoop_external_project 為外部項目,映射的是EMR的Hive資料庫
    -- 訪問非分區表
    SELECT * from hadoop_external_project.testtbl;
    -- 訪問分區表
    SELECT * from hadoop_external_project.testtbl_par where b='20220914';
  • Spark on MaxCompute訪問外部項目表

    -- 配置項
    -- 當前預設關閉對於外表和外部project的支援,需要手動開啟
    spark.sql.odps.enableExternalTable=true
    spark.sql.odps.enableExternalProject=true;
    -- 指定spark版本
    spark.hadoop.odps.spark.version=spark-2.4.5-odps0.34.0;
    
    -- 代碼
    import org.apache.spark.sql.SparkSession
    
    object external_Project_ReadTableHadoop {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession
          .builder()
          .appName("external_TableL-on-MaxCompute")
          // 預設300秒,廣播join時廣播等待的時間。
          .config("spark.sql.broadcastTimeout", 20 * 60)
          //在strict 模式下,使用者必須指定至少一個靜態分區,在非strict 模式下,所有分區都允許是動態。
          .config("odps.exec.dynamic.partition.mode", "nonstrict")
          .config("oss.endpoint","oss-cn-shanghai-internal.aliyuncs.com")
          .getOrCreate()
    
        // 訪問外部項目ext_dlf_0713 ;
        print("=====show tables in hadoop_external_project6=====")
        spark.sql("show tables in hadoop_external_project6").show()
    
        // 讀外部項目非分區表
        print("===============hadoop_external_project6.testtbl;================")
        spark.sql("desc extended hadoop_external_project6.testtbl").show()
        print("===============hadoop_external_project6.testtbl;================")
        spark.sql("SELECT * from hadoop_external_project6.testtbl").show()
    
        // 讀外部項目分區表
        print("===============hadoop_external_project6.testtbl_par;================")
        spark.sql("desc extended hadoop_external_project6.testtbl_par").show()
        print("===============hadoop_external_project6.testtbl;================")
        spark.sql("SELECT * from hadoop_external_project6.testtbl_par where b='20220914'").show()
    
      }
    
    }

訪問基於資料湖構建和Object Storage Service的外部項目

  • MaxCompute SQL訪問外部項目表

    -- ext_dlf_0713為外部項目,映射的是DLF的資料庫
    -- 訪問非分區表
    SELECT * from ext_dlf_0713.tbl_oss1;
  • Spark on MaxCompute訪問外部項目表

    -- 配置項
    -- 當前預設關閉對於外表和外部project的支援,需要手動開啟
    spark.sql.odps.enableExternalTable=true;
    spark.sql.odps.enableExternalProject=true;
    -- 指定spark版本
    spark.hadoop.odps.spark.version=spark-2.4.5-odps0.34.0;
    -- 如果是EMR產生的OSS location,增加該參數
    spark.hadoop.odps.oss.location.uri.style=emr;
    -- spark 訪問oss時需要指定oss.endpoint
    spark.hadoop.odps.oss.endpoint=oss-cn-shanghai-internal.aliyuncs.com;
    -- 指定region
    spark.hadoop.odps.region.id=cn-shanghai;
    spark.hadoop.odps.oss.region.default=cn-shanghai;
    
    -- 代碼
    import org.apache.spark.sql.{SaveMode, SparkSession}
    object external_Project_ReadTable {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession
          .builder()
          .appName("external_TableL-on-MaxCompute")
          // 預設300秒,廣播join時廣播等待的時間。
          .config("spark.sql.broadcastTimeout", 20 * 60)
          //在strict 模式下,使用者必須指定至少一個靜態分區,在非strict 模式下,所有分區都允許是動態。
          .config("odps.exec.dynamic.partition.mode", "nonstrict")
          .config("oss.endpoint","oss-cn-shanghai-internal.aliyuncs.com")
          .getOrCreate()
    
        // 訪問外部項目ext_dlf_0713 ;
        print("=====show tables in ext_dlf_0713=====")
        spark.sql("show tables in ext_dlf_0713").show()
    
        // 讀外部項目非分區表
        print("===============ext_dlf_0713.tbl_oss1;================")
        spark.sql("desc extended ext_dlf_0713.tbl_oss1").show()
        print("===============ext_dlf_0713.tbl_oss1;================")
        spark.sql("SELECT * from ext_dlf_0713.tbl_oss1").show()
    
    
      }
    
    }