全部產品
Search
文件中心

MaxCompute:UDF開發(Python3)

更新時間:Jun 19, 2024

MaxCompute當前支援利用Python 3語言來開發自訂函數(UDF),以滿足特定的商務邏輯需求。本文為您介紹如何通過Python 3語言編寫UDF。

UDF代碼結構

您可以通過MaxCompute Studio工具使用Python 3語言編寫UDF代碼,代碼中需要包含如下資訊:

  • 匯入模組:必選。

    至少要包含from odps.udf import annotate,匯入函數簽名模組,MaxCompute才可以識別後續代碼中定義的函數簽名。當UDF代碼中需要引用檔案資源或表資源時,需要包含from odps.distcache import get_cache_file(檔案資源)或from odps.distcache import get_cache_table(表資源)。

  • 函數簽名:必選。

    格式為@annotate(<signature>)signature用於定義函數的輸入參數和傳回值的資料類型。更多函數簽名資訊,請參見函數簽名與資料類型

  • 自訂Python類:必選。

    UDF代碼的組織單位,定義了實現業務需求的變數及方法。您還可以在代碼中引用MaxCompute內建的第三方庫或引用檔案、表資源。更多資訊,請參見第三方庫引用資源

  • evaluate方法:必選。

    位於自訂的Python類中。evaluate方法定義了輸入參數和傳回值。一個Python類中只能包含一個evaluate方法。

UDF程式碼範例如下。

#匯入函數簽名模組。
from odps.udf import annotate
#函數簽名。
@annotate("bigint,bigint->bigint")
#自訂Python類。
class MyPlus(object):
#evaluate方法。
   def evaluate(self, arg0, arg1):
       if None in (arg0, arg1):
           return None
       return arg0 + arg1

使用限制

  • 訪問外網

    MaxCompute預設不支援通過自訂函數訪問外網。如果您需要通過自訂函數訪問外網,請根據業務情況填寫並提交網路連接申請表單,MaxCompute支援人員團隊會及時聯絡您完成網路開通操作。表單填寫指導,請參見網路開通流程

  • 訪問VPC網路

    MaxCompute預設不支援通過UDF訪問VPC網路。如果您的UDF涉及訪問VPC網路中的資源時,需要先建立MaxCompute與目標VPC網路間的網路連接,才可以直接通過UDF訪問VPC網路中的資源,操作詳情請參見通過UDF訪問VPC網路資源

  • 讀取表資料

    目前版本不支援使用UDF/UDAF/UDTF讀取以下情境的表資料:

    • 做過表結構修改(Schema Evolution)的表資料。

    • 包含複雜資料類型的表資料。

    • 包含JSON資料類型的表資料。

    • Transactional表的表資料。

注意事項

Python 3與Python 2不相容。在您使用Python 3之前,需要考慮相容性問題,在一個SQL中不允許同時使用Python 3和Python 2。

說明

Python 2官方已於2020年初停止維護,建議您根據項目類型執行遷移操作:全新專案:新MaxCompute停止維護Python 2,

UDF開發:通用流程

開發UDF時通常需進行準備工作、編寫UDF代碼、上傳並註冊UDF、調用調試UDF這幾個步驟。同時MaxCompute支援多種工具,以下以常見的MaxCompute Studio、DataWorks、odpscmd三種工具為例,以一個具體的樣本為您介紹UDF開發的通用流程。

