全部產品
Search
文件中心

E-MapReduce:基礎使用

更新時間:Jul 01, 2024

本文為您介紹如何在E-MapReduce Hudi中寫資料以及查詢資料。

寫資料

環境配置

EMR-3.32.0以及後續版本中,已經將Hudi相關依賴整合到各個開源組件中,包括Spark、Hive和Presto,因此運行時不需要引入額外的Hudi依賴,只需要在pom檔案中添加Hudi依賴即可。不同的EMR版本使用的Hudi版本不同,詳細資料請參見下表。

Hudi版本

EMR版本

0.6.0

  • EMR 3.32.0~EMR 3.35.0

  • EMR 4.5.0~EMR 4.9.0

  • EMR 5.1.0

0.8.0

  • EMR 3.36.1~EMR 3.37.1

  • EMR 5.2.1~EMR 5.3.1

0.9.0

  • EMR 3.38.0~EMR 3.38.3

  • EMR 5.4.0~EMR 5.4.3

0.10.0

  • EMR 3.39.1~EMR 3.40.0

  • EMR 4.10.0

  • EMR 5.5.0~EMR 5.6.0

0.11.0

EMR 3.42.0,EMR 5.8.0

0.12.0

  • EMR 5.9.0~EMR 5.10.1

  • EMR 3.43.0~EMR 3.44.1

0.12.2

  • EMR 5.11.0~EMR 5.12.1

  • EMR 3.45.0~EMR 3.46.1

0.13.1

  • EMR 5.13.0、EMR 5.14.0

  • EMR 3.47.0、EMR 3.48.0

<dependency>
   <groupId>org.apache.hudi</groupId>
   <artifactId>hudi-spark_2.11</artifactId>
   <!-- for spark3 <artifactId>hudi-spark_2.12</artifactId> -->
   <version>${hudi_version}</version>
  <scope>provided</scope>
</dependency>

Insert和Update

樣本如下。

 val spark = SparkSession
      .builder()
      .master("local[*]")
      .appName("hudi test")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .getOrCreate()

import spark.implicits._
    val df = (for (i <- 0 until 10) yield (i, s"a$i", 30 + i * 0.2, 100 * i + 10000, s"p${i % 5}"))
      .toDF("id", "name", "price", "version", "dt")

    df.write.format("hudi")
      .option(TABLE_NAME, "hudi_test_0")
      // .option(OPERATION_OPT_KEY, UPSERT_OPERATION_OPT_VAL) for update
      .option(OPERATION_OPT_KEY, INSERT_OPERATION_OPT_VAL) // for insert
      .option(RECORDKEY_FIELD_OPT_KEY, "id")
      .option(PRECOMBINE_FIELD_OPT_KEY, "version")
      .option(KEYGENERATOR_CLASS_OPT_KEY, classOf[SimpleKeyGenerator].getName)
      .option(HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY, classOf[MultiPartKeysValueExtractor].getCanonicalName)
      .option(PARTITIONPATH_FIELD_OPT_KEY, "dt")
      .option(HIVE_PARTITION_FIELDS_OPT_KEY, "ds")
      .option(META_SYNC_ENABLED_OPT_KEY, "true")
      .option(HIVE_USE_JDBC_OPT_KEY, "false")
      .option(HIVE_DATABASE_OPT_KEY, "default")
      .option(HIVE_TABLE_OPT_KEY, "hudi_test_0")
      .option(INSERT_PARALLELISM, "8")
      .option(UPSERT_PARALLELISM, "8")
      .mode(Overwrite)
      .save("/tmp/hudi/h0")

Delete

樣本如下。

df.write.format("hudi")
      .option(TABLE_NAME, "hudi_test_0")
      .option(OPERATION_OPT_KEY, DELETE_OPERATION_OPT_VAL) // for delete
      .option(RECORDKEY_FIELD_OPT_KEY, "id")
      .option(PRECOMBINE_FIELD_OPT_KEY, "version")
      .option(KEYGENERATOR_CLASS_OPT_KEY, classOf[SimpleKeyGenerator].getName)
      .option(HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY, classOf[MultiPartKeysValueExtractor].getCanonicalName)
      .option(PARTITIONPATH_FIELD_OPT_KEY, "dt")
      .option(DELETE_PARALLELISM, "8")
      .mode(Append)
      .save("/tmp/hudi/h0")

查詢資料

EMR引擎環境中已整合Hudi相關的軟體包,您無需在Spark、Presto和Hive查詢引擎中額外引入相關依賴。

Hive和Presto查詢Hudi表,需要在寫入階段開啟中繼資料同步功能,即設定META_SYNC_ENABLED_OPT_KEY為true。

對於社區版Hudi,COW和MOR表需要設定hive.input.formatorg.apache.hudi.hadoop.hive.HoodieCombineHiveInputFormat。EMR版本對於COW類型表,可以不用設定input format,支援自動適配Hudi的input format功能。