全部產品
Search
文件中心

:MapReduce API

更新時間:Aug 15, 2024

本文為您介紹MapReduce API。

PyODPS DataFrame支援MapReduce API,您可以分別編寫mapreduce函數(map_reduce可以只有mapper或者reducer過程)。

wordcount的樣本如下。

>>> #encoding=utf-8
>>> from odps import ODPS
>>> from odps import options
>>> options.verbose = True
>>> o = ODPS('your-access-id', 'your-secret-access-key',project='DMP_UC_dev', endpoint='http://service-corp.odps.aliyun-inc.com/api')
>>> from odps.df import DataFrame

>>> def mapper(row):
>>>     for word in row[0].split():
>>>         yield word.lower(), 1
>>>
>>> def reducer(keys):
>>>     # 這裡使用list而不是cnt=0,否則h內的cnt會被認為是局部變數,其中的賦值無法輸出。
>>>     cnt = [0]
>>>     def h(row, done):  # done表示這個key已經迭代結束。
>>>         cnt[0] += row[1]
>>>         if done:
>>>             yield keys[0], cnt[0]
>>>     return h
>>> # zx_word_count表只有一列,為STRING類型。
>>> word_count = DataFrame(o.get_table('zx_word_count'))
>>> table = word_count.map_reduce(mapper, reducer, group=['word', ],
                        mapper_output_names=['word', 'cnt'],
                        mapper_output_types=['string', 'int'],
                        reducer_output_names=['word', 'cnt'],
                        reducer_output_types=['string', 'int'])
         word  cnt
0         are    1
1         day    1
2      doing?    1
3   everybody    1
4       first    1
5       hello    2
6         how    1
7          is    1
8          so    1
9         the    1
10       this    1
11      world    1
12        you    1

group參數用於指定reduce按哪些欄位做分組,如果不指定,會按全部欄位做分組。reducer需要接收彙總的keys進行初始化,並能繼續處理按這些keys彙總的每行資料。done表示與這些keys相關的所有行是否都迭代完成。

為了方便,此處寫成了函數閉包的方式,您也可以寫成Callable的類。

class reducer(object):
    def __init__(self, keys):
        self.cnt = 0

    def __call__(self, row, done):  # done表示這個key已經迭代結束。
        self.cnt += row.cnt
        if done:
            yield row.word, self.cnt

使用output進行注釋會讓代碼更簡單。

>>> from odps.df import output
>>>
>>> @output(['word', 'cnt'], ['string', 'int'])
>>> def mapper(row):
>>>     for word in row[0].split():
>>>         yield word.lower(), 1
>>>
>>> @output(['word', 'cnt'], ['string', 'int'])
>>> def reducer(keys):
>>>     # 此處使用list而不是cnt=0,否則h內的cnt會被認為是局部變數,其中的賦值無法輸出。
>>>     cnt = [0]
>>>     def h(row, done):  # done表示這個key已經迭代結束。
>>>         cnt[0] += row.cnt
>>>         if done:
>>>             yield keys.word, cnt[0]
>>>     return h
>>>
>>> word_count = DataFrame(o.get_table('zx_word_count'))
>>> table = word_count.map_reduce(mapper, reducer, group='word')
         word  cnt
0         are    1
1         day    1
2      doing?    1
3   everybody    1
4       first    1
5       hello    2
6         how    1
7          is    1
8          so    1
9         the    1
10       this    1
11      world    1
12        you    1

在迭代的時候,可以使用sort參數實現按指定列排序,通過ascending參數指定升序降序。ascending參數可以是一個BOOL值,表示所有的sort欄位是相同升序或降序,也可以是一個列表,長度必須和sort欄位長度相同。

指定COMBINER

combiner表示在map_reduce API裡表示在mapper端,就先對資料進行彙總操作,它的用法和reducer是完全一致的,但不能引用資源。 並且,combiner的輸出的欄位名和欄位類型必須和mapper完全一致。

上面的例子,您可以使用reducer作為combinermapper端對資料做初步的彙總,減少Shuffle出去的資料量。

>>> words_df.map_reduce(mapper, reducer, combiner=reducer, group='word')

引用資源

在MapReduce API裡,您可以分別指定mapperreducer所要引用的資源。

以下樣本為對mapper裡的單詞做停詞過濾,在reducer裡對白名單的單詞數量加5。

