全部產品
Search
文件中心

:使用自訂函數及Python第三方庫

更新時間:Feb 28, 2024

本文為您介紹如何使用自訂函數及Python第三方庫。

使用自訂函數

DataFrame函數支援對Sequence使用map,它會對它的每個元素調用自訂函數。

>>> iris.sepallength.map(lambda x: x + 1).head(5)
   sepallength
0          6.1
1          5.9
2          5.7
3          5.6
4          6.0
說明

目前,自訂函數無法支援將List/Dict類型作為輸入或輸出。

如果map前後,Sequence的類型發生了變化,則需要顯式指定map後的類型。

>>> iris.sepallength.map(lambda x: 't'+str(x), 'string').head(5)
   sepallength
0         t5.1
1         t4.9
2         t4.7
3         t4.6
4         t5.0

如果在函數中包含閉包,則函數外閉包變數值的變化會引起函數內該變數值的變化。

>>> dfs = []
>>> for i in range(10):
>>>     dfs.append(df.sepal_length.map(lambda x: x + i))

結果為dfs中每個SequenceExpr均為df.sepal_length+9。為解決此問題,可以將函數作為另一函數的傳回值,或者使用partial。兩個樣本如下。

>>> dfs = []
>>> def get_mapper(i):
>>>     return lambda x: x + i
>>> for i in range(10):
>>>     dfs.append(df.sepal_length.map(get_mapper(i)))
>>> import functools
>>> dfs = []
>>> for i in range(10):
>>>     dfs.append(df.sepal_length.map(functools.partial(lambda v, x: x + v, i)))

map也支援使用現有的UDF函數,傳入的參數是str類型(函數名)或者Function對象,詳情請參見函數

map傳入Python函數的實現使用了MaxCompute Python UDF。因此,如果您所在的Project不支援Python UDF,則map函數無法使用。除此以外,所有Python UDF的限制在此都適用。

目前,預設可使用的第三方庫(包含C)只有NumPy,第三方庫使用詳情請參見使用第三方Python庫

除了調用自訂函數,DataFrame還提供了很多內建函數,這些函數中部分使用了map函數來實現。因此,如果您所在Project未開通Python UDF,則無法使用這些函數(注意:阿里雲公用服務暫不提供對Python UDF的支援)。

說明

由於位元組碼定義的差異,Python 3下使用新語言特性(例如yield from)時,代碼在使用Python 2.7的MaxCompute Worker上執行時會發生錯誤。因此,建議您在Python 3下使用MapReduce API編寫生產作業前,先確認相關代碼是否能正常執行。

樣本程式:使用計數器

from odps.udf import get_execution_context
def h(x):
    ctx = get_execution_context()
    counters = ctx.get_counters()
    counters.get_counter('df', 'add_one').increment(1)
    return x + 1
df.field.map(h)

Logview的JSONSummary中即可找到計數器值。

對一行資料使用自訂函數

如果您需要對一行資料使用自訂函數,可以使用apply方法。參數axis的值必須設為1,表示對行進行操作。apply的自訂函數接收一個參數,參數為上一步Collection的一行資料。您可以通過屬性或者位移獲得一個欄位的資料。

  • reduce為True時,表示返回結果為Sequence,否則返回結果為Collection。 namestypes參數分別指定返回的Sequence或Collection的欄位名和類型。 如果未指定類型,則會預設為STRING類型。

    >>> iris.apply(lambda row: row.sepallength + row.sepalwidth, axis=1, reduce=True, types='float').rename('sepaladd').head(3)
       sepaladd
    0       8.6
    1       7.9
    2       7.9
  • apply的自訂函數中,reduce為False時,您可以使用yield關鍵字返回多行結果。

    >>> iris.count()
    150
    >>>
    >>> def handle(row):
    >>>     yield row.sepallength - row.sepalwidth, row.sepallength + row.sepalwidth
    >>>     yield row.petallength - row.petalwidth, row.petallength + row.petalwidth
    >>>
    >>> iris.apply(handle, axis=1, names=['iris_add', 'iris_sub'], types=['float', 'float']).count()
    300
  • 您也可以在函數中注釋返回的欄位和類型,無需在函數調用時再指定它們。

    >>> from odps.df import output
    >>>
    >>> @output(['iris_add', 'iris_sub'], ['float', 'float'])
    >>> def handle(row):
    >>>     yield row.sepallength - row.sepalwidth, row.sepallength + row.sepalwidth
    >>>     yield row.petallength - row.petalwidth, row.petallength + row.petalwidth
    >>>
    >>> iris.apply(handle, axis=1).count()
    300
  • 您也可以使用map-onlymap_reduce,該操作與axis=1apply操作是等價的。

    >>> iris.map_reduce(mapper=handle).count()
    300
  • 如果您想調用MaxCompute上已經存在的UDTF,函數指定為函數名即可。

    >>> iris['name', 'sepallength'].apply('your_func', axis=1, names=['name2', 'sepallength2'], types=['string', 'float'])
  • 使用apply對行操作,且reduce為False時,您可以使用並列多行輸出與已有的行結合,用於後續彙總等操作。

    >>> from odps.df import output
    >>>
    >>> @output(['iris_add', 'iris_sub'], ['float', 'float'])
    >>> def handle(row):
    >>>     yield row.sepallength - row.sepalwidth, row.sepallength + row.sepalwidth
    >>>     yield row.petallength - row.petalwidth, row.petallength + row.petalwidth
    >>>
    >>> iris[iris.category, iris.apply(handle, axis=1)]

