1 つのジョブに複数のタスクを含めることができ、1 つのタスクプログラムを複数のインスタンスで実行できます。
タスクを並行して実行する方法
JSON 形式の次のジョブ記述例を参照してください。
{
"DAG": {
...
"Tasks": {
...
"count": {
"InstanceCount": 3, //インスタンス数を指定します。3 つの VM です。
"LogMapping": {},
"AutoCluster": {
"ResourceType": "OnDemand",
"ImageId": "img-ubuntu",
"InstanceType": "ecs.sn1.medium"
},
"Parameters": {
"Command": {
"EnvVars": {},
"CommandLine": "python count.py",
"PackagePath": "oss://your-bucket/log-count/worker.tar.gz"
},
"InputMappingConfig": {
"Lock": true
},
"StdoutRedirectPath": "oss://your-bucket/log-count/logs/",
"StderrRedirectPath": "oss://your-bucket/log-count/logs/"
},
"OutputMapping": {},
"MaxRetryCount": 0,
"Timeout": 21600,
"InputMapping": {}
}
}
},
"Description": "batchcompute job",
"Priority": 0,
"JobFailOnInstanceFail": true,
"Type": "DAG",
"Name": "log-count"
}
タスクの InstanceCount
は 3 に設定されています。これは、3 つのインスタンス、つまりタスクプログラムが 3 つの仮想マシン (VM) で実行されることを意味します。
異なるセグメントのデータを並行して処理する
同じタスクプログラムを実行して、複数の VM 上で異なるデータを処理できます。タスクプログラムでは、環境変数 BATCH_COMPUTE_DAG_INSTANCE_ID(インスタンス ID)
を分割に使用して、タスクプログラムが異なるセグメントのデータを処理するようにします。以下は count.py のコードスニペットです。
...
# instance_id: 0 から開始する必要があります
instance_id = os.environ['BATCH_COMPUTE_DAG_INSTANCE_ID']
...
filename = 'part_%s.txt' % instance_id
...
# 1. 一部をダウンロードする
oss_tool.download_file('%s/%s/%s.txt' % (pre, split_results, instance_id ), filename)
...
# 3. 結果を OSS にアップロードする
upload_to = '%s/count_results/%s.json' % (pre, instance_id )
print('upload to %s' % upload_to)
oss_tool.put_data(json.dumps(m), upload_to)
...
詳細については、クイックスタート例を参照してください。