A job (Job) can contain multiple tasks (Task), and each task can be configured to run its program on multiple instances (Instance).
How to run concurrent tasks
See the following job description JSON example:
{
"DAG": {
...
"Tasks": {
...
"count": {
"InstanceCount": 3, // number of instances required: 3 VMs
"LogMapping": {},
"AutoCluster": {
"ResourceType": "OnDemand",
"ImageId": "img-ubuntu",
"InstanceType": "ecs.sn1.medium"
},
"Parameters": {
"Command": {
"EnvVars": {},
"CommandLine": "python count.py",
"PackagePath": "oss://your-bucket/log-count/worker.tar.gz"
},
"InputMappingConfig": {
"Lock": true
},
"StdoutRedirectPath": "oss://your-bucket/log-count/logs/",
"StderrRedirectPath": "oss://your-bucket/log-count/logs/"
},
"OutputMapping": {},
"MaxRetryCount": 0,
"Timeout": 21600,
"InputMapping": {}
}
}
},
"Description": "batchcompute job",
"Priority": 0,
"JobFailOnInstanceFail": true,
"Type": "DAG",
"Name": "log-count"
}
The task count sets InstanceCount to 3, which means 3 instances are required, i.e. this task's program runs on 3 VMs.
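To illustrate how the concurrency-related fields fit together, the standard json module can read InstanceCount back out of the job description. This is a minimal sketch; the description string below is abbreviated from the full example above:

```python
import json

# Abbreviated job description (only the fields relevant to concurrency).
job_desc = json.loads("""
{
  "DAG": {
    "Tasks": {
      "count": {
        "InstanceCount": 3,
        "Parameters": {
          "Command": {"CommandLine": "python count.py"}
        }
      }
    }
  },
  "Type": "DAG",
  "Name": "log-count"
}
""")

task = job_desc["DAG"]["Tasks"]["count"]
# InstanceCount is the number of VMs that run this task's CommandLine.
print(task["InstanceCount"])  # → 3
```

Note that the full example above uses `//` comments for annotation; real JSON submitted to the service must not contain them.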
Processing different data slices concurrently
You can use environment variables to run the same task program on multiple VMs while having each VM process different data. For example, BATCH_COMPUTE_DAG_INSTANCE_ID (the instance ID) lets each instance pick its own slice of the data. Below is a snippet from count.py:
...
# instance_id: should start from 0
instance_id = os.environ['BATCH_COMPUTE_DAG_INSTANCE_ID']
...
filename = 'part_%s.txt' % instance_id
...
# 1. download a part
oss_tool.download_file('%s/%s/%s.txt' % (pre, split_results, instance_id), filename)
...
# 3. upload result to oss
upload_to = '%s/count_results/%s.json' % (pre, instance_id)
print('upload to %s' % upload_to)
oss_tool.put_data(json.dumps(m), upload_to)
...
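The slicing pattern in count.py can be simulated locally without OSS. The sketch below is illustrative, not the actual worker code: `oss_tool` from the snippet above is the job's own helper, so plain local files stand in for OSS downloads and uploads, and a word count stands in for the real processing step.

```python
import json
import os
import tempfile
from collections import Counter

def run_instance(work_dir):
    """One simulated VM: read its instance id from the environment,
    process only its own slice, and write a per-instance result."""
    # The service sets a distinct id (0, 1, 2, ...) on each VM.
    instance_id = os.environ['BATCH_COMPUTE_DAG_INSTANCE_ID']

    # 1. "download" this instance's slice (local file stands in for OSS).
    part_path = os.path.join(work_dir, 'part_%s.txt' % instance_id)
    with open(part_path) as f:
        words = f.read().split()

    # 2. count words in this slice only.
    m = Counter(words)

    # 3. "upload" the result (local file stands in for OSS).
    result_path = os.path.join(work_dir, 'count_result_%s.json' % instance_id)
    with open(result_path, 'w') as f:
        json.dump(m, f)

# Simulate 3 instances, matching InstanceCount in the job description.
work_dir = tempfile.mkdtemp()
slices = ['a b a', 'b b', 'c']
for i, text in enumerate(slices):
    with open(os.path.join(work_dir, 'part_%d.txt' % i), 'w') as f:
        f.write(text)

for i in range(3):
    os.environ['BATCH_COMPUTE_DAG_INSTANCE_ID'] = str(i)
    run_instance(work_dir)
```

Each simulated instance reads only its own part_N.txt and writes its own count_result_N.json, which is exactly how the 3 VMs avoid stepping on each other: the instance ID is the only coordination needed.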
For more details, see Quick Start.