>>> white_list_file = o.create_resource('pyodps_white_list_words', 'file', file_obj='Python\nWorld')
>>>
>>> @output(['word', 'cnt'], ['string', 'int'])
>>> def mapper(resources):
>>>     stop_words = set(r[0].strip() for r in resources[0])
>>>     def h(row):
>>>         for word in row[0].split():
>>>             if word not in stop_words:
>>>                 yield word, 1
>>>     return h
>>>
>>> @output(['word', 'cnt'], ['string', 'int'])
>>> def reducer(resources):
>>>     d = dict()
>>>     d['white_list'] = set(word.strip() for word in resources[0])
>>>     d['cnt'] = 0
>>>     def inner(keys):
>>>         d['cnt'] = 0
>>>         def h(row, done):
>>>             d['cnt'] += row.cnt
>>>             if done:
>>>                 if row.word in d['white_list']:
>>>                     d['cnt'] += 5
>>>                 yield keys.word, d['cnt']
>>>         return h
>>>     return inner
>>>
>>> words_df.map_reduce(mapper, reducer, group='word',
>>>                     mapper_resources=[stop_words], reducer_resources=[white_list_file])
     word  cnt
0   hello    2
1    life    1
2  python    7
3   world    6
4   short    1
5     use    1

使用第三方Python庫

使用方法類似在MAP中使用第三方Python庫。

  • 在全域指定使用的庫。

    >>> from odps import options
    >>> options.df.libraries = ['six.whl', 'python_dateutil.whl']
  • 在立即執行的方法中,局部指定使用的庫。

    >>> df.map_reduce(mapper=my_mapper, reducer=my_reducer, group='key').execute(libraries=['six.whl', 'python_dateutil.whl'])
    說明

    由於位元組碼定義的差異,Python 3下使用新語言特性(例如yield from)時,代碼在使用 Python 2.7的ODPS Worker上執行時會發生錯誤。因此,建議您在Python 3下使用MapReduce API編寫生產作業前,先確認相關代碼是否能正常執行。

重排資料

當資料在叢集上分布不均勻時,您可以調用reshuffle介面對資料重排。

>>> df1 = df.reshuffle()

預設會按隨機數做雜湊來分布,但您也可以按指定列做分布,並且可以指定重排後的排序次序。

>>> df1.reshuffle('name', sort='id', ascending=False)

布隆過濾器

PyODPS DataFrame提供了bloom_filter介面進行布隆過濾器的計算。

給定某個Collection和它的某個列計算的sequence1,對另外一個sequence2進行布隆過濾時,sequence1不存在於sequence2中的資料一定會被過濾掉, 但可能無法完全過濾。所以這種過濾方式是一種近似的過濾方法。這樣的好處是對Collection進行快速過濾一些無用資料。這在一邊資料量遠大過另一邊資料量(大部分資料並不會參與join運算)時的大規模join情境很有用。例如,在join使用者的瀏覽資料和交易資料時,使用者的瀏覽資料量遠大於交易資料量,可以利用交易資料先對瀏覽資料進行布隆過濾, 然後再join可以很好地提升效能。

>>> df1 = DataFrame(pd.DataFrame({'a': ['name1', 'name2', 'name3', 'name1'], 'b': [1, 2, 3, 4]}))
>>> df1
       a  b
0  name1  1
1  name2  2
2  name3  3
3  name1  4
>>> df2 = DataFrame(pd.DataFrame({'a': ['name1']}))
>>> df2
       a
0  name1
>>> df1.bloom_filter('a', df2.a) # 這裡第0個參數可以是個計算運算式如: df1.a + '1'。
       a  b
0  name1  1
1  name1  4

樣本說明:

  • 由於資料量很小,df1中的aname2name3的行都被正確過濾掉了,當資料量很大的時候,可能無法過濾全部資料。

  • 之前提join情境中,少量資料不能被過濾的情況並不會影響資料的正確性,但可以較大地提升join的效能。

  • 您可以傳入capacityerror_rate來設定資料的量以及錯誤率,預設值是30000.01

說明

要注意,調大capacity或者減小error_rate會增加記憶體的使用,所以應當根據實際情況選擇一個合理的值。

Collection對象操作請參見DataFrame執行

透視表(PIVOT_TABLE)

PyODPS DataFrame提供了透視表的功能。樣本的表資料如下。

>>> df
     A    B      C  D  E
