本文為您介紹Python自訂彙總函式(UDAF)開發、註冊和使用流程。
定義
自訂彙總函式(UDAF),將多條記錄彙總成1條記錄。其輸入與輸出是多對一的關係,即將多條輸入記錄彙總成一條輸出值。
使用限制
由於Flink全託管產品受部署環境和網路環境等因素的影響,所以開發Python自訂函數時,需要注意以下限制:
- 僅支援開源Flink V1.12及以上版本。
- Flink全託管叢集上已預裝了Python 3.7.9,因此需要您在Python 3.7.9版本開發代碼。
- Flink全託管運行環境使用的是JDK 1.8,如果Python作業中依賴第三方JAR包,請確保JAR包相容JDK 1.8。
- 僅支援開源Scala V2.11版本,如果Python作業中依賴第三方JAR包,請確保使用Scala V2.11對應的JAR包依賴。
UDAF開發
說明
Flink為您提供了Python UDX樣本,便於您快速開發UDX。Flink Python UDX樣本中包含了Python UDF、Python UDAF和Python UDTF的實現。本文以Windows作業系統為例,為您介紹如何進行UDAF開發。
- 下載並解壓python_demo-master樣本到本地。
- 在PyCharm中,單擊python_demo-master。 ,開啟剛才解壓縮完成的
雙擊開啟\python_demo-master\udx\udafs.py後,根據您的業務,配置udafs.py。
該樣本中,weighted_avg定義了當前資料和歷史資料求含權重的均值的代碼。
from pyflink.common import Row from pyflink.table import AggregateFunction, DataTypes from pyflink.table.udf import udaf class WeightedAvg(AggregateFunction): def create_accumulator(self): # Row(sum, count) return Row(0, 0) def get_value(self, accumulator: Row) -> float: if accumulator[1] == 0: return 0 else: return accumulator[0] / accumulator[1] def accumulate(self, accumulator: Row, value, weight): accumulator[0] += value * weight accumulator[1] += weight def retract(self, accumulator: Row, value, weight): accumulator[0] -= value * weight accumulator[1] -= weight weighted_avg = udaf(f=WeightedAvg(), result_type=DataTypes.DOUBLE(), accumulator_type=DataTypes.ROW([ DataTypes.FIELD("f0", DataTypes.BIGINT()), DataTypes.FIELD("f1", DataTypes.BIGINT())]))
在下載檔案中udx所在的目錄(即\python_demo-master目錄)下執行如下命令打包檔案。
zip -r python_demo.zip udx
\python_demo-master\目錄下會出現python_demo.zip的ZIP包,即代表完成了UDAF開發工作。
UDAF註冊
UDAF註冊過程,請參見管理自訂函數(UDF)。
UDAF使用
在完成註冊UDAF後,您就可以使用UDAF,詳細的操作步驟如下。
Flink SQL作業開發。詳情請參見SQL作業開發。
擷取ASI_UDAF_Source表中a欄位以b欄位為權重的值,程式碼範例如下。
CREATE TEMPORARY TABLE ASI_UDAF_Source ( a BIGINT, b BIGINT ) WITH ( 'connector' = 'datagen' ); CREATE TEMPORARY TABLE ASI_UDAF_Sink ( avg_value DOUBLE ) WITH ( 'connector' = 'blackhole' ); INSERT INTO ASI_UDAF_Sink SELECT weighted_avg(a, b) FROM ASI_UDAF_Source;
在
頁面,單擊目標作業名稱操作列的啟動。啟動成功後,ASI_UDAF_Sink表每行會被插入ASI_UDAF_Source表中以b欄位為權重的a欄位當前資料和歷史資料的均值。