全部產品
Search
文件中心

Realtime Compute for Apache Flink:自訂彙總函式(UDAF)

更新時間:Sep 13, 2024

本文為您介紹如何為Flink自訂彙總函式(UDAF)開發、註冊和使用流程。

定義

自訂彙總函式(UDAF),將多條記錄彙總成1條記錄。其輸入與輸出是多對一的關係,即將多條輸入記錄彙總成一條輸出值。詳情參見User-defined Functions

說明

User-defined Functions和下文中的ASI_UDX_Demo屬於第三方搭建的網站,訪問時可能會存在無法開啟或訪問延遲的問題。

UDAF開發

說明

Flink為您提供了UDF樣本,便於您快速開發業務。Flink UDF樣本中包含UDSF、UDAF和UDTF的實現,樣本中已為您配置對應版本的開發環境,您無需進行環境搭建。

  1. 下載並解壓ASI_UDX_Demo樣本到本地。

    解壓完成後,會產生ASI_UDX-main檔案夾。其中:

    • pom.xml:專案層級的設定檔,主要描述了專案的Maven座標、依賴關係、開發人員需要遵循的規則、缺陷管理系統,組織和Licenses,以及其他所有的專案相關因素。

    • \ASI_UDX-main\src\main\java\ASI_UDAF\ASI_UDAF.java:自訂彙總函式(UDAF)樣本的Java代碼。

  2. 在IntelliJ IDEA中,選擇file > open,開啟剛才解壓縮完成的ASI_UDX-main

  3. 雙擊開啟\ASI_UDX-main\後,配置pom.xml

    該樣本中,pom.xml檔案已配置了Flink 1.12版開發自訂函數需要的最小化依賴資訊。如果您的業務:

    • 沒有其他依賴:不用配置pom.xml檔案,繼續下一步。

    • 有其他依賴:在pom.xml檔案中添加您所需的依賴資訊。

    Flink 1.12版最小化依賴如下。

     <dependencies>
         <dependency>
             <groupId>org.apache.flink</groupId>
             <artifactId>flink-table-common</artifactId>
             <version>1.12.7</version>
             <scope>provided</scope>
         </dependency>
    </dependencies>
    說明

    在填寫version時,建議根據目標作業的VVR版本,填寫為對應Flink大版本的最新小版本。關於VVR和Flink的對應關係,詳情請參見概述

  4. 雙擊開啟\ASI_UDX-main\src\main\java\ASI_UDAF後,根據您的業務,配置ASI_UDAF.java

    該樣本中,ASI_UDAF.java實現了累積求和的代碼,詳情如下。

    package ASI_UDAF;
    
    import org.apache.flink.table.functions.AggregateFunction;
    
    import java.util.Iterator;
    
    public class ASI_UDAF{
        public static class AccSum{
            public long sum;
        }
    
        public static class MySum extends AggregateFunction<Long, AccSum>{
    
            @Override
            public Long getValue(AccSum acSum){
                return acSum.sum;
            }
    
            @Override
            public AccSum createAccumulator(){
                AccSum acCount= new AccSum();
                acCount.sum=0;
                return acCount;
            }
    
            public void accumulate(AccSum acc,long num){
                acc.sum += num;
            }
    
            /**
            *Support retract a msg generated by upstream operator.
            */
            public void retract(AccSum acc,long num){
                acc.sum -= num;
            }
    
            /**
            *Support local-global two stage aggregate optimization.
            */
            public void merge(AccSum acc,Iterable<AccSum> it){
                Iterator<AccSum> iter=it.iterator();
                while(iter.hasNext()){
                    AccSum accSum=iter.next();
                    if(null!=accSum){
                        acc.sum+=accSum.sum;
                    }
                }
            }
        }
    }

    該UDAF的是一個簡單累加和的操作。例如,同一個分組鍵(GROUP BY欄位)的3條輸入資料分別為1、2、3,輸出結果有以下兩種情況:

    • 在未啟用MiniBatch最佳化時,即預設配置,則輸出的結果為1、3、6。

    • 在開啟了MiniBatch最佳化時,因為輸出的資料條數取決於設定的MiniBatch參數和輸入資料的分布情況,所以能確定的是最後輸出一條結果為6, 但輸出的中間結果條數不確定。

    說明

    MiniBatch最佳化詳情,請參見高效能Flink SQL最佳化技巧

  5. 在下載檔案中pom.xml所在目錄執行如下打包命令。

    mvn package -Dcheckstyle.skip

    \ASI_UDX-main\target\目錄下會出現ASI_UDX-1.0-SNAPSHOT.jar的JAR包,即代表完成了UDAF開發工作。

UDAF使用

您可以通過以下兩種方式在SQL作業中使用自訂UDAF:

  • 方式一:先註冊UDAF,再在作業中直接使用登入的UDAF。

    通過該方式進行函數註冊的優點為便於後續開發進行代碼複用。UDAF註冊過程,請參見管理自訂函數(UDF)。如果註冊完的函數名稱為ASI_UDAF$MySum,則在作業中直接使用的程式碼範例如下。

    CREATE TEMPORARY TABLE ASI_UDAF_Source (
      a BIGINT NOT NULL
    ) WITH (
      'connector' = 'datagen'
    );
    
    CREATE TEMPORARY TABLE ASI_UDAF_Sink (
      sum  BIGINT
    ) WITH (
      'connector' = 'print'
    );
    
    INSERT INTO ASI_UDAF_Sink
    SELECT `ASI_UDAF$MySum`(a)
    FROM ASI_UDAF_Source;
  • 方式二:先將自訂函數JAR包上傳至Flink資料開發 > ETL右側的更多配置附加依賴檔案選項,再在作業的SQL語句中添加建立臨時函數的語句,並使用該函數。

    上傳自訂函數JAR包到附加依賴檔案中後,只能在本作業中使用該自訂函數,其他作業中不可使用。如果建立的臨時函數名稱為mysum,則在作業中使用該函數的程式碼範例如下。

    CREATE TEMPORARY TABLE ASI_UDAF_Source (
      a   BIGINT
    ) WITH (
      'connector' = 'datagen'
    );
    
    CREATE TEMPORARY TABLE ASI_UDAF_Sink (
      sum  BIGINT
    ) WITH (
      'connector' = 'print'
    );
    
    CREATE TEMPORARY FUNCTION `mysum` AS 'ASI_UDAF.ASI_UDAF$MySum'; --建立臨時函數mysum。
    
    INSERT INTO ASI_UDAF_Sink
    SELECT `mysum`(a)
    FROM ASI_UDAF_Source;
說明

SQL作業開發完成後,需要在營運中心 > 作業營運頁面,單擊目標作業名稱操作列的啟動。啟動成功後,ASI_UDAF_Sink表中就會插入ASI_UDAF_Source表中a欄位資料的累加和。