すべてのプロダクト
Search
ドキュメントセンター

AnalyticDB:AnalyticDB for MySQL SDK for Javaを使用したSparkアプリケーションの開発

最終更新日:Jun 14, 2024

AnalyticDB for MySQL Data Lakehouse Edition (V3.0) では、SDK for Javaを使用してSparkアプリケーションとSpark SQLジョブを開発できます。 このトピックでは、AnalyticDB for MySQL SDK for Javaを使用してSparkジョブを送信し、Sparkジョブのステータスとログを照会し、Sparkジョブを終了し、Sparkジョブの履歴を照会する方法について説明します。

前提条件

  • JDK 1.8以降がインストールされます。

  • AnalyticDB for MySQL Data Lakehouse Edition (V3.0) クラスターが作成されます。 詳細については、「Data Lakehouse Editionクラスターの作成」をご参照ください。

  • AnalyticDB for MySQL Data Lakehouse Edition (V3.0) クラスターのジョブリソースグループが作成されます。 詳細については、「リソースグループの作成」をご参照ください。

  • Sparkジョブログを保存するパスが設定されています。

    説明

    次のいずれかの方法を使用して、ログパスを設定できます。

    • AnalyticDB for MySQLコンソールにログインし、Spark JAR Developmentページに移動します。 ページの右上隅にある [ログ設定] をクリックして、ログパスを設定します。

    • spark.app.log.rootPathパラメーターを使用して、Sparkジョブログを保存するObject Storage Service (OSS) パスを指定します。

