本文為您介紹MapReduce API。
PyODPS DataFrame支援MapReduce API,您可以分別編寫map
和reduce
函數(map_reduce
可以只有mapper
或者reducer
過程)。
wordcount
的樣本如下。
>>> #encoding=utf-8
>>> from odps import ODPS
>>> from odps import options
>>> options.verbose = True
>>> o = ODPS('your-access-id', 'your-secret-access-key',project='DMP_UC_dev', endpoint='http://service-corp.odps.aliyun-inc.com/api')
>>> from odps.df import DataFrame
>>> def mapper(row):
>>> for word in row[0].split():
>>> yield word.lower(), 1
>>>
>>> def reducer(keys):
>>> # 這裡使用list而不是cnt=0,否則h內的cnt會被認為是局部變數,其中的賦值無法輸出。
>>> cnt = [0]
>>> def h(row, done): # done表示這個key已經迭代結束。
>>> cnt[0] += row[1]
>>> if done:
>>> yield keys[0], cnt[0]
>>> return h
>>> # zx_word_count表只有一列,為STRING類型。
>>> word_count = DataFrame(o.get_table('zx_word_count'))
>>> table = word_count.map_reduce(mapper, reducer, group=['word', ],
mapper_output_names=['word', 'cnt'],
mapper_output_types=['string', 'int'],
reducer_output_names=['word', 'cnt'],
reducer_output_types=['string', 'int'])
word cnt
0 are 1
1 day 1
2 doing? 1
3 everybody 1
4 first 1
5 hello 2
6 how 1
7 is 1
8 so 1
9 the 1
10 this 1
11 world 1
12 you 1
group
參數用於指定reduce
按哪些欄位做分組,如果不指定,會按全部欄位做分組。reducer
需要接收彙總的keys
進行初始化,並能繼續處理按這些keys
彙總的每行資料。done
表示與這些keys
相關的所有行是否都迭代完成。
為了方便,此處寫成了函數閉包的方式,您也可以寫成Callable
的類。
class reducer(object):
def __init__(self, keys):
self.cnt = 0
def __call__(self, row, done): # done表示這個key已經迭代結束。
self.cnt += row.cnt
if done:
yield row.word, self.cnt
使用output
進行注釋會讓代碼更簡單。
>>> from odps.df import output
>>>
>>> @output(['word', 'cnt'], ['string', 'int'])
>>> def mapper(row):
>>> for word in row[0].split():
>>> yield word.lower(), 1
>>>
>>> @output(['word', 'cnt'], ['string', 'int'])
>>> def reducer(keys):
>>> # 此處使用list而不是cnt=0,否則h內的cnt會被認為是局部變數,其中的賦值無法輸出。
>>> cnt = [0]
>>> def h(row, done): # done表示這個key已經迭代結束。
>>> cnt[0] += row.cnt
>>> if done:
>>> yield keys.word, cnt[0]
>>> return h
>>>
>>> word_count = DataFrame(o.get_table('zx_word_count'))
>>> table = word_count.map_reduce(mapper, reducer, group='word')
word cnt
0 are 1
1 day 1
2 doing? 1
3 everybody 1
4 first 1
5 hello 2
6 how 1
7 is 1
8 so 1
9 the 1
10 this 1
11 world 1
12 you 1
在迭代的時候,可以使用sort
參數實現按指定列排序,通過ascending
參數指定升序降序。ascending
參數可以是一個BOOL值,表示所有的sort
欄位是相同升序或降序,也可以是一個列表,長度必須和sort
欄位長度相同。
指定COMBINER
combiner
表示在map_reduce
API裡表示在mapper
端,就先對資料進行彙總操作,它的用法和reducer
是完全一致的,但不能引用資源。 並且,combiner
的輸出的欄位名和欄位類型必須和mapper
完全一致。
上面的例子,您可以使用reducer
作為combiner
在mapper
端對資料做初步的彙總,減少Shuffle出去的資料量。
>>> words_df.map_reduce(mapper, reducer, combiner=reducer, group='word')
引用資源
在MapReduce API裡,您可以分別指定mapper
和reducer
所要引用的資源。
以下樣本為對mapper
裡的單詞做停詞過濾,在reducer
裡對白名單的單詞數量加5。
>>> white_list_file = o.create_resource('pyodps_white_list_words', 'file', file_obj='Python\nWorld')
>>>
>>> @output(['word', 'cnt'], ['string', 'int'])
>>> def mapper(resources):
>>> stop_words = set(r[0].strip() for r in resources[0])
>>> def h(row):
>>> for word in row[0].split():
>>> if word not in stop_words:
>>> yield word, 1
>>> return h
>>>
>>> @output(['word', 'cnt'], ['string', 'int'])
>>> def reducer(resources):
>>> d = dict()
>>> d['white_list'] = set(word.strip() for word in resources[0])
>>> d['cnt'] = 0
>>> def inner(keys):
>>> d['cnt'] = 0
>>> def h(row, done):
>>> d['cnt'] += row.cnt
>>> if done:
>>> if row.word in d['white_list']:
>>> d['cnt'] += 5
>>> yield keys.word, d['cnt']
>>> return h
>>> return inner
>>>
>>> words_df.map_reduce(mapper, reducer, group='word',
>>> mapper_resources=[stop_words], reducer_resources=[white_list_file])
word cnt
0 hello 2
1 life 1
2 python 7
3 world 6
4 short 1
5 use 1
使用第三方Python庫
使用方法類似在MAP中使用第三方Python庫。
在全域指定使用的庫。
>>> from odps import options >>> options.df.libraries = ['six.whl', 'python_dateutil.whl']
在立即執行的方法中,局部指定使用的庫。
>>> df.map_reduce(mapper=my_mapper, reducer=my_reducer, group='key').execute(libraries=['six.whl', 'python_dateutil.whl'])
說明由於位元組碼定義的差異,Python 3下使用新語言特性(例如
yield from
)時,代碼在使用 Python 2.7的ODPS Worker上執行時會發生錯誤。因此,建議您在Python 3下使用MapReduce API編寫生產作業前,先確認相關代碼是否能正常執行。
重排資料
當資料在叢集上分布不均勻時,您可以調用reshuffle
介面對資料重排。
>>> df1 = df.reshuffle()
預設會按隨機數做雜湊來分布,但您也可以按指定列做分布,並且可以指定重排後的排序次序。
>>> df1.reshuffle('name', sort='id', ascending=False)
布隆過濾器
PyODPS DataFrame提供了bloom_filter
介面進行布隆過濾器的計算。
給定某個Collection和它的某個列計算的sequence1
,對另外一個sequence2
進行布隆過濾時,sequence1
不存在於sequence2
中的資料一定會被過濾掉, 但可能無法完全過濾。所以這種過濾方式是一種近似的過濾方法。這樣的好處是對Collection進行快速過濾一些無用資料。這在一邊資料量遠大過另一邊資料量(大部分資料並不會參與join
運算)時的大規模join
情境很有用。例如,在join
使用者的瀏覽資料和交易資料時,使用者的瀏覽資料量遠大於交易資料量,可以利用交易資料先對瀏覽資料進行布隆過濾, 然後再join
可以很好地提升效能。
>>> df1 = DataFrame(pd.DataFrame({'a': ['name1', 'name2', 'name3', 'name1'], 'b': [1, 2, 3, 4]}))
>>> df1
a b
0 name1 1
1 name2 2
2 name3 3
3 name1 4
>>> df2 = DataFrame(pd.DataFrame({'a': ['name1']}))
>>> df2
a
0 name1
>>> df1.bloom_filter('a', df2.a) # 這裡第0個參數可以是個計算運算式如: df1.a + '1'。
a b
0 name1 1
1 name1 4
樣本說明:
由於資料量很小,
df1
中的a
為name2
和name3
的行都被正確過濾掉了,當資料量很大的時候,可能無法過濾全部資料。之前提
join
情境中,少量資料不能被過濾的情況並不會影響資料的正確性,但可以較大地提升join
的效能。您可以傳入
capacity
和error_rate
來設定資料的量以及錯誤率,預設值是3000
和0.01
。
要注意,調大capacity
或者減小error_rate
會增加記憶體的使用,所以應當根據實際情況選擇一個合理的值。
Collection對象操作請參見DataFrame執行。
透視表(PIVOT_TABLE)
PyODPS DataFrame提供了透視表的功能。樣本的表資料如下。
>>> df
A B C D E
0 foo one small 1 3
1 foo one large 2 4
2 foo one large 2 5
3 foo two small 3 6
4 foo two small 3 4
5 bar one large 4 5
6 bar one small 5 3
7 bar two small 6 2
8 bar two large 7 1
使用透視表功能時,
rows
參數為必選參數,表示按一個或者多個欄位做取平均值的操作。>>> df['A', 'D', 'E'].pivot_table(rows='A') A D_mean E_mean 0 bar 5.5 2.75 1 foo 2.2 4.40
rows
可以提供多個欄位,表示按多個欄位做彙總。>>> df.pivot_table(rows=['A', 'B', 'C']) A B C D_mean E_mean 0 bar one large 4.0 5.0 1 bar one small 5.0 3.0 2 bar two large 7.0 1.0 3 bar two small 6.0 2.0 4 foo one large 2.0 4.5 5 foo one small 1.0 3.0 6 foo two small 3.0 5.0
指定
values
顯示指定要計算的列。>>> df.pivot_table(rows=['A', 'B'], values='D') A B D_mean 0 bar one 4.500000 1 bar two 6.500000 2 foo one 1.666667 3 foo two 3.000000
計算值列時,預設會計算平均值。通過
aggfunc
指定一個或者多個彙總函式。>>> df.pivot_table(rows=['A', 'B'], values=['D'], aggfunc=['mean', 'count', 'sum']) A B D_mean D_count D_sum 0 bar one 4.500000 2 9 1 bar two 6.500000 2 13 2 foo one 1.666667 3 5 3 foo two 3.000000 2 6
將未經處理資料的某一列的值,作為新的Collection的列。
>>> df.pivot_table(rows=['A', 'B'], values='D', columns='C') A B large_D_mean small_D_mean 0 bar one 4.0 5.0 1 bar two 7.0 6.0 2 foo one 2.0 1.0 3 foo two NaN 3.0
使用
fill_value
填充空值。>>> df.pivot_table(rows=['A', 'B'], values='D', columns='C', fill_value=0) A B large_D_mean small_D_mean 0 bar one 4 5 1 bar two 7 6 2 foo one 2 1 3 foo two 0 3
KEY-VALUE字串轉換
DataFrame提供了將Key-Value對展開為列,以及將普通列轉換為Key-Value列的功能。DataFrame建立及其對象操作請參見建立DataFrame。
將Key-Value對展開為列,樣本資料如下。
>>> df name kv 0 name1 k1=1,k2=3,k5=10 1 name1 k1=7.1,k7=8.2 2 name2 k2=1.2,k3=1.5 3 name2 k9=1.1,k2=1
通過
extract_kv
方法將Key-Value欄位展開。>>> df.extract_kv(columns=['kv'], kv_delim='=', item_delim=',') name kv_k1 kv_k2 kv_k3 kv_k5 kv_k7 kv_k9 0 name1 1.0 3.0 NaN 10.0 NaN NaN 1 name1 7.0 NaN NaN NaN 8.2 NaN 2 name2 NaN 1.2 1.5 NaN NaN NaN 3 name2 NaN 1.0 NaN NaN NaN 1.1
其中,需要展開的欄位名由
columns
指定,Key和Value之間的分隔字元,以及Key-Value對之間的分隔字元分別由kv_delim
和item_delim
這兩個參數指定,預設分別為半形冒號和半形逗號。輸出的欄位名為原欄位名和Key值的組合,通過_
相連。缺失值預設為NONE,可通過fill_value
選擇需要填充的值。>>> df.extract_kv(columns=['kv'], kv_delim='=', fill_value=0) name kv_k1 kv_k2 kv_k3 kv_k5 kv_k7 kv_k9 0 name1 1.0 3.0 0.0 10.0 0.0 0.0 1 name1 7.0 0.0 0.0 0.0 8.2 0.0 2 name2 0.0 1.2 1.5 0.0 0.0 0.0 3 name2 0.0 1.0 0.0 0.0 0.0 1.1
將多列資料轉換為一個Key-Value列,樣本資料如下。
>>> df name k1 k2 k3 k5 k7 k9 0 name1 1.0 3.0 NaN 10.0 NaN NaN 1 name1 7.0 NaN NaN NaN 8.2 NaN 2 name2 NaN 1.2 1.5 NaN NaN NaN 3 name2 NaN 1.0 NaN NaN NaN 1.1
通過
to_kv
方法轉換為Key-Value表示的格式。>>> df.to_kv(columns=['k1', 'k2', 'k3', 'k5', 'k7', 'k9'], kv_delim='=') name kv 0 name1 k1=1,k2=3,k5=10 1 name1 k1=7.1,k7=8.2 2 name2 k2=1.2,k3=1.5 3 name2 k9=1.1,k2=1