本文為您介紹如何使用自訂函數及Python第三方庫。
使用自訂函數
DataFrame函數支援對Sequence使用map
,它會對它的每個元素調用自訂函數。
>>> iris.sepallength.map(lambda x: x + 1).head(5)
sepallength
0 6.1
1 5.9
2 5.7
3 5.6
4 6.0
目前,自訂函數無法支援將List/Dict類型作為輸入或輸出。
如果map
前後,Sequence的類型發生了變化,則需要顯式指定map
後的類型。
>>> iris.sepallength.map(lambda x: 't'+str(x), 'string').head(5)
sepallength
0 t5.1
1 t4.9
2 t4.7
3 t4.6
4 t5.0
如果在函數中包含閉包,則函數外閉包變數值的變化會引起函數內該變數值的變化。
>>> dfs = []
>>> for i in range(10):
>>> dfs.append(df.sepal_length.map(lambda x: x + i))
結果為dfs
中每個SequenceExpr
均為df.sepal_length+9
。為解決此問題,可以將函數作為另一函數的傳回值,或者使用partial
。兩個樣本如下。
>>> dfs = []
>>> def get_mapper(i):
>>> return lambda x: x + i
>>> for i in range(10):
>>> dfs.append(df.sepal_length.map(get_mapper(i)))
>>> import functools
>>> dfs = []
>>> for i in range(10):
>>> dfs.append(df.sepal_length.map(functools.partial(lambda v, x: x + v, i)))
map
也支援使用現有的UDF函數,傳入的參數是str類型(函數名)或者Function對象,詳情請參見函數。
map
傳入Python函數的實現使用了MaxCompute Python UDF。因此,如果您所在的Project不支援Python UDF,則map
函數無法使用。除此以外,所有Python UDF的限制在此都適用。
目前,預設可使用的第三方庫(包含C)只有NumPy,第三方庫使用詳情請參見使用第三方Python庫。
除了調用自訂函數,DataFrame還提供了很多內建函數,這些函數中部分使用了map
函數來實現。因此,如果您所在Project未開通Python UDF,則無法使用這些函數(注意:阿里雲公用服務暫不提供對Python UDF的支援)。
由於位元組碼定義的差異,Python 3下使用新語言特性(例如yield from
)時,代碼在使用Python 2.7的MaxCompute Worker上執行時會發生錯誤。因此,建議您在Python 3下使用MapReduce API編寫生產作業前,先確認相關代碼是否能正常執行。
樣本程式:使用計數器
from odps.udf import get_execution_context
def h(x):
ctx = get_execution_context()
counters = ctx.get_counters()
counters.get_counter('df', 'add_one').increment(1)
return x + 1
df.field.map(h)
Logview的JSONSummary中即可找到計數器值。
對一行資料使用自訂函數
如果您需要對一行資料使用自訂函數,可以使用apply
方法。參數axis
的值必須設為1,表示對行進行操作。apply
的自訂函數接收一個參數,參數為上一步Collection的一行資料。您可以通過屬性或者位移獲得一個欄位的資料。
reduce
為True時,表示返回結果為Sequence,否則返回結果為Collection。names
和types
參數分別指定返回的Sequence或Collection的欄位名和類型。 如果未指定類型,則會預設為STRING類型。>>> iris.apply(lambda row: row.sepallength + row.sepalwidth, axis=1, reduce=True, types='float').rename('sepaladd').head(3) sepaladd 0 8.6 1 7.9 2 7.9
在
apply
的自訂函數中,reduce
為False時,您可以使用yield
關鍵字返回多行結果。>>> iris.count() 150 >>> >>> def handle(row): >>> yield row.sepallength - row.sepalwidth, row.sepallength + row.sepalwidth >>> yield row.petallength - row.petalwidth, row.petallength + row.petalwidth >>> >>> iris.apply(handle, axis=1, names=['iris_add', 'iris_sub'], types=['float', 'float']).count() 300
您也可以在函數中注釋返回的欄位和類型,無需在函數調用時再指定它們。
>>> from odps.df import output >>> >>> @output(['iris_add', 'iris_sub'], ['float', 'float']) >>> def handle(row): >>> yield row.sepallength - row.sepalwidth, row.sepallength + row.sepalwidth >>> yield row.petallength - row.petalwidth, row.petallength + row.petalwidth >>> >>> iris.apply(handle, axis=1).count() 300
您也可以使用
map-only
的map_reduce
,該操作與axis=1
的apply
操作是等價的。>>> iris.map_reduce(mapper=handle).count() 300
如果您想調用MaxCompute上已經存在的UDTF,函數指定為函數名即可。
>>> iris['name', 'sepallength'].apply('your_func', axis=1, names=['name2', 'sepallength2'], types=['string', 'float'])
使用
apply
對行操作,且reduce
為False時,您可以使用並列多行輸出與已有的行結合,用於後續彙總等操作。>>> from odps.df import output >>> >>> @output(['iris_add', 'iris_sub'], ['float', 'float']) >>> def handle(row): >>> yield row.sepallength - row.sepalwidth, row.sepallength + row.sepalwidth >>> yield row.petallength - row.petalwidth, row.petallength + row.petalwidth >>> >>> iris[iris.category, iris.apply(handle, axis=1)]
對所有列調用自訂彙總
調用apply
方法,當不指定axis
,或者axis
值為0時,您可以通過傳入一個自訂彙總類對所有Sequence進行彙總操作。
class Agg(object):
def buffer(self):
return [0.0, 0]
def __call__(self, buffer, val):
buffer[0] += val
buffer[1] += 1
def merge(self, buffer, pbuffer):
buffer[0] += pbuffer[0]
buffer[1] += pbuffer[1]
def getvalue(self, buffer):
if buffer[1] == 0:
return 0.0
return buffer[0] / buffer[1]
>>> iris.exclude('name').apply(Agg)
sepallength_aggregation sepalwidth_aggregation petallength_aggregation petalwidth_aggregation
0 5.843333 3.054 3.758667 1.198667
目前,自訂函數無法支援將LIST/DICT類型作為初始輸入或最終輸出結果。
引用資源
自訂函數也能讀取MaxCompute上的資源(表資源或檔案資源),或者引用一個Collection作為資源。此時,自訂函數需要寫成函數閉包或Callable的類。兩個樣本如下。
>>> file_resource = o.create_resource('pyodps_iris_file', 'file', file_obj='Iris-setosa')
>>>
>>> iris_names_collection = iris.distinct('name')[:2]
>>> iris_names_collection
sepallength
0 Iris-setosa
1 Iris-versicolor
>>> def myfunc(resources): # resources按調用順序傳入。
>>> names = set()
>>> fileobj = resources[0] # 檔案資源是一個file-like的object。
>>> for l in fileobj:
>>> names.add(l)
>>> collection = resources[1]
>>> for r in collection:
>>> names.add(r.name) # 這裡可以通過欄位名或者位移來取。
>>> def h(x):
>>> if x in names:
>>> return True
>>> else:
>>> return False
>>> return h
>>>
>>> df = iris.distinct('name')
>>> df = df[df.name,
>>> df.name.map(myfunc, resources=[file_resource, iris_names_collection], rtype='boolean').rename('isin')]
>>>
>>> df
name isin
0 Iris-setosa True
1 Iris-versicolor True
2 Iris-virginica False
分區表資源在讀取時不包含分區欄位。
當axis
值為1,即在行上操作時,您需要寫一個函數閉包或者Callable的類。 而對於列上的彙總操作,您只需在 __init__
函數裡讀取資源即可。
>>> words_df
sentence
0 Hello World
1 Hello Python
2 Life is short I use Python
>>>
>>> import pandas as pd
>>> stop_words = DataFrame(pd.DataFrame({'stops': ['is', 'a', 'I']}))
>>>
>>> @output(['sentence'], ['string'])
>>> def filter_stops(resources):
>>> stop_words = set([r[0] for r in resources[0]])
>>> def h(row):
>>> return ' '.join(w for w in row[0].split() if w not in stop_words),
>>> return h
>>>
>>> words_df.apply(filter_stops, axis=1, resources=[stop_words])
sentence
0 Hello World
1 Hello Python
2 Life short use Python
這裡的stop_words
存放於本地,但在真正執行時會被上傳到MaxCompute作為資源引用。
使用第三方Python庫
您可以把第三方Python包作為資源上傳到MaxCompute,支援的格式有whl、egg、zip以及tar.gz。在全域或者在立即執行的方法時,指定需要使用的包檔案,即可在自訂函數中使用第三方庫。第三方庫的依賴庫,也必須指定,否則依然會導致匯入錯誤。
您可以通過PyODPS的資源上傳介面create_resource來完成資源的上傳。
下面以python-dateutil包進行舉例:
使用
pip download
命令,下載包以及其依賴到某個路徑。下載後會出現兩個包:six-1.10.0-py2.py3-none-any.whl和python_dateutil-2.5.3-py2.py3-none-any.whl(注意:這裡需要下載支援Linux環境的包)。$ pip download python-dateutil -d /to/path/
分別把兩個檔案上傳到MaxCompute資源。
>>> # 這裡要確保資源名的尾碼是正確的檔案類型。 >>> odps.create_resource('six.whl', 'file', file_obj=open('six-1.10.0-py2.py3-none-any.whl', 'rb')) >>> odps.create_resource('python_dateutil.whl', 'file', file_obj=open('python_dateutil-2.5.3-py2.py3-none-any.whl', 'rb'))
現在有個DataFrame,只有一個STRING類型欄位。
>>> df datestr 0 2016-08-26 14:03:29 1 2015-08-26 14:03:29
全域配置使用到的三方庫如下。
>>> from odps import options >>> >>> def get_year(t): >>> from dateutil.parser import parse >>> return parse(t).strftime('%Y') >>> >>> options.df.libraries = ['six.whl', 'python_dateutil.whl'] >>> df.datestr.map(get_year) datestr 0 2016 1 2015
或者通過運行方法的
libraries
參數指定使用到的第三方庫。>>> def get_year(t): >>> from dateutil.parser import parse >>> return parse(t).strftime('%Y') >>> >>> df.datestr.map(get_year).execute(libraries=['six.whl', 'python_dateutil.whl']) datestr 0 2016 1 2015
PyODPS預設支援執行僅包含Python且不含檔案操作的第三方庫。在較新版本的MaxCompute服務下,PyODPS也支援執行帶有二進位代碼或帶有檔案操作的Python庫。這些庫的尾碼必須是cp27-cp27m-manylinux1_x86_64,以
archive
方式上傳,whl尾碼的包需要重新命名為zip。同時,作業需要開啟odps.isolation.session.enable
選項,或者在Project層級開啟isolation
。以下樣本為您展示如何上傳並使用scipy
中的特殊函數。>>> # 對於含有二進位代碼的包,必須使用archive方式上傳資源,whl尾碼需要改為zip。 >>> odps.create_resource('scipy.zip', 'archive', file_obj=open('scipy-0.19.0-cp27-cp27m-manylinux1_x86_64.whl', 'rb')) >>> >>> # 如果Project開啟了isolation,下面的選項不是必需的。 >>> options.sql.settings = { 'odps.isolation.session.enable': True } >>> >>> def psi(value): >>> # 建議在函數內部import第三方庫,以防止不同作業系統下二進位包結構差異造成執行錯誤。 >>> from scipy.special import psi >>> return float(psi(value)) >>> >>> df.float_col.map(psi).execute(libraries=['scipy.zip'])
對於只提供源碼的二進位包,可以在Linux Shell中打包成Wheel再上傳,Mac和Windows中產生的Wheel包無法在MaxCompute中使用。
python setup.py bdist_wheel
您也可以通過MaxCompute Console上傳資源。
現在主流的Python包都提供了whl包,提供了各平台包含二進位檔案的包,因此找到可以在MaxCompute上啟動並執行包是第一步。
其次,要想在MaxCompute上運行,需要包含所有的依賴包,這個是比較繁瑣的。各個包的依賴情況如下表所示。
包名
依賴
pandas
numpy、python-dateutil、pytz、six
scipy
numpy
scikit-learn
numpy、scipy
說明其中numpy已包含,您只需上傳python-dateutil、pytz、pandas、scipy、sklearn、six包,pandas、scipy和scikit-learn即可使用。
您可進入python-dateutil找到python-dateutil-2.6.0.zip進行下載。
重新命名為python-dateutil.zip,通過MaxCompute Console上傳資源。
add archive python-dateutil.zip;
說明pytz和six的上傳方式同上,分別找到 pytz-2017.2.zip和six-1.11.0.tar.gz進行下載和上傳資源操作。
對於Pandas這種包含c的包,需要找到名字中包含cp27-cp27m-manylinux1_x86_64的whl包,這樣才能在MaxCompute上正確執行。因此,您需要找到pandas-0.20.2-cp27-cp27m-manylinux1_x86_64.whl進行下載,然後把尾碼改成zip,在MaxCompute Console中執行
add archive pandas.zip;
進行上傳。scipy和scikit-learn包的操作同上。
所有包需要下載的資源如下表所示。
包名
檔案名稱
上傳資源名
python-dateutil
python-dateutil.zip
pytz
pytz.zip
six
six.tar.gz
pandas
pandas.zip
scipy
scipy.zip
scikit-learn
sklearn.zip
指定第三方Python庫
在全域指定使用的庫。
>>> from odps import options >>> options.df.libraries = ['six.whl', 'python_dateutil.whl']
在立即執行的方法中,局部指定使用的庫。
>>> df.apply(my_func, axis=1).to_pandas(libraries=['six.whl', 'python_dateutil.whl'])