全部產品
Search
文件中心

:建立(提交)作業

更新時間:Jul 06, 2024

介面

createJob

參數

參數

類型

是否必需

說明

jobDescription

JobDescription

Job對象中有各個任務的描述資訊,和任務的DAG依賴。

jobDescription 的具體屬性資訊參考DAG作業APP作業

傳回值

建立成功後返回一個 CreateJobResponse 執行個體,可以通過 response.getJobId() 擷取建立的作業ID。建立失敗後,拋出異常: ClientException

例子

Java 源碼

import com.aliyuncs.batchcompute.main.v20151111.*;
import com.aliyuncs.batchcompute.model.v20151111.*;
import com.aliyuncs.batchcompute.pojo.v20151111.*;
import com.aliyuncs.exceptions.ClientException;

public class CreateAppJob {
    static String ACCESS_KEY_ID = "xxx";  //這裡填寫您的 AccessKeyId
    static String ACCESS_KEY_SECRET = "xxx"; //這裡填寫您的 AccessKeySecret
    static String REGION_ID = "cn-xxx";   //這裡填寫 region
    static String ClusterId = "cls-xxx"; //提交DAG固定叢集作業需要修改其他情境不需要修改

    static boolean IS_DAG_JOB = true; //APP作業和DAG作業開關,預設提交DAG作業
    static boolean IS_AUTO_CLUSTER = true; //固定叢集和非固定叢集作業開關,預設提交固定叢集作業

    public static void main(String[] args) {
        BatchCompute client = new BatchComputeClient(REGION_ID, ACCESS_KEY_ID, ACCESS_KEY_SECRET);
        try {
            JobDescription jobDescription = getJobDesc();

            CreateJobResponse response = client.createJob(jobDescription);
            String jobId = response.getJobId();
            //建立成功

            System.out.println("jobId:" + jobId);
            System.out.println("RequestId: " + response.getRequestId());
            System.out.println("StatusCode: " + response.getStatusCode());
        } catch (ClientException e) {
            e.printStackTrace();
            //建立失敗
        }
    }

