全部產品
Search
文件中心

Realtime Compute for Apache Flink:Paimon主鍵表和Append Only表

更新時間:Jul 13, 2024

Paimon僅支援主鍵表和Append Only表。本文為您介紹Paimon主鍵表和Append Only表的基本特性與功能。

Paimon主鍵表

建立Paimon表時指定了主鍵(primary key),則該表即為Paimon主鍵表。

文法結構

例如,建立一張分區鍵為dt,主鍵為dt、shop_id和user_id,分桶數固定為4的Paimon主鍵表。

CREATE TABLE T (
  dt STRING,
  shop_id BIGINT,
  user_id BIGINT,
  num_orders INT,
  total_amount INT,
  PRIMARY KEY (dt, shop_id, user_id) NOT ENFORCED
) PARTITIONED BY (dt) WITH (
  'bucket' = '4'
);

Paimon主鍵表中每行資料的主索引值各不相同,如果將多條具有相同主鍵的資料寫入Paimon主鍵表,將根據資料合併機制對資料進行合并。

分桶方式

分桶(Bucket)是Paimon表讀寫操作的最小單元。非分區表的所有資料,以及分區表每個分區的資料,都會被進一步劃分到不同的分桶中,以便同一作業使用多個並發同時讀寫Paimon表,加快讀寫效率。支援的類別詳情如下。

類別

定義

說明

動態分桶(預設)

建立Paimon主鍵表時,不在WITH參數中指定bucket或指定'bucket' = '-1',將會建立動態分桶的Paimon表。

  • 動態分桶表不支援多個作業同時寫入。

  • 對於動態分桶的Paimon主鍵表,可以支援跨分區更新主鍵。

固定分桶

建立Paimon主鍵表時,在WITH參數中指定'bucket' = '<num>',即可指定非分區表的分桶數為<num>,或者分區表單個分區的分桶數為<num><num>是一個大於0的整數。

如果在建立固定分桶的Paimon表之後,需要修改分桶數,詳情請參見調整固定分桶表的分桶數量

對於固定分桶的Paimon主鍵表,分區表的主鍵需要完全包含分區鍵(partition key),以避免主鍵的跨分區更新。

動態分桶表更新

類別

說明

跨分區更新的動態分桶表

對於主鍵不完全包含分區鍵的動態分桶表,Paimon無法根據主鍵確定該資料屬於哪個分區的哪個分桶,因此需要使用RocksDB維護主鍵與分區以及分桶編號的映射關係。相比固定分桶而言,資料量較大的表可能會產生明顯的效能損失。另外,因為作業啟動時需要將映射關係全量載入至RocksDB中,作業的啟動速度也會變慢。資料合併機制會對跨分區更新的結果產生影響:

  • deduplicate:資料將會從老分區刪除,並插入新分區。

  • aggregation與partial-update:資料將會直接在老分區中更新,無視新資料的分區鍵。

  • first-row:如果相同主鍵的資料已經存在,則新資料將被直接丟棄。

非跨分區更新的動態分桶表

對於主鍵完全包含分區鍵的動態分桶表,Paimon可以確定該主鍵屬於哪個分區,但無法確定屬於哪個分桶,因此需要使用額外的堆記憶體建立索引,以維護主鍵與分桶編號的映射關係。

具體來說,每1億條主鍵將額外消耗1 GB的堆記憶體。只有當前正在寫入的分區才會消耗堆記憶體,歷史分區中的主鍵不會消耗堆記憶體。

除堆記憶體的消耗外,相比固定分桶而言,主鍵完全包含分區鍵的動態分桶表不會有明顯的效能損失。

資料分發

類別

資料分發

動態分桶

動態分桶的Paimon表會先將資料寫入已有的分桶中,當分桶的資料量超過限制時,再自動建立新的分桶。以下WITH參數將會影響動態分桶的行為。

  • dynamic-bucket.target-row-num:每個分桶最多儲存幾條資料。預設值為2000000。

  • dynamic-bucket.initial-buckets:初始的分桶數。如果不設定,初始將會建立等同於writer運算元並發數的分桶。

固定分桶

預設情況下,Paimon將根據每條資料主鍵的雜湊值,確定該資料屬於哪個分桶。

