全部產品
Search
文件中心

:彙總操作

更新時間:Feb 28, 2024

本文為您介紹DataFrame支援的彙總操作,以及如何?分組彙總和編寫自訂彙總。DataFrame提供對列進行HyperLogLog計數的介面。

from odps.df import DataFrame
iris = DataFrame(o.get_table('pyodps_iris'))

常用彙總操作如下:

  • 使用describe函數,查看DataFrame裡數字列的數量、最大值、最小值、平均值以及標準差。

    print(iris.describe())

    返回結果如下。

        type  sepal_length  sepal_width  petal_length  petal_width
    0  count    150.000000   150.000000    150.000000   150.000000
    1   mean      5.843333     3.054000      3.758667     1.198667
    2    std      0.828066     0.433594      1.764420     0.763161
    3    min      4.300000     2.000000      1.000000     0.100000
    4    max      7.900000     4.400000      6.900000     2.500000
  • 使用單列執行彙總操作。

    iris.sepallength.max()

    返回結果如下。

    7.9
  • 如果在消除重複後的列上進行彙總,可以先調用unique函數,再調用相應的彙總函式。

    iris.name.unique().cat(sep=',')

    返回結果如下。

    u'Iris-setosa,Iris-versicolor,Iris-virginica'
  • 如果所有列支援同一種彙總操作,可以直接在整個DataFrame上執行彙總操作。

    iris.exclude('category').mean()

    返回結果如下。

       sepal_length  sepal_width  petal_length  petal_width
    1      5.843333     3.054000      3.758667     1.198667
  • 使用count函數擷取DataFrame的總行數。

    iris.count()

    返回結果如下。

    150
    說明

    如果需要列印對應資料到日誌中,請執行print(iris.count().execute())

PyODPS支援的彙總操作,如下表所示。

彙總操作

說明

count(或size)

數量。

unique

不重複值數量。

min

最小值。

max

最大值。

sum

求和。

mean

均值。

median

中位元。

quantile(p)

p分位元,僅在整數值下可取得準確值。

var

方差。

std

標準差。

moment

n階中心矩(或n階矩)。

skew

樣本偏度(無偏估計)。

kurtosis

樣本峰度(無偏估計)。

cat

按sep做字串串連操作。

tolist

組合為LIST。

說明

不同於Pandas,對於列上的彙總操作,無論是在MaxCompute還是Pandas後端,PyODPS DataFrame都會忽略空值。這一邏輯與SQL類似。

分組彙總

分組彙總操作如下:

  • DataFrame提供了groupby函數執行分組操作,分組後通過調用agg或者aggregate方法,執行彙總操作。最終的結果列中會包含分組的列和彙總的列。

    iris.groupby('name').agg(iris.sepallength.max(), smin=iris.sepallength.min())

    返回結果如下。

                  name  sepallength_max  smin
    0      Iris-setosa              5.8   4.3
    1  Iris-versicolor              7.0   4.9
    2   Iris-virginica              7.9   4.9
  • DataFrame提供了value_counts函數,按某列分組後,將每個組的個數從大到小進行排列。

    • 使用groupby函數實現。

      iris.groupby('name').agg(count=iris.name.count()).sort('count', ascending=False).head(5)

      返回結果如下。

                    name  count
      0   Iris-virginica     50
      1  Iris-versicolor     50
      2      Iris-setosa     50
    • 使用value_counts函數實現。

      iris['name'].value_counts().head(5)

      返回結果如下。

                    name  count
      0   Iris-virginica     50
      1  Iris-versicolor     50
      2      Iris-setosa     50
  • 對於彙總後的單列操作,您也可以直接取出列名。但此時只能使用彙總函式。

    iris.groupby('name').petallength.sum()

    返回結果如下。

       petallength_sum
    0             73.2
    1            213.0
    2            277.6
    iris.groupby('name').agg(iris.petallength.notnull().sum())

    返回結果如下。

                  name  petallength_sum
    0      Iris-setosa               50
    1  Iris-versicolor               50
    2   Iris-virginica               50
  • 分組時也支援對常量進行分組,但是需要使用Scalar初始化。

    from odps.df import Scalar
    iris.groupby(Scalar(1)).petallength.sum()

    返回結果如下。

       petallength_sum
    0            563.8

