本文為您介紹Python自訂純量涵式(UDSF)的開發、註冊和使用流程。
定義
自訂純量涵式(UDSF)將0個、1個或多個標量值對應到一個新的標量值。輸入與輸出是一對一的關係,即讀入一行資料,寫出一條輸出值。
使用限制
由於Realtime ComputeFlink版受部署環境和網路環境等因素的影響,開發Python自訂函數時,需要注意以下限制:
僅支援開源Flink V1.12及以上版本。
Flink工作空間已預裝了Python 3.7.9,因此需要您在Python 3.7.9版本開發代碼。
Flink運行環境僅支援JDK 8和JDK 11,如果Python作業中依賴第三方JAR包,請確保JAR包相容。
僅支援開源Scala V2.11版本,如果Python作業中依賴第三方JAR包,請確保使用Scala V2.11對應的JAR包依賴。
UDSF開發
Flink為您提供了Python自訂函數樣本,便於您快速開發自訂函數。Flink Python自訂函數樣本中包含了Python UDSF、Python UDAF和Python UDTF的實現。本文以Windows作業系統為例,為您介紹如何進行UDSF開發。
下載並解壓python_demo-master樣本到本地。
說明python_demo-master屬於第三方搭建的網站,訪問時可能會存在無法開啟或訪問延遲的問題。
在PyCharm中,單擊python_demo-master。
,開啟剛才解壓縮完成的雙擊開啟\python_demo-master\udx\udfs.py後,根據您的業務,修改udfs.py。
該樣本中,sub_string定義了擷取每條資料中從begin~end位的字元的代碼。
from pyflink.table import DataTypes from pyflink.table.udf import udf @udf(result_type=DataTypes.STRING()) def sub_string(s: str, begin: int, end: int): return s[begin:end]
在下載檔案中udx所在的目錄(即\python_demo-master目錄)下執行如下命令打包檔案。
zip -r python_demo.zip udx
\python_demo-master\目錄下會出現python_demo.zip的ZIP包,即代表完成了Python UDSF的開發工作。
UDSF註冊
UDSF註冊過程,請參見管理自訂函數(UDF)。
UDSF使用
在完成註冊UDSF後,您就可以使用UDSF,詳細的操作步驟如下。
Flink SQL作業開發。詳情請參見SQL作業開發。
擷取ASI_UDSF_Source表中a欄位中每行字串中第2~4位的字元,程式碼範例如下。
CREATE TEMPORARY TABLE ASI_UDSF_Source ( a VARCHAR, b INT, c INT ) WITH ( 'connector' = 'datagen' ); CREATE TEMPORARY TABLE ASI_UDSF_Sink ( a VARCHAR ) WITH ( 'connector' = 'blackhole' ); INSERT INTO ASI_UDSF_Sink SELECT ASI_UDSF(a,2,4) FROM ASI_UDSF_Source;
在
頁面,單擊目標作業名稱操作列的啟動。啟動成功後,ASI_UDSF_Sink表每行會被插入ASI_UDSF_Source表中a欄位每行字串的第2~4位字元。