Elasticsearch: notes on upgrading ES 7.17 to 8.17 and the pitfalls of adapting a Spring Boot 2.7.0 application
ES 7.17 to 8.x upgrade: table of contents
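- I. Server installation
- 1-1. On the CentOS 7 server: remove Elasticsearch 7.17, install ES 8.17
- II. Application changes
- 2-1. Java API Client 8.17.5
- 2-2. Adding dependencies
- 2-3. Configuration files
- 2-4. Java configuration class
- III. Notes and integration advice for Java client development, based on the Elasticsearch cluster information (version 8.17.2)
- 3-1. Environment compatibility checks
- 1. Matching the client version
- 2. Security authentication configuration
- 3-2. Core API upgrade guide
- 2. Building search requests
- 3. Optimizing bulk operations
- 3-3. Handling key differences (7.x → 8.x)
- 3-4. Security hardening advice
- 1. HTTPS encrypted transport
- 2. API key authentication
- 3-5. Performance tuning parameters
- 3-6. Troubleshooting commands
- 3-7. Migration verification checklist
- Problem 1
- Problem 2: the health check uses the wrong ip:port
- Cause analysis
- Solution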
I. Server installation
1-1. On the CentOS 7 server: remove Elasticsearch 7.17, install ES 8.17
TODO
Server 21.18
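II. Application changes
2-1. Java API Client 8.17.5
- It is Elasticsearch's official new-generation client, designed for Elasticsearch 8.x and supporting its new features (such as role-based permission management and the security enhancements).
- Long-term maintenance: Elastic has stated that future feature work and bug fixes are concentrated in the Java API Client, while RestHighLevelClient has been deprecated since 7.15.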
2-2. Adding dependencies
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-elasticsearch</artifactId>
    <version>2.7.0</version>
    <exclusions>
        <!-- Exclude the old ES clients -->
        <exclusion>
            <groupId>org.elasticsearch.client</groupId>
            <artifactId>elasticsearch-rest-high-level-client</artifactId>
        </exclusion>
        <exclusion>
            <groupId>org.elasticsearch</groupId>
            <artifactId>elasticsearch</artifactId>
        </exclusion>
        <exclusion>
            <groupId>org.elasticsearch.client</groupId>
            <artifactId>elasticsearch-rest-client</artifactId>
        </exclusion>
    </exclusions>
</dependency>

<!-- Declare spring-data-elasticsearch explicitly and exclude its bundled rest-client -->
<dependency>
    <groupId>org.springframework.data</groupId>
    <artifactId>spring-data-elasticsearch</artifactId>
    <version>4.4.6</version>
    <exclusions>
        <exclusion>
            <groupId>org.elasticsearch.client</groupId>
            <artifactId>elasticsearch-rest-client</artifactId>
        </exclusion>
    </exclusions>
</dependency>

<!-- Switch to the new Elasticsearch 8.x Java API Client -->
<dependency>
    <groupId>co.elastic.clients</groupId>
    <artifactId>elasticsearch-java</artifactId>
    <version>8.17.5</version> <!-- latest stable version that worked with Spring Boot 2.7.x at the time of writing -->
    <exclusions>
        <exclusion>
            <groupId>org.elasticsearch.client</groupId>
            <artifactId>elasticsearch-rest-client</artifactId>
        </exclusion>
    </exclusions>
</dependency>

<!--
  Low-level REST client.
  Purpose: handles HTTP communication with the Elasticsearch cluster (it is built on Apache HttpAsyncClient).
  Why it is needed: elasticsearch-java uses it as its transport; without it an ElasticsearchClient instance cannot be initialized.
-->
<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>elasticsearch-rest-client</artifactId>
    <version>8.15.3</version> <!-- companion REST client; ideally the same version as elasticsearch-java -->
</dependency>

<!-- Required JSON processing libraries -->
<dependency>
    <groupId>jakarta.json</groupId>
    <artifactId>jakarta.json-api</artifactId>
    <version>2.1.1</version>
</dependency>
<dependency>
    <groupId>org.glassfish</groupId>
    <artifactId>jakarta.json</artifactId>
    <version>2.0.1</version>
</dependency>
2-3. Configuration files
2-3-1. Main configuration: application.yml
# Logging configuration
log_path: /data/logs/personnel-manager/backend
logging:
  level:
    com.ltkj: debug
    org.springframework: warn
    org.elasticsearch: info
    org.apache.http: INFO
  config: classpath:logback-spring.xml
2-3-2. Profile configuration: application-dev.yml
minio:
  endpoint: http://192.168.21.3:33306
  access-key: ltkj.personnel
  secret-key: ltkj.personnel.com
  bucketName: wangl
  url: ${minio.endpoint}/${minio.bucket}/

# Elasticsearch configuration
elasticsearch:
  address: 192.168.21.3:19200,192.168.21.18:19200 # for a cluster, separate the nodes with commas
  connect-timeout: 3000 # connection timeout (ms)
  socket-timeout: 30000 # socket (read) timeout (ms)
  connection-request-timeout: 1000
  max-connect-num: 200
  max-connect-per-route: 200
  shards: 2
  replicas: 1
  username: elastic
  password: elastic.com

management:
  health:
    elasticsearch:
      enabled: true # optional: set to false to disable the health check
2-4. Java configuration class
/*
 * Copyright (c) 2025. ltkj.com 石家庄文旅投数智科技有限公司 All rights reserved.
 */
package com.ltkj.configuration;

import co.elastic.clients.elasticsearch.ElasticsearchClient;
import co.elastic.clients.json.jackson.JacksonJsonpMapper;
import co.elastic.clients.transport.ElasticsearchTransport;
import co.elastic.clients.transport.rest_client.RestClientTransport;
import com.ltkj.configuration.properties.ElasticSearchProperties;
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.conn.ssl.NoopHostnameVerifier;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
import org.apache.http.ssl.SSLContexts;
import org.elasticsearch.client.Node;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import javax.net.ssl.SSLContext;
import java.net.URI;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

/**
 * Java API Client 8.17.5.
 * Elasticsearch's official new-generation client, designed for Elasticsearch 8.x and supporting
 * its new features (role-based permission management, security enhancements, and so on).
 * Long-term maintenance: future feature work and bug fixes are concentrated in the Java API Client,
 * while RestHighLevelClient is deprecated.
 */
@Configuration
public class ElasticSearchConfig {

    private static final Logger logger = LoggerFactory.getLogger(ElasticSearchConfig.class);

    @Bean
    public RestClient restClient(ElasticSearchProperties elasticProperties) {
        // 1. Parse the addresses (full URI format is supported)
        List<HttpHost> hostList = parseHosts(elasticProperties.getAddress());
        logger.info("Initializing Elasticsearch >>>>>> node list: {}", hostList); // key log line

        // 2. Build the RestClientBuilder
        RestClientBuilder builder = RestClient.builder(hostList.toArray(new HttpHost[0]))
                .setFailureListener(new LoggingFailureListener());

        // 3. Configure the timeouts
        configureTimeouts(builder, elasticProperties);

        // 4. Configure the connection pool and authentication
        configureConnections(builder, elasticProperties);

        return builder.build();
    }

    @Bean(name = "elasticsearchClient")
    @ConditionalOnMissingBean
    public ElasticsearchClient client(RestClient restClient) {
        // Depends on steps 1 to 4 above
        // 5. Create the transport and the client
        ElasticsearchTransport transport = new RestClientTransport(restClient, new JacksonJsonpMapper());
        logger.info("ElasticsearchClient initializing ......");
        ElasticsearchClient elasticsearchClient = new ElasticsearchClient(transport);
        logger.info("ElasticsearchClient initialized with nodes: {}", restClient.getNodes());
        return elasticsearchClient;
    }

    /**
     * Parses the address list (entries may carry a scheme).
     * Example input: http://192.168.21.3:19200,https://192.168.21.18:19200
     */
    private List<HttpHost> parseHosts(String addressStr) {
        return Arrays.stream(addressStr.split(","))
                .map(addr -> {
                    try {
                        // Add the scheme when it is missing
                        if (!addr.startsWith("http://") && !addr.startsWith("https://")) {
                            addr = "http://" + addr; // default to HTTP
                        }
                        URI uri = URI.create(addr);
                        return new HttpHost(uri.getHost(), uri.getPort(), uri.getScheme());
                    } catch (IllegalArgumentException e) {
                        throw new IllegalArgumentException("Invalid Elasticsearch address format: " + addr, e);
                    }
                })
                .collect(Collectors.toList());
    }

    /**
     * Configures the timeouts.
     */
    private void configureTimeouts(RestClientBuilder builder, ElasticSearchProperties props) {
        builder.setRequestConfigCallback(requestConfigBuilder -> requestConfigBuilder
                .setConnectTimeout(props.getConnectTimeOut())
                .setSocketTimeout(props.getSocketTimeOut())
                .setConnectionRequestTimeout(props.getConnectionRequestTimeOut()));
    }

    /**
     * Configures the connection pool and authentication on the underlying asynchronous HTTP client.
     */
    private void configureConnections(RestClientBuilder builder, ElasticSearchProperties props) {
        final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
        credentialsProvider.setCredentials(AuthScope.ANY,
                new UsernamePasswordCredentials(props.getUsername(), props.getPassword()));

        builder.setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
            @Override
            public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) {
                // Connection pool settings
                httpClientBuilder
                        .setMaxConnTotal(props.getMaxConnectNum())
                        .setMaxConnPerRoute(props.getMaxConnectPerRoute())
                        .setDefaultCredentialsProvider(credentialsProvider);

                // Configure SSL when HTTPS is enabled
                if ("https".equalsIgnoreCase(props.getScheme())) {
                    configureSSL(httpClientBuilder);
                }
                return httpClientBuilder;
            }
        });
    }

    /**
     * Configures SSL on the asynchronous HTTP client.
     */
    private void configureSSL(HttpAsyncClientBuilder httpClientBuilder) {
        try {
            SSLContext sslContext = SSLContexts.custom()
                    .loadTrustMaterial((chain, authType) -> true) // trusts every certificate; replace in production
                    .build();
            httpClientBuilder
                    .setSSLContext(sslContext)
                    .setSSLHostnameVerifier(NoopHostnameVerifier.INSTANCE);
        } catch (Exception e) {
            throw new RuntimeException("SSL configuration failed", e);
        }
    }

    /**
     * Node failure listener (logs blacklist events).
     */
    private static class LoggingFailureListener extends RestClient.FailureListener {
        @Override
        public void onFailure(Node node) {
            logger.warn("Node [{}] added to blacklist", node.getHost());
        }
    }
}
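The address handling in parseHosts can be tried out in isolation with only the JDK. The following is a minimal sketch, not the author's code: the class HostListSketch and its normalize method are hypothetical names, and the whitespace trimming and the explicit host/port check are additions that the configuration class above does not have.

```java
import java.net.URI;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper for illustration; not part of the original configuration class.
final class HostListSketch {

    /** Normalizes a comma-separated address list into scheme://host:port strings. */
    static List<String> normalize(String addresses) {
        List<String> result = new ArrayList<>();
        for (String raw : addresses.split(",")) {
            String addr = raw.trim(); // assumption: tolerate spaces after commas
            if (addr.isEmpty()) {
                continue; // skip empty entries such as a trailing comma
            }
            if (!addr.startsWith("http://") && !addr.startsWith("https://")) {
                addr = "http://" + addr; // default to plain HTTP, as parseHosts does
            }
            URI uri = URI.create(addr);
            if (uri.getHost() == null || uri.getPort() == -1) {
                throw new IllegalArgumentException("Invalid Elasticsearch address: " + raw);
            }
            result.add(uri.getScheme() + "://" + uri.getHost() + ":" + uri.getPort());
        }
        return result;
    }

    public static void main(String[] args) {
        // Prints [http://192.168.21.3:19200, https://192.168.21.18:19200]
        System.out.println(normalize("192.168.21.3:19200, https://192.168.21.18:19200"));
    }
}
```

Rejecting an entry without a port early is a design choice of this sketch: URI.getPort() returns -1 in that case, and passing -1 on to HttpHost would silently select the scheme's default port instead of 19200.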
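III. Notes and integration advice for Java client development, based on the Elasticsearch cluster information (version 8.17.2)
3-1. Environment compatibility checks
1. Matching the client version
Make sure the Java API Client version matches the Elasticsearch server version (same major and minor version):
<!-- pom.xml -->
<dependency>
    <groupId>co.elastic.clients</groupId>
    <artifactId>elasticsearch-java</artifactId>
    <version>8.17.2</version> <!-- should match the ES server version -->
</dependency>
<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-databind</artifactId>
    <version>2.17.1</version> <!-- a version tested with the client is recommended -->
</dependency>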
2. Security authentication configuration
Because your cluster has basic authentication enabled (curl -u elastic:elastic.com), the client must be configured with credentials:
RestClient restClient = RestClient.builder(new HttpHost("192.168.21.18", 19200))
        .setDefaultHeaders(new Header[]{
                new BasicHeader("Authorization", "Basic " + Base64.getEncoder()
                        .encodeToString("elastic:elastic.com".getBytes(StandardCharsets.UTF_8)))
        })
        .build();

ElasticsearchTransport transport = new RestClientTransport(restClient, new JacksonJsonpMapper());
ElasticsearchClient client = new ElasticsearchClient(transport);
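The Authorization header value used above can be checked without a running cluster. This is a minimal JDK-only sketch; BasicAuthHeaderSketch and its two methods are hypothetical names introduced for illustration, and the credentials are the ones from this article.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

// Hypothetical helper for illustration; the Elasticsearch client does not ship this class.
final class BasicAuthHeaderSketch {

    /** Builds the value of an HTTP Basic Authorization header. */
    static String basic(String username, String password) {
        String pair = username + ":" + password;
        return "Basic " + Base64.getEncoder().encodeToString(pair.getBytes(StandardCharsets.UTF_8));
    }

    /** Decodes a Basic header value back to "username:password" (useful when debugging). */
    static String decode(String headerValue) {
        String encoded = headerValue.substring("Basic ".length());
        return new String(Base64.getDecoder().decode(encoded), StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        String header = basic("elastic", "elastic.com");
        System.out.println(header);
        System.out.println(decode(header)); // prints elastic:elastic.com
    }
}
```

Base64 is an encoding, not encryption, so this header is only safe to send over HTTPS.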
3-2. Core API upgrade guide
// Index a document (the POJO is serialized automatically)
IndexResponse indexResponse = client.index(i -> i
        .index("products")
        .id(product.getId())
        .document(product)
);

// Get a document by ID
Product found = client.get(g -> g
                .index("products")
                .id("123"),
        Product.class
).source();

// Delete a document
DeleteResponse deleteResponse = client.delete(d -> d
        .index("products")
        .id("123")
);
2. Building search requests
SearchResponse<Product> response = client.search(s -> s
                .index("products")
                .query(q -> q.bool(b -> b
                        .must(m -> m.match(t -> t.field("name").query("手机"))) // "手机" = mobile phone
                        // Since client 8.15 a range query must name its variant: number, date, term or untyped
                        .filter(f -> f.range(r -> r.untyped(u -> u.field("price").gte(JsonData.of(1000)))))
                ))
                .highlight(h -> h.fields("name", f -> f.preTags("<em>").postTags("</em>")))
                .from(0)
                .size(10),
        Product.class
);
3. Optimizing bulk operations
BulkRequest.Builder bulkBuilder = new BulkRequest.Builder();
products.forEach(p -> bulkBuilder.operations(op -> op
        .index(idx -> idx.index("products").id(p.getId()).document(p))
));

BulkResponse response = client.bulk(bulkBuilder.build());
response.items().forEach(item -> {
    if (item.error() != null) {
        log.error("Operation failed: {}", item.error().reason());
    }
});
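The bulk example above puts every product into a single request. When the list is large, it is common to send it in fixed-size batches instead. The sketch below shows only the partitioning step with the JDK; BulkBatchSketch and partition are hypothetical names, and the batch size is an assumption to tune against your own document sizes.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper for illustration; each returned batch would feed one BulkRequest.
final class BulkBatchSketch {

    /** Splits a list into consecutive batches of at most batchSize elements. */
    static <T> List<List<T>> partition(List<T> items, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        List<List<T>> batches = new ArrayList<>();
        for (int start = 0; start < items.size(); start += batchSize) {
            int end = Math.min(start + batchSize, items.size());
            batches.add(new ArrayList<>(items.subList(start, end))); // copy so batches are independent
        }
        return batches;
    }

    public static void main(String[] args) {
        // Prints [[1, 2], [3, 4], [5]]
        System.out.println(partition(List.of(1, 2, 3, 4, 5), 2));
    }
}
```

Each batch can then go through the same bulkBuilder and error-logging loop shown above, so a failure affects only one batch rather than the whole import.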
3-3. Handling key differences (7.x → 8.x)
Feature | 7.x approach | 8.x approach |
---|---|---|
Query building | QueryBuilders.matchQuery() | Query.of(q -> q.match(m -> m.field(...))) |
Pagination | .setFrom() / .setSize() | chained .from() / .size() calls |
Highlighting | HighlightBuilder | .highlight(h -> h.fields(...)) |
Aggregations | AggregationBuilders | .aggregations("agg_name", a -> a.terms(...)) |
Asynchronous operations | client.prepareIndex().execute() | ElasticsearchAsyncClient.index(...), which returns a CompletableFuture |
3-4. Security hardening advice
1. HTTPS encrypted transport
RestClient.builder(new HttpHost("192.168.21.18", 19200, "https"))
        .setHttpClientConfigCallback(hc -> hc
                .setSSLContext(sslContext) // configure the SSL context
                .setDefaultCredentialsProvider(credentialsProvider));
2. API key authentication
Header apiKeyHeader = new BasicHeader("Authorization", "ApiKey " + Base64.getEncoder()
        .encodeToString("api_key_id:api_key_secret".getBytes(StandardCharsets.UTF_8)));

RestClient restClient = RestClient.builder(...)
        .setDefaultHeaders(new Header[]{apiKeyHeader})
        .build();
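3-5. Performance tuning parameters
// Query timeout (SearchRequest takes the timeout as a string)
SearchRequest request = SearchRequest.of(s -> s
        .timeout("30s")
);

// Bulk operation tuning
BulkRequest bulkRequest = BulkRequest.of(b -> b
        .refresh(Refresh.WaitFor)   // refresh policy
        .requireAlias(true)         // force writes to go through an alias
        .timeout(t -> t.time("1m"))
        .operations(operations)     // required: the list of bulk operations built elsewhere
);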
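The request timeouts above ("30s", "1m") are server-side values, while the socket-timeout of 30000 ms in section 2-3 is enforced by the client. If the server-side value is the larger one, the client may give up first. The sketch below compares the two using only the JDK; EsTimeSketch and parse are hypothetical names, and only the d, h, m, s and ms units are handled.

```java
import java.time.Duration;

// Hypothetical helper for illustration; not part of the Elasticsearch client.
final class EsTimeSketch {

    /** Parses a subset of Elasticsearch time values: d, h, m, s and ms. */
    static Duration parse(String value) {
        String v = value.trim();
        if (v.endsWith("ms")) { // must be checked before "s"
            return Duration.ofMillis(amount(v, 2));
        }
        if (v.endsWith("s")) {
            return Duration.ofSeconds(amount(v, 1));
        }
        if (v.endsWith("m")) {
            return Duration.ofMinutes(amount(v, 1));
        }
        if (v.endsWith("h")) {
            return Duration.ofHours(amount(v, 1));
        }
        if (v.endsWith("d")) {
            return Duration.ofDays(amount(v, 1));
        }
        throw new IllegalArgumentException("Unsupported time value: " + value);
    }

    // Throws NumberFormatException for anything that is not a plain integer amount.
    private static long amount(String v, int suffixLength) {
        return Long.parseLong(v.substring(0, v.length() - suffixLength));
    }

    public static void main(String[] args) {
        long socketTimeoutMillis = 30_000; // socket-timeout from section 2-3
        Duration bulkTimeout = parse("1m");
        if (bulkTimeout.toMillis() > socketTimeoutMillis) {
            System.out.println("bulk timeout exceeds the client socket timeout");
        }
    }
}
```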
3-6. Troubleshooting commands
- Cluster health check
curl -u elastic:elastic.com "http://192.168.21.18:19200/_cluster/health?pretty"
- Index status
curl -u elastic:elastic.com "http://192.168.21.18:19200/_cat/indices?v"
- Slow query log
# Index-level settings: apply them per index through PUT <index>/_settings; they are not accepted in elasticsearch.yml
index.search.slowlog.threshold.query.warn: 10s
index.search.slowlog.threshold.fetch.debug: 500ms
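3-7. Migration verification checklist
- The client version matches the ES server version (8.17.2)
- All TransportClient code has been replaced with the Java API Client
- Query DSL has been migrated to the type-safe builders
- Authentication (Basic Auth / API key) is configured correctly
- Logging has been adapted to the new response format (for example, error logs record item.error().reason())
- Performance tests pass (compare QPS, latency and similar metrics against the 7.x version)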
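Problem 1
Problem 2: the health check uses the wrong ip:port
In section 2-3 above, my configuration file is:
elasticsearch:
  address: 192.168.21.3:19200,192.168.21.18:19200 # for a cluster, separate the nodes with commas
  connect-timeout: 3000 # connection timeout (ms)
  socket-timeout: 30000 # socket (read) timeout (ms)
  connection-request-timeout: 1000
  max-connect-num: 200
  max-connect-per-route: 200
  shards: 2
  replicas: 1
  username: elastic
  password: elastic.com
Why does the application then fail with the following error?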
2025-05-15 17:40:45.968 [RMI TCP Connection(2)-192.168.68.19] DEBUG org.elasticsearch.client.RestClient - added [[host=http://localhost:9200]] to blacklist
java.net.ConnectException: Connection refused: no further information
    at java.base/sun.nio.ch.Net.pollConnect(Native Method)
    at java.base/sun.nio.ch.Net.pollConnectNow(Net.java:672)
    at java.base/sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:946)
    at org.apache.http.impl.nio.reactor.DefaultConnectingIOReactor.processEvent(DefaultConnectingIOReactor.java:174)
    at org.apache.http.impl.nio.reactor.DefaultConnectingIOReactor.processEvents(DefaultConnectingIOReactor.java:148)
    at org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor.execute(AbstractMultiworkerIOReactor.java:351)
    at org.apache.http.impl.nio.conn.PoolingNHttpClientConnectionManager.execute(PoolingNHttpClientConnectionManager.java:221)
    at org.apache.http.impl.nio.client.CloseableHttpAsyncClientBase$1.run(CloseableHttpAsyncClientBase.java:64)
    at java.base/java.lang.Thread.run(Thread.java:833)
2025-05-19 09:40:56.212 [elasticsearch-rest-client-1-thread-1] DEBUG o.a.h.i.n.c.InternalHttpAsyncClient - [exchange: 1] connection request failed
2025-05-19 09:40:56.214 [RMI TCP Connection(2)-192.168.68.19] DEBUG org.elasticsearch.client.RestClient - request [GET http://localhost:9200/_cluster/health/] failed
java.util.concurrent.ExecutionException: java.net.ConnectException: Connection refused: no further information
    at org.apache.http.concurrent.BasicFuture.getResult(BasicFuture.java:71)
    at org.apache.http.concurrent.BasicFuture.get(BasicFuture.java:84)
    at org.apache.http.impl.nio.client.FutureWrapper.get(FutureWrapper.java:70)
    at org.elasticsearch.client.RestClient.performRequest(RestClient.java:300)
    at org.elasticsearch.client.RestClient.performRequest(RestClient.java:292)
    at org.springframework.boot.actuate.elasticsearch.ElasticsearchRestClientHealthIndicator.doHealthCheck(ElasticsearchRestClientHealthIndicator.java:60)
    at org.springframework.boot.actuate.health.AbstractHealthIndicator.health(AbstractHealthIndicator.java:82)
    at org.springframework.boot.actuate.health.HealthIndicator.getHealth(HealthIndicator.java:37)
    at org.springframework.boot.actuate.health.HealthEndpoint.getHealth(HealthEndpoint.java:77)
    at org.springframework.boot.actuate.health.HealthEndpoint.getHealth(HealthEndpoint.java:40)
    at org.springframework.boot.actuate.health.HealthEndpointSupport.getContribution(HealthEndpointSupport.java:130)
    at org.springframework.boot.actuate.health.HealthEndpointSupport.getAggregateContribution(HealthEndpointSupport.java:141)
    at org.springframework.boot.actuate.health.HealthEndpointSupport.getContribution(HealthEndpointSupport.java:126)
    at org.springframework.boot.actuate.health.HealthEndpointSupport.getHealth(HealthEndpointSupport.java:95)
    at org.springframework.boot.actuate.health.HealthEndpointSupport.getHealth(HealthEndpointSupport.java:66)
    at org.springframework.boot.actuate.health.HealthEndpoint.health(HealthEndpoint.java:71)
    at org.springframework.boot.actuate.health.HealthEndpoint.health(HealthEndpoint.java:61)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:568)
    at org.springframework.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:282)
    at org.springframework.boot.actuate.endpoint.invoke.reflect.ReflectiveOperationInvoker.invoke(ReflectiveOperationInvoker.java:74)
    at org.springframework.boot.actuate.endpoint.annotation.AbstractDiscoveredOperation.invoke(AbstractDiscoveredOperation.java:60)
    at org.springframework.boot.actuate.endpoint.jmx.EndpointMBean.invoke(EndpointMBean.java:122)
    at org.springframework.boot.actuate.endpoint.jmx.EndpointMBean.invoke(EndpointMBean.java:97)
    at java.management/com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.invoke(DefaultMBeanServerInterceptor.java:814)
    at java.management/com.sun.jmx.mbeanserver.JmxMBeanServer.invoke(JmxMBeanServer.java:802)
    at java.management.rmi/javax.management.remote.rmi.RMIConnectionImpl.doOperation(RMIConnectionImpl.java:1472)
    at java.management.rmi/javax.management.remote.rmi.RMIConnectionImpl$PrivilegedOperation.run(RMIConnectionImpl.java:1310)
    at java.management.rmi/javax.management.remote.rmi.RMIConnectionImpl.doPrivilegedOperation(RMIConnectionImpl.java:1405)
    at java.management.rmi/javax.management.remote.rmi.RMIConnectionImpl.invoke(RMIConnectionImpl.java:829)
    at java.base/jdk.internal.reflect.GeneratedMethodAccessor56.invoke(Unknown Source)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:568)
    at java.rmi/sun.rmi.server.UnicastServerRef.dispatch(UnicastServerRef.java:360)
    at java.rmi/sun.rmi.transport.Transport$1.run(Transport.java:200)
    at java.rmi/sun.rmi.transport.Transport$1.run(Transport.java:197)
    at java.base/java.security.AccessController.doPrivileged(AccessController.java:712)
    at java.rmi/sun.rmi.transport.Transport.serviceCall(Transport.java:196)
    at java.rmi/sun.rmi.transport.tcp.TCPTransport.handleMessages(TCPTransport.java:587)
    at java.rmi/sun.rmi.transport.tcp.TCPTransport$ConnectionHandler.run0(TCPTransport.java:828)
    at java.rmi/sun.rmi.transport.tcp.TCPTransport$ConnectionHandler.lambda$run$0(TCPTransport.java:705)
    at java.base/java.security.AccessController.doPrivileged(AccessController.java:399)
    at java.rmi/sun.rmi.transport.tcp.TCPTransport$ConnectionHandler.run(TCPTransport.java:704)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
    at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: java.net.ConnectException: Connection refused: no further information
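Cause analysis
The error log shows that the health check still tries to connect to localhost:9200 rather than the configured 192.168.21.3:19200. The stack trace points at Spring Boot Actuator's ElasticsearchRestClientHealthIndicator, which uses a client built from Spring Boot's own configuration rather than your custom ElasticsearchClient. Spring Boot reads spring.elasticsearch.uris, which defaults to http://localhost:9200, and does not know about the custom elasticsearch.address property. The complete solution follows.
Solution
- Disable the default health check (optional)
In application.yml, turn off the default Elasticsearch health check:
management:
  health:
    elasticsearch:
      enabled: false
- Custom health check (recommended)
Create a custom health indicator class that uses your own client configuration:
CustomElasticsearchHealthIndicator.java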
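import co.elastic.clients.elasticsearch.ElasticsearchClient;
import co.elastic.clients.elasticsearch.core.InfoResponse;
import org.springframework.boot.actuate.health.AbstractHealthIndicator;
import org.springframework.boot.actuate.health.Health;
import org.springframework.stereotype.Component;

import java.io.IOException;

@Component
public class CustomElasticsearchHealthIndicator extends AbstractHealthIndicator {

    private final ElasticsearchClient client;

    public CustomElasticsearchHealthIndicator(ElasticsearchClient client) {
        this.client = client;
    }

    @Override
    protected void doHealthCheck(Health.Builder builder) throws Exception {
        try {
            // Calls the cluster root endpoint through the custom client
            InfoResponse info = client.info();
            builder.up()
                    .withDetail("cluster_name", info.clusterName())
                    .withDetail("version", info.version().number());
        } catch (IOException e) {
            builder.down(e);
        }
    }
}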