全部產品
Search
文件中心

Realtime Compute for Apache Flink:自訂表格值函數(UDTF)

更新時間:Sep 13, 2024

本文為您介紹如何為Flink自訂表格值函數(UDTF)開發、註冊和使用流程。

定義

自訂表格值函數(UDTF),自訂表格值函數,將0個、1個或多個標量值作為輸入參數(可以是變長參數)。與自訂的純量涵式類似,但與純量涵式不同。資料表值函式可以返回任意數量的行作為輸出,而不僅是1個值。返回的行可以由1個或多個列組成。調用一次函數輸出多行或多列資料。詳情參見User-defined Functions

UDTF開發

說明

Flink為您提供了UDF樣本,便於您快速開發業務。Flink UDF樣本中包含UDSF、UDAF和UDTF的實現,樣本中已為您配置對應版本的開發環境,您無需進行環境搭建。

  1. 下載並解壓ASI_UDX_Demo樣本到本地。

    說明

    ASI_UDX_Demo屬於第三方搭建的網站,訪問時可能會存在無法開啟或訪問延遲的問題。

    解壓完成後,會產生ASI_UDX-main檔案夾。其中:

    • pom.xml:專案層級的設定檔,主要描述了專案的Maven座標、依賴關係,開發人員需要遵循的規則、缺陷管理系統,組織和Licenses,以及其他所有的專案相關因素。

    • \ASI_UDX-main\src\main\java\ASI_UDF\ASI_UDTF.java:自訂表格值函數(UDTF)樣本的Java代碼。

  2. 在IntelliJ IDEA中,單擊file > open,開啟剛才解壓縮完成的ASI_UDX-main

  3. 雙擊開啟\ASI_UDX-main\src\main\java\ASI_UDTF後,根據您的業務,修改ASI_UDTF.java檔案內容。

    該樣本中,ASI_UDTF.java已配置了將一行字串按照豎線(|)分割成多列字串的代碼。

    package ASI_UDTF;
    
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.table.functions.TableFunction;
    
    public class ASI_UDTF extends TableFunction<Tuple2<String,String>> {
        public void eval(String str){
            String[] split = str.split("\\|");
            String name = split[0];
            String place = split[1];
            Tuple2<String,String> tuple2 = Tuple2.of(name,place);
            collect(tuple2);
        }
    }

    TableFunction支援的資料類型及類型推導機制請參見Flink支援的資料類型類型推導

    說明

    以上兩個文檔連結為Flink 1.15版本對應的文檔,不同Flink大版本中TableFunction支援的資料類型及推導機制可能會存在差異,請您通過VVR和Flink版本的映射關係去參考對應版本的Flink文檔。Flink版本查看方法請參見如何查看當前作業的Flink版本?

    常用的複合類型Tuple和Row樣本如下:

    • Tuple類型

      TableFunction<Tuple2<String,Integer>
    • Row類型

      @FunctionHint(output = @DataTypeHint("ROW<word STRING, length INT>"))
      public static class SplitFunction extends TableFunction<Row> {
      
        public void eval(String str) {
          for (String s : str.split(" ")) {
            // use collect(...) to emit a row
            collect(Row.of(s, s.length()));
          }
        }
      }
  4. 雙擊開啟\ASI_UDX-main\後,配置pom.xml

    該樣本中,pom.xml檔案已配置了Flink 1.11版依賴的主要JAR包資訊。如果您的業務:

    • 不依賴其他JAR包:不用配置pom.xml檔案,繼續下一步。

    • 依賴其他JAR包:在pom.xml檔案中添加您所需依賴的JAR包資訊。

    Flink 1.11版依賴的主要JAR包如下。

    <dependencies>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-streaming-java_2.12</artifactId>
                <version>1.11.0</version>
                <!--<scope>provided</scope>-->
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-table</artifactId>
                <version>1.11.0</version>
                <type>pom</type>
                <!--<scope>provided</scope>-->
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-core</artifactId>
                <version>1.11.0</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-table-common</artifactId>
                <version>1.11.0</version>
            </dependency>
        </dependencies>
  5. 在下載檔案中pom.xml所在的目錄執行如下命令打包檔案。

    mvn package -Dcheckstyle.skip

    \ASI_UDX-main\target\目錄下會出現ASI_UDX-1.0-SNAPSHOT.jar的JAR包,即代表完成了UDTF開發工作。

UDTF註冊

UDTF註冊過程,請參見管理自訂函數(UDF)

UDTF使用

在註冊UDTF完成後,您就可以使用UDTF,詳細的操作步驟如下。

  1. Flink SQL作業開發。詳情請參見SQL作業開發

    ASI_UDTF_Source表中message欄位每行字串按照豎線(|)分割成多列,程式碼範例如下。

    CREATE TEMPORARY TABLE ASI_UDTF_Source (
      `message`  VARCHAR
    ) WITH (
      'connector'='datagen'
    );
    
    CREATE TEMPORARY TABLE ASI_UDTF_Sink (
      name  VARCHAR,
      place  VARCHAR
    ) WITH (
      'connector' = 'blackhole'
    );
    
    INSERT INTO ASI_UDTF_Sink
    SELECT name,place
    FROM ASI_UDTF_Source,lateral table(ASI_UDTF(`message`)) as T(name,place);
  2. 營運中心 > 作業營運頁面,單擊目標作業名稱操作列的啟動

    啟動成功後,ASI_UDTF_Sink表會被插入ASI_UDTF_Source表中message欄位按照豎線(|)分割成多列的字元。