
Data Lake Formation: DataLake SDK for Java reference

Last updated: Nov 06, 2024

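Welcome to Alibaba Cloud SDK for Java. The DataLake SDK for Java lets you access Data Lake Formation (DLF) without complex programming. This tutorial describes how to install the DataLake SDK for Java and get started with it.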

Prerequisites

  • An AccessKey pair has been created.

  • A Java environment is installed.

    The DataLake SDK for Java requires JDK 1.7 or later.
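To confirm that a runtime meets this requirement, you could read the standard `java.specification.version` system property. The following is a minimal sketch; the class `JavaVersionCheck` is a hypothetical helper, not part of the SDK. It handles both the legacy `1.x` format used up to Java 8 and the plain major-version format used from Java 9 onward, and it avoids lambdas so it compiles on JDK 1.7 itself.

```java
// Hypothetical helper (not part of the SDK): checks whether the running JVM meets a minimum
// major version, based on the standard java.specification.version system property.
final class JavaVersionCheck {

    // Converts a specification version to a major version number, e.g. "1.7" -> 7, "17" -> 17.
    static int majorVersion(String specVersion) {
        String v = specVersion.trim();
        if (v.startsWith("1.")) {
            // Legacy format used up to Java 8: "1.7", "1.8".
            v = v.substring(2);
        }
        int dot = v.indexOf('.');
        if (dot >= 0) {
            v = v.substring(0, dot);
        }
        return Integer.parseInt(v);
    }

    // Returns true when the given specification version is at least requiredMajor.
    static boolean meetsMinimum(String specVersion, int requiredMajor) {
        return majorVersion(specVersion) >= requiredMajor;
    }

    public static void main(String[] args) {
        String spec = System.getProperty("java.specification.version");
        System.out.println("Java specification version: " + spec
                + ", meets JDK 1.7 requirement: " + meetsMinimum(spec, 7));
    }
}
```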

Install Alibaba Cloud SDK for Java

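You can get the latest Data Lake Formation SDK package from the Maven Repository (see Maven SDK address). Add the following dependency to your pom.xml; the version shown here is 2.0.12, so check the repository for a newer release.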

<dependency>
  <groupId>com.aliyun</groupId>
  <artifactId>datalake20200710</artifactId>
  <version>2.0.12</version>
</dependency>

Request steps

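  1. Initialize the request client. Before you call the API, configure environment variables so that the access credentials are read from them.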

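            Config authConfig = new Config();
            // An Alibaba Cloud account AccessKey has access to all APIs and poses a high security risk. We strongly recommend that you create and use a RAM user for API access or routine O&M. Log on to the RAM console to create a RAM user.
            // Do not hard-code the AccessKey ID and AccessKey Secret in your code; doing so can leak your keys.
            // This example reads the AccessKey ID and AccessKey Secret from environment variables. Configure the environment variables before you run the example.
            authConfig.accessKeyId = System.getenv("AK_ENV");
            authConfig.accessKeySecret = System.getenv("SK_ENV");
            // Use AccessKey authentication against the DLF endpoint of the cn-shanghai region.
            authConfig.type = "access_key";
            authConfig.endpoint = "dlf.cn-shanghai.aliyuncs.com";
            authConfig.regionId = "cn-shanghai";
            Client authClient = new Client(authConfig);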
  2. Create a request object and set its parameters.

    The following code uses CreateDatabase (create a metadata database) as an example.

            CreateDatabaseRequest request = new CreateDatabaseRequest();
            request.catalogId = "";
    
            DatabaseInput input = new DatabaseInput();
            input.description = "";
            input.locationUri = "oss://test";
            input.name = "example";
    
            request.databaseInput = input;
  3. Send the request and get the response.

            CreateDatabaseResponseBody response = authClient.createDatabase(request).body;
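Step 1 passes the results of `System.getenv` straight into `Config`, so an unset variable silently becomes `null`. A fail-fast check can make that mistake obvious before any request is sent. The following is a minimal sketch; `EnvCredentials` is a hypothetical helper, not an SDK class, and it takes the environment as a `Map` so that it can be tested without changing real environment variables.

```java
import java.util.Map;

// Hypothetical helper (not part of the SDK): reads the AccessKey pair from environment
// variables and fails fast with a clear message when either one is missing or empty.
final class EnvCredentials {

    final String accessKeyId;
    final String accessKeySecret;

    private EnvCredentials(String accessKeyId, String accessKeySecret) {
        this.accessKeyId = accessKeyId;
        this.accessKeySecret = accessKeySecret;
    }

    // Pass System.getenv() as env in real code; idVar/secretVar are the variable names.
    static EnvCredentials fromEnv(Map<String, String> env, String idVar, String secretVar) {
        String id = env.get(idVar);
        String secret = env.get(secretVar);
        if (id == null || id.isEmpty()) {
            throw new IllegalStateException("Environment variable " + idVar + " is not set");
        }
        if (secret == null || secret.isEmpty()) {
            throw new IllegalStateException("Environment variable " + secretVar + " is not set");
        }
        return new EnvCredentials(id, secret);
    }
}
```

In step 1 you would call `EnvCredentials.fromEnv(System.getenv(), "AK_ENV", "SK_ENV")` and copy its two fields into `authConfig.accessKeyId` and `authConfig.accessKeySecret`.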

Reference example

The following sample code creates a metadata database.

package com.aliyun.datalake.examples;

import com.aliyun.datalake20200710.Client;
import com.aliyun.datalake20200710.models.CreateDatabaseRequest;
import com.aliyun.datalake20200710.models.CreateDatabaseResponseBody;
import com.aliyun.datalake20200710.models.DatabaseInput;
import com.aliyun.teaopenapi.models.Config;
import com.google.gson.Gson;

public class SchemaExample {

    public static void main(String[] args) throws Exception {
        // 1 Create and initialize a Config instance.
        Config authConfig = new Config();
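        // An Alibaba Cloud account AccessKey has access to all APIs and poses a high security risk. We strongly recommend that you create and use a RAM user for API access or routine O&M. Log on to the RAM console to create a RAM user.
        // Do not hard-code the AccessKey ID and AccessKey Secret in your code; doing so can leak your keys.
        // This example reads the AccessKey ID and AccessKey Secret from environment variables. You can also store them in a configuration file if your business requires it.
        authConfig.accessKeyId = System.getenv("AK_ENV");
        authConfig.accessKeySecret = System.getenv("SK_ENV");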
        authConfig.type = "access_key";
        authConfig.endpoint = "dlf.cn-shanghai.aliyuncs.com";
        authConfig.regionId = "cn-shanghai";

        Client authClient = new Client(authConfig);

        // 2 Create an API request and set parameters.
        CreateDatabaseRequest request = new CreateDatabaseRequest();
        request.catalogId = "";

        DatabaseInput input = new DatabaseInput();
        input.description = "";
        input.locationUri = "oss://test";
        input.name = "example";

        request.databaseInput = input;
        // 3 Initiate the request and handle the response or exceptions.
        CreateDatabaseResponseBody response = authClient.createDatabase(request).body;
        System.out.println(new Gson().toJson(response));
    }

}

Sample response when the call succeeds:

{"code":"OK","message":"","requestId":"1739F0B0-A94E-49AC-95FC-C1CE5E4171FA","success":true}

When the call fails, the SDK wraps the status code and error message in an exception and throws it to the caller. Example:

Exception in thread "main" com.aliyun.tea.TeaException: code: 409, Database example already exists request id: 598B1E2F-9AEF-4B13-AE4D-EB8733B643EB
 at com.aliyun.teaopenapi.Client.doROARequest(Client.java:303)
 at com.aliyun.datalake20200710.Client.createDatabaseWithOptions(Client.java:790)
 at com.aliyun.datalake20200710.Client.createDatabase(Client.java:772)
 at com.aliyun.datalake.examples.SchemaExample.main(SchemaExample.java:34)

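When a network error or another unexpected error occurs, the SDK throws a TeaException that carries no status code or error message; the underlying exception (here java.net.UnknownHostException) is attached as its cause. Example: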

Exception in thread "main" com.aliyun.tea.TeaException
 at com.aliyun.tea.Tea.doAction(Tea.java:67)
 at com.aliyun.teaopenapi.Client.doROARequest(Client.java:292)
 at com.aliyun.datalake20200710.Client.createDatabaseWithOptions(Client.java:790)
 at com.aliyun.datalake20200710.Client.createDatabase(Client.java:772)
 at com.aliyun.datalake.examples.SchemaExample.main(SchemaExample.java:34)
Caused by: java.net.UnknownHostException: dlf.cn-shanghai.aliyuncs.com
 at java.net.Inet6AddressImpl.lookupAllHostAddr(Native Method)
 at java.net.InetAddress$2.lookupAllHostAddr(InetAddress.java:928)
 at java.net.InetAddress.getAddressesFromNameService(InetAddress.java:1323)
 at java.net.InetAddress.getAllByName0(InetAddress.java:1276)
 at java.net.InetAddress.getAllByName(InetAddress.java:1192)
 at java.net.InetAddress.getAllByName(InetAddress.java:1126)
 at okhttp3.Dns$1.lookup(Dns.java:39)
 at okhttp3.internal.connection.RouteSelector.resetNextInetSocketAddress(RouteSelector.java:171)
 at okhttp3.internal.connection.RouteSelector.nextProxy(RouteSelector.java:137)
 at okhttp3.internal.connection.RouteSelector.next(RouteSelector.java:82)
 at okhttp3.internal.connection.StreamAllocation.findConnection(StreamAllocation.java:171)
 at okhttp3.internal.connection.StreamAllocation.findHealthyConnection(StreamAllocation.java:121)
 at okhttp3.internal.connection.StreamAllocation.newStream(StreamAllocation.java:100)
 at okhttp3.internal.connection.ConnectInterceptor.intercept(ConnectInterceptor.java:42)
 at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:92)
 at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:67)
 at okhttp3.internal.cache.CacheInterceptor.intercept(CacheInterceptor.java:93)
 at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:92)
 at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:67)
 at okhttp3.internal.http.BridgeInterceptor.intercept(BridgeInterceptor.java:93)
 at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:92)
 at okhttp3.internal.http.RetryAndFollowUpInterceptor.intercept(RetryAndFollowUpInterceptor.java:120)
 at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:92)
 at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:67)
 at okhttp3.RealCall.getResponseWithInterceptorChain(RealCall.java:185)
 at okhttp3.RealCall.execute(RealCall.java:69)
 at com.aliyun.tea.Tea.doAction(Tea.java:64)
 ... 4 more
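Because both failure types surface as `TeaException`, a caller may want to tell them apart, for example to log network failures separately or to retry them. The following is a minimal sketch that walks the cause chain and looks for an `IOException` (which `java.net.UnknownHostException` extends); `ErrorClassifier` is a hypothetical helper, not part of the SDK. Note that an `UnknownHostException` means the request never left the client, but other `IOException`s such as read timeouts can occur after the request was sent, so retrying a non-idempotent call like CreateDatabase may then return an "already exists" error.

```java
import java.io.IOException;

// Hypothetical helper (not part of the SDK): reports whether an exception was caused,
// directly or through its cause chain, by an I/O problem such as UnknownHostException.
final class ErrorClassifier {

    static boolean isNetworkError(Throwable t) {
        int depth = 0;
        // The depth limit guards against pathological cyclic cause chains.
        for (Throwable cur = t; cur != null && depth < 32; cur = cur.getCause(), depth++) {
            if (cur instanceof IOException) {
                return true;
            }
        }
        return false;
    }
}
```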

Best practices

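To process results uniformly and obtain error results that match the API documentation, you can handle both normal and error returns from the SDK with a fixed pattern.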

For example, wrap the API call layer in a common method:

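import java.util.Map;
import java.util.concurrent.Callable;

import com.aliyun.datalake20200710.Client;
import com.aliyun.tea.TeaException;
import com.aliyun.tea.TeaModel;

// ResultModel is a user-defined class, not part of the SDK. It needs a no-arg constructor and a
// (success, code, message, requestId) constructor, and presumably extends com.aliyun.tea.TeaModel
// so that TeaModel.toModel can populate it from the error body.
public class AbstractAPI {

    protected final Client client;

    public AbstractAPI(Client client) {
        this.client = client;
    }

    // Runs the call. If the SDK throws a TeaException whose error body contains "Code",
    // the body is converted into a ResultModel instead of being propagated.
    public <M, V extends ResultModel<M>> ResultModel<M> call(Callable<V> c) throws Exception {
        try {
            return c.call();
        } catch (TeaException e) {
            Map<String, Object> data = e.getData();
            if (data != null && data.get("Code") != null) {
                // Service error: map the error body onto the result model.
                return TeaModel.toModel(data, new ResultModel<M>());
            } else {
                // Network or other unexpected error: rethrow to the caller.
                throw e;
            }
        }
    }
}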
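To show the pattern in isolation, the following stdlib-only sketch replaces the SDK types with hypothetical stand-ins: `ApiResult` plays the role of `ResultModel`, with fields taken from the JSON responses in this document, and `ServiceException` plays the role of `TeaException` and its `getData()` map. The error-body key names `Code`, `Message`, `RequestId`, `Success`, and `HttpStatusCode` are assumptions based on the `data.get("Code")` check above; in the real code, `TeaModel.toModel` performs this mapping.

```java
import java.util.Map;
import java.util.concurrent.Callable;

// Hypothetical stand-in for ResultModel; fields follow the JSON responses in this document.
class ApiResult<M> {
    boolean success;
    String code;
    String message;
    String requestId;
    Integer httpStatusCode; // only set for service errors
    M data;                 // payload for APIs that return data; unused for CreateDatabase

    ApiResult(boolean success, String code, String message, String requestId) {
        this.success = success;
        this.code = code;
        this.message = message;
        this.requestId = requestId;
    }
}

// Hypothetical stand-in for TeaException: carries the parsed error body.
class ServiceException extends Exception {
    final Map<String, Object> data;

    ServiceException(Map<String, Object> data) {
        this.data = data;
    }
}

final class UnifiedCall {

    // Returns the normal result; converts a service error whose body has a "Code" into an
    // ApiResult; rethrows anything else (for example network errors).
    static <M> ApiResult<M> call(Callable<ApiResult<M>> c) throws Exception {
        try {
            return c.call();
        } catch (ServiceException e) {
            Map<String, Object> d = e.data;
            if (d == null || d.get("Code") == null) {
                throw e;
            }
            ApiResult<M> r = new ApiResult<M>(Boolean.TRUE.equals(d.get("Success")),
                    asString(d.get("Code")), asString(d.get("Message")), asString(d.get("RequestId")));
            Object status = d.get("HttpStatusCode");
            if (status instanceof Number) {
                r.httpStatusCode = ((Number) status).intValue();
            }
            return r;
        }
    }

    private static String asString(Object o) {
        return o == null ? null : String.valueOf(o);
    }
}
```

The design choice is the same as in `AbstractAPI`: only errors that carry a service error code become results, so callers still see genuine network failures as exceptions.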

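For a specific API, extend the class above and write a call method with the parameters that suit your needs. The lambda passed to call requires Java 8 or later.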

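import java.util.Map;

import com.aliyun.datalake20200710.Client;
import com.aliyun.datalake20200710.models.CreateDatabaseRequest;
import com.aliyun.datalake20200710.models.CreateDatabaseResponseBody;
import com.aliyun.datalake20200710.models.DatabaseInput;
import com.aliyun.datalake20200710.models.PrincipalPrivilegeSet;

public class DatabaseAPI extends AbstractAPI {

    public DatabaseAPI(Client client) {
        super(client);
    }

    // Creates a database; returns a ResultModel both on success and on service errors.
    public ResultModel<Void> createDatabase(String catalogId, String databaseName, String description,
                                            String locationUri, Map<String, String> parameters,
                                            String ownerName, String ownerType, PrincipalPrivilegeSet privileges) throws Exception {
        return call(() -> {
            CreateDatabaseRequest request = new CreateDatabaseRequest();
            request.catalogId = catalogId;

            // Describe the database to create.
            DatabaseInput input = new DatabaseInput();
            input.description = description;
            input.locationUri = locationUri;
            input.parameters = parameters;
            input.name = databaseName;
            input.ownerName = ownerName;
            input.ownerType = ownerType;
            input.privileges = privileges;

            request.databaseInput = input;

            // On success, copy the common response fields into a ResultModel.
            CreateDatabaseResponseBody response = client.createDatabase(request).body;
            return new ResultModel<>(response.success, response.code, response.message,
                    response.requestId);
        });
    }
}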

With this in place, each API call returns a standard response directly:

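import com.aliyun.datalake20200710.Client;
import com.aliyun.teaopenapi.models.Config;
import com.google.gson.Gson;

// DatabaseAPI and ResultModel are the user-defined classes above, assumed to be in the same package.
public class SchemaExample {

    public static void main(String[] args) throws Exception {
        // 1 Create and initialize a Config instance.
        Config authConfig = new Config();
        // An Alibaba Cloud account AccessKey has access to all APIs and poses a high security risk. We strongly recommend that you create and use a RAM user for API access or routine O&M. Log on to the RAM console to create a RAM user.
        // Do not hard-code the AccessKey ID and AccessKey Secret in your code; doing so can leak your keys.
        // This example reads the AccessKey ID and AccessKey Secret from environment variables. Configure the environment variables before you run the example.
        authConfig.accessKeyId = System.getenv("AK_ENV");
        authConfig.accessKeySecret = System.getenv("SK_ENV");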
        authConfig.type = "access_key";
        authConfig.endpoint = "dlf.cn-shanghai.aliyuncs.com";
        authConfig.regionId = "cn-shanghai";

        Client authClient = new Client(authConfig);

        // 2 Initiate the request and handle the response or exceptions.
        ResultModel<Void> response = new DatabaseAPI(authClient).createDatabase("", "example3", "",
                "oss://test", null,
                null, null, null);
        System.out.println(new Gson().toJson(response));
    }

}

Sample response when the database is created:

{"success":true,"code":"OK","message":"","requestId":"50778D55-696D-45FA-8328-B01983F6CEB1"}

Sample response when creation fails:

{"success":false,"code":"AlreadyExists","message":"Database example3 already exists","requestId":"94617169-DA17-4020-9027-7D8F89160682","httpStatusCode":409}
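The error response above carries the code AlreadyExists with HTTP status 409. If your workflow may retry CreateDatabase (for example after a timeout), one option is to treat AlreadyExists as "the database is already there". The following is a minimal sketch with a hypothetical helper; keep in mind that AlreadyExists only says that a database with that name exists, not that its description or locationUri match what you requested.

```java
// Hypothetical helper (not part of the SDK): decides whether a CreateDatabase result means
// the database now exists, accepting the AlreadyExists error as an idempotent outcome.
final class CreateOutcome {

    static boolean databaseExists(boolean success, String code) {
        return success || "AlreadyExists".equals(code);
    }
}
```

With the DatabaseAPI wrapper, you might pass the success and code values of the returned ResultModel to `CreateOutcome.databaseExists`, assuming ResultModel exposes them as its JSON output suggests, and surface any other failure together with its requestId.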

More information

  • For the regions and endpoints that DLF currently supports, see Supported regions and endpoints.

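  • Online debugging and SDK sample generation: OpenAPI Explorer lets you call cloud product APIs online, dynamically generates SDK sample code, and helps you quickly search for APIs, which makes the APIs much easier to use. We recommend that you use it.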
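The examples in this document set authConfig.endpoint and authConfig.regionId separately, so it is easy to change one and forget the other. The following is a minimal sketch that derives the endpoint from the region ID, assuming the pattern `dlf.<regionId>.aliyuncs.com` seen in this document (`dlf.cn-shanghai.aliyuncs.com`) holds for your target region. `DlfEndpoints` is a hypothetical helper; verify the result against the official region and endpoint list, since some regions or network types may use different endpoints.

```java
// Hypothetical helper (not part of the SDK): builds a DLF endpoint from a region ID,
// ASSUMING the pattern dlf.<regionId>.aliyuncs.com; check the official endpoint list.
final class DlfEndpoints {

    static String publicEndpoint(String regionId) {
        if (regionId == null || regionId.trim().isEmpty()) {
            throw new IllegalArgumentException("regionId must not be empty");
        }
        return "dlf." + regionId.trim() + ".aliyuncs.com";
    }
}
```

Usage: set `authConfig.regionId = region;` and `authConfig.endpoint = DlfEndpoints.publicEndpoint(region);` from the same variable so the two values cannot drift apart.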