全部產品
Search
文件中心

Realtime Compute for Apache Flink:流式資料湖倉Paimon

更新時間:Oct 01, 2024

流式資料湖倉Paimon連接器推薦配合Paimon Catalog使用,本文為您介紹如何使用流式資料湖倉Paimon連接器。

背景資訊

Apache Paimon是一種流批統一的湖儲存格式,支援高吞吐的寫入和低延後查詢。目前阿里雲開源巨量資料平台E-MapReduce常見的計算引擎(例如Flink、Spark、Hive或Trino)都與Paimon有著較為完善的整合度。您可以藉助Apache Paimon快速地在HDFS或者雲端OSS上構建自己的資料湖儲存服務,並接入上述計算引擎實現資料湖的分析,詳情請參見Apache Paimon

類別

詳情

支援類型

源表、維表和結果表,資料攝入目標端

運行模式

流模式和批模式

資料格式

暫不支援

特有監控指標

暫無

API種類

SQL,資料攝入YAML作業

是否支援更新或刪除結果表資料

特色功能

目前Apache Paimon提供以下核心能力:

  • 基於HDFS或者Object Storage Service構建低成本的輕量級資料湖儲存服務。

  • 支援在流模式與批模式下讀寫大規模資料集。

  • 支援分鐘級到秒級資料新鮮度的批查詢和OLAP查詢。

  • 支援消費與產生增量資料,可作為傳統的離線數倉和新型的流式數倉的各級儲存。

  • 支援預彙總資料,降低儲存成本與下遊計算壓力。

  • 支援回溯歷史版本的資料。

  • 支援高效的資料過濾。

  • 支援表結構變更。

使用限制

  • 僅Flink計算引擎VVR 6.0.6及以上版本支援Paimon連接器。

  • Paimon與VVR版本對應關係詳情如下表所示。

    Paimon社區版本

    Realtime ComputeFlink版引擎版本(VVR )

    0.9

    8.0.7、8.0.8、8.0.9

    0.8

    8.0.6

    0.7

    8.0.5

    0.6

    8.0.4

    0.6

    8.0.3

SQL

Paimon連接器可以在SQL作業中使用,作為源表或者結果表。

文法結構

  • 如果您在Paimon Catalog中建立Paimon表,則無需指定connector參數,此時建立Paimon表的文法結構如下。

    CREATE TABLE `<your-paimon-catalog>`.`<your-db>`.paimon_table (
      id BIGINT,
      data STRING,
      PRIMARY KEY (id) NOT ENFORCED
    ) WITH (
      ...
    );
    說明

    如果您已在Paimon Catalog中建立了Paimon表,後續無需再次建立表即可直接使用。

  • 如果您在其他Catalog中建立Paimon暫存資料表,則需要指定connector參數與Paimon表的儲存路徑path,此時建立Paimon表的文法結構如下。

    CREATE TEMPORARY TABLE paimon_table (
      id BIGINT,
      data STRING,
      PRIMARY KEY (id) NOT ENFORCED
    ) WITH (
      'connector' = 'paimon',
      'path' = '<path-to-paimon-table-files>',
      'auto-create' = 'true', -- 如果指定路徑不存在Paimon表資料檔案,則會自動建立檔案。
      ...
    );

WITH參數

參數

說明

資料類型

是否必填

預設值

備忘

connector

表類型。

String

  • 如果在Paimon Catalog中建立Paimon表,則無需填寫。

  • 如果在其他Catalog中建立Paimon暫存資料表,則固定值為paimon

path

表格儲存體路徑。

String

  • 如果在Paimon Catalog中建立Paimon表,則無需填寫。

  • 如果在其他Catalog中建立Paimon暫存資料表,則為表在HDFS或OSS中的儲存目錄。

auto-create

建立Paimon暫存資料表時,若指定路徑不存在Paimon表檔案,是否自動建立檔案。

Boolean

false

參數取值如下:

  • false(預設):如果指定路徑不存在Paimon表檔案,則報錯。

  • true:如果指定路徑不存在,則Flink系統自動建立Paimon表檔案。

bucket

每個分區的分桶數。

Integer

1

寫入Paimon表的資料將按bucket-key打散至每個bucket中。

說明

建議每個Bucket的資料量在5 GB以下。

bucket-key

分桶關鍵列。

String

指定將寫入Paimon表的資料按哪些列的值打散至不同的Bucket中。

