This topic describes how to use the Apache Iceberg connector.
Background information
Apache Iceberg is an open table format for data lakes. You can use Apache Iceberg to quickly build your own data lake storage service on Hadoop Distributed File System (HDFS) or Alibaba Cloud Object Storage Service (OSS). Then, you can use a computing engine of the open source big data ecosystem, such as Apache Flink, Apache Spark, Apache Hive, or Apache Presto, to analyze data in your data lake.
Item | Description |
Table type | Source table and sink table |
Running mode | Batch mode and streaming mode |
Data format | N/A |
Metric | N/A |
API type | SQL API |
Data update or deletion in a sink table | Supported |
Features
Apache Iceberg provides the following core capabilities:
Builds a low-cost lightweight data lake storage service based on HDFS or OSS.
Provides comprehensive atomicity, consistency, isolation, durability (ACID) semantics.
Supports historical version backtracking.
Supports efficient data filtering.
Supports schema evolution.
Supports partition evolution.
You can use the efficient fault tolerance and stream processing capabilities of Flink to import a large amount of behavioral log data into an Apache Iceberg data lake in real time. Then, you can use Flink or another analytics engine to extract value from the data.
Limits
Only Realtime Compute for Apache Flink that uses Ververica Runtime (VVR) 4.0.8 or later supports the Apache Iceberg connector. The Apache Iceberg connector must be used together with Data Lake Formation (DLF) catalogs. For more information, see Manage DLF catalogs.
The Apache Iceberg connector supports the Apache Iceberg table formats of version 1 and version 2. For more information, see Iceberg Table Spec.
Note: Only Realtime Compute for Apache Flink that uses VVR 8.0.7 or later supports the Apache Iceberg table format of version 2.
In streaming read mode, only Apache Iceberg tables to which data is written in Append Only mode can be used as source tables. A streaming read sketch is provided after the source table sample code below.
Syntax
CREATE TABLE iceberg_table (
id BIGINT,
  data STRING,
PRIMARY KEY(`id`) NOT ENFORCED
)
PARTITIONED BY (data)
WITH (
'connector' = 'iceberg',
...
);
Parameters in the WITH clause
Common parameters
Parameter | Description | Data type | Required | Default value | Remarks |
connector | The type of the table. | STRING | Yes | No default value | Set the value to iceberg. |
catalog-name | The name of the catalog. | STRING | Yes | No default value | Set the value to a custom name. |
catalog-database | The name of the database. | STRING | Yes | default | Set the value to the name of the database that is created in Data Lake Formation (DLF). Example: dlf_db. Note: If you have not created a DLF database, create one first. |
io-impl | The name of the implementation class of the distributed file system. | STRING | Yes | No default value | Set the value to org.apache.iceberg.aliyun.oss.OSSFileIO. |
oss.endpoint | The endpoint of your OSS bucket. | STRING | No | No default value | For more information, see Regions and endpoints. Note: We recommend that you set oss.endpoint to the virtual private cloud (VPC) endpoint of the OSS bucket. For example, if you select the China (Hangzhou) region, set oss.endpoint to oss-cn-hangzhou-internal.aliyuncs.com. If you want to access OSS across VPCs, follow the instructions in How does fully managed Flink access a service across VPCs? |
access.key.id | The AccessKey ID of your Alibaba Cloud account. | STRING | Yes | No default value | For more information, see How do I view information about the AccessKey ID and AccessKey secret of the account? Important: To protect your AccessKey pair, we recommend that you configure the AccessKey ID by using the key management method. For more information, see Manage keys. |
access.key.secret | The AccessKey secret of your Alibaba Cloud account. | STRING | Yes | No default value | For more information, see How do I view information about the AccessKey ID and AccessKey secret of the account? Important: To protect your AccessKey pair, we recommend that you configure the AccessKey secret by using the key management method. For more information, see Manage keys. |
catalog-impl | The class name of the catalog. | STRING | Yes | No default value | Set the value to org.apache.iceberg.aliyun.dlf.DlfCatalog. |
warehouse | The OSS directory in which table data is stored. | STRING | Yes | No default value | N/A. |
dlf.catalog-id | The ID of your Alibaba Cloud account. | STRING | Yes | No default value | You can obtain the account ID on the Security Settings page. |
dlf.endpoint | The endpoint of DLF. | STRING | Yes | No default value | Note: We recommend that you set dlf.endpoint to the VPC endpoint of DLF. For example, if you select the China (Hangzhou) region, set dlf.endpoint to dlf-vpc.cn-hangzhou.aliyuncs.com. If you want to access DLF across VPCs, follow the instructions in How does Realtime Compute for Apache Flink access a service across VPCs? |
dlf.region-id | The ID of the region in which the DLF service is activated. | STRING | Yes | No default value | Note: Make sure that the value of this parameter matches the endpoint specified by dlf.endpoint. |
Parameters only for sink tables
Parameter | Description | Data type | Required | Default value | Remarks |
write.operation | The write operation mode. | STRING | No | upsert | Valid values: upsert (default): data is updated; insert: data is written to the table in append mode; bulk_insert: a specific amount of data is written at a time, and existing data is not updated. |
hive_sync.enable | Specifies whether to enable the synchronization of metadata to Hive. | BOOLEAN | No | false | Valid values: true: metadata is synchronized to Hive; false (default): metadata is not synchronized to Hive. |
hive_sync.mode | The Hive data synchronization mode. | STRING | No | hms | Valid values: hms (default): retain this value if you use a Hive metastore or DLF catalog; jdbc: set this value if you use a Java Database Connectivity (JDBC) catalog. |
hive_sync.db | The name of the Hive database to which data is synchronized. | STRING | No | The database name of the current table in the catalog | N/A. |
hive_sync.table | The name of the Hive table to which data is synchronized. | STRING | No | The name of the current table | N/A. |
dlf.catalog.region | The name of the region in which the DLF service is activated. | STRING | No | No default value | Note: This parameter takes effect only when the hive_sync.mode parameter is set to hms. Make sure that the value of this parameter matches the endpoint specified by the dlf.catalog.endpoint parameter. |
dlf.catalog.endpoint | The endpoint of DLF. | STRING | No | No default value | Note: This parameter takes effect only when the hive_sync.mode parameter is set to hms. We recommend that you set dlf.catalog.endpoint to the VPC endpoint of DLF. For example, if you select the China (Hangzhou) region, set dlf.catalog.endpoint to dlf-vpc.cn-hangzhou.aliyuncs.com. If you want to access DLF across VPCs, follow the instructions in How does Realtime Compute for Apache Flink access a service across VPCs? |
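The following sketch shows how the sink-only parameters fit into a WITH clause. It appends data instead of upserting it and synchronizes metadata to Hive. The table name, column names, and angle-bracket placeholders are assumptions for illustration, and the common parameters are abbreviated with ... as in the Syntax section; see the Sample code section for a complete WITH clause.
CREATE TEMPORARY TABLE iceberg_sink (
  id BIGINT,
  data STRING
) WITH (
  'connector' = 'iceberg',
  'write.operation' = 'insert',        -- append instead of the default upsert
  'hive_sync.enable' = 'true',         -- synchronize metadata to Hive
  'hive_sync.mode' = 'hms',            -- retain hms for a Hive metastore or DLF catalog
  'hive_sync.db' = '<yourHiveDatabaseName>',
  'hive_sync.table' = '<yourHiveTableName>',
  -- the common catalog, OSS, and DLF parameters are also required
  ...
);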
Data type mappings
Data type of Apache Iceberg | Data type of Realtime Compute for Apache Flink |
BOOLEAN | BOOLEAN |
INT | INT |
LONG | BIGINT |
FLOAT | FLOAT |
DOUBLE | DOUBLE |
DECIMAL(P,S) | DECIMAL(P,S) |
DATE | DATE |
TIME | TIME Note: Apache Iceberg time values are accurate to the microsecond, and time values in Realtime Compute for Apache Flink are accurate to the millisecond. When you use Realtime Compute for Apache Flink to read Apache Iceberg data, the time precision is aligned to the millisecond. |
TIMESTAMP | TIMESTAMP |
TIMESTAMPTZ | TIMESTAMP_LTZ |
STRING | STRING |
FIXED(L) | BYTES |
BINARY | VARBINARY |
STRUCT<...> | ROW |
LIST<E> | ARRAY |
MAP<K,V> | MAP |
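When you declare a Flink table over an existing Apache Iceberg schema, use the Flink-side types from the table above. The following DDL is a sketch for illustration only; the table and column names are hypothetical, and the common parameters are abbreviated with ... as in the Syntax section.
CREATE TEMPORARY TABLE typed_iceberg (
  id BIGINT,                    -- Apache Iceberg LONG
  price DECIMAL(10, 2),         -- Apache Iceberg DECIMAL(10,2)
  created_at TIMESTAMP_LTZ,     -- Apache Iceberg TIMESTAMPTZ
  payload VARBINARY,            -- Apache Iceberg BINARY
  tags ARRAY<STRING>,           -- Apache Iceberg LIST<STRING>
  attrs MAP<STRING, STRING>     -- Apache Iceberg MAP<STRING,STRING>
) WITH (
  'connector' = 'iceberg',
  ...
);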
Sample code
Confirm that an OSS bucket and a DLF database are created. For more information, see Create buckets and Create a metadatabase.
When you specify a directory for your DLF database, we recommend that you enter a directory in the ${warehouse}/${database_name}.db format. For example, if the value of the warehouse parameter is oss://iceberg-test/warehouse and the value of the database_name parameter is dlf_db, set the OSS directory of the dlf_db database to oss://iceberg-test/warehouse/dlf_db.db.
Sample code for an Apache Iceberg sink table
This example shows how to use the Datagen connector to randomly generate streaming data and write the data to an Apache Iceberg sink table.
CREATE TEMPORARY TABLE datagen(
id BIGINT,
data STRING
) WITH (
'connector' = 'datagen'
);
CREATE TEMPORARY TABLE dlf_iceberg (
id BIGINT,
data STRING
) WITH (
'connector' = 'iceberg',
'catalog-name' = '<yourCatalogName>',
'catalog-database' = '<yourDatabaseName>',
'io-impl' = 'org.apache.iceberg.aliyun.oss.OSSFileIO',
'oss.endpoint' = '<yourOSSEndpoint>',
'access.key.id' = '${secret_values.ak_id}',
'access.key.secret' = '${secret_values.ak_secret}',
'catalog-impl' = 'org.apache.iceberg.aliyun.dlf.DlfCatalog',
'warehouse' = '<yourOSSWarehousePath>',
'dlf.catalog-id' = '<yourCatalogId>',
'dlf.endpoint' = '<yourDLFEndpoint>',
'dlf.region-id' = '<yourDLFRegionId>'
);
INSERT INTO dlf_iceberg SELECT * FROM datagen;
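The dlf_iceberg table above appends randomly generated rows. If you rely on the default write.operation value of upsert instead, declare a primary key so that rows with the same key are updated. The following variant is a sketch under that assumption; row-level updates generally rely on the version 2 table format (see Limits), and the common parameters are abbreviated with ... as in the Syntax section.
CREATE TEMPORARY TABLE dlf_iceberg_upsert (
  id BIGINT,
  data STRING,
  PRIMARY KEY (`id`) NOT ENFORCED    -- rows that share an id are updated
) WITH (
  'connector' = 'iceberg',
  'write.operation' = 'upsert',      -- explicit here; upsert is also the default
  -- reuse the catalog, OSS, and DLF settings of the dlf_iceberg table above
  ...
);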
Sample code for an Apache Iceberg source table
CREATE TEMPORARY TABLE src_iceberg (
id BIGINT,
data STRING
) WITH (
'connector' = 'iceberg',
'catalog-name' = '<yourCatalogName>',
'catalog-database' = '<yourDatabaseName>',
'io-impl' = 'org.apache.iceberg.aliyun.oss.OSSFileIO',
'oss.endpoint' = '<yourOSSEndpoint>',
'access.key.id' = '${secret_values.ak_id}',
'access.key.secret' = '${secret_values.ak_secret}',
'catalog-impl' = 'org.apache.iceberg.aliyun.dlf.DlfCatalog',
'warehouse' = '<yourOSSWarehousePath>',
'dlf.catalog-id' = '<yourCatalogId>',
'dlf.endpoint' = '<yourDLFEndpoint>',
'dlf.region-id' = '<yourDLFRegionId>'
);
CREATE TEMPORARY TABLE dst_iceberg (
id BIGINT,
data STRING
) WITH (
'connector' = 'iceberg',
'catalog-name' = '<yourCatalogName>',
'catalog-database' = '<yourDatabaseName>',
'io-impl' = 'org.apache.iceberg.aliyun.oss.OSSFileIO',
'oss.endpoint' = '<yourOSSEndpoint>',
'access.key.id' = '${secret_values.ak_id}',
'access.key.secret' = '${secret_values.ak_secret}',
'catalog-impl' = 'org.apache.iceberg.aliyun.dlf.DlfCatalog',
'warehouse' = '<yourOSSWarehousePath>',
'dlf.catalog-id' = '<yourCatalogId>',
'dlf.endpoint' = '<yourDLFEndpoint>',
'dlf.region-id' = '<yourDLFRegionId>'
);
BEGIN STATEMENT SET;
INSERT INTO src_iceberg VALUES (1, 'AAA'), (2, 'BBB'), (3, 'CCC'), (4, 'DDD'), (5, 'EEE');
INSERT INTO dst_iceberg SELECT * FROM src_iceberg;
END;
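As mentioned in Limits, a source table can also be read in streaming mode if data is written to it in Append Only mode. The following sketch enables a streaming read on the src_iceberg table by using a SQL hint. The streaming and monitor-interval options come from the open source Apache Iceberg connector; whether they apply to your VVR version is an assumption that you should verify.
-- Incrementally consume new snapshots, checking for new data every 10 seconds.
SELECT id, data
FROM src_iceberg /*+ OPTIONS('streaming' = 'true', 'monitor-interval' = '10s') */;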
References
For more information about the connectors that are supported by Realtime Compute for Apache Flink, see Supported connectors.