對所有列調用自訂彙總

調用apply方法,當不指定axis,或者axis值為0時,您可以通過傳入一個自訂彙總類對所有Sequence進行彙總操作。

class Agg(object):

    def buffer(self):
        return [0.0, 0]

    def __call__(self, buffer, val):
        buffer[0] += val
        buffer[1] += 1

    def merge(self, buffer, pbuffer):
        buffer[0] += pbuffer[0]
        buffer[1] += pbuffer[1]

    def getvalue(self, buffer):
        if buffer[1] == 0:
            return 0.0
        return buffer[0] / buffer[1]
>>> iris.exclude('name').apply(Agg)
   sepallength_aggregation  sepalwidth_aggregation  petallength_aggregation  petalwidth_aggregation
0                 5.843333                   3.054                 3.758667                1.198667
說明

目前,自訂函數無法支援將LIST/DICT類型作為初始輸入或最終輸出結果。

引用資源

自訂函數也能讀取MaxCompute上的資源(表資源或檔案資源),或者引用一個Collection作為資源。此時,自訂函數需要寫成函數閉包或Callable的類。兩個樣本如下。

>>> file_resource = o.create_resource('pyodps_iris_file', 'file', file_obj='Iris-setosa')
>>>
>>> iris_names_collection = iris.distinct('name')[:2]
>>> iris_names_collection
       sepallength
0      Iris-setosa
1  Iris-versicolor
>>> def myfunc(resources):  # resources按調用順序傳入。
>>>     names = set()
>>>     fileobj = resources[0] # 檔案資源是一個file-like的object。
>>>     for l in fileobj:
>>>         names.add(l)
>>>     collection = resources[1]
>>>     for r in collection:
>>>         names.add(r.name)  # 這裡可以通過欄位名或者位移來取。
>>>     def h(x):
>>>         if x in names:
>>>             return True
>>>         else:
>>>             return False
>>>     return h
>>>
>>> df = iris.distinct('name')
>>> df = df[df.name,
>>>         df.name.map(myfunc, resources=[file_resource, iris_names_collection], rtype='boolean').rename('isin')]
>>>
>>> df
              name   isin
0      Iris-setosa   True
1  Iris-versicolor   True
2   Iris-virginica  False
說明

分區表資源在讀取時不包含分區欄位。

axis值為1,即在行上操作時,您需要寫一個函數閉包或者Callable的類。 而對於列上的彙總操作,您只需在 __init__ 函數裡讀取資源即可。

>>> words_df
                     sentence
0                 Hello World
1                Hello Python
2  Life is short I use Python
>>>
>>> import pandas as pd
>>> stop_words = DataFrame(pd.DataFrame({'stops': ['is', 'a', 'I']}))
>>>
>>> @output(['sentence'], ['string'])
>>> def filter_stops(resources):
>>>     stop_words = set([r[0] for r in resources[0]])
>>>     def h(row):
>>>         return ' '.join(w for w in row[0].split() if w not in stop_words),
>>>     return h
>>>
>>> words_df.apply(filter_stops, axis=1, resources=[stop_words])
                sentence
0            Hello World
1           Hello Python
2  Life short use Python
說明

這裡的stop_words存放於本地,但在真正執行時會被上傳到MaxCompute作為資源引用。

使用第三方Python庫

