全部产品
Search
文档中心

实时计算Flink版:Paimon主键表和Append Only表

更新时间:May 21, 2024

Paimon仅支持主键表和Append Only表。本文为您介绍Paimon主键表和Append Only表的基本特性与功能。

Paimon主键表

创建Paimon表时指定了主键(primary key),则该表即为Paimon主键表。

语法结构

例如,创建一张分区键为dt,主键为dt、shop_id和user_id,分桶数固定为4的Paimon主键表。

CREATE TABLE T (
  dt STRING,
  shop_id BIGINT,
  user_id BIGINT,
  num_orders INT,
  total_amount INT,
  PRIMARY KEY (dt, shop_id, user_id) NOT ENFORCED
) PARTITIONED BY (dt) WITH (
  'bucket' = '4'
);

Paimon主键表中每行数据的主键值各不相同,如果将多条具有相同主键的数据写入Paimon主键表,将根据数据合并机制对数据进行合并。

分桶方式

分桶(Bucket)是Paimon表读写操作的最小单元。非分区表的所有数据,以及分区表每个分区的数据,都会被进一步划分到不同的分桶中,以便同一作业使用多个并发同时读写Paimon表,加快读写效率。支持的类别详情如下。

类别

定义

说明

动态分桶(默认)

创建Paimon主键表时,不在WITH参数中指定bucket或指定'bucket' = '-1',将会创建动态分桶的Paimon表。

  • 动态分桶表不支持多个作业同时写入。

  • 对于动态分桶的Paimon主键表,可以支持跨分区更新主键。

固定分桶

创建Paimon主键表时,在WITH参数中指定'bucket' = '<num>',即可指定非分区表的分桶数为<num>,或者分区表单个分区的分桶数为<num><num>是一个大于0的整数。

如果在创建固定分桶的Paimon表之后,需要修改分桶数,详情请参见调整固定分桶表的分桶数量

对于固定分桶的Paimon主键表,分区表的主键需要完全包含分区键(partition key),以避免主键的跨分区更新。

动态分桶表更新

类别

说明

跨分区更新的动态分桶表

对于主键不完全包含分区键的动态分桶表,Paimon无法根据主键确定该数据属于哪个分区的哪个分桶,因此需要使用RocksDB维护主键与分区以及分桶编号的映射关系。相比固定分桶而言,数据量较大的表可能会产生明显的性能损失。另外,因为作业启动时需要将映射关系全量加载至RocksDB中,作业的启动速度也会变慢。数据合并机制会对跨分区更新的结果产生影响:

  • deduplicate:数据将会从老分区删除,并插入新分区。

  • aggregation与partial-update:数据将会直接在老分区中更新,无视新数据的分区键。

  • first-row:如果相同主键的数据已经存在,则新数据将被直接丢弃。

非跨分区更新的动态分桶表

对于主键完全包含分区键的动态分桶表,Paimon可以确定该主键属于哪个分区,但无法确定属于哪个分桶,因此需要使用额外的堆内存创建索引,以维护主键与分桶编号的映射关系。

具体来说,每1亿条主键将额外消耗1 GB的堆内存。只有当前正在写入的分区才会消耗堆内存,历史分区中的主键不会消耗堆内存。

除堆内存的消耗外,相比固定分桶而言,主键完全包含分区键的动态分桶表不会有明显的性能损失。

数据分发

类别

数据分发

动态分桶

动态分桶的Paimon表会先将数据写入已有的分桶中,当分桶的数据量超过限制时,再自动创建新的分桶。以下WITH参数将会影响动态分桶的行为。

  • dynamic-bucket.target-row-num:每个分桶最多存储几条数据。默认值为2000000。

  • dynamic-bucket.initial-buckets:初始的分桶数。如果不设置,初始将会创建等同于writer算子并发数的分桶。

固定分桶

默认情况下,Paimon将根据每条数据主键的哈希值,确定该数据属于哪个分桶。

