全部产品
Search
文档中心

实时计算Flink版:Paimon表数据写入和消费

更新时间:May 16, 2024

本文为您介绍如何在实时计算开发控制台向Paimon表中插入、更新、覆写或删除数据,以及从Paimon表消费数据,并指定消费位点。

前提条件

已创建Paimon Catalog和Paimon表,详情请参见管理Paimon Catalog

使用限制

仅实时计算引擎VVR 8.0.5及以上版本支持Paimon表。

向Paimon表写入数据

通过CTAS/CDAS语句同步数据及表结构变更

详情请参见管理Paimon Catalog

通过INSERT INTO语句插入或更新数据

您可以通过INSERT INTO语句,直接向Paimon表插入或更新数据。

通过INSERT OVERWRITE语句覆写数据

覆写是指清空并重新写入数据。您可以通过INSERT OVERWRITE语句覆写整张Paimon表或覆写特定分区,SQL语句示例如下。

说明
  • 仅批作业支持INSERT OVERWRITE语句。

  • 默认情况下,INSERT OVERWRITE操作不会产生变更数据,删除与导入的数据无法被下游流式消费。如果您需要消费此类数据,请参见流式消费INSERT OVERWRITE语句的结果

  • my_table表是非分区表,覆写整张my_table表。

    INSERT OVERWRITE my_table SELECT ...;
  • my_table表是分区表,覆写my_table表中的dt=20240108,hh=06分区。

    INSERT OVERWRITE my_table PARTITION (`dt` = '20240108', `hh` = '06') SELECT ...;
  • my_table表是分区表,动态覆写my_table表中的分区,即SELECT语句结果中出现的分区都会被覆写,其它分区保持不变。

    INSERT OVERWRITE my_table SELECT ...;
  • my_table表是分区表,覆写整张my_table表。

    INSERT OVERWRITE my_table /*+ OPTIONS('dynamic-partition-overwrite' = 'false') */ SELECT ...;

通过DELETE语句删除数据

您可以通过DELETE语句从Paimon主键表中删除数据。DELETE语句只能在查询脚本中执行。

--从my_table表中删除所有currency = 'UNKNOWN'的数据。
DELETE FROM my_table WHERE currency = 'UNKNOWN';

过滤删除消息

使用Paimon主键表时,默认情况下,类型为DELETE的消息会将Paimon表中对应主键的数据删除。如果您不希望Paimon表处理此类消息,可以通过SQL hint将以下参数设置为true,过滤删除消息。

参数

说明

数据类型

默认值

ignore-delete

是否过滤删除消息。

Boolean

false

调整结果表的并发数

您可以通过SQL hint设置以下参数,手动调整结果表算子的并发数。

参数

说明

数据类型

默认值

sink.parallelism

手动设定Paimon结果表算子的并发数。

Integer

例如,以下SQL将手动设置Paimon结果表算子并发数为10。

INSERT INTO t /*+ OPTIONS('sink.parallelism' = '10') */ SELECT * FROM s;

从Paimon表消费数据

通过流作业消费Paimon表

说明

通过流作业消费的Paimon主键表需要设置变更数据产生机制

默认情况下,流作业中的Paimon源表算子将首先产出作业启动时刻Paimon表中的全量数据,之后产出从作业启动时刻开始Paimon表中的增量数据。

从指定位点消费Paimon表

您可以通过以下方式从指定位点消费Paimon表:

  • 如果您不需要消费作业启动时刻Paimon表中的全量数据,只需要消费后续的增量数据,可通过SQL Hint设置'scan.mode' = 'latest'

    SELECT * FROM t /*+ OPTIONS('scan.mode' = 'latest') */;
  • 如果您不想要消费全量数据,只想消费从指定时间点开始的增量数据,可通过SQL Hint设置scan.timestamp-millis参数。参数值表示从Unix Epoch(1970-01-01 00:00:00 UTC)开始到指定时间点经过的毫秒数。

    SELECT * FROM t /*+ OPTIONS('scan.timestamp-millis' = '1678883047356') */;
  • 如果您想要消费从指定时间点之后写入的全量数据,并持续消费后续的增量数据,可以从以下两种操作中选择一种。

    说明

    此类消费方式将读取在指定时间点之后修改的数据文件。由于小文件合并,数据文件中可能包含少量在指定时间点之前写入的数据。您可以根据业务需求,在SQL作业中添加WHERE 过滤条件对数据进行过滤。

    • 不设置任何SQL Hint,在启动作业时,选择指定源表开始时间并指定具体的时间信息。image.png

    • 通过SQL Hint设置scan.file-creation-time-millis参数。

      SELECT * FROM t /*+ OPTIONS('scan.file-creation-time-millis' = '1678883047356') */;
  • 如果您不想要消费全量数据,只想消费从特定快照文件开始的增量数据,可通过SQL Hint设置scan.snapshot-id参数,参数值是指定快照文件的编号。

    SELECT * FROM t /*+ OPTIONS('scan.snapshot-id' = '3') */;
  • 如果您想要消费特定快照文件的全量数据,并持续消费后续的增量数据,可通过SQL hint设置'scan.mode' = 'from-snapshot-full'scan.snapshot-id参数,scan.snapshot-id参数值是指定快照文件的编号。

    SELECT * FROM t /*+ OPTIONS('scan.mode' = 'from-snapshot-full', 'scan.snapshot-id' = '1') */;

指定Consumer ID

Consumer ID可以保存Paimon表的消费进度,主要用于以下场景:

  • 如果您修改了SQL作业的计算逻辑,可能会导致作业拓扑发生变化,无法从Flink状态中恢复消费进度。设置Consumer ID可以将此ID对应的消费进度保存在Paimon表的元数据文件中,即使后续无状态启动作业,也能从中断的位点继续消费Paimon表。

  • 设置Consumer ID后,未被消费过的快照文件不会因过期而被删除,可以防止因消费速度跟不上快照过期速度导致的报错。

通过设置consumer-id参数,您可以给流作业中的Paimon源表算子赋予一个Consumer ID,其值可以是任意的字符串。Consumer ID第一次创建时,它的起始消费位点根据从指定位点消费Paimon表中的规则确定。后续只要继续使用相同的Consumer ID,即可恢复Paimon表的消费进度。

例如,为Paimon源表算子设置名为test-id的Consumer ID的SQL语句示例如下。如果您想要重置某个Consumer ID对应的消费位点,可以额外设置'consumer.ignore-progress' = 'true'

SELECT * FROM t /*+ OPTIONS('consumer-id' = 'test-id') */;
说明

由于未被Consumer ID消费过的快照文件不会因过期而被删除,如果不及时清理废弃的Consumer ID,快照文件及其对应的历史数据文件将永远不会被删除,会占用存储空间。您可以设置consumer.expiration-time表参数,将超过规定时间不使用的Cconsumer ID清理掉。例如,'consumer.expiration-time' = '3d'表示将3天未使用的Consumer ID清理掉。

流式消费INSERT OVERWRITE语句的结果

默认情况下,INSERT OVERWRITE操作不会产生变更数据,删除与导入的数据无法被下游流式消费。如果您需要消费此类数据,可以在流式消费作业中通过SQL Hint配置'streaming-read-overwrite' = 'true'

SELECT * FROM t /*+ OPTIONS('streaming-read-overwrite' = 'true') */;

通过批作业消费Paimon表

默认情况下,批作业中的Paimon源表算子将读取最新的快照文件,输出Paimon表的最新状态数据。

Batch Time Travel

通过SQL Hint设置scan.timestamp-millis参数,即可查询Paimon表在该时间点的状态。参数值表示从Unix Epoch(1970-01-01 00:00:00 UTC)开始到指定时间点经过的毫秒数。

SELECT * FROM t /*+ OPTIONS('scan.timestamp-millis' = '1678883047356') */;

通过SQL Hint设置scan.snapshot-id参数,即可查询Paimon表在该快照文件产生时的状态。参数值为指定快照文件的编号。

SELECT * FROM t /*+ OPTIONS('scan.snapshot-id' = '3') */;

查询两次快照之间的数据变化

如果您想要查询两次快照间Paimon表中数据发生的变化,可以通过SQL Hint设置incremental-between参数。例如,查看20号快照文件和12号快照文件间发生变化的所有数据,SQL语句示例如下。

SELECT * FROM t /*+ OPTIONS('incremental-between' = '12,20') */;
说明

由于批作业不支持消费Delete类型的消息,默认情况下此类消息将会被丢弃。如果您想要在批作业中消费Delete类型的消息,请查询Audit Log系统表。例如SELECT * FROM `t$audit_log ` /*+ OPTIONS('incremental-between' = '12,20') */;

调整源表的并发数

默认情况下,Paimon根据分区数以及分桶数等信息自动推断源表算子的并发数。您可以通过SQL Hint设置以下参数,手动调整源表算子的并发数。

参数

数据类型

默认值

备注

scan.parallelism

Integer

手动设定Paimon源表算子的并发数。

scan.infer-parallelism

Boolean

true

是否自动推断Paimon源表算子的并发数。

scan.infer-parallelism.max

Integer

1024

Paimon源表算子自动推断出的并发数上限。

手动设置Paimon源表算子并发数为10的SQL语句示例如下。

SELECT * FROM t /*+ OPTIONS('scan.parallelism' = '10') */;

使用Paimon维表

Paimon也可以作为维表使用。关于维表JOIN的语法,详情请参见维表JOIN语句

相关文档