全部產品
Search
文件中心

Realtime Compute for Apache Flink:Paimon表資料寫入和消費

更新時間:Jul 13, 2024

本文為您介紹如何在Realtime Compute開發控制台向Paimon表中插入、更新、覆寫或刪除資料,以及從Paimon表消費資料,並指定消費位點。

前提條件

已建立Paimon Catalog和Paimon表,詳情請參見管理Paimon Catalog

使用限制

僅Realtime Compute引擎VVR 8.0.5及以上版本支援Paimon表。

向Paimon表寫入資料

通過CTAS/CDAS語句同步資料及表結構變更

詳情請參見管理Paimon Catalog

通過INSERT INTO語句插入或更新資料

您可以通過INSERT INTO語句,直接向Paimon表插入或更新資料。

通過INSERT OVERWRITE語句覆寫資料

覆寫是指清空並重新寫入資料。您可以通過INSERT OVERWRITE語句覆寫整張Paimon表或覆寫特定分區,SQL語句樣本如下。

說明
  • 僅批作業支援INSERT OVERWRITE語句。

  • 預設情況下,INSERT OVERWRITE操作不會產生變更資料,刪除與匯入的資料無法被下遊流式消費。如果您需要消費此類資料,請參見流式消費INSERT OVERWRITE語句的結果

  • my_table表是非分區表,覆寫整張my_table表。

    INSERT OVERWRITE my_table SELECT ...;
  • my_table表是分區表,覆寫my_table表中的dt=20240108,hh=06分區。

    INSERT OVERWRITE my_table PARTITION (`dt` = '20240108', `hh` = '06') SELECT ...;
  • my_table表是分區表,動態覆寫my_table表中的分區,即SELECT語句結果中出現的分區都會被覆寫,其它分區保持不變。

    INSERT OVERWRITE my_table SELECT ...;
  • my_table表是分區表,覆寫整張my_table表。

    INSERT OVERWRITE my_table /*+ OPTIONS('dynamic-partition-overwrite' = 'false') */ SELECT ...;

通過DELETE語句刪除資料

您可以通過DELETE語句從Paimon主鍵表中刪除資料。DELETE語句只能在查詢指令碼中執行。

--從my_table表中刪除所有currency = 'UNKNOWN'的資料。
DELETE FROM my_table WHERE currency = 'UNKNOWN';

過濾刪除訊息

使用Paimon主鍵表時,預設情況下,類型為DELETE的訊息會將Paimon表中對應主鍵的資料刪除。如果您不希望Paimon表處理此類訊息,可以通過SQL hint將以下參數設定為true,過濾刪除訊息。

參數

說明

資料類型

預設值

ignore-delete

是否過濾刪除訊息。

Boolean

false

調整結果表的並發數

您可以通過SQL hint設定以下參數,手動調整結果表運算元的並發數。

參數

說明

資料類型

預設值

sink.parallelism

手動設定Paimon結果表運算元的並發數。

Integer

例如,以下SQL將手動設定Paimon結果表運算元並發數為10。

INSERT INTO t /*+ OPTIONS('sink.parallelism' = '10') */ SELECT * FROM s;

從Paimon表消費資料

通過流作業消費Paimon表

說明

通過流作業消費的Paimon主鍵表需要設定變更資料產生機制

預設情況下,流作業中的Paimon源表運算元將首先產出作業啟動時刻Paimon表中的全量資料,之後產出從作業啟動時刻開始Paimon表中的增量資料。

從指錨點消費Paimon表

您可以通過以下方式從指錨點消費Paimon表:

  • 如果您不需要消費作業啟動時刻Paimon表中的全量資料,只需要消費後續的增量資料,可通過SQL Hint設定'scan.mode' = 'latest'

    SELECT * FROM t /*+ OPTIONS('scan.mode' = 'latest') */;
  • 如果您不想要消費全量資料,只想消費從指定時間點開始的增量資料,可通過SQL Hint設定scan.timestamp-millis參數。參數值表示從Unix Epoch(1970-01-01 00:00:00 UTC)開始到指定時間點經過的毫秒數。

    SELECT * FROM t /*+ OPTIONS('scan.timestamp-millis' = '1678883047356') */;
  • 如果您想要消費從指定時間點之後寫入的全量資料,並持續消費後續的增量資料,可以從以下兩種操作中選擇一種。

    說明

    此類消費方式將讀取在指定時間點之後修改的資料檔案。由於小檔案合并,資料檔案中可能包含少量在指定時間點之前寫入的資料。您可以根據業務需求,在SQL作業中添加WHERE 過濾條件對資料進行過濾。

    • 不設定任何SQL Hint,在啟動作業時,選擇指定源表開始時間並指定具體的時間資訊。image.png

    • 通過SQL Hint設定scan.file-creation-time-millis參數。

      SELECT * FROM t /*+ OPTIONS('scan.file-creation-time-millis' = '1678883047356') */;
  • 如果您不想要消費全量資料,只想消費從特定快照檔案開始的增量資料,可通過SQL Hint設定scan.snapshot-id參數,參數值是指定快照檔案的編號。

    SELECT * FROM t /*+ OPTIONS('scan.snapshot-id' = '3') */;
  • 如果您想要消費特定快照檔案的全量資料,並持續消費後續的增量資料,可通過SQL hint設定'scan.mode' = 'from-snapshot-full'scan.snapshot-id參數,scan.snapshot-id參數值是指定快照檔案的編號。

    SELECT * FROM t /*+ OPTIONS('scan.mode' = 'from-snapshot-full', 'scan.snapshot-id' = '1') */;

