全部產品
Search
文件中心

:建立作業(DAG 類型)

更新時間:Jul 06, 2024

create_job

參數說明:

說明

所有類型的參數將被轉換為包含屬性資訊的字典對象。

參數

類型

描述

job_desc

JobDescription object, str, dict

作業的簡單描述和工作物件中各個任務的描述資訊,以及各個任務之間的DAG依賴關係

傳回值說明:

說明

create_job 方法將返回一個CreateResponse對象, 以下是 CreateResponse 對象的屬性。可以通過 response.Id 的方式擷取新任務的 ID。

屬性

類型

描述

Id

str

新任務的任務標識符

e.g.

#!/usr/bin/env python
# -*- coding: utf-8 -*-
from batchcompute import Client, ClientError
from batchcompute import CN_ZHANGJIAKOU as REGION
from batchcompute.resources import (
    ClusterDescription, GroupDescription, Configs, Networks, VPC,
    JobDescription, TaskDescription, DAG,Mounts,
    AutoCluster,Disks,Notification,
)

access_key_id = "" # your access key id
access_key_secret = "" # your access key secret
image_id = "m-8vbd8lo9xxxx" # the id of a image created before,鏡像需要確保已經註冊給批次運算
instance_type = "ecs.sn1.medium" # instance type
inputOssPath = "oss://xxx/input/" # your input oss path
outputOssPath = "oss://xxx/output/" #your output oss path
stdoutOssPath = "oss://xxx/log/stdout/" #your stdout oss path
stderrOssPath = "oss://xxx/log/stderr/" #your stderr oss path

def getAutoClusterDesc():
    auto_desc = AutoCluster()

    auto_desc.ECSImageId = image_id

    #任務失敗保留環境,程式調試階段設定。環境保留費用會繼續產生請注意及時手動清除環境任務失敗保留環境,
    # 程式調試階段設定。環境保留費用會繼續產生請注意及時手動清除環境
    auto_desc.ReserveOnFail = False

    # 執行個體規格
    auto_desc.InstanceType = instance_type

    #case1 設定上限價格的競價執行個體; 
    # auto_desc.ResourceType = "Spot"
    # auto_desc.SpotStrategy = "SpotWithPriceLimit"
    # auto_desc.SpotPriceLimit = 0.5

    #case2 系統自動出價,最高隨用隨付價格
    # auto_desc.ResourceType = "Spot"
    # auto_desc.SpotStrategy = "SpotAsPriceGo"

    #case3 按量
    auto_desc.ResourceType = "OnDemand"

    #Configs
    configs = Configs()
    #Configs.Networks
    networks  = Networks()
    vpc = VPC()

    #case1 只給CidrBlock
    vpc.CidrBlock = '192.168.0.0/16'

    #case2 CidrBlock和VpcId 都傳入,必須保證VpcId的CidrBlock 和傳入的CidrBlock保持一致
    # vpc.CidrBlock = '172.26.0.0/16'
    # vpc.VpcId = "vpc-8vbfxdyhxxxx"

    networks.VPC = vpc
    configs.Networks = networks

    # 設定系統硬碟type(cloud_efficiency/cloud_ssd)以及size(單位GB)
    configs.add_system_disk(size=40, type_='cloud_efficiency')

    #設定資料盤type(必須和系統硬碟type保持一致)size(單位GB)掛載點
    # case1 linux環境
    # configs.add_data_disk(size=40, type_='cloud_efficiency', mount_point='/path/to/mount/')

    # case2 windows環境
    # configs.add_data_disk(size=40, type_='cloud_efficiency', mount_point='E:')

    # 設定節點個數
    configs.InstanceCount = 1
    auto_desc.Configs = configs
    return auto_desc

