E-MapReduce: Integrate Paimon with Flink

Last Updated: Nov 20, 2024

This topic describes how to use the Flink SQL client in E-MapReduce (EMR) to read data from and write data to Paimon.

Prerequisites

A Dataflow or custom cluster that contains the Flink and Paimon services is created. For more information, see Create a cluster.

Note

If you want to use Hive catalogs to read data from or write data to Paimon, you must create a custom cluster that contains the Flink, Paimon, and Hive services. In addition, you must select Self-managed RDS or Built-in MySQL for the Metadata parameter.

Limits

  • Clusters of EMR V3.46.0 do not allow you to use DLF catalogs or Hive catalogs to read data from or write data to Paimon.

  • Only clusters whose versions range from EMR V3.46.0 to V3.50.X or V5.12.0 to V5.16.X allow you to use the Flink SQL client to read data from and write data to Paimon.

    Note

    For clusters of EMR V3.51.X or a later minor version, or clusters of EMR V5.17.X or a later minor version, you can configure dependencies based on your business requirements. For more information, see Quick Start.

Procedure

Step 1: Configure dependencies

This topic describes how to use file system catalogs, Hive catalogs, and DLF catalogs on the Flink SQL client to read data from or write data to Paimon. Select a catalog type based on your scenario and environment, and then run the commands in the corresponding section to configure the dependencies for that catalog type.

File system catalog

# Copy the Paimon connector JARs for Flink to the Flink lib directory.
cp /opt/apps/PAIMON/paimon-current/lib/flink/*.jar /opt/apps/FLINK/flink-current/lib/

Hive catalog

# Copy the Paimon connector JARs for Flink to the Flink lib directory.
cp /opt/apps/PAIMON/paimon-current/lib/flink/*.jar /opt/apps/FLINK/flink-current/lib/
# Copy the Flink Hive catalog JARs to the Flink lib directory.
cp /opt/apps/FLINK/flink-current/opt/catalogs/hive-2.3.6/*.jar /opt/apps/FLINK/flink-current/lib/

DLF catalog

# Copy the Paimon connector JARs for Flink to the Flink lib directory.
cp /opt/apps/PAIMON/paimon-current/lib/flink/*.jar /opt/apps/FLINK/flink-current/lib/
# Copy the Jackson JARs that the DLF catalog depends on.
cp /opt/apps/PAIMON/paimon-current/lib/jackson/*.jar /opt/apps/FLINK/flink-current/lib/
# Copy the DLF metastore client JARs.
cp /opt/apps/METASTORE/metastore-*/hive2/*.jar /opt/apps/FLINK/flink-current/lib/
# Copy the Flink Hive catalog JARs, which the DLF catalog also requires.
cp /opt/apps/FLINK/flink-current/opt/catalogs/hive-2.3.6/*.jar /opt/apps/FLINK/flink-current/lib/

Step 2: Start a Flink session cluster

In this example, a Flink cluster in session mode is used. For more information about other deployment modes, see Basic usage.

Run the following command to start a YARN session:

yarn-session.sh --detached
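
The default session may not provide enough task slots to run the write job and the read job in Step 4 at the same time. If necessary, you can size the session explicitly when you start it. The following sketch uses standard yarn-session.sh options of open source Flink; the values are examples, not requirements:

# Start a detached session with 2 GiB of memory and 4 slots per TaskManager,
# so that the concurrent write and read jobs in Step 4 can both obtain resources.
yarn-session.sh --detached -tm 2048 -s 4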

Step 3: Create a catalog

Paimon stores data and metadata in a file system such as Hadoop Distributed File System (HDFS) or an object storage service such as OSS-HDFS. The root path for storage is specified by the warehouse parameter. If the specified root path does not exist, it is automatically created. If the root path already exists, you can use the catalog to access the existing tables in the path.

You can synchronize metadata to Hive or DLF so that other services can access Paimon data through Hive or DLF.

Note

Clusters of EMR V3.46.0 and EMR V5.17.0 do not allow you to use DLF catalogs or Hive catalogs to read data from or write data to Paimon.

Create a file system catalog

A file system catalog stores metadata in a file system or an object storage system.

  1. Run the following command to start the Flink SQL client:

    sql-client.sh
  2. Execute the following Flink SQL statement to create a file system catalog:

    CREATE CATALOG test_catalog WITH (
        'type' = 'paimon',
        'metastore' = 'filesystem',
        'warehouse' = 'oss://<yourBucketName>/warehouse'
    );
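
    Paimon also supports HDFS as the storage layer. As a sketch, the same catalog backed by HDFS might look like the following. The catalog name and the warehouse path are placeholders, not values from this example:

    CREATE CATALOG test_catalog_hdfs WITH (
        'type' = 'paimon',
        'metastore' = 'filesystem',
        -- Placeholder HDFS path; replace it with your own warehouse path. 
        'warehouse' = 'hdfs://<yourNameNodeAddress>/paimon/warehouse'
    );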

Create a Hive catalog

A Hive catalog can synchronize metadata to Hive Metastore. Hive allows you to query data in tables that are created in a Hive catalog.

For information about how to query data of Paimon in Hive, see Integrate Paimon with Hive.

  1. Run the following command to start the Flink SQL client:

    sql-client.sh
    Note

    Even if you are using Hive 3, you do not need to modify the startup command.

  2. Execute the following Flink SQL statement to create a Hive catalog:

    CREATE CATALOG test_catalog WITH (
        'type' = 'paimon',
        'metastore' = 'hive',
        'uri' = 'thrift://master-1-1:9083', -- The address of Hive Metastore. 
        'warehouse' = 'oss://<yourBucketName>/warehouse'
    );
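
    After you create the database and table in Step 4 and write data to the table, the metadata is synchronized to Hive Metastore. As a quick check, you can list the table from the Hive CLI. The statement below assumes that Step 4 is complete; querying the table data from Hive requires the steps in Integrate Paimon with Hive:

    # List the tables that are synchronized to the test_db database in Hive.
    hive -e "SHOW TABLES IN test_db;"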

Create a DLF catalog

A DLF catalog can synchronize metadata to DLF.

Important

When you create an EMR cluster, you must select DLF Unified Metadata for the Metadata parameter.

  1. Run the following command to start the Flink SQL client:

    sql-client.sh
    Note

    Even if you are using Hive 3, you do not need to modify the startup command.

  2. Execute the following Flink SQL statement to create a DLF catalog:

    CREATE CATALOG test_catalog WITH (
        'type' = 'paimon',
        'metastore' = 'dlf',
        'hive-conf-dir' = '/etc/taihao-apps/flink-conf',
        'warehouse' = 'oss://<yourBucketName>/warehouse'
    );
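
    To verify that the catalog works, you can list the databases that are visible through it. The following are standard Flink SQL statements:

    -- Switch to the new catalog and list the databases that it exposes. 
    USE CATALOG test_catalog;
    SHOW DATABASES;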

Step 4: Read data from and write data to Paimon in streaming mode

Execute the following Flink SQL statements to create a Paimon table in the created catalog and read data from and write data to the table:

-- Set the execution.runtime-mode parameter to streaming. 
SET 'execution.runtime-mode' = 'streaming';

-- Specify a checkpoint interval for Paimon. 
SET 'execution.checkpointing.interval' = '10s';

-- Use the catalog that is created in the previous step. 
USE CATALOG test_catalog;

-- Create a test database and use the database. 
CREATE DATABASE test_db;
USE test_db;

-- Create a Datagen source table that generates random data. 
CREATE TEMPORARY TABLE datagen_source (
    uuid int,
    kind int,
    price int
) WITH (
    'connector' = 'datagen',
    'fields.kind.min' = '0',
    'fields.kind.max' = '9',
    'rows-per-second' = '10'
);

-- Create a Paimon table. 
CREATE TABLE test_tbl (
    uuid int,
    kind int,
    price int,
    PRIMARY KEY (uuid) NOT ENFORCED
);

-- Write data to the Paimon table. 
INSERT INTO test_tbl SELECT * FROM datagen_source;

-- Read data from the table. 
-- The write operation is in progress when the read operation is performed. 
-- Make sure that the cluster has sufficient resources (task slots) to run the write and read jobs at the same time. Otherwise, the read job cannot obtain resources and fails. 
SELECT kind, SUM(price) FROM test_tbl GROUP BY kind;
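
By default, the streaming read above scans the current snapshot of the table and then continues to read incremental changes. If you want to consume only changes that are committed after the query starts, recent Paimon versions let you set a scan mode through a dynamic option in an SQL hint. The option name ('scan.mode') may differ in older Paimon versions:

-- Read only changes that are committed after the query starts. 
SELECT * FROM test_tbl /*+ OPTIONS('scan.mode' = 'latest') */;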

Step 5: Perform an OLAP query on Paimon

Execute the following Flink SQL statements to perform an online analytical processing (OLAP) query on the Paimon table:

-- Reset the checkpoint interval, which is not required in batch mode. 
RESET 'execution.checkpointing.interval';

-- Set the execution.runtime-mode parameter to batch. 
SET 'execution.runtime-mode' = 'batch';

-- Use the tableau mode to display the query result on the CLI. 
SET 'sql-client.execution.result-mode' = 'tableau';

-- Query data from the Paimon table. 
SELECT kind, SUM(price) FROM test_tbl GROUP BY kind;
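
Paimon versions that support time travel also allow you to query a historical snapshot in batch mode through a dynamic option. The snapshot ID below is a placeholder, and the option name ('scan.snapshot-id') may vary across Paimon versions:

-- Query the table as of snapshot 1. The snapshot ID is a placeholder. 
SELECT kind, SUM(price) FROM test_tbl /*+ OPTIONS('scan.snapshot-id' = '1') */ GROUP BY kind;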

Step 6: Clear resources

Important

After the test is complete, stop the streaming write job to prevent resource leaks.

Execute the following Flink SQL statement to drop the Paimon table:

DROP TABLE test_tbl;
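
If you started the YARN session only for this test, you can also release the session cluster by using standard YARN commands. Killing the session also stops any jobs that are still running in it. The application ID below is a placeholder; you can find the actual ID in the output of yarn-session.sh or of the list command:

# List the running YARN applications to find the Flink session cluster.
yarn application -list
# Stop the session cluster. Replace the application ID with your own.
yarn application -kill application_1700000000000_0001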