您可以把第三方Python包作為資源上傳到MaxCompute,支援的格式有whleggzip以及tar.gz。在全域或者在立即執行的方法時,指定需要使用的包檔案,即可在自訂函數中使用第三方庫。第三方庫的依賴庫,也必須指定,否則依然會導致匯入錯誤。

  • 您可以通過PyODPS的資源上傳介面create_resource來完成資源的上傳。

    下面以python-dateutil包進行舉例:

    1. 使用pip download命令,下載包以及其依賴到某個路徑。下載後會出現兩個包:six-1.10.0-py2.py3-none-any.whlpython_dateutil-2.5.3-py2.py3-none-any.whl(注意:這裡需要下載支援Linux環境的包)。

      $ pip download python-dateutil -d /to/path/
    2. 分別把兩個檔案上傳到MaxCompute資源。

      >>> # 這裡要確保資源名的尾碼是正確的檔案類型。
      >>> odps.create_resource('six.whl', 'file', file_obj=open('six-1.10.0-py2.py3-none-any.whl', 'rb'))
      >>> odps.create_resource('python_dateutil.whl', 'file', file_obj=open('python_dateutil-2.5.3-py2.py3-none-any.whl', 'rb'))
    3. 現在有個DataFrame,只有一個STRING類型欄位。

      >>> df
                     datestr
      0  2016-08-26 14:03:29
      1  2015-08-26 14:03:29
    4. 全域配置使用到的三方庫如下。

      >>> from odps import options
      >>>
      >>> def get_year(t):
      >>>     from dateutil.parser import parse
      >>>     return parse(t).strftime('%Y')
      >>>
      >>> options.df.libraries = ['six.whl', 'python_dateutil.whl']
      >>> df.datestr.map(get_year)
         datestr
      0     2016
      1     2015

      或者通過運行方法的libraries參數指定使用到的第三方庫。

      >>> def get_year(t):
      >>>     from dateutil.parser import parse
      >>>     return parse(t).strftime('%Y')
      >>>
      >>> df.datestr.map(get_year).execute(libraries=['six.whl', 'python_dateutil.whl'])
         datestr
      0     2016
      1     2015

    PyODPS預設支援執行僅包含Python且不含檔案操作的第三方庫。在較新版本的MaxCompute服務下,PyODPS也支援執行帶有二進位代碼或帶有檔案操作的Python庫。這些庫的尾碼必須是cp27-cp27m-manylinux1_x86_64,以archive方式上傳,whl尾碼的包需要重新命名為zip。同時,作業需要開啟odps.isolation.session.enable選項,或者在Project層級開啟isolation。以下樣本為您展示如何上傳並使用scipy中的特殊函數。

    >>> # 對於含有二進位代碼的包,必須使用archive方式上傳資源,whl尾碼需要改為zip。
    >>> odps.create_resource('scipy.zip', 'archive', file_obj=open('scipy-0.19.0-cp27-cp27m-manylinux1_x86_64.whl', 'rb'))
    >>>
    >>> # 如果Project開啟了isolation,下面的選項不是必需的。
    >>> options.sql.settings = { 'odps.isolation.session.enable': True }
    >>>
    >>> def psi(value):
    >>>     # 建議在函數內部import第三方庫,以防止不同作業系統下二進位包結構差異造成執行錯誤。
    >>>     from scipy.special import psi
    >>>     return float(psi(value))
    >>>
    >>> df.float_col.map(psi).execute(libraries=['scipy.zip'])

    對於只提供源碼的二進位包,可以在Linux Shell中打包成Wheel再上傳,Mac和Windows中產生的Wheel包無法在MaxCompute中使用。

    python setup.py bdist_wheel
  • 您也可以通過MaxCompute Console上傳資源。

    1. 現在主流的Python包都提供了whl包,提供了各平台包含二進位檔案的包,因此找到可以在MaxCompute上啟動並執行包是第一步。

    2. 其次,要想在MaxCompute上運行,需要包含所有的依賴包,這個是比較繁瑣的。各個包的依賴情況如下表所示。

      包名

      依賴

      pandas

      numpy、python-dateutil、pytz、six

      scipy

      numpy

      scikit-learn

      numpy、scipy

      說明

      其中numpy已包含,您只需上傳python-dateutil、pytz、pandas、scipy、sklearn、six包,pandas、scipy和scikit-learn即可使用。

    3. 您可進入python-dateutil找到python-dateutil-2.6.0.zip進行下載。

    4. 重新命名為python-dateutil.zip,通過MaxCompute Console上傳資源。

      add archive python-dateutil.zip;
      說明

      pytz和six的上傳方式同上,分別找到 pytz-2017.2.zipsix-1.11.0.tar.gz進行下載和上傳資源操作。

    5. 對於Pandas這種包含c的包,需要找到名字中包含cp27-cp27m-manylinux1_x86_64的whl包,這樣才能在MaxCompute上正確執行。因此,您需要找到pandas-0.20.2-cp27-cp27m-manylinux1_x86_64.whl進行下載,然後把尾碼改成zip,在MaxCompute Console中執行add archive pandas.zip;進行上傳。scipy和scikit-learn包的操作同上。

    所有包需要下載的資源如下表所示。

    包名

    檔案名稱

    上傳資源名

    python-dateutil

    python-dateutil-2.6.0.zip

    python-dateutil.zip

    pytz

    pytz-2017.2.zip

    pytz.zip

    six

    six-1.11.0.tar.gz

    six.tar.gz

    pandas

    pandas-0.20.2-cp27-cp27m-manylinux1_x86_64.zip

    pandas.zip

    scipy

    scipy-0.19.0-cp27-cp27m-manylinux1_x86_64.zip

    scipy.zip

    scikit-learn

    scikit_learn-0.18.1-cp27-cp27m-manylinux1_x86_64.zip

    sklearn.zip

指定第三方Python庫

  • 在全域指定使用的庫。

    >>> from odps import options
    >>> options.df.libraries = ['six.whl', 'python_dateutil.whl']
  • 在立即執行的方法中,局部指定使用的庫。

    >>> df.apply(my_func, axis=1).to_pandas(libraries=['six.whl', 'python_dateutil.whl'])