AnalyticDB for MySQL: Develop Spark applications by using the Java SDK

Last updated: Jul 05, 2024

AnalyticDB for MySQL Enterprise Edition and Data Lakehouse Edition clusters allow you to develop Spark applications and Spark SQL jobs by using the Java SDK. This topic describes how to use the Java SDK to submit a Spark job, query the status and logs of a Spark job, terminate a Spark job, and query historical Spark jobs.

Prerequisites

  • JDK 1.8 or later is installed.

  • An Enterprise Edition or Data Lakehouse Edition cluster is created. For more information, see Create a Data Lakehouse Edition cluster.

  • A job resource group is created. For more information, see Create a resource group.

  • A storage path for Spark logs is configured.

    Note

    You can use one of the following methods to configure the storage path of Spark logs:

    • In the AnalyticDB for MySQL console, go to the Spark JAR Development page and click Log Settings in the upper-right corner to set the storage path of Spark logs.

    • Use the spark.app.log.rootPath configuration item to specify an OSS path for storing the execution logs of Spark jobs, as shown in the sketch after this note.
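
    For reference, the following is a minimal sketch of a Spark job definition that also sets spark.app.log.rootPath in its conf section. The variable name dataWithLogPath and the OSS bucket and path are placeholders for illustration, not values from this topic; replace the OSS path with your own log path.

        // Hypothetical job definition: same shape as the SparkPi example used later in this topic,
        // with spark.app.log.rootPath added to route execution logs to an OSS path.
        // oss://<your-bucket>/spark-logs/ is a placeholder, not a real path.
        String dataWithLogPath = "{\n" +
                "    \"file\": \"local:///tmp/spark-examples.jar\",\n" +
                "    \"className\": \"org.apache.spark.examples.SparkPi\",\n" +
                "    \"args\": [\"1000\"],\n" +
                "    \"name\": \"SparkPi\",\n" +
                "    \"conf\": {\n" +
                "        \"spark.driver.resourceSpec\": \"medium\",\n" +
                "        \"spark.executor.instances\": 2,\n" +
                "        \"spark.executor.resourceSpec\": \"medium\",\n" +
                "        \"spark.app.log.rootPath\": \"oss://<your-bucket>/spark-logs/\"\n" +
                "    }\n" +
                "}";
        // Pass this string as the data argument of submitSparkApp in the sample code below.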

Procedure

  1. Configure Maven dependencies in the pom.xml file. Sample code:

    <dependencies>
        <dependency>
            <groupId>com.aliyun</groupId>
            <artifactId>adb20211201</artifactId>
            <version>1.0.16</version>
        </dependency>
        <dependency>
            <groupId>com.google.code.gson</groupId>
            <artifactId>gson</artifactId>
            <version>2.10.1</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.30</version>
        </dependency>
    </dependencies>
    Note

    We recommend that you set the version of the AnalyticDB for MySQL Java SDK to 1.0.16.

  2. Set the ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET environment variables. For more information, see Configure environment variables in Linux, macOS, and Windows. An optional sanity-check sketch follows.
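
    Before you run the sample code in the next step, you can verify that both variables are visible to the JVM. The following is a minimal, optional sketch; the class name CredentialCheck is illustrative only and is not part of the SDK.

    /** Optional sanity check (illustrative class name): fail fast if the AccessKey pair is not set. */
    public class CredentialCheck {
        public static void main(String[] args) {
            String accessKeyId = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID");
            String accessKeySecret = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET");
            if (accessKeyId == null || accessKeyId.isEmpty()
                    || accessKeySecret == null || accessKeySecret.isEmpty()) {
                // The sample code in the next step reads the same two variables via System.getenv.
                throw new IllegalStateException(
                        "Set ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET before running the sample.");
            }
            System.out.println("AccessKey pair found in the environment.");
        }
    }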

  3. The following sample code shows how to submit a Spark job, query the status and logs of a Spark job, terminate a Spark job, and query historical Spark jobs.

    import com.aliyun.adb20211201.Client;
    import com.aliyun.adb20211201.models.*;
    import com.aliyun.teaopenapi.models.Config;
    import com.google.gson.Gson;
    import com.google.gson.GsonBuilder;
    import lombok.SneakyThrows;
    
    import java.util.List;
    
    public class SparkExample {
        private static Gson gson = new GsonBuilder().disableHtmlEscaping().setPrettyPrinting().create();
    
        /**
         * Submit a Spark job.
         *
         * @param clusterId:               Cluster ID
         * @param rgName:                  Resource group name
         * @param data:                    For a Batch job, the JSON that describes the Spark job;
         *                                 for a SQL job, the SQL statement
         * @param type:                    Spark job type. Valid values: BATCH and SQL
         * @param client:                  Alibaba Cloud client
         * @return:                        Spark job ID
         **/
        @SneakyThrows
        public static String submitSparkApp(String clusterId, String rgName, String data, String type, Client client) {
            SubmitSparkAppRequest request = new SubmitSparkAppRequest();
            request.setDBClusterId(clusterId);
            request.setResourceGroupName(rgName);
            request.setData(data);
            request.setAppType(type);
            System.out.println("Start to build Submit request " + gson.toJson(request));
            SubmitSparkAppResponse submitSparkAppResponse = client.submitSparkApp(request);
            System.out.println("Submit app response: " + gson.toJson(submitSparkAppResponse));
            return submitSparkAppResponse.getBody().getData().getAppId();
        }
    
        /**
         * Query the state of a Spark job.
         *
         * @param appId:                     Spark job ID
         * @param client:                    Alibaba Cloud client
         * @return:                          State of the Spark job
         */
        @SneakyThrows
        public static String getAppState(String appId, Client client) {
            GetSparkAppStateRequest request = new GetSparkAppStateRequest();
            request.setAppId(appId);
            System.out.println("Start to get app state request " + gson.toJson(request));
            GetSparkAppStateResponse sparkAppState = client.getSparkAppState(request);
            System.out.println("App state response: " + gson.toJson(sparkAppState));
            return sparkAppState.getBody().getData().getState();
        }
    
        /**
         * Query the details of a Spark job.
         *
         * @param appId:                     Spark job ID
         * @param client:                    Alibaba Cloud client
         * @return:                          Details of the Spark job
         */
        @SneakyThrows
        public static SparkAppInfo getAppInfo(String appId, Client client) {
            GetSparkAppInfoRequest request = new GetSparkAppInfoRequest();
            request.setAppId(appId);
            System.out.println("Start to get app info request " + gson.toJson(request));
            GetSparkAppInfoResponse sparkAppInfo = client.getSparkAppInfo(request);
            System.out.println("App info response: " + gson.toJson(sparkAppInfo));
            return sparkAppInfo.getBody().getData();
        }
    
        /**
         * Query the logs of a Spark job.
         *
         * @param appId:                     Spark job ID
         * @param client:                    Alibaba Cloud client
         * @return:                          Log content of the Spark job
         */
        @SneakyThrows
        public static String getAppDriverLog(String appId, Client client) {
            GetSparkAppLogRequest request = new GetSparkAppLogRequest();
            request.setAppId(appId);
            System.out.println("Start to get app log request " + gson.toJson(request));
            GetSparkAppLogResponse sparkAppLog = client.getSparkAppLog(request);
            System.out.println("App log response: " + gson.toJson(sparkAppLog));
            return sparkAppLog.getBody().getData().getLogContent();
        }
    
        /**
         * Query historical Spark jobs.
         *
         * @param dbClusterId:               Cluster ID
         * @param pageNumber:                Page number; a positive integer. Default value: 1
         * @param pageSize:                  Number of entries per page
         * @param client:                    Alibaba Cloud client
         * @return:                          Details of the Spark jobs
         */
        @SneakyThrows
        public static List<SparkAppInfo> listSparkApps(String dbClusterId, long pageNumber, long pageSize, Client client) {
            ListSparkAppsRequest request = new ListSparkAppsRequest();
            request.setDBClusterId(dbClusterId);
            request.setPageNumber(pageNumber);
            request.setPageSize(pageSize);
            System.out.println("Start to list spark apps request " + gson.toJson(request));
            ListSparkAppsResponse listSparkAppsResponse = client.listSparkApps(request);
            System.out.println("List spark apps response: " + gson.toJson(listSparkAppsResponse));
            return listSparkAppsResponse.getBody().getData().getAppInfoList();
        }
    
    
        /**
         * Example of submitting a Spark application.
         *
         * @param args not used; the AccessKey pair is read from environment variables, and the
         *             cluster ID, resource group name, job content, and job type are set in the code
         * @throws Exception
         */
        public static void main(String[] args) throws Exception {
            // Alibaba Cloud client
            Config config = new Config();
            // Obtain the AccessKey ID from the ALIBABA_CLOUD_ACCESS_KEY_ID environment variable
            config.setAccessKeyId(System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"));
            // Obtain the AccessKey secret from the ALIBABA_CLOUD_ACCESS_KEY_SECRET environment variable
            config.setAccessKeySecret(System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"));
            // Region ID of the cluster
            config.setRegionId("cn-hangzhou");
            // Endpoint. cn-hangzhou is the region ID where the cluster resides
            config.setEndpoint("adb.cn-hangzhou.aliyuncs.com");
            Client client = new Client(config);
            // Cluster ID
            String dbClusterId = "amv-bp1mhnosdb38****";
            // Resource group name
            String resourceGroupName = "test";
            // Content of the Spark job
            String data = "{\n" +
                    "    \"comments\": [\"-- Here is just an example of SparkPi. Modify the content and run your spark program.\"],\n" +
                    "    \"args\": [\"1000\"],\n" +
                    "    \"file\": \"local:///tmp/spark-examples.jar\",\n" +
                    "    \"name\": \"SparkPi\",\n" +
                    "    \"className\": \"org.apache.spark.examples.SparkPi\",\n" +
                    "    \"conf\": {\n" +
                    "        \"spark.driver.resourceSpec\": \"medium\",\n" +
                    "        \"spark.executor.instances\": 2,\n" +
                    "        \"spark.executor.resourceSpec\": \"medium\"}\n" +
                    "}\n";
            // Spark job type
            String type = "Batch";
            // Maximum running time of the Spark job, in milliseconds
            long sparkAppMaxRunningTimeInMilli = 60000;
            // Interval between state queries, in milliseconds
            long getAppStateInterval = 2000;
    
            // Submit the Spark job
            String appId = submitSparkApp(dbClusterId, resourceGroupName, data, type, client);
    
            // Poll the state of the Spark job
            String state;
            long startTimeInMillis = System.currentTimeMillis();
            do {
                state = getAppState(appId, client);
                if (System.currentTimeMillis() - startTimeInMillis > sparkAppMaxRunningTimeInMilli) {
                    System.out.println("Timeout");
                    break;
                } else {
                    System.out.println("Current state: " + state);
                    Thread.sleep(getAppStateInterval);
                }
            } while (!"COMPLETED".equalsIgnoreCase(state) && !"FATAL".equalsIgnoreCase(state)
                    && !"FAILED".equalsIgnoreCase(state));
    
            // Query the details of the Spark job
            SparkAppInfo appInfo = getAppInfo(appId, client);
            String x = String.format("State: %s\n WebUI: %s\n Submit time: %s\n Terminated time: %s\n",
                    state,
                    appInfo.getDetail().webUiAddress,
                    appInfo.getDetail().submittedTimeInMillis,
                    appInfo.getDetail().terminatedTimeInMillis);
            System.out.println(x);
    
            // Query the logs of the Spark job
            String log = getAppDriverLog(appId, client);
            System.out.println(log);
    
            // Query historical Spark jobs
            List<SparkAppInfo> sparkAppInfos = listSparkApps(dbClusterId, 1, 50, client);
            sparkAppInfos.forEach(sparkAppInfo -> {
                String y = String.format("AppId: %s\n State: %s\n WebUI: %s\n Submit time: %s\n Terminated time: %s\n",
                        sparkAppInfo.getAppId(),
                        sparkAppInfo.getState(),
                        sparkAppInfo.getDetail().webUiAddress,
                        sparkAppInfo.getDetail().submittedTimeInMillis,
                        sparkAppInfo.getDetail().terminatedTimeInMillis);
                System.out.println(y);
            });
    
            // Terminate the Spark job
            KillSparkAppRequest request = new KillSparkAppRequest();
            request.setAppId(appId);
            client.killSparkApp(request);
        }
    }