全部產品
Search
文件中心

:Java SDK 快速開始

更新時間:Jul 06, 2024

本文檔將介紹如何使用 Java 版 SDK 來提交一個作業,目的是統計一個記錄檔中“INFO”,”WARN”,”ERROR”,”DEBUG”出現的次數。

步驟

  • 作業準備

    • 上傳資料檔案到 OSS

    • 使用範例程式碼

    • 編譯打包

    • 上傳到 OSS

  • 使用 SDK 建立(提交)作業

  • 查看結果

1. 作業準備

本作業是統計一個記錄檔中“INFO”,”WARN”,”ERROR”,”DEBUG”出現的次數。

該作業包含 3 個任務:split,count 和 merge:

  • split 任務會把記錄檔分成 3 份。

  • count 任務會統計每份記錄檔中“INFO”,”WARN”,”ERROR”,”DEBUG”出現的次數(count 任務需要配置 InstanceCount 為 3,表示同時啟動 3 台機器運行 count 程式)。

  • merge 任務會把 count 任務的結果統一合并起來。

DAG圖例:

DAG

(1) 上傳資料檔案到OSS

下載本例所需的資料:log-count-data.txt

將 log-count-data.txt 上傳到:oss://your-bucket/log-count/log-count-data.txt

  • your-bucket 表示您自己建立的 bucket,本例假設 region 為:cn-shenzhen.

(2) 使用範例程式碼

本樣本將採用 Java 來編寫作業任務,使用 Maven 來編譯,推薦使用 IDEA:https://www.jetbrains.com/idea/download/ 選擇 Community 版本(免費)。

樣本程式下載:java-log-count.zip

這是一個 Maven 工程。

重要

無需修改代碼。

(3) 編譯打包

運行命令編譯打包:

mvn package

即可在 target 得到下面 3 個 jar 包:

batchcompute-job-log-count-1.0-SNAPSHOT-Split.jar
batchcompute-job-log-count-1.0-SNAPSHOT-Count.jar
batchcompute-job-log-count-1.0-SNAPSHOT-Merge.jar

再將 3 個 jar 包,打成一個 tar.gz 壓縮包,命令如下:

> cd target  #進入 target 目錄
> tar -czf worker.tar.gz *SNAPSHOT-*.jar #打包

運行以下命令,查看包的內容是否正確:

> tar -tvf worker.tar.gz
batchcompute-job-log-count-1.0-SNAPSHOT-Split.jar
batchcompute-job-log-count-1.0-SNAPSHOT-Count.jar
batchcompute-job-log-count-1.0-SNAPSHOT-Merge.jar
重要

BatchCompute 只支援以 tar.gz 為尾碼的壓縮包,請注意務必用以上方式(gzip)打包,否則將會無法解析。

(4) 上傳到OSS

本例將 worker.tar.gz 上傳到 OSS 的 your-bucket 中:oss://your-bucket/log-count/worker.tar.gz

如要運行本例子,您需要建立自己的 bucket,並且把 worker.tar.gz 檔案上傳至您自己建立的 bucket 路徑下。

2. 使用SDK建立(提交)作業

(1) 建立一個Maven工程

在 pom.xml 中增加以下 dependencies:

    <dependencies>
        <dependency>
            <groupId>com.aliyun</groupId>
            <artifactId>aliyun-java-sdk-batchcompute</artifactId>
            <version>5.2.0</version>
        </dependency>

        <dependency>
            <groupId>com.aliyun</groupId>
            <artifactId>aliyun-java-sdk-core</artifactId>
            <version>3.2.3</version>
        </dependency>
    </dependencies>

請確定使用最新版本的 SDK:Java 版 SDK

(2) 建立一個Java類: Demo.java

提交作業需要指定叢集 ID 或者使用匿名叢集參數。本例子使用匿名叢集方式進行。匿名叢集需要配置2個參數,其中:

  • 可用的鏡像 ID,可以使用系統提供的 Image,也可以自行製作鏡像。

  • 執行個體規格(InstanceType,執行個體類型),請看 目前支援類型

