Python官方即將停止維護Python 2,MaxCompute已支援Python 3,對應版本為CPython-3.7.3。本文為您介紹如何通過Python 3語言編寫UDTF。
UDTF代碼結構
您可以通過MaxCompute Studio工具使用Python 3語言編寫UDTF代碼,代碼中需要包含如下資訊:
- 匯入模組:必選。
至少要包含
from odps.udf import annotate
和from odps.udf import BaseUDTF
。from odps.udf import annotate
用於匯入函數簽名模組,MaxCompute才可以識別後續代碼中定義的函數簽名。from odps.udf import BaseUDTF
為Python UDTF的基類,您需要通過此類在衍生類別中實現process
或close
等方法。當UDTF代碼中需要引用檔案資源或表資源時,需要包含
from odps.distcache import get_cache_file
(檔案資源)或from odps.distcache import get_cache_table
(表資源)。 函數簽名:可選。
格式為
@annotate(<signature>)
,signature
用於定義函數的輸入參數和傳回值的資料類型。如果不指定函數簽名,在SQL中調用UDTF時,可以匹配任意類型的輸入參數,但傳回值類型無法推導,所有輸出參數都將會是STRING類型。更多函數簽名資訊,請參見函數簽名及資料類型。自訂Python類(衍生類別):必選。
UDTF代碼的組織單位,定義了實現業務需求的變數及方法。您還可以在代碼中引用MaxCompute內建的第三方庫或引用檔案、表資源。更多資訊,請參見第三方庫或引用資源。
- 實現Python類的方法:必選。
Python類實現包含如下4個方法,您可以根據實際需要進行選擇。
方法定義 描述 BaseUDTF.init()
初始化方法。衍生類別如果需要實現此方法,必須在一開始調用基類的初始化方法 super(BaseUDTF, self).init()
。init
方法在整個UDTF生命週期中只會被調用一次,即在處理第一條記錄之前。當UDTF需要儲存內部狀態時,可以通過此方法初始化所有狀態。BaseUDTF.process([args, ...])
SQL中每一條記錄都會對應調用一次 process
,process
的參數為SQL語句中指定的UDTF輸入參數。BaseUDTF.forward([args, ...])
UDTF的輸出方法。此方法由使用者代碼調用。每調用一次 forward
,就會輸出一條記錄。forward
的參數為SQL語句中指定的UDTF的輸出參數。如果Python代碼中未指定函數簽名,在調用
forward
方法時,必須將所有輸出值轉換為STRING類型。BaseUDTF.close()
UDTF的結束方法。只會被調用一次,即在處理完最後一條記錄之後被調用。
UDTF程式碼範例如下。
#匯入函數簽名模組及基類。
from odps.udf import annotate
from odps.udf import BaseUDTF
#函數簽名。
@annotate('string -> string')
#自訂Python類。
class Explode(BaseUDTF):
#實現Python類的方法。
def process(self, arg):
props = arg.split(',')
for p in props:
self.forward(p)
Python 2 UDTF與Python 3 UDTF區別在於底層Python語言版本不一致,請您根據對應版本語言支援的能力編寫UDTF。
注意事項
Python 3與Python 2不相容。在您使用Python 3之前,需要考慮相容性問題,在一個SQL中不允許同時使用Python 3和Python 2。
Python 2官方已於2020年初停止維護,建議您根據項目類型執行遷移操作:全新專案:新MaxCompute停止維護Python 2,
Python 2 UDTF遷移
Python 2官方即將停止維護,建議您根據項目類型執行遷移操作:
全新專案:新MaxCompute專案,或第一次使用Python語言編寫UDTF的MaxCompute專案。建議所有的Python UDTF都直接使用Python 3語言編寫。
存量專案:建立了大量Python 2 UDTF的MaxCompute專案。請您謹慎開啟Python 3。如果您計劃逐步將所有Python 2 UDTF遷移為Python 3 UDTF,推薦方法如下:
新作業和新UDTF:使用Python 3語言編寫,在Session層級開啟Python 3。開啟Python 3方法,請參見開啟Python 3。
Python 2 UDTF:改寫Python 2 UDTF,使其可以同時相容Python 2和Python 3。改寫方法請參見將Python 2代碼移植到Python 3。
說明如果您需要編寫公用UDTF,並為多個MaxCompute專案授權UDTF的操作許可權,建議UDTF同時相容Python 2和Python 3。
開啟Python 3
MaxCompute預設使用Python 2,如果您要使用Python 3,可以在Session層級設定如下屬性開啟Python 3,並與SQL語句一起提交執行。
set odps.sql.python.version=cp37;
第三方庫
MaxCompute內建的Python 3運行環境中未安裝第三方庫Numpy。如果您需要使用Numpy的UDTF,請手動上傳Numpy的WHEEL包。從PyPI或鏡像下載Numpy包時,包的檔案名稱為numpy-<版本號碼>-cp37-cp37m-manylinux1_x86_64.whl。上傳包的操作請參見資源操作或UDF樣本:Python UDF使用第三方包。
函數簽名及資料類型
函數簽名格式如下。
@annotate(<signature>)
signature
為函數簽名字串,用於標識輸入參數和傳回值的資料類型。執行UDTF時,UDTF函數的輸入參數和傳回值類型要與函數簽名指定的類型一致。查詢語義解析階段會檢查不符合函數簽名定義的用法,檢查到類型不符時會報錯。具體格式如下。
'arg_type_list -> type_list'
其中:
type_list
:表示傳回值的資料類型。UDTF可以返回多列。支援的資料類型為:BIGINT、STRING、DOUBLE、BOOLEAN、DATETIME、DECIMAL、FLOAT、BINARY、DATE、DECIMAL(precision,scale)、複雜資料類型(ARRAY、MAP、STRUCT)或複雜資料類型嵌套。arg_type_list
:表示輸入參數的資料類型。輸入參數可以為多個,用英文逗號(,)分隔。支援的資料類型為BIGINT、STRING、DOUBLE、BOOLEAN、DATETIME、DECIMAL、FLOAT、BINARY、DATE、DECIMAL(precision,scale)、CHAR、VARCHAR、複雜資料類型(ARRAY、MAP、STRUCT)或複雜資料類型嵌套。arg_type_list
還支援星號(*)或為空白(''):當
arg_type_list
為星號(*)時,表示輸入參數為任意個數。當
arg_type_list
為空白('')時,表示無輸入參數。
更多Resolve註解文法擴充詳情,請參見UDAF和UDTF動態參數說明。
在編寫UDTF代碼過程中,您可以根據MaxCompute專案的資料類型版本選取合適的資料類型,更多資料類型版本及各版本支援的資料類型資訊,請參見資料類型版本說明。
合法的函數簽名樣本如下。
函數簽名樣本 | 說明 |
| 輸入參數類型為BIGINT、BOOLEAN,傳回值類型為STRING、DATETIME。 |
| 輸入任意個參數,傳回值類型為STRING、DATETIME。 |
| 無輸入參數,傳回值類型為DOUBLE、BIGINT、STRING。 |
| 輸入參數類型為ARRAY、STRUCT、MAP,傳回值類型為MAP、STRUCT。 |
為確保編寫Python UDTF過程中使用的資料類型與MaxCompute支援的資料類型保持一致,您需要關注二者間的資料類型映射關係。具體映射關係如下。
MaxCompute SQL Type | Python 3 Type |
BIGINT | INT |
STRING | UNICODE |
DOUBLE | FLOAT |
BOOLEAN | BOOL |
DATETIME | DATETIME.DATETIME |
FLOAT | FLOAT |
CHAR | UNICODE |
VARCHAR | UNICODE |
BINARY | BYTES |
DATE | DATETIME.DATE |
DECIMAL | DECIMAL.DECIMAL |
ARRAY | LIST |
MAP | DICT |
STRUCT | COLLECTIONS.NAMEDTUPLE |
引用資源
Python UDTF可以通過odps.distcache
模組引用資源,支援引用檔案資源和表資源。
odps.distcache.get_cache_file(resource_name)
:返回指定檔案資源的內容。resource_name
為STRING類型,對應當前MaxCompute專案中已存在的檔案資源名。如果檔案資源名非法或者沒有相應的檔案資源,會返回異常。說明 使用UDTF訪問資源,在建立UDTF時需要聲明引用的資源,否則會報錯。- 傳回值為File-like對象。在使用完此對象後,您需要調用
close
方法釋放開啟的資源檔。
odps.distcache.get_cache_table(resource_name)
:返回指定表資源的內容。resource_name
支援STRING類型,對應當前MaxCompute專案中已存在的表資源名。如果表資源名非法或者沒有相應的表資源,會返回異常。- 傳回值為GENERATOR類型,調用者以遍曆方式擷取表的內容,每次遍曆可得到以數組形式存在的表中的一條記錄。
引用檔案資源和表資源的程式碼範例如下。
from odps.udf import annotate
from odps.udf import BaseUDTF
from odps.distcache import get_cache_file
from odps.distcache import get_cache_table
@annotate('string -> string, bigint')
class UDTFExample(BaseUDTF):
"""讀取資源檔和資源表裡的pageid、adid_list,產生dict
"""
def __init__(self):
import json
cache_file = get_cache_file('test_json.txt')
self.my_dict = json.load(cache_file)
cache_file.close()
records = list(get_cache_table('table_resource1'))
for record in records:
self.my_dict[record[0]] = record[1]
"""輸入pageid,輸出pageid以及它對應的所有adid
"""
def process(self, pageid):
for adid in self.my_dict[pageid]:
self.forward(pageid, adid)
使用說明
按照開發流程,完成Python 3 UDTF開發後,您即可通過MaxCompute SQL調用Python 3 UDTF。調用方法如下:
在歸屬MaxCompute專案中使用自訂函數:使用方法與內建函數類似,您可以參照內建函數的使用方法使用自訂函數。
跨專案使用自訂函數:即在專案A中使用專案B的自訂函數,跨專案分享語句樣本:
select B:udf_in_other_project(arg0, arg1) as res from table_t;
。更多跨專案分享資訊,請參見基於Package跨專案訪問資源。
使用MaxCompute Studio完整開發及調用Python 3 UDTF的操作,請參見開發Python UDF。