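Java High Level REST Client is the high-level REST client officially provided by Elasticsearch and offers a simpler, easier-to-use API. The Lindorm search engine is compatible with the features of Elasticsearch 7.10 and earlier. If you need to run complex queries and analysis or use advanced Elasticsearch features, you can connect to the search engine with Java High Level REST Client to design and manage search indexes and index documents.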
Prerequisites
Procedure
Install the High Level REST Client. Taking a Maven project as an example, add the following dependencies to the dependencies section of the pom.xml file. Sample code:
<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>elasticsearch-rest-high-level-client</artifactId>
    <version>7.10.0</version>
</dependency>
<dependency>
    <groupId>org.apache.logging.log4j</groupId>
    <artifactId>log4j-core</artifactId>
    <version>2.20.0</version>
</dependency>
<dependency>
    <groupId>org.apache.logging.log4j</groupId>
    <artifactId>log4j-api</artifactId>
    <version>2.20.0</version>
</dependency>
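Important: Java High Level REST Client is forward compatible. For example, Java High Level REST Client 6.7.0 can communicate with Elasticsearch clusters of version 6.7.0 or later. To make the most of the features of the newer client, we recommend that you use Java High Level REST Client 7.10.0 or earlier.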
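The compatibility rule in the note above can be sketched as a small version check. This is only an illustration: `ClientCompatibilitySketch` and `canTalkTo` are hypothetical names, not part of the Elasticsearch client library. The sketch also assumes, following the upstream Elasticsearch client documentation, that the guarantee applies within the same major version and is decided by the minor version.

```java
/** Hypothetical helper for illustration; not part of the Elasticsearch client library. */
final class ClientCompatibilitySketch {

    /** Parses a "major.minor.patch" version string into three integers. */
    static int[] parse(String version) {
        String[] parts = version.split("\\.");
        if (parts.length != 3) {
            throw new IllegalArgumentException("Expected major.minor.patch but got: " + version);
        }
        int[] numbers = new int[3];
        for (int i = 0; i < 3; i++) {
            numbers[i] = Integer.parseInt(parts[i]);
        }
        return numbers;
    }

    /**
     * Returns true if a client of clientVersion is expected to work with a server of serverVersion.
     * Assumption: same major version, and server minor version >= client minor version.
     */
    static boolean canTalkTo(String clientVersion, String serverVersion) {
        int[] client = parse(clientVersion);
        int[] server = parse(serverVersion);
        if (client[0] != server[0]) {
            return false; // assumption: the guarantee does not cross major versions
        }
        return server[1] >= client[1];
    }

    public static void main(String[] args) {
        System.out.println(canTalkTo("6.7.0", "6.8.3"));   // true
        System.out.println(canTalkTo("6.7.0", "6.6.0"));   // false
        System.out.println(canTalkTo("7.10.0", "7.10.0")); // true
    }
}
```

Under this rule, a 7.10.0 client is a safe choice for a search engine that is compatible with Elasticsearch 7.10, while a client with a higher minor version would not be covered.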
Configure the connection parameters and create a RestHighLevelClient object by using RestClient.builder().
// Elasticsearch-compatible endpoint of the Lindorm search engine
String search_url = "ld-t4n5668xk31ui****-proxy-search-public.lindorm.rds.aliyuncs.com";
int search_port = 30070;
String username = "user";
String password = "test";
final CredentialsProvider credentials_provider = new BasicCredentialsProvider();
credentials_provider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username, password));
RestHighLevelClient highClient = new RestHighLevelClient(
    RestClient.builder(new HttpHost(search_url, search_port, "http"))
        .setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
            @Override
            public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) {
                return httpClientBuilder.setDefaultCredentialsProvider(credentials_provider);
            }
        })
);
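Parameters
search_url: The Elasticsearch-compatible endpoint of the search engine. For information about how to obtain it, see Elasticsearch-compatible endpoint.
Important: If your application is deployed on an ECS instance, we recommend that you access the Lindorm instance over a VPC for higher security and lower network latency.
If your application is deployed locally, you must enable the public endpoint in the console before you connect to the Lindorm instance over the Internet. To enable it: in the left-side navigation pane of the console, choose Database Connections, click the Search Engine tab, and then click Enable Public Endpoint in the upper-right corner of the tab.
To access the Lindorm instance over a VPC, set search_url to the VPC address of the Elasticsearch-compatible endpoint. To access the Lindorm instance over the Internet, set search_url to the public address of the Elasticsearch-compatible endpoint.
search_port: The Elasticsearch-compatible port of the Lindorm search engine. The value is fixed at 30070.
username, password: The username and password used to access the search engine. To obtain the default username and password: in the left-side navigation pane of the console, choose Database Connections, click the Search Engine tab, and view them on that tab.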
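Because the endpoint and credentials differ between VPC and Internet access, it can help to keep them out of the source code. The following is a minimal sketch that reads the four parameters from a map of environment variables. The class `SearchConnectionSettings` and the variable names `LINDORM_SEARCH_URL`, `LINDORM_SEARCH_PORT`, `LINDORM_SEARCH_USER`, and `LINDORM_SEARCH_PASSWORD` are assumptions made for this illustration and are not defined by Lindorm.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical holder for the connection parameters described above. */
final class SearchConnectionSettings {
    final String host;
    final int port;
    final String username;
    final String password;

    private SearchConnectionSettings(String host, int port, String username, String password) {
        this.host = host;
        this.port = port;
        this.username = username;
        this.password = password;
    }

    /** Returns the value of a required variable or fails with a clear message. */
    private static String required(Map<String, String> env, String name) {
        String value = env.get(name);
        if (value == null || value.isEmpty()) {
            throw new IllegalStateException("Missing required setting: " + name);
        }
        return value;
    }

    /** Builds the settings from a map such as System.getenv(). Variable names are assumptions. */
    static SearchConnectionSettings fromEnvironment(Map<String, String> env) {
        String host = required(env, "LINDORM_SEARCH_URL");
        // The document states that the port is fixed at 30070, so it is used as the default.
        int port = Integer.parseInt(env.getOrDefault("LINDORM_SEARCH_PORT", "30070"));
        String username = required(env, "LINDORM_SEARCH_USER");
        String password = required(env, "LINDORM_SEARCH_PASSWORD");
        return new SearchConnectionSettings(host, port, username, password);
    }

    public static void main(String[] args) {
        // Sample values for demonstration; in an application, pass System.getenv() instead.
        Map<String, String> env = new HashMap<>();
        env.put("LINDORM_SEARCH_URL", "search.example.internal");
        env.put("LINDORM_SEARCH_USER", "user");
        env.put("LINDORM_SEARCH_PASSWORD", "test");
        SearchConnectionSettings settings = fromEnvironment(env);
        System.out.println(settings.host + ":" + settings.port);
    }
}
```

The resulting `host`, `port`, `username`, and `password` values can then be passed to `HttpHost` and `UsernamePasswordCredentials` in place of the hard-coded strings.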
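Use the search engine.
The sample code consists of the following parts:
Create a search index: creates the search index lindorm_index.
Write data: writes a single document with the ID test, and then writes 100,000 documents in bulk.
Query data: sends a refresh request to force the written data to become visible, and then runs two requests that query all documents in the index and the document with the ID test.
Delete data: deletes the document with the ID test, and then deletes the index lindorm_index.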
try {
    String index_name = "lindorm_index";
    // Build the CreateIndex request that creates the index
    CreateIndexRequest createIndexRequest = new CreateIndexRequest(index_name);
    // Specify the index settings
    Map<String, Object> settingsMap = new HashMap<>();
    settingsMap.put("index.number_of_shards", 4);
    createIndexRequest.settings(settingsMap);
    CreateIndexResponse createIndexResponse = highClient.indices().create(createIndexRequest, COMMON_OPTIONS);
    if (createIndexResponse.isAcknowledged()) {
        System.out.println("Create index [" + index_name + "] successfully.");
    }
    // Document ID. If omitted, the system generates an ID automatically, which gives better write performance
    String doc_id = "test";
    // Document fields. Replace the field names and values with your actual business fields and values
    Map<String, Object> jsonMap = new HashMap<>();
    jsonMap.put("field1", "value1");
    jsonMap.put("field2", "value2");
    // Build the Index request that writes a single document, specifying the document ID and fields
    IndexRequest indexRequest = new IndexRequest(index_name);
    indexRequest.id(doc_id).source(jsonMap);
    IndexResponse indexResponse = highClient.index(indexRequest, COMMON_OPTIONS);
    System.out.println("Index document with id[" + indexResponse.getId() + "] successfully.");
    // Write data in bulk
    int bulkTotal = 100000;
    AtomicLong failedBulkItemCount = new AtomicLong();
    // Create a BulkProcessor to execute Bulk requests
    BulkProcessor.Builder builder = BulkProcessor.builder(
        (request, bulkListener) -> highClient.bulkAsync(request, COMMON_OPTIONS, bulkListener),
        new BulkProcessor.Listener() {
            @Override
            public void beforeBulk(long executionId, BulkRequest request) {}
            @Override
            public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
                // The result of each item in the Bulk request is available here; count the failed BulkItem requests
                for (BulkItemResponse bulkItemResponse : response) {
                    if (bulkItemResponse.isFailed()) {
                        failedBulkItemCount.incrementAndGet();
                    }
                }
            }
            @Override
            public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
                // A failure here means that the entire Bulk request was not executed
                if (null != failure) {
                    failedBulkItemCount.addAndGet(request.numberOfActions());
                }
            }
        });
    // Set the number of concurrent Bulk requests; the default is 1
    builder.setConcurrentRequests(10);
    // Set the thresholds at which BulkProcessor sends a Bulk request: time interval, number of actions, or request size
    builder.setFlushInterval(TimeValue.timeValueSeconds(5));
    builder.setBulkActions(5000);
    builder.setBulkSize(new ByteSizeValue(5, ByteSizeUnit.MB));
    BulkProcessor bulkProcessor = builder.build();
    Random random = new Random();
    for (int i = 0; i < bulkTotal; i++) {
        // Replace the field names and values with your actual business fields and values
        Map<String, Object> map = new HashMap<>();
        map.put("field1", random.nextInt() + "");
        map.put("field2", random.nextInt() + "");
        IndexRequest bulkItemRequest = new IndexRequest(index_name);
        bulkItemRequest.source(map);
        // Add the action to the BulkProcessor
        bulkProcessor.add(bulkItemRequest);
    }
    // If needed, call awaitClose to wait until all actions have been executed
    bulkProcessor.awaitClose(120, TimeUnit.SECONDS);
    long failure = failedBulkItemCount.get(),
        success = bulkTotal - failure;
    System.out.println("Bulk using BulkProcessor finished with [" + success + "] requests succeeded, [" + failure + "] requests failed.");
    // Build the Refresh request that forces written data to become visible
    RefreshRequest refreshRequest = new RefreshRequest(index_name);
    RefreshResponse refreshResponse = highClient.indices().refresh(refreshRequest, COMMON_OPTIONS);
    System.out.println("Refresh on index [" + index_name + "] successfully.");
    // Build the Search request that queries all data
    SearchRequest searchRequest = new SearchRequest(index_name);
    SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
    QueryBuilder queryMatchAllBuilder = new MatchAllQueryBuilder();
    searchSourceBuilder.query(queryMatchAllBuilder);
    searchRequest.source(searchSourceBuilder);
    SearchResponse searchResponse = highClient.search(searchRequest, COMMON_OPTIONS);
    long totalHit = searchResponse.getHits().getTotalHits().value;
    System.out.println("Search query match all hits [" + totalHit + "] in total.");
    // Build the Search request that queries data by ID
    QueryBuilder queryByIdBuilder = new MatchQueryBuilder("_id", doc_id);
    searchSourceBuilder.query(queryByIdBuilder);
    searchRequest.source(searchSourceBuilder);
    searchResponse = highClient.search(searchRequest, COMMON_OPTIONS);
    for (SearchHit searchHit : searchResponse.getHits()) {
        System.out.println("Search query by id response [" + searchHit.getSourceAsString() + "]");
    }
    // Build the Delete request that deletes a single document, specifying the ID of the document to delete
    DeleteRequest deleteRequest = new DeleteRequest(index_name);
    deleteRequest.id(doc_id);
    DeleteResponse deleteResponse = highClient.delete(deleteRequest, COMMON_OPTIONS);
    System.out.println("Delete document with id [" + deleteResponse.getId() + "] successfully.");
    // Build the DeleteIndex request that deletes the entire index
    DeleteIndexRequest deleteIndexRequest = new DeleteIndexRequest(index_name);
    AcknowledgedResponse deleteIndexResponse = highClient.indices().delete(deleteIndexRequest, COMMON_OPTIONS);
    if (deleteIndexResponse.isAcknowledged()) {
        System.out.println("Delete index [" + index_name + "] successfully.");
    }
    highClient.close();
} catch (Exception exception) {
    // Exception handling
    System.out.println("msg " + exception);
}
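The bulk write above configures three flush thresholds: 5,000 actions, 5 MB, and 5 seconds. To show how the count and size thresholds interact, here is a simplified simulation. It is a sketch under stated assumptions, not the BulkProcessor implementation: `BulkFlushSketch` and `countFlushes` are hypothetical names, every document is assumed to have the same size, and the time-based flush and the per-request overhead that the real size estimate includes are ignored.

```java
/** Hypothetical simulation of count-based and size-based bulk flushing. */
final class BulkFlushSketch {

    /**
     * Counts how many bulk requests are sent for totalDocs documents of bytesPerDoc bytes each,
     * flushing when either the action count or the accumulated size reaches its threshold.
     */
    static int countFlushes(int totalDocs, long bytesPerDoc, int bulkActions, long bulkSizeBytes) {
        int flushes = 0;
        int pendingActions = 0;
        long pendingBytes = 0;
        for (int i = 0; i < totalDocs; i++) {
            pendingActions++;
            pendingBytes += bytesPerDoc;
            // Whichever threshold is reached first triggers the flush.
            if (pendingActions >= bulkActions || pendingBytes >= bulkSizeBytes) {
                flushes++;
                pendingActions = 0;
                pendingBytes = 0;
            }
        }
        // Closing the processor sends whatever is still pending.
        if (pendingActions > 0) {
            flushes++;
        }
        return flushes;
    }

    public static void main(String[] args) {
        long fiveMb = 5L * 1024 * 1024;
        // Small documents (assumed 60 bytes): the 5,000-action threshold is reached first.
        System.out.println(countFlushes(100000, 60, 5000, fiveMb));   // 20
        // Larger documents (assumed 2 KB): the 5 MB threshold is reached first.
        System.out.println(countFlushes(100000, 2048, 5000, fiveMb)); // 40
    }
}
```

With small documents like the two short fields in the sample, the action threshold dominates, so the 100,000 documents would be sent in roughly 20 bulk requests, spread over up to 10 concurrent requests.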
Complete example
The complete sample code is as follows:
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
import org.elasticsearch.action.admin.indices.refresh.RefreshResponse;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.client.HttpAsyncResponseConsumerFactory;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.indices.CreateIndexRequest;
import org.elasticsearch.client.indices.CreateIndexResponse;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.query.MatchAllQueryBuilder;
import org.elasticsearch.index.query.MatchQueryBuilder;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
public class RestHClientTest {
    private static final RequestOptions COMMON_OPTIONS;

    static {
        // Create the request options. The default buffer limit is 100 MB; it is changed to 30 MB here
        RequestOptions.Builder builder = RequestOptions.DEFAULT.toBuilder();
        builder.setHttpAsyncResponseConsumerFactory(
            new HttpAsyncResponseConsumerFactory
                .HeapBufferedResponseConsumerFactory(30 * 1024 * 1024));
        COMMON_OPTIONS = builder.build();
    }

    public static void main(String[] args) {
        // Elasticsearch-compatible endpoint of the Lindorm search engine
        String search_url = "ld-t4n5668xk31ui****-proxy-search-public.lindorm.rds.aliyuncs.com";
        int search_port = 30070;
        // Username and password for accessing the Lindorm search engine; obtain them in the console
        String username = "user";
        String password = "test";
        final CredentialsProvider credentials_provider = new BasicCredentialsProvider();
        credentials_provider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username, password));
        RestHighLevelClient highClient = new RestHighLevelClient(
            RestClient.builder(new HttpHost(search_url, search_port, "http"))
                .setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
                    @Override
                    public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) {
                        return httpClientBuilder.setDefaultCredentialsProvider(credentials_provider);
                    }
                })
        );
        try {
            String index_name = "lindorm_index";
            // Build the CreateIndex request that creates the index
            CreateIndexRequest createIndexRequest = new CreateIndexRequest(index_name);
            // Specify the index settings
            Map<String, Object> settingsMap = new HashMap<>();
            settingsMap.put("index.number_of_shards", 4);
            createIndexRequest.settings(settingsMap);
            CreateIndexResponse createIndexResponse = highClient.indices().create(createIndexRequest, COMMON_OPTIONS);
            if (createIndexResponse.isAcknowledged()) {
                System.out.println("Create index [" + index_name + "] successfully.");
            }
            // Document ID. If omitted, the system generates an ID automatically, which gives better write performance
            String doc_id = "test";
            // Document fields. Replace the field names and values with your actual business fields and values
            Map<String, Object> jsonMap = new HashMap<>();
            jsonMap.put("field1", "value1");
            jsonMap.put("field2", "value2");
            // Build the Index request that writes a single document, specifying the document ID and fields
            IndexRequest indexRequest = new IndexRequest(index_name);
            indexRequest.id(doc_id).source(jsonMap);
            IndexResponse indexResponse = highClient.index(indexRequest, COMMON_OPTIONS);
            System.out.println("Index document with id[" + indexResponse.getId() + "] successfully.");
            // Write data in bulk
            int bulkTotal = 100000;
            AtomicLong failedBulkItemCount = new AtomicLong();
            // Create a BulkProcessor to execute Bulk requests
            BulkProcessor.Builder builder = BulkProcessor.builder(
                (request, bulkListener) -> highClient.bulkAsync(request, COMMON_OPTIONS, bulkListener),
                new BulkProcessor.Listener() {
                    @Override
                    public void beforeBulk(long executionId, BulkRequest request) {}
                    @Override
                    public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
                        // The result of each item in the Bulk request is available here; count the failed BulkItem requests
                        for (BulkItemResponse bulkItemResponse : response) {
                            if (bulkItemResponse.isFailed()) {
                                failedBulkItemCount.incrementAndGet();
                            }
                        }
                    }
                    @Override
                    public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
                        // A failure here means that the entire Bulk request was not executed
                        if (null != failure) {
                            failedBulkItemCount.addAndGet(request.numberOfActions());
                        }
                    }
                });
            // Set the number of concurrent Bulk requests; the default is 1
            builder.setConcurrentRequests(10);
            // Set the thresholds at which BulkProcessor sends a Bulk request: time interval, number of actions, or request size
            builder.setFlushInterval(TimeValue.timeValueSeconds(5));
            builder.setBulkActions(5000);
            builder.setBulkSize(new ByteSizeValue(5, ByteSizeUnit.MB));
            BulkProcessor bulkProcessor = builder.build();
            Random random = new Random();
            for (int i = 0; i < bulkTotal; i++) {
                // Replace the field names and values with your actual business fields and values
                Map<String, Object> map = new HashMap<>();
                map.put("field1", random.nextInt() + "");
                map.put("field2", random.nextInt() + "");
                IndexRequest bulkItemRequest = new IndexRequest(index_name);
                bulkItemRequest.source(map);
                // Add the action to the BulkProcessor
                bulkProcessor.add(bulkItemRequest);
            }
            // If needed, call awaitClose to wait until all actions have been executed
            bulkProcessor.awaitClose(120, TimeUnit.SECONDS);
            long failure = failedBulkItemCount.get(),
                success = bulkTotal - failure;
            System.out.println("Bulk using BulkProcessor finished with [" + success + "] requests succeeded, [" + failure + "] requests failed.");
            // Build the Refresh request that forces written data to become visible
            RefreshRequest refreshRequest = new RefreshRequest(index_name);
            RefreshResponse refreshResponse = highClient.indices().refresh(refreshRequest, COMMON_OPTIONS);
            System.out.println("Refresh on index [" + index_name + "] successfully.");
            // Build the Search request that queries all data
            SearchRequest searchRequest = new SearchRequest(index_name);
            SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
            QueryBuilder queryMatchAllBuilder = new MatchAllQueryBuilder();
            searchSourceBuilder.query(queryMatchAllBuilder);
            searchRequest.source(searchSourceBuilder);
            SearchResponse searchResponse = highClient.search(searchRequest, COMMON_OPTIONS);
            long totalHit = searchResponse.getHits().getTotalHits().value;
            System.out.println("Search query match all hits [" + totalHit + "] in total.");
            // Build the Search request that queries data by ID
            QueryBuilder queryByIdBuilder = new MatchQueryBuilder("_id", doc_id);
            searchSourceBuilder.query(queryByIdBuilder);
            searchRequest.source(searchSourceBuilder);
            searchResponse = highClient.search(searchRequest, COMMON_OPTIONS);
            for (SearchHit searchHit : searchResponse.getHits()) {
                System.out.println("Search query by id response [" + searchHit.getSourceAsString() + "]");
            }
            // Build the Delete request that deletes a single document, specifying the ID of the document to delete
            DeleteRequest deleteRequest = new DeleteRequest(index_name);
            deleteRequest.id(doc_id);
            DeleteResponse deleteResponse = highClient.delete(deleteRequest, COMMON_OPTIONS);
            System.out.println("Delete document with id [" + deleteResponse.getId() + "] successfully.");
            // Build the DeleteIndex request that deletes the entire index
            DeleteIndexRequest deleteIndexRequest = new DeleteIndexRequest(index_name);
            AcknowledgedResponse deleteIndexResponse = highClient.indices().delete(deleteIndexRequest, COMMON_OPTIONS);
            if (deleteIndexResponse.isAcknowledged()) {
                System.out.println("Delete index [" + index_name + "] successfully.");
            }
            highClient.close();
        } catch (Exception exception) {
            // Exception handling
            System.out.println("msg " + exception);
        }
    }
}
After the code runs successfully, the following output is returned:
Create index [lindorm_index] successfully.
Index document with id[test] successfully.
Bulk using BulkProcessor finished with [100000] requests succeeded, [0] requests failed.
Refresh on index [lindorm_index] successfully.
Search query match all hits [10000] in total.
Search query by id response [{"field1":"value1","field2":"value2"}]
Delete document with id [test] successfully.
Delete index [lindorm_index] successfully.
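The output shows that 100,000 documents were written successfully, but the query for all data reports only 10,000 hits. This is because, by default, the server counts total hits accurately only up to 10,000, so the reported total is lower than expected.
If you want the reported total to cover all matching documents, set the trackTotalHits property of the SearchSourceBuilder object to true, that is, searchSourceBuilder.trackTotalHits(true);.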
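The effect of the hit limit can be illustrated with a small model of the reported total. This is a simplified stand-in rather than the client's own `TotalHits` class: `TotalHitsSketch` and `report` are hypothetical names, and the model assumes that the reported value is capped at the tracking limit and then marked as a lower bound. The figure 100,001 assumes that the index holds the single document plus all 100,000 bulk documents.

```java
/** Hypothetical model of how a total-hit tracking limit shapes the reported count. */
final class TotalHitsSketch {
    final long value;     // the number reported to the caller
    final boolean exact;  // false when the value is only a lower bound

    private TotalHitsSketch(long value, boolean exact) {
        this.value = value;
        this.exact = exact;
    }

    /** Reports the total for actualMatches documents when counting is accurate up to trackUpTo. */
    static TotalHitsSketch report(long actualMatches, long trackUpTo) {
        if (actualMatches <= trackUpTo) {
            return new TotalHitsSketch(actualMatches, true);
        }
        // Beyond the limit, only "at least trackUpTo" is known.
        return new TotalHitsSketch(trackUpTo, false);
    }

    String describe() {
        return (exact ? "exactly " : "at least ") + value;
    }

    public static void main(String[] args) {
        long actual = 100001; // 1 single write + 100,000 bulk writes (assumed all succeeded)
        // Default limit of 10,000: the reported value is a lower bound.
        System.out.println(report(actual, 10000).describe());             // at least 10000
        // Integer.MAX_VALUE stands in here for trackTotalHits(true).
        System.out.println(report(actual, Integer.MAX_VALUE).describe()); // exactly 100001
    }
}
```

In the sample code, the `relation` field of the object returned by `searchResponse.getHits().getTotalHits()` indicates whether `value` is exact or a lower bound, so the two cases can be told apart without enabling full tracking.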