全部產品
Search
文件中心

Batch Compute:並發任務

更新時間:Jul 06, 2024

一個作業(Job)中可以有多個任務(Task),一個任務可以指定在多個執行個體(Instance)上運行程式。

如何運行並發任務

請看下面 job description json 例子:

{
    "DAG": {
        ...
        "Tasks": {
            ...
            "count": {
                "InstanceCount": 3,  //指定需要執行個體數:3台VM
                "LogMapping": {},
                "AutoCluster": {
                    "ResourceType": "OnDemand",
                    "ImageId": "img-ubuntu",
                    "InstanceType": "ecs.sn1.medium"
                },
                "Parameters": {
                    "Command": {
                        "EnvVars": {},
                        "CommandLine": "python count.py",
                        "PackagePath": "oss://your-bucket/log-count/worker.tar.gz"
                    },
                    "InputMappingConfig": {
                        "Lock": true
                    },
                    "StdoutRedirectPath": "oss://your-bucket/log-count/logs/",
                    "StderrRedirectPath": "oss://your-bucket/log-count/logs/"
                },
                "OutputMapping": {},
                "MaxRetryCount": 0,
                "Timeout": 21600,
                "InputMapping": {}
            }
        }
    },
    "Description": "batchcompute job",
    "Priority": 0,
    "JobFailOnInstanceFail": true,
    "Type": "DAG",
    "Name": "log-count"
}

任務count中配置了InstanceCount為3, 表示需要執行個體數3台, 即在3台VM上運行這個任務的程式。

並發處理不同資料片段

您可以使用 環境變數同時在多台 VM 上運行同一的任務程式,但是處理不同的資料呢。例如使用 BATCH_COMPUTE_DAG_INSTANCE_ID(實例ID) 可以處理不同片段的資料。以下是 count.py 程式碼片段:

...
# instance_id: should start from 0
instance_id = os.environ['BATCH_COMPUTE_DAG_INSTANCE_ID']

...

filename = 'part_%s.txt' %  instance_id
...

# 1. download a part
oss_tool.download_file('%s/%s/%s.txt' % (pre, split_results, instance_id ), filename)


...
# 3. upload result to oss
upload_to = '%s/count_results/%s.json' % (pre, instance_id )
print('upload to %s' % upload_to)
oss_tool.put_data(json.dumps(m), upload_to)
...

更多詳情,請參閱 快速開始