指定Consumer ID

Consumer ID可以儲存Paimon表的消費進度,主要用於以下情境:

  • 如果您修改了SQL作業的計算邏輯,可能會導致作業拓撲發生變化,無法從Flink狀態中恢複消費進度。設定Consumer ID可以將此ID對應的消費進度儲存在Paimon表的中繼資料檔案中,即使後續無狀態啟動作業,也能從中斷的位點繼續消費Paimon表。

  • 設定Consumer ID後,未被消費過的快照檔案不會因到期而被刪除,可以防止因消費速度跟不上快照到期速度導致的報錯。

通過設定consumer-id參數,您可以給流作業中的Paimon源表運算元賦予一個Consumer ID,其值可以是任意的字串。Consumer ID第一次建立時,它的起始消費位點根據從指錨點消費Paimon表中的規則確定。後續只要繼續使用相同的Consumer ID,即可恢複Paimon表的消費進度。

例如,為Paimon源表運算元設定名為test-id的Consumer ID的SQL語句樣本如下。如果您想要重設某個Consumer ID對應的消費位點,可以額外設定'consumer.ignore-progress' = 'true'

SELECT * FROM t /*+ OPTIONS('consumer-id' = 'test-id') */;
說明

由於未被Consumer ID消費過的快照檔案不會因到期而被刪除,如果不及時清理廢棄的Consumer ID,快照檔案及其對應的歷史資料檔案將永遠不會被刪除,會佔用儲存空間。您可以設定consumer.expiration-time表參數,將超過規定時間不使用的Cconsumer ID清理掉。例如,'consumer.expiration-time' = '3d'表示將3天未使用的Consumer ID清理掉。

流式消費INSERT OVERWRITE語句的結果

預設情況下,INSERT OVERWRITE操作不會產生變更資料,刪除與匯入的資料無法被下遊流式消費。如果您需要消費此類資料,可以在流式消費作業中通過SQL Hint配置'streaming-read-overwrite' = 'true'

SELECT * FROM t /*+ OPTIONS('streaming-read-overwrite' = 'true') */;

通過批作業消費Paimon表

預設情況下,批作業中的Paimon源表運算元將讀取最新的快照檔案,輸出Paimon表的最新狀態資料。

Batch Time Travel

通過SQL Hint設定scan.timestamp-millis參數,即可查詢Paimon表在該時間點的狀態。參數值表示從Unix Epoch(1970-01-01 00:00:00 UTC)開始到指定時間點經過的毫秒數。

SELECT * FROM t /*+ OPTIONS('scan.timestamp-millis' = '1678883047356') */;

通過SQL Hint設定scan.snapshot-id參數,即可查詢Paimon表在該快照檔案產生時的狀態。參數值為指定快照檔案的編號。

SELECT * FROM t /*+ OPTIONS('scan.snapshot-id' = '3') */;

查詢兩次快照之間的資料變化

如果您想要查詢兩次快照間Paimon表中資料發生的變化,可以通過SQL Hint設定incremental-between參數。例如,查看20號快照檔案和12號快照檔案間發生變化的所有資料,SQL語句樣本如下。

SELECT * FROM t /*+ OPTIONS('incremental-between' = '12,20') */;
說明

由於批作業不支援消費Delete類型的訊息,預設情況下此類訊息將會被丟棄。如果您想要在批作業中消費Delete類型的訊息,請查詢Audit Log系統資料表。例如SELECT * FROM `t$audit_log ` /*+ OPTIONS('incremental-between' = '12,20') */;

調整源表的並發數

預設情況下,Paimon根據分區數以及分桶數等資訊自動推斷源表運算元的並發數。您可以通過SQL Hint設定以下參數,手動調整源表運算元的並發數。

參數

資料類型

預設值

備忘

scan.parallelism

Integer

手動設定Paimon源表運算元的並發數。

scan.infer-parallelism

Boolean

true

是否自動推斷Paimon源表運算元的並發數。

scan.infer-parallelism.max

Integer

1024

Paimon源表運算元自動推斷出的並發數上限。

手動設定Paimon源表運算元並發數為10的SQL語句樣本如下。

SELECT * FROM t /*+ OPTIONS('scan.parallelism' = '10') */;

使用Paimon維表

Paimon也可以作為維表使用。關於維表JOIN的文法,詳情請參見維表JOIN語句

相關文檔