列名之間用英文逗號(,)分隔,例如'bucket-key' = 'order_id,cust_id'會將資料按order_id列和cust_id列的值進行打散。

說明
  • 如果該參數未填寫,則按primary key進行打散。

  • 如果Paimon表未指定primary key,則按所有列的值進行打散。

changelog-producer

增量資料產生機制。

String

none

Paimon可以為任意輸入資料流產生完整的增量資料(所有的update_after資料都有對應的update_before資料),方便下遊消費者。增量資料產生機制的可選值如下:

  • none(預設值):不額外產生增量資料。下遊仍然可以流讀Paimon表,但讀到的增量資料是不完整的(只有update_after資料,沒有對應的update_before資料)。

  • input:將輸入資料流雙寫至增量資料檔案中,作為增量資料。

  • full-compaction:每次Full Compaction產生完整的增量資料。

  • lookup:每次commit snapshot前產生完整的增量資料。

關於增量資料產生機制的選擇,詳情請參見增量資料產生機制

full-compaction.delta-commits

Full Compaction最大間隔。

Integer

該參數指定了每commit snapshot多少次之後,一定會進行一次Full Compaction。

lookup.cache-max-memory-size

Paimon維表的記憶體緩衝大小。

String

256 MB

該參數值會同時影響維表緩衝大小和lookup changelog-producer的緩衝大小,兩個機制的緩衝大小都由該參數配置。

merge-engine

相同primary key資料的合并機制。

String

deduplicate

參數取值如下:

  • deduplicate:僅保留最新一條。

  • partial-update:用最新資料中非null的列更新相同primary key的現有資料,其它列保持不變。

  • aggregation:通過指定彙總函式進行預彙總。

關於資料合併機制的具體分析,詳情請參見資料合併機制

partial-update.ignore-delete

是否忽略delete(-D)類型的訊息。

Boolean

false

參數取值如下:

  • true:忽略delete訊息。

  • false:不忽略delete訊息。您需要通過sequence.field等配置項來設定Sink對於delete資料的處理策略,否則可能會拋出IllegalStateException或IllegalArgumentException報錯。

說明
  • 在Realtime Compute引擎VVR 8.0.6及以下版本,該參數只在partial update情境下,merge-engine = partial-update時生效。

  • 在Realtime Compute引擎VVR 8.0.7及以上版本,該參數相容適配非partial update情境,與ignore-delete參數功能一致,推薦替換成ignore-delete

  • 請您根據實際業務情境,判斷出現的delete類型資料是否符合預期,從而決定是否啟用該參數。如果delete類型資料所代表的作業語義不符合預期,則拋出錯誤是更合適的選擇。

ignore-delete

是否忽略delete(-D)類型的訊息。

Boolean

false

參數取值同partial-update.ignore-delete

說明
  • 僅Realtime Compute引擎VVR 8.0.7及以上版本支援該參數。

  • 與partial-update.ignore-delete參數功能一致,推薦使用ignore-delete參數,並避免同時配置這兩參數。

partition.default-name

分區預設名稱。

String

__DEFAULT_PARTITION__

如果分區列的值為null或Null 字元串,將會採用該預設名稱作為分區名。

partition.expiration-check-interval

多久檢查一次分區到期。

String

1h

詳情請參見如何設定分區自動到期?

partition.expiration-time

分區的到期時間長度。

String

當一個分區的存活時間長度超過該值時,該分區將會到期,預設永不到期。

一個分區的存活時間長度由該分區的分區值計算而來,詳情請參見如何設定分區自動到期?

partition.timestamp-formatter

將時間字串轉換為時間戳記的格式串。

String

設定從分區值提取分區存活時間長度的格式,詳情請參見如何設定分區自動到期?

partition.timestamp-pattern

將分區值轉換為時間字串的格式串。

String

設定從分區值提取分區存活時間長度的格式,詳情請參見如何設定分區自動到期?

scan.bounded.watermark

當Paimon源表產生的資料的watermark超過該值時,Paimon源表將會結束產生資料。

Long

無。

scan.mode

指定Paimon源表的消費位點。

String

default

詳情請參見如何設定Paimon源表的消費位點?

scan.snapshot-id

指定Paimon源表從哪個snapshot開始消費。

Integer

詳情請參見如何設定Paimon源表的消費位點?

scan.timestamp-millis

指定Paimon源表從哪個時間點開始消費。

Integer

詳情請參見如何設定Paimon源表的消費位點?

