This topic describes how to write data to and query data from E-MapReduce (EMR) Hudi.
Write data
Environment configuration
In EMR V3.32.0 and later, the Hudi dependencies are integrated into open source components such as Spark, Hive, and Presto, so you do not need to bundle them with the job that writes data to Hudi. You only need to add the Hudi dependency shown below the table to the pom.xml file of your project. The Hudi version that you use varies based on the EMR version. The following table describes the mapping between Hudi versions and EMR versions.
Hudi version | EMR version |
0.6.0 | |
0.8.0 | |
0.9.0 | |
0.10.0 | |
0.11.0 | EMR V3.42.0 or EMR V5.8.0 |
0.12.0 | |
0.12.2 | |
0.13.1 | |
<dependency>
    <groupId>org.apache.hudi</groupId>
    <artifactId>hudi-spark_2.11</artifactId>
    <!-- For Spark 3, use <artifactId>hudi-spark_2.12</artifactId> instead. -->
    <version>${hudi_version}</version>
    <scope>provided</scope>
</dependency>
Insert or update data
Sample code:
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._
import org.apache.hudi.hive.MultiPartKeysValueExtractor
import org.apache.hudi.keygen.SimpleKeyGenerator
import org.apache.spark.sql.SaveMode._
import org.apache.spark.sql.SparkSession

val spark = SparkSession
  .builder()
  .master("local[*]")
  .appName("hudi test")
  .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .getOrCreate()

import spark.implicits._

// Generate 10 sample rows spread across 5 partitions (dt = p0 to p4).
val df = (for (i <- 0 until 10) yield (i, s"a$i", 30 + i * 0.2, 100 * i + 10000, s"p${i % 5}"))
  .toDF("id", "name", "price", "version", "dt")

df.write.format("hudi")
  .option(TABLE_NAME, "hudi_test_0")
  // .option(OPERATION_OPT_KEY, UPSERT_OPERATION_OPT_VAL) // for update
  .option(OPERATION_OPT_KEY, INSERT_OPERATION_OPT_VAL) // for insert
  .option(RECORDKEY_FIELD_OPT_KEY, "id")
  .option(PRECOMBINE_FIELD_OPT_KEY, "version")
  .option(KEYGENERATOR_CLASS_OPT_KEY, classOf[SimpleKeyGenerator].getName)
  .option(HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY, classOf[MultiPartKeysValueExtractor].getCanonicalName)
  .option(PARTITIONPATH_FIELD_OPT_KEY, "dt")
  .option(HIVE_PARTITION_FIELDS_OPT_KEY, "dt")
  .option(META_SYNC_ENABLED_OPT_KEY, "true")
  .option(HIVE_USE_JDBC_OPT_KEY, "false")
  .option(HIVE_DATABASE_OPT_KEY, "default")
  .option(HIVE_TABLE_OPT_KEY, "hudi_test_0")
  .option(INSERT_PARALLELISM, "8")
  .option(UPSERT_PARALLELISM, "8")
  .mode(Overwrite)
  .save("/tmp/hudi/h0")
Delete data
Sample code:
df.write.format("hudi")
.option(TABLE_NAME, "hudi_test_0")
.option(OPERATION_OPT_KEY, DELETE_OPERATION_OPT_VAL) // for delete
.option(RECORDKEY_FIELD_OPT_KEY, "id")
.option(PRECOMBINE_FIELD_OPT_KEY, "version")
.option(KEYGENERATOR_CLASS_OPT_KEY, classOf[SimpleKeyGenerator].getName)
.option(HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY, classOf[MultiPartKeysValueExtractor].getCanonicalName)
.option(PARTITIONPATH_FIELD_OPT_KEY, "dt")
.option(DELETE_PARALLELISM, "8")
.mode(Append)
.save("/tmp/hudi/h0")
Query data
In EMR, the Hudi packages are integrated into open source components such as Spark, Presto, and Hive, so you can query data in a Hudi table without introducing additional dependencies.
If you use Hive or Presto to query Hudi tables, you must enable metadata synchronization at the write stage by setting the META_SYNC_ENABLED_OPT_KEY parameter to true.
If you use open source Hudi, you must set the hive.input.format parameter to org.apache.hudi.hadoop.hive.HoodieCombineHiveInputFormat for both Copy on Write and Merge on Read tables. If you use EMR Hudi, you do not need to specify the input format for Copy on Write tables, because they automatically adapt to the Hudi input format.
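The following is a minimal sketch of a snapshot query from Spark, assuming the table written in the preceding examples. Depending on the Hudi version, you may need to append wildcards such as /* to the base path when you read a partitioned table directly from its storage path.
// Read the latest snapshot of the Hudi table directly from its storage path.
val snapshotDF = spark.read.format("hudi").load("/tmp/hudi/h0")

// Register a temporary view and query it with Spark SQL.
snapshotDF.createOrReplaceTempView("hudi_test_0_snapshot")
spark.sql("SELECT id, name, price, version, dt FROM hudi_test_0_snapshot WHERE dt = 'p0'").show()
If metadata synchronization was enabled at the write stage, you can also query the default.hudi_test_0 table from Hive or Presto.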