如果您需要修改資料的分桶方式,可以在建立Paimon表時,在WITH參數中指定bucket-key參數,不同列的名稱用英文逗號分隔,主鍵必須完整包含bucket-key。例如,如果設定了'bucket-key' = 'c1,c2',則Paimon將根據每條資料c1c2兩列的值,確定該資料屬於哪個分桶。

調整固定分桶表的分桶數量

由於分桶數限制了實際工作的作業並發數,且單個分桶內資料總量太大可能導致讀寫效能的降低,因此分桶數不宜太小。但是,分桶數過大也會造成小檔案數量過多。建議每個分桶的資料大小在2 GB左右,最大不超過5 GB。調整固定分桶表的分桶數量具體的操作步驟如下。

  1. 停止所有寫入該Paimon表或消費該Paimon表的作業。

  2. 建立查詢指令碼,執行以下SQL語句,調整Paimon表的bucket參數。

    ALTER TABLE `<catalog-name>`.`<database-name>`.`<table-name>` SET ('bucket' = '<bucket-num>');
  3. 整理非分區表中的所有資料,或分區表中仍需寫入的分區中的所有資料。

    • 非分區表:建立空白批作業草稿,在SQL編輯器中編寫以下SQL語句,之後部署啟動批作業。

      INSERT OVERWRITE `<catalog-name>`.`<database-name>`.`<table-name>`
      SELECT * FROM `<catalog-name>`.`<database-name>`.`<table-name>`;
    • 分區表:建立空白批作業草稿,在SQL編輯器中編寫以下SQL語句,之後部署啟動批作業。

      INSERT OVERWRITE `<catalog-name>`.`<database-name>`.`<table-name>`
      PARTITION (<partition-spec>)
      SELECT * FROM `<catalog-name>`.`<database-name>`.`<table-name>`
      WHERE <partition-condition>;

      其中,<partition-spec>和<partition-condition>指定了需要整理的分區。例如,整理dt = 20240312, hh = 08分區中的資料的SQL語句如下。

      INSERT OVERWRITE `<catalog-name>`.`<database-name>`.`<table-name>`
      PARTITION (dt = '20240312', hh = '08')
      SELECT * FROM `<catalog-name>`.`<database-name>`.`<table-name>`
      WHERE dt = '20240312' AND hh = '08';
  4. 批作業執行完成後,即可恢複Paimon表的寫入作業與消費作業。

變更資料產生機制

Paimon表需要將資料的增刪與更新改寫為完整的變更資料(類似於資料庫的Binlog),才能讓下遊進行流式消費。通過在WITH參數中設定changelog-producer,Paimon將會以不同的方式產生變更資料,其取值如下。

參數取值

說明

使用情境

none(預設)

Paimon主鍵表將不會產出完整的變更資料。

Paimon表僅能通過批作業進行消費,不能通過流作業進行消費。

input

Paimon主鍵表會直接將輸入的訊息作為變更資料傳遞給下遊消費者。

僅在輸入資料流本身是完整的變更資料時(例如資料庫的Binlog)使用。

由於input機制不涉及額外的計算,因此其效率最高。

lookup

Paimon主鍵表會通過批量點查的方式,在Flink作業每次建立檢查點(checkpoint)時觸發小檔案合并(compaction),並利用小檔案合并的結果產生完整的變更資料。

無論輸入資料流是否為完整的變更資料,都可以使用該機制。

與full-compaction機制相比,lookup機制的時效性更好,但總體來看耗費的資源更多。推薦在對資料新鮮度有較高要求(分鐘級)的情況下使用。

full-compaction

Paimon主鍵表會在每一次執行小檔案全量合并(full compaction)時,產生完整的變更資料。

無論輸入資料流是否為完整的變更資料,都可以使用該機制。

與lookup機制相比,full-compaction機制的時效性較差,但其利用了小檔案合并流程,不產生額外計算,因此總體來看耗費的資源更少。推薦在對資料新鮮度要求不高(小時級)的情況下使用。

為了保證full-compaction機制的時效性,您可以在WITH參數中設定'full-compaction.delta-commits' = '<num>',要求Paimon在每<num>個Flink作業的檢查點執行小檔案全量合并。然而,由於小檔案全量合并會消耗較多計算資源,因此頻率不宜過高,建議每30分鐘至1小時強制執行一次。