使用MaxCompute Studio

  1. 準備工作。

    使用MaxCompute Studio開發調試UDF時,您需要先安裝MaxCompute Studio並串連MaxCompute專案,做好UDF開發前準備工作。操作詳情請參見:

    1. 安裝MaxCompute Studio

    2. 建立MaxCompute專案串連

    3. 配置Python開發環境

  2. 編寫UDF代碼。

    1. Project地區MaxCompute Studio目錄下,按右鍵scripts,選擇New > MaxCompute Python

    2. Create new MaxCompute python class對話方塊中輸入類名Name,選擇類型為Python UDF,單擊OK完成。

    3. 在編輯框中編寫UDF代碼。

      from odps.udf import annotate
      
      @annotate("string,bigint->string")
      class GetUrlChar(object):
      
          def evaluate(self, url, n):
              if n == 0:
                  return ""
              try:
                  index = url.find(".htm")
                  if index < 0:
                      return ""
                  a = url[:index]
                  index = a.rfind("/")
                  b = a[index + 1:]
                  c = b.split("-")
                  if len(c) < n:
                      return ""
                  return c[-n]
              except Exception:
                  return "Internal error"
      說明

      如果需要本地調試Java UDF,請參見測試UDF

  3. 上傳並註冊UDF。

    按右鍵目標Python程式,選擇Deploy to server…。配置函數名稱後單擊ok操作詳情請參見上傳及註冊

    本樣本配置函數名稱為UDF_GET_URL_CHAR

  4. 調用UDF。

    在左側導覽列單擊Project Explore,在目標MaxCompute專案上單擊右鍵,選擇Open Console並在Console地區輸入調用UDF的SQL語句,按Enter鍵運行即可。SQL命令樣本如下。

    set odps.sql.python.version=cp37; -- python3 UDF需要使用該命令開啟python3
    select UDF_GET_URL_CHAR("http://www.taobao.com/a.htm", 1);

    返回結果如下。

    +-----+
    | _c0 |
    +-----+
    |  a  |
    +-----+

使用DataWorks

  1. 準備工作。

    使用DataWorks開發調試UDF時,您需要先開通DataWorks並綁定MaxCompute專案,做好UDF開發前準備工作。操作詳情請參見使用DataWorks串連

  2. 編寫UDF代碼。

    您可以在任意Python開發工具中開發UDF代碼並打包為一個程式碼封裝。您可以使用以下UDF程式碼範例。

    from odps.udf import annotate
    
    @annotate("string,bigint->string")
    class GetUrlChar(object):
    
        def evaluate(self, url, n):
            if n == 0:
                return ""
            try:
                index = url.find(".htm")
                if index < 0:
                    return ""
                a = url[:index]
                index = a.rfind("/")
                b = a[index + 1:]
                c = b.split("-")
                if len(c) < n:
                    return ""
                return c[-n]
            except Exception:
                return "Internal error"
  3. 上傳並註冊UDF。

    您可以將已打包好的程式碼封裝通過DataWorks上傳並完成UDF註冊,操作詳情請參見:

    1. 建立並使用MaxCompute資源

    2. 建立並使用自訂函數

  4. 調用UDF。

    註冊完成UDF後,您可以建立一個ODPS SQL節點,在節點中編寫並建立SQL命令來調用調試UDF。建立ODPS SQL節點的操作請參見開發ODPS SQL任務,命令樣本如下。

    set odps.sql.python.version=cp37; -- python3 UDF需要使用該命令開啟python3
    select UDF_GET_URL_CHAR("http://www.taobao.com/a.htm", 1);

使用odpscmd

  1. 準備工作。

    使用odpscmd開發調試UDF時,您需要先下載安裝odpscmd工具,並配置config檔案串連MaxCompute專案,做好UDF開發前準備工作。操作詳情請參見使用本地用戶端(odpscmd)串連

  2. 編寫UDF代碼。

    您可以在任意Python開發工具中開發UDF代碼並打包為一個程式碼封裝。您可以使用以下UDF程式碼範例。

    from odps.udf import annotate
    
    @annotate("string,bigint->string")
    class GetUrlChar(object):
    
        def evaluate(self, url, n):
            if n == 0:
                return ""
            try:
                index = url.find(".htm")
                if index < 0:
                    return ""
                a = url[:index]
                index = a.rfind("/")
                b = a[index + 1:]
                c = b.split("-")
                if len(c) < n:
                    return ""
                return c[-n]
            except Exception:
                return "Internal error"
  3. 上傳並註冊UDF。

    您可以將已打包好的程式碼封裝通過odpscmd上傳並完成UDF註冊,操作詳情請參見:

    1. ADD PY

    2. CREATE FUNCTION

  4. 調用UDF。

    註冊完成UDF後,您可以編寫並建立SQL命令來調用調試UDF。命令樣本如下。

    set odps.sql.python.version=cp37; -- python3 UDF需要使用該命令開啟python3
    select UDF_GET_URL_CHAR("http://www.taobao.com/a.htm", 1);

