本文為您介紹如何為Flink自訂彙總函式(UDAF)開發、註冊和使用流程。
定義
自訂彙總函式(UDAF),將多條記錄彙總成1條記錄。其輸入與輸出是多對一的關係,即將多條輸入記錄彙總成一條輸出值。詳情參見User-defined Functions。
User-defined Functions和下文中的ASI_UDX_Demo屬於第三方搭建的網站,訪問時可能會存在無法開啟或訪問延遲的問題。
UDAF開發
Flink為您提供了UDF樣本,便於您快速開發業務。Flink UDF樣本中包含UDSF、UDAF和UDTF的實現,樣本中已為您配置對應版本的開發環境,您無需進行環境搭建。
下載並解壓ASI_UDX_Demo樣本到本地。
解壓完成後,會產生ASI_UDX-main檔案夾。其中:
pom.xml:專案層級的設定檔,主要描述了專案的Maven座標、依賴關係、開發人員需要遵循的規則、缺陷管理系統,組織和Licenses,以及其他所有的專案相關因素。
\ASI_UDX-main\src\main\java\ASI_UDAF\ASI_UDAF.java:自訂彙總函式(UDAF)樣本的Java代碼。
在IntelliJ IDEA中,選擇ASI_UDX-main。
,開啟剛才解壓縮完成的雙擊開啟\ASI_UDX-main\後,配置pom.xml。
該樣本中,pom.xml檔案已配置了Flink 1.12版開發自訂函數需要的最小化依賴資訊。如果您的業務:
沒有其他依賴:不用配置pom.xml檔案,繼續下一步。
有其他依賴:在pom.xml檔案中添加您所需的依賴資訊。
Flink 1.12版最小化依賴如下。
<dependencies> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-common</artifactId> <version>1.12.7</version> <scope>provided</scope> </dependency> </dependencies>
說明在填寫version時,建議根據目標作業的VVR版本,填寫為對應Flink大版本的最新小版本。關於VVR和Flink的對應關係,詳情請參見概述。
雙擊開啟\ASI_UDX-main\src\main\java\ASI_UDAF後,根據您的業務,配置ASI_UDAF.java。
該樣本中,ASI_UDAF.java實現了累積求和的代碼,詳情如下。
package ASI_UDAF; import org.apache.flink.table.functions.AggregateFunction; import java.util.Iterator; public class ASI_UDAF{ public static class AccSum{ public long sum; } public static class MySum extends AggregateFunction<Long, AccSum>{ @Override public Long getValue(AccSum acSum){ return acSum.sum; } @Override public AccSum createAccumulator(){ AccSum acCount= new AccSum(); acCount.sum=0; return acCount; } public void accumulate(AccSum acc,long num){ acc.sum += num; } /** *Support retract a msg generated by upstream operator. */ public void retract(AccSum acc,long num){ acc.sum -= num; } /** *Support local-global two stage aggregate optimization. */ public void merge(AccSum acc,Iterable<AccSum> it){ Iterator<AccSum> iter=it.iterator(); while(iter.hasNext()){ AccSum accSum=iter.next(); if(null!=accSum){ acc.sum+=accSum.sum; } } } } }
該UDAF的是一個簡單累加和的操作。例如,同一個分組鍵(GROUP BY欄位)的3條輸入資料分別為1、2、3,輸出結果有以下兩種情況:
在未啟用MiniBatch最佳化時,即預設配置,則輸出的結果為1、3、6。
在開啟了MiniBatch最佳化時,因為輸出的資料條數取決於設定的MiniBatch參數和輸入資料的分布情況,所以能確定的是最後輸出一條結果為6, 但輸出的中間結果條數不確定。
說明MiniBatch最佳化詳情,請參見高效能Flink SQL最佳化技巧。
在下載檔案中pom.xml所在目錄執行如下打包命令。
mvn package -Dcheckstyle.skip
\ASI_UDX-main\target\目錄下會出現ASI_UDX-1.0-SNAPSHOT.jar的JAR包,即代表完成了UDAF開發工作。
UDAF使用
您可以通過以下兩種方式在SQL作業中使用自訂UDAF:
方式一:先註冊UDAF,再在作業中直接使用登入的UDAF。
通過該方式進行函數註冊的優點為便於後續開發進行代碼複用。UDAF註冊過程,請參見管理自訂函數(UDF)。如果註冊完的函數名稱為ASI_UDAF$MySum,則在作業中直接使用的程式碼範例如下。
CREATE TEMPORARY TABLE ASI_UDAF_Source ( a BIGINT NOT NULL ) WITH ( 'connector' = 'datagen' ); CREATE TEMPORARY TABLE ASI_UDAF_Sink ( sum BIGINT ) WITH ( 'connector' = 'print' ); INSERT INTO ASI_UDAF_Sink SELECT `ASI_UDAF$MySum`(a) FROM ASI_UDAF_Source;
方式二:先將自訂函數JAR包上傳至Flink
右側的更多配置的附加依賴檔案選項,再在作業的SQL語句中添加建立臨時函數的語句,並使用該函數。上傳自訂函數JAR包到附加依賴檔案中後,只能在本作業中使用該自訂函數,其他作業中不可使用。如果建立的臨時函數名稱為mysum,則在作業中使用該函數的程式碼範例如下。
CREATE TEMPORARY TABLE ASI_UDAF_Source ( a BIGINT ) WITH ( 'connector' = 'datagen' ); CREATE TEMPORARY TABLE ASI_UDAF_Sink ( sum BIGINT ) WITH ( 'connector' = 'print' ); CREATE TEMPORARY FUNCTION `mysum` AS 'ASI_UDAF.ASI_UDAF$MySum'; --建立臨時函數mysum。 INSERT INTO ASI_UDAF_Sink SELECT `mysum`(a) FROM ASI_UDAF_Source;
SQL作業開發完成後,需要在
頁面,單擊目標作業名稱操作列的啟動。啟動成功後,ASI_UDAF_Sink表中就會插入ASI_UDAF_Source表中a欄位資料的累加和。