All Products
Search
Document Center

E-MapReduce:Use Spark to write data to an Iceberg table in streaming mode

Last Updated:Jul 22, 2024

This topic describes how to write data to an Iceberg table by using Spark Structured Streaming.

Prerequisites

  • A DataLake cluster or a custom cluster is created. For more information, see Create a cluster.

  • A Dataflow cluster that contains the Kafka service is created. For more information, see Create a cluster.

Limits

The DataLake cluster or the custom cluster and the Dataflow Kafka cluster must be deployed in the same vSwitch of the same virtual private cloud (VPC).

Write data to an Iceberg table in streaming mode

Write data to an Iceberg table by calling the DataStreamWriter API in Spark Structured Streaming.

val tableIdentifier: String = ...
data.writeStream
    .format("iceberg")
    .outputMode("append")
    .trigger(Trigger.ProcessingTime(1, TimeUnit.MINUTES))
    .option("path", tableIdentifier)
    .option("checkpointLocation", checkpointPath)
    .start()
Note

The tableIdentifier parameter in the code specifies the name of a metadata table or the path of a metadata table. You can use one of the following methods to write data to an Iceberg table in streaming mode:

  • append: appends data in each batch to an Iceberg table. The method is equivalent to the INSERT INTO operation.

  • complete: overwrites data in an Iceberg table with data in the latest batch. The method is equivalent to the INSERT OVERWRITE operation.

Examples

This section provides an example on how to read data from the Dataflow cluster and write the data to an Iceberg table. You can run the spark-submit command to run a Spark job to implement data read and write after you package the related code and upload the packaged code to your EMR cluster.

  1. Use a Kafka script to create a topic for a test and prepare test data.

    1. Log on to the Dataflow cluster in SSH mode. For more information, see Log on to a cluster.

    2. Run the following command to create a topic named iceberg_test:

      kafka-topics.sh --bootstrap-server core-1-1:9092,core-1-2:9092,core-1-3:9092 --topic iceberg_test --partitions 3 --replication-factor 2 --create
    3. Run the following command to prepare test data:

      kafka-console-producer.sh --broker-list core-1-1:9092,core-1-2:9092,core-1-3:9092 --topic iceberg_test
  2. Use Spark SQL to create a database named iceberg_db and a table named iceberg_table for the test. For more information, see Use Iceberg.

  3. Create a Maven project, add the dependencies of Spark, and add the Maven plug-ins that are used to compile the code in Scala. Sample configurations in the pom.xml file:

    <dependencies>
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-core_2.12</artifactId>
                <version>3.1.2</version>
            </dependency>
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-sql_2.12</artifactId>
                <version>3.1.2</version>
            </dependency>
    </dependencies>
    <build>
        <plugins>
            <!-- the Maven Scala plugin will compile Scala source files -->
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.2</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
  4. Write Spark code.

    Sample code in Scala:

    Important

    The parameters and the default name of the catalog vary based on the version of your cluster. In this example, DLF is used to manage metadata. In this example, an EMR V5.3.0 cluster and a catalog named dlf_catalog are used. For more information, see Configuration of DLF metadata.

    def main(args: Array[String]): Unit = {
    
      // Configure the parameters for the catalog. 
      val sparkConf = new SparkConf()
      sparkConf.set("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
      sparkConf.set("spark.sql.catalog.dlf_catalog", "org.apache.iceberg.spark.SparkCatalog")
      sparkConf.set("spark.sql.catalog.dlf_catalog.catalog-impl", "org.apache.iceberg.aliyun.dlf.DlfCatalog")
      sparkConf.set("spark.sql.catalog.dlf_catalog.io-impl", "org.apache.iceberg.hadoop.HadoopFileIO")
      sparkConf.set("spark.sql.catalog.dlf_catalog.oss.endpoint", "<yourOSSEndpoint>")
      sparkConf.set("spark.sql.catalog.dlf_catalog.warehouse", "<yourOSSWarehousePath>")
      sparkConf.set("spark.sql.catalog.dlf_catalog.access.key.id", "<yourAccessKeyId>")
      sparkConf.set("spark.sql.catalog.dlf_catalog.access.key.secret", "<yourAccessKeySecret>")
      sparkConf.set("spark.sql.catalog.dlf_catalog.dlf.catalog-id", "<yourCatalogId>")
      sparkConf.set("spark.sql.catalog.dlf_catalog.dlf.endpoint", "<yourDLFEndpoint>")
      sparkConf.set("spark.sql.catalog.dlf_catalog.dlf.region-id", "<yourDLFRegionId>")
    
      val spark = SparkSession
        .builder()
        .config(sparkConf)
        .appName("StructuredSinkIceberg")
        .getOrCreate()
    
      val checkpointPath = "oss://mybucket/tmp/iceberg_table_checkpoint"
      val bootstrapServers = "192.168.XX.XX:9092"
      val topic = "iceberg_test"
    
      // Read data from the Dataflow cluster.
      val df = spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", bootstrapServers)
        .option("subscribe", topic)
        .load()
    
      val resDF = df.selectExpr("CAST(unbase64(CAST(key AS STRING)) AS STRING) AS strKey", // Decode the Base64-encoded strings into common strings.
          "CAST(value AS STRING) AS data")
          .select(
            col("strKey").cast(LongType).alias("id"), // Convert strings of the STRING type to strings of the LONG type.
            col("data")
          )
    
      // Write data to the Iceberg table in streaming mode.
      val query = resDF.writeStream
        .format("iceberg")
        .outputMode("append")
        .trigger(Trigger.ProcessingTime(1, TimeUnit.MINUTES))
        .option("path", "dlf_catalog.iceberg_db.iceberg_table")
        .option("checkpointLocation", checkpointPath)
        .start()
    
      query.awaitTermination()
    }

    You can change the values of the parameters described in the following table based on your business requirements.

    Parameter

    Description

    checkpointPath

    The checkpoint path of the data that is written by using Spark Structured Streaming.

    bootstrapServers

    The private IP address of a Kafka broker in the Kafka cluster.

    topic

    The name of the topic.

  5. Package the code, and deploy the code to the EMR cluster.

    1. After you debug the code on your on-premises machine, run the following command to package the code:

      mvn clean install
    2. Log on to your EMR cluster in SSH mode. For more information, see Log on to a cluster.

    3. Upload the JAR package to the EMR cluster.

      In this example, the JAR package is uploaded to the root directory of the EMR cluster.

  6. Submit and run a Spark job.

    1. Run the spark-submit command to run the Spark job:

      spark-submit \
       --master yarn \
       --deploy-mode cluster \
       --driver-memory 1g \
       --executor-cores 2 \
       --executor-memory 3g \
       --num-executors 1 \
       --packages org.apache.spark:spark-sql-kafka-0-10_2.12:<version> \
       --class com.aliyun.iceberg.StructuredSinkIceberg \
       iceberg-demos.jar
      Note
      • Replace <version> in the preceding code with a specific version. spark-sql-kafka must be compatible with Spark and Kafka.

      • In this example, a JAR package named iceberg-demos.jar is used. You can change the value of the --class parameter and the name of the JAR package based on your business requirements.

    2. Use Spark SQL to query data changes. For more information, see Basic usage.