說明

預設情況下,即使更新後的資料與更新之前相同,Paimon仍然會產生變更資料。如果您希望消除此類無效的變更資料,可以在WITH參數中設定'changelog-producer.row-deduplicate' = 'true'。該參數僅對lookup與full-compaction機制有效。由於設定該參數後,需要引入額外的計算對比更新前後的值,推薦僅在無效變更資料較多的情況下使用該參數。

資料合併機制

參數說明

如果將多條具有相同主鍵的資料寫入Paimon主鍵表,Paimon將會根據WITH參數中設定的merge-engine參數對資料進行合并。參數取值如下:

deduplicate(預設值)

設定'merge-engine' = 'deduplicate' 後,對於多條具有相同主鍵的資料,Paimon主鍵表僅會保留最新一條資料,並丟棄其它具有相同主鍵的資料。如果最新資料是一條delete訊息,所有具有該主鍵的資料都會被丟棄。建立Paimon表的DDL語句。

CREATE TABLE T (
  k INT,
  v1 DOUBLE,
  v2 STRING,
  PRIMARY KEY (k) NOT ENFORCED
) WITH (
  'merge-engine' = 'deduplicate' -- deduplicate 是預設值,可以不設定
);
  • 如果寫入Paimon表的資料依次為+I(1, 2.0, 'apple')+I(1, 4.0, 'banana')+I(1, 8.0, 'cherry'),則SELECT * FROM T WHERE k = 1將查詢到(1, 8.0, 'cherry')這條資料。

  • 如果寫入Paimon表的資料依次為+I(1, 2.0, 'apple')+I(1, 4.0, 'banana')-D(1, 4.0, 'banana'),則SELECT * FROM T WHERE k = 1將查不到任何資料。

first-row

設定'merge-engine' = 'first-row'後,Paimon只會保留相同主鍵資料中的第一條。與deduplicate合并機制相比,first-row只會產生insert類型的變更資料,且變更資料的產出效率更高。

說明
  • 如果下遊需要流式消費first-row的結果,需要將changelog-producer參數設為 lookup。

  • first-row無法處理delete與update_before訊息。您可以設定'first-row.ignore-delete' = 'true'以忽略這兩類訊息。

  • first-row合并機制不支援指定sequence field。

例如,建立Paimon表的DDL語句。

CREATE TABLE T (
  k INT,
  v1 DOUBLE,
  v2 STRING,
  PRIMARY KEY (k) NOT ENFORCED
) WITH (
  'merge-engine' = 'first-row'
);

如果寫入Paimon表的資料依次為+I(1, 2.0, 'apple')+I(1, 4.0, 'banana'), +I(1, 8.0, 'cherry'),則SELECT * FROM T WHERE k = 1將查詢到(1, 2.0, 'apple')這條資料。

aggregation

對於多條具有相同主鍵的資料,Paimon主鍵表將會根據您指定的彙總函式進行彙總。對於不屬於主鍵的每一列,都需要通過fields.<field-name>.aggregate-function指定一個彙總函式,否則該列將預設使用last_non_null_value彙總函式。

說明

如果下遊需要流式消費aggregation的結果,需要將changelog-producer參數設為lookup或full-compaction。

例如,price列將會使用max函數進行彙總,而sales列將會使用sum函數進行彙總。

CREATE TABLE T (
  product_id BIGINT,
  price DOUBLE,
  sales BIGINT,
  PRIMARY KEY (product_id) NOT ENFORCED
) WITH (
  'merge-engine' = 'aggregation',
  'fields.price.aggregate-function' = 'max',
  'fields.sales.aggregate-function' = 'sum'
);

如果寫入Paimon表的資料依次為+I(1, 23.0, 15)+I(1, 30.2, 20)SELECT * FROM T WHERE product_id = 1將查詢到(1, 30.2, 35)這條資料。