snapshot.num-retained.max

至多保留幾個最新Snapshot不到期。

Integer

2147483647

只要滿足該配置或snapshot.time-retained其中之一,並同時滿足snapshot.num-retained.min,就會觸發Snapshot到期。

snapshot.num-retained.min

至少保留幾個最新Snapshot不到期。

Integer

10

無。

snapshot.time-retained

Snapshot產生多久以後會到期。

String

1h

只要滿足該配置或snapshot.num-retained.max其中之一,並同時滿足snapshot.num-retained.min,就會觸發snapshot到期。

write-mode

Paimon表的寫入模式。

String

change-log

參數取值如下:

  • change-log:Paimon表支援根據primary key進行資料的插入、刪除和更新。

  • append-only:Paimon表只接受資料的插入,且不支援primary key。該模式比change-log模式更加高效。

關於寫入模式的具體介紹,詳情請參見寫入模式

scan.infer-parallelism

是否自動推斷Paimon源表的並發度。

Boolean

false

參數取值如下:

  • true:將會根據分桶數自動推斷Paimon源表的並發度。

  • false:按VVP配置的預設並發。如果是專家模式就按使用者配置的並發。

scan.parallelism

Paimon源表的並發度。

Integer

說明

在作業部署詳情 > 資源配置頁簽中,資源模式為專家模式時,該參數不生效。

sink.parallelism

Paimon結果表的並發度。

Integer

說明

在作業部署詳情 > 資源配置頁簽中,資源模式為專家模式時,該參數不生效。

sink.clustering.by-columns

指定寫入Paimon結果表的聚類列。

String

對於Paimon Append Only表(非主鍵表),在批作業中配置該參數可以啟用聚類寫入功能,使資料在特定列上按大小範圍聚集分布,從而提升該表的查詢速度。

多個列名請使用英文逗號(,)進行分隔,例如'col1,col2'

聚類詳情請參見Apache Paimon官方文檔

sink.delete-strategy​

設定校正策略,確保系統能正確處理回撤(-D/-U)類型訊息。

​​

Enum

NONE

校正策略取值及Sink運算元應當正確處理回撤訊息的行為如下:​

  • ​NONE(預設值):不做校正。​

  • IGNORE_DELETE:Sink運算元應當忽略-U和-D類型的訊息,不發生回撤。

  • NON_PK_FIELD_TO_NULL:Sink運算元應當忽略-U類型的訊息,但是在收到-D類型的訊息時,保持主索引值不變、回撤Schema中其他非主索引值。

    主要用在多個Sink同時寫入同一張表時部分更新的情境。​

  • DELETE_ROW_ON_PK:Sink運算元應當忽略-U類型的訊息,但是在收到-D類型的訊息時刪除主鍵對應的行。​

  • CHANGELOG_STANDARD:Sink運算元應當在收到-U和-D類型的資料時均會刪除主鍵對應的行。​

說明
  • 僅Realtime Compute引擎VVR 8.0.8及以上版本支援該參數。

  • Paimon Sink處理回撤訊息的行為實際由ignore-delete、merge-engine等其他配置項的值決定。本配置項不直接影響這部分行為,而是會校正這部分行為是否符合預期策略。在不符合預期策略的情況下,相關校正步驟將終止,並在作業報錯中提示您如何修改ignore-delete、merge-engine等其他配置項以符合預期。

說明

更多配置項詳情請參見Apache Paimon官方文檔

特色功能詳解

資料新鮮度與一致性保證

Paimon結果表使用兩階段交易認可協議,在每次Flink作業的checkpoint期間提交寫入的資料,因此資料新鮮度即為Flink作業的checkpoint間隔。每次提交將會產生至多兩個snapshot。

當兩個Flink作業同時寫入一張Paimon表時,如果兩個作業的資料沒有寫入同一個分桶,則能保證serializable層級的一致性。如果兩個作業的資料寫入了同一個分桶,則只能保證snapshot isolation層級的一致性。也就是說,表中的資料可能混合了兩個作業的結果,但不會有資料丟失。

資料合併機制

當Paimon結果表收到多條具有相同primary key的資料時,為了保持primary key的唯一性,Paimon結果表會將這些資料合併成一條資料。通過指定merge-engine參數,您可以指定資料合併的具體行為。資料合併機制詳情如下表所示。

合并機制

詳情

去重(Deduplicate)

