全部产品
Search
文档中心

开源大数据平台E-MapReduce:Paimon与Flink集成

更新时间:Nov 06, 2024

E-MapReduce支持通过Flink SQL对Paimon进行读写操作。本文通过示例为您介绍如何通过Flink SQL对Paimon进行读写操作。

前提条件

已创建选择了Flink和Paimon的DataFlow或Custom类型的集群,创建集群详情请参见创建集群

说明

如果您需要使用Hive Catalog的方式,则只能创建选择了Flink、Paimon和Hive的Custom类型的集群,且元数据类型仅可选择自建RDS内置MySQL

使用限制

  • EMR-3.46.0版本,暂不支持DLF Catalog和Hive Catalog的方式。

  • EMR-3.46.0至EMR-3.50.X版本、EMR-5.12.0至EMR-5.16.X版本的集群,支持使用Flink SQL对Paimon进行读写操作。

    说明

    EMR-3.51.X及其后续版本、EMR-5.17.X及其后续版本,建议您参考Paimon社区文档,在EMR集群中自行配置。

操作步骤

步骤一:配置依赖

本文介绍了通过Flink SQL对Paimon进行读写操作的三种方法:Filesystem Catalog、Hive Catalog和DLF Catalog。每种方法针对不同的应用场景和环境需求,请根据所选方法配置相应的依赖。

Filesystem Catalog

cp /opt/apps/PAIMON/paimon-current/lib/flink/*.jar /opt/apps/FLINK/flink-current/lib/

Hive Catalog

cp /opt/apps/PAIMON/paimon-current/lib/flink/*.jar /opt/apps/FLINK/flink-current/lib/
cp /opt/apps/FLINK/flink-current/opt/catalogs/hive-2.3.6/*.jar /opt/apps/FLINK/flink-current/lib/

创建DLF Catalog

cp /opt/apps/PAIMON/paimon-current/lib/flink/*.jar /opt/apps/FLINK/flink-current/lib/
cp /opt/apps/PAIMON/paimon-current/lib/jackson/*.jar /opt/apps/FLINK/flink-current/lib/
cp /opt/apps/METASTORE/metastore-*/hive2/*.jar /opt/apps/FLINK/flink-current/lib/
cp /opt/apps/FLINK/flink-current/opt/catalogs/hive-2.3.6/*.jar /opt/apps/FLINK/flink-current/lib/

步骤二:启动集群

本文以Session模式为例,其余模式请参见基础使用

执行以下命令,启动YARN Session。

yarn-session.sh --detached

步骤三:创建Catalog

Paimon将数据和元数据都保存在文件系统(例如,HDFS)或对象存储(例如,OSS-HDFS)中,存储的根路径由warehouse参数指定。如果指定的warehouse路径不存在,将会自动创建该路径;如果指定的warehouse路径存在,则可以通过该Catalog访问路径中已有的表。

您还可以将元数据额外同步到Hive或DLF中,方便其他服务访问Paimon。

说明

EMR-3.46.0和EMR-5.17.0版本,暂不支持DLF Catalog和Hive Catalog的方式。

创建Filesystem Catalog

Filesystem Catalog仅将元数据保存在文件系统或对象存储中。

  1. 执行以下命令,启动Flink SQL。

    sql-client.sh
  2. 执行以下Flink SQL语句,创建Filesystem Catalog。

    CREATE CATALOG test_catalog WITH (
        'type' = 'paimon',
        'metastore' = 'filesystem',
        'warehouse' = 'oss://<yourBucketName>/warehouse'
    );

创建Hive Catalog

Hive Catalog会同步元数据到Hive MetaStore中。在Hive Catalog中创建的表可以直接在Hive中查询。

Hive查询Paimon,详情请参见Paimon与Hive集成

  1. 执行以下命令,启动Flink SQL。

    sql-client.sh
    说明

    即使您使用的是Hive3,也无需修改启动命令。

  2. 执行以下Flink SQL语句,创建Hive Catalog。

    CREATE CATALOG test_catalog WITH (
        'type' = 'paimon',
        'metastore' = 'hive',
        'uri' = 'thrift://master-1-1:9083', -- uri参数指向Hive metastore service的地址。
        'warehouse' = 'oss://<yourBucketName>/warehouse'
    );

创建DLF Catalog

DLF Catalog会将元数据同步到DLF中。

重要

创建集群时,元数据必须为DLF统一元数据

  1. 执行以下命令,启动Flink SQL。

    sql-client.sh
    说明

    即使您使用的是Hive3,也无需修改启动命令。

  2. 执行以下Flink SQL语句,创建DLF Catalog。

    CREATE CATALOG test_catalog WITH (
        'type' = 'paimon',
        'metastore' = 'dlf',
        'hive-conf-dir' = '/etc/taihao-apps/flink-conf',
        'warehouse' = 'oss://<yourBucketName>/warehouse'
    );

步骤四 :流作业读写Paimon

执行以下Flink SQL语句,在Catalog中创建一张表,并读写表中的数据。

-- 设置为流作业。
SET 'execution.runtime-mode' = 'streaming';

-- Paimon在流作业中需要设置checkpoint。
SET 'execution.checkpointing.interval' = '10s';

-- 使用之前创建的catalog。
USE CATALOG test_catalog;

-- 创建并使用一个测试database。
CREATE DATABASE test_db;
USE test_db;

-- 用datagen产生随机数据。
CREATE TEMPORARY TABLE datagen_source (
    uuid int,
    kind int,
    price int
) WITH (
    'connector' = 'datagen',
    'fields.kind.min' = '0',
    'fields.kind.max' = '9',
    'rows-per-second' = '10'
);

-- 创建Paimon表。
CREATE TABLE test_tbl (
    uuid int,
    kind int,
    price int,
    PRIMARY KEY (uuid) NOT ENFORCED
);

-- 向Paimon中写入数据。
INSERT INTO test_tbl SELECT * FROM datagen_source;

-- 读取表中的数据。
-- 流式查询作业运行的过程中,上面触发的流式写入作业仍在运行。
-- 您需要保证Flink集群有足够的资源(task slot)同时运行两个作业,否则无法查到数据。
SELECT kind, SUM(price) FROM test_tbl GROUP BY kind;

步骤五:OLAP查询Paimon

执行以下Flink SQL语句,对刚才创建的表进行OLAP查询。

-- 设置为批作业。
RESET 'execution.checkpointing.interval';
SET 'execution.runtime-mode' = 'batch';

-- 使用tableau展示模式,在命令行中直接打出结果。
SET 'sql-client.execution.result-mode' = 'tableau';

-- 对表中数据进行查询。
SELECT kind, SUM(price) FROM test_tbl GROUP BY kind;

步骤六:清理资源

重要

完成测试后,请手动停止流式写入Paimon的作业,防止资源泄露。

停止作业后,执行以下Flink SQL语句,删除刚才创建的表。

DROP TABLE test_tbl;