E-MapReduce:Iceberg connector

Last Updated:Jul 05, 2024

Iceberg is an open table format for data lakes. You can use an Iceberg connector to query data files in the Iceberg format.

Background information

For information about Iceberg, see Overview.

Prerequisites

A DataLake cluster or Hadoop cluster is created, and the Presto service is selected. For more information, see Create a cluster.

Limits

Only Hadoop clusters of E-MapReduce (EMR) V3.38.0 or a later minor version and DataLake clusters support Iceberg connectors.

Configure an Iceberg connector

For information about how to modify the configurations of an Iceberg connector, see Configure connectors.

Default configurations of the Iceberg connector

Log on to the EMR console and go to the Configure tab of the Trino service page. On the Configure tab, click iceberg.properties. The iceberg.properties tab displays the hive.metastore.uri configuration item, which specifies the uniform resource identifier (URI) of the Hive Metastore that the connector accesses over the Thrift protocol. You can modify this configuration item based on your business requirements.

Add Iceberg configurations

On the Configure tab of the Trino service page, click the iceberg.properties tab. Then, click Add Configuration Item.

Configuration item

Description

iceberg.file-format

The file format in which data of Iceberg tables is stored. Valid values:

  • ORC (default value)

  • PARQUET

iceberg.compression-codec

The compression format that is used when files are written. Valid values:

  • GZIP (default value)

  • ZSTD

  • LZ4

  • SNAPPY

  • NONE

iceberg.max-partitions-per-writer

The maximum number of partitions that each writer can handle. Default value: 100.

Example: Query data in an Iceberg table

You can use the basic SQL syntax of Trino to query data in an Iceberg table.

  1. Log on to your cluster in SSH mode. For more information, see Log on to a cluster.

  2. Connect to the Trino client. For more information, see Use the CLI to connect to Trino.

  3. Run the following command to create a schema:

    create schema iceberg.testdb;
  4. Run the following command to create a table named iceberg_test:

    create table iceberg.testdb.iceberg_test(id int);
  5. Run the following command to insert data into the iceberg_test table:

    insert into iceberg.testdb.iceberg_test values(1),(2);
    Note

    If you select DLF Unified Metadata for Metadata when you create a cluster, you cannot write data to an Iceberg table.

  6. Run the following command to query data from the table:

    select * from iceberg.testdb.iceberg_test;

    The following output is returned:

     id
    ----
     1
     2
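
To confirm that the schema and table were created as expected, you can inspect them with standard Trino metadata statements. The following is a minimal sketch that assumes the testdb schema and the iceberg_test table from the preceding steps exist. The output depends on your cluster and is not shown.

```sql
-- List the tables in the schema created in step 3.
SHOW TABLES FROM iceberg.testdb;

-- Show the columns and data types of the table created in step 4.
DESCRIBE iceberg.testdb.iceberg_test;

-- Count the rows inserted in step 5.
SELECT count(*) AS row_count FROM iceberg.testdb.iceberg_test;
```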

SQL syntax

You can use the Iceberg connector to read data and metadata from an Iceberg table and to write data and metadata to an Iceberg table. In addition to basic SQL statements, the Iceberg connector supports the SQL statements that are described in the following table.

SQL statement

Description

INSERT

See INSERT in the official documentation.

DELETE

See the Delete data by partition section in this topic and DELETE in the official documentation.

Statements related to schema and table management

See the Partition a table section in this topic and Schema and table management in the official documentation.

Statements related to materialized view management

See the Manage materialized views section in this topic and Materialized view management in the official documentation.

Statements related to view management

See View management in the official documentation.

Partition a table

The Iceberg connector can partition a table by applying transform functions to columns. The following table describes the supported functions.

Function

Description

year(ts)

Partitions a table by year. This function returns the difference in years between the value of ts and January 1, 1970.

month(ts)

Partitions a table by month. This function returns the difference in months between the value of ts and January 1, 1970.

day(ts)

Partitions a table by day. This function returns the difference in days between the value of ts and January 1, 1970.

hour(ts)

Partitions a table by hour. This function returns a timestamp based on the value of ts. The minute and second parts of the value of ts are ignored.

bucket(x, nbuckets)

Performs hash partitioning on data and allocates data to a specified number of buckets. This function returns the integral hash value of x. The integral hash value of x is within the range of [0, nbuckets - 1].

truncate(s, nchars)

Partitions a table by the truncated value of s. This function returns the first nchars characters of s.

Example: Partition a table named customer_orders based on the month that is included in the value of order_date, the hash value of account_number (number of buckets: 10), and country.

CREATE TABLE iceberg.testdb.customer_orders (
    order_id BIGINT,
    order_date DATE,
    account_number BIGINT,
    customer VARCHAR,
    country VARCHAR)
WITH (partitioning = ARRAY['month(order_date)', 'bucket(account_number, 10)', 'country'])
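
The other functions in the table can be combined in the same way. The following is a minimal sketch, not a recommended layout: the table name page_events and its columns are hypothetical and are used only for illustration. It assumes that the connector version on your cluster accepts the TIMESTAMP(6) data type for the column passed to day().

```sql
-- Hypothetical table: one partition per day of event_time,
-- 4 hash buckets on user_id, and the first 2 characters of region_code.
CREATE TABLE iceberg.testdb.page_events (
    event_time TIMESTAMP(6),
    user_id BIGINT,
    region_code VARCHAR,
    url VARCHAR)
WITH (partitioning = ARRAY['day(event_time)', 'bucket(user_id, 4)', 'truncate(region_code, 2)'])
```

With this layout, rows whose event_time values fall on the same day, whose user_id values hash to the same bucket, and whose region_code values share the same first two characters are written to the same partition.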

Delete data by partition

For a partitioned table, if the WHERE clause of a DELETE statement filters only on partition key columns, the Iceberg connector deletes the partitions that meet the filter conditions. For example, execute the following statement to delete all partitions that meet the filter condition country = 'US' from the customer_orders table:

DELETE FROM iceberg.testdb.customer_orders
WHERE country = 'US'

The Iceberg connector can delete data only in whole partitions. For example, the following statement fails because the WHERE clause also filters on customer, which is not a partition key column, and therefore selects specific rows within partitions:

DELETE FROM iceberg.testdb.customer_orders
WHERE country = 'US' AND customer = 'Freds Foods'
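
To check the effect of a partition-level delete, you can count the remaining rows. The following is a minimal sketch that assumes the customer_orders table has been populated and that the earlier DELETE statement filtering only on country = 'US' has succeeded. The column aliases are illustrative.

```sql
-- After the partition-level delete, no rows should remain for the deleted partitions.
SELECT count(*) AS remaining_us_orders
FROM iceberg.testdb.customer_orders
WHERE country = 'US';

-- Rows in other partitions are unaffected; summarize what is left per country.
SELECT country, count(*) AS order_count
FROM iceberg.testdb.customer_orders
GROUP BY country
ORDER BY country;
```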

Roll back to a snapshot

Snapshots are supported for Iceberg tables.

The Iceberg connector provides a $snapshots system table for each Iceberg table. Each snapshot of an Iceberg table is identified by a snapshot ID of the BIGINT data type. For example, you can execute the following statement to query the latest snapshot ID of the customer_orders table:

SELECT snapshot_id FROM iceberg.testdb."customer_orders$snapshots" ORDER BY committed_at DESC LIMIT 1

You can execute the following statement to roll back the state of the table to a snapshot based on the specified snapshot ID:

CALL iceberg.system.rollback_to_snapshot('testdb', 'customer_orders', 895459706749342****)
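
Before you roll back, it can help to review the snapshot history so that you choose the intended snapshot ID. The following is a minimal sketch that assumes the $snapshots table exposes the snapshot_id, parent_id, operation, and committed_at columns, as it does in open source Trino. The exact set of columns may vary by version.

```sql
-- List snapshots from oldest to newest to choose a rollback target.
-- operation indicates the type of change that produced each snapshot.
SELECT snapshot_id, parent_id, operation, committed_at
FROM iceberg.testdb."customer_orders$snapshots"
ORDER BY committed_at
```

After the rollback, you can query the table again to verify that it reflects the state of the selected snapshot.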

Query the partitions of a table

The Iceberg connector exposes a $partitions system table for each Iceberg table. For example, you can execute the following statement to query the partitions of the customer_orders table. The returned partition information includes the minimum and maximum values of each partition key column.

SELECT * FROM iceberg.testdb."customer_orders$partitions"

Properties of Iceberg tables

The following table describes the properties of Iceberg tables.

Property

Description

format

Specifies the file format in which data of Iceberg tables is stored. Valid values:

  • ORC (default value)

  • PARQUET

partitioning

Specifies partition key columns.

For example, if a table contains the partition key columns c1 and c2, this property is set to ARRAY['c1', 'c2'].

location

Specifies the URI of the file system that stores tables.

In the following sample statement, the format property is set to PARQUET, the partitioning property is set to ARRAY['c1', 'c2'], and the location property is set to /var/my_tables/test_table:

CREATE TABLE test_table (
    c1 integer,
    c2 date,
    c3 double)
WITH (
    format = 'PARQUET',
    partitioning = ARRAY['c1', 'c2'],
    location = '/var/my_tables/test_table')
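
To verify which properties were applied, you can display the table definition. The following is a minimal sketch that assumes the current session catalog and schema are the Iceberg catalog and the schema in which test_table was created. Otherwise, use the fully qualified table name.

```sql
-- Print the CREATE TABLE statement, including the WITH clause
-- that carries the format, partitioning, and location properties.
SHOW CREATE TABLE test_table
```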

Manage materialized views

The Iceberg connector supports materialized views. Each materialized view consists of a view definition and an Iceberg table. The table name is stored as a materialized view property, and data is stored in the Iceberg table.

The following table describes the statements that can be executed on materialized views.

Statement

Description

CREATE MATERIALIZED VIEW

Creates a materialized view.

You can use the properties of the Iceberg table to determine the storage format of the Iceberg table. For example, set the format property to ORC and the partitioning property to ARRAY['event_date'] in the WITH clause to store data of the Iceberg table in ORC files and partition the table by day.

WITH ( format = 'ORC', partitioning = ARRAY['event_date'] )

REFRESH MATERIALIZED VIEW

Updates data in a materialized view.

After you execute the statement, the data of the Iceberg table is deleted, and then the results of the query that defines the materialized view are inserted into the Iceberg table.

Important

A short time window exists between the delete and insert operations, during which the materialized view is empty. If the insert operation fails, the materialized view remains empty.

DROP MATERIALIZED VIEW

Deletes the definition and the Iceberg table of a materialized view.
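
The statements in the preceding table can be combined into a simple lifecycle. The following is a minimal sketch, assuming that the customer_orders table from the Partition a table section exists and that your cluster allows writes to Iceberg tables. The view name orders_by_country and the column alias order_count are hypothetical.

```sql
-- Create a materialized view whose Iceberg table is stored as ORC
-- and partitioned by country.
CREATE MATERIALIZED VIEW iceberg.testdb.orders_by_country
WITH (format = 'ORC', partitioning = ARRAY['country'])
AS
SELECT country, count(*) AS order_count
FROM iceberg.testdb.customer_orders
GROUP BY country;

-- Populate or update the Iceberg table with the current query results.
REFRESH MATERIALIZED VIEW iceberg.testdb.orders_by_country;

-- Query the materialized view like a regular table.
SELECT * FROM iceberg.testdb.orders_by_country;

-- Remove the view definition and its Iceberg table.
DROP MATERIALIZED VIEW iceberg.testdb.orders_by_country;
```

Because a refresh deletes the existing data before it inserts the new results, consider running REFRESH MATERIALIZED VIEW when readers of the view can tolerate a briefly empty result.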