手順

  1. Mavenの依存関係をpom.xmlファイルに追加します。 サンプルコード:

    <dependencies>
            <dependency>
                <groupId>com.aliyun</groupId>
                <artifactId>adb20211201</artifactId>
                <version>1.0.16</version>
            </dependency>
            <dependency>
                <groupId>com.google.code.gson</groupId>
                <artifactId>gson</artifactId>
                <version>2.10.1</version>
            </dependency>
            <dependency>
                <groupId>org.projectlombok</groupId>
                <artifactId>lombok</artifactId>
                <version>1.18.30</version>
            </dependency>
        </dependencies> 
    説明

    AnalyticDB for MySQL SDK for Javaのバージョン番号を1.0.16に設定することを推奨します。

  2. 環境変数ALIBABA_CLOUD_ACCESS_KEY_IDおよびALIBABA_CLOUD_ACCESS_KEY_SECRETを設定します。 詳細については、「Linux、macOS、およびWindowsでの環境変数の設定」をご参照ください。

  3. 次のサンプルコードを実行して、Sparkジョブの送信、Sparkジョブのステータスとログの照会、Sparkジョブの終了、およびSparkジョブの履歴の照会を行います。

    com.aliyun.adb20211201.Clientをインポートします。com.aliyun.adb20211201.mo delsをインポートします。*;
    com.aliyun.teaopenapi.mo dels.Configをインポートします。com.google.gson.Gsonをインポートします。com.google.gson.GsonBuilderをインポートします。lombokをインポートします。SneakyThrows;
    
    java.util.Listをインポートします。public class SparkExample {
        プライベート静的Gson gson = new GsonBuilder().disableHtmlEscaping().setPrettyPrinting().create();
    
        /**
          Sparkジョブを送信します。
    
         @ paramクライアント: Alibaba Cloudクライアント。
         * @ param clusterId: クラスターID。
         * @ param rgName: リソースグループの名前。
         * @ paramタイプ: Sparkジョブのタイプ。 有効な値: BatchとSQL。
         * @ param data: ジョブタイプをBatchに設定した場合、Sparkジョブに関するJSONデータを入力します。
                                            ジョブタイプをSQLに設定した場合は、SQL文を入力します。
         * @ return: SparkジョブのID。
         **/
        @ SneakyThrows
        public static String submitSparkApp(String clusterId, String rgName, String data, String type, Client client) {
            SubmitSparkAppRequest request=新しいSubmitSparkAppRequest();
            request.setDBClusterId(clusterId);
            request.setResourceGroupName(rgName);
            request.setData (データ);
            request.setAppType (タイプ);
            System.out.println("Start to build Submit request" + gson.toJson(request));
            SubmitSparkAppResponse submitSparkAppResponse = client.submitSparkApp (リクエスト);
            System.out.println("Submit app response: " + gson.toJson(submitSparkAppResponse));
            を返しますsubmitSparkAppResponse.getBody().getData().getAppId();
        }
    
        /**
         * Sparkジョブのステータスを照会します。
         *
         * @ param appId: SparkジョブのID。
         * @ paramクライアント: Alibaba Cloudクライアント。
         * @ return: Sparkジョブのステータス。
         */
        @ SneakyThrows
        public static String getAppState(String appId, Client client) {
            GetSparkAppStateRequest request=新しいGetSparkAppStateRequest();
            request.setAppId(appId);
            System.out.println("Start to get app state request" + gson.toJson(request));
            GetSparkAppStateResponse sparkAppState = client.getSparkAppState (リクエスト);
            System.out.println("App state response: " + gson.toJson(sparkAppState));
            sparkAppState.getBody().getData().getState(); を返します。
        }
    
        /**
         * Sparkジョブに関する詳細を照会します。
         *
         * @ param appId: SparkジョブのID。
         * @ paramクライアント: Alibaba Cloudクライアント。
         * @ return: Sparkジョブの詳細。
         */
        @ SneakyThrows
        public static SparkAppInfo getAppInfo (文字列appId、クライアント) {
            GetSparkAppInfoRequest request=新しいGetSparkAppInfoRequest();
            request.setAppId(appId);
            System.out.println("Start to get app info request" + gson.toJson(request));
            GetSparkAppInfoResponse sparkAppInfo = client.getSparkAppInfo (リクエスト);
            System.out.println("アプリ情報応答:" + gson.toJson(sparkAppInfo));
            sparkAppInfo.getBody().getData(); を返します。
        }
    
        /**
         * Sparkジョブのログを照会します。
         *
         * @ param appId: SparkジョブのID。
         * @ paramクライアント: Alibaba Cloudクライアント。
         * @ return: Sparkジョブのログ。
         */
        @ SneakyThrows
        public static String getAppDriverLog(String appId, Client client) {
            GetSparkAppLogRequest request=新しいGetSparkAppLogRequest();
            request.setAppId(appId);
            System.out.println("Start to get app log request" + gson.toJson(request));
            GetSparkAppLogResponse sparkAppLog = client.getSparkAppLog (リクエスト);
            System.out.println("アプリのログ応答:" + gson.toJson(sparkAppLog));
            sparkAppLog.getBody().getData().getLogContent(); を返します。
        }
    
        /**
         * 履歴Sparkジョブをクエリします。
         * @ param dbClusterId: クラスターID。
         * @ param pageNumber: ページ番号。 ページは 1 ページ目から始まります。 デフォルト値は 1 です。
         * @ param pageSize: 1ページあたりのエントリ数。
         * @ paramクライアント: Alibaba Cloudクライアント。
         * @ return: Sparkジョブの詳細。
         */
        @ SneakyThrows
        public static List<SparkAppInfo> listSparkApps(String dbClusterId, long pageNumber, long pageSize, Client client) {
            ListSparkAppsRequest request = new ListSparkAppsRequest();
            request.setDBClusterId(dbClusterId);
            request.setPageNumber(pageNumber);
            request.setPageSize(pageSize);
            System.out.println("Start to list spark apps request" + gson.toJson(request));
            ListSparkAppsResponse listSparkAppsResponse = client.listSparkApps (リクエスト);
            System.out.println("List spark apps response: " + gson.toJson(listSparkAppsResponse));
            listSparkAppsResponse.getBody().getData().getAppInfoList(); を返します。
        }
    
    
        /**
         * スパークアプリケーションの送信例
         *
         * @ param args Access Key ID, Access Key Secret, ADB Cluster ID, ADBリソースグループ名, データの送信, タイプの送信
         * @throws Exception
         */
        public static void main(String[] args) throws Exception {
            // Alibaba Cloudクライアント。
            Config config = new Config();
            // ALIBABA_CLOUD_ACCESS_KEY_ID環境変数からAccessKey IDを取得します。
            config.setAccessKeyId(System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"));
            // ALIBABA_CLOUD_ACCESS_KEY_SECRET環境変数からAccessKeyシークレットを取得します。
            config.setAccessKeySecret(System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"));
            // クラスターが存在するリージョンのID。
            config.setRegionId("cn-hangzhou");
            // エンドポイント。 cn-hangzhouは、クラスターが存在するリージョンのIDを示します。
            config.setEndpoint("adb.cn-hangzhou.aliyuncs.com");
            Client client = new Client(config);
            // クラスターID。
            文字列dbClusterId = "amv-bp1mhnosdb38 ****";
            // リソースグループの名前。
            文字列resourceGroupName = "test";
            // Sparkジョブの内容。
            String data = "String data = "{\n "+
                    " \" comments\": [\"-これはSparkPiの例です。 コンテンツを変更し、スパークプログラムを実行します。\"],\n" +
                    " \" args\": [\" 1000\"],\n" +
                    " \" file\": \" local:/// tmp/spark-examples.jar\",\n" +
                    " \" name\": \" SparkPi\",\n" +
                    " \" className\": \" org.apache.spark.examples.SparkPi\",\n" +
                    " \" conf\": {\n" +
                    " \" spark.driver.resourceSpec\": \" medium\",\n" +
                    " \" spark.exe cutor.instances\": 2,\n" +
                    " \" spark.exe cutor.resourceSpec\": \" medium\"}\n" +
                    "}\n";";
            // Sparkジョブのタイプ。
            文字列タイプ="バッチ";
            // Sparkジョブの最大実行時間。
            long sparkAppMaxRunningTimeInMilli = 60000;
            // スキャン間隔。
            long getAppStateinterval = 2000;
          
    
            // Sparkジョブを送信します。
            文字列appId = submitSparkApp(dbClusterId, resourceGroupName, data, type, client);
    
            // Sparkジョブのステータスを照会します。
            文字列の状態;
            long startTimeInMillis = System.currentTimeMillis();
            do {
                state = getAppState(appId、クライアント);
                if (System.currentTimeMillis() - startTimeInMillis > sparkAppMaxRunningTimeInMilli) {
                    System.out.println("Timeout");
                    break;
                } else {
                    System.out.println("Current state: " + state);
                    Thread.sleep(getAppStateinterval);
                }
            } while (!"COMPLETED".equalsIgnoreCase(state) && ! "FATAL".equalsIgnoreCase (状態)
                    && ! "FAILED".equalsIgnoreCase (状態);
    
            // Sparkジョブに関する詳細を照会します。
            SparkAppInfo appInfo = getAppInfo(appId、クライアント);
            String x = String.format("State: % s\n WebUI: % s\n submit time: % s\n, terminated time: % s\n" 、
                    状態、
                    appInfo.getDetail().webUiAddress、
                    appInfo.getDetail().submittedTimeInMillis、
                    appInfo.getDetail().terminatedTimeInMillis);
            System.out.println(x);
    
            // Sparkジョブのログを照会します。
            String log = getAppDriverLog(appId、クライアント);
            System.out.println (ログ);
    
            // 履歴Sparkジョブを照会します。
            リスト <SparkAppInfo> sparkAppInfos = listSparkApps(dbClusterId、1、50、クライアント);
            sparkAppInfos.forEach(sparkAppInfo -> {
                String y = String.format("AppId: % s\n State: % s\n WebUI: % s\n submit time: % s\n, terminated time: % s\n" 、
                        appInfo.getAppId() 、
                        appInfo.getState() 、
                        appInfo.getDetail().webUiAddress、
                        appInfo.getDetail().submittedTimeInMillis、
                        appInfo.getDetail().terminatedTimeInMillis);
                System.out.println(y);
            });
    
            // Sparkジョブを終了します。
            KillSparkAppRequest request = new KillSparkAppRequest();
            request.setAppId(appId);
            client.killSparkApp (リクエスト);
        }
    }