A job (Job) can contain multiple tasks (Task), and a task can specify that its program runs on multiple instances (Instance).
How to run concurrent tasks
See the following job description JSON example:
{
  "DAG": {
    ...
    "Tasks": {
      ...
      "count": {
        "InstanceCount": 3,  // required number of instances: 3 VMs
        "LogMapping": {},
        "AutoCluster": {
          "ResourceType": "OnDemand",
          "ImageId": "img-ubuntu",
          "InstanceType": "ecs.sn1.medium"
        },
        "Parameters": {
          "Command": {
            "EnvVars": {},
            "CommandLine": "python count.py",
            "PackagePath": "oss://your-bucket/log-count/worker.tar.gz"
          },
          "InputMappingConfig": {
            "Lock": true
          },
          "StdoutRedirectPath": "oss://your-bucket/log-count/logs/",
          "StderrRedirectPath": "oss://your-bucket/log-count/logs/"
        },
        "OutputMapping": {},
        "MaxRetryCount": 0,
        "Timeout": 21600,
        "InputMapping": {}
      }
    }
  },
  "Description": "batchcompute job",
  "Priority": 0,
  "JobFailOnInstanceFail": true,
  "Type": "DAG",
  "Name": "log-count"
}
The task count sets InstanceCount to 3, which means 3 instances are required; that is, this task's program runs on 3 VMs.
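To make the fan-out behavior concrete, here is a minimal local sketch of what the service effectively does: it starts InstanceCount copies of the program, each with a distinct BATCH_COMPUTE_DAG_INSTANCE_ID. Using subprocess to stand in for the 3 VMs is purely an illustrative assumption, not how BatchCompute is implemented.

import os
import subprocess

INSTANCE_COUNT = 3  # mirrors "InstanceCount": 3 in the job description above

# Illustrative only: simulate locally what BatchCompute does on 3 separate VMs.
# Each copy of count.py receives its own BATCH_COMPUTE_DAG_INSTANCE_ID (0, 1, 2).
procs = []
for instance_id in range(INSTANCE_COUNT):
    env = dict(os.environ, BATCH_COMPUTE_DAG_INSTANCE_ID=str(instance_id))
    procs.append(subprocess.Popen(['python', 'count.py'], env=env))

for p in procs:
    p.wait()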
Processing different data segments concurrently
You can use environment variables to run the same task program on multiple VMs at the same time while having each VM process different data. For example, the environment variable BATCH_COMPUTE_DAG_INSTANCE_ID (the instance ID) can be used to select a different segment of the data. The following is a snippet of count.py:
...
# instance_id starts from 0
instance_id = os.environ['BATCH_COMPUTE_DAG_INSTANCE_ID']
...
filename = 'part_%s.txt' % instance_id
...
# 1. download a part
oss_tool.download_file('%s/%s/%s.txt' % (pre, split_results, instance_id), filename)
...
# 3. upload result to oss
upload_to = '%s/count_results/%s.json' % (pre, instance_id)
print('upload to %s' % upload_to)
oss_tool.put_data(json.dumps(m), upload_to)
...
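For reference, below is a self-contained sketch of what a complete count.py might look like. It is an illustration only: the oss_tool helper from the original worker.tar.gz is replaced with direct calls to the oss2 Python SDK, and the endpoint, bucket name, split_results/ prefix, credential environment variables, and the word-counting logic are all assumptions, not the original worker code.

import json
import os

import oss2

# Assumed names for illustration; adjust to your own bucket and layout.
ENDPOINT = 'oss-cn-shenzhen.aliyuncs.com'
BUCKET_NAME = 'your-bucket'
PREFIX = 'log-count'  # corresponds to `pre` in the snippet above

def main():
    # instance_id starts from 0; BatchCompute sets a distinct value per VM.
    instance_id = os.environ['BATCH_COMPUTE_DAG_INSTANCE_ID']

    auth = oss2.Auth(os.environ['OSS_ACCESS_KEY_ID'],
                     os.environ['OSS_ACCESS_KEY_SECRET'])
    bucket = oss2.Bucket(auth, ENDPOINT, BUCKET_NAME)

    # 1. download the data segment assigned to this instance
    filename = 'part_%s.txt' % instance_id
    bucket.get_object_to_file('%s/split_results/%s.txt' % (PREFIX, instance_id),
                              filename)

    # 2. count word occurrences in this segment (placeholder logic)
    m = {}
    with open(filename) as f:
        for line in f:
            for word in line.split():
                m[word] = m.get(word, 0) + 1

    # 3. upload this instance's partial result to OSS
    upload_to = '%s/count_results/%s.json' % (PREFIX, instance_id)
    print('upload to %s' % upload_to)
    bucket.put_object(upload_to, json.dumps(m))

if __name__ == '__main__':
    main()

Because each instance reads only its own split (split_results/<instance_id>.txt) and writes only its own result (count_results/<instance_id>.json), the 3 VMs can run fully in parallel without coordinating with each other.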
For more details, see Quick Start.