0  foo  one  small  1  3
1  foo  one  large  2  4
2  foo  one  large  2  5
3  foo  two  small  3  6
4  foo  two  small  3  4
5  bar  one  large  4  5
6  bar  one  small  5  3
7  bar  two  small  6  2
8  bar  two  large  7  1
  • 使用透視表功能時,rows參數為必選參數,表示按一個或者多個欄位做取平均值的操作。

    >>> df['A', 'D', 'E'].pivot_table(rows='A')
         A  D_mean  E_mean
    0  bar     5.5    2.75
    1  foo     2.2    4.40
  • rows可以提供多個欄位,表示按多個欄位做彙總。

    >>> df.pivot_table(rows=['A', 'B', 'C'])
         A    B      C  D_mean  E_mean
    0  bar  one  large     4.0     5.0
    1  bar  one  small     5.0     3.0
    2  bar  two  large     7.0     1.0
    3  bar  two  small     6.0     2.0
    4  foo  one  large     2.0     4.5
    5  foo  one  small     1.0     3.0
    6  foo  two  small     3.0     5.0
  • 指定values顯示指定要計算的列。

    >>> df.pivot_table(rows=['A', 'B'], values='D')
         A    B    D_mean
    0  bar  one  4.500000
    1  bar  two  6.500000
    2  foo  one  1.666667
    3  foo  two  3.000000
  • 計算值列時,預設會計算平均值。通過aggfunc指定一個或者多個彙總函式。

    >>> df.pivot_table(rows=['A', 'B'], values=['D'], aggfunc=['mean', 'count', 'sum'])
         A    B    D_mean  D_count  D_sum
    0  bar  one  4.500000        2      9
    1  bar  two  6.500000        2     13
    2  foo  one  1.666667        3      5
    3  foo  two  3.000000        2      6
  • 將未經處理資料的某一列的值,作為新的Collection的列。

    >>> df.pivot_table(rows=['A', 'B'], values='D', columns='C')
         A    B  large_D_mean  small_D_mean
    0  bar  one           4.0           5.0
    1  bar  two           7.0           6.0
    2  foo  one           2.0           1.0
    3  foo  two           NaN           3.0
  • 使用fill_value填充空值。

    >>> df.pivot_table(rows=['A', 'B'], values='D', columns='C', fill_value=0)
         A    B  large_D_mean  small_D_mean
    0  bar  one             4             5
    1  bar  two             7             6
    2  foo  one             2             1
    3  foo  two             0             3

KEY-VALUE字串轉換

DataFrame提供了將Key-Value對展開為列,以及將普通列轉換為Key-Value列的功能。DataFrame建立及其對象操作請參見建立DataFrame

  • 將Key-Value對展開為列,樣本資料如下。

    >>> df
        name               kv
    0  name1  k1=1,k2=3,k5=10
    1  name1    k1=7.1,k7=8.2
    2  name2    k2=1.2,k3=1.5
    3  name2      k9=1.1,k2=1

    通過extract_kv方法將Key-Value欄位展開。

    >>> df.extract_kv(columns=['kv'], kv_delim='=', item_delim=',')
       name   kv_k1  kv_k2  kv_k3  kv_k5  kv_k7  kv_k9
    0  name1    1.0    3.0    NaN   10.0    NaN    NaN
    1  name1    7.0    NaN    NaN    NaN    8.2    NaN
    2  name2    NaN    1.2    1.5    NaN    NaN    NaN
    3  name2    NaN    1.0    NaN    NaN    NaN    1.1

    其中,需要展開的欄位名由columns指定,Key和Value之間的分隔字元,以及Key-Value對之間的分隔字元分別由kv_delimitem_delim這兩個參數指定,預設分別為半形冒號和半形逗號。輸出的欄位名為原欄位名和Key值的組合,通過_相連。缺失值預設為NONE,可通過fill_value選擇需要填充的值。

    >>> df.extract_kv(columns=['kv'], kv_delim='=', fill_value=0)
       name   kv_k1  kv_k2  kv_k3  kv_k5  kv_k7  kv_k9
    0  name1    1.0    3.0    0.0   10.0    0.0    0.0
    1  name1    7.0    0.0    0.0    0.0    8.2    0.0
    2  name2    0.0    1.2    1.5    0.0    0.0    0.0
    3  name2    0.0    1.0    0.0    0.0    0.0    1.1
  • 將多列資料轉換為一個Key-Value列,樣本資料如下。

    >>> df
       name    k1   k2   k3    k5   k7   k9
    0  name1  1.0  3.0  NaN  10.0  NaN  NaN
    1  name1  7.0  NaN  NaN   NaN  8.2  NaN
    2  name2  NaN  1.2  1.5   NaN  NaN  NaN
    3  name2  NaN  1.0  NaN   NaN  NaN  1.1

    通過to_kv方法轉換為Key-Value表示的格式。

    >>> df.to_kv(columns=['k1', 'k2', 'k3', 'k5', 'k7', 'k9'], kv_delim='=')
        name               kv
    0  name1  k1=1,k2=3,k5=10
    1  name1    k1=7.1,k7=8.2
    2  name2    k2=1.2,k3=1.5
    3  name2      k9=1.1,k2=1