使用EAS提供的官方SDK进行服务调用,可以有效减少编写调用逻辑的时间并提高调用稳定性。本文介绍官方Java SDK接口详情。同时,以字符串输入输出、TensorFlow输入输出、QueueService客户端和请求数据压缩为例,提供了使用Java SDK进行服务调用的完整程序示例。
添加依赖项
使用Java编写客户端代码时,在Maven工程中使用EAS Java SDK,必须在pom.xml文件<dependencies>中添加eas-sdk的依赖,示例如下,最新版本以Maven仓库中显示的为准。
<dependency>
<groupId>com.aliyun.openservices.eas</groupId>
<artifactId>eas-sdk</artifactId>
<version>2.0.20</version>
</dependency>
EAS2.0.5及以上版本增加了QueueService客户端功能,支持多优先级异步队列服务。如果需要使用该功能,为避免依赖版本冲突,您还需自行添加如下两个依赖,并修改这两个依赖至合适版本:
<dependency>
<groupId>org.java-websocket</groupId>
<artifactId>Java-WebSocket</artifactId>
<version>1.5.1</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.1</version>
</dependency>
接口列表
类 | 接口 | 描述 |
PredictClient |
|
|
|
| |
|
| |
|
| |
|
| |
|
| |
|
| |
|
| |
|
| |
| 功能:自定义请求 URL。 | |
|
| |
|
| |
|
| |
|
| |
|
| |
|
| |
HttpConfig |
|
|
|
| |
|
| |
|
| |
|
| |
|
| |
| 返回最近一次调用的状态码。 | |
| 返回最近一次调用的状态信息。 | |
TFRequest |
|
|
|
| |
|
| |
TFResponse |
|
|
|
| |
|
| |
|
| |
|
| |
|
| |
|
| |
QueueClient |
|
|
|
| |
|
| |
|
| |
|
| |
|
| |
|
| |
|
| |
|
| |
| 功能:关闭队列服务。 | |
DataFrame |
|
|
|
| |
|
|
程序示例
字符串输入输出示例
对于使用自定义Processor部署服务的用户而言,通常采用字符串进行服务调用(例如,PMML模型服务的调用),具体的Demo程序如下。
import com.aliyun.openservices.eas.predict.http.PredictClient;
import com.aliyun.openservices.eas.predict.http.HttpConfig;
public class TestString {
public static void main(String[] args) throws Exception {
// 启动并初始化客户端, client对象需要共享,千万不可每个请求都创建一个client对象。
PredictClient client = new PredictClient(new HttpConfig());
client.setToken("YWFlMDYyZDNmNTc3M2I3MzMwYmY0MmYwM2Y2MTYxMTY4NzBkNzdj****");
// 如果要使用网络直连功能,需使用setDirectEndpoint方法
// 如 client.setDirectEndpoint("pai-eas-vpc.cn-shanghai.aliyuncs.com");
// 网络直连需打通在EAS控制台开通,提供用于访问EAS服务的源vswitch,打通后可绕过网关以软负载的方式直接访问服务的实例,以实现更好的稳定性和性能。
// 注:普通网关访问时请使用以用户uid为开头的endpoint,在eas控制台服务的调用信息中可查到。直连访问时请使用如上的pai-eas-vpc.{region_id}.aliyuncs.com的域名进行访问。
client.setEndpoint("182848887922****.vpc.cn-shanghai.pai-eas.aliyuncs.com");
client.setModelName("scorecard_pmml_example");
// 输入字符串定义
String request = "[{\"money_credit\": 3000000}, {\"money_credit\": 10000}]";
System.out.println(request);
// 通过eas返回字符串
try {
String response = client.predict(request);
System.out.println(response);
} catch (Exception e) {
e.printStackTrace();
}
// 关闭客户端
client.shutdown();
return;
}
}
如上述程序所示,使用Java SDK调用服务的流程如下:
通过
PredictClient
接口创建客户端服务对象。如果在程序中需要使用多个服务,则创建多个Client对象。为PredictClient对象配置Token、Endpoint及ModelName。
构造STRING类型的request作为输入,通过
client.predict
发送HTTP请求,系统返回response。
TensorFlow输入输出示例
使用TensorFlow的用户,需要将TFRequest和TFResponse分别作为输入和输出数据格式,具体Demo示例如下。
import java.util.List;
import com.aliyun.openservices.eas.predict.http.PredictClient;
import com.aliyun.openservices.eas.predict.http.HttpConfig;
import com.aliyun.openservices.eas.predict.request.TFDataType;
import com.aliyun.openservices.eas.predict.request.TFRequest;
import com.aliyun.openservices.eas.predict.response.TFResponse;
public class TestTF {
public static TFRequest buildPredictRequest() {
TFRequest request = new TFRequest();
request.setSignatureName("predict_images");
float[] content = new float[784];
for (int i = 0; i < content.length; i++) {
content[i] = (float) 0.0;
}
request.addFeed("images", TFDataType.DT_FLOAT, new long[]{1, 784}, content);
request.addFetch("scores");
return request;
}
public static void main(String[] args) throws Exception {
PredictClient client = new PredictClient(new HttpConfig());
// 如果要使用网络直连功能,需使用setDirectEndpoint方法。
// 如 client.setDirectEndpoint("pai-eas-vpc.cn-shanghai.aliyuncs.com");
// 网络直连需打通在EAS控制台开通,提供用于访问EAS服务的源vswitch,打通后可绕过网关以软负载的方式直接访问服务的实例,以实现更好的稳定性和性能。
// 注:普通网关访问时请使用以用户uid为开头的endpoint,在eas控制台服务的调用信息中可查到。直连访问时请使用如上的pai-eas-vpc.{region_id}.aliyuncs.com的域名进行访问。
client.setEndpoint("182848887922****.vpc.cn-shanghai.pai-eas.aliyuncs.com");
client.setModelName("mnist_saved_model_example");
client.setToken("YTg2ZjE0ZjM4ZmE3OTc0NzYxZDMyNmYzMTJjZTQ1YmU0N2FjMTAy****");
long startTime = System.currentTimeMillis();
int count = 1000;
for (int i = 0; i < count; i++) {
try {
TFResponse response = client.predict(buildPredictRequest());
List<Float> result = response.getFloatVals("scores");
System.out.print("Predict Result: [");
for (int j = 0; j < result.size(); j++) {
System.out.print(result.get(j).floatValue());
if (j != result.size() - 1) {
System.out.print(", ");
}
}
System.out.print("]\n");
} catch (Exception e) {
e.printStackTrace();
}
}
long endTime = System.currentTimeMillis();
System.out.println("Spend Time: " + (endTime - startTime) + "ms");
client.shutdown();
}
}
如上述程序所示,使用Java SDK调用TensorFlow服务的流程如下:
通过
PredictClient
接口创建客户端服务对象。如果在程序中需要使用多个服务,则创建多个Client对象。为PredictClient对象配置Token、Endpoint及ModelName。
使用TFRequest类封装输入数据,使用TFResponse类封装输出数据。
QueueService客户端示例
支持通过QueueClient接口使用队列服务功能,具体demo示例如下:
import com.alibaba.fastjson.JSONObject;
import com.aliyun.openservices.eas.predict.http.HttpConfig;
import com.aliyun.openservices.eas.predict.http.QueueClient;
import com.aliyun.openservices.eas.predict.queue_client.QueueUser;
import com.aliyun.openservices.eas.predict.queue_client.WebSocketWatcher;
public class DemoWatch {
public static void main(String[] args) throws Exception {
/** 创建队列服务客户端 */
String queueEndpoint = "18*******.cn-hangzhou.pai-eas.aliyuncs.com";
String inputQueueName = "test_queue_service";
String sinkQueueName = "test_queue_service/sink";
String queueToken = "test-token";
/** 输入队列,往输入队列添加数据,推理服务会自动从输入队列中读取请求数据 */
QueueClient inputQueue =
new QueueClient(queueEndpoint, inputQueueName, queueToken, new HttpConfig(), new QueueUser());
/** 输出队列,推理服务处理输入数据后会将结果写入输出队列 */
QueueClient sinkQueue =
new QueueClient(queueEndpoint, sinkQueueName, queueToken, new HttpConfig(), new QueueUser());
/** 清除队列数据!!!请谨慎使用 */
inputQueue.clear();
sinkQueue.clear();
/** 往输入队列添加数据 */
int count = 10;
for (int i = 0; i < count; ++i) {
String data = Integer.toString(i);
inputQueue.put(data.getBytes(), null);
/** 队列服务支持多优先级队列,可通过put函数设置数据优先级,默认优先级为0 */
// inputQueue.put(data.getBytes(), 0L, null);
}
/** 通过watch函数订阅输出队列的数据,窗口大小为5 */
WebSocketWatcher watcher = sinkQueue.watch(0L, 5L, false, true, null);
/** WatchConfig参数可自定义重试次数、重试间隔(单位为秒)、是否无限重试;未配置WatchConfig则默认重试次数:3,重试间隔:5 */
// WebSocketWatcher watcher = sink_queue.watch(0L, 5L, false, true, null, new WatchConfig(3, 1));
// WebSocketWatcher watcher = sink_queue.watch(0L, 5L, false, true, null, new WatchConfig(true, 10));
/** 获取输出数据 */
for (int i = 0; i < count; ++i) {
try {
/** getDataFrame 函数用于获取DataFrame数据类,没有数据时会被阻塞 */
byte[] data = watcher.getDataFrame().getData();
System.out.println("[watch] data = " + new String(data));
} catch (RuntimeException ex) {
System.out.println("[watch] error = " + ex.getMessage());
break;
}
}
/** 关闭已经打开的watcher对象,每个客户端实例只允许存在一个watcher对象,若watcher对象不关闭,再运行时会报错 */
watcher.close();
Thread.sleep(2000);
JSONObject attrs = sinkQueue.attributes();
System.out.println(attrs.toString());
/** 关闭客户端 */
inputQueue.shutdown();
sinkQueue.shutdown();
}
}
如上述程序所示,使用Java SDK调用服务的流程如下:
通过
QueueClient
接口创建队列服务客户端对象。如果创建了推理服务,需同时创建输入队列和输出队列对象。使用
put()
函数向输入队列中发送数据;使用watch()
函数从输出队列中订阅数据。说明现实场景中,发送数据和订阅数据可以由不同的线程处理,本示例中为了演示方便,在同一线程中完成,先Put数据,后Watch结果。
请求数据压缩示例
对于请求数据量较大的情况,EAS支持将数据压缩之后再发送至服务端,目前支持Zlib和Gzip两种压缩格式。该功能需要在服务配置中指定相应的rpc.decompressor
才能生效。
服务配置如下所示:
"metadata": {
"rpc": {
"decompressor": "zlib"
}
}
SDK代码调用示例如下:
package com.aliyun.openservices.eas.predict;
import com.aliyun.openservices.eas.predict.http.Compressor;
import com.aliyun.openservices.eas.predict.http.PredictClient;
import com.aliyun.openservices.eas.predict.http.HttpConfig;
public class TestString {
public static void main(String[] args) throws Exception{
// 启动并初始化客户端。
PredictClient client = new PredictClient(new HttpConfig());
client.setEndpoint("18*******.cn-hangzhou.pai-eas.aliyuncs.com");
client.setModelName("echo_compress");
client.setToken("YzZjZjQwN2E4NGRkMDMxNDk5NzhhZDcwZDBjOTZjOGYwZDYxZGM2****");
// 或者使用Compressor.Gzip。
client.setCompressor(Compressor.Zlib);
// 输入字符串定义。
String request = "[{\"money_credit\": 3000000}, {\"money_credit\": 10000}]";
System.out.println(request);
// 通过eas返回字符串。
String response = client.predict(request);
System.out.println(response);
// 关闭客户端。
client.shutdown();
return;
}
}
相关文档
在服务调用时,您可以通过查看访问服务返回的状态码来确定请求执行结果,具体的状态码请参见附录:服务状态码说明。
更多调用方式,请参见服务调用方式。