本文為您介紹Python自訂表格值函數(UDTF)的開發、註冊和使用流程。
定義
自訂表格值函數(UDTF),將0個、1個或多個標量值作為輸入參數(可以是變長參數)。資料表值函式可以返回任意數量的行作為輸出,而不僅是1個值。返回的行可以由1個或多個列組成。調用一次函數輸出多行或多列資料。與自訂的純量涵式類似,但與純量涵式不同。
使用限制
由於Realtime ComputeFlink版受部署環境和網路環境等因素的影響,開發Python自訂函數時,需要注意以下限制:
僅支援開源Flink V1.12及以上版本。
Flink工作空間已預裝了Python 3.7.9,因此需要您在Python 3.7.9版本開發代碼。
Flink運行環境僅支援JDK 8和JDK 11,如果Python作業中依賴第三方JAR包,請確保JAR包相容。
僅支援開源Scala V2.11版本,如果Python作業中依賴第三方JAR包,請確保使用Scala V2.11對應的JAR包依賴。
UDTF開發
Flink為您提供了Python UDX樣本,便於您快速開發UDX。Flink Python UDX樣本中包含了Python UDF、Python UDAF和Python UDTF的實現。本文以Windows作業系統為例,為您介紹如何進行UDTF開發。
- 下載並解壓python_demo-master樣本到本地。
- 在PyCharm中,單擊python_demo-master。 ,開啟剛才解壓縮完成的
雙擊開啟\python_demo-master\udx\udtfs.py後,根據您的業務,修改udtfs.py檔案內容。
該樣本中,split定義了將一行字串按照豎線(|)分割成多列字串的代碼。
from pyflink.table import DataTypes from pyflink.table.udf import udtf @udtf(result_types=[DataTypes.STRING(), DataTypes.STRING()]) def split(s: str): splits = s.split("|") yield splits[0], splits[1]
在下載檔案中udx所在的目錄(即\python_demo目錄)下執行如下命令打包檔案。
zip -r python_demo.zip udx
\python_demo\目錄下會出現python_demo.zip的ZIP包,即代表完成了UDTF開發工作。
UDTF註冊
UDTF註冊過程,請參見管理自訂函數(UDF)。
UDTF使用
在完成UDTF註冊後,您就可以使用UDTF,詳細的操作步驟如下。
Flink SQL作業開發。詳情請參見SQL作業開發。
ASI_UDTF_Source表中每行字串的message欄位與字串aa按照豎線(|)串連之後,按照豎線(|)分割成多列,程式碼範例如下。
CREATE TEMPORARY TABLE ASI_UDTF_Source ( `message` VARCHAR ) WITH ( 'connector'='datagen' ); CREATE TEMPORARY TABLE ASI_UDTF_Sink ( name VARCHAR, place VARCHAR ) WITH ( 'connector' = 'blackhole' ); INSERT INTO ASI_UDTF_Sink SELECT name,place FROM ASI_UDTF_Source,lateral table(split(concat_ws('|', `message`, 'aa'))) as T(name,place);
在
頁面,單擊目標作業名稱操作列的啟動。啟動成功後,ASI_UDTF_Sink表會被插入兩列資料,這兩列資料是ASI_UDTF_Source表中message和aa欄位按照豎線(|)串連後,再按照豎線(|)分割成多列的字元。