DataWorks為您提供PyODPS 3節點,您可以在該節點中直接使用Python代碼編寫MaxCompute作業,並進行作業的周期性調度。本文為您介紹如何通過DataWorks實現Python任務的配置與調度。
前提條件
已建立PyODPS 3節點,詳情請參見建立並管理MaxCompute節點。
背景資訊
PyODPS是MaxCompute的Python版本的SDK,提供簡單方便的Python編程介面,以便您使用Python編寫MaxCompute作業、查詢MaxCompute表和視圖,以及管理MaxCompute資源,詳情請參見PyODPS概述。在DataWorks中,您可通過PyODPS節點實現Python任務的調度運行,以及與其他作業的整合操作。
注意事項
在DataWorks資源群組本地運行PyODPS節點代碼時,若代碼中需要調用第三方包,Serverless資源群組可通過自訂鏡像安裝第三方包。
說明如果代碼中存在UDF引用第三方包的情況,不支援使用上述方式,具體配置方法,請參見UDF樣本:Python UDF使用第三方包。
如果您的PyODPS任務需要訪問特殊的網路環境(如VPC網路或IDC網路中的資料來源或服務等),請使用Serverless調度資源群組,並參考網路連通解決方案打通Serverless資源群組與目標環境的網路連通。
PyODPS文法及更多資訊請參見PyODPS文檔。
PyODPS節點分為PyODPS 2和PyODPS 3兩種,二者的區別在於底層Python語言版本不同。PyODPS 2底層Python語言版本為Python 2,PyODPS 3底層Python語言版本為Python 3,請您根據實際使用的Python語言版本建立PyODPS節點。
若通過PyODPS節點執行SQL無法正常產生資料血緣關係,即資料血緣在資料地圖無法正常展示,您可在任務代碼處通過手動設定DataWorks調度啟動並執行相關參數解決。查看資料血緣,詳情請參見查看血緣資訊;參數設定,詳情請參見設定運行參數hints。任務運行時所需參數可參考如下代碼擷取。
import os ... # get DataWorks sheduler runtime parameters skynet_hints = {} for k, v in os.environ.items(): if k.startswith('SKYNET_'): skynet_hints[k] = v ... # setting hints while submiting a task o.execute_sql('INSERT OVERWRITE TABLE XXXX SELECT * FROM YYYY WHERE ***', hints=skynet_hints) ...
Pyodps節點的輸出日誌最大支援4MB。建議您盡量避免在日誌中直接輸出大量的資料結果。相反,建議您多輸出警示日誌和正常進度的日誌,以提供更有價值的資訊。
使用限制
使用獨享調度資源群組執行PyODPS節點時,建議在節點內擷取到獨享資源群組本地處理的資料不超過50MB,該操作受限於獨享調度資源群組的規格,處理的本機資料過多並超出作業系統閾值時可能發生OOM(Got Killed)錯誤。請避免在PyODPS節點中寫入過多的資料處理代碼。詳情請參見高效使用PyODPS最佳實務。
使用Serverless資源群組執行PYODPS節點時,您可根據節點內需要處理的資料量合理配置PyODPS節點的CU。
如果您發現有Got killed報錯,即表明記憶體使用量超限,進程被中止。因此,請盡量避免本地的資料操作。通過PyODPS發起的SQL和DataFrame任務(除to_pandas外)不受此限制。
非自訂函數代碼可以使用平台預裝的Numpy和Pandas。不支援其他帶有二進位代碼的三方包。
由於相容性原因,在DataWorks中,options.tunnel.use_instance_tunnel預設設定為False。如果需要全域開啟instance tunnel,需要手動將該值設定為True。
當Python 3的子版本號碼不同(例如Python 3.8和Python 3.7)時,位元組碼的定義有所不同。
目前MaxCompute使用的Python 3版本為3.7,當使用其它版本Python 3中的部分文法(例如Python 3.8中的finally block)時,執行會報錯,建議您選擇Python 3.7。
PyODPS 3支援運行在Serverless資源群組上。如需購買使用,請參見新增和使用Serverless資源群組。
編輯代碼:簡單樣本
建立PyODPS節點後,您可以進行代碼編輯及運行,更多關於PyODPS文法說明,請參見基本操作概述。
ODPS入口
DataWorks的PyODPS節點中,將會包含一個全域的變數odps或o,即ODPS入口,您無需手動定義ODPS入口。
print(odps.exist_table('PyODPS_iris'))
執行SQL
您可以在PyODPS節點中執行SQL,詳情請參見SQL。
DataWorks上預設未開啟instance tunnel,即instance.open_reader預設使用Result介面(最多一萬條記錄)。您可以通過reader.count擷取記錄數。如果您需要迭代擷取全部資料,則需要關閉
limit
限制。您可以通過下列語句在全域範圍內開啟Instance Tunnel並關閉limit
限制。options.tunnel.use_instance_tunnel = True options.tunnel.limit_instance_tunnel = False # 關閉limit限制,讀取全部資料。 with instance.open_reader() as reader: # 通過Instance Tunnel可讀取全部資料。
您也可以通過在
上添加tunnel=True
,實現僅對本次open_reader開啟instance tunnel。同時,您還可以添加limit=False
,實現僅對本次關閉limit
限制。# 本次open_reader使用Instance Tunnel介面,且能讀取全部資料。 with instance.open_reader(tunnel=True, limit=False) as reader:
設定運行參數
您可以通過設定hints參數,來設定運行時的參數,參數類型是dict。 Hints參數的詳情請參見SET操作。
o.execute_sql('select * from PyODPS_iris', hints={'odps.sql.mapper.split.size': 16})
對全域配置設定sql.settings後,每次運行時,都需要添加相關的運行時的參數。
from odps import options options.sql.settings = {'odps.sql.mapper.split.size': 16} o.execute_sql('select * from PyODPS_iris') # 根據全域配置添加hints。
讀取運行結果
運行SQL的執行個體能夠直接執行open_reader的操作,有以下兩種情況:
SQL返回了結構化的資料。
with o.execute_sql('select * from dual').open_reader() as reader: for record in reader: # 處理每一個record。
可能執行的是desc等SQL語句,通過reader.raw屬性,擷取到原始的SQL執行結果。
with o.execute_sql('desc dual').open_reader() as reader: print(reader.raw)
說明如果使用了自訂調度參數,頁面上直接觸發運行PyODPS 3節點時,需要寫死時間,PyODPS節點無法直接替換。
DataFrame
您還可以通過DataFrame的方式處理資料。
執行
在DataWorks的環境裡,DataFrame的執行需要顯式調用立即執行的方法。
from odps.df import DataFrame iris = DataFrame(o.get_table('pyodps_iris')) for record in iris[iris.sepal_width < 3].execute(): # 調用立即執行的方法,處理每條Record。
如果您需要在Print時調用立即執行,需要開啟
options.interactive
。from odps import options from odps.df import DataFrame options.interactive = True # 在開始處開啟開關。 iris = DataFrame(o.get_table('pyodps_iris')) print(iris.sepal_width.sum()) # Print時會立即執行。
列印詳細資料
通過設定
options.verbose
選項。在DataWorks上,預設已經處於開啟狀態,運行過程會列印Logview等詳細過程。
樣本
以下以一個簡單樣本為您介紹PyODPS節點的使用:
準備資料集,建立pyodps_iris樣本表,具體操作請參見Dataframe資料處理。
建立DataFrame,詳情請參見從MaxCompute表建立DataFrame。
在PyODPS節點中輸入以下代碼並運行。
from odps.df import DataFrame # 從ODPS表建立DataFrame。 iris = DataFrame(o.get_table('pyodps_iris')) print(iris.sepallength.head(5))
返回結果:
sepallength 0 4.5 1 5.5 2 4.9 3 5.0 4 6.0
編輯代碼:進階樣本
若節點需要周期性調度,您需要定義節點調度時的相關屬性,調度配置詳情請參見任務調度屬性配置概述。
使用調度參數
單擊節點編輯地區右側的調度配置,在參數地區配置自訂參數,PyODPS節點與SQL節點定義變數的方式不同,詳情請參見調度參數配置。
與DataWorks中的SQL節點不同,為了避免影響代碼,PyODPS節點不會在代碼中替換類似 ${param_name}的字串,而是在執行代碼前,在全域變數中增加一個名為args
的dict,調度參數可以在此擷取。例如,在參數中設定ds=${yyyymmdd}
,則可以通過以下方式在代碼中擷取該參數。
print('ds=' + args['ds'])
ds=20161116
如果您需要擷取名為ds
的分區,則可以使用如下方法。
o.get_table('table_name').get_partition('ds=' + args['ds'])
更多情境的PyODPS任務開發,請參考:
後續步驟
如何判斷Shell自訂指令碼任務的成功完成:Python自訂指令碼任務的成功完成的判斷邏輯與Shell節點一致,您可通過該方法進行判斷。
發布任務:如果您使用的是標準模式的工作空間,需要通過任務發布流程,將任務發布至生產環境後,任務才會周期調度運行。
周期任務營運:任務提交發布至生產營運中心調度後,您可通過DataWorks營運中心進行相關營運操作。
PyODPS常見問題:您可瞭解PyODPS執行過程中的常見問題,便於出現異常時快速排查解決。
常見問題
Q:使用PyODPS3節點通過代碼採集第三方介面資料(例如飛書)匯入到DataWorks中,在本地開發沒有問題,可以採集到資料,但提交到生產環境,在營運中心執行時卻報錯響應逾時,為什嗎?
A:請在
中的安全設定地區,配置沙箱白名單,添加相應第三方介面的白名單資訊,允許PyOSPS3中的任務可以訪問到目標第三方介面。例如: