全部產品
Search
文件中心

E-MapReduce:Java UDF

更新時間:Mar 25, 2025

本文為您介紹如何編寫和使用UDF。

背景資訊

自2.2.0版本起,StarRocks支援使用Java語言編寫使用者定義函數(User Defined Function,簡稱UDF)。

自3.0版本起,StarRocks支援Global UDF,您只需要在相關的SQL語句(CREATE/SHOW/DROP)中加上GLOBAL關鍵字,該語句即可全域生效,無需逐個為每個資料庫執行此語句。您可以根據業務情境開發自訂函數,擴充StarRocks的函數能力。

目前StarRocks支援的UDF包括:

  • 使用者自訂純量涵式(Scalar UDF)

  • 使用者自訂彙總函式(User Defined Aggregation Function,UDAF)

  • 使用者自訂視窗函數(User Defined Window Function,UDWF)

  • 使用者自訂表格格函數(User Defined Table Function,UDTF)

前提條件

使用StarRocks的Java UDF功能前,您需要:

  • 安裝Apache Maven以建立並編寫相關Java專案。

  • 在伺服器上安裝JDK 1.8。

  • 開啟UDF功能。在執行個體配置頁面,設定FE配置項enable_udfTRUE,並重啟執行個體使配置項生效。

類型映射關係

SQL TYPE

Java TYPE

BOOLEAN

java.lang.Boolean

TINYINT

java.lang.Byte

SMALLINT

java.lang.Short

INT

java.lang.Integer

BIGINT

java.lang.Long

FLOAT

java.lang.Float

DOUBLE

java.lang.Double

STRING/VARCHAR

java.lang.String

開發並使用UDF

您需要建立Maven專案並使用Java語言編寫相應功能。

步驟一:建立Maven專案

建立Maven專案,專案的基本目錄結構如下。

project
|--pom.xml
|--src
|  |--main
|  |  |--java
|  |  |--resources
|  |--test
|--target

步驟二:添加依賴

在pom.xml中添加如下依賴。

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>udf</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>

    <dependencies>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.76</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-dependency-plugin</artifactId>
                <version>2.10</version>
                <executions>
                    <execution>
                        <id>copy-dependencies</id>
                        <phase>package</phase>
                        <goals>
                            <goal>copy-dependencies</goal>
                        </goals>
                        <configuration>
                            <outputDirectory>${project.build.directory}/lib</outputDirectory>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.3.0</version>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>

步驟三:開發UDF

您需要使用Java語言開發相應UDF。

開發Scalar UDF

Scalar UDF,即使用者自訂純量涵式,可以對單行資料進行操作,輸出單行結果。當您在查詢時使用Scalar UDF,每行資料最終都會按行出現在結果集中。典型的純量涵式包括UPPERLOWERROUNDABS

以下樣本以提取JSON資料功能為例進行說明。例如,業務情境中,JSON資料中某個欄位的值可能是JSON字串而不是JSON對象,因此在提取JSON字串時,SQL語句需要嵌套調用GET_JSON_STRING,即GET_JSON_STRING(GET_JSON_STRING('{"key":"{\\"k0\\":\\"v0\\"}"}', "$.key"), "$.k0")

為簡化SQL語句,您可以開發一個UDF,直接提取JSON字串,例如:MY_UDF_JSON_GET('{"key":"{\\"k0\\":\\"v0\\"}"}', "$.key.k0")

package com.starrocks.udf.sample;
import com.alibaba.fastjson.JSONPath;

public class UDFJsonGet {
    public final String evaluate(String jsonObj, String key) {
        if (obj == null || key == null) return null;
        try {
            // JSONPath庫可以全部展開,即使某個欄位的值是JSON格式的字串
            return JSONPath.read(jsonObj, key).toString();
        } catch (Exception e) {
            return null;
        }
    }
}

使用者自訂類必須實現如下方法。

說明

方法中請求參數和返回參數的資料類型,需要和步驟六中的CREATE FUNCTION語句中聲明的相同,且兩者的類型映射關係需要符合類型映射關係

方法

含義

TYPE1 evaluate(TYPE2, ...)

evaluate方法為UDF調用入口,必須是public成員方法。

開發UDAF

UDAF,即使用者自訂的彙總函式,對多行資料進行操作,輸出單行結果。典型的彙總函式包括SUMCOUNTMAXMIN,這些函數對於每個GROUP BY分組中多行資料進行彙總後,只輸出一行結果。

以下樣本以MY_SUM_INT函數為例進行說明。與內建函數SUM(傳回值為BIGINT類型)區別在於,MY_SUM_INT函數支援傳入參數和返回參數的類型為INT。

package com.starrocks.udf.sample;

