全部產品
Search
文件中心

E-MapReduce:Spark批式讀寫Iceberg

更新時間:Jul 01, 2024

本文以Spark 3.x操作Iceberg表為例,介紹如何通過Spark DataFrame API以批處理的方式讀寫Iceberg表。

前提條件

已建立Hadoop叢集,詳情請參見建立叢集
說明 此文檔僅適用於EMR-3.38.0及後續版本與EMR-5.4.0及後續版本的Hadoop叢集。

操作步驟

  1. 建立Maven專案,引入Pom依賴。

    引入Spark及Iceberg的依賴,以下程式碼範例指定了Spark 3.1.1與Iceberg 0.12.0版本,使用provided引包編譯,運行時使用叢集上的軟體包。

    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.12</artifactId>
        <version>3.1.1</version>
        <scope>provided</scope>
    </dependency>
    
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.12</artifactId>
        <version>3.1.1</version>
        <scope>provided</scope>
    </dependency>
    
    <dependency>
        <groupId>org.apache.iceberg</groupId>
        <artifactId>iceberg-core</artifactId>
        <version>0.12.0</version>
        <scope>provided</scope>
    </dependency>
    說明

    由於EMR叢集的Iceberg軟體包與開源依賴包存在一定差異,例如EMR Iceberg預設整合了DLF Catalog,所以建議您在本地使用provided方式引入開源Iceberg依賴進行代碼編譯,打包放到叢集上運行時使用叢集環境中的依賴。

  2. 配置Catalog。

    使用Spark API操作Iceberg表,首先需要配置Catalog,在SparkConf中加入必要配置項即可。

    以下是在Spark SQL中使用資料湖中繼資料的配置,叢集版本不同預設的Catalog名稱不同,需要配置的參數也不同,具體請參見資料湖中繼資料配置

    • EMR-3.40及後續版本和EMR-5.6.0及後續版本

      sparkConf.set("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
      sparkConf.set("spark.sql.catalog.iceberg", "org.apache.iceberg.spark.SparkCatalog")
      sparkConf.set("spark.sql.catalog.iceberg.catalog-impl", "org.apache.iceberg.aliyun.dlf.hive.DlfCatalog")
    • EMR-3.39.x和EMR-5.5.x版本

      sparkConf.set("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
      sparkConf.set("spark.sql.catalog.dlf", "org.apache.iceberg.spark.SparkCatalog")
      sparkConf.set("spark.sql.catalog.dlf.catalog-impl", "org.apache.iceberg.aliyun.dlf.hive.DlfCatalog")
      sparkConf.set("spark.sql.catalog.dlf.warehouse", "<yourOSSWarehousePath>")
    • EMR-3.38.x版本和EMR-5.3.x~EMR-5.4.x版本(包含)

      sparkConf.set("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
      sparkConf.set("spark.sql.catalog.dlf_catalog", "org.apache.iceberg.spark.SparkCatalog")
      sparkConf.set("spark.sql.catalog.dlf_catalog.catalog-impl", "org.apache.iceberg.aliyun.dlf.DlfCatalog")
      sparkConf.set("spark.sql.catalog.dlf_catalog.io-impl", "org.apache.iceberg.hadoop.HadoopFileIO")
      sparkConf.set("spark.sql.catalog.dlf_catalog.oss.endpoint", "<yourOSSEndpoint>")
      sparkConf.set("spark.sql.catalog.dlf_catalog.warehouse", "<yourOSSWarehousePath>")
      sparkConf.set("spark.sql.catalog.dlf_catalog.access.key.id", System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"))
      sparkConf.set("spark.sql.catalog.dlf_catalog.access.key.secret", System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"))
      sparkConf.set("spark.sql.catalog.dlf_catalog.dlf.catalog-id", "<yourCatalogId>")
      sparkConf.set("spark.sql.catalog.dlf_catalog.dlf.endpoint", "<yourDLFEndpoint>")
      sparkConf.set("spark.sql.catalog.dlf_catalog.dlf.region-id", "<yourDLFRegionId>")
      說明

      運行程式碼範例前必須先配置環境變數。關於如何配置環境變數,請參見配置環境變數

  3. 寫表。

    Spark 3.x支援DataFrameWriterV2 API寫入資料到Iceberg表。目前v1 DataFrame API已不推薦使用,以下代碼以V2 API寫Iceberg表sample為例。

    以下樣本中的<yourCatalogName>為Catalog的名稱,請根據實際情況修改Catalog名稱。

    建立資料表

    val df: DataFrame = ...
    df.writeTo("<yourCatalogName>.iceberg_db.sample").create()
    說明

    建立表支援create、replace以及createOrReplace語義,另外支援通過tableProperty和partitionedBy配置表的屬性與分區欄位。

    您可以通過以下命令追加或覆蓋資料:

    • 追加資料

      val df: DataFrame = ...
      df.writeTo("<yourCatalogName>.iceberg_db.sample").append()
    • 覆蓋資料

      val df: DataFrame = ...
      df.writeTo("<yourCatalogName>.iceberg_db.sample").overwritePartitions()
  4. 讀表。

    請根據您Spark的版本,選擇讀表的方式:

    • Spark 3.x(推薦)

       val df = spark.table("<yourCatalogName>.iceberg_db.sample")
    • Spark 2.4

      val df = spark.read.format("iceberg").load("<yourCatalogName>.iceberg_db.sample")

樣本

本樣本是使用Spark DataFrame API批式讀寫Iceberg表。

重要

樣本中資料湖中繼資料的配置參數,根據叢集版本不同,配置的參數不同,預設的Catalog名稱也不同。本樣本以EMR-5.3.0版本為列,其中dlf_catalog為Catalog名稱。具體版本對應的配置請參見資料湖中繼資料配置

  1. 通過Spark SQL建立測試使用的資料庫iceberg_db,詳細資料請參見基礎使用

  2. 編寫Spark代碼。

    以Scala版代碼為例,程式碼範例如下。

    def main(args: Array[String]): Unit = {
    
      // 配置使用資料湖中繼資料。
      val sparkConf = new SparkConf()
      sparkConf.set("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
      sparkConf.set("spark.sql.catalog.dlf_catalog", "org.apache.iceberg.spark.SparkCatalog")
      sparkConf.set("spark.sql.catalog.dlf_catalog.catalog-impl", "org.apache.iceberg.aliyun.dlf.DlfCatalog")
      sparkConf.set("spark.sql.catalog.dlf_catalog.io-impl", "org.apache.iceberg.hadoop.HadoopFileIO")
      sparkConf.set("spark.sql.catalog.dlf_catalog.oss.endpoint", "<yourOSSEndpoint>")
      sparkConf.set("spark.sql.catalog.dlf_catalog.warehouse", "<yourOSSWarehousePath>")
      sparkConf.set("spark.sql.catalog.dlf_catalog.access.key.id", System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"))
      sparkConf.set("spark.sql.catalog.dlf_catalog.access.key.secret", System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"))
      sparkConf.set("spark.sql.catalog.dlf_catalog.dlf.catalog-id", "<yourCatalogId>")
      sparkConf.set("spark.sql.catalog.dlf_catalog.dlf.endpoint", "<yourDLFEndpoint>")
      sparkConf.set("spark.sql.catalog.dlf_catalog.dlf.region-id", "<yourDLFRegionId>")
    
      val spark = SparkSession
      .builder()
      .config(sparkConf)
      .appName("IcebergReadWriteTest")
      .getOrCreate()
    
      // 從DataFrame中建立或替換Iceberg表
      val firstDF = spark.createDataFrame(Seq(
      (1, "a"), (2, "b"), (3, "c")
      )).toDF("id", "data")
    
      firstDF.writeTo("dlf_catalog.iceberg_db.sample").createOrReplace()
    
      // 將DataFrame寫入Iceberg表
      val secondDF = spark.createDataFrame(Seq(
      (4, "d"), (5, "e"), (6, "f")
      )).toDF("id", "data")
    
      secondDF.writeTo("dlf_catalog.iceberg_db.sample").append()
    
      // 讀Iceberg表
      val icebergTable = spark.table("dlf_catalog.iceberg_db.sample")
    
      icebergTable.show()
    }
  3. 打包程式並部署到EMR叢集。

    1. 檢查編譯Scala代碼的Maven外掛程式,可以在pom.xml中配置如下外掛程式。

      <build>
          <plugins>
              <!-- the Maven Scala plugin will compile Scala source files -->
              <plugin>
                  <groupId>net.alchim31.maven</groupId>
                  <artifactId>scala-maven-plugin</artifactId>
                  <version>3.2.2</version>
                  <executions>
                      <execution>
                          <goals>
                              <goal>compile</goal>
                              <goal>testCompile</goal>
                          </goals>
                      </execution>
                  </executions>
              </plugin>
          </plugins>
      </build>
    2. 您可以在本地完成代碼調試後,通過如下命令打包。

      mvn clean install
    3. 使用SSH方式登入到叢集,詳情資訊請參見登入叢集

    4. 上傳JAR包至EMR叢集。

      本樣本是上傳到EMR叢集的根目錄下。

  4. 執行以下命令,通過spark-submit運行Spark作業。

    spark-submit \
     --master yarn \
     --deploy-mode cluster \
     --driver-memory 1g \
     --executor-cores 1 \
     --executor-memory 1g \
     --num-executors 1 \
     --class com.aliyun.iceberg.IcebergTest \
     iceberg-demos.jar
    說明

    iceberg-demos.jar為您打包好的JAR包。--class和JAR包請根據您實際資訊修改。

    運行結果如下。

    +---+----+
    | id|data|
    +---+----+
    |  4|   d|
    |  1|   a|
    |  5|   e|
    |  6|   f|
    |  2|   b|
    |  3|   c|
    +---+----+