全部產品
Search
文件中心

:任務執行個體

更新時間:Jun 19, 2024

Task是MaxCompute的基本計算單元,例如SQL Task。本文為您介紹PyODPS中任務執行個體的基本操作。

基本操作

Task在執行時會被執行個體化, 以MaxCompute執行個體(下文簡稱為執行個體或Instance)的形式存在。執行個體常見的操作如下:

  • list_instances():擷取專案空間下的所有的Instance。

  • exist_instance():判斷某執行個體是否存在。

  • get_instance():擷取執行個體。

  • stop_instance():停止執行個體。僅支援對運行中的執行個體進行停止執行個體操作,否則會報錯。

    說明

    通過Instance對象調用stop方法也可以用於停止Instance,如o.get_instance('instance id').stop

樣本

  • 擷取Instancede的ID

    for instance in o.list_instances():
        print(instance.id)
  • 判斷Instance是否存在

    print(o.exist_instance('my_instance_id'))

擷取Logview地址

  • 對於SQL等任務,通過Instance對象調用get_logview_address方法即可擷取Logview地址。

    #從已有的instance對象擷取Logview地址。
    instance = o.run_sql('desc pyodps_iris')
    print(instance.get_logview_address())
    
    #從instance id擷取Logview地址。
    instance = o.get_instance('my_instance_id')
    print(instance.get_logview_address())
  • 對於PAI類型任務,需要先枚舉其子任務,再擷取子任務的Logview。

    instance = o.run_xflow('AppendID', 'algo_public', {'inputTableName': 'input_table', 'outputTableName': 'output_table'})
    for sub_inst_name, sub_inst in o.get_xflow_sub_instances(instance).items():
        print('%s: %s' % (sub_inst_name, sub_inst.get_logview_address()))

任務執行個體狀態

一個Instance的狀態可以是RunningSuspended或者 Terminated。關於任務執行個體狀態的方法有以下幾種:

  • status:屬性來擷取狀態。

  • is_terminated:返回當前Instance是否已經執行完畢的結果。

  • is_successful:返回當前Instance是否正確地執行完畢的結果。任務處於運行中或者執行失敗都會返回False。

程式碼範例如下。

  • 擷取Instance狀態。

    instance=o.get_instance('my_instance_id')
    print(instance.status)
    print(instance.status.value)

    傳回值樣本:

    Status.TERMINATED
    Terminated
  • 判斷當前樣本是否已經執行完畢。

    instance=o.get_instance('my_instance_id')
    from odps.models import Instance
    print(instance.status == Instance.Status.TERMINATED)

    傳回值為True表示已執行完畢。

說明

調用wait_for_completion方法會阻塞直到Instance執行完成。wait_for_success方法同樣會阻塞,但如果最終任務執行失敗,會拋出相關異常。

子任務操作

一個Instance在運行時,可能包含一個或者多個子任務,這些子任務稱為Task(注意:這裡的Task不同於MaxCompute的計算單元)。關於Task的方法有以下幾種:

  • get_task_names:通過此方法擷取所有的Task任務。該方法返回一個所有子任務的名稱列表。

    instance=o.get_instance('my_instance_id')
    instance.get_task_names()

    傳回值樣本:

    ['jdbc_sql_task']
  • get_task_result:指定Task名稱,擷取該Task的執行結果。該方法是以字典的形式返回每個Task的執行結果。 程式碼範例如下:

    • 擷取子任務名稱。

      instance=o.execute_sql('select*frompyodps_irislimit1')
      print(instance.get_task_names())

      傳回值樣本:

      ['AnonymousSQLTask']
    • 擷取子任務運行結果。

      print(instance.get_task_result('AnonymousSQLTask'))

      傳回值樣本:

      "sepallength","sepalwidth","petallength","petalwidth","name"
      4.9,3.0,1.4,0.2,"Iris-setosa"
    • 擷取子任務結果。

      print(instance.get_task_results())

      傳回值樣本:

      OrderedDict([('AnonymousSQLTask', '"sepallength","sepalwidth","petallength","petalwidth","name"\n4.9,3.0,1.4,0.2,"Iris-setosa"\n')])
  • get_task_progress:該方法擷取在任務執行個體運行時Task當前的運行進度。

    instance=o.get_instance('20160519101349613gzbzufck2')
    while not instance.is_terminated():
        for task_name in instance.get_task_names():
            print(instance.id, instance.get_task_progress(task_name).get_stage_progress_formatted_string())
            time.sleep(10)

    傳回值樣本:

    20160519101349613gzbzufck2 2016-05-19 18:14:03 M1_Stg1_job0:0/1/1[100%]