AnalyticDB for MySQL Data Lakehouse Edition (V3.0) では、SDK for Javaを使用してSparkアプリケーションとSpark SQLジョブを開発できます。 このトピックでは、AnalyticDB for MySQL SDK for Javaを使用してSparkジョブを送信し、Sparkジョブのステータスとログを照会し、Sparkジョブを終了し、Sparkジョブの履歴を照会する方法について説明します。
前提条件
JDK 1.8以降がインストールされます。
AnalyticDB for MySQL Data Lakehouse Edition (V3.0) クラスターが作成されます。 詳細については、「Data Lakehouse Editionクラスターの作成」をご参照ください。
AnalyticDB for MySQL Data Lakehouse Edition (V3.0) クラスターのジョブリソースグループが作成されます。 詳細については、「リソースグループの作成」をご参照ください。
Sparkジョブログを保存するパスが設定されています。
説明次のいずれかの方法を使用して、ログパスを設定できます。
AnalyticDB for MySQLコンソールにログインし、Spark JAR Developmentページに移動します。 ページの右上隅にある [ログ設定] をクリックして、ログパスを設定します。
spark.app.log.rootPath
パラメーターを使用して、Sparkジョブログを保存するObject Storage Service (OSS) パスを指定します。
手順
Mavenの依存関係をpom.xmlファイルに追加します。 サンプルコード:
<dependencies> <dependency> <groupId>com.aliyun</groupId> <artifactId>adb20211201</artifactId> <version>1.0.16</version> </dependency> <dependency> <groupId>com.google.code.gson</groupId> <artifactId>gson</artifactId> <version>2.10.1</version> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <version>1.18.30</version> </dependency> </dependencies>
説明AnalyticDB for MySQL SDK for Javaのバージョン番号を1.0.16に設定することを推奨します。
環境変数
ALIBABA_CLOUD_ACCESS_KEY_ID
およびALIBABA_CLOUD_ACCESS_KEY_SECRET
を設定します。 詳細については、「Linux、macOS、およびWindowsでの環境変数の設定」をご参照ください。次のサンプルコードを実行して、Sparkジョブの送信、Sparkジョブのステータスとログの照会、Sparkジョブの終了、およびSparkジョブの履歴の照会を行います。
com.aliyun.adb20211201.Clientをインポートします。com.aliyun.adb20211201.mo delsをインポートします。*; com.aliyun.teaopenapi.mo dels.Configをインポートします。com.google.gson.Gsonをインポートします。com.google.gson.GsonBuilderをインポートします。lombokをインポートします。SneakyThrows; java.util.Listをインポートします。public class SparkExample { プライベート静的Gson gson = new GsonBuilder().disableHtmlEscaping().setPrettyPrinting().create(); /** Sparkジョブを送信します。 @ paramクライアント: Alibaba Cloudクライアント。 * @ param clusterId: クラスターID。 * @ param rgName: リソースグループの名前。 * @ paramタイプ: Sparkジョブのタイプ。 有効な値: BatchとSQL。 * @ param data: ジョブタイプをBatchに設定した場合、Sparkジョブに関するJSONデータを入力します。 ジョブタイプをSQLに設定した場合は、SQL文を入力します。 * @ return: SparkジョブのID。 **/ @ SneakyThrows public static String submitSparkApp(String clusterId, String rgName, String data, String type, Client client) { SubmitSparkAppRequest request=新しいSubmitSparkAppRequest(); request.setDBClusterId(clusterId); request.setResourceGroupName(rgName); request.setData (データ); request.setAppType (タイプ); System.out.println("Start to build Submit request" + gson.toJson(request)); SubmitSparkAppResponse submitSparkAppResponse = client.submitSparkApp (リクエスト); System.out.println("Submit app response: " + gson.toJson(submitSparkAppResponse)); を返しますsubmitSparkAppResponse.getBody().getData().getAppId(); } /** * Sparkジョブのステータスを照会します。 * * @ param appId: SparkジョブのID。 * @ paramクライアント: Alibaba Cloudクライアント。 * @ return: Sparkジョブのステータス。 */ @ SneakyThrows public static String getAppState(String appId, Client client) { GetSparkAppStateRequest request=新しいGetSparkAppStateRequest(); request.setAppId(appId); System.out.println("Start to get app state request" + gson.toJson(request)); GetSparkAppStateResponse sparkAppState = client.getSparkAppState (リクエスト); System.out.println("App state response: " + gson.toJson(sparkAppState)); sparkAppState.getBody().getData().getState(); を返します。 } /** * Sparkジョブに関する詳細を照会します。 * * @ param appId: SparkジョブのID。 * @ paramクライアント: Alibaba Cloudクライアント。 * @ return: Sparkジョブの詳細。 */ @ SneakyThrows public static SparkAppInfo getAppInfo (文字列appId、クライアント) { GetSparkAppInfoRequest request=新しいGetSparkAppInfoRequest(); request.setAppId(appId); System.out.println("Start to get app info request" + gson.toJson(request)); GetSparkAppInfoResponse sparkAppInfo = client.getSparkAppInfo (リクエスト); System.out.println("アプリ情報応答:" + gson.toJson(sparkAppInfo)); sparkAppInfo.getBody().getData(); を返します。 } /** * Sparkジョブのログを照会します。 * * @ param appId: SparkジョブのID。 * @ paramクライアント: Alibaba Cloudクライアント。 * @ return: Sparkジョブのログ。 */ @ SneakyThrows public static String getAppDriverLog(String appId, Client client) { GetSparkAppLogRequest request=新しいGetSparkAppLogRequest(); request.setAppId(appId); System.out.println("Start to get app log request" + gson.toJson(request)); GetSparkAppLogResponse sparkAppLog = client.getSparkAppLog (リクエスト); System.out.println("アプリのログ応答:" + gson.toJson(sparkAppLog)); sparkAppLog.getBody().getData().getLogContent(); を返します。 } /** * 履歴Sparkジョブをクエリします。 * @ param dbClusterId: クラスターID。 * @ param pageNumber: ページ番号。 ページは 1 ページ目から始まります。 デフォルト値は 1 です。 * @ param pageSize: 1ページあたりのエントリ数。 * @ paramクライアント: Alibaba Cloudクライアント。 * @ return: Sparkジョブの詳細。 */ @ SneakyThrows public static List<SparkAppInfo> listSparkApps(String dbClusterId, long pageNumber, long pageSize, Client client) { ListSparkAppsRequest request = new ListSparkAppsRequest(); request.setDBClusterId(dbClusterId); request.setPageNumber(pageNumber); request.setPageSize(pageSize); System.out.println("Start to list spark apps request" + gson.toJson(request)); ListSparkAppsResponse listSparkAppsResponse = client.listSparkApps (リクエスト); System.out.println("List spark apps response: " + gson.toJson(listSparkAppsResponse)); listSparkAppsResponse.getBody().getData().getAppInfoList(); を返します。 } /** * スパークアプリケーションの送信例 * * @ param args Access Key ID, Access Key Secret, ADB Cluster ID, ADBリソースグループ名, データの送信, タイプの送信 * @throws Exception */ public static void main(String[] args) throws Exception { // Alibaba Cloudクライアント。 Config config = new Config(); // ALIBABA_CLOUD_ACCESS_KEY_ID環境変数からAccessKey IDを取得します。 config.setAccessKeyId(System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID")); // ALIBABA_CLOUD_ACCESS_KEY_SECRET環境変数からAccessKeyシークレットを取得します。 config.setAccessKeySecret(System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET")); // クラスターが存在するリージョンのID。 config.setRegionId("cn-hangzhou"); // エンドポイント。 cn-hangzhouは、クラスターが存在するリージョンのIDを示します。 config.setEndpoint("adb.cn-hangzhou.aliyuncs.com"); Client client = new Client(config); // クラスターID。 文字列dbClusterId = "amv-bp1mhnosdb38 ****"; // リソースグループの名前。 文字列resourceGroupName = "test"; // Sparkジョブの内容。 String data = "String data = "{\n "+ " \" comments\": [\"-これはSparkPiの例です。 コンテンツを変更し、スパークプログラムを実行します。\"],\n" + " \" args\": [\" 1000\"],\n" + " \" file\": \" local:/// tmp/spark-examples.jar\",\n" + " \" name\": \" SparkPi\",\n" + " \" className\": \" org.apache.spark.examples.SparkPi\",\n" + " \" conf\": {\n" + " \" spark.driver.resourceSpec\": \" medium\",\n" + " \" spark.exe cutor.instances\": 2,\n" + " \" spark.exe cutor.resourceSpec\": \" medium\"}\n" + "}\n";"; // Sparkジョブのタイプ。 文字列タイプ="バッチ"; // Sparkジョブの最大実行時間。 long sparkAppMaxRunningTimeInMilli = 60000; // スキャン間隔。 long getAppStateinterval = 2000; // Sparkジョブを送信します。 文字列appId = submitSparkApp(dbClusterId, resourceGroupName, data, type, client); // Sparkジョブのステータスを照会します。 文字列の状態; long startTimeInMillis = System.currentTimeMillis(); do { state = getAppState(appId、クライアント); if (System.currentTimeMillis() - startTimeInMillis > sparkAppMaxRunningTimeInMilli) { System.out.println("Timeout"); break; } else { System.out.println("Current state: " + state); Thread.sleep(getAppStateinterval); } } while (!"COMPLETED".equalsIgnoreCase(state) && ! "FATAL".equalsIgnoreCase (状態) && ! "FAILED".equalsIgnoreCase (状態); // Sparkジョブに関する詳細を照会します。 SparkAppInfo appInfo = getAppInfo(appId、クライアント); String x = String.format("State: % s\n WebUI: % s\n submit time: % s\n, terminated time: % s\n" 、 状態、 appInfo.getDetail().webUiAddress、 appInfo.getDetail().submittedTimeInMillis、 appInfo.getDetail().terminatedTimeInMillis); System.out.println(x); // Sparkジョブのログを照会します。 String log = getAppDriverLog(appId、クライアント); System.out.println (ログ); // 履歴Sparkジョブを照会します。 リスト <SparkAppInfo> sparkAppInfos = listSparkApps(dbClusterId、1、50、クライアント); sparkAppInfos.forEach(sparkAppInfo -> { String y = String.format("AppId: % s\n State: % s\n WebUI: % s\n submit time: % s\n, terminated time: % s\n" 、 appInfo.getAppId() 、 appInfo.getState() 、 appInfo.getDetail().webUiAddress、 appInfo.getDetail().submittedTimeInMillis、 appInfo.getDetail().terminatedTimeInMillis); System.out.println(y); }); // Sparkジョブを終了します。 KillSparkAppRequest request = new KillSparkAppRequest(); request.setAppId(appId); client.killSparkApp (リクエスト); } }