去重機制(deduplicate)是預設的資料合併機制。對於多條具有相同primary key的資料,Paimon結果表僅會保留最新一條資料,並丟棄其它具有primary key的資料。

說明

如果最新一條資料是一條delete訊息,所有具有該primary key的資料都將被丟棄。

部分更新(Partial Update)

通過指定部分更新機制(partial-update),您可以通過多條訊息對資料進行逐步更新,並最終得到完整的資料。具體來說,具有相同primary key的新資料將會覆蓋原來的資料,但值為null的列不會進行覆蓋。

例如,假設Paimon結果表按順序收到了以下三條資料:

  • <1, 23.0, 10, NULL>

  • <1, NULL, NULL, 'This is a book'>

  • <1, 25.2, NULL, NULL>

第一列是primary key,則最終結果為<1, 25.2, 10, 'This is a book'>。

說明
  • 如果需要流讀partial-update的結果,必須將changelog-producer參數設定為lookup或full-compaction。

  • partial-update無法處理delete訊息。您可以設定partial-update.ignore-delete參數以忽略delete訊息。

預彙總(Aggregation)

部分情境下,可能只關心彙總後的值。預彙總機制(aggregation)將具有相同primary key的資料根據您指定的彙總函式進行彙總。對於不屬於primary key的每一列,都需要通過fields.<field-name>.aggregate-function指定一個彙總函式,否則該列將預設使用last_non_null_value彙總函式。例如,考慮以下Paimon表的定義。

CREATE TABLE MyTable (
  product_id BIGINT,
  price DOUBLE,
  sales BIGINT,
  PRIMARY KEY (product_id) NOT ENFORCED
) WITH (
  'merge-engine' = 'aggregation',
  'fields.price.aggregate-function' = 'max',
  'fields.sales.aggregate-function' = 'sum'
);

price列將會根據max函數進行彙總,而sales列將會根據sum函數進行彙總。給定兩條輸入資料 <1, 23.0, 15>和 <1, 30.2, 20>,最終結果為<1, 30.2, 35>。當前支援的彙總函式與對應的資料類型如下:

  • sum:支援DECIMAL、TINYINT、SMALLINT、INTEGER、BIGINT、FLOAT和DOUBLE。

  • min和max:支援DECIMAL、TINYINT、SMALLINT、INTEGER、BIGINT、FLOAT、DOUBLE、DATE、TIME、TIMESTAMP和TIMESTAMP_LTZ。

  • last_value和last_non_null_value:支援所有資料類型。

  • listagg:支援STRING。

  • bool_and和bool_or:支援BOOLEAN。

說明
  • 只有sum函數支援回撤與刪除資料,其它彙總函式不支援回撤與刪除。如果您需要某些列忽略回撤與刪除訊息,可以設定'fields.${field_name}.ignore-retract'='true'

  • 如果需要流讀aggregation的結果,必須將changelog-producer參數設定為lookup或full-compaction。

增量資料產生機制

通過changelog-producer參數設定相應的增量資料產生機制,Paimon可以為任意輸入資料流產生完整的增量資料(所有的update_after資料都有對應的update_before資料)。以下列舉了所有的增量資料產生機制,更加詳細的介紹請參見Apache Paimon官方文檔

機制

詳情

None

設定changelog-producer為none(預設值)後,此時,對於同一個primary key,下遊的Paimon源表只能看到資料的最新情況。但這些最新情況無法讓下遊消費者方便地瞭解完整的增量資料,從而進行正確的計算。因為它只能確定對應資料是否被刪除了,或最新資料是什麼,無法得知更改之前的資料是什麼。

例如,假設下遊消費者需要計算某一列的總和,如果消費者只看到了最新資料5,它無法斷定該如何更新總和。因為如果之前的資料是4,它應該將總和增加1;如果之前的資料是6,它應該將總和減去1。此類消費者對update_before較為敏感,建議不要將增量資料產生機制配置為None,但是其他增量資料產生機制會帶來效能損耗。

說明

如果您的下遊是資料庫之類的對update_before資料不敏感的消費者,則可以將增量資料產生機制配置為None。因此,建議您根據實際需要配置增量資料產生機制。

Input

設定changelog-producer為input後,Paimon結果表會將輸入資料流雙寫至增量資料檔案中,作為增量資料。

因此,只有當輸入資料流本身是完整的增量資料時(例如CDC資料),才能使用這一增量資料產生機制。

Lookup