def getDagJobDesc(clusterId = None):
    job_desc = JobDescription()
    dag_desc = DAG()
    mounts_desc = Mounts()

    job_desc.Name = "testBatchSdkJob"
    job_desc.Description = "test job"
    job_desc.Priority = 1

    # 訂閱job完成或者失敗事件
    noti_desc = Notification()
    noti_desc.Topic['Name'] = "test-topic"
    noti_desc.Topic['Endpoint'] = "http://[UserId].mns.[Region].aliyuncs.com/"
    noti_desc.Topic['Events'] = ["OnJobFinished", "OnJobFailed"]
    # job_desc.Notification = noti_desc

    job_desc.JobFailOnInstanceFail = False

    # 作業運行成功後戶自動會被立即釋放掉
    job_desc.AutoRelease = False
    job_desc.Type = "DAG"

    echo_task = TaskDescription()

    # 程式的輸入路徑映射,程式直接存取/home/test/input/來訪問oss://xxx/input/中的檔案
    # 支援檔案掛載,在程式中直接存取檔案
    # echo_task.InputMapping = {"oss://xxx/input/": "/home/test/input/",
    #                          "oss://xxx/test/file": "/home/test/test/file"}
    echo_task.InputMapping = {inputOssPath: "/home/test/input/"}

    # 程式的輸出路徑映射,可執行程式將結果輸出到/home/test/output/,
    # 程式執行完畢後批次運算將/home/test/output/中的結果上傳到oss://xxx/output/中
    # 輸入和輸出oss路徑不要有交叉,如輸入為oss://xxx/input/,輸出為oss://xxx/input/output/;
    # 這樣會導致未定義行為程式執行效能不能保證
    echo_task.OutputMapping = {"/home/test/output/":outputOssPath}

    #觸發程式啟動並執行命令列
    #case1 執行linux命令列
    echo_task.Parameters.Command.CommandLine = "/bin/bash -c 'echo BatchcomputeService'"

    #case2 執行Windows CMD.exe
    # echo_task.Parameters.Command.CommandLine = "cmd /c 'echo BatchcomputeService'"

    #case3 輸入可執行檔
    # PackagePath存放commandLine中的可執行檔或者二進位包
    # echo_task.Parameters.Command.PackagePath = "oss://xxx/package/test.sh"
    # echo_task.Parameters.Command.CommandLine = "sh test.sh"

    # 設定程式運行過程中相關環境變數資訊
    echo_task.Parameters.Command.EnvVars["key1"] = "value1"
    echo_task.Parameters.Command.EnvVars["key2"] = "value2"

    # 設定docker參數
    #case1 docker鏡像在oss registry上
    # echo_task.Parameters.Command.EnvVars["BATCH_COMPUTE_DOCKER_IMAGE"] = "localhost:5000/yuorBucket/dockers:0.1"
    # echo_task.Parameters.Command.EnvVars["BATCH_COMPUTE_DOCKER_REGISTRY_OSS_PATH"] = "oss://your-bucket/dockers"

    #case2 docker鏡像在容器倉庫
    # echo_task.Parameters.Command.Docker.Image = "registry.cn-beijing.aliyuncs.com/demotest/test:0.1"

    # 設定程式的標準輸出地址,程式中的print列印會即時上傳到指定的oss地址
    echo_task.Parameters.StdoutRedirectPath = stdoutOssPath

    # 設定程式的標準錯誤輸出地址,程式拋出的異常錯誤會即時上傳到指定的oss地址
    echo_task.Parameters.StderrRedirectPath = stderrOssPath

    # 設定任務的逾時時間
    echo_task.Timeout = 600

    # 設定任務所需執行個體個數
    # 環境變數BATCH_COMPUTE_INSTANCE_ID為0到InstanceCount-1
    # 在執行程式中訪問BATCH_COMPUTE_INSTANCE_ID,實現資料訪問的切片實現單任務並發執行
    echo_task.InstanceCount = 1

    # 設定任務失敗後重試次數
    echo_task.MaxRetryCount = 0

    # NAS資料掛載
    #採用NAS時必須保證網路和NAS在同一個VPC內
    nasMountEntry = {
        "Source": "nas://xxxx.nas.aliyuncs.com:/",
        "Destination": "/home/mnt/",
        "WriteSupport":True,
    }
    mounts_desc.add_entry(nasMountEntry)
    mounts_desc.Locale = "utf-8"
    mounts_desc.Lock = False
    # echo_task.Mounts = mounts_desc

    # 採用固定叢集提交作業
    # echo_task.ClusterId = clusterId

    #採用auto叢集提交作業
    echo_task.AutoCluster = getAutoClusterDesc()

    # 新增工作
    dag_desc.add_task('echoTask', echo_task)

    # 可以設定多個task,每個task可以根據需求進行設定各項參數
    # dag_desc.add_task('echoTask2', echo_task)

    # Dependencies設定多個task之間的依賴關係,echoTask2依賴echoTask;echoTask3依賴echoTask2
    # dag_desc.Dependencies = {"echoTask":["echoTask2"], "echoTask2":["echoTask3"]}
    # 
    job_desc.DAG = dag_desc
    return job_desc

