流式資料湖倉Paimon連接器推薦配合Paimon Catalog使用,本文為您介紹如何使用流式資料湖倉Paimon連接器。
背景資訊
Apache Paimon是一種流批統一的湖儲存格式,支援高吞吐的寫入和低延後查詢。目前阿里雲開源巨量資料平台E-MapReduce常見的計算引擎(例如Flink、Spark、Hive或Trino)都與Paimon有著較為完善的整合度。您可以藉助Apache Paimon快速地在HDFS或者雲端OSS上構建自己的資料湖儲存服務,並接入上述計算引擎實現資料湖的分析,詳情請參見Apache Paimon。
類別 | 詳情 |
支援類型 | 源表、維表和結果表,資料攝入目標端 |
運行模式 | 流模式和批模式 |
資料格式 | 暫不支援 |
特有監控指標 | 暫無 |
API種類 | SQL,資料攝入YAML作業 |
是否支援更新或刪除結果表資料 | 是 |
特色功能
目前Apache Paimon提供以下核心能力:
基於HDFS或者Object Storage Service構建低成本的輕量級資料湖儲存服務。
支援在流模式與批模式下讀寫大規模資料集。
支援分鐘級到秒級資料新鮮度的批查詢和OLAP查詢。
支援消費與產生增量資料,可作為傳統的離線數倉和新型的流式數倉的各級儲存。
支援預彙總資料,降低儲存成本與下遊計算壓力。
支援回溯歷史版本的資料。
支援高效的資料過濾。
支援表結構變更。
使用限制
僅Flink計算引擎VVR 6.0.6及以上版本支援Paimon連接器。
Paimon與VVR版本對應關係詳情如下表所示。
Paimon社區版本
Realtime ComputeFlink版引擎版本(VVR )
0.9
8.0.7、8.0.8、8.0.9
0.8
8.0.6
0.7
8.0.5
0.6
8.0.4
0.6
8.0.3
SQL
Paimon連接器可以在SQL作業中使用,作為源表或者結果表。
文法結構
如果您在Paimon Catalog中建立Paimon表,則無需指定
connector
參數,此時建立Paimon表的文法結構如下。CREATE TABLE `<your-paimon-catalog>`.`<your-db>`.paimon_table ( id BIGINT, data STRING, PRIMARY KEY (id) NOT ENFORCED ) WITH ( ... );
說明如果您已在Paimon Catalog中建立了Paimon表,後續無需再次建立表即可直接使用。
如果您在其他Catalog中建立Paimon暫存資料表,則需要指定connector參數與Paimon表的儲存路徑path,此時建立Paimon表的文法結構如下。
CREATE TEMPORARY TABLE paimon_table ( id BIGINT, data STRING, PRIMARY KEY (id) NOT ENFORCED ) WITH ( 'connector' = 'paimon', 'path' = '<path-to-paimon-table-files>', 'auto-create' = 'true', -- 如果指定路徑不存在Paimon表資料檔案,則會自動建立檔案。 ... );
WITH參數
參數 | 說明 | 資料類型 | 是否必填 | 預設值 | 備忘 |
connector | 表類型。 | String | 否 | 無 |
|
path | 表格儲存體路徑。 | String | 否 | 無 |
|
auto-create | 建立Paimon暫存資料表時,若指定路徑不存在Paimon表檔案,是否自動建立檔案。 | Boolean | 否 | false | 參數取值如下:
|
bucket | 每個分區的分桶數。 | Integer | 否 | 1 | 寫入Paimon表的資料將按 說明 建議每個Bucket的資料量在5 GB以下。 |
bucket-key | 分桶關鍵列。 | String | 否 | 無 | 指定將寫入Paimon表的資料按哪些列的值打散至不同的Bucket中。 列名之間用英文逗號(,)分隔,例如 說明
|
changelog-producer | 增量資料產生機制。 | String | 否 | none | Paimon可以為任意輸入資料流產生完整的增量資料(所有的update_after資料都有對應的update_before資料),方便下遊消費者。增量資料產生機制的可選值如下:
關於增量資料產生機制的選擇,詳情請參見增量資料產生機制。 |
full-compaction.delta-commits | Full Compaction最大間隔。 | Integer | 否 | 無 | 該參數指定了每commit snapshot多少次之後,一定會進行一次Full Compaction。 |
lookup.cache-max-memory-size | Paimon維表的記憶體緩衝大小。 | String | 否 | 256 MB | 該參數值會同時影響維表緩衝大小和lookup changelog-producer的緩衝大小,兩個機制的緩衝大小都由該參數配置。 |
merge-engine | 相同primary key資料的合并機制。 | String | 否 | deduplicate | 參數取值如下:
關於資料合併機制的具體分析,詳情請參見資料合併機制。 |
partial-update.ignore-delete | 是否忽略delete(-D)類型的訊息。 | Boolean | 否 | false | 參數取值如下:
說明
|
ignore-delete | 是否忽略delete(-D)類型的訊息。 | Boolean | 否 | false | 參數取值同partial-update.ignore-delete。 說明
|
partition.default-name | 分區預設名稱。 | String | 否 | __DEFAULT_PARTITION__ | 如果分區列的值為null或Null 字元串,將會採用該預設名稱作為分區名。 |
partition.expiration-check-interval | 多久檢查一次分區到期。 | String | 否 | 1h | 詳情請參見如何設定分區自動到期? |
partition.expiration-time | 分區的到期時間長度。 | String | 否 | 無 | 當一個分區的存活時間長度超過該值時,該分區將會到期,預設永不到期。 一個分區的存活時間長度由該分區的分區值計算而來,詳情請參見如何設定分區自動到期? |
partition.timestamp-formatter | 將時間字串轉換為時間戳記的格式串。 | String | 否 | 無 | 設定從分區值提取分區存活時間長度的格式,詳情請參見如何設定分區自動到期? |
partition.timestamp-pattern | 將分區值轉換為時間字串的格式串。 | String | 否 | 無 | 設定從分區值提取分區存活時間長度的格式,詳情請參見如何設定分區自動到期? |
scan.bounded.watermark | 當Paimon源表產生的資料的watermark超過該值時,Paimon源表將會結束產生資料。 | Long | 否 | 無 | 無。 |
scan.mode | 指定Paimon源表的消費位點。 | String | 否 | default | 詳情請參見如何設定Paimon源表的消費位點? |
scan.snapshot-id | 指定Paimon源表從哪個snapshot開始消費。 | Integer | 否 | 無 | 詳情請參見如何設定Paimon源表的消費位點? |
scan.timestamp-millis | 指定Paimon源表從哪個時間點開始消費。 | Integer | 否 | 無 | 詳情請參見如何設定Paimon源表的消費位點? |
snapshot.num-retained.max | 至多保留幾個最新Snapshot不到期。 | Integer | 否 | 2147483647 | 只要滿足該配置或snapshot.time-retained其中之一,並同時滿足snapshot.num-retained.min,就會觸發Snapshot到期。 |
snapshot.num-retained.min | 至少保留幾個最新Snapshot不到期。 | Integer | 否 | 10 | 無。 |
snapshot.time-retained | Snapshot產生多久以後會到期。 | String | 否 | 1h | 只要滿足該配置或snapshot.num-retained.max其中之一,並同時滿足snapshot.num-retained.min,就會觸發snapshot到期。 |
write-mode | Paimon表的寫入模式。 | String | 否 | change-log | 參數取值如下:
關於寫入模式的具體介紹,詳情請參見寫入模式。 |
scan.infer-parallelism | 是否自動推斷Paimon源表的並發度。 | Boolean | 否 | false | 參數取值如下:
|
scan.parallelism | Paimon源表的並發度。 | Integer | 否 | 無 | 說明 在作業 頁簽中,資源模式為專家模式時,該參數不生效。 |
sink.parallelism | Paimon結果表的並發度。 | Integer | 否 | 無 | 說明 在作業 頁簽中,資源模式為專家模式時,該參數不生效。 |
sink.clustering.by-columns | 指定寫入Paimon結果表的聚類列。 | String | 否 | 無 | 對於Paimon Append Only表(非主鍵表),在批作業中配置該參數可以啟用聚類寫入功能,使資料在特定列上按大小範圍聚集分布,從而提升該表的查詢速度。 多個列名請使用英文逗號(,)進行分隔,例如 聚類詳情請參見Apache Paimon官方文檔。 |
sink.delete-strategy | 設定校正策略,確保系統能正確處理回撤(-D/-U)類型訊息。 | Enum | 否 | NONE | 校正策略取值及Sink運算元應當正確處理回撤訊息的行為如下:
說明
|
更多配置項詳情請參見Apache Paimon官方文檔。
特色功能詳解
資料新鮮度與一致性保證
Paimon結果表使用兩階段交易認可協議,在每次Flink作業的checkpoint期間提交寫入的資料,因此資料新鮮度即為Flink作業的checkpoint間隔。每次提交將會產生至多兩個snapshot。
當兩個Flink作業同時寫入一張Paimon表時,如果兩個作業的資料沒有寫入同一個分桶,則能保證serializable層級的一致性。如果兩個作業的資料寫入了同一個分桶,則只能保證snapshot isolation層級的一致性。也就是說,表中的資料可能混合了兩個作業的結果,但不會有資料丟失。
資料合併機制
當Paimon結果表收到多條具有相同primary key的資料時,為了保持primary key的唯一性,Paimon結果表會將這些資料合併成一條資料。通過指定merge-engine
參數,您可以指定資料合併的具體行為。資料合併機制詳情如下表所示。
合并機制 | 詳情 |
去重(Deduplicate) | 去重機制(deduplicate)是預設的資料合併機制。對於多條具有相同primary key的資料,Paimon結果表僅會保留最新一條資料,並丟棄其它具有primary key的資料。 說明 如果最新一條資料是一條delete訊息,所有具有該primary key的資料都將被丟棄。 |
部分更新(Partial Update) | 通過指定部分更新機制(partial-update),您可以通過多條訊息對資料進行逐步更新,並最終得到完整的資料。具體來說,具有相同primary key的新資料將會覆蓋原來的資料,但值為null的列不會進行覆蓋。 例如,假設Paimon結果表按順序收到了以下三條資料:
第一列是primary key,則最終結果為<1, 25.2, 10, 'This is a book'>。 說明
|
預彙總(Aggregation) | 部分情境下,可能只關心彙總後的值。預彙總機制(aggregation)將具有相同primary key的資料根據您指定的彙總函式進行彙總。對於不屬於primary key的每一列,都需要通過
price列將會根據max函數進行彙總,而sales列將會根據sum函數進行彙總。給定兩條輸入資料 <1, 23.0, 15>和 <1, 30.2, 20>,最終結果為<1, 30.2, 35>。當前支援的彙總函式與對應的資料類型如下:
說明
|
增量資料產生機制
通過changelog-producer參數設定相應的增量資料產生機制,Paimon可以為任意輸入資料流產生完整的增量資料(所有的update_after資料都有對應的update_before資料)。以下列舉了所有的增量資料產生機制,更加詳細的介紹請參見Apache Paimon官方文檔。
機制 | 詳情 |
None | 設定 例如,假設下遊消費者需要計算某一列的總和,如果消費者只看到了最新資料5,它無法斷定該如何更新總和。因為如果之前的資料是4,它應該將總和增加1;如果之前的資料是6,它應該將總和減去1。此類消費者對update_before較為敏感,建議不要將增量資料產生機制配置為None,但是其他增量資料產生機制會帶來效能損耗。 說明 如果您的下遊是資料庫之類的對update_before資料不敏感的消費者,則可以將增量資料產生機制配置為None。因此,建議您根據實際需要配置增量資料產生機制。 |
Input | 設定 因此,只有當輸入資料流本身是完整的增量資料時(例如CDC資料),才能使用這一增量資料產生機制。 |
Lookup | 設定 與下文的Full Compaction機制相比,Lookup機制產生增量資料的時效性更好,但總體來看耗費的資源更多。 推薦在對增量資料的新鮮度有較高要求(例如分鐘級)的情況下使用。 |
Full Compaction | 設定 與上文的Lookup機制相比,Full Compaction機制產生增量資料的時效性更差,但它利用了資料的full compaction過程,不產生額外計算,因此總體來看耗費的資源更少。 推薦在對增量資料的新鮮度要求不高(例如小時級)的情況下使用。 |
寫入模式
Paimon表目前支援的寫入模式如下。
模式 | 詳情 |
Change-log | change-log寫入模式是Paimon表的預設寫入模式。該寫入模式支援根據primary key對資料進行插入、刪除與更新,您也可以在該寫入模式下使用上文提到的資料合併機制與增量資料產生機制。 |
Append-only | append-only寫入模式僅支援資料的插入,且不支援primary key。該模式比change-log模式更加高效,可在對資料新鮮度要求一般的情境下(例如分鐘級新鮮度)作為訊息佇列的替代品。 關於append-only寫入模式的詳細介紹,請參見Apache Paimon官方文檔。在使用append-only寫入模式時,需要注意以下兩點:
|
作為CTAS和CDAS的目標端
Paimon表支援即時同步單表或整庫層級的資料,在同步過程之中如果上遊的表結構發生了變更也會即時同步到Paimon表中。詳見管理Paimon表和管理Paimon Catalog。
資料攝入
Paimon連接器可以用於資料攝入YAML作業開發,作為目標端寫入。
文法結構
sink:
type: paimon
name: Paimon Sink
catalog.properties.metastore: filesystem
catalog.properties.warehouse: /path/warehouse
配置項
參數 | 說明 | 是否必填 | 資料類型 | 預設值 | 備忘 |
type | 連接器類型。 | 是 | STRING | 無 | 固定值為 |
name | 目標端名稱。 | 否 | STRING | 無 | Sink的名稱。 |
catalog.properties.metastore | Paimon Catalog的類型。 | 否 | STRING | filesystem | 取值如下:
|
catalog.properties.* | 建立Paimon Catalog的參數。 | 否 | STRING | 無 | 詳情請參見管理Paimon Catalog。 |
table.properties.* | 建立Paimon table的參數。 | 否 | STRING | 無 | 詳情請參見Paimon table options。 |
catalog.properties.warehouse | 檔案儲存體的根目錄。 | 否 | STRING | 無 | 僅在 |
commit.user | 提交資料檔案時的使用者名稱。 | 否 | STRING | 無 | 說明 建議為不同的作業設定不同的使用者名稱,方便在出現提交衝突時定位衝突的作業。 |
partition.key | 每個分區表的分區欄位。 | 否 | STRING | 無 | 不同的表使用 |
使用樣本
使用Paimon作為資料攝入目標端:
source:
type: mysql
name: MySQL Source
hostname: ${secret_values.mysql.hostname}
port: ${mysql.port}
username: ${secret_values.mysql.username}
password: ${secret_values.mysql.password}
tables: ${mysql.source.table}
server-id: 8601-8604
sink:
type: paimon
name: Paimon Sink
catalog.properties.metastore: filesystem
catalog.properties.warehouse: /path/warehouse