全部產品
Search
文件中心

E-MapReduce:Spark流式寫入Iceberg

更新時間:Jul 06, 2024

本文為您介紹如何通過Spark Structured Streaming流式寫入Iceberg表。

前提條件

  • 已在E-MapReduce控制台上,建立DataLake叢集或Custom叢集,詳情請參見建立叢集

  • 已在E-MapReduce控制台上,建立選擇了Kafka服務的DataFlow叢集,詳情請參見建立叢集

使用限制

建立的DataLake叢集或Custom叢集需要與Kafka叢集在同一VPC和交換器下,不支援跨VPC。

流式寫入方式

Spark Structured Streaming通過DataStreamWriter介面流式寫資料到Iceberg表,代碼如下。

val tableIdentifier: String = ...
data.writeStream
    .format("iceberg")
    .outputMode("append")
    .trigger(Trigger.ProcessingTime(1, TimeUnit.MINUTES))
    .option("path", tableIdentifier)
    .option("checkpointLocation", checkpointPath)
    .start()
說明

代碼中的tableIdentifier是中繼資料表名或者表路徑。流式寫入支援以下兩種方式:

  • append:追加每個批次的資料到Iceberg表,相當於insert into。

  • complete:使用最新批次的資料完全覆蓋Iceberg,相當於insert overwrite。

樣本

本樣本是從上遊Kafka中讀取資料,寫入Iceberg表,打包放到EMR叢集上通過spark-submit提交執行。

  1. 通過Kafka指令碼建立測試使用的topic並準備測試資料。

    1. 使用SSH方式登入到Kafka叢集,詳情資訊請參見登入叢集

    2. 執行以下命令,建立名為iceberg_test的topic。

      kafka-topics.sh --bootstrap-server core-1-1:9092,core-1-2:9092,core-1-3:9092 --topic iceberg_test --partitions 3 --replication-factor 2 --create
    3. 執行以下命令,生產測試資料。

      kafka-console-producer.sh --broker-list core-1-1:9092,core-1-2:9092,core-1-3:9092 --topic iceberg_test
  2. 通過Spark SQL建立測試使用的資料庫iceberg_db和表iceberg_table,詳細操作請參見基礎使用

  3. 建立Maven專案,引入Spark的依賴和檢查編譯Scala代碼的Maven外掛程式,可以在pom.xml中添加如下配置。

    <dependencies>
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-core_2.12</artifactId>
                <version>3.1.2</version>
            </dependency>
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-sql_2.12</artifactId>
                <version>3.1.2</version>
            </dependency>
    </dependencies>
    <build>
        <plugins>
            <!-- the Maven Scala plugin will compile Scala source files -->
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.2</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
  4. 編寫Spark代碼。

    以Scala版代碼為例,程式碼範例如下。

    重要

    樣本中資料湖中繼資料的配置參數,根據叢集版本不同,配置的參數不同,Catalog名稱也不同。本樣本以EMR-5.3.0版本為列,其中dlf_catalog為Catalog名稱。具體版本對應的配置請參見資料湖中繼資料配置

    def main(args: Array[String]): Unit = {
    
      // 配置使用資料湖中繼資料。
      val sparkConf = new SparkConf()
      sparkConf.set("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
      sparkConf.set("spark.sql.catalog.dlf_catalog", "org.apache.iceberg.spark.SparkCatalog")
      sparkConf.set("spark.sql.catalog.dlf_catalog.catalog-impl", "org.apache.iceberg.aliyun.dlf.DlfCatalog")
      sparkConf.set("spark.sql.catalog.dlf_catalog.io-impl", "org.apache.iceberg.hadoop.HadoopFileIO")
      sparkConf.set("spark.sql.catalog.dlf_catalog.oss.endpoint", "<yourOSSEndpoint>")
      sparkConf.set("spark.sql.catalog.dlf_catalog.warehouse", "<yourOSSWarehousePath>")
      sparkConf.set("spark.sql.catalog.dlf_catalog.access.key.id", "<yourAccessKeyId>")
      sparkConf.set("spark.sql.catalog.dlf_catalog.access.key.secret", "<yourAccessKeySecret>")
      sparkConf.set("spark.sql.catalog.dlf_catalog.dlf.catalog-id", "<yourCatalogId>")
      sparkConf.set("spark.sql.catalog.dlf_catalog.dlf.endpoint", "<yourDLFEndpoint>")
      sparkConf.set("spark.sql.catalog.dlf_catalog.dlf.region-id", "<yourDLFRegionId>")
    
      val spark = SparkSession
        .builder()
        .config(sparkConf)
        .appName("StructuredSinkIceberg")
        .getOrCreate()
    
      val checkpointPath = "oss://mybucket/tmp/iceberg_table_checkpoint"
      val bootstrapServers = "192.168.XX.XX:9092"
      val topic = "iceberg_test"
    
      // 從上遊Kafka讀取資料
      val df = spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", bootstrapServers)
        .option("subscribe", topic)
        .load()
    
      val resDF = df.selectExpr("CAST(unbase64(CAST(key AS STRING)) AS STRING) AS strKey", // 假設key是以Base64編碼的字串,先解碼為一般字元串
          "CAST(value AS STRING) AS data")
          .select(
            col("strKey").cast(LongType).alias("id"), // 現在可以安全地將解碼後的字串轉換為Long
            col("data")
          )
    
      // 流式寫入Iceberg表
      val query = resDF.writeStream
        .format("iceberg")
        .outputMode("append")
        .trigger(Trigger.ProcessingTime(1, TimeUnit.MINUTES))
        .option("path", "dlf_catalog.iceberg_db.iceberg_table")
        .option("checkpointLocation", checkpointPath)
        .start()
    
      query.awaitTermination()
    }

    請您根據叢集的實際情況,修改如下參數。

    參數

    描述

    checkpointPath

    Spark流式寫資料的Checkpoint路徑。

    bootstrapServers

    Kafka叢集中任一Kafka Broker組件的內網IP地址。

    topic

    Topic名稱。

  5. 打包程式並部署到EMR叢集。

    1. 本地調試完成後,通過以下命令打包。

      mvn clean install
    2. 使用SSH方式登入到叢集,詳情資訊請參見登入叢集

    3. 上傳JAR包至EMR叢集。

      本樣本是上傳到EMR叢集的根目錄下。

  6. 提交運行Spark作業。

    1. 執行以下命令,通過spark-submit提交Spark作業。

      spark-submit \
       --master yarn \
       --deploy-mode cluster \
       --driver-memory 1g \
       --executor-cores 2 \
       --executor-memory 3g \
       --num-executors 1 \
       --packages org.apache.spark:spark-sql-kafka-0-10_2.12:<version> \
       --class com.aliyun.iceberg.StructuredSinkIceberg \
       iceberg-demos.jar
      說明
      • 應替換為具體的版本號碼,且版本號碼需與您的Spark和Kafka版本相容。

      • iceberg-demos.jar為您打包好的JAR包。--class和JAR包請根據您實際資訊修改。

    2. 通過Spark SQL查詢資料的變化,詳細操作請參見基礎使用