全部產品
Search
文件中心

MaxCompute:Python 3 UDTF

更新時間:Aug 02, 2024

Python官方即將停止維護Python 2,MaxCompute已支援Python 3,對應版本為CPython-3.7.3。本文為您介紹如何通過Python 3語言編寫UDTF。

UDTF代碼結構

您可以通過MaxCompute Studio工具使用Python 3語言編寫UDTF代碼,代碼中需要包含如下資訊:

  • 匯入模組:必選。

    至少要包含from odps.udf import annotatefrom odps.udf import BaseUDTFfrom odps.udf import annotate用於匯入函數簽名模組,MaxCompute才可以識別後續代碼中定義的函數簽名。from odps.udf import BaseUDTF為Python UDTF的基類,您需要通過此類在衍生類別中實現processclose等方法。

    當UDTF代碼中需要引用檔案資源或表資源時,需要包含from odps.distcache import get_cache_file(檔案資源)或from odps.distcache import get_cache_table(表資源)。

  • 函數簽名:可選。

    格式為@annotate(<signature>)signature用於定義函數的輸入參數和傳回值的資料類型。如果不指定函數簽名,在SQL中調用UDTF時,可以匹配任意類型的輸入參數,但傳回值類型無法推導,所有輸出參數都將會是STRING類型。更多函數簽名資訊,請參見函數簽名及資料類型

  • 自訂Python類(衍生類別):必選。

    UDTF代碼的組織單位,定義了實現業務需求的變數及方法。您還可以在代碼中引用MaxCompute內建的第三方庫或引用檔案、表資源。更多資訊,請參見第三方庫引用資源

  • 實現Python類的方法:必選。

    Python類實現包含如下4個方法,您可以根據實際需要進行選擇。

    方法定義描述
    BaseUDTF.init()初始化方法。衍生類別如果需要實現此方法,必須在一開始調用基類的初始化方法super(BaseUDTF, self).init()init方法在整個UDTF生命週期中只會被調用一次,即在處理第一條記錄之前。當UDTF需要儲存內部狀態時,可以通過此方法初始化所有狀態。
    BaseUDTF.process([args, ...])SQL中每一條記錄都會對應調用一次processprocess的參數為SQL語句中指定的UDTF輸入參數。
    BaseUDTF.forward([args, ...])UDTF的輸出方法。此方法由使用者代碼調用。每調用一次forward,就會輸出一條記錄。forward的參數為SQL語句中指定的UDTF的輸出參數。

    如果Python代碼中未指定函數簽名,在調用forward方法時,必須將所有輸出值轉換為STRING類型。

    BaseUDTF.close()UDTF的結束方法。只會被調用一次,即在處理完最後一條記錄之後被調用。

UDTF程式碼範例如下。

#匯入函數簽名模組及基類。
from odps.udf import annotate
from odps.udf import BaseUDTF
#函數簽名。
@annotate('string -> string')
#自訂Python類。
class Explode(BaseUDTF):
#實現Python類的方法。
   def process(self, arg):
       props = arg.split(',')
       for p in props:
           self.forward(p)
說明

Python 2 UDTF與Python 3 UDTF區別在於底層Python語言版本不一致,請您根據對應版本語言支援的能力編寫UDTF。

注意事項

Python 3與Python 2不相容。在您使用Python 3之前,需要考慮相容性問題,在一個SQL中不允許同時使用Python 3和Python 2。

說明

Python 2官方已於2020年初停止維護,建議您根據項目類型執行遷移操作:全新專案:新MaxCompute停止維護Python 2,

Python 2 UDTF遷移

Python 2官方即將停止維護,建議您根據項目類型執行遷移操作:

  • 全新專案:新MaxCompute專案,或第一次使用Python語言編寫UDTF的MaxCompute專案。建議所有的Python UDTF都直接使用Python 3語言編寫。

  • 存量專案:建立了大量Python 2 UDTF的MaxCompute專案。請您謹慎開啟Python 3。如果您計劃逐步將所有Python 2 UDTF遷移為Python 3 UDTF,推薦方法如下:

    • 新作業和新UDTF:使用Python 3語言編寫,在Session層級開啟Python 3。開啟Python 3方法,請參見開啟Python 3

    • Python 2 UDTF:改寫Python 2 UDTF,使其可以同時相容Python 2和Python 3。改寫方法請參見將Python 2代碼移植到Python 3

      說明

      如果您需要編寫公用UDTF,並為多個MaxCompute專案授權UDTF的操作許可權,建議UDTF同時相容Python 2和Python 3。

開啟Python 3

MaxCompute預設使用Python 2,如果您要使用Python 3,可以在Session層級設定如下屬性開啟Python 3,並與SQL語句一起提交執行。

set odps.sql.python.version=cp37;

第三方庫

MaxCompute內建的Python 3運行環境中未安裝第三方庫Numpy。如果您需要使用Numpy的UDTF,請手動上傳Numpy的WHEEL包。從PyPI或鏡像下載Numpy包時,包的檔案名稱為numpy-<版本號碼>-cp37-cp37m-manylinux1_x86_64.whl。上傳包的操作請參見資源操作UDF樣本:Python UDF使用第三方包

函數簽名及資料類型

函數簽名格式如下。

@annotate(<signature>)

