全部產品
Search
文件中心

Realtime Compute for Apache Flink:自訂表格值函數(UDTF)

更新時間:Sep 14, 2024

本文為您介紹Python自訂表格值函數(UDTF)的開發、註冊和使用流程。

定義

自訂表格值函數(UDTF),將0個、1個或多個標量值作為輸入參數(可以是變長參數)。資料表值函式可以返回任意數量的行作為輸出,而不僅是1個值。返回的行可以由1個或多個列組成。調用一次函數輸出多行或多列資料。與自訂的純量涵式類似,但與純量涵式不同。

使用限制

由於Flink全託管產品受部署環境和網路環境等因素的影響,所以開發Python自訂函數時,需要注意以下限制:
  • 僅支援開源Flink V1.12及以上版本。
  • Flink全託管叢集上已預裝了Python 3.7.9,因此需要您在Python 3.7.9版本開發代碼。
  • Flink全託管運行環境使用的是JDK 1.8,如果Python作業中依賴第三方JAR包,請確保JAR包相容JDK 1.8。
  • 僅支援開源Scala V2.11版本,如果Python作業中依賴第三方JAR包,請確保使用Scala V2.11對應的JAR包依賴。

UDTF開發

說明

Flink為您提供了Python UDX樣本,便於您快速開發UDX。Flink Python UDX樣本中包含了Python UDF、Python UDAF和Python UDTF的實現。本文以Windows作業系統為例,為您介紹如何進行UDTF開發。

  1. 下載並解壓python_demo-master樣本到本地。
  2. 在PyCharm中,單擊file > open,開啟剛才解壓縮完成的python_demo-master
  3. 雙擊開啟\python_demo-master\udx\udtfs.py後,根據您的業務,修改udtfs.py檔案內容。

    該樣本中,split定義了將一行字串按照豎線(|)分割成多列字串的代碼。

    from pyflink.table import DataTypes
    from pyflink.table.udf import udtf
    
    @udtf(result_types=[DataTypes.STRING(), DataTypes.STRING()])
    def split(s: str):
        splits = s.split("|")
        yield splits[0], splits[1]
  4. 在下載檔案中udx所在的目錄(即\python_demo目錄)下執行如下命令打包檔案。

    zip -r python_demo.zip udx

    \python_demo\目錄下會出現python_demo.zip的ZIP包,即代表完成了UDTF開發工作。

UDTF註冊

UDTF註冊過程,請參見管理自訂函數(UDF)

UDTF使用

在完成UDTF註冊後,您就可以使用UDTF,詳細的操作步驟如下。

  1. Flink SQL作業開發。詳情請參見SQL作業開發

    ASI_UDTF_Source表中每行字串的message欄位與字串aa按照豎線(|)串連之後,按照豎線(|)分割成多列,程式碼範例如下。

    CREATE TEMPORARY TABLE ASI_UDTF_Source (
      `message`  VARCHAR
    ) WITH (
      'connector'='datagen'
    );
    
    CREATE TEMPORARY TABLE ASI_UDTF_Sink (
      name  VARCHAR,
      place  VARCHAR
    ) WITH (
      'connector' = 'blackhole'
    );
    
    INSERT INTO ASI_UDTF_Sink
    SELECT name,place
    FROM ASI_UDTF_Source,lateral table(split(concat_ws('|', `message`, 'aa'))) as T(name,place);
  2. 營運中心 > 作業營運頁面,單擊目標作業名稱操作列的啟動

    啟動成功後,ASI_UDTF_Sink表會被插入兩列資料,這兩列資料是ASI_UDTF_Source表中message和aa欄位按照豎線(|)串連後,再按照豎線(|)分割成多列的字元。