全部产品
Search
文档中心

云原生大数据计算服务 MaxCompute:聚合操作

更新时间:Sep 05, 2023

本文为您介绍DataFrame支持的聚合操作,以及如何实现分组聚合和编写自定义聚合。DataFrame提供对列进行HyperLogLog计数的接口。

from odps.df import DataFrame
iris = DataFrame(o.get_table('pyodps_iris'))

常用聚合操作如下:

  • 使用describe函数,查看DataFrame里数字列的数量、最大值、最小值、平均值以及标准差。

    print(iris.describe())

    返回结果如下。

        type  sepal_length  sepal_width  petal_length  petal_width
    0  count    150.000000   150.000000    150.000000   150.000000
    1   mean      5.843333     3.054000      3.758667     1.198667
    2    std      0.828066     0.433594      1.764420     0.763161
    3    min      4.300000     2.000000      1.000000     0.100000
    4    max      7.900000     4.400000      6.900000     2.500000
  • 使用单列执行聚合操作。

    iris.sepallength.max()

    返回结果如下。

    7.9
  • 如果在消除重复后的列上进行聚合,可以先调用unique函数,再调用相应的聚合函数。

    iris.name.unique().cat(sep=',')

    返回结果如下。

    u'Iris-setosa,Iris-versicolor,Iris-virginica'
  • 如果所有列支持同一种聚合操作,可以直接在整个DataFrame上执行聚合操作。

    iris.exclude('category').mean()

    返回结果如下。

       sepal_length  sepal_width  petal_length  petal_width
    1      5.843333     3.054000      3.758667     1.198667
  • 使用count函数获取DataFrame的总行数。

    iris.count()

    返回结果如下。

    150
    说明

    如果需要打印对应数据到日志中,请执行print(iris.count().execute())

PyODPS支持的聚合操作,如下表所示。

聚合操作

说明

count(或size)

数量。

unique

不重复值数量。

min

最小值。

max

最大值。

sum

求和。

mean

均值。

median

中位数。

quantile(p)

p分位数,仅在整数值下可取得准确值。

var

方差。

std

标准差。

moment

n阶中心矩(或n阶矩)。

skew

样本偏度(无偏估计)。

kurtosis

样本峰度(无偏估计)。

cat

按sep做字符串连接操作。

tolist

组合为LIST。

说明

不同于Pandas,对于列上的聚合操作,无论是在MaxCompute还是Pandas后端,PyODPS DataFrame都会忽略空值。这一逻辑与SQL类似。

分组聚合

分组聚合操作如下:

  • DataFrame提供了groupby函数执行分组操作,分组后通过调用agg或者aggregate方法,执行聚合操作。最终的结果列中会包含分组的列和聚合的列。

    iris.groupby('name').agg(iris.sepallength.max(), smin=iris.sepallength.min())

    返回结果如下。

                  name  sepallength_max  smin
    0      Iris-setosa              5.8   4.3
    1  Iris-versicolor              7.0   4.9
    2   Iris-virginica              7.9   4.9
  • DataFrame提供了value_counts函数,按某列分组后,将每个组的个数从大到小进行排列。

    • 使用groupby函数实现。

      iris.groupby('name').agg(count=iris.name.count()).sort('count', ascending=False).head(5)

      返回结果如下。

                    name  count
      0   Iris-virginica     50
      1  Iris-versicolor     50
      2      Iris-setosa     50
    • 使用value_counts函数实现。

      iris['name'].value_counts().head(5)

      返回结果如下。

                    name  count
      0   Iris-virginica     50
      1  Iris-versicolor     50
      2      Iris-setosa     50
  • 对于聚合后的单列操作,您也可以直接取出列名。但此时只能使用聚合函数。

    iris.groupby('name').petallength.sum()

    返回结果如下。

       petallength_sum
    0             73.2
    1            213.0
    2            277.6
    iris.groupby('name').agg(iris.petallength.notnull().sum())

    返回结果如下。

                  name  petallength_sum
    0      Iris-setosa               50
    1  Iris-versicolor               50
    2   Iris-virginica               50
  • 分组时也支持对常量进行分组,但是需要使用Scalar初始化。

    from odps.df import Scalar
    iris.groupby(Scalar(1)).petallength.sum()

    返回结果如下。

       petallength_sum
    0            563.8

