Realtime ComputeFlink版支援在Flink SQL作業中使用Python自訂函數,本文為您介紹Flink Python自訂函數的分類、Python依賴使用方法和調優方式。
自訂函數分類
分類 | 描述 |
UDSF(User Defined Scalar Function) | 使用者自訂純量值函式,將0個、1個或多個標量值對應到一個新的標量值。其輸入與輸出是一對一的關係,即讀入一行資料,寫出一條輸出值。詳情請參見自訂純量涵式(UDSF)。 |
UDAF(User Defined Aggregation Function) | 自訂彙總函式,將多條記錄彙總成1條記錄。其輸入與輸出是多對一的關係,即將多條輸入記錄彙總成一條輸出值。詳情請參見自訂彙總函式(UDAF)。 |
UDTF(User Defined Table-valued Function) | 自訂表格值函數,將0個、1個或多個標量值作為輸入參數(可以是變長參數)。與自訂的純量涵式類似,但與純量涵式不同。資料表值函式可以返回任意數量的行作為輸出,而不僅是1個值。返回的行可以由1個或多個列組成。調用一次函數輸出多行或多列資料。詳情請參見自訂表格值函數(UDTF)。 |
使用Python依賴
Realtime ComputeFlink版叢集已預裝了Pandas、NumPy和PyArrow等常用的Python包,您可以在Python作業開發頁面,瞭解Realtime ComputeFlink版中已安裝的第三方Python包列表。預裝的Python包使用時需要在Python函數內部匯入。樣本如下。
@udf(result_type=DataTypes.FLOAT())
def percentile(values: List[float], percentile: float):
import numpy as np
return np.percentile(values, percentile)
此外,您也可以在Python自訂函數中使用其他類型的第三方Python包。需要注意的是,如果使用了非預裝的第三方Python包,在註冊Python UDF時,需要將其作為依賴檔案上傳,詳情請參見管理自訂函數(UDF)和使用Python依賴。
代碼調試
您可以在Python自訂函數的代碼實現中,通過Logging的方式,輸出日誌資訊,方便問題定位,樣本如下。
@udf(result_type=DataTypes.BIGINT())
def add(i, j):
logging.info("hello world")
return i + j
日誌輸出後,您可以在TaskManager的記錄檔中查看日誌,詳情請參見查看作業記錄。
效能調優
積極式載入資源
積極式載入資源可以在UDF初始化時提前載入資源,無需在每一次執行計算(即eval)時重新載入資源。例如,您可能只想載入一次大型深度學習模型,然後對模型多次運行批量預測。程式碼範例如下。
from pyflink.table import DataTypes
from pyflink.table.udf import ScalarFunction, udf
class Predict(ScalarFunction):
def open(self, function_context):
import pickle
with open("resources.zip/resources/model.pkl", "rb") as f:
self.model = pickle.load(f)
def eval(self, x):
return self.model.predict(x)
predict = udf(Predict(), result_type=DataTypes.DOUBLE(), func_type="pandas")
關於如何上傳Python資料檔案,可以參考文檔使用Python依賴。
使用Pandas庫
除了普通Python自訂函數之外,Realtime ComputeFlink版也支援您使用Pandas自訂函數。對於Pandas自訂函數,輸入資料的類型是Pandas中定義的資料結構,例如pandas.Series和pandas.DataFrame等,您可以在Pandas自訂函數中使用Pandas和NumPy等高效能的Python庫,開發出高效能的Python自訂函數,詳情請參見Vectorized User-defined Functions。
配置參數
Python自訂函數的效能在很大程度取決於Python自訂函數自身的實現,如果遇到效能問題,您需要儘可能最佳化Python自訂函數的實現。除此之外,Python自訂函數的效能也受以下參數取值的影響。
參數 | 說明 |
python.fn-execution.bundle.size | Python UDF的計算是非同步,在執行過程中,Java運算元將資料非同步發送給Python進程進行處理。Java運算元在將資料發送給Python進程之前,會先將資料緩衝起來,到達一定閾值之後,再發送給Python進程。python.fn-execution.bundle.size參數可用來控制可快取的資料最大條數。 預設值為100000,單位是條數。 |
python.fn-execution.bundle.time | 用來控制資料的最大緩衝時間。當緩衝的資料條數到達python.fn-execution.bundle.size定義的閾值或緩衝時間到達python.fn-execution.bundle.time定義的閾值時,會觸發快取資料的計算。 預設值為1000,單位是毫秒。 |
python.fn-execution.arrow.batch.size | 使用Pandas UDF時,一個arrow batch可容納的資料最大條數,預設值為10000。 說明 python.fn-execution.arrow.batch.size參數值不能大於python.fn-execution.bundle.size參數值。 |
以上3個參數並不是配置的越大越好,當這些參數取值配置過大時,可能會導致Checkpoint時,需要處理過多的資料,從而導致Checkpoint時間過長,甚至會導致Checkpoint失敗。以上參數的更多詳情,請參見Configuration。
相關文檔
自訂函數的註冊、更新及刪除方法,請參見管理自訂函數(UDF)。
Python自訂函數的開發和使用demo,請參見自訂彙總函式(UDAF)、自訂純量涵式(UDSF)和自訂表格值函數(UDTF)。
如何在Flink Python作業中使用自訂的Python虛擬環境、第三方Python包、JAR包和資料檔案等,請參見使用Python依賴。
JAVA自訂函數的開發和使用demo,請參見自訂彙總函式(UDAF)、自訂純量涵式(UDSF)和自訂表格值函數(UDTF)。
JAVA自訂函數的調試和調優方法,請參見概述。