signature為函數簽名字串,用於標識輸入參數和傳回值的資料類型。執行UDTF時,UDTF函數的輸入參數和傳回值類型要與函數簽名指定的類型一致。查詢語義解析階段會檢查不符合函數簽名定義的用法,檢查到類型不符時會報錯。具體格式如下。

'arg_type_list -> type_list'

其中:

  • type_list:表示傳回值的資料類型。UDTF可以返回多列。支援的資料類型為:BIGINT、STRING、DOUBLE、BOOLEAN、DATETIME、DECIMAL、FLOAT、BINARY、DATE、DECIMAL(precision,scale)、複雜資料類型(ARRAY、MAP、STRUCT)或複雜資料類型嵌套。

  • arg_type_list:表示輸入參數的資料類型。輸入參數可以為多個,用英文逗號(,)分隔。支援的資料類型為BIGINT、STRING、DOUBLE、BOOLEAN、DATETIME、DECIMAL、FLOAT、BINARY、DATE、DECIMAL(precision,scale)、CHAR、VARCHAR、複雜資料類型(ARRAY、MAP、STRUCT)或複雜資料類型嵌套。

    arg_type_list還支援星號(*)或為空白(''):

    • arg_type_list為星號(*)時,表示輸入參數為任意個數。

    • arg_type_list為空白('')時,表示無輸入參數。

    更多Resolve註解文法擴充詳情,請參見UDAF和UDTF動態參數說明

說明

在編寫UDTF代碼過程中,您可以根據MaxCompute專案的資料類型版本選取合適的資料類型,更多資料類型版本及各版本支援的資料類型資訊,請參見資料類型版本說明

合法的函數簽名樣本如下。

函數簽名樣本

說明

@annotate('bigint,boolean->string,datetime')

輸入參數類型為BIGINT、BOOLEAN,傳回值類型為STRING、DATETIME。

@annotate('*->string, datetime')

輸入任意個參數,傳回值類型為STRING、DATETIME。

@annotate('->double, bigint, string')

無輸入參數,傳回值類型為DOUBLE、BIGINT、STRING。

@annotate("array<string>,struct<a1:bigint,b1:string>,string->map<string,bigint>,struct<b1:bigint>")

輸入參數類型為ARRAY、STRUCT、MAP,傳回值類型為MAP、STRUCT。

為確保編寫Python UDTF過程中使用的資料類型與MaxCompute支援的資料類型保持一致,您需要關注二者間的資料類型映射關係。具體映射關係如下。

MaxCompute SQL Type

Python 3 Type

BIGINT

INT

STRING

UNICODE

DOUBLE

FLOAT

BOOLEAN

BOOL

DATETIME

DATETIME.DATETIME

FLOAT

FLOAT

CHAR

UNICODE

VARCHAR

UNICODE

BINARY

BYTES

DATE

DATETIME.DATE

DECIMAL

DECIMAL.DECIMAL

ARRAY

LIST

MAP

DICT

STRUCT

COLLECTIONS.NAMEDTUPLE

引用資源

Python UDTF可以通過odps.distcache模組引用資源,支援引用檔案資源和表資源。

  • odps.distcache.get_cache_file(resource_name):返回指定檔案資源的內容。
    • resource_name為STRING類型,對應當前MaxCompute專案中已存在的檔案資源名。如果檔案資源名非法或者沒有相應的檔案資源,會返回異常。
      說明 使用UDTF訪問資源,在建立UDTF時需要聲明引用的資源,否則會報錯。
    • 傳回值為File-like對象。在使用完此對象後,您需要調用close方法釋放開啟的資源檔。
  • odps.distcache.get_cache_table(resource_name):返回指定表資源的內容。
    • resource_name支援STRING類型,對應當前MaxCompute專案中已存在的表資源名。如果表資源名非法或者沒有相應的表資源,會返回異常。
    • 傳回值為GENERATOR類型,調用者以遍曆方式擷取表的內容,每次遍曆可得到以數組形式存在的表中的一條記錄。

引用檔案資源和表資源的程式碼範例如下。

from odps.udf import annotate
from odps.udf import BaseUDTF
from odps.distcache import get_cache_file
from odps.distcache import get_cache_table
@annotate('string -> string, bigint')
class UDTFExample(BaseUDTF):
    """讀取資源檔和資源表裡的pageid、adid_list,產生dict
    """
    def __init__(self):
        import json
        cache_file = get_cache_file('test_json.txt')
        self.my_dict = json.load(cache_file)
        cache_file.close()
        records = list(get_cache_table('table_resource1'))
        for record in records:
            self.my_dict[record[0]] = record[1]
    """輸入pageid,輸出pageid以及它對應的所有adid
    """
    def process(self, pageid):
        for adid in self.my_dict[pageid]:
            self.forward(pageid, adid)

使用說明

按照開發流程,完成Python 3 UDTF開發後,您即可通過MaxCompute SQL調用Python 3 UDTF。調用方法如下:

  • 在歸屬MaxCompute專案中使用自訂函數:使用方法與內建函數類似,您可以參照內建函數的使用方法使用自訂函數。

  • 跨專案使用自訂函數:即在專案A中使用專案B的自訂函數,跨專案分享語句樣本:select B:udf_in_other_project(arg0, arg1) as res from table_t;。更多跨專案分享資訊,請參見基於Package跨專案訪問資源

使用MaxCompute Studio完整開發及調用Python 3 UDTF的操作,請參見開發Python UDF