全部產品
Search
文件中心

Realtime Compute for Apache Flink:自訂純量涵式(UDSF)

更新時間:Sep 13, 2024

本文為您介紹如何開發、註冊和使用Flink自訂純量涵式(UDSF)。

定義

自訂純量涵式(UDSF)將0個、1個或多個標量值對應到一個新的標量值。輸入與輸出是一對一的關係,即讀入一行資料,寫出一條輸出值。詳情參見User-defined Functions

UDSF開發

說明

Flink為您提供了UDF樣本,便於您快速開發業務。Flink UDF樣本中包含UDSF、UDAF和UDTF的實現,樣本中已為您配置對應版本的開發環境,您無需進行環境搭建。

  1. 下載並解壓ASI_UDX_Demo樣本到本地。

    說明

    ASI_UDX_Demo屬於第三方搭建的網站,訪問時可能會存在無法開啟或訪問延遲的問題。

    解壓完成後,會產生ASI_UDX-main檔案夾。其中:

    • pom.xml:專案層級的設定檔,主要描述了專案的Maven座標、依賴關係,開發人員需要遵循的規則、缺陷管理系統,組織和Licenses,以及其他所有的專案相關因素。

    • \ASI_UDX-main\src\main\java\ASI_UDF\ASI_UDF.java:自訂純量涵式(UDSF)樣本的Java代碼。

  2. 在IntelliJ IDEA中,單擊file > open,開啟剛才解壓縮完成的ASI_UDX-main

  3. 雙擊開啟\ASI_UDX-main\src\main\java\ASI_UDF後,根據您的業務,配置ASI_UDF.java

    該樣本中,ASI_UDF.java已配置了擷取每條資料中從begin~end位的字元的代碼。

    package ASI_UDF;
    
    import org.apache.flink.table.functions.ScalarFunction;
    
    public class ASI_UDF extends ScalarFunction {
        public String eval(String s, Integer begin, Integer end) {
            return s.substring(begin, end);
        }
    }
  4. 雙擊開啟\ASI_UDX-main\後,配置pom.xml

    該樣本中,pom.xml檔案已配置了Flink 1.11版依賴的主要JAR包資訊。如果您的業務:

    • 不依賴其他JAR包:不用配置pom.xml檔案,繼續下一步。

    • 依賴其他JAR包:在pom.xml檔案中添加您所需依賴的JAR包資訊。

    Flink 1.11版依賴的主要JAR包如下。

    <dependencies>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-streaming-java_2.12</artifactId>
                <version>1.11.0</version>
                <!--<scope>provided</scope>-->
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-table</artifactId>
                <version>1.11.0</version>
                <type>pom</type>
                <!--<scope>provided</scope>-->
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-core</artifactId>
                <version>1.11.0</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-table-common</artifactId>
                <version>1.11.0</version>
            </dependency>
        </dependencies>
  5. 在下載檔案中pom.xml所在的目錄執行如下命令打包檔案。

    mvn package -Dcheckstyle.skip

    \ASI_UDX-main\target\目錄下會出現ASI_UDX-1.0-SNAPSHOT.jar的JAR包,即代表完成了UDSF開發工作。

UDSF註冊

UDSF註冊過程,請參見管理自訂函數(UDF)

UDSF使用

在註冊UDSF完成後,您就可以使用UDSF,詳細的操作步驟如下。

  1. Flink SQL作業開發。詳情請參見SQL作業開發

    擷取ASI_UDSF_Source表中a欄位中每行字串中第2~4位的字元,程式碼範例如下。

    CREATE TEMPORARY TABLE ASI_UDSF_Source (
      a VARCHAR,
      b INT,
      c INT
    ) WITH (
      'connector' = 'datagen'
    );
    
    CREATE TEMPORARY TABLE ASI_UDSF_Sink (
      a VARCHAR
    ) WITH (
      'connector' = 'blackhole'
    );
    
    INSERT INTO ASI_UDSF_Sink
    SELECT ASI_UDSF(a,2,4)
    FROM ASI_UDSF_Source;
  2. 營運中心 > 作業營運頁面,單擊目標作業名稱操作列的啟動

    啟動成功後,ASI_UDSF_Sink表每行會被插入ASI_UDSF_Source表中a欄位每行字串的第2~4位字元。