UDF開發:安裝第三方庫Numpy

MaxCompute內建的Python 3運行環境中未安裝第三方庫Numpy。如果您需要使用Numpy的UDF,請手動上傳Numpy的WHEEL包。從PyPI或鏡像下載Numpy包時,包的檔案名稱為numpy-<版本號碼>-cp37-cp37m-manylinux1_x86_64.whl。上傳包的操作請參見資源操作Python UDF使用第三方包

Python 3支援的標準庫列表請參見Python 3標準庫

UDF開發:函數簽名與資料類型

函數簽名格式如下。

@annotate(<signature>)

signature為字串,用於標識輸入參數和傳回值的資料類型。執行UDF時,UDF函數的輸入參數和傳回值類型要與函數簽名指定的類型一致。查詢語義解析階段會檢查不符合函數簽名定義的用法,檢查到類型不符時會報錯。具體格式如下。

'arg_type_list -> type'

其中:

  • arg_type_list:表示輸入參數的資料類型。輸入參數可以為多個,用英文逗號(,)分隔。支援的資料類型為BIGINT、STRING、DOUBLE、BOOLEAN、DATETIME、DECIMAL、FLOAT、BINARY、DATE、DECIMAL(precision,scale)、CHAR、VARCHAR、複雜資料類型(ARRAY、MAP、STRUCT)或複雜資料類型嵌套。

    arg_type_list還支援星號(*)或為空白(''):

    • arg_type_list為星號(*)時,表示輸入參數為任意個數。

    • arg_type_list為空白('')時,表示無輸入參數。

  • type:表示傳回值的資料類型。UDF只返回一列。支援的資料類型為:BIGINT、STRING、DOUBLE、BOOLEAN、DATETIME、DECIMAL、FLOAT、BINARY、DATE、DECIMAL(precision,scale)、複雜資料類型(ARRAY、MAP、STRUCT)或複雜資料類型嵌套。

說明

在編寫UDF代碼過程中,您可以根據MaxCompute專案的資料類型版本選取合適的資料類型,更多資料類型版本及各版本支援的資料類型資訊,請參見資料類型版本說明

合法的函數簽名樣本如下。

函數簽名樣本

說明

'bigint,double->string'

輸入參數類型為BIGINT、DOUBLE,傳回值類型為STRING。

'*->string'

輸入任意個參數,傳回值類型為STRING。

'->double'

無輸入參數,傳回值類型為DOUBLE。

'array<bigint>->struct<x:string, y:int>'

輸入參數類型為ARRAY<BIGINT>,傳回值類型為STRUCT<x:STRING, y:INT>。

'->map<bigint, string>'

無輸入參數,傳回值類型為MAP<BIGINT, STRING>。

為確保編寫Python UDF過程中使用的資料類型與MaxCompute支援的資料類型保持一致,您需要關注二者間的資料類型映射關係。具體映射關係如下。

MaxCompute SQL Type

Python 3 Type

BIGINT

INT

STRING

UNICODE

DOUBLE

FLOAT

BOOLEAN

BOOL

DATETIME

DATETIME.DATETIME

FLOAT

FLOAT

CHAR

UNICODE

VARCHAR

UNICODE

BINARY

BYTES

DATE

DATETIME.DATE

DECIMAL

DECIMAL.DECIMAL

ARRAY

LIST

MAP

DICT

STRUCT

COLLECTIONS.NAMEDTUPLE

UDF開發:引用資源

