本文為您介紹如何為Flink自訂表格值函數(UDTF)開發、註冊和使用流程。
定義
自訂表格值函數(UDTF),自訂表格值函數,將0個、1個或多個標量值作為輸入參數(可以是變長參數)。與自訂的純量涵式類似,但與純量涵式不同。資料表值函式可以返回任意數量的行作為輸出,而不僅是1個值。返回的行可以由1個或多個列組成。調用一次函數輸出多行或多列資料。詳情參見User-defined Functions。
UDTF開發
Flink為您提供了UDF樣本,便於您快速開發業務。Flink UDF樣本中包含UDSF、UDAF和UDTF的實現,樣本中已為您配置對應版本的開發環境,您無需進行環境搭建。
下載並解壓ASI_UDX_Demo樣本到本地。
說明ASI_UDX_Demo屬於第三方搭建的網站,訪問時可能會存在無法開啟或訪問延遲的問題。
解壓完成後,會產生ASI_UDX-main檔案夾。其中:
pom.xml:專案層級的設定檔,主要描述了專案的Maven座標、依賴關係,開發人員需要遵循的規則、缺陷管理系統,組織和Licenses,以及其他所有的專案相關因素。
\ASI_UDX-main\src\main\java\ASI_UDF\ASI_UDTF.java:自訂表格值函數(UDTF)樣本的Java代碼。
在IntelliJ IDEA中,單擊ASI_UDX-main。
,開啟剛才解壓縮完成的雙擊開啟\ASI_UDX-main\src\main\java\ASI_UDTF後,根據您的業務,修改ASI_UDTF.java檔案內容。
該樣本中,ASI_UDTF.java已配置了將一行字串按照豎線(|)分割成多列字串的代碼。
package ASI_UDTF; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.table.functions.TableFunction; public class ASI_UDTF extends TableFunction<Tuple2<String,String>> { public void eval(String str){ String[] split = str.split("\\|"); String name = split[0]; String place = split[1]; Tuple2<String,String> tuple2 = Tuple2.of(name,place); collect(tuple2); } }
TableFunction支援的資料類型及類型推導機制請參見Flink支援的資料類型和類型推導。
說明以上兩個文檔連結為Flink 1.15版本對應的文檔,不同Flink大版本中TableFunction支援的資料類型及推導機制可能會存在差異,請您通過VVR和Flink版本的映射關係去參考對應版本的Flink文檔。Flink版本查看方法請參見如何查看當前作業的Flink版本?。
常用的複合類型Tuple和Row樣本如下:
Tuple類型
TableFunction<Tuple2<String,Integer>
Row類型
@FunctionHint(output = @DataTypeHint("ROW<word STRING, length INT>")) public static class SplitFunction extends TableFunction<Row> { public void eval(String str) { for (String s : str.split(" ")) { // use collect(...) to emit a row collect(Row.of(s, s.length())); } } }
雙擊開啟\ASI_UDX-main\後,配置pom.xml。
該樣本中,pom.xml檔案已配置了Flink 1.11版依賴的主要JAR包資訊。如果您的業務:
不依賴其他JAR包:不用配置pom.xml檔案,繼續下一步。
依賴其他JAR包:在pom.xml檔案中添加您所需依賴的JAR包資訊。
Flink 1.11版依賴的主要JAR包如下。
<dependencies> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-java_2.12</artifactId> <version>1.11.0</version> <!--<scope>provided</scope>--> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table</artifactId> <version>1.11.0</version> <type>pom</type> <!--<scope>provided</scope>--> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-core</artifactId> <version>1.11.0</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-common</artifactId> <version>1.11.0</version> </dependency> </dependencies>
在下載檔案中pom.xml所在的目錄執行如下命令打包檔案。
mvn package -Dcheckstyle.skip
\ASI_UDX-main\target\目錄下會出現ASI_UDX-1.0-SNAPSHOT.jar的JAR包,即代表完成了UDTF開發工作。
UDTF註冊
UDTF註冊過程,請參見管理自訂函數(UDF)。
UDTF使用
在註冊UDTF完成後,您就可以使用UDTF,詳細的操作步驟如下。
Flink SQL作業開發。詳情請參見SQL作業開發。
ASI_UDTF_Source表中message欄位每行字串按照豎線(|)分割成多列,程式碼範例如下。
CREATE TEMPORARY TABLE ASI_UDTF_Source ( `message` VARCHAR ) WITH ( 'connector'='datagen' ); CREATE TEMPORARY TABLE ASI_UDTF_Sink ( name VARCHAR, place VARCHAR ) WITH ( 'connector' = 'blackhole' ); INSERT INTO ASI_UDTF_Sink SELECT name,place FROM ASI_UDTF_Source,lateral table(ASI_UDTF(`message`)) as T(name,place);
在
頁面,單擊目標作業名稱操作列的啟動。啟動成功後,ASI_UDTF_Sink表會被插入ASI_UDTF_Source表中message欄位按照豎線(|)分割成多列的字元。