
Realtime Compute for Apache Flink: Apache Iceberg connector

Last Updated: Jul 17, 2024

This topic describes how to use the Apache Iceberg connector.

Background information

Apache Iceberg is an open table format for data lakes. You can use Apache Iceberg to quickly build your own data lake storage service on Hadoop Distributed File System (HDFS) or Alibaba Cloud Object Storage Service (OSS). Then, you can use a computing engine of the open source big data ecosystem, such as Apache Flink, Apache Spark, Apache Hive, or Apache Presto, to analyze data in your data lake.

  • Table type: Source table and sink table

  • Running mode: Batch mode and streaming mode

  • Data format: N/A

  • Metric: N/A

  • API type: SQL API

  • Data update or deletion in a sink table: Supported

Features

Apache Iceberg provides the following core capabilities:

  • Builds a low-cost, lightweight data lake storage service based on HDFS or OSS.

  • Provides comprehensive atomicity, consistency, isolation, and durability (ACID) semantics.

  • Supports historical version backtracking, as shown in the query sketch below.

  • Supports efficient data filtering.

  • Supports schema evolution.

  • Supports partition evolution.

Note

You can use the efficient fault tolerance and stream processing capabilities of Flink to import a large amount of behavioral data in logs into an Apache Iceberg data lake in real time. Then, you can use Flink or another analytics engine to extract the value of your data.
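For example, historical version backtracking can be expressed with SQL hints when you read a table. The following is a minimal sketch that assumes the 'snapshot-id' and 'as-of-timestamp' read options of the open source Apache Iceberg connector for Flink; check whether your VVR version supports these options. The table name and option values are hypothetical.

-- Read a historical snapshot of an Iceberg table by its snapshot ID.
-- The 'snapshot-id' and 'as-of-timestamp' options come from the open source
-- Apache Iceberg connector for Flink and may not be available in all VVR versions.
SELECT * FROM iceberg_table /*+ OPTIONS('snapshot-id' = '1234567890123456789') */;

-- Read the state of the table as of a point in time (milliseconds since epoch).
SELECT * FROM iceberg_table /*+ OPTIONS('as-of-timestamp' = '1719561600000') */;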

Limits

  • Only Realtime Compute for Apache Flink that uses Ververica Runtime (VVR) 4.0.8 or later supports the Apache Iceberg connector. The Apache Iceberg connector must be used together with Data Lake Formation (DLF) catalogs. For more information, see Manage DLF catalogs.

  • The Apache Iceberg connector supports the Apache Iceberg table formats of version 1 and version 2. For more information, see Iceberg Table Spec.

    Note

    Only Realtime Compute for Apache Flink that uses VVR 8.0.7 or later supports the Apache Iceberg table format of version 2.

  • In streaming read mode, only Apache Iceberg tables to which data is written in append-only mode can be used as source tables, as shown in the sketch below.
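For reference, the following sketch shows how a streaming read is typically enabled on an append-only Iceberg source table. The 'streaming' and 'monitor-interval' options come from the open source Apache Iceberg connector for Flink and are assumptions here; verify them against your VVR version. The table name src_iceberg refers to the source table defined in the Sample code section.

-- Continuously read newly committed snapshots from an append-only Iceberg table.
-- 'monitor-interval' controls how often new snapshots are discovered.
SELECT * FROM src_iceberg /*+ OPTIONS('streaming' = 'true', 'monitor-interval' = '30s') */;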

Syntax

CREATE TABLE iceberg_table (
  id    BIGINT,
  data  STRING,
  PRIMARY KEY (`id`) NOT ENFORCED
)
PARTITIONED BY (data)
WITH (
  'connector' = 'iceberg',
  ...
);

Parameters in the WITH clause

  • Common parameters

    • connector (STRING, required, no default value)
      The type of the table. Set the value to iceberg.

    • catalog-name (STRING, required, no default value)
      The name of the catalog. Set the value to a custom name.

    • catalog-database (STRING, required, default value: default)
      The name of the database. Set the value to the name of the database that is created on Data Lake Formation (DLF). Example: dlf_db.
      Note: If you have not created a DLF database, create one first.

    • io-impl (STRING, required, no default value)
      The name of the implementation class of the distributed file system. Set the value to org.apache.iceberg.aliyun.oss.OSSFileIO.

    • oss.endpoint (STRING, optional, no default value)
      The endpoint of your OSS bucket. For more information, see Regions and endpoints.
      Note:
      • We recommend that you set the oss.endpoint parameter to the virtual private cloud (VPC) endpoint of the OSS bucket. For example, if you select the China (Hangzhou) region, set oss.endpoint to oss-cn-hangzhou-internal.aliyuncs.com.
      • If you want to access OSS across VPCs, follow the instructions in How does fully managed Flink access a service across VPCs?

    • access.key.id (STRING, required, no default value)
      The AccessKey ID of your Alibaba Cloud account. For more information, see How do I view information about the AccessKey ID and AccessKey secret of the account?
      Important: To protect your AccessKey pair, we recommend that you configure the AccessKey ID by using the key management method. For more information, see Manage keys.

    • access.key.secret (STRING, required, no default value)
      The AccessKey secret of your Alibaba Cloud account. For more information, see How do I view information about the AccessKey ID and AccessKey secret of the account?
      Important: To protect your AccessKey pair, we recommend that you configure the AccessKey secret by using the key management method. For more information, see Manage keys.

    • catalog-impl (STRING, required, no default value)
      The class name of the catalog. Set the value to org.apache.iceberg.aliyun.dlf.DlfCatalog.

    • warehouse (STRING, required, no default value)
      The OSS directory in which table data is stored.

    • dlf.catalog-id (STRING, required, no default value)
      The ID of your Alibaba Cloud account. You can go to the Security Settings page to obtain the account ID.

    • dlf.endpoint (STRING, required, no default value)
      The endpoint of DLF.
      Note: We recommend that you set the dlf.endpoint parameter to a VPC endpoint of DLF. For example, if you select the China (Hangzhou) region, set dlf.endpoint to dlf-vpc.cn-hangzhou.aliyuncs.com.

    • dlf.region-id (STRING, required, no default value)
      The name of the region in which the DLF service is activated.
      Note: Make sure that the region you select matches the endpoint that you specify for dlf.endpoint.

  • Parameters only for sink tables

    • write.operation (STRING, optional, default value: upsert)
      The write operation mode. Valid values:
      • upsert: Data is updated. This is the default value.
      • insert: Data is written to the table in append mode.
      • bulk_insert: A specific amount of data is written at a time, and existing data is not updated.

    • hive_sync.enable (BOOLEAN, optional, default value: false)
      Specifies whether to enable the synchronization of metadata to Hive. Valid values:
      • true: The synchronization of metadata to Hive is enabled.
      • false: The synchronization of metadata to Hive is disabled. This is the default value.

    • hive_sync.mode (STRING, optional, default value: hms)
      The Hive data synchronization mode. Valid values:
      • hms: Retain the default value if you use a Hive metastore or DLF catalog.
      • jdbc: Set this value if you use a Java Database Connectivity (JDBC) catalog.

    • hive_sync.db (STRING, optional; defaults to the name of the database to which the current table belongs in the catalog)
      The name of the Hive database to which data is synchronized.

    • hive_sync.table (STRING, optional; defaults to the name of the current table)
      The name of the Hive table to which data is synchronized.

    • dlf.catalog.region (STRING, optional, no default value)
      The name of the region in which the DLF service is activated.
      Note:
      • The dlf.catalog.region parameter takes effect only when the hive_sync.mode parameter is set to hms.
      • Make sure that the value of this parameter matches the endpoint specified by the dlf.catalog.endpoint parameter.

    • dlf.catalog.endpoint (STRING, optional, no default value)
      The endpoint of DLF.
      Note:
      • The dlf.catalog.endpoint parameter takes effect only when the hive_sync.mode parameter is set to hms.
      • We recommend that you set the dlf.catalog.endpoint parameter to a VPC endpoint of DLF. For example, if you select the China (Hangzhou) region, set the dlf.catalog.endpoint parameter to dlf-vpc.cn-hangzhou.aliyuncs.com.
      • If you want to access DLF across VPCs, follow the instructions in How does Realtime Compute for Apache Flink access a service across VPCs?
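Putting the sink-only parameters together: the following sketch extends the sink table DDL from the Sample code section with an explicit write mode and metadata synchronization to Hive. The placeholder values in angle brackets must be replaced with your own settings, and whether you need each hive_sync parameter depends on your setup.

CREATE TEMPORARY TABLE dlf_iceberg_sink (
  id    BIGINT,
  data  STRING,
  PRIMARY KEY (`id`) NOT ENFORCED        -- required for the upsert write mode
) WITH (
  'connector' = 'iceberg',
  'catalog-name' = '<yourCatalogName>',
  'catalog-database' = '<yourDatabaseName>',
  'io-impl' = 'org.apache.iceberg.aliyun.oss.OSSFileIO',
  'oss.endpoint' = '<yourOSSEndpoint>',
  'access.key.id' = '${secret_values.ak_id}',
  'access.key.secret' = '${secret_values.ak_secret}',
  'catalog-impl' = 'org.apache.iceberg.aliyun.dlf.DlfCatalog',
  'warehouse' = '<yourOSSWarehousePath>',
  'dlf.catalog-id' = '<yourCatalogId>',
  'dlf.endpoint' = '<yourDLFEndpoint>',
  'dlf.region-id' = '<yourDLFRegionId>',
  -- Sink-only parameters:
  'write.operation' = 'upsert',          -- update rows that share a primary key
  'hive_sync.enable' = 'true',           -- synchronize metadata to Hive
  'hive_sync.mode' = 'hms',              -- Hive metastore or DLF catalog
  'dlf.catalog.region' = '<yourDLFRegionId>',
  'dlf.catalog.endpoint' = '<yourDLFEndpoint>'
);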

Data type mappings

Data type of Apache Iceberg    Data type of Realtime Compute for Apache Flink
BOOLEAN                        BOOLEAN
INT                            INT
LONG                           BIGINT
FLOAT                          FLOAT
DOUBLE                         DOUBLE
DECIMAL(P,S)                   DECIMAL(P,S)
DATE                           DATE
TIME                           TIME
TIMESTAMP                      TIMESTAMP
TIMESTAMPTZ                    TIMESTAMP_LTZ
STRING                         STRING
FIXED(L)                       BYTES
BINARY                         VARBINARY
STRUCT<...>                    ROW
LIST<E>                        LIST
MAP<K,V>                       MAP

Note

Apache Iceberg timestamps are accurate to the microsecond, whereas timestamps of Realtime Compute for Apache Flink are accurate to the millisecond. When you use Realtime Compute for Apache Flink to read Apache Iceberg data, the time precision is aligned to milliseconds.
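As an illustration of these mappings, the following sketch declares a Flink table whose columns correspond to several Apache Iceberg types. The table and column names are hypothetical, and the common parameters from the WITH clause section are elided.

CREATE TEMPORARY TABLE typed_iceberg (
  id       BIGINT,                       -- Apache Iceberg LONG
  price    DECIMAL(10, 2),               -- Apache Iceberg DECIMAL(10,2)
  created  TIMESTAMP_LTZ(3),             -- Apache Iceberg TIMESTAMPTZ, read at millisecond precision
  payload  BYTES,                        -- Apache Iceberg FIXED(L)
  tags     MAP<STRING, STRING>,          -- Apache Iceberg MAP<K,V>
  location ROW<lat DOUBLE, lon DOUBLE>   -- Apache Iceberg STRUCT<...>
) WITH (
  'connector' = 'iceberg'
  -- plus the common parameters described in the "Parameters in the WITH clause" section
);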

Sample code

Before you run the following sample code, make sure that an OSS bucket and a DLF database are created. For more information, see Create buckets and Create a metadatabase.

Note

When you specify a directory for your DLF database, we recommend that you enter a directory in the ${warehouse}/${database_name}.db format. For example, if the value of the warehouse parameter is oss://iceberg-test/warehouse and the value of the database_name parameter is dlf_db, set the OSS directory of the dlf_db database to oss://iceberg-test/warehouse/dlf_db.db.

Sample code for an Apache Iceberg sink table

This example shows how to use the Datagen connector to randomly generate streaming data and write the data to an Apache Iceberg sink table.

CREATE TEMPORARY TABLE datagen(
  id    BIGINT,
  data  STRING
) WITH (
  'connector' = 'datagen'
);

CREATE TEMPORARY TABLE dlf_iceberg (
  id    BIGINT,
  data  STRING
) WITH (
  'connector' = 'iceberg',
  'catalog-name' = '<yourCatalogName>',
  'catalog-database' = '<yourDatabaseName>',
  'io-impl' = 'org.apache.iceberg.aliyun.oss.OSSFileIO',
  'oss.endpoint' = '<yourOSSEndpoint>',  
  'access.key.id' = '${secret_values.ak_id}',
  'access.key.secret' = '${secret_values.ak_secret}',
  'catalog-impl' = 'org.apache.iceberg.aliyun.dlf.DlfCatalog',
  'warehouse' = '<yourOSSWarehousePath>',
  'dlf.catalog-id' = '<yourCatalogId>',
  'dlf.endpoint' = '<yourDLFEndpoint>',  
  'dlf.region-id' = '<yourDLFRegionId>'
);

INSERT INTO dlf_iceberg SELECT * FROM datagen;

Sample code for an Apache Iceberg source table

CREATE TEMPORARY TABLE src_iceberg (
  id    BIGINT,
  data  STRING
) WITH (
  'connector' = 'iceberg',
  'catalog-name' = '<yourCatalogName>',
  'catalog-database' = '<yourDatabaseName>',
  'io-impl' = 'org.apache.iceberg.aliyun.oss.OSSFileIO',
  'oss.endpoint' = '<yourOSSEndpoint>',  
  'access.key.id' = '${secret_values.ak_id}',
  'access.key.secret' = '${secret_values.ak_secret}',
  'catalog-impl' = 'org.apache.iceberg.aliyun.dlf.DlfCatalog',
  'warehouse' = '<yourOSSWarehousePath>',
  'dlf.catalog-id' = '<yourCatalogId>',
  'dlf.endpoint' = '<yourDLFEndpoint>',  
  'dlf.region-id' = '<yourDLFRegionId>'
);

CREATE TEMPORARY TABLE dst_iceberg (
  id    BIGINT,
  data  STRING
) WITH (
  'connector' = 'iceberg',
  'catalog-name' = '<yourCatalogName>',
  'catalog-database' = '<yourDatabaseName>',
  'io-impl' = 'org.apache.iceberg.aliyun.oss.OSSFileIO',
  'oss.endpoint' = '<yourOSSEndpoint>',  
  'access.key.id' = '${secret_values.ak_id}',
  'access.key.secret' = '${secret_values.ak_secret}',
  'catalog-impl' = 'org.apache.iceberg.aliyun.dlf.DlfCatalog',
  'warehouse' = '<yourOSSWarehousePath>',
  'dlf.catalog-id' = '<yourCatalogId>',
  'dlf.endpoint' = '<yourDLFEndpoint>',  
  'dlf.region-id' = '<yourDLFRegionId>'
);

BEGIN STATEMENT SET;

INSERT INTO src_iceberg VALUES (1, 'AAA'), (2, 'BBB'), (3, 'CCC'), (4, 'DDD'), (5, 'EEE');
INSERT INTO dst_iceberg SELECT * FROM src_iceberg;

END;
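To verify the result, you can query the sink table after the job finishes, for example in a batch query (redeclare dst_iceberg first if you run the query in a new session):

SELECT * FROM dst_iceberg;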

Reference

For more information about the connectors that are supported by Realtime Compute for Apache Flink, see Supported connectors.