public class SumInt {
    public static class State {
        int counter = 0;
        public int serializeLength() { return 4; }
    }

    public State create() {
        return new State();
    }

    public void destroy(State state) {
    }

    public final void update(State state, Integer val) {
        if (val != null) {
            state.counter+= val;
        }
    }

    public void serialize(State state, java.nio.ByteBuffer buff) {
        buff.putInt(state.counter);
    }

    public void merge(State state, java.nio.ByteBuffer buffer) {
        int val = buffer.getInt();
        state.counter += val;
    }

    public Integer finalize(State state) {
        return state.counter;
    }
}

使用者自訂類必須實現如下方法。

說明

方法中傳入參數和返回參數的資料類型,需要和步驟六中的CREATE FUNCTION語句中聲明的相同,且兩者的類型映射關係需要符合類型映射關係

方法

含義

State create()

建立State。

void destroy(State)

銷毀State。

void update(State, ...)

更新State。其中第一個參數是State,其餘的參數是函式宣告的輸入參數,可以為1個或多個。

void serialize(State, ByteBuffer)

序列化State。

void merge(State, ByteBuffer)

合并State和還原序列化State。

TYPE finalize(State)

通過State擷取函數的最終結果。

並且,開發UDAF函數時,您需要使用緩衝區類java.nio.ByteBuffer和局部變數serializeLength,用於儲存和表示中間結果,指定中間結果的序列化長度。

類和局部變數

說明

java.nio.ByteBuffer()

緩衝區類,用於儲存中間結果。由於中間結果在不同執行節點間傳輸時,會進行序列化和還原序列化,因此還需要使用serializeLength指定中間結果序列化後的長度。

serializeLength()

中間結果序列化後的長度,單位為Byte。serializeLength的資料類型固定為INT。例如,樣本中State { int counter = 0; public int serializeLength() { return 4; }}包含對中間結果序列化後的說明,即,中間結果的資料類型為INT,序列化長度為4 Byte。您也可以按照業務需求進行調整,例如中間結果序列化後的資料類型LONG,序列化長度為8 Byte,則需要傳入State { long counter = 0; public int serializeLength() { return 8; }}

說明

java.nio.ByteBuffer序列化相關事項:

  • 不支援依賴ByteBuffer的remaining()方法來還原序列化State。

  • 不支援對ByteBuffer調用clear()方法。

  • serializeLength需要與實際寫入資料的長度保持一致,否則序列化和還原序列化過程中會造成結果錯誤。

開發UDWF

UDWF,即使用者自訂視窗函數。跟普通彙總函式不同的是,視窗函數針對一組行(一個視窗)計算值,並為每行返回一個結果。一般情況下,視窗函數包含OVER子句,將資料行拆分成多個分組,視窗函數基於每一行資料所在的組(一個視窗)進行計算,並為每行返回一個結果。

以下樣本以MY_WINDOW_SUM_INT函數為例進行說明。與內建函數SUM(傳回型別為BIGINT)區別在於,MY_WINDOW_SUM_INT函數支援傳入參數和返回參數的類型為INT。

package com.starrocks.udf.sample;

public class WindowSumInt {    
    public static class State {
        int counter = 0;
        public int serializeLength() { return 4; }
        @Override
        public String toString() {
            return "State{" +
                    "counter=" + counter +
                    '}';
        }
    }

    public State create() {
        return new State();
    }

    public void destroy(State state) {

    }

    public void update(State state, Integer val) {
        if (val != null) {
            state.counter+=val;
        }
    }

    public void serialize(State state, java.nio.ByteBuffer buff) {
        buff.putInt(state.counter);
    }

    public void merge(State state, java.nio.ByteBuffer buffer) {
        int val = buffer.getInt();
        state.counter += val;
    }

    public Integer finalize(State state) {
        return state.counter;
    }

    public void reset(State state) {
        state.counter = 0;
    }

    public void windowUpdate(State state,
                            int peer_group_start, int peer_group_end,
                            int frame_start, int frame_end,
                            Integer[] inputs) {
        for (int i = (int)frame_start; i < (int)frame_end; ++i) {
            state.counter += inputs[i];
        }
    }
}

使用者自訂類必須實現UDAF所需要的方法(視窗函數是特殊彙總函式)以及windowUpdate()方法。

說明

方法中請求參數和返回參數的資料類型,需要和步驟六中的CREATE FUNCTION語句中聲明的相同,且兩者的類型映射關係需要符合類型映射關係

需要額外實現的方法

方法

含義

void windowUpdate(State state, int, int, int , int, ...)

