create_job
參數說明:
所有類型的參數將被轉換為包含屬性資訊的字典對象。
參數 | 類型 | 描述 |
job_desc | JobDescription object, str, dict | 作業的簡單描述和工作物件中各個任務的描述資訊,以及各個任務之間的DAG依賴關係 |
傳回值說明:
create_job 方法將返回一個CreateResponse對象, 以下是 CreateResponse 對象的屬性。可以通過 response.Id 的方式擷取新任務的 ID。
屬性 | 類型 | 描述 |
Id | str | 新任務的任務標識符 |
e.g.
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from batchcompute import Client, ClientError
from batchcompute import CN_ZHANGJIAKOU as REGION
from batchcompute.resources import (
ClusterDescription, GroupDescription, Configs, Networks, VPC,
JobDescription, TaskDescription, DAG,Mounts,
AutoCluster,Disks,Notification,
)
access_key_id = "" # your access key id
access_key_secret = "" # your access key secret
image_id = "m-8vbd8lo9xxxx" # the id of a image created before,鏡像需要確保已經註冊給批次運算
instance_type = "ecs.sn1.medium" # instance type
inputOssPath = "oss://xxx/input/" # your input oss path
outputOssPath = "oss://xxx/output/" #your output oss path
stdoutOssPath = "oss://xxx/log/stdout/" #your stdout oss path
stderrOssPath = "oss://xxx/log/stderr/" #your stderr oss path
def getAutoClusterDesc():
auto_desc = AutoCluster()
auto_desc.ECSImageId = image_id
#任務失敗保留環境,程式調試階段設定。環境保留費用會繼續產生請注意及時手動清除環境任務失敗保留環境,
# 程式調試階段設定。環境保留費用會繼續產生請注意及時手動清除環境
auto_desc.ReserveOnFail = False
# 執行個體規格
auto_desc.InstanceType = instance_type
#case1 設定上限價格的競價執行個體;
# auto_desc.ResourceType = "Spot"
# auto_desc.SpotStrategy = "SpotWithPriceLimit"
# auto_desc.SpotPriceLimit = 0.5
#case2 系統自動出價,最高隨用隨付價格
# auto_desc.ResourceType = "Spot"
# auto_desc.SpotStrategy = "SpotAsPriceGo"
#case3 按量
auto_desc.ResourceType = "OnDemand"
#Configs
configs = Configs()
#Configs.Networks
networks = Networks()
vpc = VPC()
#case1 只給CidrBlock
vpc.CidrBlock = '192.168.0.0/16'
#case2 CidrBlock和VpcId 都傳入,必須保證VpcId的CidrBlock 和傳入的CidrBlock保持一致
# vpc.CidrBlock = '172.26.0.0/16'
# vpc.VpcId = "vpc-8vbfxdyhxxxx"
networks.VPC = vpc
configs.Networks = networks
# 設定系統硬碟type(cloud_efficiency/cloud_ssd)以及size(單位GB)
configs.add_system_disk(size=40, type_='cloud_efficiency')
#設定資料盤type(必須和系統硬碟type保持一致)size(單位GB)掛載點
# case1 linux環境
# configs.add_data_disk(size=40, type_='cloud_efficiency', mount_point='/path/to/mount/')
# case2 windows環境
# configs.add_data_disk(size=40, type_='cloud_efficiency', mount_point='E:')
# 設定節點個數
configs.InstanceCount = 1
auto_desc.Configs = configs
return auto_desc
def getDagJobDesc(clusterId = None):
job_desc = JobDescription()
dag_desc = DAG()
mounts_desc = Mounts()
job_desc.Name = "testBatchSdkJob"
job_desc.Description = "test job"
job_desc.Priority = 1
# 訂閱job完成或者失敗事件
noti_desc = Notification()
noti_desc.Topic['Name'] = "test-topic"
noti_desc.Topic['Endpoint'] = "http://[UserId].mns.[Region].aliyuncs.com/"
noti_desc.Topic['Events'] = ["OnJobFinished", "OnJobFailed"]
# job_desc.Notification = noti_desc
job_desc.JobFailOnInstanceFail = False
# 作業運行成功後戶自動會被立即釋放掉
job_desc.AutoRelease = False
job_desc.Type = "DAG"
echo_task = TaskDescription()
# 程式的輸入路徑映射,程式直接存取/home/test/input/來訪問oss://xxx/input/中的檔案
# 支援檔案掛載,在程式中直接存取檔案
# echo_task.InputMapping = {"oss://xxx/input/": "/home/test/input/",
# "oss://xxx/test/file": "/home/test/test/file"}
echo_task.InputMapping = {inputOssPath: "/home/test/input/"}
# 程式的輸出路徑映射,可執行程式將結果輸出到/home/test/output/,
# 程式執行完畢後批次運算將/home/test/output/中的結果上傳到oss://xxx/output/中
# 輸入和輸出oss路徑不要有交叉,如輸入為oss://xxx/input/,輸出為oss://xxx/input/output/;
# 這樣會導致未定義行為程式執行效能不能保證
echo_task.OutputMapping = {"/home/test/output/":outputOssPath}
#觸發程式啟動並執行命令列
#case1 執行linux命令列
echo_task.Parameters.Command.CommandLine = "/bin/bash -c 'echo BatchcomputeService'"
#case2 執行Windows CMD.exe
# echo_task.Parameters.Command.CommandLine = "cmd /c 'echo BatchcomputeService'"
#case3 輸入可執行檔
# PackagePath存放commandLine中的可執行檔或者二進位包
# echo_task.Parameters.Command.PackagePath = "oss://xxx/package/test.sh"
# echo_task.Parameters.Command.CommandLine = "sh test.sh"
# 設定程式運行過程中相關環境變數資訊
echo_task.Parameters.Command.EnvVars["key1"] = "value1"
echo_task.Parameters.Command.EnvVars["key2"] = "value2"
# 設定docker參數
#case1 docker鏡像在oss registry上
# echo_task.Parameters.Command.EnvVars["BATCH_COMPUTE_DOCKER_IMAGE"] = "localhost:5000/yuorBucket/dockers:0.1"
# echo_task.Parameters.Command.EnvVars["BATCH_COMPUTE_DOCKER_REGISTRY_OSS_PATH"] = "oss://your-bucket/dockers"
#case2 docker鏡像在容器倉庫
# echo_task.Parameters.Command.Docker.Image = "registry.cn-beijing.aliyuncs.com/demotest/test:0.1"
# 設定程式的標準輸出地址,程式中的print列印會即時上傳到指定的oss地址
echo_task.Parameters.StdoutRedirectPath = stdoutOssPath
# 設定程式的標準錯誤輸出地址,程式拋出的異常錯誤會即時上傳到指定的oss地址
echo_task.Parameters.StderrRedirectPath = stderrOssPath
# 設定任務的逾時時間
echo_task.Timeout = 600
# 設定任務所需執行個體個數
# 環境變數BATCH_COMPUTE_INSTANCE_ID為0到InstanceCount-1
# 在執行程式中訪問BATCH_COMPUTE_INSTANCE_ID,實現資料訪問的切片實現單任務並發執行
echo_task.InstanceCount = 1
# 設定任務失敗後重試次數
echo_task.MaxRetryCount = 0
# NAS資料掛載
#採用NAS時必須保證網路和NAS在同一個VPC內
nasMountEntry = {
"Source": "nas://xxxx.nas.aliyuncs.com:/",
"Destination": "/home/mnt/",
"WriteSupport":True,
}
mounts_desc.add_entry(nasMountEntry)
mounts_desc.Locale = "utf-8"
mounts_desc.Lock = False
# echo_task.Mounts = mounts_desc
# 採用固定叢集提交作業
# echo_task.ClusterId = clusterId
#採用auto叢集提交作業
echo_task.AutoCluster = getAutoClusterDesc()
# 新增工作
dag_desc.add_task('echoTask', echo_task)
# 可以設定多個task,每個task可以根據需求進行設定各項參數
# dag_desc.add_task('echoTask2', echo_task)
# Dependencies設定多個task之間的依賴關係,echoTask2依賴echoTask;echoTask3依賴echoTask2
# dag_desc.Dependencies = {"echoTask":["echoTask2"], "echoTask2":["echoTask3"]}
#
job_desc.DAG = dag_desc
return job_desc
if __name__ == "__main__":
client = Client(REGION, access_key_id, access_key_secret)
try:
job_desc = getDagJobDesc()
job_id = client.create_job(job_desc).Id
print('job created: %s' % job_id)
except ClientError,e:
print (e.get_status_code(), e.get_code(), e.get_requestid(), e.get_msg())
Notice: 關於Mounts的注意事項 Job 層級的 Mounts 參數會覆蓋 Cluster 層級的配置資訊; Modify Cluster 後,需要調用 RecreateInstance 介面才能使新指定的 Mounts 配置生效; 掛載 NAS 需要以 nas:// 做為 source 的首碼,否則會出錯; 每個類的具體成員資訊參見以下表格
(1)JobDescription 類
參數說明:
參數 | 類型 | 描述 |
properties | dict, str, JobDescription object | 包含作業描述資訊的對象 |
屬性說明:
序號 | 屬性 | 類型 | 描述 |
1. | Name | str | 作業名稱 |
2. | Description | str | 作業的簡短描述資訊 |
3. | Priority | int | 優先順序用一個[0,1000]範圍內的整數指定,數值越高表示作業調度時的優先順序越高 |
4. | Notification | dict | 訊息通知配置,可以配置 MNS 服務的 Topic 和 Job 相關事件 |
5. | JobFailOnInstanceFail | bool | Instance 失敗是否直接使 Job 失敗 |
6. | AutoRelease | boolean | 表示 Job 運行成功自動會被立即釋放(刪除)掉,預設為 False |
7. | Type | str | 目前僅支援有向非循環圖(directed acycline graph,DAG)形式描述任務 |
8. | DAG | dict, DAG object | DAG 描述 |
(2)DAG 類
參數說明:
參數 | 類型 | 描述 |
properties | dict, str, DAG object | 所有任務的映射以及任務間依賴關係的描述資訊 |
屬性說明:
序號 | 屬性 | 類型 | 描述 |
1. | Tasks | dict | 所有任務名與任務描述的映射關係 |
2. | Dependencies | dict | 所有任務間的相互依賴關係 |
方法說明 :
序號 | 方法 | 描述 |
1. | add_task(task_name, task) | 增加一個任務 |
2. | get_task(task_name) | 通過任務名擷取任務資訊 |
3. | delete_task(task_name) | 刪除某個任務 |
(3) TaskDescription 類
參數說明:
參數 | 類型 | 描述 |
properties | dict, str, TaskDescription object | 單個任務的描述資訊 |
屬性說明:
序號 | 屬性 | 類型 | 描述 |
1. | Parameters | dict, Parameters object | 任務參數詳情 |
2. | InputMapping | dict | OSS 到本地路徑的映射 |
3. | OutputMapping | dict | 本地路徑到 OSS 的映射 |
4. | LogMapping | dict | 本地日誌路徑對 OSS 映射 |
5. | Timeout | int | 任務逾時時間 |
6. | InstanceCount | int | 任務中執行個體的個數,正數 |
7. | MaxRetryCount | int | 最大重試次數,預設為0 |
8. | ClusterId | str | 叢集標識符 |
9. | Mounts | dict, Mounts object | 執行個體的網路掛載配置資訊,由 Mounts 描述,目前支援 NAS 和 OSS 掛載。 |
10. | AutoCluster | dict, AutoCluster object | 匿名叢集,和叢集標示符最多隻能指定一個 |
(4) Parameters 類
參數說明:
參數 | 類型 | 描述 |
properties | dict, str, Parameters object | 任務參數的描述資訊 |
屬性說明:
序號 | 屬性 | 類型 | 描述 |
1. | Command | dict, Command object | 使用者程式相關命令列參數 |
2. | InputMappingConfig | dict, InputMappingConfig object | NFS 掛載服務配置項 |
3. | StdoutRedirectPath | str | 標準輸出的 OSS 路徑 |
4. | StderrRedirectPath | str | 標準錯誤的 OSS 路徑 |
(5) AutoCluster 類
參數說明:
參數 | 類型 | 描述 |
properties | dict, str, AutoCluster object | 匿名叢集資訊 |
屬性說明:
序號 | 屬性 | 類型 | 描述 |
1. | ECSImageId | str | ECS 鏡像 ID,可以使用系統提供的鏡像 |
2. | InstanceType | str | 執行個體規格,執行個體類型 |
3. | ResourceType | str | 資源類型,目前支援:“OnDemand” 和 “Spot”,預設為“OnDemand” |
4. | UserData | dict | 一個 KeyValue 映射,使用者自訂的資訊,使用 ECS 的 metaserver 擷取 |
5. | Configs | Configs object | 叢集的配置資訊, 詳見4.13 節中 ClusterDescription 的介紹 |
6. | SpotStrategy | str | 執行個體的競價策略,只有在 ResourceType 為 Spot 的情況下有效。取值範圍:SpotWithPriceLimit:設定上限價格的競價執行個體; SpotAsPriceGo:系統自動出價,最高隨用隨付價格。 |
7. | SpotPriceLimit | float | 執行個體的每小時最高價格(每個執行個體規格的價格而非每核小時的價格)。支援最大 3 位小數,SpotStrategy 為 SpotWithPriceLimit 生效。 |
8. | ReserveOnFail | bool | 任務失敗時不釋放相關的虛擬機器,會繼續收取這些資源的費用直到使用者刪除作業,預設為 False,僅用於調查問題。 |
9. | DependencyIsvService | string | 執行程式依賴的阿里雲提供的ISV服務,目前提供的ISV服務有:“GTX" ,預設為””,不依賴任何ISV服務。 |
(6) Command 類
參數說明:
參數 | 類型 | 描述 |
properties | dict, str, Command object | 使用者程式相關命令列參數 |
屬性說明:
序號 | 屬性 | 類型 | 描述 |
1. | CommandLine | str | 執行使用者程式的命令 |
2. | PackagePath | str | 使用者程式所在 OSS 路徑 |
3. | EnvVars | dict | 使用者程式執行時的環境變數 |
(7) InputMappingConfig 類
參數說明:
參數 | 類型 | 描述 |
properties | dict, str, InputMappingConfig object | NFS 掛載服務配置項 |
屬性說明:
序號 | 屬性 | 類型 | 描述 |
1. | Locale | str | OSS object 掛載到本地時使用的字元集。可選範圍包括 GBK、GB2312-80、BIG5、ANSI、EUC-JP、EUC-TW、EUC-KR、SHIFT-JIS、KSC5601 等 |
2. | Lock | bool | NFS 掛載服務是否支援網路檔案鎖 |
(8) Notification 類
參數說明:
參數 | 類型 | 描述 |
properties | dict, str, Command object | 使用者程式相關命令列參數 |
屬性說明:
序號 | 屬性 | 類型 | 描述 |
1. | Topic | Topic Object | 訊息 Topic |
(9) Topic 類
參數說明:
參數 | 類型 | 描述 |
properties | dict, str, Command object | 使用者程式相關命令列參數 |
屬性說明:
序號 | 屬性 | 類型 | 描述 |
1. | Endpoint | str | MNS 地區 endpoint,格式如: |
2. | Name | str | Topic 名稱。 |
3. | Events | list | 事件列表,請填寫 cluster 相關的事件名。 |
(10) Mounts 類
參數說明:
參數 | 類型 | 描述 |
properties | dict, str, Command object | 建立叢集時的網路磁碟掛載配置資訊。 |
屬性說明:
序號 | 屬性 | 類型 | 描述 |
1. | Entries | array | 網路磁碟掛載點資訊列表, 由 MountPoint 描述。 |
2. | Locale | str | 掛載 OSS,NAS 儲存時語言選項。 |
3. | Lock | bool | 掛載 OSS,NAS 儲存時檔案鎖支援選項。 |
4. | NAS | dict | NAS 配置資訊。 |
5. | OSS | dict | OSS 配置資訊。 |
(11) MountPoint 類
參數說明:
參數 | 類型 | 描述 |
properties | dict, str, Command object | 網路掛載點。 |
屬性說明:
序號 | 屬性名稱 | 類型 | 描述 |
1. | Source | str | 網路磁碟掛載來源路徑,可以是 |
2. | Destination | str | 網路磁碟本地掛載點路徑。 |
3. | WriteSupport | bool | 掛載點是否可寫。 |
(12) NAS 類
參數說明:
參數 | 類型 | 描述 |
properties | dict, str, Command object | NAS 配置資訊。 |
屬性說明:
序號 | 屬性名稱 | 類型 | 描述 |
1. | AccessGroup | list | 需要將叢集執行個體加入到的 NAS 訪問組。 |
2. | FileSystem | list | 需要訪問的檔案系統。 |
(13) OSS 類
參數說明:
參數 | 類型 | 描述 |
properties | dict, str, Command object | OSS 配置資訊。 |
屬性說明:
序號 | 屬性名稱 | 類型 | 描述 |
1. | AccessKeyId | str | OSS掛載使用的 Access ID。 |
2. | AccessKeySecret | str | OSS掛載使用的 Access Secret。 |
3. | SecurityToken | str | OSS掛載使用的 Security Token。 |