当前位置: 首页 > news >正文

【ElasticSearch】客户端选择

【ElasticSearch】客户端选择

  • 【一】springboot整合es的客户端工具
    • 【1】可选工具和客户端
      • (1)ElasticsearchRepository​
      • (2)​ElasticsearchRestTemplate​(或旧版的ElasticsearchTemplate):
      • (3)High Level REST Client​
      • (4)​Java API Client​
    • 【2】如何选择
    • 【3】如何使用
      • (1)ElasticsearchRepository 使用
      • (2)ElasticsearchRestTemplate 使用
      • (3)新的Java API Client使用(Elasticsearch 7.17+)
  • 【二】ElasticsearchRepository使用案例
    • 【1】如何使用
      • (1)步骤1:定义实体类
      • (2)步骤2:创建继承ElasticsearchRepository的接口
      • (3)步骤3:使用该接口进行CRUD操作
      • (4)配置类
    • 【2】可用方法
      • (1)基本 CRUD 方法
      • (2)分页和排序
      • (3)自定义查询方法
  • 【三】ElasticsearchRestTemplate使用案例
    • 【1】核心功能
    • 【2】可用方法
      • (1)文档操作方法
      • (2)查询操作方法
      • (3)聚合分析方法
      • (4)索引管理方法
    • 【3】使用案例
      • (1)配置类
      • (2)实体类
      • (3)服务层实现
    • 【4】实践优化
      • (1)批量操作优化
      • (2)高校分页查询
  • 【四】​Java API Client​使用案例
    • 【1】介绍
    • 【2】核心功能
      • (1)主要模块
      • (2)主要类
    • 【3】使用案例
      • (1)maven配置
      • (2)配置类
      • (3)实体类
      • (4)服务层实现
    • 【4】高级用法
      • (1)异步操作
      • (2)复杂聚合
      • (3)自定义json映射

【一】springboot整合es的客户端工具

【1】可选工具和客户端

(1)ElasticsearchRepository​

这是Spring Data Elasticsearch提供的一个接口,类似于Spring Data JPA的Repository。它提供了基本的CRUD操作和简单的查询方法(通过方法名或注解定义查询)。适用于简单的CRUD操作和查询,能够快速开发。

(2)​ElasticsearchRestTemplate​(或旧版的ElasticsearchTemplate):

(1)ElasticsearchTemplate是Spring Data Elasticsearch早期版本中的主要类,基于TransportClient(已弃用)。
(2)ElasticsearchRestTemplate是Spring Data Elasticsearch 3.2.x及以上版本推荐的类,基于High Level REST Client。它提供了更底层的操作,可以执行复杂的查询和聚合,适用于需要高度自定义查询的场景。

(3)High Level REST Client​

Elasticsearch官方提供的Java高级 REST 客户端,用于与Elasticsearch集群通信。它提供了所有Elasticsearch操作的方法,但使用起来相对繁琐,需要手动构建请求和解析响应。在Spring Data Elasticsearch中,通常不需要直接使用,因为ElasticsearchRestTemplate已经对其进行了封装。

(4)​Java API Client​

Elasticsearch 7.15及以上版本引入了新的Java API客户端,这是一个基于Jackson的强类型客户端,提供了更好的类型安全和性能。但是,Spring Data Elasticsearch目前(截至3.2.x)还没有完全整合这个新客户端。

【2】如何选择

(1)如果只需要基本的CRUD和简单查询,推荐使用ElasticsearchRepository,因为它使用简单,代码量少。
(2)如果需要执行复杂的查询、聚合、或者需要更灵活地控制查询过程,那么应该使用ElasticsearchRestTemplate。
(3)如果Spring Data Elasticsearch提供的功能无法满足需求(例如,使用一些非常新的Elasticsearch特性),可以考虑直接使用Elasticsearch的Java API Client,但这样会失去Spring Data的便利性。

在这里插入图片描述

【3】如何使用

(1)ElasticsearchRepository 使用

(1)创建Repository接口

public interface ProductRepository extends ElasticsearchRepository<Product, String> {// 自定义查询方法List<Product> findByName(String name);List<Product> findByPriceBetween(Double minPrice, Double maxPrice);Page<Product> findByCategory(String category, Pageable pageable);// 使用@Query注解自定义DSL@Query("{\"match\": {\"name\": \"?0\"}}")Page<Product> findByNameCustom(String name, Pageable pageable);
}

(2)使用示例

@Service
@RequiredArgsConstructor
public class ProductService {private final ProductRepository productRepository;public Page<Product> searchProducts(String keyword, int page, int size) {return productRepository.findByNameCustom(keyword, PageRequest.of(page, size, Sort.by("price").descending()));}public List<Product> findProductsByPriceRange(double min, double max) {return productRepository.findByPriceBetween(min, max);}
}

(2)ElasticsearchRestTemplate 使用

(1)配置类

@Configuration
public class ElasticsearchConfig {@Beanpublic ElasticsearchRestTemplate elasticsearchRestTemplate(ElasticsearchRestClient elasticsearchRestClient) {return new ElasticsearchRestTemplate(elasticsearchRestClient);}
}

(2)复杂查询实现

@Service
@RequiredArgsConstructor
public class ProductSearchService {private final ElasticsearchRestTemplate elasticsearchRestTemplate;public SearchPage<Product> complexSearch(String keyword, String category, Double minPrice, Double maxPrice, int page, int size) {// 构建布尔查询BoolQueryBuilder boolQuery = QueryBuilders.boolQuery();if (StringUtils.hasText(keyword)) {boolQuery.must(QueryBuilders.matchQuery("name", keyword).boost(2.0f));}if (StringUtils.hasText(category)) {boolQuery.must(QueryBuilders.termQuery("category", category));}if (minPrice != null || maxPrice != null) {RangeQueryBuilder rangeQuery = QueryBuilders.rangeQuery("price");if (minPrice != null) rangeQuery.gte(minPrice);if (maxPrice != null) rangeQuery.lte(maxPrice);boolQuery.must(rangeQuery);}// 构建分页和排序NativeSearchQuery searchQuery = new NativeSearchQueryBuilder().withQuery(boolQuery).withPageable(PageRequest.of(page, size)).withSort(SortBuilders.fieldSort("price").order(SortOrder.DESC)).build();SearchHits<Product> searchHits = elasticsearchRestTemplate.search(searchQuery, Product.class);return SearchHitSupport.searchPageFor(searchHits, searchQuery.getPageable());}// 聚合查询示例public Map<String, Long> getCategoryStats() {TermsAggregationBuilder aggregation = AggregationBuilders.terms("category_agg").field("category").size(10);NativeSearchQuery searchQuery = new NativeSearchQueryBuilder().addAggregation(aggregation).build();SearchHits<Product> searchHits = elasticsearchRestTemplate.search(searchQuery, Product.class);Terms terms = searchHits.getAggregations().get("category_agg");return terms.getBuckets().stream().collect(Collectors.toMap(Terms.Bucket::getKeyAsString, Terms.Bucket::getDocCount));}
}

(3)新的Java API Client使用(Elasticsearch 7.17+)

(1)添加依赖

<dependency><groupId>org.springframework.data</groupId><artifactId>spring-data-elasticsearch</artifactId><version>5.1.0</version>
</dependency>
<dependency><groupId>co.elastic.clients</groupId><artifactId>elasticsearch-java</artifactId><version>8.11.4</version>
</dependency>

(2)配置客户端

@Configuration
public class ElasticsearchClientConfig {@Value("${spring.elasticsearch.uris}")private String[] elasticsearchUris;@Beanpublic ElasticsearchClient elasticsearchClient() {// 创建低级客户端RestClient restClient = RestClient.builder(HttpHost.create(elasticsearchUris[0])).build();// 创建传输层ElasticsearchTransport transport = new RestClientTransport(restClient, new JacksonJsonpMapper());// 创建API客户端return new ElasticsearchClient(transport);}
}

(3)使用Java API Client

@Service
@RequiredArgsConstructor
public class ProductJavaClientService {private final ElasticsearchClient elasticsearchClient;public void createProduct(Product product) throws IOException {elasticsearchClient.index(i -> i.index("products").id(product.getId()).document(product));}public List<Product> searchProducts(String keyword) throws IOException {SearchResponse<Product> response = elasticsearchClient.search(s -> s.index("products").query(q -> q.match(m -> m.field("name").query(keyword))),Product.class);return response.hits().hits().stream().map(Hit::source).collect(Collectors.toList());}
}

【二】ElasticsearchRepository使用案例

【1】如何使用

(1)步骤1:定义实体类

import org.springframework.data.annotation.Id;
import org.springframework.data.elasticsearch.annotations.Document;
import org.springframework.data.elasticsearch.annotations.Field;
import org.springframework.data.elasticsearch.annotations.FieldType;@Document(indexName = "products")
@Data
public class Product {@Idprivate String id;@Field(type = FieldType.Text, analyzer = "ik_max_word")private String name;@Field(type = FieldType.Double)private Double price;@Field(type = FieldType.Keyword)private String category;@Field(type = FieldType.Integer)private Integer stock;// 构造方法public Product() {}public Product(String name, Double price, String category, Integer stock) {this.name = name;this.price = price;this.category = category;this.stock = stock;}}

(2)步骤2:创建继承ElasticsearchRepository的接口

import org.springframework.data.domain.Page;
import org.springframework.data.domain.Pageable;
import org.springframework.data.elasticsearch.annotations.Query;
import org.springframework.data.elasticsearch.repository.ElasticsearchRepository;import java.util.List;public interface ProductRepository extends ElasticsearchRepository<Product, String> {// 1. 方法名派生查询List<Product> findByName(String name);List<Product> findByPriceBetween(Double minPrice, Double maxPrice);List<Product> findByCategoryAndStockGreaterThan(String category, Integer stock);Page<Product> findByCategory(String category, Pageable pageable);// 2. 使用@Query自定义查询@Query("{\"match\": {\"name\": \"?0\"}}")List<Product> findByNameCustom(String name);@Query("{\"range\": {\"price\": {\"gte\": ?0, \"lte\": ?1}}}")List<Product> findByPriceRange(Double minPrice, Double maxPrice);@Query("{\"bool\": {\"must\": [{\"match\": {\"category\": \"?0\"}}, {\"range\": {\"stock\": {\"gt\": ?1}}}]}}")List<Product> findByCategoryAndStockGreaterThanCustom(String category, Integer stock);// 3. 聚合查询@Query("{\"aggs\": {\"category_count\": {\"terms\": {\"field\": \"category.keyword\"}}}}")Map<String, Long> countByCategory();
}

(3)步骤3:使用该接口进行CRUD操作

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.PageRequest;
import org.springframework.data.domain.Sort;
import org.springframework.stereotype.Service;import java.util.List;
import java.util.Optional;@Service
public class ProductService {private final ProductRepository productRepository;@Autowiredpublic ProductService(ProductRepository productRepository) {this.productRepository = productRepository;}// 保存产品public Product saveProduct(Product product) {return productRepository.save(product);}// 批量保存public Iterable<Product> saveProducts(List<Product> products) {return productRepository.saveAll(products);}// 根据ID查询public Optional<Product> findById(String id) {return productRepository.findById(id);}// 检查是否存在public boolean existsById(String id) {return productRepository.existsById(id);}// 根据名称查询public List<Product> findByName(String name) {return productRepository.findByName(name);}// 价格范围查询public List<Product> findByPriceRange(Double minPrice, Double maxPrice) {return productRepository.findByPriceBetween(minPrice, maxPrice);}// 分类分页查询public Page<Product> findByCategory(String category, int page, int size) {return productRepository.findByCategory(category, PageRequest.of(page, size));}// 自定义查询public List<Product> findByNameCustom(String name) {return productRepository.findByNameCustom(name);}// 删除产品public void deleteProduct(String id) {productRepository.deleteById(id);}// 获取所有产品(排序)public List<Product> findAllSortedByPrice() {return (List<Product>) productRepository.findAll(Sort.by(Sort.Direction.DESC, "price"));}// 分页查询所有产品public Page<Product> findAllProducts(int page, int size) {return productRepository.findAll(PageRequest.of(page, size));}// 按分类统计产品数量public Map<String, Long> countProductsByCategory() {return productRepository.countByCategory();}
}

(4)配置类

import org.elasticsearch.client.RestHighLevelClient;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.elasticsearch.client.ClientConfiguration;
import org.springframework.data.elasticsearch.client.RestClients;
import org.springframework.data.elasticsearch.config.AbstractElasticsearchConfiguration;
import org.springframework.data.elasticsearch.repository.config.EnableElasticsearchRepositories;@Configuration
@EnableElasticsearchRepositories(basePackages = "com.example.repository")
public class ElasticsearchConfig extends AbstractElasticsearchConfiguration {@Override@Beanpublic RestHighLevelClient elasticsearchClient() {ClientConfiguration clientConfiguration = ClientConfiguration.builder().connectedTo("localhost:9200").build();return RestClients.create(clientConfiguration).rest();}
}

【2】可用方法

(1)基本 CRUD 方法

save(S entity):保存单个实体
saveAll(Iterable entities):批量保存
findById(ID id):根据ID查询
existsById(ID id):检查是否存在
findAll():查询所有文档
findAllById(Iterable ids):根据ID列表查询
count():统计文档数量
deleteById(ID id):根据ID删除
delete(T entity):删除实体
deleteAll():删除所有文档

(2)分页和排序

findAll(Pageable pageable):分页查询
findAll(Sort sort):排序查询

(3)自定义查询方法

方法名派生查询:findBy[Field][Operation]
@Query注解自定义查询

【三】ElasticsearchRestTemplate使用案例

【1】核心功能

ElasticsearchRestTemplate是 Spring Data Elasticsearch 提供的高级操作类,比 ElasticsearchRepository更灵活,支持更复杂的操作:

文档操作​:索引、更新、删除文档
​查询操作​:执行各种类型的查询(匹配、范围、布尔等)
​聚合分析​:执行统计、分组等聚合操作
索引管理​:创建、删除索引,管理映射
​批量操作​:高效执行批量索引/删除
​脚本支持​:执行脚本更新
​地理空间查询​:执行地理位置相关查询

【2】可用方法

(1)文档操作方法

index(T entity):索引单个文档
save(T entity):保存/更新文档
bulkIndex(List queries):批量索引文档
delete(String id, Class clazz):删除文档
delete(Query query, Class clazz):按查询删除文档
deleteByQuery(Query query, Class clazz):按查询删除文档
get(String id, Class clazz):根据ID获取文档

(2)查询操作方法

search(Query query, Class clazz):执行查询
search(SearchQuery query, Class clazz):执行搜索查询
queryForList(Query query, Class clazz):查询文档列表
queryForPage(Query query, Class clazz):分页查询
suggest(SuggestBuilder suggestion, Class clazz):执行建议查询

(3)聚合分析方法

aggregate(Aggregation aggregation, Class clazz):执行聚合
query(Query query, ResultsExtractor resultsExtractor):自定义结果提取

(4)索引管理方法

indexOps(Class clazz):获取索引操作接口
createIndex(Class clazz):创建索引
deleteIndex(Class clazz):删除索引
putMapping(Class clazz):更新映射
refresh(Class clazz):刷新索引

【3】使用案例

(1)配置类

import org.elasticsearch.client.RestHighLevelClient;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.elasticsearch.client.ClientConfiguration;
import org.springframework.data.elasticsearch.client.RestClients;
import org.springframework.data.elasticsearch.core.ElasticsearchRestTemplate;@Configuration
public class ElasticsearchConfig {@Beanpublic RestHighLevelClient restHighLevelClient() {ClientConfiguration clientConfiguration = ClientConfiguration.builder().connectedTo("localhost:9200").withConnectTimeout(5000).withSocketTimeout(60000).build();return RestClients.create(clientConfiguration).rest();}@Beanpublic ElasticsearchRestTemplate elasticsearchRestTemplate() {return new ElasticsearchRestTemplate(restHighLevelClient());}
}

(2)实体类

import org.springframework.data.annotation.Id;
import org.springframework.data.elasticsearch.annotations.Document;
import org.springframework.data.elasticsearch.annotations.Field;
import org.springframework.data.elasticsearch.annotations.FieldType;
import org.springframework.data.elasticsearch.annotations.GeoPointField;
import org.springframework.data.elasticsearch.core.geo.GeoPoint;@Document(indexName = "products")
public class Product {@Idprivate String id;@Field(type = FieldType.Text, analyzer = "ik_max_word")private String name;@Field(type = FieldType.Double)private Double price;@Field(type = FieldType.Keyword)private String category;@Field(type = FieldType.Integer)private Integer stock;@Field(type = FieldType.Date, format = DateFormat.date_hour_minute_second)private Date createdAt;@GeoPointFieldprivate GeoPoint location;// 构造方法、Getter和Setterpublic Product() {}public Product(String name, Double price, String category, Integer stock) {this.name = name;this.price = price;this.category = category;this.stock = stock;this.createdAt = new Date();}// 省略Getter和Setter...
}

(3)服务层实现

import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
import org.elasticsearch.search.aggregations.metrics.Avg;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.PageImpl;
import org.springframework.data.domain.PageRequest;
import org.springframework.data.domain.Pageable;
import org.springframework.data.elasticsearch.core.*;
import org.springframework.data.elasticsearch.core.geo.GeoPoint;
import org.springframework.data.elasticsearch.core.query.*;
import org.springframework.stereotype.Service;import java.util.*;
import java.util.stream.Collectors;@Service
public class ProductService {private final ElasticsearchRestTemplate elasticsearchRestTemplate;@Autowiredpublic ProductService(ElasticsearchRestTemplate elasticsearchRestTemplate) {this.elasticsearchRestTemplate = elasticsearchRestTemplate;}// 1. 索引单个文档public String indexProduct(Product product) {IndexQuery indexQuery = new IndexQueryBuilder().withObject(product).build();return elasticsearchRestTemplate.index(indexQuery, IndexCoordinates.of("products"));}// 2. 批量索引文档public List<String> bulkIndexProducts(List<Product> products) {List<IndexQuery> queries = products.stream().map(product -> new IndexQueryBuilder().withObject(product).build()).collect(Collectors.toList());return elasticsearchRestTemplate.bulkIndex(queries, IndexCoordinates.of("products"));}// 3. 根据ID获取文档public Product getProductById(String id) {return elasticsearchRestTemplate.get(id, Product.class);}// 4. 根据ID删除文档public String deleteProductById(String id) {return elasticsearchRestTemplate.delete(id, Product.class);}// 5. 简单匹配查询public List<Product> searchByName(String name) {Query query = new NativeSearchQueryBuilder().withQuery(QueryBuilders.matchQuery("name", name)).build();SearchHits<Product> hits = elasticsearchRestTemplate.search(query, Product.class);return hits.stream().map(SearchHit::getContent).collect(Collectors.toList());}// 6. 分页查询public Page<Product> searchByCategory(String category, int page, int size) {Pageable pageable = PageRequest.of(page, size);Query query = new NativeSearchQueryBuilder().withQuery(QueryBuilders.termQuery("category", category)).withPageable(pageable).build();SearchHits<Product> hits = elasticsearchRestTemplate.search(query, Product.class);List<Product> products = hits.stream().map(SearchHit::getContent).collect(Collectors.toList());return new PageImpl<>(products, pageable, hits.getTotalHits());}// 7. 范围查询public List<Product> findByPriceRange(double minPrice, double maxPrice) {Query query = new NativeSearchQueryBuilder().withQuery(QueryBuilders.rangeQuery("price").gte(minPrice).lte(maxPrice)).build();SearchHits<Product> hits = elasticsearchRestTemplate.search(query, Product.class);return hits.stream().map(SearchHit::getContent).collect(Collectors.toList());}// 8. 布尔查询public List<Product> complexSearch(String keyword, String category, double minPrice) {Query query = new NativeSearchQueryBuilder().withQuery(QueryBuilders.boolQuery().must(QueryBuilders.matchQuery("name", keyword)).must(QueryBuilders.termQuery("category", category)).must(QueryBuilders.rangeQuery("price").gte(minPrice))).build();SearchHits<Product> hits = elasticsearchRestTemplate.search(query, Product.class);return hits.stream().map(SearchHit::getContent).collect(Collectors.toList());}// 9. 聚合分析 - 按类别统计数量public Map<String, Long> countByCategory() {Query query = new NativeSearchQueryBuilder().addAggregation(AggregationBuilders.terms("by_category").field("category.keyword")).build();SearchHits<Product> hits = elasticsearchRestTemplate.search(query, Product.class);Terms terms = hits.getAggregations().get("by_category");return terms.getBuckets().stream().collect(Collectors.toMap(Terms.Bucket::getKeyAsString,Terms.Bucket::getDocCount));}// 10. 聚合分析 - 计算平均价格public double averagePrice() {Query query = new NativeSearchQueryBuilder().addAggregation(AggregationBuilders.avg("avg_price").field("price")).build();SearchHits<Product> hits = elasticsearchRestTemplate.search(query, Product.class);Avg avg = hits.getAggregations().get("avg_price");return avg.getValue();}// 11. 更新文档 - 部分更新public void updateProductStock(String id, int newStock) {Map<String, Object> params = new HashMap<>();params.put("stock", newStock);UpdateQuery updateQuery = UpdateQuery.builder(id).withParams(params).build();elasticsearchRestTemplate.update(updateQuery, IndexCoordinates.of("products"));}// 12. 脚本更新public void increaseProductPrice(String id, double amount) {Map<String, Object> params = new HashMap<>();params.put("amount", amount);UpdateQuery updateQuery = UpdateQuery.builder(id).withScript("ctx._source.price += params.amount").withParams(params).build();elasticsearchRestTemplate.update(updateQuery, IndexCoordinates.of("products"));}// 13. 按查询更新public long increaseStockForCategory(String category, int amount) {UpdateQuery updateQuery = UpdateQuery.builder(QueryBuilders.termQuery("category", category)).withScript("ctx._source.stock += params.amount").withParams(Collections.singletonMap("amount", amount)).build();ByQueryResponse response = elasticsearchRestTemplate.updateByQuery(updateQuery, IndexCoordinates.of("products"));return response.getUpdated();}// 14. 按查询删除public long deleteByCategory(String category) {Query query = new NativeSearchQueryBuilder().withQuery(QueryBuilders.termQuery("category", category)).build();return elasticsearchRestTemplate.delete(query, Product.class);}// 15. 地理位置查询 - 附近的产品public List<Product> findNearbyProducts(double lat, double lon, double distance) {Query query = new NativeSearchQueryBuilder().withQuery(QueryBuilders.geoDistanceQuery("location").point(lat, lon).distance(distance + "km")).build();SearchHits<Product> hits = elasticsearchRestTemplate.search(query, Product.class);return hits.stream().map(SearchHit::getContent).collect(Collectors.toList());}// 16. 高亮显示public List<SearchHit<Product>> searchWithHighlight(String keyword) {Query query = new NativeSearchQueryBuilder().withQuery(QueryBuilders.matchQuery("name", keyword)).withHighlightFields(new HighlightBuilder.Field("name").preTags("<em>").postTags("</em>")).build();return elasticsearchRestTemplate.search(query, Product.class).getSearchHits();}// 17. 自动补全public List<String> suggestNames(String prefix) {SuggestBuilder suggestBuilder = new SuggestBuilder().addSuggestion("name-suggest", SuggestBuilders.completionSuggestion("suggest").prefix(prefix).skipDuplicates(true).size(10));SuggestResponse response = elasticsearchRestTemplate.suggest(suggestBuilder, Product.class);return response.getSuggest().getSuggestion("name-suggest").getEntries().get(0).getOptions().stream().map(Suggest.Suggestion.Entry.Option::getText).collect(Collectors.toList());}// 18. 索引管理 - 创建索引public boolean createProductIndex() {return elasticsearchRestTemplate.indexOps(Product.class).create();}// 19. 索引管理 - 更新映射public boolean updateProductMapping() {return elasticsearchRestTemplate.indexOps(Product.class).putMapping();}// 20. 索引管理 - 刷新索引public void refreshProductIndex() {elasticsearchRestTemplate.indexOps(Product.class).refresh();}
}

【4】实践优化

(1)批量操作优化

public void bulkInsert(List<Product> products) {List<IndexQuery> queries = new ArrayList<>();for (Product product : products) {IndexQuery query = new IndexQuery();query.setId(product.getId());query.setObject(product);queries.add(query);}// 分批处理,每批1000条int batchSize = 1000;for (int i = 0; i < queries.size(); i += batchSize) {int end = Math.min(i + batchSize, queries.size());List<IndexQuery> batch = queries.subList(i, end);elasticsearchRestTemplate.bulkIndex(batch, Product.class);}
}

(2)高校分页查询

public List<Product> efficientPagination(String category, int page, int size) {// 使用search_after实现高效分页Query query = new NativeSearchQueryBuilder().withQuery(QueryBuilders.termQuery("category", category)).withSort(SortBuilders.fieldSort("price").order(SortOrder.DESC)).withPageable(PageRequest.of(0, size)) // 第一页.build();SearchHits<Product> hits = elasticsearchRestTemplate.search(query, Product.class);List<Object> searchAfter = hits.getSearchHits().get(hits.getSearchHits().size() - 1).getSortValues();// 获取下一页Query nextPageQuery = new NativeSearchQueryBuilder().withQuery(QueryBuilders.termQuery("category", category)).withSort(SortBuilders.fieldSort("price").order(SortOrder.DESC)).withSearchAfter(searchAfter.toArray()).withPageable(PageRequest.of(0, size)).build();return elasticsearchRestTemplate.search(nextPageQuery, Product.class).stream().map(SearchHit::getContent).collect(Collectors.toList());
}

【四】​Java API Client​使用案例

【1】介绍

Elasticsearch Java API Client 是 Elasticsearch 官方推出的新一代 Java 客户端,具有以下特点:
强类型​:所有请求和响应都是类型安全的
现代化设计​:基于 Elasticsearch DSL 构建
异步支持​:内置异步操作支持
模块化​:按需引入不同功能模块
与 REST API 完全对应​:1:1 映射 REST API

【2】核心功能

(1)主要模块

elasticsearch-java:核心模块
elasticsearch-java-api:API 模块
elasticsearch-transport:传输层
elasticsearch-x-content:JSON 处理

(2)主要类

ElasticsearchClient:主客户端类
*Request:各种操作请求(IndexRequest, SearchRequest 等)
*Response:各种操作响应
Query:查询构建器
Aggregation:聚合构建器

【3】使用案例

(1)maven配置

<dependencies><!-- 核心依赖 --><dependency><groupId>co.elastic.clients</groupId><artifactId>elasticsearch-java</artifactId><version>8.11.4</version></dependency><!-- JSON 处理 --><dependency><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-databind</artifactId><version>2.15.2</version></dependency><!-- HTTP 客户端 --><dependency><groupId>org.apache.httpcomponents.client5</groupId><artifactId>httpclient5</artifactId><version>5.2.1</version></dependency>
</dependencies>

(2)配置类

import co.elastic.clients.elasticsearch.ElasticsearchClient;
import co.elastic.clients.json.jackson.JacksonJsonpMapper;
import co.elastic.clients.transport.ElasticsearchTransport;
import co.elastic.clients.transport.rest_client.RestClientTransport;
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.elasticsearch.client.RestClient;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class ElasticsearchConfig {@Beanpublic ElasticsearchClient elasticsearchClient() {// 1. 创建低级客户端RestClient restClient = RestClient.builder(new HttpHost("localhost", 9200)).setHttpClientConfigCallback(httpClientBuilder -> {// 认证配置CredentialsProvider credentialsProvider = new BasicCredentialsProvider();credentialsProvider.setCredentials(AuthScope.ANY,new UsernamePasswordCredentials("elastic", "your-password"));return httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider).setSSLHostnameVerifier((hostname, session) -> true); // 开发环境禁用主机名验证}).build();// 2. 创建传输层ElasticsearchTransport transport = new RestClientTransport(restClient,new JacksonJsonpMapper());// 3. 创建API客户端return new ElasticsearchClient(transport);}
}

(3)实体类

import com.fasterxml.jackson.annotation.JsonFormat;
import com.fasterxml.jackson.annotation.JsonProperty;import java.time.LocalDateTime;
import java.util.List;@Data
public class Product {private String id;private String name;private String description;private double price;private String category;private int stock;private List<String> tags;@JsonProperty("created_at")@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")private LocalDateTime createdAt;private Location location;// 嵌套对象public static class Location {private double lat;private double lon;// Getter和Setterpublic double getLat() { return lat; }public void setLat(double lat) { this.lat = lat; }public double getLon() { return lon; }public void setLon(double lon) { this.lon = lon; }}}

(4)服务层实现

import co.elastic.clients.elasticsearch.ElasticsearchClient;
import co.elastic.clients.elasticsearch._types.*;
import co.elastic.clients.elasticsearch._types.aggregations.*;
import co.elastic.clients.elasticsearch._types.query_dsl.*;
import co.elastic.clients.elasticsearch.core.*;
import co.elastic.clients.elasticsearch.core.bulk.BulkOperation;
import co.elastic.clients.elasticsearch.core.search.*;
import co.elastic.clients.json.JsonData;
import co.elastic.clients.util.ObjectBuilder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;import java.io.IOException;
import java.time.LocalDateTime;
import java.util.*;
import java.util.stream.Collectors;@Service
public class ProductService {private static final String INDEX_NAME = "products";private final ElasticsearchClient client;@Autowiredpublic ProductService(ElasticsearchClient client) {this.client = client;}// 1. 创建索引public void createIndex() throws IOException {client.indices().create(c -> c.index(INDEX_NAME).settings(s -> s.numberOfShards("3").numberOfReplicas("1")).mappings(m -> m.properties("name", p -> p.text(t -> t.analyzer("ik_max_word"))).properties("description", p -> p.text(t -> t.analyzer("ik_smart"))).properties("price", p -> p.double_(d -> d)).properties("category", p -> p.keyword(k -> k)).properties("stock", p -> p.integer(i -> i)).properties("tags", p -> p.keyword(k -> k)).properties("created_at", p -> p.date(d -> d.format("yyyy-MM-dd HH:mm:ss"))).properties("location", p -> p.geoPoint(g -> g))));}// 2. 索引单个文档public String indexProduct(Product product) throws IOException {IndexResponse response = client.index(i -> i.index(INDEX_NAME).id(product.getId()).document(product));return response.id();}// 3. 批量索引文档public List<String> bulkIndexProducts(List<Product> products) throws IOException {List<BulkOperation> operations = products.stream().map(product -> BulkOperation.of(op -> op.index(idx -> idx.index(INDEX_NAME).id(product.getId()).document(product)))).collect(Collectors.toList());BulkResponse response = client.bulk(b -> b.index(INDEX_NAME).operations(operations));return response.items().stream().map(BulkResponseItem::id).collect(Collectors.toList());}// 4. 根据ID获取文档public Product getProductById(String id) throws IOException {GetResponse<Product> response = client.get(g -> g.index(INDEX_NAME).id(id),Product.class);if (response.found()) {return response.source();}return null;}// 5. 根据ID删除文档public boolean deleteProductById(String id) throws IOException {DeleteResponse response = client.delete(d -> d.index(INDEX_NAME).id(id));return response.result() == Result.Deleted;}// 6. 简单匹配查询public List<Product> searchByName(String name) throws IOException {SearchResponse<Product> response = client.search(s -> s.index(INDEX_NAME).query(q -> q.match(m -> m.field("name").query(name))),Product.class);return extractProducts(response);}// 7. 多字段搜索public List<Product> multiMatchSearch(String query) throws IOException {SearchResponse<Product> response = client.search(s -> s.index(INDEX_NAME).query(q -> q.multiMatch(m -> m.fields("name", "description", "tags").query(query))),Product.class);return extractProducts(response);}// 8. 布尔查询public List<Product> complexSearch(String keyword, String category, double minPrice) throws IOException {SearchResponse<Product> response = client.search(s -> s.index(INDEX_NAME).query(q -> q.bool(b -> b.must(m -> m.match(t -> t.field("name").query(keyword))).must(m -> m.term(t -> t.field("category").value(category))).must(m -> m.range(r -> r.field("price").gte(JsonData.of(minPrice)))))),Product.class);return extractProducts(response);}// 9. 范围查询public List<Product> findByPriceRange(double min, double max) throws IOException {SearchResponse<Product> response = client.search(s -> s.index(INDEX_NAME).query(q -> q.range(r -> r.field("price").gte(JsonData.of(min)).lte(JsonData.of(max)))),Product.class);return extractProducts(response);}// 10. 分页查询public List<Product> findByCategoryPaginated(String category, int page, int size) throws IOException {SearchResponse<Product> response = client.search(s -> s.index(INDEX_NAME).query(q -> q.term(t -> t.field("category").value(category))).from(page * size).size(size),Product.class);return extractProducts(response);}// 11. 聚合分析 - 按类别统计数量public Map<String, Long> countByCategory() throws IOException {SearchResponse<Product> response = client.search(s -> s.index(INDEX_NAME).size(0).aggregations("by_category", a -> a.terms(t -> t.field("category.keyword").size(100))),Product.class);return response.aggregations().get("by_category").sterms().buckets().array().stream().collect(Collectors.toMap(StringTermsBucket::key,StringTermsBucket::docCount));}// 12. 聚合分析 - 计算平均价格public double averagePrice() throws IOException {SearchResponse<Product> response = client.search(s -> s.index(INDEX_NAME).size(0).aggregations("avg_price", a -> a.avg(av -> av.field("price"))),Product.class);AvgAggregate avg = response.aggregations().get("avg_price").avg();return avg.value();}// 13. 更新文档 - 部分更新public void updateProductStock(String id, int newStock) throws IOException {client.update(u -> u.index(INDEX_NAME).id(id).doc(Map.of("stock", newStock)),Product.class);}// 14. 脚本更新public void increaseProductPrice(String id, double amount) throws IOException {client.update(u -> u.index(INDEX_NAME).id(id).script(s -> s.inline(i -> i.source("ctx._source.price += params.amount").params("amount", JsonData.of(amount)))),Product.class);}// 15. 按查询更新public long increaseStockForCategory(String category, int amount) throws IOException {UpdateByQueryResponse response = client.updateByQuery(u -> u.index(INDEX_NAME).query(q -> q.term(t -> t.field("category").value(category))).script(s -> s.inline(i -> i.source("ctx._source.stock += params.amount").params("amount", JsonData.of(amount)))));return response.updated();}// 16. 按查询删除public long deleteByCategory(String category) throws IOException {DeleteByQueryResponse response = client.deleteByQuery(d -> d.index(INDEX_NAME).query(q -> q.term(t -> t.field("category").value(category))));return response.deleted();}// 17. 地理位置查询 - 附近的产品public List<Product> findNearbyProducts(double lat, double lon, double distance) throws IOException {SearchResponse<Product> response = client.search(s -> s.index(INDEX_NAME).query(q -> q.geoDistance(g -> g.field("location").distance(distance + "km").location(l -> l.latlon(ll -> ll.lat(lat).lon(lon))))),Product.class);return extractProducts(response);}// 18. 高亮显示public List<Hit<Product>> searchWithHighlight(String keyword) throws IOException {SearchResponse<Product> response = client.search(s -> s.index(INDEX_NAME).query(q -> q.match(m -> m.field("name").query(keyword))).highlight(h -> h.fields("name", f -> f.preTags("<em>").postTags("</em>"))),Product.class);return response.hits().hits();}// 19. 自动补全public List<String> suggestNames(String prefix) throws IOException {SearchResponse<Product> response = client.search(s -> s.index(INDEX_NAME).suggest(su -> su.suggesters("name-suggest", sug -> sug.completion(c -> c.field("suggest").size(10).skipDuplicates(true))).text(prefix)),Product.class);return response.suggest().get("name-suggest").get(0).completion().options().stream().map(CompletionSuggestOption::text).collect(Collectors.toList());}// 20. 批量操作public void bulkOperations(List<Product> toCreate, List<String> toDelete) throws IOException {List<BulkOperation> operations = new ArrayList<>();// 添加创建操作toCreate.forEach(product -> operations.add(BulkOperation.of(op -> op.index(idx -> idx.index(INDEX_NAME).id(product.getId()).document(product)))));// 添加删除操作toDelete.forEach(id -> operations.add(BulkOperation.of(op -> op.delete(d -> d.index(INDEX_NAME).id(id)))));client.bulk(b -> b.index(INDEX_NAME).operations(operations));}// 辅助方法:从响应中提取产品列表private List<Product> extractProducts(SearchResponse<Product> response) {return response.hits().hits().stream().map(Hit::source).filter(Objects::nonNull).collect(Collectors.toList());}
}

【4】高级用法

(1)异步操作

import co.elastic.clients.util.ApiTypeHelper;
import co.elastic.clients.util.BinaryData;public void asyncIndexProduct(Product product) {BinaryData data = BinaryData.of(ApiTypeHelper.jsonBuilder(product).toString(),client._transport().jsonpMapper());client.indexAsync(i -> i.index(INDEX_NAME).id(product.getId()).document(data)).whenComplete((response, exception) -> {if (exception != null) {// 处理异常System.err.println("索引失败: " + exception.getMessage());} else {// 处理成功System.out.println("文档索引成功,ID: " + response.id());}});
}

(2)复杂聚合

public Map<String, Double> avgPriceByCategory() throws IOException {SearchResponse<Product> response = client.search(s -> s.index(INDEX_NAME).size(0).aggregations("by_category", a -> a.terms(t -> t.field("category.keyword").size(100)).aggregations("avg_price", avg -> avg.avg(av -> av.field("price")))),Product.class);return response.aggregations().get("by_category").sterms().buckets().array().stream().collect(Collectors.toMap(StringTermsBucket::key,bucket -> bucket.aggregations().get("avg_price").avg().value()));
}

(3)自定义json映射

import co.elastic.clients.json.JsonpMapper;
import co.elastic.clients.json.jackson.JacksonJsonpMapper;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;// 自定义 JSON 映射器
public JsonpMapper customJsonpMapper() {ObjectMapper objectMapper = new ObjectMapper();objectMapper.registerModule(new JavaTimeModule()); // 支持 Java 8 时间类型return new JacksonJsonpMapper(objectMapper);
}// 在配置中使用自定义映射器
@Bean
public ElasticsearchClient elasticsearchClient() {RestClient restClient = RestClient.builder(new HttpHost("localhost", 9200)).build();ElasticsearchTransport transport = new RestClientTransport(restClient,customJsonpMapper() // 使用自定义映射器);return new ElasticsearchClient(transport);
}
http://www.dtcms.com/a/351896.html

相关文章:

  • Sigma规则集网络安全应用(Elasticsearch、es日志安全检查、SOC、自定义规则)
  • Linux修改服务器时区
  • S2B2B系统哪个好,商淘云、数商云、金蝶云苍穹供应链批发哪个比较靠谱
  • 模型微调训练中超长文本训练存在的问题
  • 机器视觉学习-day02-灰度化实验
  • 更新依赖失败,报错
  • 赋能增长:商城分销平台的五大核心模式与适用场景
  • 京东招java开发
  • 解决Ubuntu拉取Docker镜像失败问题。
  • 云计算学习笔记——Linux硬盘、硬盘划分、交换空间、自动挂载篇
  • 淤地坝安全在线监测系统
  • 如何用企业微信AI解决金融运维难题,让故障响应快、客服专业度高
  • Android 中使用开源库 ZXing 生成二维码图片
  • 实训日志day28
  • 人工智能-python-深度学习-参数初始化与损失函数
  • Redis核心机制解析:数据结构、线程模型与内存管理策略
  • Axios多实例封装
  • 产品运营必备职场通用能力及提升攻略,一文说明白
  • 人工智能之数学基础:离散型随机变量的概率分布有哪些?
  • windows下配置lua环境
  • KubeBlocks for Kafka 揭秘
  • 100种交易系统(6)均线MA识别信号与杂音
  • 部署本地模型,使用cherry-studio测试本地模型和云端模型
  • 【最短路问题转换/拓扑排序+dp】P1807 最长路
  • 广度优先遍历-BFS
  • 【跨国数仓迁移最佳实践7】基于MaxCompute多租的大数据平台架构
  • springboot实现合同生成
  • Odoo 企业版用户手册[新版] 前言 00.3-企业版功能模块全景图
  • C语言 指针
  • 消防设施安全员证核心考点:消防设施操作与维护高频知识点汇总