本文为您介绍如何在实时计算开发控制台向Paimon表中插入、更新、覆写或删除数据,以及从Paimon表消费数据,并指定消费位点。
前提条件
已创建Paimon Catalog和Paimon表,详情请参见管理Paimon Catalog。
使用限制
仅实时计算引擎VVR 8.0.5及以上版本支持Paimon表。
向Paimon表写入数据
通过CTAS/CDAS语句同步数据及表结构变更
详情请参见管理Paimon Catalog。
通过INSERT INTO语句插入或更新数据
您可以通过INSERT INTO语句,直接向Paimon表插入或更新数据。
Paimon主键表可以接受所有类型(INSERT、UPDATE_BEFORE、UPDATE_AFTER、DELETE)的消息,相同主键的数据在写入后会根据数据合并机制进行合并。
Paimon Append Only表(非主键表)只能接受INSERT类型的消息。
通过INSERT OVERWRITE语句覆写数据
覆写是指清空并重新写入数据。您可以通过INSERT OVERWRITE语句覆写整张Paimon表或覆写特定分区,SQL语句示例如下。
仅批作业支持INSERT OVERWRITE语句。
默认情况下,INSERT OVERWRITE操作不会产生变更数据,删除与导入的数据无法被下游流式消费。如果您需要消费此类数据,请参见流式消费INSERT OVERWRITE语句的结果。
my_table表是非分区表,覆写整张my_table表。
INSERT OVERWRITE my_table SELECT ...;
my_table表是分区表,覆写my_table表中的
dt=20240108,hh=06
分区。INSERT OVERWRITE my_table PARTITION (`dt` = '20240108', `hh` = '06') SELECT ...;
my_table表是分区表,动态覆写my_table表中的分区,即SELECT语句结果中出现的分区都会被覆写,其它分区保持不变。
INSERT OVERWRITE my_table SELECT ...;
my_table表是分区表,覆写整张my_table表。
INSERT OVERWRITE my_table /*+ OPTIONS('dynamic-partition-overwrite' = 'false') */ SELECT ...;
通过DELETE语句删除数据
您可以通过DELETE语句从Paimon主键表中删除数据。DELETE语句只能在查询脚本中执行。
--从my_table表中删除所有currency = 'UNKNOWN'的数据。
DELETE FROM my_table WHERE currency = 'UNKNOWN';
过滤删除消息
使用Paimon主键表时,默认情况下,类型为DELETE的消息会将Paimon表中对应主键的数据删除。如果您不希望Paimon表处理此类消息,可以通过SQL hint将以下参数设置为true,过滤删除消息。
参数 | 说明 | 数据类型 | 默认值 |
ignore-delete | 是否过滤删除消息。 | Boolean | false |
调整结果表的并发数
您可以通过SQL hint设置以下参数,手动调整结果表算子的并发数。
参数 | 说明 | 数据类型 | 默认值 |
sink.parallelism | 手动设定Paimon结果表算子的并发数。 | Integer | 无 |
例如,以下SQL将手动设置Paimon结果表算子并发数为10。
INSERT INTO t /*+ OPTIONS('sink.parallelism' = '10') */ SELECT * FROM s;
从Paimon表消费数据
通过流作业消费Paimon表
通过流作业消费的Paimon主键表需要设置变更数据产生机制。
默认情况下,流作业中的Paimon源表算子将首先产出作业启动时刻Paimon表中的全量数据,之后产出从作业启动时刻开始Paimon表中的增量数据。
从指定位点消费Paimon表
您可以通过以下方式从指定位点消费Paimon表:
如果您不需要消费作业启动时刻Paimon表中的全量数据,只需要消费后续的增量数据,可通过SQL Hint设置
'scan.mode' = 'latest'
。SELECT * FROM t /*+ OPTIONS('scan.mode' = 'latest') */;
如果您不想要消费全量数据,只想消费从指定时间点开始的增量数据,可通过SQL Hint设置
scan.timestamp-millis
参数。参数值表示从Unix Epoch(1970-01-01 00:00:00 UTC)开始到指定时间点经过的毫秒数。SELECT * FROM t /*+ OPTIONS('scan.timestamp-millis' = '1678883047356') */;
如果您想要消费从指定时间点之后写入的全量数据,并持续消费后续的增量数据,可以从以下两种操作中选择一种。
说明此类消费方式将读取在指定时间点之后修改的数据文件。由于小文件合并,数据文件中可能包含少量在指定时间点之前写入的数据。您可以根据业务需求,在SQL作业中添加WHERE 过滤条件对数据进行过滤。
不设置任何SQL Hint,在启动作业时,选择指定源表开始时间并指定具体的时间信息。
通过SQL Hint设置
scan.file-creation-time-millis
参数。SELECT * FROM t /*+ OPTIONS('scan.file-creation-time-millis' = '1678883047356') */;
如果您不想要消费全量数据,只想消费从特定快照文件开始的增量数据,可通过SQL Hint设置
scan.snapshot-id
参数,参数值是指定快照文件的编号。SELECT * FROM t /*+ OPTIONS('scan.snapshot-id' = '3') */;
如果您想要消费特定快照文件的全量数据,并持续消费后续的增量数据,可通过SQL hint设置
'scan.mode' = 'from-snapshot-full'
和scan.snapshot-id
参数,scan.snapshot-id
参数值是指定快照文件的编号。SELECT * FROM t /*+ OPTIONS('scan.mode' = 'from-snapshot-full', 'scan.snapshot-id' = '1') */;
指定Consumer ID
Consumer ID可以保存Paimon表的消费进度,主要用于以下场景:
如果您修改了SQL作业的计算逻辑,可能会导致作业拓扑发生变化,无法从Flink状态中恢复消费进度。设置Consumer ID可以将此ID对应的消费进度保存在Paimon表的元数据文件中,即使后续无状态启动作业,也能从中断的位点继续消费Paimon表。
设置Consumer ID后,未被消费过的快照文件不会因过期而被删除,可以防止因消费速度跟不上快照过期速度导致的报错。
通过设置consumer-id
参数,您可以给流作业中的Paimon源表算子赋予一个Consumer ID,其值可以是任意的字符串。Consumer ID第一次创建时,它的起始消费位点根据从指定位点消费Paimon表中的规则确定。后续只要继续使用相同的Consumer ID,即可恢复Paimon表的消费进度。
例如,为Paimon源表算子设置名为test-id的Consumer ID的SQL语句示例如下。如果您想要重置某个Consumer ID对应的消费位点,可以额外设置'consumer.ignore-progress' = 'true'
。
SELECT * FROM t /*+ OPTIONS('consumer-id' = 'test-id') */;
由于未被Consumer ID消费过的快照文件不会因过期而被删除,如果不及时清理废弃的Consumer ID,快照文件及其对应的历史数据文件将永远不会被删除,会占用存储空间。您可以设置consumer.expiration-time
表参数,将超过规定时间不使用的Cconsumer ID清理掉。例如,'consumer.expiration-time' = '3d'
表示将3天未使用的Consumer ID清理掉。
流式消费INSERT OVERWRITE语句的结果
默认情况下,INSERT OVERWRITE操作不会产生变更数据,删除与导入的数据无法被下游流式消费。如果您需要消费此类数据,可以在流式消费作业中通过SQL Hint配置'streaming-read-overwrite' = 'true'
。
SELECT * FROM t /*+ OPTIONS('streaming-read-overwrite' = 'true') */;
通过批作业消费Paimon表
默认情况下,批作业中的Paimon源表算子将读取最新的快照文件,输出Paimon表的最新状态数据。
Batch Time Travel
通过SQL Hint设置scan.timestamp-millis
参数,即可查询Paimon表在该时间点的状态。参数值表示从Unix Epoch(1970-01-01 00:00:00 UTC)开始到指定时间点经过的毫秒数。
SELECT * FROM t /*+ OPTIONS('scan.timestamp-millis' = '1678883047356') */;
通过SQL Hint设置scan.snapshot-id
参数,即可查询Paimon表在该快照文件产生时的状态。参数值为指定快照文件的编号。
SELECT * FROM t /*+ OPTIONS('scan.snapshot-id' = '3') */;
查询两次快照之间的数据变化
如果您想要查询两次快照间Paimon表中数据发生的变化,可以通过SQL Hint设置incremental-between
参数。例如,查看20号快照文件和12号快照文件间发生变化的所有数据,SQL语句示例如下。
SELECT * FROM t /*+ OPTIONS('incremental-between' = '12,20') */;
由于批作业不支持消费Delete类型的消息,默认情况下此类消息将会被丢弃。如果您想要在批作业中消费Delete类型的消息,请查询Audit Log系统表。例如SELECT * FROM `t$audit_log ` /*+ OPTIONS('incremental-between' = '12,20') */;
。
调整源表的并发数
默认情况下,Paimon根据分区数以及分桶数等信息自动推断源表算子的并发数。您可以通过SQL Hint设置以下参数,手动调整源表算子的并发数。
参数 | 数据类型 | 默认值 | 备注 |
scan.parallelism | Integer | 无 | 手动设定Paimon源表算子的并发数。 |
scan.infer-parallelism | Boolean | true | 是否自动推断Paimon源表算子的并发数。 |
scan.infer-parallelism.max | Integer | 1024 | Paimon源表算子自动推断出的并发数上限。 |
手动设置Paimon源表算子并发数为10的SQL语句示例如下。
SELECT * FROM t /*+ OPTIONS('scan.parallelism' = '10') */;
使用Paimon维表
Paimon也可以作为维表使用。关于维表JOIN的语法,详情请参见维表JOIN语句。
相关文档
Paimon表数据写入和消费时,支持使用SQL Hint临时修改表参数,详情请参见作为CTAS的目标端Catalog。
Paimon主键表和Append表的基本特性与功能,详情请参见Paimon主键表和Append Only表。
不同场景下Paimon主键表和Append Scalable表的常用优化,详情请参见Paimon性能优化。
Paimon表的消费依赖快照文件,快照过期时间太短或消费作业效率低会导致正在消费的快照文件因过期被删除,消费作业出现
File xxx not found, Possible causes
报错,解决方法请参见读Paimon作业出现File xxx not found, Possible causes。