if __name__ == "__main__":
    client = Client(REGION, access_key_id, access_key_secret)
    try:
        job_desc = getDagJobDesc()
        job_id = client.create_job(job_desc).Id
        print('job created: %s' % job_id)
    except ClientError,e:
        print (e.get_status_code(), e.get_code(), e.get_requestid(), e.get_msg())
說明

Notice: 關於Mounts的注意事項 Job 層級的 Mounts 參數會覆蓋 Cluster 層級的配置資訊; Modify Cluster 後,需要調用 RecreateInstance 介面才能使新指定的 Mounts 配置生效; 掛載 NAS 需要以 nas:// 做為 source 的首碼,否則會出錯; 每個類的具體成員資訊參見以下表格

(1)JobDescription 類

參數說明:

參數

類型

描述

properties

dict, str, JobDescription object

包含作業描述資訊的對象

屬性說明:

序號

屬性

類型

描述

1.

Name

str

作業名稱

2.

Description

str

作業的簡短描述資訊

3.

Priority

int

優先順序用一個[0,1000]範圍內的整數指定,數值越高表示作業調度時的優先順序越高

4.

Notification

dict

訊息通知配置,可以配置 MNS 服務的 Topic 和 Job 相關事件

5.

JobFailOnInstanceFail

bool

Instance 失敗是否直接使 Job 失敗

6.

AutoRelease

boolean

表示 Job 運行成功自動會被立即釋放(刪除)掉,預設為 False

7.

Type

str

目前僅支援有向非循環圖(directed acycline graph,DAG)形式描述任務

8.

DAG

dict, DAG object

DAG 描述

(2)DAG 類

參數說明:

參數

類型

描述

properties

dict, str, DAG object

所有任務的映射以及任務間依賴關係的描述資訊

屬性說明:

序號

屬性

類型

描述

1.

Tasks

dict

所有任務名與任務描述的映射關係

2.

Dependencies

dict

所有任務間的相互依賴關係

方法說明 :

序號

方法

描述

1.

add_task(task_name, task)

增加一個任務

2.

get_task(task_name)

通過任務名擷取任務資訊

3.

delete_task(task_name)

刪除某個任務

(3) TaskDescription 類

參數說明:

參數

類型

描述

properties

dict, str, TaskDescription object

單個任務的描述資訊

屬性說明:

序號

屬性

類型

描述

1.

Parameters

dict, Parameters object

任務參數詳情

2.

InputMapping

dict

OSS 到本地路徑的映射

3.

OutputMapping

dict

本地路徑到 OSS 的映射

4.

LogMapping

dict

本地日誌路徑對 OSS 映射

5.

Timeout

int

任務逾時時間

6.

InstanceCount

int

任務中執行個體的個數,正數

7.

MaxRetryCount

int

最大重試次數,預設為0

8.

ClusterId

str

叢集標識符

9.

Mounts

dict, Mounts object

執行個體的網路掛載配置資訊,由 Mounts 描述,目前支援 NAS 和 OSS 掛載。

10.

AutoCluster

dict, AutoCluster object

匿名叢集,和叢集標示符最多隻能指定一個

(4) Parameters 類

參數說明:

參數

類型

描述

properties

dict, str, Parameters object

任務參數的描述資訊

屬性說明:

序號

屬性

類型

描述

1.

Command

dict, Command object

使用者程式相關命令列參數

2.

InputMappingConfig

dict, InputMappingConfig object

NFS 掛載服務配置項

3.

StdoutRedirectPath

str

標準輸出的 OSS 路徑

4.

StderrRedirectPath

str

標準錯誤的 OSS 路徑

(5) AutoCluster 類

參數說明:

參數

類型

描述

properties

dict, str, AutoCluster object

匿名叢集資訊

屬性說明:

序號

屬性

類型

描述

1.

ECSImageId

str

ECS 鏡像 ID,可以使用系統提供的鏡像

2.

InstanceType

str

執行個體規格,執行個體類型

3.

ResourceType

str

資源類型,目前支援:“OnDemand” 和 “Spot”,預設為“OnDemand”

4.

UserData

dict