Python UDF可以通過odps.distcache模組引用資源,支援引用檔案資源和表資源。

  • odps.distcache.get_cache_file(resource_name, mode):以指定模式mode返回指定檔案資源的內容。

    • resource_name支援STRING類型,對應當前MaxCompute專案中已存在的表資源名。如果表資源名非法或者沒有相應的表資源,會返回異常。

    • mode支援STRING類型,預設值為't'。當mode't'時以文字格式設定開啟檔案,當mode'b'時以二進位格式開啟檔案。

    • 傳回值為File-like對象。在使用完此對象後,您需要調用close方法釋放開啟的資源檔。

    引用檔案資源樣本如下。

    from odps.udf import annotate
    from odps.distcache import get_cache_file
    @annotate('bigint->string')
    class DistCacheExample(object):
    def __init__(self):
        cache_file = get_cache_file('test_distcache.txt')
        kv = {}
        for line in cache_file:
            line = line.strip()
            if not line:
                continue
            k, v = line.split()
            kv[int(k)] = v
        cache_file.close()
        self.kv = kv
    def evaluate(self, arg):
        return self.kv.get(arg)
  • odps.distcache.get_cache_table(resource_name):返回指定資源表的內容。

    • resource_name對應當前MaxCompute專案中已存在的表資源名。如果表資源名非法或者沒有相應的表資源,會返回異常。支援讀取表中BIGINT、STRING、DOUBLE、BOOLEAN、DATETIME、FLOAT、CHAR、VARCHAR、BINARY、DATE、DECIMAL、ARRAY、MAP和STRUCT類型資料。

    • 傳回值為Generator類型,調用者通過遍曆擷取表的內容,每次遍曆得到的是以數組形式存在的表中的一條記錄。

參考資料表資源樣本如下。

from odps.udf import annotate
from odps.distcache import get_cache_table
@annotate('->string')
class DistCacheTableExample(object):
    def __init__(self):
        self.records = list(get_cache_table('udf_test'))
        self.counter = 0
        self.ln = len(self.records)
    def evaluate(self):
        if self.counter > self.ln - 1:
            return None
        ret = self.records[self.counter]
        self.counter += 1
        return str(ret)

UDF開發完成後:UDF調用說明

按照開發流程,完成Python 3 UDF開發後,您即可通過MaxCompute SQL調用Python 3 UDF。調用方法如下。

開啟Python 3

MaxCompute預設使用Python 2,如果您要使用Python 3,可以在Session層級設定如下屬性開啟Python 3,並與SQL語句一起提交執行。

set odps.sql.python.version=cp37;

調用函數

  • 在歸屬MaxCompute專案中使用自訂函數:使用方法與內建函數類似,您可以參照內建函數的使用方法使用自訂函數。

  • 跨專案使用自訂函數:即在專案A中使用專案B的自訂函數,跨專案分享語句樣本:select B:udf_in_other_project(arg0, arg1) as res from table_t;。更多跨專案分享資訊,請參見基於Package跨專案訪問資源

Python 2 UDF遷移

Python 2官方已於2020年初停止維護,建議您根據項目類型執行遷移操作:

  • 全新專案:新MaxCompute專案,或第一次使用Python語言編寫UDF的MaxCompute專案。建議所有的Python UDF都直接使用Python 3語言編寫。

  • 存量專案:建立了大量Python 2 UDF的MaxCompute專案。請您謹慎開啟Python 3。如果您計劃逐步將所有Python 2 UDF遷移為Python 3 UDF,推薦方法如下:

    • 新作業和新UDF:使用Python 3語言編寫,在Session層級開啟Python 3。開啟Python 3方法,請參見開啟Python 3

    • Python 2 UDF:改寫Python 2 UDF,使其可以同時相容Python 2和Python 3。改寫方法請參見將Python 2代碼移植到Python 3

      說明

      如果您需要編寫公用UDF,並為多個MaxCompute專案授權UDF的操作許可權,建議UDF同時相容Python 2和Python 3。

UDF樣本demo