
Batch Compute: Concurrent tasks

Last Updated: May 21, 2024

A job can contain multiple tasks, and the program of a single task can run on multiple instances concurrently.

How do I run tasks concurrently?

See the following job description example in JSON format:

{
    "DAG": {
        ...
        "Tasks": {
            ...
            "count": {
                "InstanceCount": 3,  //Specifies the number of instances: three VMs
                "LogMapping": {},
                "AutoCluster": {
                    "ResourceType": "OnDemand",
                    "ImageId": "img-ubuntu",
                    "InstanceType": "ecs.sn1.medium"
                },
                "Parameters": {
                    "Command": {
                        "EnvVars": {},
                        "CommandLine": "python count.py",
                        "PackagePath": "oss://your-bucket/log-count/worker.tar.gz"
                    },
                    "InputMappingConfig": {
                        "Lock": true
                    },
                    "StdoutRedirectPath": "oss://your-bucket/log-count/logs/",
                    "StderrRedirectPath": "oss://your-bucket/log-count/logs/"
                },
                "OutputMapping": {},
                "MaxRetryCount": 0,
                "Timeout": 21600,
                "InputMapping": {}
            }
        }
    },
    "Description": "batchcompute job",
    "Priority": 0,
    "JobFailOnInstanceFail": true,
    "Type": "DAG",
    "Name": "log-count"
}

In this example, InstanceCount in the task is set to 3, which means that three instances are required: the task program runs on three virtual machines (VMs) at the same time.
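To submit this job programmatically, you can use the Batch Compute Python SDK (the batchcompute package). The following is a minimal sketch, not the exact code from the Quick start example: the region constant, the credentials, and the job.json file name are placeholders, and you should verify against the SDK reference that JobDescription accepts a parsed JSON dict as shown here.

import json

from batchcompute import Client, CN_SHENZHEN
from batchcompute.resources import JobDescription

# Placeholders: use your own region endpoint and credentials.
client = Client(CN_SHENZHEN, '<your-access-key-id>', '<your-access-key-secret>')

# job.json holds the JSON job description shown above.
with open('job.json') as f:
    job_desc = JobDescription(json.load(f))  # assumption: accepts a parsed dict

job = client.create_job(job_desc)
print('Created job: %s' % job.Id)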

Process different data segments concurrently

You can run the same task program on several VMs, with each VM processing a different portion of the data. In the task program, read the environment variable BATCH_COMPUTE_DAG_INSTANCE_ID (the instance ID) and use it to split the work so that each instance processes a different data segment. The following is a count.py code snippet:

...
# The instance ID starts from 0 and identifies the current instance.
instance_id = os.environ['BATCH_COMPUTE_DAG_INSTANCE_ID']

...

filename = 'part_%s.txt' % instance_id
...

# 1. Download only the data segment assigned to this instance.
oss_tool.download_file('%s/%s/%s.txt' % (pre, split_results, instance_id), filename)

...
# 3. Upload this instance's partial result to OSS.
upload_to = '%s/count_results/%s.json' % (pre, instance_id)
print('upload to %s' % upload_to)
oss_tool.put_data(json.dumps(m), upload_to)
...
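
For reference, the following is a minimal self-contained sketch of what a complete count.py might look like. The oss_tool helper module, the pre path prefix, and the split_results directory name are assumptions carried over from the snippet above, and the word-count logic in step 2 is only an illustration, not the exact code from the Quick start example.

import json
import os

import oss_tool  # hypothetical helper module that wraps the OSS SDK

# Each instance gets its own ID (0, 1, 2, ...) through this environment variable.
instance_id = os.environ['BATCH_COMPUTE_DAG_INSTANCE_ID']

pre = 'oss://your-bucket/log-count'  # assumed path prefix
split_results = 'split_results'      # assumed directory holding the input segments
filename = 'part_%s.txt' % instance_id

# 1. Download only the data segment assigned to this instance.
oss_tool.download_file('%s/%s/%s.txt' % (pre, split_results, instance_id), filename)

# 2. Count word occurrences in this segment (illustrative logic).
m = {}
with open(filename) as f:
    for line in f:
        for word in line.split():
            m[word] = m.get(word, 0) + 1

# 3. Upload this instance's partial result; a downstream task can merge all parts.
upload_to = '%s/count_results/%s.json' % (pre, instance_id)
print('upload to %s' % upload_to)
oss_tool.put_data(json.dumps(m), upload_to)

Because each of the three instances reads a different part_<instance_id>.txt and writes a different <instance_id>.json, the three VMs process disjoint segments of the data in parallel.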

For more information, see Quick start example.