E-MapReduce: Basic usage

Last Updated: Aug 12, 2024

This topic describes how to use Iceberg in an E-MapReduce (EMR) cluster.

Background information

In this topic, metadata is managed by using Data Lake Formation (DLF). For more information about metadata configurations, see Configuration of DLF metadata.

Prerequisites

A Hadoop cluster of EMR V5.3.0 or a later minor version is created. For more information, see Create a cluster.

Limits

The Spark SQL extensions of Iceberg are not supported in Spark 2.4. On a cluster of EMR V3.38.X or a later minor version, you can perform Iceberg operations only by using the Spark DataFrame API. This topic describes how to use Spark SQL to perform Iceberg operations on a cluster of EMR V5.3.0 or a later minor version.

Procedure

  1. Log on to your cluster in SSH mode. For more information, see Log on to a cluster.

  2. Run the following commands to configure parameters related to Iceberg.

    Before you can use Spark SQL to perform Iceberg operations, you must configure a catalog. The default catalog name and the required parameters vary based on the version of your cluster. The following commands configure a catalog whose metadata is managed by DLF. For more information, see Configuration of DLF metadata.

    Note

    The catalog configuration is prefixed with spark.sql.catalog.<catalog_name>. <catalog_name> indicates the name of the catalog.

    • EMR V5.6.0 or a later minor version

      spark-sql --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \
       --conf spark.sql.catalog.iceberg=org.apache.iceberg.spark.SparkCatalog \
       --conf spark.sql.catalog.iceberg.catalog-impl=org.apache.iceberg.aliyun.dlf.hive.DlfCatalog

    • EMR V5.5.X

      spark-sql --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \
       --conf spark.sql.catalog.dlf=org.apache.iceberg.spark.SparkCatalog \
       --conf spark.sql.catalog.dlf.catalog-impl=org.apache.iceberg.aliyun.dlf.hive.DlfCatalog \
       --conf spark.sql.catalog.dlf.warehouse=<yourOSSWarehousePath>

      Note

      The spark.sql.catalog.dlf.warehouse parameter is optional. If you do not configure it, the default warehouse path is used. If you omit the line, make sure that the last remaining line of the command does not end with a backslash.

    • EMR V5.3.X to EMR V5.4.X

      You must configure an AccessKey pair. Make sure that the ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET environment variables are configured. For more information, see Configure environment variables in Linux, macOS, and Windows.

      spark-sql --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \
       --conf spark.sql.catalog.dlf_catalog=org.apache.iceberg.spark.SparkCatalog \
       --conf spark.sql.catalog.dlf_catalog.catalog-impl=org.apache.iceberg.aliyun.dlf.DlfCatalog \
       --conf spark.sql.catalog.dlf_catalog.io-impl=org.apache.iceberg.hadoop.HadoopFileIO \
       --conf spark.sql.catalog.dlf_catalog.oss.endpoint=<yourOSSEndpoint> \
       --conf spark.sql.catalog.dlf_catalog.warehouse=<yourOSSWarehousePath> \
       --conf spark.sql.catalog.dlf_catalog.access.key.id=<ALIBABA_CLOUD_ACCESS_KEY_ID> \
       --conf spark.sql.catalog.dlf_catalog.access.key.secret=<ALIBABA_CLOUD_ACCESS_KEY_SECRET> \
       --conf spark.sql.catalog.dlf_catalog.dlf.catalog-id=<yourCatalogId> \
       --conf spark.sql.catalog.dlf_catalog.dlf.endpoint=<yourDLFEndpoint> \
       --conf spark.sql.catalog.dlf_catalog.dlf.region-id=<yourDLFRegionId>
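
The command for EMR V5.3.X to EMR V5.4.X depends on the two AccessKey environment variables being configured. A pre-flight check along the following lines can catch a missing variable before spark-sql is launched. This is a minimal sketch: the function name check_access_key_env is hypothetical and is not part of EMR or Spark.

```shell
# Hypothetical helper (not provided by EMR): verify that both AccessKey
# environment variables named in this topic are set and non-empty.
check_access_key_env() {
  _missing=0
  if [ -z "${ALIBABA_CLOUD_ACCESS_KEY_ID:-}" ]; then
    echo "ALIBABA_CLOUD_ACCESS_KEY_ID is not set" >&2
    _missing=1
  fi
  if [ -z "${ALIBABA_CLOUD_ACCESS_KEY_SECRET:-}" ]; then
    echo "ALIBABA_CLOUD_ACCESS_KEY_SECRET is not set" >&2
    _missing=1
  fi
  # Return 0 only when both variables are present.
  return "$_missing"
}
```

One way to use the check is to run check_access_key_env first and start spark-sql only if it returns 0.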

    After you run the command for your cluster version, the following prompt indicates that the Spark SQL CLI has started:

    spark-sql>
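
The catalog names used in the commands of this step differ by cluster version: iceberg for EMR V5.6.0 or a later minor version, dlf for EMR V5.5.X, and dlf_catalog for EMR V5.3.X to EMR V5.4.X. The following sketch encodes that mapping so that scripts can look up the name instead of hard-coding it. The function name default_catalog_name is hypothetical, and the sketch assumes that the version is passed as a string such as 5.6.0 or V5.6.0.

```shell
# Hypothetical helper: map an EMR version string to the catalog name
# used by the commands in this topic.
default_catalog_name() {
  _version=${1#V}          # accept "V5.6.0" as well as "5.6.0"
  _major=${_version%%.*}   # text before the first dot
  _rest=${_version#*.}     # text after the first dot
  _minor=${_rest%%.*}      # text between the first and second dots
  case "$_major.$_minor" in
    5.3|5.4)              echo "dlf_catalog" ;;
    5.5)                  echo "dlf" ;;
    5.[6-9]|5.[1-9][0-9]) echo "iceberg" ;;
    *)
      # Versions outside V5.3.0 or a later minor version are not covered here.
      echo "unsupported EMR version: $1" >&2
      return 1
      ;;
  esac
}
```

For example, default_catalog_name 5.5.2 prints dlf, which is the value to substitute for <catalog_name> in the statements of the next step.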
  3. Perform basic operations.

    Important

    In the following examples, replace <catalog_name> with the name of your catalog. For clusters of EMR V5.6.0 or a later minor version, the catalog name is iceberg. For clusters of other versions, see the commands in Step 2: the catalog name is the segment that follows spark.sql.catalog. in the configuration keys.

    • Create a database

      CREATE DATABASE IF NOT EXISTS <catalog_name>.iceberg_db;
    • Create a table

      CREATE TABLE IF NOT EXISTS <catalog_name>.iceberg_db.sample(
          id BIGINT COMMENT 'unique id', 
          data STRING
      ) 
      USING iceberg;

      The CREATE TABLE statement for an Iceberg table can include the COMMENT, PARTITIONED BY, LOCATION, and TBLPROPERTIES clauses. The following statement uses the TBLPROPERTIES clause to set a table-level property:

      CREATE TABLE IF NOT EXISTS <catalog_name>.iceberg_db.sample(
          id BIGINT COMMENT 'unique id', 
          data STRING
      ) 
      USING iceberg
      TBLPROPERTIES (
          'write.format.default'='parquet'
      );
    • Insert data

      INSERT INTO <catalog_name>.iceberg_db.sample VALUES (1, 'a'), (2, 'b'), (3, 'c');
    • Query data

      SELECT * FROM <catalog_name>.iceberg_db.sample;
      SELECT count(1) AS count, data FROM <catalog_name>.iceberg_db.sample GROUP BY data;
    • Update data

      UPDATE <catalog_name>.iceberg_db.sample SET data = 'x' WHERE id = 3;
    • Delete data

      DELETE FROM <catalog_name>.iceberg_db.sample WHERE id = 3;
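
The statements above can also be collected into a script and run without the interactive prompt. The following sketch prints the statements from this step with <catalog_name> replaced by a catalog name that you pass in. The function name render_iceberg_sql and the file name iceberg_basic.sql are hypothetical. The statement order is an illustrative choice, and the TBLPROPERTIES variant and the GROUP BY query are omitted for brevity.

```shell
# Hypothetical helper: print the basic Iceberg statements from this topic
# for the catalog name given as the first argument.
render_iceberg_sql() {
  _catalog=$1
  cat <<EOF
CREATE DATABASE IF NOT EXISTS ${_catalog}.iceberg_db;
CREATE TABLE IF NOT EXISTS ${_catalog}.iceberg_db.sample(
    id BIGINT COMMENT 'unique id',
    data STRING
)
USING iceberg;
INSERT INTO ${_catalog}.iceberg_db.sample VALUES (1, 'a'), (2, 'b'), (3, 'c');
UPDATE ${_catalog}.iceberg_db.sample SET data = 'x' WHERE id = 3;
SELECT * FROM ${_catalog}.iceberg_db.sample;
DELETE FROM ${_catalog}.iceberg_db.sample WHERE id = 3;
EOF
}

# Possible usage (commented out so that nothing runs by accident):
#   render_iceberg_sql iceberg > iceberg_basic.sql
#   spark-sql <the --conf options from Step 2> -f iceberg_basic.sql
```

The -f option of spark-sql reads statements from a file. The --conf options must match the ones that Step 2 lists for your cluster version.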