全部產品
Search
文件中心

Realtime Compute for Apache Flink:概述

更新時間:Jul 13, 2024

Realtime ComputeFlink版支援在Flink SQL作業中使用Python自訂函數,本文為您介紹Flink Python自訂函數的分類、Python依賴使用方法和能調優方式。

自訂函數分類

分類

描述

UDSF(User Defined Scalar Function)

使用者自訂純量值函式,將0個、1個或多個標量值對應到一個新的標量值。其輸入與輸出是一對一的關係,即讀入一行資料,寫出一條輸出值。詳情請參見自訂純量涵式(UDSF)

UDAF(User Defined Aggregation Function)

自訂彙總函式,將多條記錄彙總成1條記錄。其輸入與輸出是多對一的關係,即將多條輸入記錄彙總成一條輸出值。詳情請參見自訂彙總函式(UDAF)

UDTF(User Defined Table-valued Function)

自訂表格值函數,將0個、1個或多個標量值作為輸入參數(可以是變長參數)。與自訂的純量涵式類似,但與純量涵式不同。資料表值函式可以返回任意數量的行作為輸出,而不僅是1個值。返回的行可以由1個或多個列組成。調用一次函數輸出多行或多列資料。詳情請參見自訂表格值函數(UDTF)

使用Python依賴

Realtime ComputeFlink版叢集已預裝了Pandas、NumPy和PyArrow等常用的Python包,您可以在Python作業開發頁面,瞭解Realtime ComputeFlink版中已安裝的第三方Python包列表。預裝的Python包使用時需要在Python函數內部匯入。樣本如下。

@udf(result_type=DataTypes.FLOAT())
def percentile(values: List[float], percentile: float):
    import numpy as np
    return np.percentile(values, percentile)

此外,您也可以在Python自訂函數中使用其他類型的第三方Python包。需要注意的是,如果使用了非預裝的第三方Python包,在註冊Python UDF時,需要將其作為依賴檔案上傳,詳情請參見管理自訂函數(UDF)使用Python依賴

代碼調試

您可以在Python自訂函數的代碼實現中,通過Logging的方式,輸出日誌資訊,方便問題定位,樣本如下。

@udf(result_type=DataTypes.BIGINT())
def add(i, j):    
  logging.info("hello world")    
  return i + j

日誌輸出後,您可以在TaskManager的記錄檔中查看日誌,詳情請參見查看作業記錄

效能調優

積極式載入資源

積極式載入資源可以在UDF初始化時提前載入資源,無需在每一次執行計算(即eval)時重新載入資源。例如,您可能只想載入一次大型深度學習模型,然後對模型多次運行批量預測。程式碼範例如下。

from pyflink.table import DataTypes
from pyflink.table.udf import ScalarFunction, udf

class Predict(ScalarFunction):
    def open(self, function_context):
        import pickle

        with open("resources.zip/resources/model.pkl", "rb") as f:
            self.model = pickle.load(f)

    def eval(self, x):
        return self.model.predict(x)

predict = udf(Predict(), result_type=DataTypes.DOUBLE(), func_type="pandas")
說明

關於如何上傳Python資料檔案,可以參考文檔使用Python依賴

使用Pandas庫

除了普通Python自訂函數之外,Realtime ComputeFlink版也支援您使用Pandas自訂函數。對於Pandas自訂函數,輸入資料的類型是Pandas中定義的資料結構,例如pandas.Series和pandas.DataFrame等,您可以在Pandas自訂函數中使用Pandas和Numpy等高效能的Python庫,開發出高效能的Python自訂函數,詳情請參見Vectorized User-defined Functions

配置參數

Python自訂函數的效能在很大程度取決於Python自訂函數自身的實現,如果遇到效能問題,您需要儘可能最佳化Python自訂函數的實現。除此之外,Python自訂函數的效能也受以下參數取值的影響。

參數

說明

python.fn-execution.bundle.size

Python UDF的計算是非同步,在執行過程中,Java運算元將資料非同步發送給Python進程進行處理。Java運算元在將資料發送給Python進程之前,會先將資料緩衝起來,到達一定閾值之後,再發送給Python進程。python.fn-execution.bundle.size參數可用來控制可快取的資料最大條數。

預設值為100000,單位是條數。

python.fn-execution.bundle.time

用來控制資料的最大緩衝時間。當緩衝的資料條數到達python.fn-execution.bundle.size定義的閾值或緩衝時間到達python.fn-execution.bundle.time定義的閾值時,會觸發快取資料的計算。

預設值為1000,單位是毫秒。

python.fn-execution.arrow.batch.size

使用Pandas UDF時,一個arrow batch可容納的資料最大條數,預設值為10000。

說明

python.fn-execution.arrow.batch.size參數值不能大於python.fn-execution.bundle.size參數值。

說明

以上3個參數並不是配置的越大越好,當這些參數取值配置過大時,可能會導致Checkpoint時,需要處理過多的資料,從而導致Checkpoint時間過長,甚至會導致Checkpoint失敗。以上參數的更多詳情,請參見Configuration

相關文檔