MaxCompute當前支援利用Python 3語言來開發自訂函數(UDF),以滿足特定的商務邏輯需求。本文為您介紹如何通過Python 3語言編寫UDF。
UDF代碼結構
您可以通過MaxCompute Studio工具使用Python 3語言編寫UDF代碼,代碼中需要包含如下資訊:
匯入模組:必選。
至少要包含
from odps.udf import annotate
,匯入函數簽名模組,MaxCompute才可以識別後續代碼中定義的函數簽名。當UDF代碼中需要引用檔案資源或表資源時,需要包含from odps.distcache import get_cache_file
(檔案資源)或from odps.distcache import get_cache_table
(表資源)。函數簽名:必選。
格式為
@annotate(<signature>)
,signature
用於定義函數的輸入參數和傳回值的資料類型。更多函數簽名資訊,請參見函數簽名與資料類型。自訂Python類:必選。
UDF代碼的組織單位,定義了實現業務需求的變數及方法。您還可以在代碼中引用MaxCompute內建的第三方庫或引用檔案、表資源。更多資訊,請參見第三方庫或引用資源。
evaluate
方法:必選。位於自訂的Python類中。
evaluate
方法定義了輸入參數和傳回值。一個Python類中只能包含一個evaluate
方法。
UDF程式碼範例如下。
#匯入函數簽名模組。
from odps.udf import annotate
#函數簽名。
@annotate("bigint,bigint->bigint")
#自訂Python類。
class MyPlus(object):
#evaluate方法。
def evaluate(self, arg0, arg1):
if None in (arg0, arg1):
return None
return arg0 + arg1
使用限制
訪問外網
MaxCompute預設不支援通過自訂函數訪問外網。如果您需要通過自訂函數訪問外網,請根據業務情況填寫並提交網路連接申請表單,MaxCompute支援人員團隊會及時聯絡您完成網路開通操作。表單填寫指導,請參見網路開通流程。
訪問VPC網路
MaxCompute預設不支援通過UDF訪問VPC網路。如果您的UDF涉及訪問VPC網路中的資源時,需要先建立MaxCompute與目標VPC網路間的網路連接,才可以直接通過UDF訪問VPC網路中的資源,操作詳情請參見通過UDF訪問VPC網路資源。
讀取表資料
目前版本不支援使用UDF/UDAF/UDTF讀取以下情境的表資料:
做過表結構修改(Schema Evolution)的表資料。
包含複雜資料類型的表資料。
包含JSON資料類型的表資料。
Transactional表的表資料。
注意事項
Python 3與Python 2不相容。在您使用Python 3之前,需要考慮相容性問題,在一個SQL中不允許同時使用Python 3和Python 2。
Python 2官方已於2020年初停止維護,建議您根據項目類型執行遷移操作:全新專案:新MaxCompute停止維護Python 2,
UDF開發:通用流程
開發UDF時通常需進行準備工作、編寫UDF代碼、上傳並註冊UDF、調用調試UDF這幾個步驟。同時MaxCompute支援多種工具,以下以常見的MaxCompute Studio、DataWorks、odpscmd三種工具為例,以一個具體的樣本為您介紹UDF開發的通用流程。
使用MaxCompute Studio
準備工作。
使用MaxCompute Studio開發調試UDF時,您需要先安裝MaxCompute Studio並串連MaxCompute專案,做好UDF開發前準備工作。操作詳情請參見:
編寫UDF代碼。
在Project地區MaxCompute Studio目錄下,按右鍵scripts,選擇 。
在Create new MaxCompute python class對話方塊中輸入類名Name,選擇類型為Python UDF,單擊OK完成。
在編輯框中編寫UDF代碼。
from odps.udf import annotate @annotate("string,bigint->string") class GetUrlChar(object): def evaluate(self, url, n): if n == 0: return "" try: index = url.find(".htm") if index < 0: return "" a = url[:index] index = a.rfind("/") b = a[index + 1:] c = b.split("-") if len(c) < n: return "" return c[-n] except Exception: return "Internal error"
說明如果需要本地調試Java UDF,請參見測試UDF。
上傳並註冊UDF。
按右鍵目標Python程式,選擇Deploy to server…。配置函數名稱後單擊ok。操作詳情請參見上傳及註冊。
本樣本配置函數名稱為UDF_GET_URL_CHAR。
調用UDF。
在左側導覽列單擊Project Explore,在目標MaxCompute專案上單擊右鍵,選擇Open Console並在Console地區輸入調用UDF的SQL語句,按Enter鍵運行即可。SQL命令樣本如下。
set odps.sql.python.version=cp37; -- python3 UDF需要使用該命令開啟python3 select UDF_GET_URL_CHAR("http://www.taobao.com/a.htm", 1);
返回結果如下。
+-----+ | _c0 | +-----+ | a | +-----+
使用DataWorks
準備工作。
使用DataWorks開發調試UDF時,您需要先開通DataWorks並綁定MaxCompute專案,做好UDF開發前準備工作。操作詳情請參見使用DataWorks串連。
編寫UDF代碼。
您可以在任意Python開發工具中開發UDF代碼並打包為一個程式碼封裝。您可以使用以下UDF程式碼範例。
from odps.udf import annotate @annotate("string,bigint->string") class GetUrlChar(object): def evaluate(self, url, n): if n == 0: return "" try: index = url.find(".htm") if index < 0: return "" a = url[:index] index = a.rfind("/") b = a[index + 1:] c = b.split("-") if len(c) < n: return "" return c[-n] except Exception: return "Internal error"
上傳並註冊UDF。
您可以將已打包好的程式碼封裝通過DataWorks上傳並完成UDF註冊,操作詳情請參見:
調用UDF。
註冊完成UDF後,您可以建立一個ODPS SQL節點,在節點中編寫並建立SQL命令來調用調試UDF。建立ODPS SQL節點的操作請參見開發ODPS SQL任務,命令樣本如下。
set odps.sql.python.version=cp37; -- python3 UDF需要使用該命令開啟python3 select UDF_GET_URL_CHAR("http://www.taobao.com/a.htm", 1);
使用odpscmd
準備工作。
使用odpscmd開發調試UDF時,您需要先下載安裝odpscmd工具,並配置config檔案串連MaxCompute專案,做好UDF開發前準備工作。操作詳情請參見使用本地用戶端(odpscmd)串連。
編寫UDF代碼。
您可以在任意Python開發工具中開發UDF代碼並打包為一個程式碼封裝。您可以使用以下UDF程式碼範例。
from odps.udf import annotate @annotate("string,bigint->string") class GetUrlChar(object): def evaluate(self, url, n): if n == 0: return "" try: index = url.find(".htm") if index < 0: return "" a = url[:index] index = a.rfind("/") b = a[index + 1:] c = b.split("-") if len(c) < n: return "" return c[-n] except Exception: return "Internal error"
上傳並註冊UDF。
您可以將已打包好的程式碼封裝通過odpscmd上傳並完成UDF註冊,操作詳情請參見:
調用UDF。
註冊完成UDF後,您可以編寫並建立SQL命令來調用調試UDF。命令樣本如下。
set odps.sql.python.version=cp37; -- python3 UDF需要使用該命令開啟python3 select UDF_GET_URL_CHAR("http://www.taobao.com/a.htm", 1);
UDF開發:安裝第三方庫Numpy
MaxCompute內建的Python 3運行環境中未安裝第三方庫Numpy。如果您需要使用Numpy的UDF,請手動上傳Numpy的WHEEL包。從PyPI或鏡像下載Numpy包時,包的檔案名稱為numpy-<版本號碼>-cp37-cp37m-manylinux1_x86_64.whl。上傳包的操作請參見資源操作或Python UDF使用第三方包。
Python 3支援的標準庫列表請參見Python 3標準庫。
UDF開發:函數簽名與資料類型
函數簽名格式如下。
@annotate(<signature>)
signature
為字串,用於標識輸入參數和傳回值的資料類型。執行UDF時,UDF函數的輸入參數和傳回值類型要與函數簽名指定的類型一致。查詢語義解析階段會檢查不符合函數簽名定義的用法,檢查到類型不符時會報錯。具體格式如下。
'arg_type_list -> type'
其中:
arg_type_list
:表示輸入參數的資料類型。輸入參數可以為多個,用英文逗號(,)分隔。支援的資料類型為BIGINT、STRING、DOUBLE、BOOLEAN、DATETIME、DECIMAL、FLOAT、BINARY、DATE、DECIMAL(precision,scale)、CHAR、VARCHAR、複雜資料類型(ARRAY、MAP、STRUCT)或複雜資料類型嵌套。arg_type_list
還支援星號(*)或為空白(''):當
arg_type_list
為星號(*)時,表示輸入參數為任意個數。當
arg_type_list
為空白('')時,表示無輸入參數。
type
:表示傳回值的資料類型。UDF只返回一列。支援的資料類型為:BIGINT、STRING、DOUBLE、BOOLEAN、DATETIME、DECIMAL、FLOAT、BINARY、DATE、DECIMAL(precision,scale)、複雜資料類型(ARRAY、MAP、STRUCT)或複雜資料類型嵌套。
在編寫UDF代碼過程中,您可以根據MaxCompute專案的資料類型版本選取合適的資料類型,更多資料類型版本及各版本支援的資料類型資訊,請參見資料類型版本說明。
合法的函數簽名樣本如下。
函數簽名樣本 | 說明 |
| 輸入參數類型為BIGINT、DOUBLE,傳回值類型為STRING。 |
| 輸入任意個參數,傳回值類型為STRING。 |
| 無輸入參數,傳回值類型為DOUBLE。 |
| 輸入參數類型為ARRAY<BIGINT>,傳回值類型為STRUCT<x:STRING, y:INT>。 |
| 無輸入參數,傳回值類型為MAP<BIGINT, STRING>。 |
為確保編寫Python UDF過程中使用的資料類型與MaxCompute支援的資料類型保持一致,您需要關注二者間的資料類型映射關係。具體映射關係如下。
MaxCompute SQL Type | Python 3 Type |
BIGINT | INT |
STRING | UNICODE |
DOUBLE | FLOAT |
BOOLEAN | BOOL |
DATETIME | DATETIME.DATETIME |
FLOAT | FLOAT |
CHAR | UNICODE |
VARCHAR | UNICODE |
BINARY | BYTES |
DATE | DATETIME.DATE |
DECIMAL | DECIMAL.DECIMAL |
ARRAY | LIST |
MAP | DICT |
STRUCT | COLLECTIONS.NAMEDTUPLE |
UDF開發:引用資源
Python UDF可以通過odps.distcache
模組引用資源,支援引用檔案資源和表資源。
odps.distcache.get_cache_file(resource_name, mode)
:以指定模式mode
返回指定檔案資源的內容。resource_name
支援STRING類型,對應當前MaxCompute專案中已存在的表資源名。如果表資源名非法或者沒有相應的表資源,會返回異常。mode
支援STRING類型,預設值為't'
。當mode
為't'
時以文字格式設定開啟檔案,當mode
為'b'
時以二進位格式開啟檔案。傳回值為File-like對象。在使用完此對象後,您需要調用
close
方法釋放開啟的資源檔。
引用檔案資源樣本如下。
from odps.udf import annotate from odps.distcache import get_cache_file @annotate('bigint->string') class DistCacheExample(object): def __init__(self): cache_file = get_cache_file('test_distcache.txt') kv = {} for line in cache_file: line = line.strip() if not line: continue k, v = line.split() kv[int(k)] = v cache_file.close() self.kv = kv def evaluate(self, arg): return self.kv.get(arg)
odps.distcache.get_cache_table(resource_name)
:返回指定資源表的內容。resource_name
對應當前MaxCompute專案中已存在的表資源名。如果表資源名非法或者沒有相應的表資源,會返回異常。支援讀取表中BIGINT、STRING、DOUBLE、BOOLEAN、DATETIME、FLOAT、CHAR、VARCHAR、BINARY、DATE、DECIMAL、ARRAY、MAP和STRUCT類型資料。傳回值為Generator類型,調用者通過遍曆擷取表的內容,每次遍曆得到的是以數組形式存在的表中的一條記錄。
參考資料表資源樣本如下。
from odps.udf import annotate
from odps.distcache import get_cache_table
@annotate('->string')
class DistCacheTableExample(object):
def __init__(self):
self.records = list(get_cache_table('udf_test'))
self.counter = 0
self.ln = len(self.records)
def evaluate(self):
if self.counter > self.ln - 1:
return None
ret = self.records[self.counter]
self.counter += 1
return str(ret)
UDF開發完成後:UDF調用說明
按照開發流程,完成Python 3 UDF開發後,您即可通過MaxCompute SQL調用Python 3 UDF。調用方法如下。
開啟Python 3
MaxCompute預設使用Python 2,如果您要使用Python 3,可以在Session層級設定如下屬性開啟Python 3,並與SQL語句一起提交執行。
set odps.sql.python.version=cp37;
調用函數
在歸屬MaxCompute專案中使用自訂函數:使用方法與內建函數類似,您可以參照內建函數的使用方法使用自訂函數。
跨專案使用自訂函數:即在專案A中使用專案B的自訂函數,跨專案分享語句樣本:
select B:udf_in_other_project(arg0, arg1) as res from table_t;
。更多跨專案分享資訊,請參見基於Package跨專案訪問資源。
Python 2 UDF遷移
Python 2官方已於2020年初停止維護,建議您根據項目類型執行遷移操作:
全新專案:新MaxCompute專案,或第一次使用Python語言編寫UDF的MaxCompute專案。建議所有的Python UDF都直接使用Python 3語言編寫。
存量專案:建立了大量Python 2 UDF的MaxCompute專案。請您謹慎開啟Python 3。如果您計劃逐步將所有Python 2 UDF遷移為Python 3 UDF,推薦方法如下:
新作業和新UDF:使用Python 3語言編寫,在Session層級開啟Python 3。開啟Python 3方法,請參見開啟Python 3。
Python 2 UDF:改寫Python 2 UDF,使其可以同時相容Python 2和Python 3。改寫方法請參見將Python 2代碼移植到Python 3。
說明如果您需要編寫公用UDF,並為多個MaxCompute專案授權UDF的操作許可權,建議UDF同時相容Python 2和Python 3。