全部产品
Search
文档中心

开源大数据平台E-MapReduce:Spark流式写入Iceberg

更新时间:Jul 05, 2024

本文为您介绍如何通过Spark Structured Streaming流式写入Iceberg表。

前提条件

  • 已在E-MapReduce控制台上,创建DataLake集群或Custom集群,详情请参见创建集群

  • 已在E-MapReduce控制台上,创建选择了Kafka服务的DataFlow集群,详情请参见创建集群

使用限制

创建的DataLake集群或Custom集群需要与Kafka集群在同一VPC和交换机下,不支持跨VPC。

流式写入方式

Spark Structured Streaming通过DataStreamWriter接口流式写数据到Iceberg表,代码如下。

val tableIdentifier: String = ...
data.writeStream
    .format("iceberg")
    .outputMode("append")
    .trigger(Trigger.ProcessingTime(1, TimeUnit.MINUTES))
    .option("path", tableIdentifier)
    .option("checkpointLocation", checkpointPath)
    .start()
说明

代码中的tableIdentifier是元数据表名或者表路径。流式写入支持以下两种方式:

  • append:追加每个批次的数据到Iceberg表,相当于insert into。

  • complete:使用最新批次的数据完全覆盖Iceberg,相当于insert overwrite。

示例

本示例是从上游Kafka中读取数据,写入Iceberg表,打包放到EMR集群上通过spark-submit提交执行。

  1. 通过Kafka脚本创建测试使用的topic并准备测试数据。

    1. 使用SSH方式登录到Kafka集群,详情信息请参见登录集群

    2. 执行以下命令,创建名为iceberg_test的topic。

      kafka-topics.sh --bootstrap-server core-1-1:9092,core-1-2:9092,core-1-3:9092 --topic iceberg_test --partitions 3 --replication-factor 2 --create
    3. 执行以下命令,生产测试数据。

      kafka-console-producer.sh --broker-list core-1-1:9092,core-1-2:9092,core-1-3:9092 --topic iceberg_test
  2. 通过Spark SQL创建测试使用的数据库iceberg_db和表iceberg_table,详细操作请参见基础使用

  3. 新建Maven项目,引入Spark的依赖和检查编译Scala代码的Maven插件,可以在pom.xml中添加如下配置。

    <dependencies>
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-core_2.12</artifactId>
                <version>3.1.2</version>
            </dependency>
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-sql_2.12</artifactId>
                <version>3.1.2</version>
            </dependency>
    </dependencies>
    <build>
        <plugins>
            <!-- the Maven Scala plugin will compile Scala source files -->
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.2</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
  4. 编写Spark代码。

    以Scala版代码为例,代码示例如下。

    重要

    示例中数据湖元数据的配置参数,根据集群版本不同,配置的参数不同,Catalog名称也不同。本示例以EMR-5.3.0版本为列,其中dlf_catalog为Catalog名称。具体版本对应的配置请参见数据湖元数据配置

    def main(args: Array[String]): Unit = {
    
      // 配置使用数据湖元数据。
      val sparkConf = new SparkConf()
      sparkConf.set("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
      sparkConf.set("spark.sql.catalog.dlf_catalog", "org.apache.iceberg.spark.SparkCatalog")
      sparkConf.set("spark.sql.catalog.dlf_catalog.catalog-impl", "org.apache.iceberg.aliyun.dlf.DlfCatalog")
      sparkConf.set("spark.sql.catalog.dlf_catalog.io-impl", "org.apache.iceberg.hadoop.HadoopFileIO")
      sparkConf.set("spark.sql.catalog.dlf_catalog.oss.endpoint", "<yourOSSEndpoint>")
      sparkConf.set("spark.sql.catalog.dlf_catalog.warehouse", "<yourOSSWarehousePath>")
      sparkConf.set("spark.sql.catalog.dlf_catalog.access.key.id", "<yourAccessKeyId>")
      sparkConf.set("spark.sql.catalog.dlf_catalog.access.key.secret", "<yourAccessKeySecret>")
      sparkConf.set("spark.sql.catalog.dlf_catalog.dlf.catalog-id", "<yourCatalogId>")
      sparkConf.set("spark.sql.catalog.dlf_catalog.dlf.endpoint", "<yourDLFEndpoint>")
      sparkConf.set("spark.sql.catalog.dlf_catalog.dlf.region-id", "<yourDLFRegionId>")
    
      val spark = SparkSession
        .builder()
        .config(sparkConf)
        .appName("StructuredSinkIceberg")
        .getOrCreate()
    
      val checkpointPath = "oss://mybucket/tmp/iceberg_table_checkpoint"
      val bootstrapServers = "192.168.XX.XX:9092"
      val topic = "iceberg_test"
    
      // 从上游Kafka读取数据
      val df = spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", bootstrapServers)
        .option("subscribe", topic)
        .load()
    
      val resDF = df.selectExpr("CAST(unbase64(CAST(key AS STRING)) AS STRING) AS strKey", // 假设key是以Base64编码的字符串,先解码为普通字符串
          "CAST(value AS STRING) AS data")
          .select(
            col("strKey").cast(LongType).alias("id"), // 现在可以安全地将解码后的字符串转换为Long
            col("data")
          )
    
      // 流式写入Iceberg表
      val query = resDF.writeStream
        .format("iceberg")
        .outputMode("append")
        .trigger(Trigger.ProcessingTime(1, TimeUnit.MINUTES))
        .option("path", "dlf_catalog.iceberg_db.iceberg_table")
        .option("checkpointLocation", checkpointPath)
        .start()
    
      query.awaitTermination()
    }

    请您根据集群的实际情况,修改如下参数。

    参数

    描述

    checkpointPath

    Spark流式写数据的Checkpoint路径。

    bootstrapServers

    Kafka集群中任一Kafka Broker组件的内网IP地址。

    topic

    Topic名称。

  5. 打包程序并部署到EMR集群。

    1. 本地调试完成后,通过以下命令打包。

      mvn clean install
    2. 使用SSH方式登录到集群,详情信息请参见登录集群

    3. 上传JAR包至EMR集群。

      本示例是上传到EMR集群的根目录下。

  6. 提交运行Spark作业。

    1. 执行以下命令,通过spark-submit提交Spark作业。

      spark-submit \
       --master yarn \
       --deploy-mode cluster \
       --driver-memory 1g \
       --executor-cores 2 \
       --executor-memory 3g \
       --num-executors 1 \
       --packages org.apache.spark:spark-sql-kafka-0-10_2.12:<version> \
       --class com.aliyun.iceberg.StructuredSinkIceberg \
       iceberg-demos.jar
      说明
      • 应替换为具体的版本号,且版本号需与您的Spark和Kafka版本兼容。

      • iceberg-demos.jar为您打包好的JAR包。--class和JAR包请根据您实际信息修改。

    2. 通过Spark SQL查询数据的变化,详细操作请参见基础使用