All Products
Search
Document Center

AnalyticDB:Use AnalyticDB for MySQL SDK for Java to develop Spark applications

Last Updated:Jul 05, 2024

AnalyticDB for MySQL Data Lakehouse Edition (V3.0) allows you to use SDK for Java to develop Spark applications and Spark SQL jobs. This topic describes how to use AnalyticDB for MySQL SDK for Java to submit a Spark job, query the status and logs of a Spark job, terminate a Spark job, and query historical Spark jobs.

Prerequisites

  • JDK 1.8 or later is installed.

  • An AnalyticDB for MySQL Data Lakehouse Edition (V3.0) cluster is created. For more information, see Create a Data Lakehouse Edition cluster.

  • A job resource group is created for the AnalyticDB for MySQL Data Lakehouse Edition (V3.0) cluster. For more information, see Create a resource group.

  • The path to store Spark job logs is configured.

    Note

    You can use one of the following methods to configure the log path:

    • Log on to the AnalyticDB for MySQL console and go to the Spark JAR Development page. In the upper-right corner of the page, click Log Settings to configure the log path.

    • Use the spark.app.log.rootPath parameter to specify an Object Storage Service (OSS) path to store Spark job logs.

Procedure

  1. Add Maven dependencies to the pom.xml file. Sample code:

    <dependencies>
            <dependency>
                <groupId>com.aliyun</groupId>
                <artifactId>adb20211201</artifactId>
                <version>1.0.16</version>
            </dependency>
            <dependency>
                <groupId>com.google.code.gson</groupId>
                <artifactId>gson</artifactId>
                <version>2.10.1</version>
            </dependency>
            <dependency>
                <groupId>org.projectlombok</groupId>
                <artifactId>lombok</artifactId>
                <version>1.18.30</version>
            </dependency>
        </dependencies>
    Note

    We recommend that you set the version number of AnalyticDB for MySQL SDK for Java to 1.0.16.

  2. Configure the environment variables ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET. For more information, see Configure environment variables in Linux, macOS, and Windows.

  3. Run the following sample code to submit a Spark job, query the status and logs of a Spark job, terminate a Spark job, and query historical Spark jobs:

    import com.aliyun.adb20211201.Client;
    import com.aliyun.adb20211201.models.*;
    import com.aliyun.teaopenapi.models.Config;
    import com.google.gson.Gson;
    import com.google.gson.GsonBuilder;
    import lombok.SneakyThrows;
    
    import java.util.List;
    
    public class SparkExample {
        private static Gson gson = new GsonBuilder().disableHtmlEscaping().setPrettyPrinting().create();
    
        /**
          Submit a Spark job.
    
         @param client:                    The Alibaba Cloud client.
         * @param clusterId:               The cluster ID.
         * @param rgName:                  The name of the resource group.
         * @param type:                     The type of the Spark job. Valid values: Batch and SQL.
         * @param data:                    If you set the job type to Batch, enter the JSON data about the Spark job.
                                            If you set the job type to SQL, enter an SQL statement.
         * @return:                        The ID of the Spark job.
         **/
        @SneakyThrows
        public static String submitSparkApp(String clusterId, String rgName, String data, String type, Client client) {
            SubmitSparkAppRequest request = new SubmitSparkAppRequest();
            request.setDBClusterId(clusterId);
            request.setResourceGroupName(rgName);
            request.setData(data);
            request.setAppType(type);
            System.out.println("Start to build Submit request " + gson.toJson(request));
            SubmitSparkAppResponse submitSparkAppResponse = client.submitSparkApp(request);
            System.out.println("Submit app response: " + gson.toJson(submitSparkAppResponse));
            return submitSparkAppResponse.getBody().getData().getAppId();
        }
    
        /**
         * Query the status of a Spark job.
         *
         * @param appId:                     The ID of the Spark job.
         * @param client:                    The Alibaba Cloud client.
         * @return:                          The status of the Spark job.
         */
        @SneakyThrows
        public static String getAppState(String appId, Client client) {
            GetSparkAppStateRequest request = new GetSparkAppStateRequest();
            request.setAppId(appId);
            System.out.println("Start to get app state request " + gson.toJson(request));
            GetSparkAppStateResponse sparkAppState = client.getSparkAppState(request);
            System.out.println("App state response: " + gson.toJson(sparkAppState));
            return sparkAppState.getBody().getData().getState();
        }
    
        /**
         * Query the details about a Spark job.
         *
         * @param appId:                     The ID of the Spark job.
         * @param client:                    The Alibaba Cloud client.
         * @return:                           The details about the Spark job.
         */
        @SneakyThrows
        public static SparkAppInfo getAppInfo(String appId, Client client) {
            GetSparkAppInfoRequest request = new GetSparkAppInfoRequest();
            request.setAppId(appId);
            System.out.println("Start to get app info request " + gson.toJson(request));
            GetSparkAppInfoResponse sparkAppInfo = client.getSparkAppInfo(request);
            System.out.println("App info response: " + gson.toJson(sparkAppInfo));
            return sparkAppInfo.getBody().getData();
        }
    
        /**
         * Query the logs of a Spark job.
         *
         * @param appId:                     The ID of the Spark job.
         * @param client:                    The Alibaba Cloud client.
         * @return:                          The logs of the Spark job.
         */
        @SneakyThrows
        public static String getAppDriverLog(String appId, Client client) {
            GetSparkAppLogRequest request = new GetSparkAppLogRequest();
            request.setAppId(appId);
            System.out.println("Start to get app log request " + gson.toJson(request));
            GetSparkAppLogResponse sparkAppLog = client.getSparkAppLog(request);
            System.out.println("App log response: " + gson.toJson(sparkAppLog));
            return sparkAppLog.getBody().getData().getLogContent();
        }
    
        /**
         * Query historical Spark jobs.
         * @param dbClusterId:               The cluster ID.
         * @param pageNumber:                The page number. Pages start from page 1. Default value: 1.
         * @param pageSize:                  The number of entries per page.
         * @param client:                    The Alibaba Cloud client.
         * @return:                          The details about the Spark job.
         */
        @SneakyThrows
        public static List<SparkAppInfo> listSparkApps(String dbClusterId, long pageNumber, long pageSize, Client client) {
            ListSparkAppsRequest request = new ListSparkAppsRequest();
            request.setDBClusterId(dbClusterId);
            request.setPageNumber(pageNumber);
            request.setPageSize(pageSize);
            System.out.println("Start to list spark apps request " + gson.toJson(request));
            ListSparkAppsResponse listSparkAppsResponse = client.listSparkApps(request);
            System.out.println("List spark apps response: " + gson.toJson(listSparkAppsResponse));
            return listSparkAppsResponse.getBody().getData().getAppInfoList();
        }
    
    
        /**
         * Example for submit a spark application
         *
         * @param args Access Key ID, Access Key Secret, ADB Cluster ID, ADB Resource Group Name, Submit data, Submit type
         * @throws Exception
         */
        public static void main(String[] args) throws Exception {
            // The Alibaba Cloud client.
            Config config = new Config();
            // Obtain the AccessKey ID from the ALIBABA_CLOUD_ACCESS_KEY_ID environment variable.
            config.setAccessKeyId(System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"));
            // Obtain the AccessKey secret from the ALIBABA_CLOUD_ACCESS_KEY_SECRET environment variable.
            config.setAccessKeySecret(System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"));
            // The ID of the region where the cluster resides.
            config.setRegionId("cn-hangzhou");
            // The endpoint. cn-hangzhou indicates the ID of the region where the cluster resides.
            config.setEndpoint("adb.cn-hangzhou.aliyuncs.com");
            Client client = new Client(config);
            // The cluster ID.
            String dbClusterId = "amv-bp1mhnosdb38****";
            // The name of the resource group.
            String resourceGroupName = "test";
            // The content of the Spark job.
            String data = "{\n" +
                    "    \"comments\": [\"-- Here is just an example of SparkPi. Modify the content and run your spark program.\"],\n" +
                    "    \"args\": [\"1000\"],\n" +
                    "    \"file\": \"local:///tmp/spark-examples.jar\",\n" +
                    "    \"name\": \"SparkPi\",\n" +
                    "    \"className\": \"org.apache.spark.examples.SparkPi\",\n" +
                    "    \"conf\": {\n" +
                    "        \"spark.driver.resourceSpec\": \"medium\",\n" +
                    "        \"spark.executor.instances\": 2,\n" +
                    "        \"spark.executor.resourceSpec\": \"medium\"}\n" +
                    "}\n";
            // The type of the Spark job.
            String type = "Batch";
            // The maximum running time of the Spark job.
            long sparkAppMaxRunningTimeInMilli = 60000;
            // The scan interval.
            long getAppStateinterval = 2000;
          
    
            // Submit a Spark job.
            String appId = submitSparkApp(dbClusterId, resourceGroupName, data, type, client);
    
            // Query the status of a Spark job.
            String state;
            long startTimeInMillis = System.currentTimeMillis();
            do {
                state = getAppState(appId, client);
                if (System.currentTimeMillis() - startTimeInMillis > sparkAppMaxRunningTimeInMilli) {
                    System.out.println("Timeout");
                    break;
                } else {
                    System.out.println("Current state: " + state);
                    Thread.sleep(getAppStateinterval);
                }
            } while (!"COMPLETED".equalsIgnoreCase(state) && !"FATAL".equalsIgnoreCase(state)
                    && !"FAILED".equalsIgnoreCase(state));
    
            // Query the details about a Spark job.
            SparkAppInfo appInfo = getAppInfo(appId, client);
            String x = String.format("State: %s\n WebUI: %s\n submit time: %s\n, terminated time: %s\n",
                    state,
                    appInfo.getDetail().webUiAddress,
                    appInfo.getDetail().submittedTimeInMillis,
                    appInfo.getDetail().terminatedTimeInMillis);
            System.out.println(x);
    
            // Query the logs of a Spark job.
            String log = getAppDriverLog(appId, client);
            System.out.println(log);
    
            // Query historical Spark jobs.
            List<SparkAppInfo> sparkAppInfos = listSparkApps(dbClusterId, 1, 50, client);
            sparkAppInfos.forEach(sparkAppInfo -> {
                String y = String.format("AppId: %s\n State: %s\n WebUI: %s\n submit time: %s\n, terminated time: %s\n",
                        appInfo.getAppId(),
                        appInfo.getState(),
                        appInfo.getDetail().webUiAddress,
                        appInfo.getDetail().submittedTimeInMillis,
                        appInfo.getDetail().terminatedTimeInMillis);
                System.out.println(y);
            });
    
            // Terminate a Spark job.
            KillSparkAppRequest request = new KillSparkAppRequest();
            request.setAppId(appId);
            client.killSparkApp(request);
        }
    }