
AnalyticDB: Use the Java SDK to develop Spark applications

Updated: Jul 06, 2024

AnalyticDB for MySQL Enterprise Edition and Data Lakehouse Edition clusters support developing Spark applications and Spark SQL jobs through the Java SDK. This topic describes how to use the Java SDK to submit a Spark job, query the state and log information of a Spark job, terminate a Spark job, and query historical Spark jobs.

Prerequisites

  • JDK 1.8 or later is installed.

  • An Enterprise Edition or Data Lakehouse Edition cluster is created. For more information, see Create an Enterprise Edition or Data Lakehouse Edition cluster.

  • A Job resource group is created. For more information, see Create a resource group.

  • The storage path of Spark logs is configured.

    Note

    You can use either of the following methods to configure the storage path of Spark logs:

    • In the AnalyticDB for MySQL console, go to the Spark Jar Development page and click Log Configuration in the upper-right corner to set the storage path of Spark logs.

    • Use the spark.app.log.rootPath configuration item to specify an OSS path for storing the execution logs of Spark jobs, as in the sketch below.
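
    A minimal sketch of where spark.app.log.rootPath could appear in the conf section of a job definition; only conf keys shown in this topic are used, and the OSS bucket and path are placeholders, not values from this topic:

        // Hypothetical job definition: the OSS log path below is a placeholder
        // to be replaced with your own bucket and prefix.
        String dataWithLogPath = "{\n" +
                "    \"file\": \"local:///tmp/spark-examples.jar\",\n" +
                "    \"className\": \"org.apache.spark.examples.SparkPi\",\n" +
                "    \"conf\": {\n" +
                "        \"spark.driver.resourceSpec\": \"medium\",\n" +
                "        \"spark.executor.instances\": 2,\n" +
                "        \"spark.executor.resourceSpec\": \"medium\",\n" +
                "        \"spark.app.log.rootPath\": \"oss://<your-bucket>/spark-logs/\"\n" +
                "    }\n" +
                "}\n";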

Procedure

  1. Configure Maven dependencies in the pom.xml file. Sample code:

    <dependencies>
        <dependency>
            <groupId>com.aliyun</groupId>
            <artifactId>adb20211201</artifactId>
            <version>1.0.16</version>
        </dependency>
        <dependency>
            <groupId>com.google.code.gson</groupId>
            <artifactId>gson</artifactId>
            <version>2.10.1</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.30</version>
        </dependency>
    </dependencies>

    Note

    We recommend that you set the AnalyticDB for MySQL SDK for Java version to 1.0.16.

  2. Configure the ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET environment variables. For more information, see Configure environment variables in Linux, macOS, and Windows. An optional check that the variables are visible to the JVM is sketched below.
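
    A minimal sketch of such a check, assuming only the two variable names from this step; the check itself is not part of the SDK:

        // Fail fast if the credentials are not visible to the JVM.
        String accessKeyId = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID");
        String accessKeySecret = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET");
        if (accessKeyId == null || accessKeySecret == null) {
            throw new IllegalStateException(
                    "ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET must be set");
        }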

  3. The following is the complete sample code for submitting a Spark job, querying the state and log information of a Spark job, terminating a Spark job, and querying historical Spark jobs.

    import com.aliyun.adb20211201.Client;
    import com.aliyun.adb20211201.models.*;
    import com.aliyun.teaopenapi.models.Config;
    import com.google.gson.Gson;
    import com.google.gson.GsonBuilder;
    import lombok.SneakyThrows;
    
    import java.util.List;
    
    public class SparkExample {
        private static Gson gson = new GsonBuilder().disableHtmlEscaping().setPrettyPrinting().create();
    
        /**
         * Submit a Spark job.
         *
         * @param client:                    Alibaba Cloud client
         * @param clusterId:                 Cluster ID
         * @param rgName:                    Resource group name
         * @param type:                      Spark job type. Valid values: BATCH and SQL
         * @param data:                      For a BATCH job, the JSON that describes the Spark job;
         *                                   for a SQL job, the SQL statements
         * @return:                          Spark job ID
         **/
        @SneakyThrows
        public static String submitSparkApp(String clusterId, String rgName, String data, String type, Client client) {
            SubmitSparkAppRequest request = new SubmitSparkAppRequest();
            request.setDBClusterId(clusterId);
            request.setResourceGroupName(rgName);
            request.setData(data);
            request.setAppType(type);
            System.out.println("Start to build Submit request " + gson.toJson(request));
            SubmitSparkAppResponse submitSparkAppResponse = client.submitSparkApp(request);
            System.out.println("Submit app response: " + gson.toJson(submitSparkAppResponse));
            return submitSparkAppResponse.getBody().getData().getAppId();
        }
    
        /**
         * Query the state of a Spark job.
         *
         * @param appId:                     Spark job ID
         * @param client:                    Alibaba Cloud client
         * @return:                          State of the Spark job
         */
        @SneakyThrows
        public static String getAppState(String appId, Client client) {
            GetSparkAppStateRequest request = new GetSparkAppStateRequest();
            request.setAppId(appId);
            System.out.println("Start to get app state request " + gson.toJson(request));
            GetSparkAppStateResponse sparkAppState = client.getSparkAppState(request);
            System.out.println("App state response: " + gson.toJson(sparkAppState));
            return sparkAppState.getBody().getData().getState();
        }
    
        /**
         * Query the details of a Spark job.
         *
         * @param appId:                     Spark job ID
         * @param client:                    Alibaba Cloud client
         * @return:                          Details of the Spark job
         */
        @SneakyThrows
        public static SparkAppInfo getAppInfo(String appId, Client client) {
            GetSparkAppInfoRequest request = new GetSparkAppInfoRequest();
            request.setAppId(appId);
            System.out.println("Start to get app info request " + gson.toJson(request));
            GetSparkAppInfoResponse sparkAppInfo = client.getSparkAppInfo(request);
            System.out.println("App info response: " + gson.toJson(sparkAppInfo));
            return sparkAppInfo.getBody().getData();
        }
    
        /**
         * Query the log information of a Spark job.
         *
         * @param appId:                     Spark job ID
         * @param client:                    Alibaba Cloud client
         * @return:                          Log information of the Spark job
         */
        @SneakyThrows
        public static String getAppDriverLog(String appId, Client client) {
            GetSparkAppLogRequest request = new GetSparkAppLogRequest();
            request.setAppId(appId);
            System.out.println("Start to get app log request " + gson.toJson(request));
            GetSparkAppLogResponse sparkAppLog = client.getSparkAppLog(request);
            System.out.println("App log response: " + gson.toJson(sparkAppLog));
            return sparkAppLog.getBody().getData().getLogContent();
        }
    
        /**
         * Query historical Spark jobs.
         *
         * @param dbClusterId:               Cluster ID
         * @param pageNumber:                Page number. The value must be a positive integer. Default value: 1
         * @param pageSize:                  Number of entries per page
         * @param client:                    Alibaba Cloud client
         * @return:                          Details of the Spark jobs
         */
        @SneakyThrows
        public static List<SparkAppInfo> listSparkApps(String dbClusterId, long pageNumber, long pageSize, Client client) {
            ListSparkAppsRequest request = new ListSparkAppsRequest();
            request.setDBClusterId(dbClusterId);
            request.setPageNumber(pageNumber);
            request.setPageSize(pageSize);
            System.out.println("Start to list spark apps request " + gson.toJson(request));
            ListSparkAppsResponse listSparkAppsResponse = client.listSparkApps(request);
            System.out.println("List spark apps response: " + gson.toJson(listSparkAppsResponse));
            return listSparkAppsResponse.getBody().getData().getAppInfoList();
        }
    
    
        /**
         * Example of submitting a Spark application.
         *
         * @param args not used in this example; the AccessKey pair is read from environment
         *             variables and the other parameters are hard-coded in the method body
         * @throws Exception
         */
        public static void main(String[] args) throws Exception {
            // Alibaba Cloud client configuration
            Config config = new Config();
            // Obtain the AccessKey ID from the ALIBABA_CLOUD_ACCESS_KEY_ID environment variable
            config.setAccessKeyId(System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"));
            // Obtain the AccessKey secret from the ALIBABA_CLOUD_ACCESS_KEY_SECRET environment variable
            config.setAccessKeySecret(System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"));
            // Region ID of the cluster
            config.setRegionId("cn-hangzhou");
            // Endpoint. cn-hangzhou is the region ID of the cluster
            config.setEndpoint("adb.cn-hangzhou.aliyuncs.com");
            Client client = new Client(config);
            // Cluster ID
            String dbClusterId = "amv-bp1mhnosdb38****";
            // Resource group name
            String resourceGroupName = "test";
            // Spark job content
            String data = "{\n" +
                    "    \"comments\": [\"-- Here is just an example of SparkPi. Modify the content and run your spark program.\"],\n" +
                    "    \"args\": [\"1000\"],\n" +
                    "    \"file\": \"local:///tmp/spark-examples.jar\",\n" +
                    "    \"name\": \"SparkPi\",\n" +
                    "    \"className\": \"org.apache.spark.examples.SparkPi\",\n" +
                    "    \"conf\": {\n" +
                    "        \"spark.driver.resourceSpec\": \"medium\",\n" +
                    "        \"spark.executor.instances\": 2,\n" +
                    "        \"spark.executor.resourceSpec\": \"medium\"}\n" +
                    "}\n";
            // Spark job type
            String type = "Batch";
            // Maximum running time of the Spark job, in milliseconds
            long sparkAppMaxRunningTimeInMilli = 60000;
            // Interval between state polls, in milliseconds
            long getAppStateInterval = 2000;
    
            // Submit the Spark job
            String appId = submitSparkApp(dbClusterId, resourceGroupName, data, type, client);
    
            // Query the state of the Spark job
            String state;
            long startTimeInMillis = System.currentTimeMillis();
            do {
                state = getAppState(appId, client);
                if (System.currentTimeMillis() - startTimeInMillis > sparkAppMaxRunningTimeInMilli) {
                    System.out.println("Timeout");
                    break;
                } else {
                    System.out.println("Current state: " + state);
                    Thread.sleep(getAppStateInterval);
                }
            } while (!"COMPLETED".equalsIgnoreCase(state) && !"FATAL".equalsIgnoreCase(state)
                    && !"FAILED".equalsIgnoreCase(state));
    
            // Query the details of the Spark job
            SparkAppInfo appInfo = getAppInfo(appId, client);
            String x = String.format("State: %s\n WebUI: %s\n submit time: %s\n terminated time: %s\n",
                    state,
                    appInfo.getDetail().webUiAddress,
                    appInfo.getDetail().submittedTimeInMillis,
                    appInfo.getDetail().terminatedTimeInMillis);
            System.out.println(x);
    
            // Query the log information of the Spark job
            String log = getAppDriverLog(appId, client);
            System.out.println(log);
    
            // Query historical Spark jobs
            List<SparkAppInfo> sparkAppInfos = listSparkApps(dbClusterId, 1, 50, client);
            sparkAppInfos.forEach(sparkAppInfo -> {
                String y = String.format("AppId: %s\n State: %s\n WebUI: %s\n submit time: %s\n terminated time: %s\n",
                        sparkAppInfo.getAppId(),
                        sparkAppInfo.getState(),
                        sparkAppInfo.getDetail().webUiAddress,
                        sparkAppInfo.getDetail().submittedTimeInMillis,
                        sparkAppInfo.getDetail().terminatedTimeInMillis);
                System.out.println(y);
            });
    
            // Terminate the Spark job
            KillSparkAppRequest request = new KillSparkAppRequest();
            request.setAppId(appId);
            client.killSparkApp(request);
        }
    }
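
    The same helper also covers Spark SQL jobs. The following is a minimal sketch that could be appended at the end of main(), reusing the client, cluster ID, and resource group already defined there; the SQL statement is a placeholder:

        // Hypothetical addition to main(): submit a SQL-type job with the same helper.
        // Replace the statement below with your own SQL.
        String sqlData = "SHOW DATABASES;";
        String sqlAppId = submitSparkApp(dbClusterId, resourceGroupName, sqlData, "SQL", client);
        System.out.println("Submitted SQL app: " + sqlAppId);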