接口
createJob
参数
参数 | 类型 | 是否必需 | 说明 |
jobDescription | JobDescription | 是 | Job对象中有各个任务的描述信息,和任务的DAG依赖。 |
jobDescription 的具体属性信息参考DAG作业和APP作业。
返回值
创建成功后返回一个 CreateJobResponse
实例,可以通过 response.getJobId()
获取创建的作业ID。创建失败后,抛出异常: ClientException
。
例子
Java 源码
import com.aliyuncs.batchcompute.main.v20151111.*;
import com.aliyuncs.batchcompute.model.v20151111.*;
import com.aliyuncs.batchcompute.pojo.v20151111.*;
import com.aliyuncs.exceptions.ClientException;
public class CreateAppJob {
static String ACCESS_KEY_ID = "xxx"; //这里填写您的 AccessKeyId
static String ACCESS_KEY_SECRET = "xxx"; //这里填写您的 AccessKeySecret
static String REGION_ID = "cn-xxx"; //这里填写 region
static String ClusterId = "cls-xxx"; //提交DAG固定集群作业需要修改其他场景不需要修改
static boolean IS_DAG_JOB = true; //APP作业和DAG作业开关,默认提交DAG作业
static boolean IS_AUTO_CLUSTER = true; //固定集群和非固定集群作业开关,默认提交固定集群作业
public static void main(String[] args) {
BatchCompute client = new BatchComputeClient(REGION_ID, ACCESS_KEY_ID, ACCESS_KEY_SECRET);
try {
JobDescription jobDescription = getJobDesc();
CreateJobResponse response = client.createJob(jobDescription);
String jobId = response.getJobId();
//创建成功
System.out.println("jobId:" + jobId);
System.out.println("RequestId: " + response.getRequestId());
System.out.println("StatusCode: " + response.getStatusCode());
} catch (ClientException e) {
e.printStackTrace();
//创建失败
}
}
private static JobDescription getJobDesc() {
JobDescription desc = new JobDescription();
desc.setName("javaSdkJob");
desc.setDescription("javaSdkJob");
//设置作业优先级
desc.setPriority(1);
desc.setJobFailOnInstanceFail(true);
desc.setAutoRelease(false);
if (IS_DAG_JOB) {
//设置 DAG task
desc.setType("DAG");
desc.setDag(getDagDesc());
}else{
//设置 APP task
desc.setType("App");
desc.setApp(getAppJobDescription());
}
//根据业务需要设置订阅作业实践
// Notification noti = new Notification();
// Topic topic = new Topic();
// topic.addEvent(Topic.ON_JOB_FAILED);
// topic.addEvent(Topic.ON_JOB_FINISHED);
// noti.setTopic(topic);
// topic.setName("tp_n1");
// topic.setEndpoint("xxxxx");
// desc.setNotification(noti);
return desc;
}
private static AppJobDescription getAppJobDescription() {
AppJobDescription appJobDescription = new AppJobDescription();
appJobDescription.setAppName("JavaSdkApp");
appJobDescription.addInputs("inputFile", "oss://test/input/cromwell_app.txt");
appJobDescription.addOutputs("outputFile", "oss://test/output/ret/");
AppJobDescription.Logging logging = new AppJobDescription.Logging();
logging.setStderrPath("oss://test/output/error/");
logging.setStdoutPath("oss://test/output/log/");
appJobDescription.setLogging(logging);
appJobDescription.addConfig("ResourceType", "OnDemand");
appJobDescription.addConfig("InstanceType", "ecs.sn2ne.large");
appJobDescription.addConfig("InstanceCount", 1);
appJobDescription.addConfig("MinDiskSize", 40);
appJobDescription.addConfig("DiskType", "cloud_efficiency");
appJobDescription.addConfig("MaxRetryCount", 1);
appJobDescription.addConfig("Timeout", 1000);
appJobDescription.addConfig("ReserveOnFail", true);
appJobDescription.addConfig("ClassicNetwork", false);//设置集群网络方式,false为VPC组网
appJobDescription.addConfig("MinDataDiskSize", 40);
//注意磁盘类型和 DiskType 保持一致
appJobDescription.addConfig("DataDiskType", "cloud_efficiency");
//挂载点根据需要做修改,windows 为 “E:, F:, G: 等”
appJobDescription.addConfig("DataDiskMountPoint", "/home/mount/");
return appJobDescription;
}
private static DAG getDagDesc() {
DAG dag = new DAG();
TaskDescription task = new TaskDescription();
task.setTaskName("javaSdkTask");
//设置实例信息
task.setInstanceCount(1);
if (IS_AUTO_CLUSTER){
//设置Auto cluster
task.setAutoCluster(getAutoCluster());
}else{
//设置固定集群信息
task.setClusterId(ClusterId);
}
task.setMaxRetryCount(2);
task.setTimeout(10000);
Parameters parameters = new Parameters();
Command cmd = new Command();
//设置程序启动命令
cmd.setCommandLine("python runtask.py 顿雳意当更冁");
//设置程序启动脚本或者执行文件地址
cmd.setPackagePath("oss://yuanhyyshenzhen/test/installpackage/runtask.tar.gz");
//docker 镜像设置方式:推荐使用容器镜像模式
//1、镜像在oss registry上,设置docker的方式
//oss registry模式,参数设置好后自行打开注释
//cmd.addEnvVars("BATCH_COMPUTE_DOCKER_IMAGE", "localhost:5000/yuorBucket/dockers:0.1");//镜像名称;
//cmd.addEnvVars("BATCH_COMPUTE_DOCKER_REGISTRY_OSS_PATH", "oss://your-bucket/dockers");//设置OSS地址
//2、镜像在容器镜像仓库,设置docker方式
//Command.Docker docker = new Command.Docker();
//docker.setImage("registry.cn-beijing.aliyuncs.com/demotest/test:0.1");
//cmd.setDocker(docker);
parameters.setCommand(cmd);
//设置标准输出,上传的OSS路径
parameters.setStderrRedirectPath("oss://test/output/error/");
parameters.setStdoutRedirectPath("oss://test/output/log/");
InputMappingConfig input = new InputMappingConfig();
input.setLocale("GBK");
input.setLock(true);
parameters.setInputMappingConfig(input);
task.setParameters(parameters);
//设置输入OSS路径和本地路径关系
task.addInputMapping("oss://test/input/", "/home/admin/disk1/");
//设置输出本地路径和OSS地址
task.addOutputMapping("/home/admin/disk2/", "oss://test/output/ret/");
//设置挂载信息
Mounts mounts = new Mounts();
MountEntry mountEntry = new MountEntry();
mountEntry.setDestination("/home/mount");
mountEntry.setSource("oss://test/mount/");
mountEntry.setWriteSupport(false);
mounts.setCacheSupport(false);
//windows set GBK; Liux set utf-8
//mounts.setLocale("GBK");
mounts.setLock(false);
mounts.addEntries(mountEntry);
//task.setMounts(mounts);
dag.addTask(task);
return dag;
}
private static AutoCluster getAutoCluster() {
AutoCluster autoCluster = new AutoCluster();
//设置集群镜像信息ECSImageId 在不同region可能会发生变化
//autoCluster.setECSImageId("m-wz9dk5nao5z3fw6bo9k6");
//建议使用setImageId接口设置
autoCluster.setImageId("img-ubuntu");
autoCluster.setInstanceType("ecs.s3.large");
autoCluster.setReserveOnFail(true);
//设置资源类型只有ResourceType为Spot的情况下后面两项有效
autoCluster.setResourceType("OnDemand");
//autoCluster.setSpotPriceLimit(5.6f);
//autoCluster.setSpotStrategy("Spot");
//设置config信息
autoCluster.setConfigs(getConfigDesc());
return autoCluster;
}
private static Configs getConfigDesc() {
Configs configs = new Configs();
//设置系统磁盘类型以及大型
Disks disks = new Disks();
SystemDisk systemDisk = new SystemDisk();
systemDisk.setSize(40);//GB
systemDisk.setType("cloud_efficiency");
disks.setSystemDisk(systemDisk);
DataDisk dataDisk = new DataDisk();
dataDisk.setMountPoint("/home/dataDisk/");
dataDisk.setSize(40);
dataDisk.setType("cloud_efficiency");
disks.setDataDisk(dataDisk);
configs.setDisks(disks);
//设置网络类型
Networks networks = new Networks();
VPC vpc = new VPC();
vpc.setCidrBlock("10.0.0.0/12");
networks.setVpc(vpc);
configs.setNetworks(networks);
return configs;
}
}
执行结果:
```JSON
{
jobId: job-000000005BE3E897000007FA00114EE9
RequestId: null
StatusCode: 201
}
注意
本实例代码支持提交 APP 和 DAG 类型作业,支持 AutoCluster 和固定集群类型的作业,提交作业之前根据业务需要修改开关(
IS_DAG_JOB
和IS_AUTO_CLUSTER
)即可。若是提交 APP 类型作业,需要在提交作业之前创建 APP,然后根据 APP 的创建参数做对应修改作业参数,最后进行作业提交。
提交固定集群作业之前需要先创建集群,修改
ClusterId
为新创建的集群,然后提交作业。提交作业前,请确保 OSS 地址填写正确并且已经上传输入或者执行文件到对应的 OSS 路径