
Lindorm: Java High Level REST Client

Updated: Jul 06, 2024

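The Java High Level REST Client is the official high-level REST client provided by Elasticsearch, and it offers simpler, easier-to-use APIs. The Lindorm search engine is compatible with the features of Elasticsearch 7.10 and earlier. If you need complex queries and analytics or advanced Elasticsearch features, you can connect to the search engine with the Java High Level REST Client to design and manage search indexes and their documents.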

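Prerequisites

  • Java is installed: JDK 1.8 or later is required.

  • The search engine is activated. For more information, see Activation guide.

  • The IP address of your client has been added to the Lindorm whitelist. For more information, see Configure a whitelist.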
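Step 1 below pins the client to version 7.10.0, and the Lindorm search engine is compatible with Elasticsearch 7.10 and earlier. Elasticsearch documents that the High Level REST Client can communicate with nodes that run the same major version and an equal or later minor version. The following minimal sketch encodes that rule so you can reason about a client/server pairing before choosing a dependency version. ClientCompatibility is a hypothetical helper, not part of any library, and the server version you compare against is an assumption you need to confirm for your own instance.

```java
/**
 * Minimal sketch of the compatibility rule documented for the Elasticsearch
 * High Level REST Client: a client can communicate with nodes of the same
 * major version and an equal or later minor version.
 * ClientCompatibility is a hypothetical helper, not part of any library.
 */
class ClientCompatibility {

    /** Parses "major.minor[.patch]" into {major, minor}; the patch level is ignored. */
    static int[] majorMinor(String version) {
        String[] parts = version.trim().split("\\.");
        if (parts.length < 2) {
            throw new IllegalArgumentException("Expected major.minor[.patch], got: " + version);
        }
        return new int[] {Integer.parseInt(parts[0]), Integer.parseInt(parts[1])};
    }

    /** Returns true if a client of clientVersion is expected to work with a server of serverVersion. */
    static boolean isCompatible(String clientVersion, String serverVersion) {
        int[] client = majorMinor(clientVersion);
        int[] server = majorMinor(serverVersion);
        // Same major version, and the server's minor version is not older than the client's
        return client[0] == server[0] && server[1] >= client[1];
    }

    public static void main(String[] args) {
        System.out.println(isCompatible("7.10.0", "7.10.2")); // true
        System.out.println(isCompatible("6.7.0", "6.8.0"));   // true
        System.out.println(isCompatible("7.10.0", "7.9.0"));  // false: server minor is older
        System.out.println(isCompatible("6.7.0", "7.10.0"));  // false: different major, not guaranteed
    }
}
```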

Procedure

  1. Install the High Level REST Client. For a Maven project, add the following dependencies to the dependencies element of the pom.xml file:

    <dependency>
        <groupId>org.elasticsearch.client</groupId>
        <artifactId>elasticsearch-rest-high-level-client</artifactId>
        <version>7.10.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-core</artifactId>
        <version>2.20.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-api</artifactId>
        <version>2.20.0</version>
    </dependency>
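    Important

    The Java High Level REST Client is forward compatible within a major version: a client can communicate with Elasticsearch nodes of the same major version and an equal or later minor version. For example, a 6.7.0 client can communicate with 6.7.0 and later 6.x clusters. Because the Lindorm search engine is compatible with Elasticsearch 7.10 and earlier, we recommend the 7.10.0 client (or an earlier version) so that you get the newest client features the server supports.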

  2. Configure the connection parameters and create a RestHighLevelClient object by using RestClient.builder().

    // Elasticsearch-compatible endpoint of the Lindorm search engine
    String search_url = "ld-t4n5668xk31ui****-proxy-search-public.lindorm.rds.aliyuncs.com";
    int search_port = 30070;
    
    // Username and password for the search engine, obtained from the console
    String username = "user";
    String password = "test";
    final CredentialsProvider credentials_provider = new BasicCredentialsProvider();
    credentials_provider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username, password));
    
    // Build the client and register the credentials provider with the underlying HTTP client
    RestHighLevelClient highClient = new RestHighLevelClient(
      RestClient.builder(new HttpHost(search_url, search_port, "http")).setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
        @Override
        public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) {
          return httpClientBuilder.setDefaultCredentialsProvider(credentials_provider);
        }
      })
    );

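    Parameter descriptions

    • search_url: The Elasticsearch-compatible endpoint of the search engine. For how to obtain it, see Elasticsearch-compatible endpoint.

      Important
      • If your application is deployed on an ECS instance, we recommend that you access the Lindorm instance over a VPC for higher security and lower network latency.
      • If your application is deployed on premises, you must enable the public endpoint in the console before you connect to the Lindorm instance over the Internet: in the left-side navigation pane of the console, choose Database Connections, click the Search Engine tab, and then click Enable Public Endpoint in the upper-right corner of the tab.
      • If you access the Lindorm instance over a VPC, set search_url to the VPC address of the Elasticsearch-compatible endpoint. If you access it over the Internet, set search_url to the public address.

    • search_port: The Elasticsearch-compatible port of the Lindorm search engine. It is fixed at 30070.

    • username and password: The username and password used to access the search engine. To obtain the default username and password, choose Database Connections in the left-side navigation pane of the console and click the Search Engine tab.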

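  3. Use the search engine.

    The sample code consists of the following parts. COMMON_OPTIONS is a RequestOptions object whose definition is shown in the complete sample.

    • Create a search index: create the search index lindorm_index.

    • Write data: write a single document with the ID test, and then write 100,000 documents in bulk.

    • Query data: send a refresh request to make the written data visible, and then run two queries: one for all documents in the index and one for the document whose ID is test.

    • Delete data: delete the document whose ID is test, and then delete the index lindorm_index.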

    try {
      String index_name = "lindorm_index";
    
      // Build a CreateIndexRequest to create the index
      CreateIndexRequest createIndexRequest = new CreateIndexRequest(index_name);
      // Specify the index settings (4 primary shards)
      Map<String, Object> settingsMap = new HashMap<>();
      settingsMap.put("index.number_of_shards", 4);
      createIndexRequest.settings(settingsMap);
      CreateIndexResponse createIndexResponse = highClient.indices().create(createIndexRequest, COMMON_OPTIONS);
      if (createIndexResponse.isAcknowledged()) {
        System.out.println("Create index [" + index_name + "] successfully.");
      }
    
      // Document ID. If omitted, the system generates an ID automatically, which gives better write performance
      String doc_id = "test";
      // Document fields. Replace the field names and values with your actual business fields and values
      Map<String, Object> jsonMap = new HashMap<>();
      jsonMap.put("field1", "value1");
      jsonMap.put("field2", "value2");
    
      // Build an IndexRequest that writes a single document with the specified ID and fields
      IndexRequest indexRequest = new IndexRequest(index_name);
      indexRequest.id(doc_id).source(jsonMap);
      IndexResponse indexResponse = highClient.index(indexRequest, COMMON_OPTIONS);
      System.out.println("Index document with id[" + indexResponse.getId() + "] successfully.");
    
      // Write data in bulk
      int bulkTotal = 100000;
      AtomicLong failedBulkItemCount = new AtomicLong();
      // Create a BulkProcessor to execute bulk requests
      BulkProcessor.Builder builder = BulkProcessor.builder((request, bulkListener) -> highClient.bulkAsync(request, COMMON_OPTIONS, bulkListener),
        new BulkProcessor.Listener() {
          @Override
          public void beforeBulk(long executionId, BulkRequest request) {}
    
          @Override
          public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
            // The result of each item in the bulk request is available here; count the items that failed
            for (BulkItemResponse bulkItemResponse : response) {
              if (bulkItemResponse.isFailed()) {
                failedBulkItemCount.incrementAndGet();
              }
            }
          }
    
          @Override
          public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
            // A failure here means the entire bulk request failed, so all of its actions are counted as failed
            if (null != failure) {
              failedBulkItemCount.addAndGet(request.numberOfActions());
            }
          }
        });
      // Set the number of concurrent bulk requests (default: 1)
      builder.setConcurrentRequests(10);
      // Set the thresholds at which BulkProcessor sends a bulk request: time interval, number of actions, or request size (whichever is reached first)
      builder.setFlushInterval(TimeValue.timeValueSeconds(5));
      builder.setBulkActions(5000);
      builder.setBulkSize(new ByteSizeValue(5, ByteSizeUnit.MB));
      BulkProcessor bulkProcessor = builder.build();
      Random random = new Random();
      for (int i = 0; i < bulkTotal; i++) {
        // Replace the field names and values with your actual business fields and values
        Map<String, Object> map = new HashMap<>();
        map.put("field1", random.nextInt() + "");
        map.put("field2", random.nextInt() + "");
        IndexRequest bulkItemRequest = new IndexRequest(index_name);
        bulkItemRequest.source(map);
        // Add the request to the BulkProcessor
        bulkProcessor.add(bulkItemRequest);
      }
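      // Flush the remaining requests and wait up to 120 seconds for all bulk requests to finish.
      // awaitClose returns false if the timeout elapses first; the counts below are then incomplete.
      if (!bulkProcessor.awaitClose(120, TimeUnit.SECONDS)) {
        System.out.println("BulkProcessor did not finish within 120 seconds.");
      }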
      long failure = failedBulkItemCount.get(),
        success = bulkTotal - failure;
      System.out.println("Bulk using BulkProcessor finished with [" + success + "] requests succeeded, [" + failure + "] requests failed.");
    
      // Build a RefreshRequest to make the written data visible to search
      RefreshRequest refreshRequest = new RefreshRequest(index_name);
      RefreshResponse refreshResponse = highClient.indices().refresh(refreshRequest, COMMON_OPTIONS);
      System.out.println("Refresh on index [" + index_name + "] successfully.");
    
      // Build a SearchRequest that matches all documents
      SearchRequest searchRequest = new SearchRequest(index_name);
      SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
      QueryBuilder queryMatchAllBuilder = new MatchAllQueryBuilder();
      searchSourceBuilder.query(queryMatchAllBuilder);
      searchRequest.source(searchSourceBuilder);
      SearchResponse searchResponse = highClient.search(searchRequest, COMMON_OPTIONS);
      long totalHit = searchResponse.getHits().getTotalHits().value;
      System.out.println("Search query match all hits [" + totalHit + "] in total.");
    
      // Build a SearchRequest that queries a document by ID
      QueryBuilder queryByIdBuilder = new MatchQueryBuilder("_id", doc_id);
      searchSourceBuilder.query(queryByIdBuilder);
      searchRequest.source(searchSourceBuilder);
      searchResponse = highClient.search(searchRequest, COMMON_OPTIONS);
      for (SearchHit searchHit : searchResponse.getHits()) {
        System.out.println("Search query by id response [" + searchHit.getSourceAsString() + "]");
      }
    
      // Build a DeleteRequest that deletes a single document by ID
      DeleteRequest deleteRequest = new DeleteRequest(index_name);
      deleteRequest.id(doc_id);
      DeleteResponse deleteResponse = highClient.delete(deleteRequest, COMMON_OPTIONS);
      System.out.println("Delete document with id [" + deleteResponse.getId() + "] successfully.");
    
      // Build a DeleteIndexRequest that deletes the entire index
      DeleteIndexRequest deleteIndexRequest = new DeleteIndexRequest(index_name);
      AcknowledgedResponse deleteIndexResponse = highClient.indices().delete(deleteIndexRequest, COMMON_OPTIONS);
      if (deleteIndexResponse.isAcknowledged()) {
        System.out.println("Delete index [" + index_name + "] successfully.");
      }
    
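    } catch (Exception exception) {
      // Handle exceptions
      System.out.println("msg " + exception);
    } finally {
      // Close the client in finally so that its connections are released even if a request fails
      try {
        highClient.close();
      } catch (Exception closeException) {
        System.out.println("msg " + closeException);
      }
    }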
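The bulk-write part of the code above relies on BulkProcessor, which buffers IndexRequest objects and sends them as one bulk request when the first configured threshold is reached (5,000 actions, 5 MB, or every 5 seconds in this sample). To make the count and size thresholds concrete, the following plain-Java sketch models only those two thresholds. It is a simplification under stated assumptions (single-threaded, size estimated from the JSON bytes only, no flush interval), not BulkProcessor's actual code, and BulkBatcher is a hypothetical name.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

/**
 * Simplified, single-threaded model of how BulkProcessor groups queued requests
 * when setBulkActions(n) and setBulkSize(bytes) are configured: a bulk request is
 * sent as soon as either threshold is reached. It ignores setFlushInterval and
 * setConcurrentRequests and is NOT the Elasticsearch implementation.
 * BulkBatcher is a hypothetical name.
 */
class BulkBatcher {
    private final int maxActions;
    private final long maxBytes;
    private final List<List<String>> sentBatches = new ArrayList<>();
    private List<String> current = new ArrayList<>();
    private long currentBytes = 0;

    BulkBatcher(int maxActions, long maxBytes) {
        this.maxActions = maxActions;
        this.maxBytes = maxBytes;
    }

    /** Queues one document and "sends" the current batch once either threshold is reached. */
    void add(String documentJson) {
        current.add(documentJson);
        currentBytes += documentJson.getBytes(StandardCharsets.UTF_8).length;
        if (current.size() >= maxActions || currentBytes >= maxBytes) {
            flush();
        }
    }

    /** Moves the pending documents (if any) into a new batch. */
    void flush() {
        if (!current.isEmpty()) {
            sentBatches.add(current);
            current = new ArrayList<>();
            currentBytes = 0;
        }
    }

    /** Flushes the remainder, as awaitClose does, and returns every batch that was "sent". */
    List<List<String>> close() {
        flush();
        return sentBatches;
    }

    public static void main(String[] args) {
        // Same thresholds as the sample: 5,000 actions or 5 MB
        BulkBatcher batcher = new BulkBatcher(5000, 5L * 1024 * 1024);
        Random random = new Random();
        for (int i = 0; i < 100_000; i++) {
            batcher.add("{\"field1\":\"" + random.nextInt() + "\",\"field2\":\"" + random.nextInt() + "\"}");
        }
        // Each document is under 50 bytes, so the 5,000-action threshold is always reached first
        System.out.println("bulk requests: " + batcher.close().size()); // bulk requests: 20
    }
}
```

In the real sample, the 5-second flush interval can send partial batches, so 20 is the minimum number of bulk requests for 100,000 documents rather than an exact figure.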

Complete sample

The complete sample code is as follows:

import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
import org.elasticsearch.action.admin.indices.refresh.RefreshResponse;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.client.HttpAsyncResponseConsumerFactory;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.indices.CreateIndexRequest;
import org.elasticsearch.client.indices.CreateIndexResponse;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.query.MatchAllQueryBuilder;
import org.elasticsearch.index.query.MatchQueryBuilder;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;

import java.util.HashMap;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

public class RestHClientTest {
  private static final RequestOptions COMMON_OPTIONS;
  static {
    // Build the request options. The default response buffer limit is 100 MB; it is lowered to 30 MB here
    RequestOptions.Builder builder = RequestOptions.DEFAULT.toBuilder();
    builder.setHttpAsyncResponseConsumerFactory(
      new HttpAsyncResponseConsumerFactory
        .HeapBufferedResponseConsumerFactory(30 * 1024 * 1024));
    COMMON_OPTIONS = builder.build();
  }

  public static void main(String[] args) {
    // Elasticsearch-compatible endpoint of the Lindorm search engine
    String search_url = "ld-t4n5668xk31ui****-proxy-search-public.lindorm.rds.aliyuncs.com";
    int search_port = 30070;

    // Username and password for the Lindorm search engine, obtained from the console
    String username = "user";
    String password = "test";

    final CredentialsProvider credentials_provider = new BasicCredentialsProvider();
    credentials_provider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username, password));
    RestHighLevelClient highClient = new RestHighLevelClient(
      RestClient.builder(new HttpHost(search_url, search_port, "http")).setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
        @Override
        public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) {
          return httpClientBuilder.setDefaultCredentialsProvider(credentials_provider);
        }
      })
    );

    try {
      String index_name = "lindorm_index";

      // Build a CreateIndexRequest to create the index
      CreateIndexRequest createIndexRequest = new CreateIndexRequest(index_name);
      // Specify the index settings (4 primary shards)
      Map<String, Object> settingsMap = new HashMap<>();
      settingsMap.put("index.number_of_shards", 4);
      createIndexRequest.settings(settingsMap);
      CreateIndexResponse createIndexResponse = highClient.indices().create(createIndexRequest, COMMON_OPTIONS);
      if (createIndexResponse.isAcknowledged()) {
        System.out.println("Create index [" + index_name + "] successfully.");
      }

      // Document ID. If omitted, the system generates an ID automatically, which gives better write performance
      String doc_id = "test";
      // Document fields. Replace the field names and values with your actual business fields and values
      Map<String, Object> jsonMap = new HashMap<>();
      jsonMap.put("field1", "value1");
      jsonMap.put("field2", "value2");

      // Build an IndexRequest that writes a single document with the specified ID and fields
      IndexRequest indexRequest = new IndexRequest(index_name);
      indexRequest.id(doc_id).source(jsonMap);
      IndexResponse indexResponse = highClient.index(indexRequest, COMMON_OPTIONS);
      System.out.println("Index document with id[" + indexResponse.getId() + "] successfully.");

      // Write data in bulk
      int bulkTotal = 100000;
      AtomicLong failedBulkItemCount = new AtomicLong();
      // Create a BulkProcessor to execute bulk requests
      BulkProcessor.Builder builder = BulkProcessor.builder((request, bulkListener) -> highClient.bulkAsync(request, COMMON_OPTIONS, bulkListener),
        new BulkProcessor.Listener() {
          @Override
          public void beforeBulk(long executionId, BulkRequest request) {}

          @Override
          public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
            // The result of each item in the bulk request is available here; count the items that failed
            for (BulkItemResponse bulkItemResponse : response) {
              if (bulkItemResponse.isFailed()) {
                failedBulkItemCount.incrementAndGet();
              }
            }
          }

          @Override
          public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
            // A failure here means the entire bulk request failed, so all of its actions are counted as failed
            if (null != failure) {
              failedBulkItemCount.addAndGet(request.numberOfActions());
            }
          }
        });
      // Set the number of concurrent bulk requests (default: 1)
      builder.setConcurrentRequests(10);
      // Set the thresholds at which BulkProcessor sends a bulk request: time interval, number of actions, or request size (whichever is reached first)
      builder.setFlushInterval(TimeValue.timeValueSeconds(5));
      builder.setBulkActions(5000);
      builder.setBulkSize(new ByteSizeValue(5, ByteSizeUnit.MB));
      BulkProcessor bulkProcessor = builder.build();
      Random random = new Random();
      for (int i = 0; i < bulkTotal; i++) {
        // Replace the field names and values with your actual business fields and values
        Map<String, Object> map = new HashMap<>();
        map.put("field1", random.nextInt() + "");
        map.put("field2", random.nextInt() + "");
        IndexRequest bulkItemRequest = new IndexRequest(index_name);
        bulkItemRequest.source(map);
        // Add the request to the BulkProcessor
        bulkProcessor.add(bulkItemRequest);
      }
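      // Flush the remaining requests and wait up to 120 seconds for all bulk requests to finish.
      // awaitClose returns false if the timeout elapses first; the counts below are then incomplete.
      if (!bulkProcessor.awaitClose(120, TimeUnit.SECONDS)) {
        System.out.println("BulkProcessor did not finish within 120 seconds.");
      }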
      long failure = failedBulkItemCount.get(),
        success = bulkTotal - failure;
      System.out.println("Bulk using BulkProcessor finished with [" + success + "] requests succeeded, [" + failure + "] requests failed.");

      // Build a RefreshRequest to make the written data visible to search
      RefreshRequest refreshRequest = new RefreshRequest(index_name);
      RefreshResponse refreshResponse = highClient.indices().refresh(refreshRequest, COMMON_OPTIONS);
      System.out.println("Refresh on index [" + index_name + "] successfully.");

      // Build a SearchRequest that matches all documents
      SearchRequest searchRequest = new SearchRequest(index_name);
      SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
      QueryBuilder queryMatchAllBuilder = new MatchAllQueryBuilder();
      searchSourceBuilder.query(queryMatchAllBuilder);
      searchRequest.source(searchSourceBuilder);
      SearchResponse searchResponse = highClient.search(searchRequest, COMMON_OPTIONS);
      long totalHit = searchResponse.getHits().getTotalHits().value;
      System.out.println("Search query match all hits [" + totalHit + "] in total.");

      // Build a SearchRequest that queries a document by ID
      QueryBuilder queryByIdBuilder = new MatchQueryBuilder("_id", doc_id);
      searchSourceBuilder.query(queryByIdBuilder);
      searchRequest.source(searchSourceBuilder);
      searchResponse = highClient.search(searchRequest, COMMON_OPTIONS);
      for (SearchHit searchHit : searchResponse.getHits()) {
        System.out.println("Search query by id response [" + searchHit.getSourceAsString() + "]");
      }

      // Build a DeleteRequest that deletes a single document by ID
      DeleteRequest deleteRequest = new DeleteRequest(index_name);
      deleteRequest.id(doc_id);
      DeleteResponse deleteResponse = highClient.delete(deleteRequest, COMMON_OPTIONS);
      System.out.println("Delete document with id [" + deleteResponse.getId() + "] successfully.");

      // Build a DeleteIndexRequest that deletes the entire index
      DeleteIndexRequest deleteIndexRequest = new DeleteIndexRequest(index_name);
      AcknowledgedResponse deleteIndexResponse = highClient.indices().delete(deleteIndexRequest, COMMON_OPTIONS);
      if (deleteIndexResponse.isAcknowledged()) {
        System.out.println("Delete index [" + index_name + "] successfully.");
      }

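    } catch (Exception exception) {
      // Handle exceptions
      System.out.println("msg " + exception);
    } finally {
      // Close the client in finally so that its connections are released even if a request fails
      try {
        highClient.close();
      } catch (Exception closeException) {
        System.out.println("msg " + closeException);
      }
    }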
  }
}
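Both the step 2 snippet and the complete sample register a BasicCredentialsProvider with the underlying Apache HTTP client. Assuming the Elasticsearch-compatible endpoint uses HTTP Basic authentication (an assumption; this document does not name the scheme), the following JDK-only sketch builds the equivalent Authorization header and sends GET / to the endpoint. This can help separate network or whitelist problems from client-code problems. LindormSearchProbe and its methods are hypothetical; the host, port, username, and password are the same parameters described in step 2.

```java
import java.io.IOException;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.Base64;

/**
 * Hypothetical helper for a quick connectivity check without the Elasticsearch
 * client, ASSUMING the Elasticsearch-compatible endpoint accepts HTTP Basic
 * authentication with the same username and password.
 */
class LindormSearchProbe {

    /** Builds the value of an HTTP Basic Authorization header (RFC 7617). */
    static String basicAuthHeader(String username, String password) {
        String token = username + ":" + password;
        return "Basic " + Base64.getEncoder().encodeToString(token.getBytes(StandardCharsets.UTF_8));
    }

    /** Builds the base URL, mirroring new HttpHost(search_url, search_port, "http"). */
    static String baseUrl(String host, int port) {
        return "http://" + host + ":" + port + "/";
    }

    /** Sends GET / with Basic credentials and returns the HTTP status code. */
    static int probe(String host, int port, String username, String password) throws IOException {
        HttpURLConnection connection = (HttpURLConnection) new URL(baseUrl(host, port)).openConnection();
        try {
            connection.setRequestMethod("GET");
            connection.setConnectTimeout(5000);
            connection.setReadTimeout(5000);
            connection.setRequestProperty("Authorization", basicAuthHeader(username, password));
            return connection.getResponseCode();
        } finally {
            connection.disconnect();
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println(basicAuthHeader("user", "test")); // Basic dXNlcjp0ZXN0
        // Pass <search_url> <search_port> <username> <password> to actually probe the endpoint
        if (args.length == 4) {
            System.out.println("HTTP status: " + probe(args[0], Integer.parseInt(args[1]), args[2], args[3]));
        }
    }
}
```

Under standard HTTP semantics, a 401 status typically indicates rejected credentials, while a connection timeout usually points to the whitelist or to using the VPC address from outside the VPC.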

If the program runs successfully, it prints the following output:

Create index [lindorm_index] successfully.
Index document with id[test] successfully.
Bulk using BulkProcessor finished with [100000] requests succeeded, [0] requests failed.
Refresh on index [lindorm_index] successfully.
Search query match all hits [10000] in total.
Search query by id response [{"field1":"value1","field2":"value2"}]
Delete document with id [test] successfully.
Delete index [lindorm_index] successfully.
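Note

The output shows that all 100,000 bulk writes succeeded (together with the single document whose ID is test, the index holds 100,001 documents at query time), yet the match-all query reports only 10,000 hits. This is because, by default, the server counts total hits accurately only up to 10,000; when more documents match, 10,000 is returned as a lower bound rather than the exact total.

To get the exact total number of hits, set the trackTotalHits property of the SearchSourceBuilder object to true, that is, searchSourceBuilder.trackTotalHits(true);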
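The 10,000 cap comes from track_total_hits, whose default threshold in Elasticsearch 7.x is 10,000: matches are counted exactly up to the threshold, and above it the response reports the threshold with relation "gte", meaning the real total is at least that value. In the client, the same information is exposed through searchResponse.getHits().getTotalHits().value and .relation. The following plain-Java sketch models that reporting rule. TotalHitsModel is a hypothetical helper, and it models only the reported count, not which documents are returned.

```java
/**
 * Simplified model of how Elasticsearch 7.x reports hits.total:
 * matches are counted exactly up to the track_total_hits threshold; above it,
 * the threshold is reported with relation "gte" (a lower bound).
 * TotalHitsModel is a hypothetical helper, not a client API.
 */
class TotalHitsModel {
    /** Default threshold when track_total_hits is not set. */
    static final long DEFAULT_TRACK_TOTAL_HITS = 10_000;
    /** Models trackTotalHits(true): no upper limit on exact counting. */
    static final long TRACK_TOTAL_HITS_ACCURATE = Long.MAX_VALUE;

    /** The reported total, mirroring the value and relation fields of hits.total. */
    static final class Reported {
        final long value;
        final String relation; // "eq" or "gte", as in the JSON response

        Reported(long value, String relation) {
            this.value = value;
            this.relation = relation;
        }
    }

    static Reported report(long actualMatches, long threshold) {
        if (actualMatches <= threshold) {
            return new Reported(actualMatches, "eq");
        }
        return new Reported(threshold, "gte");
    }

    public static void main(String[] args) {
        // 1 single-indexed document + 100,000 bulk documents, before the deletion step
        long matches = 100_001;
        Reported byDefault = report(matches, DEFAULT_TRACK_TOTAL_HITS);
        Reported exact = report(matches, TRACK_TOTAL_HITS_ACCURATE);
        System.out.println(byDefault.value + " " + byDefault.relation); // 10000 gte
        System.out.println(exact.value + " " + exact.relation);         // 100001 eq
    }
}
```

Exact counting has to visit every matching document, so trackTotalHits(true) can slow down queries on large indexes; if you only need to know whether the count exceeds a bound, SearchSourceBuilder.trackTotalHitsUpTo(int) sets a custom threshold instead.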