支援的彙總函式與對應的資料類型如下:

  • sum(求和):支援DECIMAL、TINYINT、SMALLINT、INTEGER、BIGINT、FLOAT和DOUBLE。

  • product(求乘積):支援DECIMAL、TINYINT、SMALLINT、INTEGER、BIGINT、FLOAT和DOUBLE。

  • count(統計非null值總數):支援INTEGER和BIGINT。

  • max(最大值)和min(最小值):CHAR、VARCHAR、DECIMAL、TINYINT、SMALLINT、INTEGER、BIGINT、FLOAT、DOUBLE、DATE、TIME、TIMESTAMP和TIMESTAMP_LTZ。

  • first_value(返回第一次輸入的值)和last_value(返回最新輸入的值):支援所有資料類型,包括null。

  • first_not_null_value(返回第一次輸入的非null值)和last_non_null_value(返回最新輸入的非 null 值):支援所有資料類型。

  • listagg(將輸入的字串依次用英文逗號串連):支援STRING。

  • bool_and和bool_or:支援BOOLEAN。

說明

上述彙總函式中,只有sum、product和count支援回撤訊息(update_before 與 delete 訊息)。您可以設定'fields.<field-name>.ignore-retract' = 'true'使對應列忽略回撤訊息。

partial-update

設定'merge-engine' = 'partial-update'後,您可以通過多條訊息對資料進行逐步更新,並最終得到完整的資料。即具有相同主鍵的新資料將會覆蓋原來的資料,但值為null的列不會進行覆蓋。

說明
  • 如果下遊需要流式消費partial-update的結果,需要將changelog-producer參數設為lookup或full-compaction。

  • partial-update 無法處理 delete 與 update_before 訊息。您可以設定'partial-update.ignore-delete' = 'true' 以忽略這兩類訊息。

例如,考慮以下建立 Paimon 表的 DDL 語句。

CREATE TABLE T (
  k INT,
  v1 DOUBLE,
  v2 BIGINT,
  v3 STRING,
  PRIMARY KEY (k) NOT ENFORCED
) WITH (
  'merge-engine' = 'partial-update'
);

如果寫入Paimon表的資料依次為+I(1, 23.0, 10, NULL)+I(1, NULL, NULL, 'This is a book')+I(1, 25.2, NULL, NULL),則SELECT * FROM T WHERE k = 1將查詢到(1, 25.2, 10, 'This is a book')這條資料。

在partial-update合并機制中,您還可以設定指定WITH參數指定合并順序或對資料進行打寬與彙總,詳情如下:

  • 通過Sequence Group為不同列分別指定合并順序

    除了sequence field之外,您也可以通過sequence group為不同列分別指定合并順序。您可以為來自不同源表的列分別指定合并順序,幫您在打寬情境下處理亂序資料。

    例如,a、b 兩列根據 g_1 列的值從小到大進行合并,而 c、d 兩列將根據g_2 列的值從小到大進行合并。

    CREATE TABLE T (
      k INT,
      a STRING,
      b STRING,
      g_1 INT,
      c STRING,
      d STRING,
      g_2 INT,
      PRIMARY KEY (k) NOT ENFORCED
    ) WITH (
      'merge-engine' = 'partial-update',
      'fields.g_1.sequence-group' = 'a,b',
      'fields.g_2.sequence-group' = 'c,d'
    );
  • 同時進行資料的打寬與彙總

    您還可以在WITH參數中設定fields.<field-name>.aggregate-function,為<field-name>這一列指定彙總函式,對該列的值進行彙總。<field-name>這一列需要屬於某個 sequence group。aggregation合并機制支援的彙總函式均可使用。

    例如,a、b兩列將根據g_1 列的值從小到大進行合并,其中a列將會保留最新的非null值,而 b列將會保留輸入的最大值。c、d兩列將根據g_2列的值從小到大進行合并,其中c列將會保留最新的非null值,而d列將會求出輸入資料的和。

    CREATE TABLE T (
      k INT,
      a STRING,
      b INT,
      g_1 INT,
      c STRING,
      d INT,
      g_2 INT,
      PRIMARY KEY (k) NOT ENFORCED
    ) WITH (
      'merge-engine' = 'partial-update',
      'fields.g_1.sequence-group' = 'a,b',
      'fields.b.aggregate-function' = 'max',
      'fields.g_2.sequence-group' = 'c,d',
      'fields.d.aggregate-function' = 'sum'
    );

更多資訊,詳情請參見Merge Engine

