全部產品
Search
文件中心

DataWorks:開發PyODPS 2任務

更新時間:Aug 30, 2024

DataWorks提供PyODPS 2節點類型,您可以在DataWorks上通過PyODPS文法進行PyODPS任務開發,PyODPS整合了MaxCompute的Python SDK。支援您在DataWorks的PyODPS 2節點上直接編輯Python代碼,操作MaxCompute。

前提條件

已建立PyODPS 2節點,詳情請參見建立並管理MaxCompute節點

背景資訊

PyODPS是MaxCompute的Python版本的SDK,提供簡單方便的Python編程介面,以便您使用Python編寫MaxCompute作業、查詢MaxCompute表和視圖,以及管理MaxCompute資源,詳情請參見PyODPS概述。在DataWorks中,您可通過PyODPS節點實現Python任務的調度運行,以及與其他作業的整合操作。

注意事項

  • 如果您的PyODPS任務需要訪問特殊的網路環境(如VPC網路或IDC網路中的資料來源或服務等),請使用Serverless調度資源群組,並參考網路連通解決方案打通Serverless資源群組與目標環境的網路連通。

  • PyODPS文法及更多資訊請參見PyODPS文檔

  • PyODPS節點分為PyODPS 2和PyODPS 3兩種,二者的區別在於底層Python語言版本不同。PyODPS 2底層Python語言版本為Python 2,PyODPS 3底層Python語言版本為Python 3,請您根據實際使用的Python語言版本建立PyODPS節點。

  • 若通過PyODPS節點執行SQL無法正常產生資料血緣關係,即資料血緣在資料地圖無法正常展示,您可在任務代碼處通過手動設定DataWorks調度啟動並執行相關參數解決。查看資料血緣,詳情請參見查看血緣資訊;參數設定,詳情請參見設定運行參數hints。任務運行時所需參數可參考如下代碼擷取。

    import os
    ...
    # get DataWorks sheduler runtime parameters
    skynet_hints = {}
    for k, v in os.environ.items():
        if k.startswith('SKYNET_'):
            skynet_hints[k] = v
    ...
    # setting hints while submiting a task
    o.execute_sql('INSERT OVERWRITE TABLE XXXX SELECT * FROM YYYY WHERE ***', hints=skynet_hints)
    ...
  • Pyodps節點的輸出日誌最大支援4MB。建議您盡量避免在日誌中直接輸出大量的資料結果。相反,建議您多輸出警示日誌和正常進度的日誌,以提供更有價值的資訊。

使用限制

  • 使用獨享調度資源群組執行PyODPS節點時,建議在節點內擷取到獨享資源群組本地處理的資料不超過50MB,該操作受限於獨享調度資源群組的規格,處理的本機資料過多並超出作業系統閾值時可能發生OOM(Got Killed)錯誤。請避免在PyODPS節點中寫入過多的資料處理代碼。詳情請參見高效使用PyODPS最佳實務

  • 使用Serverless資源群組執行PYODPS節點時,您可根據節點內需要處理的資料量合理配置PyODPS節點的CU。

  • 如果您發現有Got killed報錯,即表明記憶體使用量超限,進程被中止。因此,請盡量避免本地的資料操作。通過PyODPS發起的SQL和DataFrame任務(除to_pandas外)不受此限制。

  • 非自訂函數代碼可以使用平台預裝的Numpy和Pandas。不支援其他帶有二進位代碼的三方包。

  • 由於相容性原因,在DataWorks中,options.tunnel.use_instance_tunnel預設設定為False。如果需要全域開啟instance tunnel,需要手動將該值設定為True。

  • PyODPS 2節點底層的Python版本為2.7。

編輯代碼:簡單樣本

