The Java High Level REST Client is the official high-level REST client for Elasticsearch and offers a simpler, easier-to-use API. The Lindorm search engine is compatible with the features of Elasticsearch 7.10 and earlier. If you want to run complex query analysis or use advanced Elasticsearch features, you can connect to the search engine through the Java High Level REST Client and easily design and manage search indexes and index documents.
Prerequisites
Procedure
Install the High Level REST Client. Taking a Maven project as an example, add the following dependencies to the dependencies section of the pom.xml file:
<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>elasticsearch-rest-high-level-client</artifactId>
    <version>7.10.0</version>
</dependency>
<dependency>
    <groupId>org.apache.logging.log4j</groupId>
    <artifactId>log4j-core</artifactId>
    <version>2.20.0</version>
</dependency>
<dependency>
    <groupId>org.apache.logging.log4j</groupId>
    <artifactId>log4j-api</artifactId>
    <version>2.20.0</version>
</dependency>
Important: The Java High Level REST Client is forward compatible. For example, a 6.7.0 client can communicate with Elasticsearch clusters of version 6.7.0 and later. To make the most of newer client features while staying within the server's compatibility range, we recommend using Java High Level REST Client 7.10.0 or an earlier version.
Configure the connection parameters and create a RestHighLevelClient object by using RestClient.builder().
// Elasticsearch-compatible endpoint of the Lindorm search engine
String search_url = "ld-t4n5668xk31ui****-proxy-search-public.lindorm.rds.aliyuncs.com";
int search_port = 30070;
String username = "user";
String password = "test";
final CredentialsProvider credentials_provider = new BasicCredentialsProvider();
credentials_provider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username, password));
RestHighLevelClient highClient = new RestHighLevelClient(
    RestClient.builder(new HttpHost(search_url, search_port, "http"))
        .setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
            public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) {
                return httpClientBuilder.setDefaultCredentialsProvider(credentials_provider);
            }
        })
);
Parameter descriptions

search_url
The Elasticsearch-compatible endpoint of the search engine. For how to obtain it, see Elasticsearch-compatible address.
Important: If your application is deployed on an ECS instance, we recommend accessing the Lindorm instance over a VPC, which provides higher security and lower network latency.
If your application is deployed on-premises, you must enable the public endpoint in the console before connecting to the Lindorm instance over the Internet. To do so, choose Database Connections in the left navigation pane of the console, click the Search Engine tab, and click Enable Public Endpoint in the upper-right corner of the tab.
When accessing the Lindorm instance over a VPC, set search_url to the VPC address of the Elasticsearch-compatible endpoint. When accessing it over the Internet, set search_url to the public address.

search_port
The Elasticsearch-compatible port of the Lindorm search engine, fixed at 30070.

username
password
The username and password for accessing the search engine. To obtain the default username and password, choose Database Connections in the left navigation pane of the console and click the Search Engine tab.
Use the search engine.
The sample code consists of the following parts:
Create a search index: create the search index lindorm_index.
Write data: write a document with the ID test using a single write, then write 100,000 documents using bulk writes.
Query data: issue a refresh request to force written data to become visible, then run two search requests that query all documents in the index and the document with the ID test, respectively.
Delete data: delete the document with the ID test, then delete the index lindorm_index.
try {
    String index_name = "lindorm_index";
    // Build a CreateIndex request to create the index
    CreateIndexRequest createIndexRequest = new CreateIndexRequest(index_name);
    // Specify the index settings
    Map<String, Object> settingsMap = new HashMap<>();
    settingsMap.put("index.number_of_shards", 4);
    createIndexRequest.settings(settingsMap);
    CreateIndexResponse createIndexResponse = highClient.indices().create(createIndexRequest, COMMON_OPTIONS);
    if (createIndexResponse.isAcknowledged()) {
        System.out.println("Create index [" + index_name + "] successfully.");
    }
    // Document ID. If omitted, the system generates an ID automatically, which gives better write performance
    String doc_id = "test";
    // Document fields. Replace the field names and values with your actual business data
    Map<String, Object> jsonMap = new HashMap<>();
    jsonMap.put("field1", "value1");
    jsonMap.put("field2", "value2");
    // Build an Index request to write a single document, specifying the document ID and the fields to write
    IndexRequest indexRequest = new IndexRequest(index_name);
    indexRequest.id(doc_id).source(jsonMap);
    IndexResponse indexResponse = highClient.index(indexRequest, COMMON_OPTIONS);
    System.out.println("Index document with id[" + indexResponse.getId() + "] successfully.");
    // Write data in bulk
    int bulkTotal = 100000;
    AtomicLong failedBulkItemCount = new AtomicLong();
    // Create a BulkProcessor to execute bulk requests
    BulkProcessor.Builder builder = BulkProcessor.builder((request, bulkListener) -> highClient.bulkAsync(request, COMMON_OPTIONS, bulkListener),
        new BulkProcessor.Listener() {
            @Override
            public void beforeBulk(long executionId, BulkRequest request) {}

            @Override
            public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
                // The result of each sub-request in the bulk request is available here; count the failed bulk items
                for (BulkItemResponse bulkItemResponse : response) {
                    if (bulkItemResponse.isFailed()) {
                        failedBulkItemCount.incrementAndGet();
                    }
                }
            }

            @Override
            public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
                // A failure here means the entire bulk request was not executed
                if (null != failure) {
                    failedBulkItemCount.addAndGet(request.numberOfActions());
                }
            }
        });
    // Set the number of concurrent bulk requests. The default is 1
    builder.setConcurrentRequests(10);
    // Set the thresholds at which the BulkProcessor flushes bulk requests: a time interval, a number of actions, or a request size
    builder.setFlushInterval(TimeValue.timeValueSeconds(5));
    builder.setBulkActions(5000);
    builder.setBulkSize(new ByteSizeValue(5, ByteSizeUnit.MB));
    BulkProcessor bulkProcessor = builder.build();
    Random random = new Random();
    for (int i = 0; i < bulkTotal; i++) {
        // Replace the field names and values with your actual business data
        Map<String, Object> map = new HashMap<>();
        map.put("field1", random.nextInt() + "");
        map.put("field2", random.nextInt() + "");
        IndexRequest bulkItemRequest = new IndexRequest(index_name);
        bulkItemRequest.source(map);
        // Add the operation to the BulkProcessor
        bulkProcessor.add(bulkItemRequest);
    }
    // Optionally, use awaitClose to wait until all operations have been executed
    bulkProcessor.awaitClose(120, TimeUnit.SECONDS);
    long failure = failedBulkItemCount.get(), success = bulkTotal - failure;
    System.out.println("Bulk using BulkProcessor finished with [" + success + "] requests succeeded, [" + failure + "] requests failed.");
    // Build a Refresh request to force written data to become visible
    RefreshRequest refreshRequest = new RefreshRequest(index_name);
    RefreshResponse refreshResponse = highClient.indices().refresh(refreshRequest, COMMON_OPTIONS);
    System.out.println("Refresh on index [" + index_name + "] successfully.");
    // Build a Search request that queries all documents
    SearchRequest searchRequest = new SearchRequest(index_name);
    SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
    QueryBuilder queryMatchAllBuilder = new MatchAllQueryBuilder();
    searchSourceBuilder.query(queryMatchAllBuilder);
    searchRequest.source(searchSourceBuilder);
    SearchResponse searchResponse = highClient.search(searchRequest, COMMON_OPTIONS);
    long totalHit = searchResponse.getHits().getTotalHits().value;
    System.out.println("Search query match all hits [" + totalHit + "] in total.");
    // Build a Search request that queries a document by ID
    QueryBuilder queryByIdBuilder = new MatchQueryBuilder("_id", doc_id);
    searchSourceBuilder.query(queryByIdBuilder);
    searchRequest.source(searchSourceBuilder);
    searchResponse = highClient.search(searchRequest, COMMON_OPTIONS);
    for (SearchHit searchHit : searchResponse.getHits()) {
        System.out.println("Search query by id response [" + searchHit.getSourceAsString() + "]");
    }
    // Build a Delete request to delete a single document, specifying the ID of the document to delete
    DeleteRequest deleteRequest = new DeleteRequest(index_name);
    deleteRequest.id(doc_id);
    DeleteResponse deleteResponse = highClient.delete(deleteRequest, COMMON_OPTIONS);
    System.out.println("Delete document with id [" + deleteResponse.getId() + "] successfully.");
    // Build a DeleteIndex request to delete the entire index
    DeleteIndexRequest deleteIndexRequest = new DeleteIndexRequest(index_name);
    AcknowledgedResponse deleteIndexResponse = highClient.indices().delete(deleteIndexRequest, COMMON_OPTIONS);
    if (deleteIndexResponse.isAcknowledged()) {
        System.out.println("Delete index [" + index_name + "] successfully.");
    }
    highClient.close();
} catch (Exception exception) {
    // Exception handling
    System.out.println("Operation failed: " + exception);
}
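Note that the code above calls highClient.close() only on the success path: if any request throws, the client is never closed and its underlying connections leak. Because RestHighLevelClient implements Closeable, the setup can instead use try-with-resources so the client is always released. A minimal sketch, reusing the placeholder endpoint, port, and credentials from this topic (it requires a reachable Lindorm instance to actually connect):

```java
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;

public class TryWithResourcesSketch {
    public static void main(String[] args) {
        final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
        credentialsProvider.setCredentials(AuthScope.ANY,
            new UsernamePasswordCredentials("user", "test"));
        // try-with-resources closes the client automatically, even if a request throws
        try (RestHighLevelClient client = new RestHighLevelClient(
                RestClient.builder(new HttpHost(
                        "ld-t4n5668xk31ui****-proxy-search-public.lindorm.rds.aliyuncs.com",
                        30070, "http"))
                    .setHttpClientConfigCallback(
                        b -> b.setDefaultCredentialsProvider(credentialsProvider)))) {
            // Issue index, search, and delete requests here as in the sample above;
            // client.close() runs automatically when this block exits.
        } catch (Exception e) {
            System.out.println("Operation failed: " + e);
        }
    }
}
```

This is equivalent to the explicit close() call in the sample, but survives exception paths as well; no test is included because the snippet needs a live Elasticsearch-compatible endpoint.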
Complete example
The complete sample code is as follows:
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
import org.elasticsearch.action.admin.indices.refresh.RefreshResponse;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.client.HttpAsyncResponseConsumerFactory;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.indices.CreateIndexRequest;
import org.elasticsearch.client.indices.CreateIndexResponse;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.query.MatchAllQueryBuilder;
import org.elasticsearch.index.query.MatchQueryBuilder;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
public class RestHClientTest {
    private static final RequestOptions COMMON_OPTIONS;

    static {
        // Create request options. The default buffer limit is 100 MB; lower it to 30 MB here
        RequestOptions.Builder builder = RequestOptions.DEFAULT.toBuilder();
        builder.setHttpAsyncResponseConsumerFactory(
            new HttpAsyncResponseConsumerFactory
                .HeapBufferedResponseConsumerFactory(30 * 1024 * 1024));
        COMMON_OPTIONS = builder.build();
    }

    public static void main(String[] args) {
        // Elasticsearch-compatible endpoint of the Lindorm search engine
        String search_url = "ld-t4n5668xk31ui****-proxy-search-public.lindorm.rds.aliyuncs.com";
        int search_port = 30070;
        // Username and password for accessing the search engine, obtained from the console
        String username = "user";
        String password = "test";
        final CredentialsProvider credentials_provider = new BasicCredentialsProvider();
        credentials_provider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username, password));
        RestHighLevelClient highClient = new RestHighLevelClient(
            RestClient.builder(new HttpHost(search_url, search_port, "http"))
                .setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
                    public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) {
                        return httpClientBuilder.setDefaultCredentialsProvider(credentials_provider);
                    }
                })
        );
        try {
            String index_name = "lindorm_index";
            // Build a CreateIndex request to create the index
            CreateIndexRequest createIndexRequest = new CreateIndexRequest(index_name);
            // Specify the index settings
            Map<String, Object> settingsMap = new HashMap<>();
            settingsMap.put("index.number_of_shards", 4);
            createIndexRequest.settings(settingsMap);
            CreateIndexResponse createIndexResponse = highClient.indices().create(createIndexRequest, COMMON_OPTIONS);
            if (createIndexResponse.isAcknowledged()) {
                System.out.println("Create index [" + index_name + "] successfully.");
            }
            // Document ID. If omitted, the system generates an ID automatically, which gives better write performance
            String doc_id = "test";
            // Document fields. Replace the field names and values with your actual business data
            Map<String, Object> jsonMap = new HashMap<>();
            jsonMap.put("field1", "value1");
            jsonMap.put("field2", "value2");
            // Build an Index request to write a single document, specifying the document ID and the fields to write
            IndexRequest indexRequest = new IndexRequest(index_name);
            indexRequest.id(doc_id).source(jsonMap);
            IndexResponse indexResponse = highClient.index(indexRequest, COMMON_OPTIONS);
            System.out.println("Index document with id[" + indexResponse.getId() + "] successfully.");
            // Write data in bulk
            int bulkTotal = 100000;
            AtomicLong failedBulkItemCount = new AtomicLong();
            // Create a BulkProcessor to execute bulk requests
            BulkProcessor.Builder builder = BulkProcessor.builder((request, bulkListener) -> highClient.bulkAsync(request, COMMON_OPTIONS, bulkListener),
                new BulkProcessor.Listener() {
                    @Override
                    public void beforeBulk(long executionId, BulkRequest request) {}

                    @Override
                    public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
                        // The result of each sub-request in the bulk request is available here; count the failed bulk items
                        for (BulkItemResponse bulkItemResponse : response) {
                            if (bulkItemResponse.isFailed()) {
                                failedBulkItemCount.incrementAndGet();
                            }
                        }
                    }

                    @Override
                    public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
                        // A failure here means the entire bulk request was not executed
                        if (null != failure) {
                            failedBulkItemCount.addAndGet(request.numberOfActions());
                        }
                    }
                });
            // Set the number of concurrent bulk requests. The default is 1
            builder.setConcurrentRequests(10);
            // Set the thresholds at which the BulkProcessor flushes bulk requests: a time interval, a number of actions, or a request size
            builder.setFlushInterval(TimeValue.timeValueSeconds(5));
            builder.setBulkActions(5000);
            builder.setBulkSize(new ByteSizeValue(5, ByteSizeUnit.MB));
            BulkProcessor bulkProcessor = builder.build();
            Random random = new Random();
            for (int i = 0; i < bulkTotal; i++) {
                // Replace the field names and values with your actual business data
                Map<String, Object> map = new HashMap<>();
                map.put("field1", random.nextInt() + "");
                map.put("field2", random.nextInt() + "");
                IndexRequest bulkItemRequest = new IndexRequest(index_name);
                bulkItemRequest.source(map);
                // Add the operation to the BulkProcessor
                bulkProcessor.add(bulkItemRequest);
            }
            // Optionally, use awaitClose to wait until all operations have been executed
            bulkProcessor.awaitClose(120, TimeUnit.SECONDS);
            long failure = failedBulkItemCount.get(), success = bulkTotal - failure;
            System.out.println("Bulk using BulkProcessor finished with [" + success + "] requests succeeded, [" + failure + "] requests failed.");
            // Build a Refresh request to force written data to become visible
            RefreshRequest refreshRequest = new RefreshRequest(index_name);
            RefreshResponse refreshResponse = highClient.indices().refresh(refreshRequest, COMMON_OPTIONS);
            System.out.println("Refresh on index [" + index_name + "] successfully.");
            // Build a Search request that queries all documents
            SearchRequest searchRequest = new SearchRequest(index_name);
            SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
            QueryBuilder queryMatchAllBuilder = new MatchAllQueryBuilder();
            searchSourceBuilder.query(queryMatchAllBuilder);
            searchRequest.source(searchSourceBuilder);
            SearchResponse searchResponse = highClient.search(searchRequest, COMMON_OPTIONS);
            long totalHit = searchResponse.getHits().getTotalHits().value;
            System.out.println("Search query match all hits [" + totalHit + "] in total.");
            // Build a Search request that queries a document by ID
            QueryBuilder queryByIdBuilder = new MatchQueryBuilder("_id", doc_id);
            searchSourceBuilder.query(queryByIdBuilder);
            searchRequest.source(searchSourceBuilder);
            searchResponse = highClient.search(searchRequest, COMMON_OPTIONS);
            for (SearchHit searchHit : searchResponse.getHits()) {
                System.out.println("Search query by id response [" + searchHit.getSourceAsString() + "]");
            }
            // Build a Delete request to delete a single document, specifying the ID of the document to delete
            DeleteRequest deleteRequest = new DeleteRequest(index_name);
            deleteRequest.id(doc_id);
            DeleteResponse deleteResponse = highClient.delete(deleteRequest, COMMON_OPTIONS);
            System.out.println("Delete document with id [" + deleteResponse.getId() + "] successfully.");
            // Build a DeleteIndex request to delete the entire index
            DeleteIndexRequest deleteIndexRequest = new DeleteIndexRequest(index_name);
            AcknowledgedResponse deleteIndexResponse = highClient.indices().delete(deleteIndexRequest, COMMON_OPTIONS);
            if (deleteIndexResponse.isAcknowledged()) {
                System.out.println("Delete index [" + index_name + "] successfully.");
            }
            highClient.close();
        } catch (Exception exception) {
            // Exception handling
            System.out.println("Operation failed: " + exception);
        }
    }
}
If the execution succeeds, the following result is returned:
Create index [lindorm_index] successfully.
Index document with id[test] successfully.
Bulk using BulkProcessor finished with [100000] requests succeeded, [0] requests failed.
Refresh on index [lindorm_index] successfully.
Search query match all hits [10000] in total.
Search query by id response [{"field1":"value1","field2":"value2"}]
Delete document with id [test] successfully.
Delete index [lindorm_index] successfully.
The output shows that 100,000 documents were written successfully, yet the match-all search reports only 10,000 matching documents. This is because, by default, the server caps the reported total hit count at 10,000, which makes the query result differ from what you might expect.
If you want the total to reflect all matching documents, set the trackTotalHits property of the SearchSourceBuilder object to true, that is, searchSourceBuilder.trackTotalHits(true);.
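Applied to the match-all search from the sample above, the fix looks as follows. This is a minimal sketch that assumes the highClient, COMMON_OPTIONS, and lindorm_index from this topic already exist; it needs a live Lindorm instance to run, so no standalone test is included:

```java
// Build a match-all search with exact total-hit counting enabled
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
sourceBuilder.query(new MatchAllQueryBuilder());
// Without this line the server stops counting at 10,000 hits;
// with it, the response reports the exact number of matching documents
sourceBuilder.trackTotalHits(true);
SearchRequest request = new SearchRequest("lindorm_index");
request.source(sourceBuilder);
SearchResponse response = highClient.search(request, COMMON_OPTIONS);
System.out.println("Exact total hits: " + response.getHits().getTotalHits().value);
```

With trackTotalHits(true), the match-all query in the sample reports the full 100,001 written documents instead of the capped 10,000, at the cost of the server counting every match.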