This topic describes how to use Iceberg in an E-MapReduce (EMR) cluster.
Background information
In this topic, metadata is managed by using Data Lake Formation (DLF). For more information about metadata configurations, see Configuration of DLF metadata.
Prerequisites
A Hadoop cluster of EMR V5.3.0 or a later minor version is created. For more information, see Create a cluster.
Limits
Spark SQL extensions of Iceberg are not supported in Spark 2.4. If you use a cluster of EMR V3.38.X or a later minor version, you can use only the Spark DataFrame API to perform operations related to Iceberg. This topic describes how to use Spark SQL to perform operations related to Iceberg in a cluster of EMR V5.3.0 or a later minor version.
Procedure
Log on to your cluster in SSH mode. For more information, see Log on to a cluster.
Run the following commands to configure parameters related to Iceberg.
Before you can use Spark SQL to perform operations related to Iceberg, you must configure a catalog. The following commands show how to do this. The default catalog name and the parameters that you must configure vary based on the version of your cluster. For more information, see Configuration of DLF metadata. In the following configurations, DLF is used to manage metadata.
Note: The catalog configuration is prefixed with spark.sql.catalog.<catalog_name>, where <catalog_name> indicates the name of the catalog.
EMR V5.6.0 or a later minor version
spark-sql --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \
    --conf spark.sql.catalog.iceberg=org.apache.iceberg.spark.SparkCatalog \
    --conf spark.sql.catalog.iceberg.catalog-impl=org.apache.iceberg.aliyun.dlf.hive.DlfCatalog
EMR V5.5.X
spark-sql --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \
    --conf spark.sql.catalog.dlf=org.apache.iceberg.spark.SparkCatalog \
    --conf spark.sql.catalog.dlf.catalog-impl=org.apache.iceberg.aliyun.dlf.hive.DlfCatalog \
    --conf spark.sql.catalog.dlf.warehouse=<yourOSSWarehousePath>
Note: The spark.sql.catalog.dlf.warehouse parameter is optional. If you do not configure it, the default warehouse path is used.
EMR V5.3.X to EMR V5.4.X
You must configure an AccessKey pair. Make sure that the ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET environment variables are configured. For more information, see Configure environment variables in Linux, macOS, and Windows.
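For example, you can export the AccessKey pair in the current shell session before you start spark-sql. Replace the placeholder values with your own AccessKey pair:
export ALIBABA_CLOUD_ACCESS_KEY_ID=<yourAccessKeyId>
export ALIBABA_CLOUD_ACCESS_KEY_SECRET=<yourAccessKeySecret>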
spark-sql --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \
    --conf spark.sql.catalog.dlf_catalog=org.apache.iceberg.spark.SparkCatalog \
    --conf spark.sql.catalog.dlf_catalog.catalog-impl=org.apache.iceberg.aliyun.dlf.DlfCatalog \
    --conf spark.sql.catalog.dlf_catalog.io-impl=org.apache.iceberg.hadoop.HadoopFileIO \
    --conf spark.sql.catalog.dlf_catalog.oss.endpoint=<yourOSSEndpoint> \
    --conf spark.sql.catalog.dlf_catalog.warehouse=<yourOSSWarehousePath> \
    --conf spark.sql.catalog.dlf_catalog.access.key.id=<ALIBABA_CLOUD_ACCESS_KEY_ID> \
    --conf spark.sql.catalog.dlf_catalog.access.key.secret=<ALIBABA_CLOUD_ACCESS_KEY_SECRET> \
    --conf spark.sql.catalog.dlf_catalog.dlf.catalog-id=<yourCatalogId> \
    --conf spark.sql.catalog.dlf_catalog.dlf.endpoint=<yourDLFEndpoint> \
    --conf spark.sql.catalog.dlf_catalog.dlf.region-id=<yourDLFRegionId>
If the following prompt is displayed, the Spark SQL CLI is started:
spark-sql>
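As an optional check that is not part of the original procedure, you can list the namespaces of the configured catalog to confirm that the catalog is reachable:
SHOW NAMESPACES IN <catalog_name>; -- lists the databases that the catalog exposes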
Perform basic operations.
Important: In the following examples, <catalog_name> specifies the name of your catalog. For clusters of EMR V5.6.0 or a later minor version, the catalog name is iceberg. For clusters of other versions, use the catalog name that you configured in Step 2. The catalog configuration is prefixed with spark.sql.catalog.<catalog_name>.
Create a database
CREATE DATABASE IF NOT EXISTS <catalog_name>.iceberg_db;
Create a table
CREATE TABLE IF NOT EXISTS <catalog_name>.iceberg_db.sample (
    id   BIGINT COMMENT 'unique id',
    data STRING
) USING iceberg;
The statement that is used to create an Iceberg table can include the COMMENT, PARTITIONED BY, LOCATION, and TBLPROPERTIES clauses. The following code shows how to use the TBLPROPERTIES clause to configure table-level properties:
CREATE TABLE IF NOT EXISTS <catalog_name>.iceberg_db.sample (
    id   BIGINT COMMENT 'unique id',
    data STRING
) USING iceberg
TBLPROPERTIES (
    'write.format.default'='parquet'
);
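The following statement is a sketch of how to use the PARTITIONED BY clause with an Iceberg hidden partition transform. The table name sample_partitioned and the ts column are hypothetical and do not appear elsewhere in this topic:
CREATE TABLE IF NOT EXISTS <catalog_name>.iceberg_db.sample_partitioned (
    id   BIGINT COMMENT 'unique id',
    data STRING,
    ts   TIMESTAMP
) USING iceberg
PARTITIONED BY (days(ts)); -- days() is a built-in Iceberg partition transform; table and column names are hypothetical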
Insert data
INSERT INTO <catalog_name>.iceberg_db.sample VALUES (1, 'a'), (2, 'b'), (3, 'c');
Query data
SELECT * FROM <catalog_name>.iceberg_db.sample;
SELECT count(1) AS count, data FROM <catalog_name>.iceberg_db.sample GROUP BY data;
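Iceberg also exposes table metadata as queryable metadata tables. As a sketch, the following query reads the snapshot history of the sample table by appending the standard snapshots suffix to the table name:
SELECT committed_at, snapshot_id, operation FROM <catalog_name>.iceberg_db.sample.snapshots; -- snapshots is an Iceberg metadata table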
Update data
UPDATE <catalog_name>.iceberg_db.sample SET data = 'x' WHERE id = 3;
Delete data
DELETE FROM <catalog_name>.iceberg_db.sample WHERE id = 3;
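Merge data
With the Iceberg Spark SQL extensions enabled, you can also upsert data by using MERGE INTO. The following statement is a sketch; the inline source query is hypothetical:
MERGE INTO <catalog_name>.iceberg_db.sample t
USING (SELECT 3 AS id, 'z' AS data) s -- hypothetical inline source
ON t.id = s.id
WHEN MATCHED THEN UPDATE SET t.data = s.data
WHEN NOT MATCHED THEN INSERT *;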