設定changelog-producer為lookup後,Paimon結果表會通過一種類似於維表的點查機制,在每次commit snapshot之前產生本次snapshot對應的完整增量資料。無論輸入資料是否為完整的增量資料,這一增量資料產生機制均能產生完整的增量資料。

與下文的Full Compaction機制相比,Lookup機制產生增量資料的時效性更好,但總體來看耗費的資源更多。

推薦在對增量資料的新鮮度有較高要求(例如分鐘級)的情況下使用。

Full Compaction

設定changelog-producer為full-compaction後,Paimon結果表會在每一次full compaction時產生完整的增量資料。無論輸入資料是否為完整的增量資料,這一增量資料產生機制均能產生完整的增量資料。Full compaction的時間間隔由full-compaction.delta-commits參數指定。

與上文的Lookup機制相比,Full Compaction機制產生增量資料的時效性更差,但它利用了資料的full compaction過程,不產生額外計算,因此總體來看耗費的資源更少。

推薦在對增量資料的新鮮度要求不高(例如小時級)的情況下使用。

寫入模式

Paimon表目前支援的寫入模式如下。

模式

詳情

Change-log

change-log寫入模式是Paimon表的預設寫入模式。該寫入模式支援根據primary key對資料進行插入、刪除與更新,您也可以在該寫入模式下使用上文提到的資料合併機制與增量資料產生機制。

Append-only

append-only寫入模式僅支援資料的插入,且不支援primary key。該模式比change-log模式更加高效,可在對資料新鮮度要求一般的情境下(例如分鐘級新鮮度)作為訊息佇列的替代品。

關於append-only寫入模式的詳細介紹,請參見Apache Paimon官方文檔。在使用append-only寫入模式時,需要注意以下兩點:

  • 建議您根據實際需求設定bucket-key參數,否則Paimon表將根據所有列的值進行分桶,計算效率較低。

  • append-only寫入模式可在一定程度上保證資料的產出順序,具體的產出順序為:

    1. 如果兩條資料來自不同的分區,若設定了scan.plan-sort-partition參數,則分區值較小的資料將首先產出。否則來自較早建立的分區的資料將首先產出。

    2. 如果兩條資料來自同一分區的同一分桶,則較早寫入的資料將首先產出。

    3. 如果兩條資料來自同一分區的不同分桶,由於不同分桶由不同的並發進行處理,因此不保證兩條資料的產出順序。

作為CTAS和CDAS的目標端

Paimon表支援即時同步單表或整庫層級的資料,在同步過程之中如果上遊的表結構發生了變更也會即時同步到Paimon表中。詳見管理Paimon表管理Paimon Catalog

資料攝入

Paimon連接器可以用於資料攝入YAML作業開發,作為目標端寫入。

文法結構

sink:
  type: paimon
  name: Paimon Sink
  catalog.properties.metastore: filesystem
  catalog.properties.warehouse: /path/warehouse

配置項

參數

說明

是否必填

資料類型

預設值

備忘

type

連接器類型。

STRING

固定值為paimon

name

目標端名稱。

STRING

Sink的名稱。

catalog.properties.metastore

Paimon Catalog的類型。

STRING

filesystem

取值如下:

  • filesystem(預設值)

  • dlf-paimon

catalog.properties.*

建立Paimon Catalog的參數。

STRING

詳情請參見管理Paimon Catalog

table.properties.*

建立Paimon table的參數。

STRING

詳情請參見Paimon table options

catalog.properties.warehouse

檔案儲存體的根目錄。

STRING

僅在catalog.properties.metastore設定為 filesystem時生效。

commit.user

提交資料檔案時的使用者名稱。

STRING

說明

建議為不同的作業設定不同的使用者名稱,方便在出現提交衝突時定位衝突的作業。

partition.key

每個分區表的分區欄位。

STRING

不同的表使用;分割,不同的欄位使用,分割,表與欄位使用:分割。例如:testdb.table1:id1,id2;testdb.table2:name

使用樣本

使用Paimon作為資料攝入目標端:

source:
  type: mysql
  name: MySQL Source
  hostname: ${secret_values.mysql.hostname}
  port: ${mysql.port}
  username: ${secret_values.mysql.username}
  password: ${secret_values.mysql.password}
  tables: ${mysql.source.table}
  server-id: 8601-8604

sink:
  type: paimon
  name: Paimon Sink
  catalog.properties.metastore: filesystem
  catalog.properties.warehouse: /path/warehouse

常見問題