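ES 8 ships a new Java client (the Elasticsearch Java API Client). Compared with the High Level REST Client (HighLevelClient), it no longer references the Elasticsearch server source code, which makes it noticeably more lightweight. This post records how to use it.
Documentation: Introduction | Elasticsearch Java API Client [8.17] | Elastic
The local Elasticsearch instance runs version 8.6.2.
The Java client used is version 8.17.
1. First, create a Java Spring Boot project
Source code: https://github.com/a66245753/es-8-java-client.git
The pom dependency file is as follows: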
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>3.4.3</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.david</groupId>
    <artifactId>es8-java-client</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>es8-java-client</name>
    <description>es8-java-client</description>
    <url/>
    <licenses>
        <license/>
    </licenses>
    <developers>
        <developer/>
    </developers>
    <scm>
        <connection/>
        <developerConnection/>
        <tag/>
        <url/>
    </scm>
    <properties>
        <java.version>17</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>com.mysql</groupId>
            <artifactId>mysql-connector-j</artifactId>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>co.elastic.clients</groupId>
            <artifactId>elasticsearch-java</artifactId>
            <version>8.17.3</version>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
            <version>2.17.0</version>
        </dependency>
        <!-- Lombok dependency -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.22</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>com.baomidou</groupId>
            <artifactId>mybatis-plus-spring-boot3-starter</artifactId>
            <version>3.5.8</version>
        </dependency>
        <!-- Legacy coordinates of the MySQL driver; this overlaps with com.mysql:mysql-connector-j above -->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <scope>runtime</scope>
            <version>8</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>
application.yml
Change the database settings to match your own environment.
spring:
  application:
    name: es8-java-client
  datasource:
    driver-class-name: com.mysql.cj.jdbc.Driver
    url: jdbc:mysql://127.0.0.1:3306/david?charset=utf8mb4&parseTime=True&loc=Local
    username: david
    password: david
mybatis-plus:
  configuration:
    log-impl: org.apache.ibatis.logging.stdout.StdOutImpl
Following the official documentation, declare the ES client as a Spring bean:
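import co.elastic.clients.elasticsearch.ElasticsearchClient;
import co.elastic.clients.json.jackson.JacksonJsonpMapper;
import co.elastic.clients.transport.ElasticsearchTransport;
import co.elastic.clients.transport.rest_client.RestClientTransport;
import org.apache.http.HttpHost;
import org.elasticsearch.client.RestClient;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class EsClientConfig {

    @Bean
    public ElasticsearchClient elasticsearchClient() {
        // URL and API key
        String serverUrl = "http://localhost:9200";
        // String apiKey = "VnVhQ2ZHY0JDZGJrU...";

        // Create the low-level client
        RestClient restClient = RestClient.builder(HttpHost.create(serverUrl))
                // .setDefaultHeaders(new Header[]{
                //         new BasicHeader("Authorization", "ApiKey " + apiKey)
                // })
                .build();

        // Create the transport with a Jackson mapper
        ElasticsearchTransport transport = new RestClientTransport(restClient, new JacksonJsonpMapper());

        // And create the API client
        return new ElasticsearchClient(transport);
    }
}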
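The commented-out lines in the configuration show where an API key would go. The value after "ApiKey " is the Base64 encoding of id:api_key, which Elasticsearch also returns as the encoded field when an API key is created. If only the id and the key are at hand, the header value could be assembled roughly as follows. ApiKeyHeader is a hypothetical helper name, not something from the original project.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

// Hypothetical helper (not in the original repo): builds the value of the
// Authorization header from an API key id and its secret.
final class ApiKeyHeader {

    private ApiKeyHeader() {
    }

    static String value(String id, String apiKey) {
        // Elasticsearch expects Base64(id + ":" + api_key) after the "ApiKey " prefix
        String credentials = id + ":" + apiKey;
        String encoded = Base64.getEncoder()
                .encodeToString(credentials.getBytes(StandardCharsets.UTF_8));
        return "ApiKey " + encoded;
    }
}
```

The returned string would be passed as the value of the Authorization header in setDefaultHeaders, in place of the string concatenation shown in the commented-out lines.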
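2. Using the ES client
Class: com.david.coltroller.EsDocController
2.1 Adding documents
Documents can be added one at a time or in bulk. When indexing in bulk, it is best to tune the index refresh interval (refresh_interval) first.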
/**
 * Index a single document.
 * https://www.elastic.co/guide/en/elasticsearch/client/java-api-client/current/indexing.html
 *
 * @param add target index and the document to store
 * @return the generated document id on success, the ES error otherwise
 * @throws IOException
 */
@PostMapping("/add")
public ResponseEntity<EsResponseBody> add(@RequestBody EsDocAdd add) throws IOException {
    try {
        IndexResponse response = elasticsearchClient.index(i -> i
                .index(add.getIndex())
                // No id is set, so the one generated by ES is used
                // .id(product.getSku())
                .document(add.getDoc()));
        EsResponseBody responseBody = new EsResponseBody(0, response.id(), null);
        return new ResponseEntity<>(responseBody, HttpStatus.OK);
    } catch (ElasticsearchException e) {
        EsResponseBody responseBody = new EsResponseBody(-1, null, e.error().toString());
        return new ResponseEntity<>(responseBody, HttpStatus.OK);
    }
}

/**
 * Index several documents with one bulk request.
 * https://www.elastic.co/guide/en/elasticsearch/client/java-api-client/current/indexing-bulk.html
 *
 * @param add target index and the documents to store
 * @return the failed documents with their error reasons, if any
 * @throws IOException
 */
@PostMapping("/addBatch")
public ResponseEntity<EsResponseBody> addBatch(@RequestBody EsDocAddBatch add) throws IOException {
    BulkRequest.Builder br = new BulkRequest.Builder();
    for (ObjectNode doc : add.getDocs()) {
        br.operations(op -> op.index(idx -> idx.index(add.getIndex()).document(doc)));
    }
    try {
        BulkResponse result = elasticsearchClient.bulk(br.build());
        if (result.errors()) {
            // Response items come back in request order, so position i maps to docs.get(i)
            List<EsDocBucketError> list = new ArrayList<>();
            int i = 0;
            for (BulkResponseItem item : result.items()) {
                if (item.error() != null) {
                    list.add(new EsDocBucketError(item.error().reason(), add.getDocs().get(i)));
                }
                i++;
            }
            EsResponseBody responseBody = new EsResponseBody(-1, list, "failed");
            return new ResponseEntity<>(responseBody, HttpStatus.OK);
        }
        EsResponseBody responseBody = new EsResponseBody(0, null, null);
        return new ResponseEntity<>(responseBody, HttpStatus.OK);
    } catch (ElasticsearchException e) {
        EsResponseBody responseBody = new EsResponseBody(-1, null, e.error().toString());
        return new ResponseEntity<>(responseBody, HttpStatus.OK);
    }
}
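The addBatch handler sends every document it receives in a single bulk request. For large payloads it may help to split the list into fixed-size batches and send one bulk request per batch. Here is a minimal plain-Java sketch of the splitting step only. The class name BulkBatcher is hypothetical, and a sensible batch size depends on document size and cluster capacity.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper (not in the original repo): splits a document list
// into consecutive batches so each batch can go into its own BulkRequest.
final class BulkBatcher {

    private BulkBatcher() {
    }

    static <T> List<List<T>> partition(List<T> docs, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        List<List<T>> batches = new ArrayList<>();
        for (int start = 0; start < docs.size(); start += batchSize) {
            int end = Math.min(start + batchSize, docs.size());
            // Copy the slice so later changes to docs do not affect the batch
            batches.add(new ArrayList<>(docs.subList(start, end)));
        }
        return batches;
    }
}
```

Each list returned by partition(add.getDocs(), 1000) would then be fed to its own BulkRequest.Builder; the value 1000 is only an illustrative choice.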
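Creating indices through an index template is recommended. For example, the DSL statement below fixes the type of every field in the index.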
PUT _index_template/ssp_ad_union_log_template
{
  // index patterns to match
  "index_patterns": ["ssp_ad_union_log_*"],
  "template": {
    "settings": {
      "number_of_shards": 1,
      "number_of_replicas": 1,
      "refresh_interval": "1s"
    },
    "mappings": {
      "dynamic": "strict",
      "properties": {
        "id": { "type": "long" },
        "reqId": { "type": "keyword" },
        "device": { "type": "keyword" },
        "platform": { "type": "byte" },
        "platformName": { "type": "keyword" },
        "clientType": { "type": "byte" },
        "myAppId": { "type": "keyword" },
        "deviceId": { "type": "keyword" },
        "adSiteGroupId": { "type": "long" },
        "adSiteId": { "type": "keyword" },
        "packagePath": { "type": "keyword" },
        "ecpm": { "type": "long" },
        "location": { "type": "geo_point" },
        "ip": { "type": "ip" },
        "cityId": { "type": "integer" },
        "cityName": { "type": "keyword" },
        "areaId": { "type": "integer" },
        "areaName": { "type": "keyword" },
        "provinceId": { "type": "integer" },
        "provinceName": { "type": "keyword" },
        "phoneBrand": { "type": "keyword" },
        "phoneBrandName": { "type": "keyword" },
        "phoneModel": { "type": "keyword" },
        "idfa": { "type": "keyword" },
        "bizType": { "type": "byte" },
        "sdkVersion": { "type": "keyword" },
        "network": { "type": "keyword" },
        "logTime": { "type": "date", "format": "yyyy-MM-dd HH:mm:ss" },
        "createdAt": { "type": "date", "format": "yyyy-MM-dd HH:mm:ss" }
      }
    },
    "aliases": {
      "ssp_ad_union_log": {}
    }
  }
}
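The template declares logTime and createdAt as date fields with the format yyyy-MM-dd HH:mm:ss, so values sent in any other format are rejected at index time. Here is a small sketch of producing matching strings on the Java side. The class name EsDateFormat is hypothetical and not part of the original project.

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

// Hypothetical helper (not in the original repo): formats timestamps the way
// the logTime and createdAt mappings in the index template expect them.
final class EsDateFormat {

    // Same pattern as the "format" value in the index template
    private static final DateTimeFormatter FORMATTER =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    private EsDateFormat() {
    }

    static String format(LocalDateTime time) {
        return FORMATTER.format(time);
    }

    static LocalDateTime parse(String text) {
        return LocalDateTime.parse(text, FORMATTER);
    }
}
```

A document would then carry something like EsDateFormat.format(LocalDateTime.now()) in its logTime field before being passed to the add or addBatch endpoint.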
2.2 Fetching documents by their ES document id
/**
 * Fetch one document by its ES document id.
 *
 * @param index index name
 * @param id    document id
 * @return the document source, or "not found"
 * @throws IOException
 */
@GetMapping("/get")
public ResponseEntity<EsResponseBody> get(@RequestParam("index") String index,
                                          @RequestParam("id") String id) throws IOException {
    GetResponse<ObjectNode> response = elasticsearchClient.get(g -> g
                    .index(index)
                    .id(id),
            ObjectNode.class);
    if (response.found()) {
        EsResponseBody responseBody = new EsResponseBody(0, response.source(), null);
        return new ResponseEntity<>(responseBody, HttpStatus.OK);
    }
    EsResponseBody responseBody = new EsResponseBody(-1, null, "not found");
    return new ResponseEntity<>(responseBody, HttpStatus.OK);
}

/**
 * Fetch several documents by their ES document ids.
 *
 * @param params index name and the list of ids
 * @return the sources of the documents that were found
 * @throws IOException
 */
@PostMapping("/getBatch")
public ResponseEntity<EsResponseBody> getBatch(@RequestBody EsGetBatchParams params) throws IOException {
    MgetResponse<ObjectNode> response = elasticsearchClient.mget(g -> g
                    .index(params.getIndex())
                    .ids(params.getIdList()),
            ObjectNode.class);
    List<ObjectNode> list = new ArrayList<>();
    for (MultiGetResponseItem<ObjectNode> doc : response.docs()) {
        // Skip ids that were not found; their source is null
        if (doc.isResult() && doc.result().found()) {
            list.add(doc.result().source());
        }
        if (doc.isFailure()) {
            System.out.println("isFailure doc" + doc.toString());
        }
    }
    EsResponseBody responseBody = new EsResponseBody(0, list, null);
    return new ResponseEntity<>(responseBody, HttpStatus.OK);
}
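2.3 Paginated search
The endpoint below is a generic search: exact-match, in-list, full-text and range conditions are all passed in through the request body.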
/**
 * Paginated search.
 *
 * @param request indices, filter conditions and paging parameters
 * @return one page of hits plus the total hit count
 * @throws IOException
 */
@PostMapping("/search")
public ResponseEntity<EsResponseBody> search(@RequestBody EsSearchRequest request) throws IOException {
    // Build the query conditions
    List<Query> list = buildQueryList(request);
    Query query = null;
    if (request.getLikeParams() != null && !request.getLikeParams().isEmpty()) {
        // Scored query: higher-scoring hits come first
        query = new BoolQuery.Builder().must(list).build()._toQuery();
    } else {
        // Filter query: no scoring
        query = new BoolQuery.Builder().filter(list).build()._toQuery();
    }
    // Create the SearchRequest and set the index names
    SearchRequest searchRequest = new SearchRequest.Builder()
            .index(request.getIndexList())
            .query(query)
            .trackTotalHits(tth -> tth.enabled(true))
            .from((request.getPageIndex() - 1) * request.getPageSize())
            .size(request.getPageSize())
            .build();
    // Print the query to help with troubleshooting
    System.out.println("search query: " + searchRequest.toString());
    // Run the search; the second argument can be replaced with your own result class
    SearchResponse<ObjectNode> response = elasticsearchClient.search(searchRequest, ObjectNode.class);
    List<ObjectNode> collect = response.hits().hits().stream().map(Hit::source).toList();
    EsSearchResult result = new EsSearchResult();
    result.setPageIndex(request.getPageIndex());
    result.setPageSize(request.getPageSize());
    result.setTotal(response.hits().total().value());
    result.setList(collect);
    EsResponseBody responseBody = new EsResponseBody(0, result, null);
    return new ResponseEntity<>(responseBody, HttpStatus.OK);
}

/**
 * Assemble the query conditions from the request parameters.
 *
 * @param request search request
 * @return one Query per condition
 */
private List<Query> buildQueryList(EsSearchRequest request) {
    List<Query> list = new ArrayList<>();
    if (request.getEqualsParams() != null && !request.getEqualsParams().isEmpty()) {
        for (Map.Entry<String, Object> entry : request.getEqualsParams().entrySet()) {
            // Build a TermQuery (exact match)
            Query query = new TermQuery.Builder()
                    .field(entry.getKey())
                    .value(entry.getValue().toString())
                    .build()._toQuery();
            list.add(query);
        }
    }
    if (request.getInParams() != null && !request.getInParams().isEmpty()) {
        for (EsInParams params : request.getInParams()) {
            // Build a TermsQuery (match any value in the list)
            Query query = new TermsQuery.Builder()
                    .field(params.getField())
                    .terms(t -> t.value(params.getValueList().stream().map(FieldValue::of).toList()))
                    .build()._toQuery();
            list.add(query);
        }
    }
    if (request.getLikeParams() != null && !request.getLikeParams().isEmpty()) {
        for (Map.Entry<String, String> entry : request.getLikeParams().entrySet()) {
            // Build a MatchQuery (full-text match)
            Query query = new MatchQuery.Builder()
                    .field(entry.getKey())
                    .query(entry.getValue())
                    .build()._toQuery();
            list.add(query);
        }
    }
    if (request.getRangeParams() != null && !request.getRangeParams().isEmpty()) {
        for (EsRangeParams rangeParam : request.getRangeParams()) {
            if (rangeParam.getFieldType() == 1) {
                // fieldType 1: numeric range
                Query query = new NumberRangeQuery.Builder()
                        .field(rangeParam.getField())
                        .gte(Double.valueOf(rangeParam.getGte()))
                        .lte(Double.valueOf(rangeParam.getLte()))
                        .build()._toRangeQuery()._toQuery();
                list.add(query);
            } else {
                // any other fieldType: date range
                Query query = new DateRangeQuery.Builder()
                        .field(rangeParam.getField())
                        .gte(rangeParam.getGte())
                        .lte(rangeParam.getLte())
                        .build()._toRangeQuery()._toQuery();
                list.add(query);
            }
        }
    }
    return list;
}
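The search handler derives the offset as (pageIndex - 1) * pageSize. Elasticsearch rejects from/size requests where from + size exceeds index.max_result_window, which defaults to 10000, so it can be worth validating the paging parameters before building the request. Here is a sketch under that assumption. PageWindow and its constant are hypothetical names, and an index with a different max_result_window would need a different limit.

```java
// Hypothetical helper (not in the original repo): computes the "from" offset
// for a 1-based page index and rejects pages beyond the result window.
final class PageWindow {

    // Default value of the index.max_result_window setting
    static final int DEFAULT_MAX_RESULT_WINDOW = 10_000;

    private PageWindow() {
    }

    static int from(int pageIndex, int pageSize) {
        if (pageIndex < 1 || pageSize < 1) {
            throw new IllegalArgumentException("pageIndex and pageSize must be >= 1");
        }
        // Use long arithmetic so very large page numbers cannot overflow
        long from = (long) (pageIndex - 1) * pageSize;
        if (from + pageSize > DEFAULT_MAX_RESULT_WINDOW) {
            throw new IllegalArgumentException(
                    "from + size exceeds " + DEFAULT_MAX_RESULT_WINDOW);
        }
        return (int) from;
    }
}
```

In the handler, PageWindow.from(request.getPageIndex(), request.getPageSize()) could replace the inline multiplication, with the exception mapped to an error response.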
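2.4 Aggregations (grouped statistics)
Aggregations do not generalize well, so write them to fit your own business requirements.
The example below groups by city and, within each city, counts documents per business type, network type, SDK version, platform and phone brand (phoneBrandName). The last sub-aggregation first applies a filter condition and then computes an average over the matching documents.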
@PostMapping("/aggs")
public ResponseEntity<EsResponseBody> aggs(@RequestBody EsSearchRequest request) throws IOException {
    // Build the query conditions
    List<Query> list = buildQueryList(request);
    Query query = new BoolQuery.Builder().filter(list).build()._toQuery();
    // Create the SearchRequest, set the index name and the other parameters
    SearchRequest searchRequest = new SearchRequest.Builder()
            .index("ssp_ad_union_log")
            .trackTotalHits(tth -> tth.enabled(true))
            .from(0)
            .size(1)
            .query(query)
            .aggregations("cityName_bucket",
                    // First-level aggregation
                    a -> a.terms(t -> t.field("cityName").size(10))
                            // Second-level aggregations
                            .aggregations("bizType_bucket", aa -> aa.terms(ts -> ts.field("bizType").size(10)))
                            .aggregations("network_bucket", aa -> aa.terms(ts -> ts.field("network").size(10)))
                            .aggregations("sdkVersion_bucket", aa -> aa.terms(t3 -> t3.field("sdkVersion").size(10)))
                            .aggregations("platformName_bucket", aa -> aa.terms(t4 -> t4.field("platformName").size(10)))
                            .aggregations("phoneBrandName_bucket", aa -> aa.terms(t5 -> t5.field("phoneBrandName").size(10)))
                            // Second-level aggregation with a filter condition
                            .aggregations("ecpmAvg_bucket", aa -> aa.filter(f6 -> f6.term(c -> c.field("bizType").value(2)))
                                    .aggregations("ecpmAvg_bucket", ss -> ss.avg(avg -> avg.field("ecpm"))))
            )
            .build();
    // Print the query to help with troubleshooting
    System.out.println("search query: " + searchRequest.toString());
    // Run the search
    SearchResponse<ObjectNode> response = elasticsearchClient.search(searchRequest, ObjectNode.class);
    // Process the result
    System.out.println(response.aggregations().toString());
    List<StringTermsBucket> buckets = response.aggregations().get("cityName_bucket").sterms().buckets().array();
    EsResponseBody responseBody = new EsResponseBody(0, response.aggregations().toString(), null);
    return new ResponseEntity<>(responseBody, HttpStatus.OK);
}
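To make the shape of the aggregation result easier to picture, here is a plain-Java, in-memory analogue of two parts of that request: the per-city bizType counts, and the per-city average ecpm restricted to bizType 2. It does not call Elasticsearch. The CityAggSketch class, the LogRow record and any sample values are made up for illustration.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

// Illustration only: mimics in memory what two of the sub-aggregations compute.
final class CityAggSketch {

    // Hypothetical row type carrying just the fields used below
    record LogRow(String cityName, int bizType, long ecpm) {
    }

    private CityAggSketch() {
    }

    // cityName -> (bizType -> document count), like cityName_bucket > bizType_bucket
    static Map<String, Map<Integer, Long>> bizTypeCounts(List<LogRow> rows) {
        Map<String, Map<Integer, Long>> result = new TreeMap<>();
        for (LogRow row : rows) {
            result.computeIfAbsent(row.cityName(), k -> new TreeMap<>())
                    .merge(row.bizType(), 1L, Long::sum);
        }
        return result;
    }

    // cityName -> average ecpm over rows with bizType == 2, like ecpmAvg_bucket
    static Map<String, Double> ecpmAvgForBizType2(List<LogRow> rows) {
        return rows.stream()
                .filter(row -> row.bizType() == 2)
                .collect(Collectors.groupingBy(LogRow::cityName,
                        Collectors.averagingLong(LogRow::ecpm)));
    }
}
```

One difference from the real request: Elasticsearch still returns the filter bucket for a city with no bizType 2 documents (with a document count of 0), whereas this sketch simply omits such cities from the second map.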