一個 KeyValue 映射,使用者自訂的資訊,使用 ECS 的 metaserver 擷取

5.

Configs

Configs object

叢集的配置資訊, 詳見4.13 節中 ClusterDescription 的介紹

6.

SpotStrategy

str

執行個體的競價策略,只有在 ResourceType 為 Spot 的情況下有效。取值範圍:SpotWithPriceLimit:設定上限價格的競價執行個體; SpotAsPriceGo:系統自動出價,最高隨用隨付價格。

7.

SpotPriceLimit

float

執行個體的每小時最高價格(每個執行個體規格的價格而非每核小時的價格)。支援最大 3 位小數,SpotStrategy 為 SpotWithPriceLimit 生效。

8.

ReserveOnFail

bool

任務失敗時不釋放相關的虛擬機器,會繼續收取這些資源的費用直到使用者刪除作業,預設為 False,僅用於調查問題。

9.

DependencyIsvService

string

執行程式依賴的阿里雲提供的ISV服務,目前提供的ISV服務有:“GTX" ,預設為””,不依賴任何ISV服務。

(6) Command 類

參數說明:

參數

類型

描述

properties

dict, str, Command object

使用者程式相關命令列參數

屬性說明:

序號

屬性

類型

描述

1.

CommandLine

str

執行使用者程式的命令

2.

PackagePath

str

使用者程式所在 OSS 路徑

3.

EnvVars

dict

使用者程式執行時的環境變數

(7) InputMappingConfig 類

參數說明:

參數

類型

描述

properties

dict, str, InputMappingConfig object

NFS 掛載服務配置項

屬性說明:

序號

屬性

類型

描述

1.

Locale

str

OSS object 掛載到本地時使用的字元集。可選範圍包括 GBK、GB2312-80、BIG5、ANSI、EUC-JP、EUC-TW、EUC-KR、SHIFT-JIS、KSC5601 等

2.

Lock

bool

NFS 掛載服務是否支援網路檔案鎖

(8) Notification 類

參數說明:

參數

類型

描述

properties

dict, str, Command object

使用者程式相關命令列參數

屬性說明:

序號

屬性

類型

描述

1.

Topic

Topic Object

訊息 Topic

(9) Topic 類

參數說明:

參數

類型

描述

properties

dict, str, Command object

使用者程式相關命令列參數

屬性說明:

序號

屬性

類型

描述

1.

Endpoint

str

MNS 地區 endpoint,格式如:http://${your_user_id}.mns.${region}-internal.aliyuncs.com/ ,請盡量使用內網 Endpoint。

2.

Name

str

Topic 名稱。

3.

Events

list

事件列表,請填寫 cluster 相關的事件名。

(10) Mounts 類

參數說明:

參數

類型

描述

properties

dict, str, Command object

建立叢集時的網路磁碟掛載配置資訊。

屬性說明:

序號

屬性

類型

描述

1.

Entries

array

網路磁碟掛載點資訊列表, 由 MountPoint 描述。

2.

Locale

str

掛載 OSS,NAS 儲存時語言選項。

3.

Lock

bool

掛載 OSS,NAS 儲存時檔案鎖支援選項。

4.

NAS

dict

NAS 配置資訊。

5.

OSS

dict

OSS 配置資訊。

(11) MountPoint 類

參數說明:

參數

類型

描述

properties

dict, str, Command object

網路掛載點。

屬性說明:

序號

屬性名稱

類型

描述

1.

Source

str

網路磁碟掛載來源路徑,可以是 nas://oss:// 開頭的字串。

2.

Destination

str

網路磁碟本地掛載點路徑。

3.

WriteSupport

bool

掛載點是否可寫。

(12) NAS 類

參數說明:

參數

類型

描述

properties

dict, str, Command object

NAS 配置資訊。

屬性說明:

序號

屬性名稱

類型

描述

1.

AccessGroup

list

需要將叢集執行個體加入到的 NAS 訪問組。

2.

FileSystem

list

需要訪問的檔案系統。

(13) OSS 類

參數說明:

參數

類型

描述

properties

dict, str, Command object

OSS 配置資訊。

屬性說明:

序號

屬性名稱

類型

描述

1.

AccessKeyId

str

OSS掛載使用的 Access ID。

2.

AccessKeySecret

str

OSS掛載使用的 Access Secret。

3.

SecurityToken

str

OSS掛載使用的 Security Token。