本文為您介紹如何通過Spark Structured Streaming流式寫入Iceberg表。
前提條件
使用限制
建立的DataLake叢集或Custom叢集需要與Kafka叢集在同一VPC和交換器下,不支援跨VPC。
流式寫入方式
Spark Structured Streaming通過DataStreamWriter介面流式寫資料到Iceberg表,代碼如下。
val tableIdentifier: String = ...
data.writeStream
.format("iceberg")
.outputMode("append")
.trigger(Trigger.ProcessingTime(1, TimeUnit.MINUTES))
.option("path", tableIdentifier)
.option("checkpointLocation", checkpointPath)
.start()
代碼中的tableIdentifier是中繼資料表名或者表路徑。流式寫入支援以下兩種方式:
append:追加每個批次的資料到Iceberg表,相當於insert into。
complete:使用最新批次的資料完全覆蓋Iceberg,相當於insert overwrite。
樣本
本樣本是從上遊Kafka中讀取資料,寫入Iceberg表,打包放到EMR叢集上通過spark-submit提交執行。
通過Kafka指令碼建立測試使用的topic並準備測試資料。
使用SSH方式登入到Kafka叢集,詳情資訊請參見登入叢集。
執行以下命令,建立名為iceberg_test的topic。
kafka-topics.sh --bootstrap-server core-1-1:9092,core-1-2:9092,core-1-3:9092 --topic iceberg_test --partitions 3 --replication-factor 2 --create
執行以下命令,生產測試資料。
kafka-console-producer.sh --broker-list core-1-1:9092,core-1-2:9092,core-1-3:9092 --topic iceberg_test
通過Spark SQL建立測試使用的資料庫iceberg_db和表iceberg_table,詳細操作請參見基礎使用。
建立Maven專案,引入Spark的依賴和檢查編譯Scala代碼的Maven外掛程式,可以在pom.xml中添加如下配置。
<dependencies> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_2.12</artifactId> <version>3.1.2</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-sql_2.12</artifactId> <version>3.1.2</version> </dependency> </dependencies> <build> <plugins> <!-- the Maven Scala plugin will compile Scala source files --> <plugin> <groupId>net.alchim31.maven</groupId> <artifactId>scala-maven-plugin</artifactId> <version>3.2.2</version> <executions> <execution> <goals> <goal>compile</goal> <goal>testCompile</goal> </goals> </execution> </executions> </plugin> </plugins> </build>
編寫Spark代碼。
以Scala版代碼為例,程式碼範例如下。
重要樣本中資料湖中繼資料的配置參數,根據叢集版本不同,配置的參數不同,Catalog名稱也不同。本樣本以EMR-5.3.0版本為列,其中
dlf_catalog
為Catalog名稱。具體版本對應的配置請參見資料湖中繼資料配置。def main(args: Array[String]): Unit = { // 配置使用資料湖中繼資料。 val sparkConf = new SparkConf() sparkConf.set("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") sparkConf.set("spark.sql.catalog.dlf_catalog", "org.apache.iceberg.spark.SparkCatalog") sparkConf.set("spark.sql.catalog.dlf_catalog.catalog-impl", "org.apache.iceberg.aliyun.dlf.DlfCatalog") sparkConf.set("spark.sql.catalog.dlf_catalog.io-impl", "org.apache.iceberg.hadoop.HadoopFileIO") sparkConf.set("spark.sql.catalog.dlf_catalog.oss.endpoint", "<yourOSSEndpoint>") sparkConf.set("spark.sql.catalog.dlf_catalog.warehouse", "<yourOSSWarehousePath>") sparkConf.set("spark.sql.catalog.dlf_catalog.access.key.id", "<yourAccessKeyId>") sparkConf.set("spark.sql.catalog.dlf_catalog.access.key.secret", "<yourAccessKeySecret>") sparkConf.set("spark.sql.catalog.dlf_catalog.dlf.catalog-id", "<yourCatalogId>") sparkConf.set("spark.sql.catalog.dlf_catalog.dlf.endpoint", "<yourDLFEndpoint>") sparkConf.set("spark.sql.catalog.dlf_catalog.dlf.region-id", "<yourDLFRegionId>") val spark = SparkSession .builder() .config(sparkConf) .appName("StructuredSinkIceberg") .getOrCreate() val checkpointPath = "oss://mybucket/tmp/iceberg_table_checkpoint" val bootstrapServers = "192.168.XX.XX:9092" val topic = "iceberg_test" // 從上遊Kafka讀取資料 val df = spark.readStream .format("kafka") .option("kafka.bootstrap.servers", bootstrapServers) .option("subscribe", topic) .load() val resDF = df.selectExpr("CAST(unbase64(CAST(key AS STRING)) AS STRING) AS strKey", // 假設key是以Base64編碼的字串,先解碼為一般字元串 "CAST(value AS STRING) AS data") .select( col("strKey").cast(LongType).alias("id"), // 現在可以安全地將解碼後的字串轉換為Long col("data") ) // 流式寫入Iceberg表 val query = resDF.writeStream .format("iceberg") .outputMode("append") .trigger(Trigger.ProcessingTime(1, TimeUnit.MINUTES)) .option("path", "dlf_catalog.iceberg_db.iceberg_table") .option("checkpointLocation", checkpointPath) .start() query.awaitTermination() }
請您根據叢集的實際情況,修改如下參數。
參數
描述
checkpointPath
Spark流式寫資料的Checkpoint路徑。
bootstrapServers
Kafka叢集中任一Kafka Broker組件的內網IP地址。
topic
Topic名稱。
打包程式並部署到EMR叢集。
本地調試完成後,通過以下命令打包。
mvn clean install
使用SSH方式登入到叢集,詳情資訊請參見登入叢集。
上傳JAR包至EMR叢集。
本樣本是上傳到EMR叢集的根目錄下。
提交運行Spark作業。
執行以下命令,通過spark-submit提交Spark作業。
spark-submit \ --master yarn \ --deploy-mode cluster \ --driver-memory 1g \ --executor-cores 2 \ --executor-memory 3g \ --num-executors 1 \ --packages org.apache.spark:spark-sql-kafka-0-10_2.12:<version> \ --class com.aliyun.iceberg.StructuredSinkIceberg \ iceberg-demos.jar
說明應替換為具體的版本號碼,且版本號碼需與您的Spark和Kafka版本相容。
iceberg-demos.jar為您打包好的JAR包。--class和JAR包請根據您實際資訊修改。
通過Spark SQL查詢資料的變化,詳細操作請參見基礎使用。