亂序資料處理

預設情況下,Paimon會按照資料的輸入順序確定合并的順序,最後寫入Paimon的資料會被認為是最新資料。如果您的輸入資料流存在亂序資料,可以通過在WITH參數中指定'sequence.field' = '<column-name>,具有相同主鍵的資料將按<column-name>這一列的值從小到大進行合并。可以作為sequence field的資料類型有:TINYINT、SMALLINT、INTEGER、BIGINT、TIMESTAMP和TIMESTAMP_LTZ。

說明

如果您使用MySQLop_t中繼資料作為sequence field,會導致一對update_before與update_after訊息具有相同的sequence field值,需要在WITH參數中設定'sequence.auto-padding' = 'row-kind-flag',以保證Paimon會先處理update_before訊息,再處理update_after訊息。

Paimon Append Only表(非主鍵表)

如果在建立Paimon表時沒有指定主鍵(Primary Key),則該表就是Paimon Append Only表。您只能以流式方式將完整記錄插入到表中,適合不需要流式更新的情境(例如日誌資料同步)。

文法結構

例如,以下SQL語句將建立一張分區鍵為dt的Append Scalable表。

CREATE TABLE T (
  dt STRING
  order_id BIGINT,
  item_id BIGINT,
  amount INT,
  address STRING
) PARTITIONED BY (dt) WITH (
  'bucket' = '-1'
);

表類型

類型

定義

說明

Append Scalable表

建立Paimon Append Only表時,在WITH參數中指定'bucket' = '-1',將會建立Append Scalable表。

作為Hive表的高效替代,在對資料的流式消費順序沒有需求的情況下,應盡量選擇Append Scalable表。它具有寫入無shuffle、資料可排序、並發控制便捷,可直接將直接吸收或者轉換現存的hive表、且可以使用非同步合并等優勢,進一步加快寫入過程。

Append Queue表

建立Paimon Append Only表時,在WITH參數中指定'bucket' = '<num>',將會建立Append Queue表。

其中,<num>是一個大於0的整數,指定了非分區表的分桶數,或分區表單個分區的分桶數。

作為訊息佇列具有分鐘級延遲的替代。Paimon表的分桶數此時相當於Kafka的Partition數或雲Message QueueTT版的Shard數。

資料的分發

類型

說明

Append Scalable表

由於無視桶的概念,多個並發可以同時寫同一個分區,無考慮順序以及資料是否分到對應桶的問題。在寫入時,直接由上遊將資料推往writer節點。其中,不需要對資料進行hash partitioning。由此,在其他條件相同的情況下,通常此類型的表寫入速度更快。當上遊並發和writer並發相同時,需要注意是否會產生資料扭曲問題。

Append Queue表

預設情況下,Paimon將根據每條資料所有列的取值,確定該資料屬於哪個分桶(bucket)。如果您需要修改資料的分桶方式,可以在建立Paimon表時,在WITH參數中指定bucket-key參數,不同列的名稱用英文逗號分隔。

例如,設定'bucket-key' = 'c1,c2',則Paimon將根據每條資料c1c2兩列的值,確定該資料屬於哪個分桶。

說明

建議儘可能設定bucket-key,以減少分桶過程中參與計算的列的數量,提高Paimon表的寫入效率。

資料消費順序

類型

說明

Append Scalable表

無法像Append Queue表一樣,在流式消費Paimon表時保證資料的消費順序與資料寫入Paimon表的順序一致。適合對資料的流式消費順序沒有需求情境。

Append Queue表

Append Queue表可以保證流式消費Paimon表時,每個分桶中資料的消費順序與資料寫入Paimon表的順序一致。具體來說:

  • 對於兩條來自不同分區(partition)的資料

    • 如果表參數中設定了'scan.plan-sort-partition' = 'true',則分區值更小的資料會首先產出。

    • 如果表參數中未設定'scan.plan-sort-partition' = 'true',則分區建立時間更早的資料會首先產出。

  • 對於兩條來自相同分區的相同分桶的資料,先寫入Paimon表的資料會首先產出。

  • 對於兩條來自相同分區但不同分桶的資料,由於不同分桶可能被不同的Flink作業並發處理,因此不保證兩條資料的消費順序。

相關文檔