編寫自訂彙總

對欄位使用agg或者aggregate方法調用自訂彙總。自訂彙總需要提供一個類,這個類需要提供以下方法:

  • buffer():返回一個Mutable的Object(例如LIST或DICT),buffer大小不應隨資料量增大而遞增。

  • __call__(buffer, *val):將值彙總到中間buffer

  • merge(buffer, pbuffer):將pbuffer彙總到buffer中。

  • getvalue(buffer):返回最終值。

計算平均值的樣本如下。

class Agg(object):

    def buffer(self):
        return [0.0, 0]

    def __call__(self, buffer, val):
        buffer[0] += val
        buffer[1] += 1

    def merge(self, buffer, pbuffer):
        buffer[0] += pbuffer[0]
        buffer[1] += pbuffer[1]

    def getvalue(self, buffer):
        if buffer[1] == 0:
            return 0.0
        return buffer[0] / buffer[1]
iris.sepalwidth.agg(Agg)

返回結果如下。

3.0540000000000007

編寫自訂彙總還需要關注如下內容:

  • 如果最終類型和輸入類型發生了變化,則需要指定類型。

    iris.sepalwidth.agg(Agg, 'float')
  • 自訂彙總也可以用在分組彙總中。

    iris.groupby('name').sepalwidth.agg(Agg)

    返回結果如下。

       petallength_aggregation
    0                    3.418
    1                    2.770
    2                    2.974
  • 對多列可以使用agg方法調用自訂彙總。

    class Agg(object):
    
        def buffer(self):
            return [0.0, 0.0]
    
        def __call__(self, buffer, val1, val2):
            buffer[0] += val1
            buffer[1] += val2
    
        def merge(self, buffer, pbuffer):
            buffer[0] += pbuffer[0]
            buffer[1] += pbuffer[1]
    
        def getvalue(self, buffer):
            if buffer[1] == 0:
                return 0.0
            return buffer[0] / buffer[1]
    from odps.df import agg
    to_agg = agg([iris.sepalwidth, iris.sepallength], Agg, rtype='float')  # 對兩列調用自訂彙總。
    iris.groupby('name').agg(val=to_agg)

    返回結果如下。

                  name       val
    0      Iris-setosa  0.682781
    1  Iris-versicolor  0.466644
    2   Iris-virginica  0.451427
  • 如果您需要調用MaxCompute上已經存在的UDAF,指定函數名即可。

    iris.groupby('name').agg(iris.sepalwidth.agg('your_func'))  # 對單列彙總。
    to_agg = agg([iris.sepalwidth, iris.sepallength], 'your_func', rtype='float')
    iris.groupby('name').agg(to_agg.rename('val'))  # 對多列彙總。
    說明

    目前,因受限於Python UDF,自訂彙總無法支援將LIST或DICT類型作為初始輸入或最終輸出結果。

HyperLogLog計數

DataFrame提供了對列進行HyperLogLog計數的介面hll_count,這個介面是近似個數的估計介面。當資料量很大時,它可以較快地估計去重後的資料量。

使用該介面計算海量使用者UV時,可以快速得出估計值。

說明

以下樣本使用了Pandas包,您可以在本地環境中運行以下樣本,如果在DataWorks環境中運行,您需要先通過三方包的方式匯入Pandas包。

from odps.df import DataFrame
import pandas as pd
import numpy as np
df = DataFrame(pd.DataFrame({'a': np.random.randint(100000, size=100000)}))
df.a.hll_count()

返回結果如下。

63270
df.a.nunique()

返回結果如下。

63250
說明

splitter參數會對每個欄位進行分隔,再計算去重後的資料量。