create_job
参数说明:
所有类型的参数将被转换为包含属性信息的字典对象。
参数 | 类型 | 描述 |
job_desc | JobDescription object, str, dict | 作业的简单描述和作业对象中各个任务的描述信息,以及各个任务之间的DAG依赖关系 |
返回值说明:
create_job 方法将返回一个CreateResponse对象, 以下是 CreateResponse 对象的属性。可以通过 response.Id 的方式获取新任务的 ID。
属性 | 类型 | 描述 |
Id | str | 新任务的任务标识符 |
e.g.
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from batchcompute import Client, ClientError
from batchcompute import CN_ZHANGJIAKOU as REGION
from batchcompute.resources import (
ClusterDescription, GroupDescription, Configs, Networks, VPC,
JobDescription, TaskDescription, DAG,Mounts,
AutoCluster,Disks,Notification,
)
access_key_id = "" # your access key id
access_key_secret = "" # your access key secret
image_id = "m-8vbd8lo9xxxx" # the id of a image created before,镜像需要确保已经注册给批量计算
instance_type = "ecs.sn1.medium" # instance type
inputOssPath = "oss://xxx/input/" # your input oss path
outputOssPath = "oss://xxx/output/" #your output oss path
stdoutOssPath = "oss://xxx/log/stdout/" #your stdout oss path
stderrOssPath = "oss://xxx/log/stderr/" #your stderr oss path
def getAutoClusterDesc():
auto_desc = AutoCluster()
auto_desc.ECSImageId = image_id
#任务失败保留环境,程序调试阶段设置。环境保留费用会继续产生请注意及时手动清除环境任务失败保留环境,
# 程序调试阶段设置。环境保留费用会继续产生请注意及时手动清除环境
auto_desc.ReserveOnFail = False
# 实例规格
auto_desc.InstanceType = instance_type
#case1 设置上限价格的竞价实例;
# auto_desc.ResourceType = "Spot"
# auto_desc.SpotStrategy = "SpotWithPriceLimit"
# auto_desc.SpotPriceLimit = 0.5
#case2 系统自动出价,最高按量付费价格
# auto_desc.ResourceType = "Spot"
# auto_desc.SpotStrategy = "SpotAsPriceGo"
#case3 按量
auto_desc.ResourceType = "OnDemand"
#Configs
configs = Configs()
#Configs.Networks
networks = Networks()
vpc = VPC()
#case1 只给CidrBlock
vpc.CidrBlock = '192.168.0.0/16'
#case2 CidrBlock和VpcId 都传入,必须保证VpcId的CidrBlock 和传入的CidrBlock保持一致
# vpc.CidrBlock = '172.26.0.0/16'
# vpc.VpcId = "vpc-8vbfxdyhxxxx"
networks.VPC = vpc
configs.Networks = networks
# 设置系统盘type(cloud_efficiency/cloud_ssd)以及size(单位GB)
configs.add_system_disk(size=40, type_='cloud_efficiency')
#设置数据盘type(必须和系统盘type保持一致)size(单位GB)挂载点
# case1 linux环境
# configs.add_data_disk(size=40, type_='cloud_efficiency', mount_point='/path/to/mount/')
# case2 windows环境
# configs.add_data_disk(size=40, type_='cloud_efficiency', mount_point='E:')
# 设置节点个数
configs.InstanceCount = 1
auto_desc.Configs = configs
return auto_desc
def getDagJobDesc(clusterId = None):
job_desc = JobDescription()
dag_desc = DAG()
mounts_desc = Mounts()
job_desc.Name = "testBatchSdkJob"
job_desc.Description = "test job"
job_desc.Priority = 1
# 订阅job完成或者失败事件
noti_desc = Notification()
noti_desc.Topic['Name'] = "test-topic"
noti_desc.Topic['Endpoint'] = "http://[UserId].mns.[Region].aliyuncs.com/"
noti_desc.Topic['Events'] = ["OnJobFinished", "OnJobFailed"]
# job_desc.Notification = noti_desc
job_desc.JobFailOnInstanceFail = False
# 作业运行成功后户自动会被立即释放掉
job_desc.AutoRelease = False
job_desc.Type = "DAG"
echo_task = TaskDescription()
# 程序的输入路径映射,程序直接访问/home/test/input/来访问oss://xxx/input/中的文件
# 支持文件挂载,在程序中直接访问文件
# echo_task.InputMapping = {"oss://xxx/input/": "/home/test/input/",
# "oss://xxx/test/file": "/home/test/test/file"}
echo_task.InputMapping = {inputOssPath: "/home/test/input/"}
# 程序的输出路径映射,可执行程序将结果输出到/home/test/output/,
# 程序执行完毕后批量计算将/home/test/output/中的结果上传到oss://xxx/output/中
# 输入和输出oss路径不要有交叉,如输入为oss://xxx/input/,输出为oss://xxx/input/output/;
# 这样会导致未定义行为程序执行性能不能保证
echo_task.OutputMapping = {"/home/test/output/":outputOssPath}
#触发程序运行的命令行
#case1 执行linux命令行
echo_task.Parameters.Command.CommandLine = "/bin/bash -c 'echo BatchcomputeService'"
#case2 执行Windows CMD.exe
# echo_task.Parameters.Command.CommandLine = "cmd /c 'echo BatchcomputeService'"
#case3 输入可执行文件
# PackagePath存放commandLine中的可执行文件或者二进制包
# echo_task.Parameters.Command.PackagePath = "oss://xxx/package/test.sh"
# echo_task.Parameters.Command.CommandLine = "sh test.sh"
# 设置程序运行过程中相关环境变量信息
echo_task.Parameters.Command.EnvVars["key1"] = "value1"
echo_task.Parameters.Command.EnvVars["key2"] = "value2"
# 设置docker参数
#case1 docker镜像在oss registry上
# echo_task.Parameters.Command.EnvVars["BATCH_COMPUTE_DOCKER_IMAGE"] = "localhost:5000/yuorBucket/dockers:0.1"
# echo_task.Parameters.Command.EnvVars["BATCH_COMPUTE_DOCKER_REGISTRY_OSS_PATH"] = "oss://your-bucket/dockers"
#case2 docker镜像在容器仓库
# echo_task.Parameters.Command.Docker.Image = "registry.cn-beijing.aliyuncs.com/demotest/test:0.1"
# 设置程序的标准输出地址,程序中的print打印会实时上传到指定的oss地址
echo_task.Parameters.StdoutRedirectPath = stdoutOssPath
# 设置程序的标准错误输出地址,程序抛出的异常错误会实时上传到指定的oss地址
echo_task.Parameters.StderrRedirectPath = stderrOssPath
# 设置任务的超时时间
echo_task.Timeout = 600
# 设置任务所需实例个数
# 环境变量BATCH_COMPUTE_INSTANCE_ID为0到InstanceCount-1
# 在执行程序中访问BATCH_COMPUTE_INSTANCE_ID,实现数据访问的切片实现单任务并发执行
echo_task.InstanceCount = 1
# 设置任务失败后重试次数
echo_task.MaxRetryCount = 0
# NAS数据挂载
#采用NAS时必须保证网络和NAS在同一个VPC内
nasMountEntry = {
"Source": "nas://xxxx.nas.aliyuncs.com:/",
"Destination": "/home/mnt/",
"WriteSupport":True,
}
mounts_desc.add_entry(nasMountEntry)
mounts_desc.Locale = "utf-8"
mounts_desc.Lock = False
# echo_task.Mounts = mounts_desc
# 采用固定集群提交作业
# echo_task.ClusterId = clusterId
#采用auto集群提交作业
echo_task.AutoCluster = getAutoClusterDesc()
# 添加任务
dag_desc.add_task('echoTask', echo_task)
# 可以设置多个task,每个task可以根据需求进行设置各项参数
# dag_desc.add_task('echoTask2', echo_task)
# Dependencies设置多个task之间的依赖关系,echoTask2依赖echoTask;echoTask3依赖echoTask2
# dag_desc.Dependencies = {"echoTask":["echoTask2"], "echoTask2":["echoTask3"]}
#
job_desc.DAG = dag_desc
return job_desc
if __name__ == "__main__":
client = Client(REGION, access_key_id, access_key_secret)
try:
job_desc = getDagJobDesc()
job_id = client.create_job(job_desc).Id
print('job created: %s' % job_id)
except ClientError,e:
print (e.get_status_code(), e.get_code(), e.get_requestid(), e.get_msg())
Notice: 关于Mounts的注意事项 Job 级别的 Mounts 参数会覆盖 Cluster 级别的配置信息; Modify Cluster 后,需要调用 RecreateInstance 接口才能使新指定的 Mounts 配置生效; 挂载 NAS 需要以 nas:// 做为 source 的前缀,否则会出错; 每个类的具体成员信息参见以下表格
(1)JobDescription 类
参数说明:
参数 | 类型 | 描述 |
properties | dict, str, JobDescription object | 包含作业描述信息的对象 |
属性说明:
序号 | 属性 | 类型 | 描述 |
1. | Name | str | 作业名称 |
2. | Description | str | 作业的简短描述信息 |
3. | Priority | int | 优先级用一个[0,1000]范围内的整数指定,数值越高表示作业调度时的优先级越高 |
4. | Notification | dict | 消息通知配置,可以配置 MNS 服务的 Topic 和 Job 相关事件 |
5. | JobFailOnInstanceFail | bool | Instance 失败是否直接使 Job 失败 |
6. | AutoRelease | boolean | 表示 Job 运行成功自动会被立即释放(删除)掉,默认为 False |
7. | Type | str | 目前仅支持有向无环图(directed acycline graph,DAG)形式描述任务 |
8. | DAG | dict, DAG object | DAG 描述 |
(2)DAG 类
参数说明:
参数 | 类型 | 描述 |
properties | dict, str, DAG object | 所有任务的映射以及任务间依赖关系的描述信息 |
属性说明:
序号 | 属性 | 类型 | 描述 |
1. | Tasks | dict | 所有任务名与任务描述的映射关系 |
2. | Dependencies | dict | 所有任务间的相互依赖关系 |
方法说明 :
序号 | 方法 | 描述 |
1. | add_task(task_name, task) | 增加一个任务 |
2. | get_task(task_name) | 通过任务名获取任务信息 |
3. | delete_task(task_name) | 删除某个任务 |
(3) TaskDescription 类
参数说明:
参数 | 类型 | 描述 |
properties | dict, str, TaskDescription object | 单个任务的描述信息 |
属性说明:
序号 | 属性 | 类型 | 描述 |
1. | Parameters | dict, Parameters object | 任务参数详情 |
2. | InputMapping | dict | OSS 到本地路径的映射 |
3. | OutputMapping | dict | 本地路径到 OSS 的映射 |
4. | LogMapping | dict | 本地日志路径对 OSS 映射 |
5. | Timeout | int | 任务超时时间 |
6. | InstanceCount | int | 任务中实例的个数,正数 |
7. | MaxRetryCount | int | 最大重试次数,默认为0 |
8. | ClusterId | str | 集群标识符 |
9. | Mounts | dict, Mounts object | 实例的网络挂载配置信息,由 Mounts 描述,目前支持 NAS 和 OSS 挂载。 |
10. | AutoCluster | dict, AutoCluster object | 匿名集群,和集群标示符最多只能指定一个 |
(4) Parameters 类
参数说明:
参数 | 类型 | 描述 |
properties | dict, str, Parameters object | 任务参数的描述信息 |
属性说明:
序号 | 属性 | 类型 | 描述 |
1. | Command | dict, Command object | 用户程序相关命令行参数 |
2. | InputMappingConfig | dict, InputMappingConfig object | NFS 挂载服务配置项 |
3. | StdoutRedirectPath | str | 标准输出的 OSS 路径 |
4. | StderrRedirectPath | str | 标准错误的 OSS 路径 |
(5) AutoCluster 类
参数说明:
参数 | 类型 | 描述 |
properties | dict, str, AutoCluster object | 匿名集群信息 |
属性说明:
序号 | 属性 | 类型 | 描述 |
1. | ECSImageId | str | ECS 镜像 ID,可以使用系统提供的镜像 |
2. | InstanceType | str | 实例规格,实例类型 |
3. | ResourceType | str | 资源类型,目前支持:“OnDemand” 和 “Spot”,默认为“OnDemand” |
4. | UserData | dict | 一个 KeyValue 映射,用户自定义的信息,使用 ECS 的 metaserver 获取 |
5. | Configs | Configs object | 集群的配置信息, 详见4.13 节中 ClusterDescription 的介绍 |
6. | SpotStrategy | str | 实例的竞价策略,只有在 ResourceType 为 Spot 的情况下有效。取值范围:SpotWithPriceLimit:设置上限价格的竞价实例; SpotAsPriceGo:系统自动出价,最高按量付费价格。 |
7. | SpotPriceLimit | float | 实例的每小时最高价格(每个实例规格的价格而非每核小时的价格)。支持最大 3 位小数,SpotStrategy 为 SpotWithPriceLimit 生效。 |
8. | ReserveOnFail | bool | 任务失败时不释放相关的虚拟机,会继续收取这些资源的费用直到用户删除作业,默认为 False,仅用于调查问题。 |
9. | DependencyIsvService | string | 执行程序依赖的阿里云提供的ISV服务,目前提供的ISV服务有:“GTX" ,默认为””,不依赖任何ISV服务。 |
(6) Command 类
参数说明:
参数 | 类型 | 描述 |
properties | dict, str, Command object | 用户程序相关命令行参数 |
属性说明:
序号 | 属性 | 类型 | 描述 |
1. | CommandLine | str | 执行用户程序的命令 |
2. | PackagePath | str | 用户程序所在 OSS 路径 |
3. | EnvVars | dict | 用户程序执行时的环境变量 |
(7) InputMappingConfig 类
参数说明:
参数 | 类型 | 描述 |
properties | dict, str, InputMappingConfig object | NFS 挂载服务配置项 |
属性说明:
序号 | 属性 | 类型 | 描述 |
1. | Locale | str | OSS object 挂载到本地时使用的字符集。可选范围包括 GBK、GB2312-80、BIG5、ANSI、EUC-JP、EUC-TW、EUC-KR、SHIFT-JIS、KSC5601 等 |
2. | Lock | bool | NFS 挂载服务是否支持网络文件锁 |
(8) Notification 类
参数说明:
参数 | 类型 | 描述 |
properties | dict, str, Command object | 用户程序相关命令行参数 |
属性说明:
序号 | 属性 | 类型 | 描述 |
1. | Topic | Topic Object | 消息 Topic |
(9) Topic 类
参数说明:
参数 | 类型 | 描述 |
properties | dict, str, Command object | 用户程序相关命令行参数 |
属性说明:
序号 | 属性 | 类型 | 描述 |
1. | Endpoint | str | MNS 区域 endpoint,格式如: |
2. | Name | str | Topic 名称。 |
3. | Events | list | 事件列表,请填写 cluster 相关的事件名。 |
(10) Mounts 类
参数说明:
参数 | 类型 | 描述 |
properties | dict, str, Command object | 创建集群时的网络磁盘挂载配置信息。 |
属性说明:
序号 | 属性 | 类型 | 描述 |
1. | Entries | array | 网络磁盘挂载点信息列表, 由 MountPoint 描述。 |
2. | Locale | str | 挂载 OSS,NAS 存储时语言选项。 |
3. | Lock | bool | 挂载 OSS,NAS 存储时文件锁支持选项。 |
4. | NAS | dict | NAS 配置信息。 |
5. | OSS | dict | OSS 配置信息。 |
(11) MountPoint 类
参数说明:
参数 | 类型 | 描述 |
properties | dict, str, Command object | 网络挂载点。 |
属性说明:
序号 | 属性名称 | 类型 | 描述 |
1. | Source | str | 网络磁盘挂载来源路径,可以是 |
2. | Destination | str | 网络磁盘本地挂载点路径。 |
3. | WriteSupport | bool | 挂载点是否可写。 |
(12) NAS 类
参数说明:
参数 | 类型 | 描述 |
properties | dict, str, Command object | NAS 配置信息。 |
属性说明:
序号 | 属性名称 | 类型 | 描述 |
1. | AccessGroup | list | 需要将集群实例加入到的 NAS 访问组。 |
2. | FileSystem | list | 需要访问的文件系统。 |
(13) OSS 类
参数说明:
参数 | 类型 | 描述 |
properties | dict, str, Command object | OSS 配置信息。 |
属性说明:
序号 | 属性名称 | 类型 | 描述 |
1. | AccessKeyId | str | OSS挂载使用的 Access ID。 |
2. | AccessKeySecret | str | OSS挂载使用的 Access Secret。 |
3. | SecurityToken | str | OSS挂载使用的 Security Token。 |