更新視窗資料。視窗函數的詳細說明,請參見Window_function。輸入每一行資料,都會擷取到對應視窗資訊來更新中間結果。

  • peer_group_start:是當前分區開始的位置。

    分區:OVER子句中PARTITION BY指定分區列,分區列的值相同的行被視為在同一個分區內。

  • peer_group_end:當前分區結束的位置。

  • frame_start:當前視窗架構(window frame)起始位置。

    視窗架構:window frame子句指定了運算範圍,以當前行為準,前後若干行作為視窗函數運算的對象。例如ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING,表示運算範圍為當前行和它前後各一行資料。

  • frame_end:當前視窗架構(window frame)結束位置。

  • inputs:表示一個視窗中輸入的資料,為封裝類數組。封裝類需要對應輸入資料的類型,本樣本中輸入資料類型為INT,因此封裝類數組為Integer[]。

開發UDTF

UDTF,即使用者自訂表格值函數,讀入一行資料,輸出多個值可視為一張表。資料表值函式常用於實現行轉列。

說明

目前UDTF只支援返回多行單列。

以下樣本以MY_UDF_SPLIT函數為例進行說明。MY_UDF_SPLIT函數支援分隔字元為空白格,傳入參數和返回參數的類型為STRING。

package com.starrocks.udf.sample;

public class UDFSplit{
    public String[] process(String in) {
        if (in == null) return null;
        return in.split(" ");
    }
}

使用者自訂類必須實現如下方法。

說明

方法中請求參數和返回參數的資料類型,需要和步驟六中的CREATE FUNCTION語句中聲明的相同,且兩者的類型映射關係需要符合類型映射關係

方法

含義

TYPE[] process()

process()方法為UDTF調用入口,需要返回數組。

步驟四:打包Java專案

通過以下命令打包Java專案。

mvn package

target目錄下會產生兩個檔案:udf-1.0-SNAPSHOT.jarudf-1.0-SNAPSHOT-jar-with-dependencies.jar

步驟五:上傳專案

將檔案udf-1.0-SNAPSHOT-jar-with-dependencies.jar上傳到OSS上,並開放JAR包的公用讀取許可權。詳情請參見簡單上傳設定Bucket ACL

說明

步驟六中,FE會對UDF所在JAR包進行校正並計算校正值,BE會下載UDF所在JAR包並執行。

步驟六:在StarRocks中建立UDF

StarRocks內提供了兩種Namespace的UDF:一種是Database級Namespace,一種是Global級Namespace。

  • 如果您沒有特殊的UDF可見度隔離需求,您可以直接選擇建立Global UDF。在引用Global UDF時,直接調用Function Name即可,無需任何Catalog和Database作為首碼,訪問更加便捷。

  • 如果您有特殊的UDF可見度隔離需求,或者需要在不同Database下建立同名UDF,那麼你可以選擇在Database內建立UDF。此時,如果您的會話在某個Database內,您可以直接調用Function Name即可;如果您的會話在其他Catalog和Database下,那麼您需要帶上Catalog和Database首碼,例如:catalog.database.function

說明

建立Global UDF需要有System級的CREATE GLOBAL FUNCTION許可權;建立資料庫層級的UDF需要有資料庫級的CREATE FUNCTION許可權;使用UDF需要有對應UDF的USAGE許可權。關於如何賦權,參見GRANT

JAR包上傳完成後,您需要在StarRocks中,按需建立相應的UDF。如果建立Global UDF,只需要在SQL語句中帶上GLOBAL關鍵字即可。

文法

CREATE [GLOBAL][AGGREGATE | TABLE] FUNCTION function_name(arg_type [, ...])
RETURNS return_type
[PROPERTIES ("key" = "value" [, ...]) ]

參數說明

參數

必選

說明

GLOBAL

如需建立全域UDF,需指定該關鍵字。從3.0版本開始支援。

AGGREGATE

如要建立UDAF和UDWF,需指定該關鍵字。

TABLE

如要建立UDTF,需指定該關鍵字。

function_name

函數名,可以包含資料庫名稱,比如,db1.my_func。如果function_name中包含了資料庫名稱,那麼該UDF會建立在對應的資料庫中,否則該UDF會建立在當前資料庫。新函數名和參數不能與目標資料庫中已有的函數相同,否則會建立失敗;如只有函數名相同,參數不同,則可以建立成功。

arg_type

函數的參數類型。具體支援的資料類型,請參見類型映射關係

return_type

函數的傳回值類型。具體支援的資料類型,請參見類型映射關係

properties

函數相關屬性。建立不同類型的UDF需配置不同的屬性,詳情和樣本請參考以下樣本。

建立Scalar UDF

執行如下命令,在StarRocks中建立之前樣本中的Scalar UDF。