建立PyODPS節點後,您可以進行代碼編輯及運行,更多關於PyODPS文法說明,請參見基本操作概述

  • ODPS入口

    DataWorks的PyODPS節點中,將會包含一個全域的變數odpso,即ODPS入口,您無需手動定義ODPS入口。

    print(odps.exist_table('PyODPS_iris'))
  • 執行SQL

    您可以在PyODPS節點中執行SQL,詳情請參見SQL

    • DataWorks上預設未開啟instance tunnel,即instance.open_reader預設使用Result介面(最多一萬條記錄)。您可以通過reader.count擷取記錄數。如果您需要迭代擷取全部資料,則需要關閉limit限制。您可以通過下列語句在全域範圍內開啟Instance Tunnel並關閉limit限制。

      options.tunnel.use_instance_tunnel = True
      options.tunnel.limit_instance_tunnel = False  # 關閉limit限制,讀取全部資料。
      
      with instance.open_reader() as reader:
        # 通過Instance Tunnel可讀取全部資料。
    • 您也可以通過在open_reader上添加tunnel=True,實現僅對本次open_reader開啟instance tunnel。同時,您還可以添加 limit=False,實現僅對本次關閉limit限制。

      # 本次open_reader使用Instance Tunnel介面,且能讀取全部資料。
      with instance.open_reader(tunnel=True, limit=False) as reader:
  • 設定運行參數

    • 您可以通過設定hints參數,來設定運行時的參數,參數類型是dict。 Hints參數的詳情請參見SET操作

      o.execute_sql('select * from PyODPS_iris', hints={'odps.sql.mapper.split.size': 16})
    • 對全域配置設定sql.settings後,每次運行時,都需要添加相關的運行時的參數。

      from odps import options
      options.sql.settings = {'odps.sql.mapper.split.size': 16}
      o.execute_sql('select * from PyODPS_iris')  # 根據全域配置添加hints。
  • 讀取運行結果

    運行SQL的執行個體能夠直接執行open_reader的操作,有以下兩種情況:

    • SQL返回了結構化的資料。

      with o.execute_sql('select * from dual').open_reader() as reader:
      	for record in reader:  # 處理每一個record。
    • 可能執行的是desc等SQL語句,通過reader.raw屬性,擷取到原始的SQL執行結果。

      with o.execute_sql('desc dual').open_reader() as reader:
      	print(reader.raw)
      說明

      如果使用了自訂調度參數,頁面上直接觸發運行PyODPS 3節點時,需要寫死時間,PyODPS節點無法直接替換。

  • DataFrame

    您還可以通過DataFrame的方式處理資料。

    • 執行

      在DataWorks的環境裡,DataFrame的執行需要顯式調用立即執行的方法

      from odps.df import DataFrame
      iris = DataFrame(o.get_table('pyodps_iris'))
      for record in iris[iris.sepal_width < 3].execute():  # 調用立即執行的方法,處理每條Record。

      如果您需要在Print時調用立即執行,需要開啟options.interactive

      from odps import options
      from odps.df import DataFrame
      options.interactive = True  # 在開始處開啟開關。
      iris = DataFrame(o.get_table('pyodps_iris'))
      print(iris.sepal_width.sum())  # Print時會立即執行。
    • 列印詳細資料

      通過設定options.verbose選項。在DataWorks上,預設已經處於開啟狀態,運行過程會列印Logview等詳細過程。

樣本

以下以一個簡單樣本為您介紹PyODPS節點的使用:

  1. 準備資料集,建立pyodps_iris樣本表,具體操作請參見Dataframe資料處理

  2. 建立DataFrame,詳情請參見從MaxCompute表建立DataFrame

  3. 在PyODPS節點中輸入以下代碼並運行。

    from odps.df import DataFrame
    
    # 從ODPS表建立DataFrame。
    iris = DataFrame(o.get_table('pyodps_iris'))
    print(iris.sepallength.head(5))

    返回結果:

       sepallength
    0          4.5
    1          5.5
    2          4.9
    3          5.0
    4          6.0

編輯代碼:進階樣本

若節點需要周期性調度,您需要定義節點調度時的相關屬性,調度配置詳情請參見任務調度屬性配置概述

使用調度參數

單擊節點編輯地區右側的調度配置,在參數地區配置自訂參數,PyODPS節點與SQL節點定義變數的方式不同,詳情請參見調度參數配置

與DataWorks中的SQL節點不同,為了避免影響代碼,PyODPS節點不會在代碼中替換類似 ${param_name}的字串,而是在執行代碼前,在全域變數中增加一個名為args的dict,調度參數可以在此擷取。例如,在參數中設定ds=${yyyymmdd},則可以通過以下方式在代碼中擷取該參數。

print('ds=' + args['ds'])
ds=20161116
說明

如果您需要擷取名為ds的分區,則可以使用如下方法。

o.get_table('table_name').get_partition('ds=' + args['ds'])

更多情境的PyODPS任務開發,請參考:

後續步驟

  • 如何判斷Shell自訂指令碼任務的成功完成:Python自訂指令碼任務的成功完成的判斷邏輯與Shell節點一致,您可通過該方法進行判斷。

  • 發布任務:如果您使用的是標準模式的工作空間,需要通過任務發布流程,將任務發布至生產環境後,任務才會周期調度運行。

  • 周期任務營運:任務提交發布至生產營運中心調度後,您可通過DataWorks營運中心進行相關營運操作。

  • PyODPS常見問題:您可瞭解PyODPS執行過程中的常見問題,便於出現異常時快速排查解決。