This topic describes how to use user-defined functions (UDFs) and third-party Python libraries with DataFrame.
Use UDFs
You can use the map method for a Sequence object to call a UDF on each of its elements.
>>> iris.sepallength.map(lambda x: x + 1).head(5)
sepallength
0 6.1
1 5.9
2 5.7
3 5.6
4 6.0
If the data type of the Sequence changes after the map method is executed, you need to explicitly specify the new type.
>>> iris.sepallength.map(lambda x: 't' + str(x), 'string').head(5)
sepallength
0 t5.1
1 t4.9
2 t4.7
3 t4.6
4 t5.0
Note that if a lambda expression references a loop variable, the variable is bound when the expression is executed rather than when it is defined. Consider the following code:
>>> dfs = []
>>> for i in range(10):
>>>     dfs.append(df.sepal_length.map(lambda x: x + i))
In this case, every SequenceExpr object in dfs is df.sepal_length + 9, because i is already 9 when the lambda expressions are executed. To solve this problem, you can use a function that returns the mapper function, or use functools.partial. See the following two examples.
>>> dfs = []
>>> def get_mapper(i):
>>>     return lambda x: x + i
>>> for i in range(10):
>>>     dfs.append(df.sepal_length.map(get_mapper(i)))
>>> import functools
>>> dfs = []
>>> for i in range(10):
>>>     dfs.append(df.sepal_length.map(functools.partial(lambda v, x: x + v, i)))
The map method also supports existing UDFs. You can pass in a parameter of the str type, which represents a function name, or a Function object. For more information, see Functions.
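For example, if a Python UDF named my_add_one (a hypothetical name) is already registered in your project, you can call it by name and specify the return type with the rtype parameter:
>>> iris.sepallength.map('my_add_one', rtype='float').head(5)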
The map method on Python functions is executed by using MaxCompute Python UDFs. If your project does not support Python UDFs, you cannot use the map method. In addition, all Python UDF limits apply.
By default, the only available third-party library that contains C code is NumPy. For more information about how to use other third-party Python libraries, see the Use the third-party Python libraries section of this topic.
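For example, because NumPy is available, you can import it inside a map UDF. The following is a minimal sketch that reuses the iris column from the earlier examples:
>>> def np_sqrt(x):
>>>     import numpy as np  # NumPy is provided by default in the MaxCompute runtime
>>>     return float(np.sqrt(x))
>>>
>>> iris.sepallength.map(np_sqrt, 'float').head(5)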
In addition to UDFs, DataFrame provides many built-in functions, some of which are implemented by using the map method. Therefore, if your project does not support Python UDFs, you cannot use these functions either. Note that Alibaba Cloud public cloud services do not support Python UDFs.
If your code contains syntax that is supported only in Python 3 (for example, yield from), errors may occur when the code is executed on a MaxCompute worker that runs Python 2.7. Therefore, before you use the MapReduce API in Python 3 to write production code, we recommend that you make sure that your code runs normally.
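The following sketch illustrates this caution. The function and field names are illustrative; the yield from statement is valid in Python 3 but raises a SyntaxError on a Python 2.7 worker:
>>> # Valid only in Python 3; a Python 2.7 MaxCompute worker cannot compile this UDF.
>>> def handle(row):
>>>     yield from ((row.sepallength,), (row.sepalwidth,))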
You can also use counters in a UDF and check their values after execution, as shown in the following example.
from odps.udf import get_execution_context

def h(x):
    ctx = get_execution_context()
    counters = ctx.get_counters()
    counters.get_counter('df', 'add_one').increment(1)
    return x + 1

df.field.map(h)
You can find the counter values in the JSONSummary section of LogView.

Use UDFs for one row
To call a UDF on a single row, use the apply method. The axis parameter must be set to 1 to indicate that the operation is performed on rows. The UDF of the apply method takes a parameter, which is a row of data from the preceding Collection object. You can retrieve the data in a field by using the attribute or the offset.
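For example, inside the UDF, row.sepallength and row[1] refer to fields by attribute and by offset respectively. The following is a minimal sketch based on the iris examples in this topic:
>>> iris.apply(lambda row: row.sepallength + row[1], axis=1, reduce=True, types='float').head(3)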
- If the reduce parameter is set to True, a Sequence object is returned. Otherwise, a Collection object is returned. You can use the names and types parameters to specify the field names and types of the returned Sequence or Collection object. If you do not specify a type, the STRING type is used by default.
  >>> iris.apply(lambda row: row.sepallength + row.sepalwidth, axis=1, reduce=True, types='float').rename('sepaladd').head(3)
     sepaladd
  0       8.6
  1       7.9
  2       7.9
- If reduce is False, you can use the yield keyword in the UDF of the apply method to return multiple rows of results.
  >>> iris.count()
  150
  >>>
  >>> def handle(row):
  >>>     yield row.sepallength - row.sepalwidth, row.sepallength + row.sepalwidth
  >>>     yield row.petallength - row.petalwidth, row.petallength + row.petalwidth
  >>>
  >>> iris.apply(handle, axis=1, names=['iris_add', 'iris_sub'], types=['float', 'float']).count()
  300
- You can also annotate the returned field names and types in the function. Then, you do not need to specify them when you call the function.
  >>> from odps.df import output
  >>>
  >>> @output(['iris_add', 'iris_sub'], ['float', 'float'])
  >>> def handle(row):
  >>>     yield row.sepallength - row.sepalwidth, row.sepallength + row.sepalwidth
  >>>     yield row.petallength - row.petalwidth, row.petallength + row.petalwidth
  >>>
  >>> iris.apply(handle, axis=1).count()
  300
- You can also use a map-only map_reduce, which is equivalent to the apply method with axis=1.
  >>> iris.map_reduce(mapper=handle).count()
  300
- To use an existing user-defined table function (UDTF) in MaxCompute, specify the name of the UDTF.
  >>> iris['name', 'sepallength'].apply('your_func', axis=1, names=['name2', 'sepallength2'], types=['string', 'float'])
- When you use the apply method on rows and reduce is False, you can use a lateral view with the existing columns for purposes such as aggregation.
  >>> from odps.df import output
  >>>
  >>> @output(['iris_add', 'iris_sub'], ['float', 'float'])
  >>> def handle(row):
  >>>     yield row.sepallength - row.sepalwidth, row.sepallength + row.sepalwidth
  >>>     yield row.petallength - row.petalwidth, row.petallength + row.petalwidth
  >>>
  >>> iris[iris.category, iris.apply(handle, axis=1)]
Use custom aggregations for all columns
When you use the apply method, if axis is not specified or axis is set to 0, you can pass in a custom aggregation class to aggregate all Sequence objects.
class Agg(object):

    def buffer(self):
        # create the initial buffer: [sum, count]
        return [0.0, 0]

    def __call__(self, buffer, val):
        # aggregate a single value into the buffer
        buffer[0] += val
        buffer[1] += 1

    def merge(self, buffer, pbuffer):
        # merge a partial buffer into the current buffer
        buffer[0] += pbuffer[0]
        buffer[1] += pbuffer[1]

    def getvalue(self, buffer):
        # compute the final result: the mean value
        if buffer[1] == 0:
            return 0.0
        return buffer[0] / buffer[1]
>>> iris.exclude('name').apply(Agg)
sepallength_aggregation sepalwidth_aggregation petallength_aggregation petalwidth_aggregation
0 5.843333 3.054 3.758667 1.198667
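You can also apply the same aggregation class to a single column by using the agg method of a Sequence object. This usage sketch follows PyODPS conventions; the expected result is the sepalwidth mean from the preceding table:
>>> iris.sepalwidth.agg(Agg)
3.054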
Reference resources

UDFs of DataFrame can also read MaxCompute resources, such as table and file resources, or reference a Collection object as a resource. In this case, you need to write the UDF as a function closure or a callable class and pass the resources by using the resources parameter.
>>> file_resource = o.create_resource('pyodps_iris_file', 'file', file_obj='Iris-setosa')
>>>
>>> iris_names_collection = iris.distinct('name')[:2]
>>> iris_names_collection
name
0 Iris-setosa
1 Iris-versicolor
>>> def myfunc(resources):  # resources are passed in by calling order
>>>     names = set()
>>>     fileobj = resources[0]  # file resources are represented by a file-like object
>>>     for l in fileobj:
>>>         names.add(l)
>>>     collection = resources[1]
>>>     for r in collection:
>>>         names.add(r.name)  # retrieve the values by using the field name or offset
>>>     def h(x):
>>>         return x in names
>>>     return h
>>>
>>> df = iris.distinct('name')
>>> df = df[df.name,
>>> df.name.map(myfunc, resources=[file_resource, iris_names_collection], rtype='boolean').rename('isin')]
>>>
>>> df
name isin
0 Iris-setosa True
1 Iris-versicolor True
2 Iris-virginica False
Similarly, when axis is 1, that is, in a row operation, you need to write the UDF as a function closure or a callable class. For aggregation operations on columns, you only need to read the resources in the __init__ function; a sketch of this case follows the example below.
>>> words_df
sentence
0 Hello World
1 Hello Python
2 Life is short I use Python
>>>
>>> import pandas as pd
>>> from odps.df import DataFrame, output
>>>
>>> stop_words = DataFrame(pd.DataFrame({'stops': ['is', 'a', 'I']}))
>>>
>>> @output(['sentence'], ['string'])
>>> def filter_stops(resources):
>>>     stop_words = set([r[0] for r in resources[0]])
>>>     def h(row):
>>>         # keep only the words that are not in the stop word set
>>>         return ' '.join(w for w in row[0].split() if w not in stop_words),
>>>     return h
>>>
>>> words_df.apply(filter_stops, axis=1, resources=[stop_words])
sentence
0 Hello World
1 Hello Python
2 Life short use Python
In this example, stop_words is a local variable, which is referenced as a resource in MaxCompute during execution.
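For reference, the following is a minimal sketch of a column aggregation that reads a resource in __init__, as described above. The class name, the counting logic, and the resources argument passed to agg are illustrative assumptions, not verbatim API from this topic.
class CountInStops(object):

    def __init__(self, resources):
        # resources are passed to __init__; read them once here
        self.stops = set(r[0] for r in resources[0])

    def buffer(self):
        return [0]

    def __call__(self, buffer, val):
        # count the values that appear in the stop word set
        if val in self.stops:
            buffer[0] += 1

    def merge(self, buffer, pbuffer):
        buffer[0] += pbuffer[0]

    def getvalue(self, buffer):
        return buffer[0]

>>> words_df.sentence.agg(CountInStops, rtype='int64', resources=[stop_words])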
Use the third-party Python libraries
You can upload third-party Python packages as resources to MaxCompute. Supported formats include .whl, .egg, .zip, and .tar.gz. To use a third-party library in a UDF, you need to specify the required packages, either in global configurations or in an immediately-invoked method. You also need to specify the dependencies of the third-party libraries. Otherwise, errors may occur during import.
- You can upload resources by calling the create_resource operation of the PyODPS resource upload API. The following steps take the python-dateutil package as an example.
  - Run the pip download command to download the package and its dependencies to a specific path. Two packages are downloaded: six-1.10.0-py2.py3-none-any.whl and python_dateutil-2.5.3-py2.py3-none-any.whl. Note that the downloaded packages must support the Linux environment.
    $ pip download python-dateutil -d /to/path/
  - Upload the two files to MaxCompute as resources.
    >>> # Make sure that the file extensions are correct.
    >>> odps.create_resource('six.whl', 'file', file_obj=open('six-1.10.0-py2.py3-none-any.whl', 'rb'))
    >>> odps.create_resource('python_dateutil.whl', 'file', file_obj=open('python_dateutil-2.5.3-py2.py3-none-any.whl', 'rb'))
  - Now you have a DataFrame object that contains only a STRING field.
    >>> df
                   datestr
    0  2016-08-26 14:03:29
    1  2015-08-26 14:03:29
  - Specify the third-party libraries in global configurations.
    >>> from odps import options
    >>>
    >>> def get_year(t):
    >>>     from dateutil.parser import parse
    >>>     return parse(t).strftime('%Y')
    >>>
    >>> options.df.libraries = ['six.whl', 'python_dateutil.whl']
    >>> df.datestr.map(get_year)
       datestr
    0     2016
    1     2015

    Alternatively, use the libraries parameter of an immediately-invoked method to specify the third-party libraries.
    >>> def get_year(t):
    >>>     from dateutil.parser import parse
    >>>     return parse(t).strftime('%Y')
    >>>
    >>> df.datestr.map(get_year).execute(libraries=['six.whl', 'python_dateutil.whl'])
       datestr
    0     2016
    1     2015
  By default, PyODPS supports third-party libraries that contain pure Python code and no file operations. In later versions of MaxCompute, PyODPS also supports Python libraries that contain binary code or file operations. These libraries must be suffixed with cp27-cp27m-manylinux1_x86_64 and uploaded as archives, and the .whl packages must be renamed to .zip files. You also need to set odps.isolation.session.enable to True, or enable isolation in your project. The following example shows how to upload and use the special functions in scipy.
    >>> # Packages that contain binary code must be uploaded by using the archive method. Replace the .whl file extension with .zip.
    >>> odps.create_resource('scipy.zip', 'archive', file_obj=open('scipy-0.19.0-cp27-cp27m-manylinux1_x86_64.whl', 'rb'))
    >>>
    >>> # If isolation is enabled for your project, the following option is not required.
    >>> options.sql.settings = { 'odps.isolation.session.enable': True }
    >>>
    >>> def psi(value):
    >>>     # We recommend that you import third-party libraries inside the function to avoid errors caused by the varying structure of a binary package between operating systems.
    >>>     from scipy.special import psi
    >>>     return float(psi(value))
    >>>
    >>> df.float_col.map(psi).execute(libraries=['scipy.zip'])

  If a package contains only source code, you can pack it into a Wheel file in Linux Shell and then upload it. Wheel files generated in macOS or Windows cannot be used in MaxCompute.
    python setup.py bdist_wheel
- You can also use MaxCompute Console to upload resources.
  - Most Python packages are supplied as .whl packages, including packages that contain binary files for various platforms. Therefore, you first need to find the packages that can run on MaxCompute.
  - If you want to use a third-party package in MaxCompute, all of its dependencies must also be included, which can be complex. The following table lists the dependencies of common packages.

    Package name    Dependencies
    pandas          numpy, python-dateutil, pytz, six
    scipy           numpy
    scikit-learn    numpy, scipy

    Note: The numpy package is already provided. To run the pandas, scipy, and scikit-learn packages, you only need to upload the python-dateutil, pytz, pandas, scipy, scikit-learn, and six packages.
  - Find python-dateutil-2.6.0.zip on the python-dateutil download page and download it.
  - Rename the downloaded file to python-dateutil.zip, and upload the file as a resource in the MaxCompute console.
    add archive python-dateutil.zip;
    Note: Find, download, and upload the pytz-2017.2.zip and six-1.11.0.tar.gz files in the same way as you do for the python-dateutil file.
  - To ensure that packages that contain C code, such as pandas, run normally on MaxCompute, you need to find the .whl packages whose names contain cp27-cp27m-manylinux1_x86_64. For pandas, find and download pandas-0.20.2-cp27-cp27m-manylinux1_x86_64.whl, change its file extension to .zip, and run the following command in the MaxCompute console to upload it:
    add archive pandas.zip;
    Upload the scipy and scikit-learn packages to MaxCompute in the same way.
    The following table lists the resources to be downloaded for all packages.

    Package name     File name                                             Name of resource for upload
    python-dateutil  python-dateutil-2.6.0.zip                             python-dateutil.zip
    pytz             pytz-2017.2.zip                                       pytz.zip
    six              six-1.11.0.tar.gz                                     six.tar.gz
    pandas           pandas-0.20.2-cp27-cp27m-manylinux1_x86_64.zip        pandas.zip
    scipy            scipy-0.19.0-cp27-cp27m-manylinux1_x86_64.zip         scipy.zip
    scikit-learn     scikit_learn-0.18.1-cp27-cp27m-manylinux1_x86_64.zip  sklearn.zip
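With the resources from the preceding table uploaded, running a pandas-based UDF might look like the following sketch. The get_weekday function and the datestr column are illustrative assumptions; note that all dependencies are listed in libraries:
>>> def get_weekday(t):
>>>     import pandas as pd  # import inside the function, as recommended above
>>>     return pd.Timestamp(t).weekday()
>>>
>>> df.datestr.map(get_weekday, 'int64').execute(
>>>     libraries=['python-dateutil.zip', 'pytz.zip', 'six.tar.gz', 'pandas.zip'])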
Specify the third-party Python libraries
- Globally specify the libraries to be used.
  >>> from odps import options
  >>> options.df.libraries = ['six.whl', 'python_dateutil.whl']
- In an immediately-invoked method, locally specify the libraries to be used.
  >>> df.apply(my_func, axis=1).to_pandas(libraries=['six.whl', 'python_dateutil.whl'])