全部產品
Search
文件中心

:PyODPS概述

更新時間:Jun 19, 2024

PyODPS是MaxCompute的Python版本的SDK。提供簡單方便的Python編程介面,以便您使用Python編寫MaxCompute作業、查詢MaxCompute表和視圖,以及管理MaxCompute資源。PyODPS提供了與ODPS命令列工具類似的功能,例如上傳和下載檔案、建立表、運行ODPS SQL查詢等,同時提供了一些進階功能,如提交MapReduce任務、使用ODPS UDF等。本文為您介紹PyODPS的應用情境、支援的工具,及使用過程中需要關注的注意事項

功能介紹

PyODPS應用情境請參見:

支援的工具

PyODPS支援在本地環境、DataWorks、PAI Notebooks中使用。

重要

無論您通過何種工具使用PyODPS,建議您盡量避免將全量資料下載到本地直接運行PyODPS任務,容易佔用大量記憶體造成OOM,建議您將任務提交到MaxCompute進行分布式運行,對比介紹請參見下文的注意事項:請勿下載全量資料到本地並運行PyODPS

  • 本地環境:您可以在本地環境安裝並使用PyODPS,操作指導可參見通過本地環境使用PyODPS

  • DataWorks:DataWorks的PyODPS節點已安裝好了PyODPS,您可以直接在DataWorks的PyODPS節點上開發PyODPS任務並周期性運行,操作指導請參見通過DataWorks使用PyODPS

  • PAI Notebooks:PAI的Python環境也可安裝運行PyODPS,其中PAI的內建鏡像均已安裝好了PyODPS可直接使用,如PAI-Designer的自訂Python組件,在PAI Notebooks中使用PyODPS的方式與通用的使用方式基本一致,可參考基本操作概述DataFrame概述

注意事項:請勿下載全量資料到本地並運行PyODPS

PyODPS作為一個SDK,本身運行於各種用戶端,包括PC、DataWorks(資料開發的PyODPS節點)或PAI Notebooks的運行環境。pyodps環境需要注意的是,PyODPS提供了多種方便拉取資料到本地的操作,如tunnel下載操作、execute操作、to_pandas操作等,因此,很多初始使用PyODPS的使用者會試圖把資料拉取到本地,處理完成後再上傳到 MaxCompute上,很多時候這種方式是十分低效的,拉取資料到本地徹底喪失了MaxCompute的大規模並行能力的優勢。

資料處理方式

描述

情境樣本

拉取到本地處理(不推薦,易OOM)

例如DataWorks中的PyODPS節點,內建了PyODPS包以及必要的Python環境,是一個資源非常受限的用戶端運行容器,並不使用MaxCompute計算資源,有較強的記憶體限制。

PyODPS提供了to_pandas介面,可以直接將MaxCompute資料轉化成Pandas DataFrame資料結構,但這個介面只應該被用於擷取小規模資料做本地開發調試使用,而不是用來大規模處理資料,因為使用這個介面會觸發下載行為,將位於MaxCompute中的海量資料下載到本地,如果後續操作的都是本地的DataFrame,則喪失了MaxCompute 的大規模並行計算能力,且資料量稍大時,單機記憶體就很容易產生OOM。

提交到MaxCompute分布式執行(推薦)

推薦您合理利用PyODPS提供的分布式DataFrame功能,將主要的計算提交到MaxCompute分布式執行而不是在PyODPS用戶端節點下載處理,這是正確使用PyODPS的關鍵。

推薦使用PyODPS DataFrame介面來完成資料處理。常見的需求,比如需要對每一行資料處理然後寫回表,或者一行資料要拆成多行,都可以通過PyODPS DataFrame中的map或者apply實現,有些甚至只需要一行代碼,足夠高效與簡潔,案例可參見使用自訂函數及Python第三方庫

使用這些介面最終都會翻譯成SQL到MaxCompute計算叢集做分散式運算,並且本地幾乎沒有任何的記憶體消耗,相比於單機有很大的效能提升。

以下以一個分詞的樣本為例,為您對比兩種方式的代碼區別。

  • 樣本情境

    使用者需要通過分析每天產生的日誌字串來提取一些資訊,有一個只有一列的表,類型是string,通過jieba分詞可以將中文語句分詞,然後再找到想要的關鍵詞儲存到資訊表裡。

  • 低效處理代碼demo

    import jieba
    t = o.get_table('word_split')
    out = []
    with t.open_reader() as reader:
        for r in reader:
            words = list(jieba.cut(r[0]))
            #
            # 處理邏輯,產生出 processed_data
            #
            out.append(processed_data)
    out_t = o.get_table('words')
    with out_t.open_writer() as writer:
        writer.write(out)

    單機處理資料的思維,逐行讀取資料,然後逐行處理資料,再逐行寫入目標表。整個流程中,下載上傳資料消耗了大量的時間,並且在執行指令碼的機器上需要很大的記憶體處理所有的資料,特別是對於使用DataWorks節點的使用者來說,很容易因為超過預設分配的記憶體值而導致OOM運行報錯。

  • 高效處理代碼demo

    from odps.df import output
    out_table = o.get_table('words')
    df = o.get_table('word_split').to_df()
    
    # 假定需要返回的欄位及類型如下
    out_names = ["word", "count"]
    out_types = ["string", "int"]
    
    @output(out_names, out_types)
    def handle(row):
        import jieba
        words = list(jieba.cut(row[0]))
        #
        # 處理邏輯,產生出 processed_data
        #
        yield processed_data
    df.apply(handle, axis=1).persist(out_table.name)

    利用apply實現分布式執行:

    • 複雜邏輯都放在handle這個函數裡,這個函數會被自動序列化到服務端作為UDF使用,在服務端調用執行,且因為handle服務端實際執行時也是對每一行進行處理的,所以邏輯上是沒有區別的。不同的是,這樣寫的程式在提交到MaxCompute端執行時,有多台機器同時處理資料,可以節約很多時間。

    • 調用persist介面會將產生的資料直接寫到另一張MaxCompute表中,所有的資料產生與消費都在 MaxCompute叢集完成,也節約了本地的網路與記憶體。

    • 在這個例子中也使用到了三方包,MaxCompute是支援自訂函數中使用三方包的(樣本中的jieba),所以無需擔心代碼改動帶來的成本,您可以幾乎不需要改動主要邏輯就可以享受到MaxCompute的大規模計算能力。

使用限制