如果您需要修改数据的分桶方式,可以在创建Paimon表时,在WITH参数中指定bucket-key参数,不同列的名称用英文逗号分隔,主键必须完整包含bucket-key。例如,如果设置了'bucket-key' = 'c1,c2',则Paimon将根据每条数据c1c2两列的值,确定该数据属于哪个分桶。

调整固定分桶表的分桶数量

由于分桶数限制了实际工作的作业并发数,且单个分桶内数据总量太大可能导致读写性能的降低,因此分桶数不宜太小。但是,分桶数过大也会造成小文件数量过多。建议每个分桶的数据大小在2 GB左右,最大不超过5 GB。调整固定分桶表的分桶数量具体的操作步骤如下。

  1. 停止所有写入该Paimon表或消费该Paimon表的作业。

  2. 新建查询脚本,执行以下SQL语句,调整Paimon表的bucket参数。

    ALTER TABLE `<catalog-name>`.`<database-name>`.`<table-name>` SET ('bucket' = '<bucket-num>');
  3. 整理非分区表中的所有数据,或分区表中仍需写入的分区中的所有数据。

    • 非分区表:新建空白批作业草稿,在SQL编辑器中编写以下SQL语句,之后部署启动批作业。

      INSERT OVERWRITE `<catalog-name>`.`<database-name>`.`<table-name>`
      SELECT * FROM `<catalog-name>`.`<database-name>`.`<table-name>`;
    • 分区表:新建空白批作业草稿,在SQL编辑器中编写以下SQL语句,之后部署启动批作业。

      INSERT OVERWRITE `<catalog-name>`.`<database-name>`.`<table-name>`
      PARTITION (<partition-spec>)
      SELECT * FROM `<catalog-name>`.`<database-name>`.`<table-name>`
      WHERE <partition-condition>;

      其中,<partition-spec>和<partition-condition>指定了需要整理的分区。例如,整理dt = 20240312, hh = 08分区中的数据的SQL语句如下。

      INSERT OVERWRITE `<catalog-name>`.`<database-name>`.`<table-name>`
      PARTITION (dt = '20240312', hh = '08')
      SELECT * FROM `<catalog-name>`.`<database-name>`.`<table-name>`
      WHERE dt = '20240312' AND hh = '08';
  4. 批作业执行完成后,即可恢复Paimon表的写入作业与消费作业。

变更数据产生机制

Paimon表需要将数据的增删与更新改写为完整的变更数据(类似于数据库的Binlog),才能让下游进行流式消费。通过在WITH参数中设置changelog-producer,Paimon将会以不同的方式产生变更数据,其取值如下。

参数取值

说明

使用场景

none(默认)

Paimon主键表将不会产出完整的变更数据。

Paimon表仅能通过批作业进行消费,不能通过流作业进行消费。

input

Paimon主键表会直接将输入的消息作为变更数据传递给下游消费者。

仅在输入数据流本身是完整的变更数据时(例如数据库的Binlog)使用。

由于input机制不涉及额外的计算,因此其效率最高。

lookup

Paimon主键表会通过批量点查的方式,在Flink作业每次创建检查点(checkpoint)时触发小文件合并(compaction),并利用小文件合并的结果产生完整的变更数据。

无论输入数据流是否为完整的变更数据,都可以使用该机制。

与full-compaction机制相比,lookup机制的时效性更好,但总体来看耗费的资源更多。推荐在对数据新鲜度有较高要求(分钟级)的情况下使用。

full-compaction

Paimon主键表会在每一次执行小文件全量合并(full compaction)时,产生完整的变更数据。

无论输入数据流是否为完整的变更数据,都可以使用该机制。

与lookup机制相比,full-compaction机制的时效性较差,但其利用了小文件合并流程,不产生额外计算,因此总体来看耗费的资源更少。推荐在对数据新鲜度要求不高(小时级)的情况下使用。

为了保证full-compaction机制的时效性,您可以在WITH参数中设置'full-compaction.delta-commits' = '<num>',要求Paimon在每<num>个Flink作业的检查点执行小文件全量合并。然而,由于小文件全量合并会消耗较多计算资源,因此频率不宜过高,建议每30分钟至1小时强制执行一次。

说明