在 OSS 中建立儲存 StdoutRedirectPath(程式輸出結果)和 StderrRedirectPath(錯誤記錄檔)的檔案路徑,本例中建立的路徑為oss://your-bucket/log-count/logs/

如需運行本例,請按照上文所述的變數擷取以及與上文對應的您的 OSS 路徑對程式中注釋中的變數進行修改。

Java SDK 提交程式模板如下,程式中具體參數含義請參照SDK介面說明

Demo.java:

/*
* IMAGE_ID:ECS 鏡像,由上文所述擷取
* INSTANCE_TYPE: 執行個體類型,由上文所述擷取
* REGION_ID:提交作業的地區,此項需與上文 OSS 儲存 worker 的bucket 地區一致
* ACCESS_KEY_ID: AccessKeyId 可以由上文所述擷取
* ACCESS_KEY_SECRET: AccessKeySecret 可以由上文所述擷取
* WORKER_PATH:由上文所述打包上傳的 worker 的 OSS 儲存路徑
* LOG_PATH:錯誤反饋和 task 輸出的儲存路徑,logs 檔案需事先自行建立
 */
 import com.aliyuncs.batchcompute.main.v20151111.*;
 import com.aliyuncs.batchcompute.model.v20151111.*;
 import com.aliyuncs.batchcompute.pojo.v20151111.*;
 import com.aliyuncs.exceptions.ClientException;

 import java.util.ArrayList;
 import java.util.List;

 public class Demo {

     static String IMAGE_ID = "img-ubuntu";;  //這裡填寫您的 ECS 鏡像 ID
     static String INSTANCE_TYPE = "ecs.sn1.medium"; //根據 region 填寫合適的 InstanceType

     static String REGION_ID = "cn-shenzhen";   //這裡填寫 region
     static String ACCESS_KEY_ID = "";  //"your-AccessKeyId";   這裡填寫您的 AccessKeyId
     static String ACCESS_KEY_SECRET = ""; //"your-AccessKeySecret";    這裡填寫您的 AccessKeySecret
     static String WORKER_PATH = ""; //"oss://your-bucket/log-count/worker.tar.gz"; //   這裡填寫您上傳的 worker.tar.gz 的 OSS 儲存路徑
     static String LOG_PATH = ""; // "oss://your-bucket/log-count/logs/"; //   這裡填寫您建立的錯誤反饋和 task 輸出的 OSS 儲存路徑
     static String MOUNT_PATH = ""; // "oss://your-bucket/log-count/";

     public static void main(String[] args){

         /** 構造 BatchCompute 用戶端 */
         BatchCompute client = new BatchComputeClient(REGION_ID, ACCESS_KEY_ID, ACCESS_KEY_SECRET);

         try{

             /** 構造 Job 對象 */
             JobDescription jobDescription = genJobDescription();

             //建立 Job
             CreateJobResponse response = client.createJob(jobDescription);

             //建立成功後,返回 jobId
             String jobId = response.getJobId();

             System.out.println("Job created success, got jobId: "+jobId);


             //查詢 job 狀態
             GetJobResponse getJobResponse = client.getJob(jobId);

             Job job = getJobResponse.getJob();

             System.out.println("Job state:"+job.getState());

         } catch (ClientException e) {
             e.printStackTrace();

             System.out.println("Job created failed, errorCode:"+ e.getErrCode()+", errorMessage:"+e.getErrMsg());
         }
     }

     private static JobDescription genJobDescription(){



         JobDescription jobDescription = new JobDescription();

         jobDescription.setName("java-log-count");
         jobDescription.setPriority(0);
         jobDescription.setDescription("log-count demo");
         jobDescription.setJobFailOnInstanceFail(true);
         jobDescription.setType("DAG");

         DAG taskDag = new DAG();


         /** 添加 split task */

         TaskDescription splitTask =  genTaskDescription();
         splitTask.setTaskName("split");
         splitTask.setInstanceCount(1);
         splitTask.getParameters().getCommand().setCommandLine("java -jar batchcompute-job-log-count-1.0-SNAPSHOT-Split.jar");
         taskDag.addTask(splitTask);

         /** 添加 count task */
         TaskDescription countTask =  genTaskDescription();
         countTask.setTaskName("count");
         countTask.setInstanceCount(3);
         countTask.getParameters().getCommand().setCommandLine("java -jar batchcompute-job-log-count-1.0-SNAPSHOT-Count.jar");
         taskDag.addTask(countTask);

         /** 添加 merge task */
         TaskDescription mergeTask =  genTaskDescription();
         mergeTask.setTaskName("merge");
         mergeTask.setInstanceCount(1);
         mergeTask.getParameters().getCommand().setCommandLine("java -jar batchcompute-job-log-count-1.0-SNAPSHOT-Merge.jar");
         taskDag.addTask(mergeTask);



         /** 添加 Task 依賴:  split-->count-->merge  */

         List<String> taskNameTargets = new ArrayList();
         taskNameTargets.add("merge");
         taskDag.addDependencies("count", taskNameTargets);

         List<String> taskNameTargets2 = new ArrayList();
         taskNameTargets2.add("count");
         taskDag.addDependencies("split", taskNameTargets2);

         //dag
         jobDescription.setDag(taskDag);

         return jobDescription;
     }

     private static TaskDescription genTaskDescription(){

         AutoCluster autoCluster = new AutoCluster();
         autoCluster.setInstanceType(INSTANCE_TYPE);
         autoCluster.setImageId(IMAGE_ID);
         //autoCluster.setResourceType("OnDemand");

         TaskDescription task = new TaskDescription();
         //task.setTaskName("Find");

        //如果使用 VPC,需要配置 cidrBlock, 請確保 IP 段不衝突
        Configs configs = new Configs();
        Networks networks = new Networks();
        VPC vpc = new VPC();
        vpc.setCidrBlock("192.168.0.0/16");
        networks.setVpc(vpc);
        configs.setNetworks(networks);
        autoCluster.setConfigs(configs);

         //打包上傳的作業的 OSS 全路徑
         Parameters p = new Parameters();
         Command cmd = new Command();
         //cmd.setCommandLine("");
         //打包上傳的作業的 OSS 全路徑
         cmd.setPackagePath(WORKER_PATH);
         p.setCommand(cmd);
         //錯誤反饋儲存路徑
         p.setStderrRedirectPath(LOG_PATH);
         //最終結果輸出儲存路徑
         p.setStdoutRedirectPath(LOG_PATH);

         task.setParameters(p);
         task.addInputMapping(MOUNT_PATH, "/home/input");
         task.addOutputMapping("/home/output",MOUNT_PATH);

         task.setAutoCluster(autoCluster);
         //task.setClusterId(clusterId);
         task.setTimeout(30000); /* 30000 秒*/
         task.setInstanceCount(1); /** 使用 1 個執行個體來運行 */

         return task;
     }
 }

正常輸出範例:

Job created success, got jobId: job-01010100010192397211
Job state:Waiting

3. 查看作業狀態

您可以用 SDK 中的 擷取作業資訊方法擷取作業狀態:

//查詢 job 狀態
GetJobResponse getJobResponse = client.getJob(jobId);
Job job = getJobResponse.getJob();
System.out.println("Job state:"+job.getState());

Job 的 state 可能為:Waiting、Running、Finished、Failed、Stopped.

4. 查看結果

您可以登入 batchcompute 控制台 查看 job 狀態。

Job 運行結束,您可以登入 OSS 控制台 查看your-bucket 這個 bucket 下面的這個檔案:/log-count/merge_result.json。

內容應該如下:

{"INFO": 2460, "WARN": 2448, "DEBUG": 2509, "ERROR": 2583}

您也可以使用OSS 的 SDK來擷取結果。