PyODPS是MaxCompute的Python版本的SDK。提供簡單方便的Python編程介面,以便您使用Python編寫MaxCompute作業、查詢MaxCompute表和視圖,以及管理MaxCompute資源。PyODPS提供了與ODPS命令列工具類似的功能,例如上傳和下載檔案、建立表、運行ODPS SQL查詢等,同時提供了一些進階功能,如提交MapReduce任務、使用ODPS UDF等。本文為您介紹PyODPS的應用情境、支援的工具,及使用過程中需要關注的注意事項
功能介紹
PyODPS應用情境請參見:
DataFrame操作:DataFrame快速入門。
讀取分區表資料:PyODPS讀取分區表資料。
參數傳遞:PyODPS參數傳遞。
使用第三方包:PyODPS使用第三方包。
查看一級分區:PyODPS查看一級分區。
條件查詢:PyODPS條件查詢。
DataFrame Sequence及執行:PyODPS的Sequence及執行操作。
支援的工具
PyODPS支援在本地環境、DataWorks、PAI Notebooks中使用。
無論您通過何種工具使用PyODPS,建議您盡量避免將全量資料下載到本地直接運行PyODPS任務,容易佔用大量記憶體造成OOM,建議您將任務提交到MaxCompute進行分布式運行,對比介紹請參見下文的注意事項:請勿下載全量資料到本地並運行PyODPS。
本地環境:您可以在本地環境安裝並使用PyODPS,操作指導可參見通過本地環境使用PyODPS。
DataWorks:DataWorks的PyODPS節點已安裝好了PyODPS,您可以直接在DataWorks的PyODPS節點上開發PyODPS任務並周期性運行,操作指導請參見通過DataWorks使用PyODPS。
PAI Notebooks:PAI的Python環境也可安裝運行PyODPS,其中PAI的內建鏡像均已安裝好了PyODPS可直接使用,如PAI-Designer的自訂Python組件,在PAI Notebooks中使用PyODPS的方式與通用的使用方式基本一致,可參考基本操作概述、DataFrame概述。
注意事項:請勿下載全量資料到本地並運行PyODPS
PyODPS作為一個SDK,本身運行於各種用戶端,包括PC、DataWorks(資料開發的PyODPS節點)或PAI Notebooks的運行環境。需要注意的是,PyODPS提供了多種方便拉取資料到本地的操作,如tunnel下載操作、execute操作、to_pandas操作等,因此,很多初始使用PyODPS的使用者會試圖把資料拉取到本地,處理完成後再上傳到 MaxCompute上,很多時候這種方式是十分低效的,拉取資料到本地徹底喪失了MaxCompute的大規模並行能力的優勢。
資料處理方式 | 描述 | 情境樣本 |
拉取到本地處理(不推薦,易OOM) | 例如DataWorks中的PyODPS節點,內建了PyODPS包以及必要的Python環境,是一個資源非常受限的用戶端運行容器,並不使用MaxCompute計算資源,有較強的記憶體限制。 | PyODPS提供了 |
提交到MaxCompute分布式執行(推薦) | 推薦您合理利用PyODPS提供的分布式DataFrame功能,將主要的計算提交到MaxCompute分布式執行而不是在PyODPS用戶端節點下載處理,這是正確使用PyODPS的關鍵。 | 推薦使用PyODPS DataFrame介面來完成資料處理。常見的需求,比如需要對每一行資料處理然後寫回表,或者一行資料要拆成多行,都可以通過PyODPS DataFrame中的 使用這些介面最終都會翻譯成SQL到MaxCompute計算叢集做分散式運算,並且本地幾乎沒有任何的記憶體消耗,相比於單機有很大的效能提升。 |
以下以一個分詞的樣本為例,為您對比兩種方式的代碼區別。
樣本情境
使用者需要通過分析每天產生的日誌字串來提取一些資訊,有一個只有一列的表,類型是string,通過jieba分詞可以將中文語句分詞,然後再找到想要的關鍵詞儲存到資訊表裡。
低效處理代碼demo
import jieba t = o.get_table('word_split') out = [] with t.open_reader() as reader: for r in reader: words = list(jieba.cut(r[0])) # # 處理邏輯,產生出 processed_data # out.append(processed_data) out_t = o.get_table('words') with out_t.open_writer() as writer: writer.write(out)
單機處理資料的思維,逐行讀取資料,然後逐行處理資料,再逐行寫入目標表。整個流程中,下載上傳資料消耗了大量的時間,並且在執行指令碼的機器上需要很大的記憶體處理所有的資料,特別是對於使用DataWorks節點的使用者來說,很容易因為超過預設分配的記憶體值而導致OOM運行報錯。
高效處理代碼demo
from odps.df import output out_table = o.get_table('words') df = o.get_table('word_split').to_df() # 假定需要返回的欄位及類型如下 out_names = ["word", "count"] out_types = ["string", "int"] @output(out_names, out_types) def handle(row): import jieba words = list(jieba.cut(row[0])) # # 處理邏輯,產生出 processed_data # yield processed_data df.apply(handle, axis=1).persist(out_table.name)
利用apply實現分布式執行:
複雜邏輯都放在handle這個函數裡,這個函數會被自動序列化到服務端作為UDF使用,在服務端調用執行,且因為handle服務端實際執行時也是對每一行進行處理的,所以邏輯上是沒有區別的。不同的是,這樣寫的程式在提交到MaxCompute端執行時,有多台機器同時處理資料,可以節約很多時間。
調用persist介面會將產生的資料直接寫到另一張MaxCompute表中,所有的資料產生與消費都在 MaxCompute叢集完成,也節約了本地的網路與記憶體。
在這個例子中也使用到了三方包,MaxCompute是支援自訂函數中使用三方包的(樣本中的
jieba
),所以無需擔心代碼改動帶來的成本,您可以幾乎不需要改動主要邏輯就可以享受到MaxCompute的大規模計算能力。
使用限制
由於沙箱的限制,部分Pandas計算後端執行本地調試通過的程式,無法在MaxCompute上調試通過。