This topic describes the aggregation operations that are supported by Alibaba Cloud MaxCompute SDK for Python (PyODPS) DataFrames, explains how to group and aggregate data, and shows how to write custom aggregations. PyODPS DataFrames also provide a HyperLogLog operation that you can use to estimate the number of distinct values in a column. The examples in this topic use the following DataFrame:
from odps.df import DataFrame
iris = DataFrame(o.get_table('pyodps_iris'))
You can perform the following common aggregation operations on the preceding DataFrame:
Call the describe function to view the count, maximum value, minimum value, mean value, and standard deviation of the numerical columns in the DataFrame.
print(iris.describe())
The following result is returned:
    type  sepallength  sepalwidth  petallength  petalwidth
0  count   150.000000  150.000000   150.000000  150.000000
1   mean     5.843333    3.054000     3.758667    1.198667
2    std     0.828066    0.433594     1.764420    0.763161
3    min     4.300000    2.000000     1.000000    0.100000
4    max     7.900000    4.400000     6.900000    2.500000
Perform an aggregation operation on a single column.
iris.sepallength.max()
The following result is returned:
7.9
To aggregate over the distinct values in a column, call the unique function before you call the aggregate function.
iris.name.unique().cat(sep=',')
The following result is returned:
u'Iris-setosa,Iris-versicolor,Iris-virginica'
If all columns support the same aggregation operation, you can perform this aggregation operation on the entire DataFrame.
iris.exclude('name').mean()
The following result is returned:
   sepallength  sepalwidth  petallength  petalwidth
0     5.843333    3.054000     3.758667    1.198667
Call the count function to calculate the total number of rows in the DataFrame.
iris.count()
The following result is returned:
150
Note: If you want to display the result in logs, run the print(iris.count().execute()) command.
The following table describes the aggregation operations that PyODPS supports.
| Aggregation | Description |
| --- | --- |
| count or size | Calculates the number of rows. |
| nunique | Calculates the number of distinct values. |
| min | Calculates the minimum value. |
| max | Calculates the maximum value. |
| sum | Calculates the sum of the values. |
| mean | Calculates the mean value. |
| median | Calculates the median value. |
| quantile(p) | Calculates the p-quantile. The result is exact only for integer data. |
| var | Calculates the variance. |
| std | Calculates the standard deviation. |
| moment | Calculates the Nth central moment or the Nth moment. |
| skew | Calculates the sample skewness. The result is an unbiased estimate. |
| kurtosis | Calculates the sample kurtosis. The result is an unbiased estimate. |
| cat | Concatenates character strings with a separator. |
| tolist | Aggregates a column into a list. |
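For example, the following minimal sketch applies a few of the aggregations listed above to the iris DataFrame; the printed values depend on your data.
print(iris.sepallength.quantile(0.75).execute())  # p-quantile with p=0.75
print(iris.petalwidth.var().execute())            # variance
print(iris.name.nunique().execute())              # number of distinct names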
Aggregation operations on columns ignore null values on both the MaxCompute and pandas backends. This behavior is different from that of pandas DataFrames but is consistent with the logic of SQL.
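For example, the following minimal sketch illustrates this behavior on the pandas backend; the column a and its values are hypothetical sample data.
import pandas as pd
from odps.df import DataFrame

df = DataFrame(pd.DataFrame({'a': [1.0, None, 3.0]}))
# The null value is ignored, so the expected mean is (1.0 + 3.0) / 2 = 2.0.
print(df.a.mean().execute())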
Group and aggregate data
You can use the following methods to group and aggregate data:
PyODPS DataFrames provide the groupby function to group data. After the data is grouped, call the agg or aggregate function to aggregate the data. The result columns include the grouping column and the aggregated columns.
iris.groupby('name').agg(iris.sepallength.max(), smin=iris.sepallength.min())
The following result is returned:
              name  sepallength_max  smin
0      Iris-setosa              5.8   4.3
1  Iris-versicolor              7.0   4.9
2   Iris-virginica              7.9   4.9
PyODPS DataFrames also provide the value_counts function, which groups data by a specified column and sorts the groups in descending order by the number of rows in each group. The following two calls return the same result.
Call the groupby function:
iris.groupby('name').agg(count=iris.name.count()).sort('count', ascending=False).head(5)
The following result is returned:
              name  count
0   Iris-virginica     50
1  Iris-versicolor     50
2      Iris-setosa     50
Call the value_counts function:
iris['name'].value_counts().head(5)
The following result is returned:
              name  count
0   Iris-virginica     50
1  Iris-versicolor     50
2      Iris-setosa     50
You can also retrieve a single column after you group data and then call an aggregate function on it. In this case, only aggregate operations can be performed on the column.
iris.groupby('name').petallength.sum()
The following result is returned:
   petallength_sum
0             73.2
1            213.0
2            277.6
iris.groupby('name').agg(iris.petallength.notnull().sum())
The following result is returned:
              name  petallength_sum
0      Iris-setosa               50
1  Iris-versicolor               50
2   Iris-virginica               50
You can also group data by a constant value. This operation requires a Scalar object to be initialized.
from odps.df import Scalar

iris.groupby(Scalar(1)).petallength.sum()
The following result is returned:
   petallength_sum
0            563.8
Write custom aggregations
Use the agg or aggregate function to call custom aggregations on columns. A custom aggregation requires a class that provides the following methods:
buffer(): returns a mutable object, such as a LIST or a DICT. The size of buffer must not increase with the amount of data.
__call__(buffer, *val): aggregates values into buffer.
merge(buffer, pbuffer): merges pbuffer into buffer.
getvalue(buffer): returns the final value.
The following sample code provides an example of how to calculate the mean value.
class Agg(object):
    def buffer(self):
        # The buffer holds the running sum and the count of values.
        return [0.0, 0]

    def __call__(self, buffer, val):
        # Accumulate one value into the buffer.
        buffer[0] += val
        buffer[1] += 1

    def merge(self, buffer, pbuffer):
        # Merge a partially aggregated buffer into the current buffer.
        buffer[0] += pbuffer[0]
        buffer[1] += pbuffer[1]

    def getvalue(self, buffer):
        # Return the mean, or 0.0 if no values were aggregated.
        if buffer[1] == 0:
            return 0.0
        return buffer[0] / buffer[1]

iris.sepalwidth.agg(Agg)
The following result is returned:
3.0540000000000007
When you write custom aggregations, take note of the following points:
If the data type of the output is different from the data type of the input, you must specify the data type for the output.
iris.sepalwidth.agg(Agg, 'float')
You can use custom aggregations to group and aggregate data.
iris.groupby('name').sepalwidth.agg(Agg)
The following result is returned:
   sepalwidth_aggregation
0                   3.418
1                   2.770
2                   2.974
You can use the agg function to call a custom aggregation on multiple columns.
class Agg2(object):
    def buffer(self):
        return [0.0, 0.0]

    def __call__(self, buffer, val1, val2):
        # Accumulate one value from each of the two input columns.
        buffer[0] += val1
        buffer[1] += val2

    def merge(self, buffer, pbuffer):
        buffer[0] += pbuffer[0]
        buffer[1] += pbuffer[1]

    def getvalue(self, buffer):
        # Return the ratio of the two column sums.
        if buffer[1] == 0:
            return 0.0
        return buffer[0] / buffer[1]
from odps.df import agg

# Call a user-defined aggregate function (UDAF) to aggregate data in two columns.
to_agg = agg([iris.sepalwidth, iris.sepallength], Agg2, rtype='float')
iris.groupby('name').agg(val=to_agg)
The following result is returned:
              name       val
0      Iris-setosa  0.682781
1  Iris-versicolor  0.466644
2   Iris-virginica  0.451427
To call an existing UDAF in MaxCompute, you need only specify the name of the UDAF.
# Aggregate the values in a single column.
iris.groupby('name').agg(iris.sepalwidth.agg('your_func'))

# Aggregate the values in multiple columns.
to_agg = agg([iris.sepalwidth, iris.sepallength], 'your_func', rtype='float')
iris.groupby('name').agg(to_agg.rename('val'))
Note: Due to the limits of Python user-defined functions (UDFs), you cannot specify the LIST or DICT type as the input or output data type of a custom aggregation.
HyperLogLog counting
PyODPS DataFrames provide the hll_count operation, which uses the HyperLogLog algorithm to estimate the number of distinct values in a column. Because the result is an estimate rather than an exact count, this operation is well suited to large amounts of data.
For example, you can call this operation to obtain an approximate number of unique visitors (UVs) in a short period of time.
The following example uses the pandas package. You can run the code in this example in an on-premises environment. If you run the code in DataWorks, you must load pandas as a third-party package.
from odps.df import DataFrame
import pandas as pd
import numpy as np
df = DataFrame(pd.DataFrame({'a': np.random.randint(100000, size=100000)}))
df.a.hll_count()
The following result is returned:
63270
df.a.nunique()
The following result is returned:
63250
You can use the splitter parameter to split each value in a column and then estimate the number of distinct values after the split.
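For example, the following sketch passes a comma as the splitter; the tags column and its values are hypothetical sample data.
import pandas as pd
from odps.df import DataFrame

df = DataFrame(pd.DataFrame({'tags': ['a,b', 'b,c', 'a,c']}))
# Each value is split on ',' before counting, so the estimate should be
# close to 3 distinct tokens: a, b, and c.
print(df.tags.hll_count(splitter=',').execute())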