すべてのプロダクト
Search
ドキュメントセンター

Batch Compute:並列タスク

最終更新日:Dec 28, 2024

1 つのジョブに複数のタスクを含めることができ、1 つのタスクプログラムを複数のインスタンスで実行できます。

タスクを並行して実行する方法

JSON 形式の次のジョブ記述例を参照してください。

{
    "DAG": {
        ...
        "Tasks": {
            ...
            "count": {
                "InstanceCount": 3,  //インスタンス数を指定します。3 つの VM です。
                "LogMapping": {},
                "AutoCluster": {
                    "ResourceType": "OnDemand",
                    "ImageId": "img-ubuntu",
                    "InstanceType": "ecs.sn1.medium"
                },
                "Parameters": {
                    "Command": {
                        "EnvVars": {},
                        "CommandLine": "python count.py",
                        "PackagePath": "oss://your-bucket/log-count/worker.tar.gz"
                    },
                    "InputMappingConfig": {
                        "Lock": true
                    },
                    "StdoutRedirectPath": "oss://your-bucket/log-count/logs/",
                    "StderrRedirectPath": "oss://your-bucket/log-count/logs/"
                },
                "OutputMapping": {},
                "MaxRetryCount": 0,
                "Timeout": 21600,
                "InputMapping": {}
            }
        }
    },
    "Description": "batchcompute job",
    "Priority": 0,
    "JobFailOnInstanceFail": true,
    "Type": "DAG",
    "Name": "log-count"
}

タスクの InstanceCount は 3 に設定されています。これは、3 つのインスタンス、つまりタスクプログラムが 3 つの仮想マシン (VM) で実行されることを意味します。

異なるセグメントのデータを並行して処理する

同じタスクプログラムを実行して、複数の VM 上で異なるデータを処理できます。タスクプログラムでは、環境変数 BATCH_COMPUTE_DAG_INSTANCE_ID(インスタンス ID) を分割に使用して、タスクプログラムが異なるセグメントのデータを処理するようにします。以下は count.py のコードスニペットです。

...
# instance_id: 0 から開始する必要があります
instance_id = os.environ['BATCH_COMPUTE_DAG_INSTANCE_ID']

...

filename = 'part_%s.txt' %  instance_id
...

# 1. 一部をダウンロードする
oss_tool.download_file('%s/%s/%s.txt' % (pre, split_results, instance_id ), filename)


...
# 3. 結果を OSS にアップロードする
upload_to = '%s/count_results/%s.json' % (pre, instance_id )
print('upload to %s' % upload_to)
oss_tool.put_data(json.dumps(m), upload_to)
...

詳細については、クイックスタート例を参照してください。