默认情况下,即使更新后的数据与更新之前相同,Paimon仍然会产生变更数据。如果您希望消除此类无效的变更数据,可以在WITH参数中设置'changelog-producer.row-deduplicate' = 'true'。该参数仅对lookup与full-compaction机制有效。由于设置该参数后,需要引入额外的计算对比更新前后的值,推荐仅在无效变更数据较多的情况下使用该参数。

数据合并机制

参数说明

如果将多条具有相同主键的数据写入Paimon主键表,Paimon将会根据WITH参数中设置的merge-engine参数对数据进行合并。参数取值如下:

deduplicate(默认值)

设置'merge-engine' = 'deduplicate' 后,对于多条具有相同主键的数据,Paimon主键表仅会保留最新一条数据,并丢弃其它具有相同主键的数据。如果最新数据是一条delete消息,所有具有该主键的数据都会被丢弃。创建Paimon表的DDL语句。

CREATE TABLE T (
  k INT,
  v1 DOUBLE,
  v2 STRING,
  PRIMARY KEY (k) NOT ENFORCED
) WITH (
  'merge-engine' = 'deduplicate' -- deduplicate 是默认值,可以不设置
);
  • 如果写入Paimon表的数据依次为+I(1, 2.0, 'apple')+I(1, 4.0, 'banana')+I(1, 8.0, 'cherry'),则SELECT * FROM T WHERE k = 1将查询到(1, 8.0, 'cherry')这条数据。

  • 如果写入Paimon表的数据依次为+I(1, 2.0, 'apple')+I(1, 4.0, 'banana')-D(1, 4.0, 'banana'),则SELECT * FROM T WHERE k = 1将查不到任何数据。

first-row

设置'merge-engine' = 'first-row'后,Paimon只会保留相同主键数据中的第一条。与deduplicate合并机制相比,first-row只会产生insert类型的变更数据,且变更数据的产出效率更高。

说明
  • 如果下游需要流式消费first-row的结果,需要将changelog-producer参数设为 lookup。

  • first-row无法处理delete与update_before消息。您可以设置'first-row.ignore-delete' = 'true'以忽略这两类消息。

  • first-row合并机制不支持指定sequence field。

例如,创建Paimon表的DDL语句。

CREATE TABLE T (
  k INT,
  v1 DOUBLE,
  v2 STRING,
  PRIMARY KEY (k) NOT ENFORCED
) WITH (
  'merge-engine' = 'first-row'
);

如果写入Paimon表的数据依次为+I(1, 2.0, 'apple')+I(1, 4.0, 'banana'), +I(1, 8.0, 'cherry'),则SELECT * FROM T WHERE k = 1将查询到(1, 2.0, 'apple')这条数据。

aggregation

对于多条具有相同主键的数据,Paimon主键表将会根据您指定的聚合函数进行聚合。对于不属于主键的每一列,都需要通过fields.<field-name>.aggregate-function指定一个聚合函数,否则该列将默认使用last_non_null_value聚合函数。

说明

如果下游需要流式消费aggregation的结果,需要将changelog-producer参数设为lookup或full-compaction。

例如,price列将会使用max函数进行聚合,而sales列将会使用sum函数进行聚合。

CREATE TABLE T (
  product_id BIGINT,
  price DOUBLE,
  sales BIGINT,
  PRIMARY KEY (product_id) NOT ENFORCED
) WITH (
  'merge-engine' = 'aggregation',
  'fields.price.aggregate-function' = 'max',
  'fields.sales.aggregate-function' = 'sum'
);

如果写入Paimon表的数据依次为+I(1, 23.0, 15)+I(1, 30.2, 20)SELECT * FROM T WHERE product_id = 1将查询到(1, 30.2, 35)这条数据。