CREATE [GLOBAL] FUNCTION MY_UDF_JSON_GET(string, string) 
RETURNS string
PROPERTIES (
    "symbol" = "com.starrocks.udf.sample.UDFJsonGet", 
    "type" = "StarrocksJar",
    "file" = "http://<YourBucketName>.oss-cn-xxxx-internal.aliyuncs.com/<YourPath>/udf-1.0-SNAPSHOT-jar-with-dependencies.jar"
);

參數

描述

symbol

UDF所在專案的類名。格式為<package_name>.<class_name>

type

用於標記所建立的UDF類型。取值為StarrocksJar,表示基於Java的UDF。

file

UDF所在JAR包的HTTP路徑,配置成OSS包含對應內網Endpoint的HTTP URL。格式為http://<YourBucketName>.oss-cn-xxxx-internal.aliyuncs.com/<YourPath>/<jar_package_name>

建立UDAF

執行如下命令,在StarRocks中建立之前樣本中的UDAF。

CREATE [GLOBAL] AGGREGATE FUNCTION MY_SUM_INT(INT) 
RETURNS INT
PROPERTIES 
( 
    "symbol" = "com.starrocks.udf.sample.SumInt", 
    "type" = "StarrocksJar",
    "file" = "http://<YourBucketName>.oss-cn-xxxx-internal.aliyuncs.com/<YourPath>/udf-1.0-SNAPSHOT-jar-with-dependencies.jar"
);

PROPERTIES裡的參數說明與建立Scalar UDF相同。

建立UDWF

執行如下命令,在StarRocks中建立先前樣本中的UDWF。

CREATE [GLOBAL] AGGREGATE FUNCTION MY_WINDOW_SUM_INT(Int)
RETURNS Int
PROPERTIES 
(
    "analytic" = "true",
    "symbol" = "com.starrocks.udf.sample.WindowSumInt", 
    "type" = "StarrocksJar", 
    "file" = "http://<YourBucketName>.oss-cn-xxxx-internal.aliyuncs.com/<YourPath>/udf-1.0-SNAPSHOT-jar-with-dependencies.jar"
);

analytic:所建立的函數是否為視窗函數,固定取值為true。其他參數說明與建立Scalar UDF相同。

建立UDTF

執行如下命令,在StarRocks中建立先前樣本中的UDTF。

CREATE [GLOBAL] TABLE FUNCTION MY_UDF_SPLIT(string)
RETURNS string
PROPERTIES 
(
    "symbol" = "com.starrocks.udf.sample.UDFSplit", 
    "type" = "StarrocksJar", 
    "file" = "http://<YourBucketName>.oss-cn-xxxx-internal.aliyuncs.com/<YourPath>/udf-1.0-SNAPSHOT-jar-with-dependencies.jar"
);

PROPERTIES裡的參數說明與建立Scalar UDF相同。

步驟七:使用UDF

建立完成後,您可以測試使用您開發的UDF。

使用Scalar UDF

執行如下命令,使用步驟六建立的Scalar UDF函數。

SELECT MY_UDF_JSON_GET('{"key":"{\\"in\\":2}"}', '$.key.in');

使用UDAF

執行如下命令,使用步驟六建立的UDAF函數。

SELECT MY_SUM_INT(col1);

使用UDWF

執行如下命令,使用步驟六建立的UDWF函數。

SELECT MY_WINDOW_SUM_INT(intcol) 
            OVER (PARTITION BY intcol2
                  ORDER BY intcol3
                  ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING)
FROM test_basic;

使用UDTF

執行如下命令,使用先前樣本中的UDTF。

-- 假設存在表 t1,其列 a、b、c1 資訊如下。
SELECT t1.a,t1.b,t1.c1 FROM t1;
> output:
1,2.1,"hello world"
2,2.2,"hello UDTF."

-- 使用 MY_UDF_SPLIT() 函數。
SELECT t1.a,t1.b, MY_UDF_SPLIT FROM t1, MY_UDF_SPLIT(t1.c1); 
> output:
1,2.1,"hello"
1,2.1,"world"
2,2.2,"hello"
2,2.2,"UDTF."
說明
  • 第一個MY_UDF_SPLIT為調用MY_UDF_SPLIT後產生的列別名。

  • 暫不支援使用AS t2(f1)的方式指定表格函數返回表的表別名和列別名。

查看UDF資訊

運行以下命令查看UDF資訊。

SHOW [GLOBAL] FUNCTIONS;

刪除UDF

運行以下命令刪除指定的UDF。

DROP [GLOBAL] FUNCTION <function_name>(arg_type [, ...]);

FAQ

Q:開發UDF時是否可以使用靜態變數?不同UDF間的靜態變數間否會互相影響?

A:支援在開發UDF時使用靜態變數,且不同UDF間(即使類同名),靜態變數是互相隔離的,不會互相影響。