    private static JobDescription getJobDesc() {
        JobDescription desc = new JobDescription();

        desc.setName("javaSdkJob");
        desc.setDescription("javaSdkJob");

        //設定作業優先順序
        desc.setPriority(1);

        desc.setJobFailOnInstanceFail(true);
        desc.setAutoRelease(false);

        if (IS_DAG_JOB) {
            //設定 DAG task
            desc.setType("DAG");
            desc.setDag(getDagDesc());
        }else{
            //設定 APP task
            desc.setType("App");
            desc.setApp(getAppJobDescription());
        }

        //根據業務需要設定訂閱作業實踐
//        Notification noti = new Notification();
//        Topic topic = new Topic();
//        topic.addEvent(Topic.ON_JOB_FAILED);
//        topic.addEvent(Topic.ON_JOB_FINISHED);
//        noti.setTopic(topic);
//        topic.setName("tp_n1");
//        topic.setEndpoint("xxxxx");
//        desc.setNotification(noti);

        return desc;
    }
    private static AppJobDescription getAppJobDescription() {
        AppJobDescription appJobDescription = new AppJobDescription();

        appJobDescription.setAppName("JavaSdkApp");
        appJobDescription.addInputs("inputFile", "oss://test/input/cromwell_app.txt");
        appJobDescription.addOutputs("outputFile", "oss://test/output/ret/");

        AppJobDescription.Logging logging = new AppJobDescription.Logging();
        logging.setStderrPath("oss://test/output/error/");
        logging.setStdoutPath("oss://test/output/log/");
        appJobDescription.setLogging(logging);

        appJobDescription.addConfig("ResourceType", "OnDemand");
        appJobDescription.addConfig("InstanceType", "ecs.sn2ne.large");
        appJobDescription.addConfig("InstanceCount", 1);

        appJobDescription.addConfig("MinDiskSize", 40);
        appJobDescription.addConfig("DiskType", "cloud_efficiency");
        appJobDescription.addConfig("MaxRetryCount", 1);
        appJobDescription.addConfig("Timeout", 1000);
        appJobDescription.addConfig("ReserveOnFail", true);
        appJobDescription.addConfig("ClassicNetwork", false);//設定叢集網路方式,false為VPC組網

        appJobDescription.addConfig("MinDataDiskSize", 40);
        //注意磁碟類型和 DiskType 保持一致
        appJobDescription.addConfig("DataDiskType", "cloud_efficiency");
        //掛載點根據需要做修改,windows 為 “E:, F:, G: 等”
        appJobDescription.addConfig("DataDiskMountPoint", "/home/mount/");
        return appJobDescription;
    }
    private static DAG getDagDesc() {
        DAG dag = new DAG();
        TaskDescription task = new TaskDescription();

        task.setTaskName("javaSdkTask");

        //設定執行個體資訊
        task.setInstanceCount(1);

        if (IS_AUTO_CLUSTER){
            //設定Auto cluster
            task.setAutoCluster(getAutoCluster());
        }else{
            //設定固定叢集資訊
            task.setClusterId(ClusterId);
        }

        task.setMaxRetryCount(2);
        task.setTimeout(10000);

        Parameters parameters = new Parameters();
        Command cmd = new Command();
        //設定程式啟動命令
        cmd.setCommandLine("python runtask.py 頓靂意當更囅");
        //設定程式啟動指令碼或者執行檔案地址
        cmd.setPackagePath("oss://yuanhyyshenzhen/test/installpackage/runtask.tar.gz");

        //docker 鏡像設定方式:推薦使用容器鏡像模式
        //1、鏡像在oss registry上,設定docker的方式
        //oss registry模式,參數設定好後自行開啟注釋
        //cmd.addEnvVars("BATCH_COMPUTE_DOCKER_IMAGE", "localhost:5000/yuorBucket/dockers:0.1");//鏡像名稱;
        //cmd.addEnvVars("BATCH_COMPUTE_DOCKER_REGISTRY_OSS_PATH", "oss://your-bucket/dockers");//設定OSS地址

        //2、鏡像在容器鏡像倉庫,設定docker方式
        //Command.Docker docker = new Command.Docker();
        //docker.setImage("registry.cn-beijing.aliyuncs.com/demotest/test:0.1");
        //cmd.setDocker(docker);

        parameters.setCommand(cmd);

        //設定標準輸出,上傳的OSS路徑
        parameters.setStderrRedirectPath("oss://test/output/error/");
        parameters.setStdoutRedirectPath("oss://test/output/log/");

        InputMappingConfig input = new InputMappingConfig();
        input.setLocale("GBK");
        input.setLock(true);
        parameters.setInputMappingConfig(input);

        task.setParameters(parameters);

        //設定輸入OSS路徑和本地路徑關係
        task.addInputMapping("oss://test/input/", "/home/admin/disk1/");

        //設定輸出本地路徑和OSS地址
        task.addOutputMapping("/home/admin/disk2/", "oss://test/output/ret/");

        //設定掛載資訊
        Mounts mounts = new Mounts();

        MountEntry mountEntry = new MountEntry();
        mountEntry.setDestination("/home/mount");
        mountEntry.setSource("oss://test/mount/");
        mountEntry.setWriteSupport(false);

        mounts.setCacheSupport(false);

        //windows set GBK; Liux set utf-8
        //mounts.setLocale("GBK");
        mounts.setLock(false);
        mounts.addEntries(mountEntry);

        //task.setMounts(mounts);

        dag.addTask(task);
        return dag;
    }
    private static AutoCluster getAutoCluster() {
        AutoCluster autoCluster = new AutoCluster();

        //設定叢集鏡像資訊ECSImageId 在不同region可能會發生變化
        //autoCluster.setECSImageId("m-wz9dk5nao5z3fw6bo9k6");
        //建議使用setImageId介面設定
        autoCluster.setImageId("img-ubuntu");

        autoCluster.setInstanceType("ecs.s3.large");
        autoCluster.setReserveOnFail(true);

        //設定資源類型只有ResourceType為Spot的情況下後面兩項有效
        autoCluster.setResourceType("OnDemand");
        //autoCluster.setSpotPriceLimit(5.6f);
        //autoCluster.setSpotStrategy("Spot");

        //設定config資訊
        autoCluster.setConfigs(getConfigDesc());

        return autoCluster;
    }
    private static Configs getConfigDesc() {
        Configs configs = new Configs();

        //設定系統磁碟類型以及大型
        Disks disks = new Disks();
        SystemDisk systemDisk = new SystemDisk();
        systemDisk.setSize(40);//GB
        systemDisk.setType("cloud_efficiency");
        disks.setSystemDisk(systemDisk);

        DataDisk dataDisk = new DataDisk();
        dataDisk.setMountPoint("/home/dataDisk/");
        dataDisk.setSize(40);
        dataDisk.setType("cloud_efficiency");
        disks.setDataDisk(dataDisk);

        configs.setDisks(disks);

        //設定網路類型
        Networks networks = new Networks();
        VPC vpc = new VPC();
        vpc.setCidrBlock("10.0.0.0/12");
        networks.setVpc(vpc);
        configs.setNetworks(networks);

        return configs;
    }
}
執行結果:
```JSON
{
    jobId: job-000000005BE3E897000007FA00114EE9
    RequestId: null
    StatusCode: 201
}

注意

  1. 本執行個體代碼支援提交 APP 和 DAG 類型作業,支援 AutoCluster 和固定叢集類型的作業,提交作業之前根據業務需要修改開關(IS_DAG_JOBIS_AUTO_CLUSTER)即可。

  2. 若是提交 APP 類型作業,需要在提交作業之前建立 APP,然後根據 APP 的建立參數做對應修改作業參數,最後進行作業提交。

  3. 提交固定叢集作業之前需要先建立叢集,修改 ClusterId 為新建立的叢集,然後提交作業。

  4. 提交作業前,請確保 OSS 地址填寫正確並且已經上傳輸入或者執行檔案到對應的 OSS 路徑