编写自定义聚合

对字段使用agg或者aggregate方法调用自定义聚合。自定义聚合需要提供一个类,这个类需要提供以下方法:

  • buffer():返回一个Mutable的Object(例如LIST或DICT),buffer大小不应随数据量增大而递增。

  • __call__(buffer, *val):将值聚合到中间buffer

  • merge(buffer, pbuffer):将pbuffer聚合到buffer中。

  • getvalue(buffer):返回最终值。

计算平均值的示例如下。

class Agg(object):

    def buffer(self):
        return [0.0, 0]

    def __call__(self, buffer, val):
        buffer[0] += val
        buffer[1] += 1

    def merge(self, buffer, pbuffer):
        buffer[0] += pbuffer[0]
        buffer[1] += pbuffer[1]

    def getvalue(self, buffer):
        if buffer[1] == 0:
            return 0.0
        return buffer[0] / buffer[1]
iris.sepalwidth.agg(Agg)

返回结果如下。

3.0540000000000007

编写自定义聚合还需要关注如下内容:

  • 如果最终类型和输入类型发生了变化,则需要指定类型。

    iris.sepalwidth.agg(Agg, 'float')
  • 自定义聚合也可以用在分组聚合中。

    iris.groupby('name').sepalwidth.agg(Agg)

    返回结果如下。

       petallength_aggregation
    0                    3.418
    1                    2.770
    2                    2.974
  • 对多列可以使用agg方法调用自定义聚合。

    class Agg(object):
    
        def buffer(self):
            return [0.0, 0.0]
    
        def __call__(self, buffer, val1, val2):
            buffer[0] += val1
            buffer[1] += val2
    
        def merge(self, buffer, pbuffer):
            buffer[0] += pbuffer[0]
            buffer[1] += pbuffer[1]
    
        def getvalue(self, buffer):
            if buffer[1] == 0:
                return 0.0
            return buffer[0] / buffer[1]
    from odps.df import agg
    to_agg = agg([iris.sepalwidth, iris.sepallength], Agg, rtype='float')  # 对两列调用自定义聚合。
    iris.groupby('name').agg(val=to_agg)

    返回结果如下。

                  name       val
    0      Iris-setosa  0.682781
    1  Iris-versicolor  0.466644
    2   Iris-virginica  0.451427
  • 如果您需要调用MaxCompute上已经存在的UDAF,指定函数名即可。

    iris.groupby('name').agg(iris.sepalwidth.agg('your_func'))  # 对单列聚合。
    to_agg = agg([iris.sepalwidth, iris.sepallength], 'your_func', rtype='float')
    iris.groupby('name').agg(to_agg.rename('val'))  # 对多列聚合。
    说明

    目前,因受限于Python UDF,自定义聚合无法支持将LIST或DICT类型作为初始输入或最终输出结果。

HyperLogLog计数

DataFrame提供了对列进行HyperLogLog计数的接口hll_count,这个接口是近似个数的估计接口。当数据量很大时,它可以较快地估计去重后的数据量。

使用该接口计算海量用户UV时,可以快速得出估计值。

说明

以下示例使用了Pandas包,您可以在本地环境中运行以下示例,如果在DataWorks环境中运行,您需要先通过三方包的方式导入Pandas包。

from odps.df import DataFrame
import pandas as pd
import numpy as np
df = DataFrame(pd.DataFrame({'a': np.random.randint(100000, size=100000)}))
df.a.hll_count()

返回结果如下。

63270
df.a.nunique()

返回结果如下。

63250
说明

splitter参数会对每个字段进行分隔,再计算去重后的数据量。