Realtime ComputeFlink版支援在Flink SQL作業中使用Java自訂函數,本文介紹Flink Java自訂函數的分類、參數傳遞及調用注意事項。
注意事項
為了避免JAR包依賴衝突,在開發自訂函數時您需要注意以下幾點:
SQL開發頁面選擇的Flink版本,請和Pom依賴中的Flink版本保持一致。
Flink相關依賴,scope請使用provided,即在依賴中添加
<scope>provided</scope>
。其他第三方依賴請採用Shade方式打包,Shade打包詳情請參見Apache Maven Shade Plugin。
Flink依賴衝突問題,詳情請參見空間管理與操作
為避免UDF在SQL作業文本裡被頻繁調用導致逾時的情況,推薦您將UDF的JAR包作為依賴檔案上傳,並且通過
CRETATE TEMPORARY FUNCTION
文法在作業中聲明函數,例如CREATE TEMPORARY FUNCTION 'GetJson' AS 'com.soybean.function.udf.MyGetJsonObjectFunction';
自訂函數分類
Flink支援以下3類自訂函數。
分類 | 描述 |
UDSF(User Defined Scalar Function) | 使用者自訂純量值函式,將0個、1個或多個標量值對應到一個新的標量值。其輸入與輸出是一對一的關係,即讀入一行資料,寫出一條輸出值。詳情請參見自訂純量涵式(UDSF)。 |
UDAF(User Defined Aggregation Function) | 自訂彙總函式,將多條記錄彙總成1條記錄。其輸入與輸出是多對一的關係,即將多條輸入記錄彙總成一條輸出值。詳情請參見自訂彙總函式(UDAF)。 |
UDTF(User Defined Table-valued Function) | 自訂表格值函數,將0個、1個或多個標量值作為輸入參數(可以是變長參數)。與自訂的純量涵式類似,但與純量涵式不同。資料表值函式可以返回任意數量的行作為輸出,而不僅是1個值。返回的行可以由1個或多個列組成。調用一次函數輸出多行或多列資料。詳情請參見自訂表格值函數(UDTF)。 |
自訂函數註冊
自訂函數參數傳遞
您可以在Flink開發控制台配置自訂函數中的參數並在UDF代碼中使用。這樣,後續可以直接在控制台上修改參數值,實現快速修改UDF參數值的目的。
自訂函數中提供了可選的open(FunctionContext context)方法,FunctionContext具備參數傳遞功能,自訂配置項通過此對象來傳遞。具體步驟如下:
在Flink開發控制台pipeline.global-job-parameters配置項,程式碼範例如下。
頁面部署詳情頁簽運行參數配置的其他配置中,添加pipeline.global-job-parameters: | 'k1:{hi,hello}', 'k2:"str:ing,str:ing"', 'k3:"str""ing,str:ing"'
FunctionContext#getJobParameter只能擷取pipeline.global-job-parameters這一配置項的值。因此需要將UDF用到的所有配置項全部寫入到pipeline.global-job-parameters中。pipeline.global-job-parameters配置項填寫的具體操作步驟如下。
步驟
動作
具體操作
樣本
步驟1
定義key-value。
將key和value之間通過冒號(:)分隔,並將每一對key-value用單引號(')包圍起來。
說明如果key或value中含有半形冒號(:),則需要用雙引號(")將key或value包圍起來。
如果key或value中含有半形冒號(:)和雙引號("),則需要通過連寫兩個雙引號("")進行轉義。
當
key = k1,value = {hi,hello}
,則定義為'k1:{hi,hello}'
。當
key = k2,value = str:ing,str:ing
,則定義為'k2:"str:ing,str:ing"'
當
key = k3,value = str"ing,str:ing
,則定義為'k3:"str""ing,str:ing"'
步驟2
按照YAML檔案的格式,形成最終的pipeline.global-job-parameters。
將不同的key-value放在不同的行裡,並將所有key-value用逗號(,)串連。
說明YAML檔案的多行字串以豎線(| )開始。
YAML檔案的多行字串,每一行需要有相同的縮排。
pipeline.global-job-parameters: | 'k1:{hi,hello}', 'k2:"str:ing,str:ing"', 'k3:"str""ing,str:ing"'
在自訂函數代碼中,通過FunctionContext#getJobParameter擷取map的各項內容。
程式碼範例如下。
context.getJobParameter("k1", null); // 獲得字串 {hi,hello}。 context.getJobParameter("k2", null); // 獲得字串 str:ing,str:ing。 context.getJobParameter("k3", null); // 獲得字串 str"ing,str:ing。 context.getJobParameter("pipeline.global-job-parameters", null); // null,只能獲得pipeline.global-job-parameters裡定義的內容,而不能獲得任意的作業配置項。
具名引數
僅Realtime Compute引擎VVR 8.0.7及以上版本支援使用具名引數來實現自訂函數。
在SQL中調用函數時必須按順序指定所有參數欄位。當參數較多時,容易出現傳參個數、順序錯誤,而且不能省略非必填參數。通過使用具名引數,可以按需指定所需的參數,減少出錯機率,使用起來也更加方便。我們通過一個自訂純量涵式(ScalarFunction)的例子來介紹下具名引數的使用。
// 實現一個自訂純量涵式,後兩個入參為選擇性參數(isOptional = true)
public class MyFuncWithNamedArgs extends ScalarFunction {
private static final long serialVersionUID = 1L;
public String eval(@ArgumentHint(name = "f1", isOptional = false, type = @DataTypeHint("STRING")) String f1,
@ArgumentHint(name = "f2", isOptional = true, type = @DataTypeHint("INT")) Integer i2,
@ArgumentHint(name = "f3", isOptional = true, type = @DataTypeHint("LONG")) Long l3) {
if (i2 != null) {
return "i2#" + i2;
}
if (l3 != null) {
return "l3#" + l3;
}
return "default#" + f1;
}
}
在SQL中使用該自訂函數時,您可以只指定第一個必選參數,或選擇性指定選擇性參數,程式碼範例如下。
CREATE TEMPORARY FUNCTION MyNamedUdf AS 'com.aliyun.example.MyFuncWithNamedArgs';
CREATE temporary TABLE s1 (
a INT,
b BIGINT,
c VARCHAR,
d VARCHAR,
PRIMARY KEY(a) NOT ENFORCED
) WITH (
'connector' = 'datagen',
'rows-per-second'='1'
);
CREATE temporary TABLE sink (
a INT,
b VARCHAR,
c VARCHAR,
d VARCHAR
) WITH (
'connector' = 'print'
);
INSERT INTO sink
SELECT a,
-- 僅指定第一個必選參數
MyNamedUdf(f1 => c) arg1_res,
-- 指定第一個必選參數及第二個選擇性參數
MyNamedUdf(f1 => c, f2 => a) arg2_res,
-- 指定第一個必選參數及第三個選擇性參數
MyNamedUdf(f1 => c, f3 => d) arg3_res
FROM s1;
相關文檔
Java自訂函數的開發和使用demo,請參見自訂彙總函式(UDAF)、自訂純量涵式(UDSF)和自訂表格值函數(UDTF)。
Python自訂函數的調試和調優方法,請參見概述。
Python自訂函數的開發和使用demo,請參見自訂彙總函式(UDAF)、自訂純量涵式(UDSF)和自訂表格值函數(UDTF)。