Task是MaxCompute的基本計算單元,例如SQL Task。本文為您介紹PyODPS中任務執行個體的基本操作。
基本操作
Task在執行時會被執行個體化, 以MaxCompute執行個體(下文簡稱為執行個體或Instance)的形式存在。執行個體常見的操作如下:
list_instances()
:擷取專案空間下的所有的Instance。exist_instance()
:判斷某執行個體是否存在。get_instance()
:擷取執行個體。stop_instance()
:停止執行個體。僅支援對運行中的執行個體進行停止執行個體操作,否則會報錯。說明通過Instance對象調用
stop
方法也可以用於停止Instance,如o.get_instance('instance id').stop
。
樣本
擷取Instancede的ID
for instance in o.list_instances(): print(instance.id)
判斷Instance是否存在
print(o.exist_instance('my_instance_id'))
擷取Logview地址
對於SQL等任務,通過Instance對象調用
get_logview_address
方法即可擷取Logview地址。#從已有的instance對象擷取Logview地址。 instance = o.run_sql('desc pyodps_iris') print(instance.get_logview_address()) #從instance id擷取Logview地址。 instance = o.get_instance('my_instance_id') print(instance.get_logview_address())
對於PAI類型任務,需要先枚舉其子任務,再擷取子任務的Logview。
instance = o.run_xflow('AppendID', 'algo_public', {'inputTableName': 'input_table', 'outputTableName': 'output_table'}) for sub_inst_name, sub_inst in o.get_xflow_sub_instances(instance).items(): print('%s: %s' % (sub_inst_name, sub_inst.get_logview_address()))
任務執行個體狀態
一個Instance的狀態可以是Running
、Suspended
或者 Terminated
。關於任務執行個體狀態的方法有以下幾種:
status
:屬性來擷取狀態。is_terminated
:返回當前Instance是否已經執行完畢的結果。is_successful
:返回當前Instance是否正確地執行完畢的結果。任務處於運行中或者執行失敗都會返回False。
程式碼範例如下。
擷取Instance狀態。
instance=o.get_instance('my_instance_id') print(instance.status) print(instance.status.value)
傳回值樣本:
Status.TERMINATED Terminated
判斷當前樣本是否已經執行完畢。
instance=o.get_instance('my_instance_id') from odps.models import Instance print(instance.status == Instance.Status.TERMINATED)
傳回值為
True
表示已執行完畢。
調用wait_for_completion
方法會阻塞直到Instance執行完成。wait_for_success
方法同樣會阻塞,但如果最終任務執行失敗,會拋出相關異常。
子任務操作
一個Instance在運行時,可能包含一個或者多個子任務,這些子任務稱為Task(注意:這裡的Task不同於MaxCompute的計算單元)。關於Task的方法有以下幾種:
get_task_names
:通過此方法擷取所有的Task任務。該方法返回一個所有子任務的名稱列表。instance=o.get_instance('my_instance_id') instance.get_task_names()
傳回值樣本:
['jdbc_sql_task']
get_task_result
:指定Task名稱,擷取該Task的執行結果。該方法是以字典的形式返回每個Task的執行結果。 程式碼範例如下:擷取子任務名稱。
instance=o.execute_sql('select*frompyodps_irislimit1') print(instance.get_task_names())
傳回值樣本:
['AnonymousSQLTask']
擷取子任務運行結果。
print(instance.get_task_result('AnonymousSQLTask'))
傳回值樣本:
"sepallength","sepalwidth","petallength","petalwidth","name" 4.9,3.0,1.4,0.2,"Iris-setosa"
擷取子任務結果。
print(instance.get_task_results())
傳回值樣本:
OrderedDict([('AnonymousSQLTask', '"sepallength","sepalwidth","petallength","petalwidth","name"\n4.9,3.0,1.4,0.2,"Iris-setosa"\n')])
get_task_progress
:該方法擷取在任務執行個體運行時Task當前的運行進度。instance=o.get_instance('20160519101349613gzbzufck2') while not instance.is_terminated(): for task_name in instance.get_task_names(): print(instance.id, instance.get_task_progress(task_name).get_stage_progress_formatted_string()) time.sleep(10)
傳回值樣本:
20160519101349613gzbzufck2 2016-05-19 18:14:03 M1_Stg1_job0:0/1/1[100%]