支持的聚合函数与对应的数据类型如下:

  • sum(求和):支持DECIMAL、TINYINT、SMALLINT、INTEGER、BIGINT、FLOAT和DOUBLE。

  • product(求乘积):支持DECIMAL、TINYINT、SMALLINT、INTEGER、BIGINT、FLOAT和DOUBLE。

  • count(统计非null值总数):支持INTEGER和BIGINT。

  • max(最大值)和min(最小值):CHAR、VARCHAR、DECIMAL、TINYINT、SMALLINT、INTEGER、BIGINT、FLOAT、DOUBLE、DATE、TIME、TIMESTAMP和TIMESTAMP_LTZ。

  • first_value(返回第一次输入的值)和last_value(返回最新输入的值):支持所有数据类型,包括null。

  • first_not_null_value(返回第一次输入的非null值)和last_non_null_value(返回最新输入的非 null 值):支持所有数据类型。

  • listagg(将输入的字符串依次用英文逗号连接):支持STRING。

  • bool_and和bool_or:支持BOOLEAN。

说明

上述聚合函数中,只有sum、product和count支持回撤消息(update_before 与 delete 消息)。您可以设置'fields.<field-name>.ignore-retract' = 'true'使对应列忽略回撤消息。

partial-update

设置'merge-engine' = 'partial-update'后,您可以通过多条消息对数据进行逐步更新,并最终得到完整的数据。即具有相同主键的新数据将会覆盖原来的数据,但值为null的列不会进行覆盖。

说明
  • 如果下游需要流式消费partial-update的结果,需要将changelog-producer参数设为lookup或full-compaction。

  • partial-update 无法处理 delete 与 update_before 消息。您可以设置'partial-update.ignore-delete' = 'true' 以忽略这两类消息。

例如,考虑以下创建 Paimon 表的 DDL 语句。

CREATE TABLE T (
  k INT,
  v1 DOUBLE,
  v2 BIGINT,
  v3 STRING,
  PRIMARY KEY (k) NOT ENFORCED
) WITH (
  'merge-engine' = 'partial-update'
);

如果写入Paimon表的数据依次为+I(1, 23.0, 10, NULL)+I(1, NULL, NULL, 'This is a book')+I(1, 25.2, NULL, NULL),则SELECT * FROM T WHERE k = 1将查询到(1, 25.2, 10, 'This is a book')这条数据。

在partial-update合并机制中,您还可以设置指定WITH参数指定合并顺序或对数据进行打宽与聚合,详情如下:

  • 通过Sequence Group为不同列分别指定合并顺序

    除了sequence field之外,您也可以通过sequence group为不同列分别指定合并顺序。您可以为来自不同源表的列分别指定合并顺序,帮您在打宽场景下处理乱序数据。

    例如,a、b 两列根据 g_1 列的值从小到大进行合并,而 c、d 两列将根据g_2 列的值从小到大进行合并。

    CREATE TABLE T (
      k INT,
      a STRING,
      b STRING,
      g_1 INT,
      c STRING,
      d STRING,
      g_2 INT,
      PRIMARY KEY (k) NOT ENFORCED
    ) WITH (
      'merge-engine' = 'partial-update',
      'fields.g_1.sequence-group' = 'a,b',
      'fields.g_2.sequence-group' = 'c,d'
    );
  • 同时进行数据的打宽与聚合

    您还可以在WITH参数中设置fields.<field-name>.aggregate-function,为<field-name>这一列指定聚合函数,对该列的值进行聚合。<field-name>这一列需要属于某个 sequence group。aggregation合并机制支持的聚合函数均可使用。

    例如,a、b两列将根据g_1 列的值从小到大进行合并,其中a列将会保留最新的非null值,而 b列将会保留输入的最大值。c、d两列将根据g_2列的值从小到大进行合并,其中c列将会保留最新的非null值,而d列将会求出输入数据的和。

    CREATE TABLE T (
      k INT,
      a STRING,
      b INT,
      g_1 INT,
      c STRING,
      d INT,
      g_2 INT,
      PRIMARY KEY (k) NOT ENFORCED
    ) WITH (
      'merge-engine' = 'partial-update',
      'fields.g_1.sequence-group' = 'a,b',
      'fields.b.aggregate-function' = 'max',
      'fields.g_2.sequence-group' = 'c,d',
      'fields.d.aggregate-function' = 'sum'
    );

更多信息,详情请参见Merge Engine

乱序数据处理

默认情况下,Paimon会按照数据的输入顺序确定合并的顺序,最后写入Paimon的数据会被认为是最新数据。如果您的输入数据流存在乱序数据,可以通过在WITH参数中指定'sequence.field' = '<column-name>,具有相同主键的数据将按<column-name>这一列的值从小到大进行合并。可以作为sequence field的数据类型有:TINYINT、SMALLINT、INTEGER、BIGINT、TIMESTAMP和TIMESTAMP_LTZ。

说明

如果您使用MySQLop_t元数据作为sequence field,会导致一对update_before与update_after消息具有相同的sequence field值,需要在WITH参数中设置'sequence.auto-padding' = 'row-kind-flag',以保证Paimon会先处理update_before消息,再处理update_after消息。

Paimon Append Only表(非主键表)

如果在创建Paimon表时没有指定主键(Primary Key),则该表就是Paimon Append Only表。您只能以流式方式将完整记录插入到表中,适合不需要流式更新的场景(例如日志数据同步)。

语法结构

例如,以下SQL语句将创建一张分区键为dt的Append Scalable表。

CREATE TABLE T (
  dt STRING
  order_id BIGINT,
  item_id BIGINT,
  amount INT,
  address STRING
) PARTITIONED BY (dt) WITH (
  'bucket' = '-1'
);

表类型

类型

定义

说明

Append Scalable表

创建Paimon Append Only表时,在WITH参数中指定'bucket' = '-1',将会创建Append Scalable表。

作为Hive表的高效替代,在对数据的流式消费顺序没有需求的情况下,应尽量选择Append Scalable表。它具有写入无shuffle、数据可排序、并发控制便捷,可直接将直接吸收或者转换现存的hive表、且可以使用异步合并等优势,进一步加快写入过程。

Append Queue表

创建Paimon Append Only表时,在WITH参数中指定'bucket' = '<num>',将会创建Append Queue表。

其中,<num>是一个大于0的整数,指定了非分区表的分桶数,或分区表单个分区的分桶数。

作为消息队列具有分钟级延迟的替代。Paimon表的分桶数此时相当于Kafka的Partition数或云消息队列MQTT版的Shard数。

数据的分发

类型

说明

Append Scalable表

由于无视桶的概念,多个并发可以同时写同一个分区,无考虑顺序以及数据是否分到对应桶的问题。在写入时,直接由上游将数据推往writer节点。其中,不需要对数据进行hash partitioning。由此,在其他条件相同的情况下,通常此类型的表写入速度更快。当上游并发和writer并发相同时,需要注意是否会产生数据倾斜问题。

Append Queue表

默认情况下,Paimon将根据每条数据所有列的取值,确定该数据属于哪个分桶(bucket)。如果您需要修改数据的分桶方式,可以在创建Paimon表时,在WITH参数中指定bucket-key参数,不同列的名称用英文逗号分隔。

例如,设置'bucket-key' = 'c1,c2',则Paimon将根据每条数据c1c2两列的值,确定该数据属于哪个分桶。

说明

建议尽可能设置bucket-key,以减少分桶过程中参与计算的列的数量,提高Paimon表的写入效率。

数据消费顺序

类型

说明

Append Scalable表

无法像Append Queue表一样,在流式消费Paimon表时保证数据的消费顺序与数据写入Paimon表的顺序一致。适合对数据的流式消费顺序没有需求场景。

Append Queue表

Append Queue表可以保证流式消费Paimon表时,每个分桶中数据的消费顺序与数据写入Paimon表的顺序一致。具体来说:

  • 对于两条来自不同分区(partition)的数据

    • 如果表参数中设置了'scan.plan-sort-partition' = 'true',则分区值更小的数据会首先产出。

    • 如果表参数中未设置'scan.plan-sort-partition' = 'true',则分区创建时间更早的数据会首先产出。

  • 对于两条来自相同分区的相同分桶的数据,先写入Paimon表的数据会首先产出。

  • 对于两条来自相同分区但不同分桶的数据,由于不同分桶可能被不同的Flink作业并发处理,因此不保证两条数据的消费顺序。

相关文档