AnalyticDB for MySQL Enterprise Edition and Data Lakehouse Edition clusters allow you to develop Spark applications and Spark SQL jobs by using the Java SDK. This topic describes how to use the Java SDK to submit a Spark job, query the status and log information of a Spark job, kill a Spark job, and query Spark historical jobs.
Prerequisites
JDK 1.8 or later is installed.
An Enterprise Edition or Data Lakehouse Edition cluster is created. For more information, see Create an Enterprise Edition or Data Lakehouse Edition cluster.
A Job resource group is created. For more information, see Create a resource group.
The storage path of Spark logs is configured.
Note: You can configure the storage path of Spark logs in either of the following ways:
On the Spark Jar Development page of the AnalyticDB for MySQL console, click Log Settings in the upper-right corner and set the storage path of Spark logs.
Use the spark.app.log.rootPath configuration parameter to specify an OSS path for storing the execution logs of Spark jobs.
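If you use the configuration parameter approach, the parameter can be included in the conf section of the JSON that describes the Spark job (the data argument used in the sample code later in this topic). The following is a minimal sketch under that assumption; the OSS path oss://<your-bucket>/spark-logs/ is a placeholder that you would replace with your own bucket path:

// Minimal sketch: specify spark.app.log.rootPath in the job conf.
// The OSS path below is a placeholder; replace it with your own bucket path.
String dataWithLogPath = "{\n" +
        "    \"name\": \"SparkPi\",\n" +
        "    \"file\": \"local:///tmp/spark-examples.jar\",\n" +
        "    \"className\": \"org.apache.spark.examples.SparkPi\",\n" +
        "    \"conf\": {\n" +
        "        \"spark.app.log.rootPath\": \"oss://<your-bucket>/spark-logs/\"\n" +
        "    }\n" +
        "}";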
Procedure
Configure the Maven dependencies in the pom.xml file. Sample code:
<dependencies>
    <dependency>
        <groupId>com.aliyun</groupId>
        <artifactId>adb20211201</artifactId>
        <version>1.0.16</version>
    </dependency>
    <dependency>
        <groupId>com.google.code.gson</groupId>
        <artifactId>gson</artifactId>
        <version>2.10.1</version>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>1.18.30</version>
    </dependency>
</dependencies>
Note: We recommend that you set the version of the AnalyticDB for MySQL Java SDK to 1.0.16.
Set the ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET environment variables. For more information, see Configure environment variables on Linux, macOS, and Windows. The following sample code shows how to submit a Spark job, query the status and log information of a Spark job, kill a Spark job, and query Spark historical jobs.
import com.aliyun.adb20211201.Client;
import com.aliyun.adb20211201.models.*;
import com.aliyun.teaopenapi.models.Config;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import lombok.SneakyThrows;

import java.util.List;

public class SparkExample {
    private static final Gson gson = new GsonBuilder().disableHtmlEscaping().setPrettyPrinting().create();

    /**
     * Submit a Spark job.
     *
     * @param clusterId: cluster ID
     * @param rgName: resource group name
     * @param data: for a Batch job, the JSON that describes the Spark job; for a SQL job, the SQL statement
     * @param type: type of the Spark job. Valid values: BATCH and SQL
     * @param client: Alibaba Cloud client
     * @return: Spark job ID
     **/
    @SneakyThrows
    public static String submitSparkApp(String clusterId, String rgName, String data, String type, Client client) {
        SubmitSparkAppRequest request = new SubmitSparkAppRequest();
        request.setDBClusterId(clusterId);
        request.setResourceGroupName(rgName);
        request.setData(data);
        request.setAppType(type);
        System.out.println("Start to build Submit request " + gson.toJson(request));
        SubmitSparkAppResponse submitSparkAppResponse = client.submitSparkApp(request);
        System.out.println("Submit app response: " + gson.toJson(submitSparkAppResponse));
        return submitSparkAppResponse.getBody().getData().getAppId();
    }

    /**
     * Query the state of a Spark job.
     *
     * @param appId: Spark job ID
     * @param client: Alibaba Cloud client
     * @return: state of the Spark job
     */
    @SneakyThrows
    public static String getAppState(String appId, Client client) {
        GetSparkAppStateRequest request = new GetSparkAppStateRequest();
        request.setAppId(appId);
        System.out.println("Start to get app state request " + gson.toJson(request));
        GetSparkAppStateResponse sparkAppState = client.getSparkAppState(request);
        System.out.println("App state response: " + gson.toJson(sparkAppState));
        return sparkAppState.getBody().getData().getState();
    }

    /**
     * Query the details of a Spark job.
     *
     * @param appId: Spark job ID
     * @param client: Alibaba Cloud client
     * @return: details of the Spark job
     */
    @SneakyThrows
    public static SparkAppInfo getAppInfo(String appId, Client client) {
        GetSparkAppInfoRequest request = new GetSparkAppInfoRequest();
        request.setAppId(appId);
        System.out.println("Start to get app info request " + gson.toJson(request));
        GetSparkAppInfoResponse sparkAppInfo = client.getSparkAppInfo(request);
        System.out.println("App info response: " + gson.toJson(sparkAppInfo));
        return sparkAppInfo.getBody().getData();
    }

    /**
     * Query the log information of a Spark job.
     *
     * @param appId: Spark job ID
     * @param client: Alibaba Cloud client
     * @return: driver log of the Spark job
     */
    @SneakyThrows
    public static String getAppDriverLog(String appId, Client client) {
        GetSparkAppLogRequest request = new GetSparkAppLogRequest();
        request.setAppId(appId);
        System.out.println("Start to get app log request " + gson.toJson(request));
        GetSparkAppLogResponse sparkAppLog = client.getSparkAppLog(request);
        System.out.println("App log response: " + gson.toJson(sparkAppLog));
        return sparkAppLog.getBody().getData().getLogContent();
    }

    /**
     * Query Spark historical jobs.
     *
     * @param dbClusterId: cluster ID
     * @param pageNumber: page number. The value must be a positive integer. Default value: 1
     * @param pageSize: number of entries per page
     * @param client: Alibaba Cloud client
     * @return: details of the Spark jobs
     */
    @SneakyThrows
    public static List<SparkAppInfo> listSparkApps(String dbClusterId, long pageNumber, long pageSize, Client client) {
        ListSparkAppsRequest request = new ListSparkAppsRequest();
        request.setDBClusterId(dbClusterId);
        request.setPageNumber(pageNumber);
        request.setPageSize(pageSize);
        System.out.println("Start to list spark apps request " + gson.toJson(request));
        ListSparkAppsResponse listSparkAppsResponse = client.listSparkApps(request);
        System.out.println("List spark apps response: " + gson.toJson(listSparkAppsResponse));
        return listSparkAppsResponse.getBody().getData().getAppInfoList();
    }

    /**
     * Example for submitting a Spark application.
     *
     * @param args Access Key ID, Access Key Secret, ADB Cluster ID, ADB Resource Group Name, Submit data, Submit type
     * @throws Exception
     */
    public static void main(String[] args) throws Exception {
        // Alibaba Cloud client
        Config config = new Config();
        // Obtain the AccessKey ID from the ALIBABA_CLOUD_ACCESS_KEY_ID environment variable
        config.setAccessKeyId(System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"));
        // Obtain the AccessKey secret from the ALIBABA_CLOUD_ACCESS_KEY_SECRET environment variable
        config.setAccessKeySecret(System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"));
        // Region ID of the cluster
        config.setRegionId("cn-hangzhou");
        // Endpoint. cn-hangzhou is the region ID of the cluster
        config.setEndpoint("adb.cn-hangzhou.aliyuncs.com");
        Client client = new Client(config);
        // Cluster ID
        String dbClusterId = "amv-bp1mhnosdb38****";
        // Resource group name
        String resourceGroupName = "test";
        // Content of the Spark job
        String data = "{\n" +
                "    \"comments\": [\"-- Here is just an example of SparkPi. Modify the content and run your spark program.\"],\n" +
                "    \"args\": [\"1000\"],\n" +
                "    \"file\": \"local:///tmp/spark-examples.jar\",\n" +
                "    \"name\": \"SparkPi\",\n" +
                "    \"className\": \"org.apache.spark.examples.SparkPi\",\n" +
                "    \"conf\": {\n" +
                "        \"spark.driver.resourceSpec\": \"medium\",\n" +
                "        \"spark.executor.instances\": 2,\n" +
                "        \"spark.executor.resourceSpec\": \"medium\"}\n" +
                "}\n";
        // Type of the Spark job
        String type = "Batch";
        // Maximum running time of the Spark job, in milliseconds
        long sparkAppMaxRunningTimeInMilli = 60000;
        // Interval between two consecutive state queries, in milliseconds
        long getAppStateInterval = 2000;

        // Submit the Spark job
        String appId = submitSparkApp(dbClusterId, resourceGroupName, data, type, client);

        // Query the state of the Spark job
        String state;
        long startTimeInMillis = System.currentTimeMillis();
        do {
            state = getAppState(appId, client);
            if (System.currentTimeMillis() - startTimeInMillis > sparkAppMaxRunningTimeInMilli) {
                System.out.println("Timeout");
                break;
            } else {
                System.out.println("Current state: " + state);
                Thread.sleep(getAppStateInterval);
            }
        } while (!"COMPLETED".equalsIgnoreCase(state)
                && !"FATAL".equalsIgnoreCase(state)
                && !"FAILED".equalsIgnoreCase(state));

        // Query the details of the Spark job
        SparkAppInfo appInfo = getAppInfo(appId, client);
        String x = String.format("State: %s\n WebUI: %s\n submit time: %s\n, terminated time: %s\n",
                state,
                appInfo.getDetail().webUiAddress,
                appInfo.getDetail().submittedTimeInMillis,
                appInfo.getDetail().terminatedTimeInMillis);
        System.out.println(x);

        // Query the log information of the Spark job
        String log = getAppDriverLog(appId, client);
        System.out.println(log);

        // Query Spark historical jobs
        List<SparkAppInfo> sparkAppInfos = listSparkApps(dbClusterId, 1, 50, client);
        sparkAppInfos.forEach(sparkAppInfo -> {
            String y = String.format("AppId: %s\n State: %s\n WebUI: %s\n submit time: %s\n, terminated time: %s\n",
                    sparkAppInfo.getAppId(),
                    sparkAppInfo.getState(),
                    sparkAppInfo.getDetail().webUiAddress,
                    sparkAppInfo.getDetail().submittedTimeInMillis,
                    sparkAppInfo.getDetail().terminatedTimeInMillis);
            System.out.println(y);
        });

        // Kill the Spark job
        KillSparkAppRequest request = new KillSparkAppRequest();
        request.setAppId(appId);
        client.killSparkApp(request);
    }
}
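The preceding example submits a Batch job. As described in the comments of the submitSparkApp method, the same method can also submit a Spark SQL job by setting the job type to SQL and passing an SQL statement as the data parameter. The following is a minimal sketch that reuses the client, cluster ID, and resource group from the example above; the SHOW DATABASES statement is only a placeholder for your own Spark SQL:

// Minimal sketch: submit a Spark SQL job with the same helper method.
// The SQL statement is a placeholder; replace it with your own Spark SQL.
String sqlData = "SHOW DATABASES;";
String sqlType = "SQL";
String sqlAppId = submitSparkApp(dbClusterId, resourceGroupName, sqlData, sqlType, client);
System.out.println("Submitted Spark SQL job: " + sqlAppId);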