ES集群搭建及工具类
文章说明
本文主要记录Windows下搭建ES集群的过程,并提供了一个通用的ES工具类;工具类采用http接口调用es功能,目前仅提供了简单的查询功能,可在基础上额外扩展
集群搭建
ES的下载安装非常简单,只需要下载软件的 zip 压缩包,然后解压即用;本文演示采用的是 ES9.0 版本
下载地址:ES下载
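下载之后默认是开启安全认证和SSL的;本地测试时可以在 config/elasticsearch.yml 中将 xpack.security.enabled 等配置项设为 false 关闭(参见下文集群配置文件中的 Security 部分),然后直接启动即可获得单机版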
ES常用接口
创建索引
URL:http://localhost:9200/log_2025.04.29
请求类型:PUT
请求体
{"mappings": {"properties": {"@timestamp": { "type": "date","format": "yyyy-MM-dd'T'HH:mm:ss.SSSZ||yyyy-MM-dd HH:mm:ss.SSS||strict_date_optional_time||epoch_millis"},"timestamp": {"type": "date","format": "yyyy-MM-dd HH:mm:ss.SSS||strict_date_optional_time||epoch_millis"},"startTime": {"type": "date","format": "yyyy-MM-dd HH:mm:ss.SSS||strict_date_optional_time||epoch_millis"},"endTime": {"type": "date","format": "yyyy-MM-dd HH:mm:ss.SSS||strict_date_optional_time||epoch_millis"},"message": { "type": "text" },"level": { "type": "keyword" },"serviceName": { "type": "keyword" },"appName": { "type": "keyword" },"eventId": { "type": "keyword" },"globalId": { "type": "keyword" },"elapsed": { "type": "long" },"currentIP": { "type": "keyword" },"accessIp": { "type": "keyword" },"nodeName": { "type": "keyword" },"path": { "type": "keyword" },"reqParam": { "type": "keyword" },"resParam": { "type": "keyword" },"parentEventId": { "type": "keyword" },"PtxId": { "type": "keyword" },"source": { "type": "keyword" },"offset": { "type": "long" },"beat": {"properties": {"hostname": { "type": "keyword" },"name": { "type": "keyword" },"version": { "type": "keyword" }}},"host": {"properties": {"name": { "type": "keyword" }}},"log": {"properties": {"file": {"properties": {"path": { "type": "keyword" }}}}}}}
}
查询索引
URL:http://localhost:9200/log_2025.04.23
请求类型:GET
删除索引
URL:http://localhost:9200/log_2025.04.23
请求类型:DELETE
添加数据
URL:http://localhost:9200/_bulk
请求类型:POST
请求体(NDJSON 格式:每条数据占两行,第一行是操作元数据,第二行是文档内容;最后一行之后必须再有一个换行符)
{"index":{"_index":"log_2025.04.29"}}
{"这里放请求数据"}
查询数据
URL:http://localhost:9200/log_2025.04.29/_search
请求类型:POST
请求体
{"size": 20,"query": {"bool": {"must": [{"match": {"appName": "a"}},{"match": {"eventId": "1"}},{"range": {"timestamp": {"format": "yyyy-MM-dd HH:mm:ss.SSS","gte": "2025-04-27 15:04:18.104","lte": "2025-04-27 15:09:18.104"}}}]}},"sort": [{"timestamp": {"order": "asc"}}]
}
ES的集群搭建
ES的使用并不复杂,集群搭建也比较简单。补充说明:集群内的数据会在各节点之间自动同步,集群搭建完成后,访问集群中任意一个存活节点都可以查询到整个集群的数据
配置文件
这里以搭建3个节点的集群为例
node-1
# Cluster name (identical on every node of the cluster)
cluster.name: my-cluster

# Node name (use node-2 / node-3 on the other nodes)
node.name: node-1

# Paths
path.data: /path/to/data/node-1
path.logs: /path/to/logs/node-1

# Network
network.host: 0.0.0.0
http.port: 9200
transport.port: 9300

# Discovery
discovery.seed_hosts: ["127.0.0.1:9300", "127.0.0.1:9301", "127.0.0.1:9302"]
cluster.initial_master_nodes: ["node-1", "node-2", "node-3"]

# Security (optional)
xpack.security.enabled: false
xpack.security.transport.ssl.enabled: false
xpack.security.http.ssl.enabled: false
node-2
# Cluster name (identical on every node of the cluster)
cluster.name: my-cluster

# Node name (must be unique within the cluster)
node.name: node-2

# Paths
path.data: /path/to/data/node-2
path.logs: /path/to/logs/node-2

# Network
network.host: 0.0.0.0
http.port: 9201
transport.port: 9301

# Discovery
discovery.seed_hosts: ["127.0.0.1:9300", "127.0.0.1:9301", "127.0.0.1:9302"]
cluster.initial_master_nodes: ["node-1", "node-2", "node-3"]

# Security (optional)
xpack.security.enabled: false
xpack.security.transport.ssl.enabled: false
xpack.security.http.ssl.enabled: false
node-3
# Cluster name (identical on every node of the cluster)
cluster.name: my-cluster

# Node name (must be unique within the cluster)
node.name: node-3

# Paths
path.data: /path/to/data/node-3
path.logs: /path/to/logs/node-3

# Network
network.host: 0.0.0.0
http.port: 9202
transport.port: 9302

# Discovery
discovery.seed_hosts: ["127.0.0.1:9300", "127.0.0.1:9301", "127.0.0.1:9302"]
cluster.initial_master_nodes: ["node-1", "node-2", "node-3"]

# Security (optional)
xpack.security.enabled: false
xpack.security.transport.ssl.enabled: false
xpack.security.http.ssl.enabled: false
集群配置完成后逐个启动即可;这里我采用的是将压缩包解压三份,然后分别放在不同的目录下
启动完成后,集群搭建成功,此时可以正常访问
工具类
ESConfig.java 配置类,支持配置多个ip:port,采用英文逗号分隔
import lombok.Data;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.context.config.annotation.RefreshScope;
import org.springframework.stereotype.Component;import java.util.ArrayList;
import java.util.List;@Component
@RefreshScope
@Data
public class ESConfig {

    /** Comma-separated {@code ip:port} entries; defaults to {@code 127.0.0.1:9200}. */
    @Value("${elasticsearch.host:127.0.0.1:9200}")
    private String hosts;

    /** URL scheme used when building request URLs; defaults to {@code http}. */
    @Value("${elasticsearch.scheme:http}")
    private String scheme;

    /** User name for authentication; defaults to {@code elastic}. */
    @Value("${elasticsearch.username:elastic}")
    private String username;

    /** Password for authentication; defaults to the empty string. */
    @Value("${elasticsearch.password:}")
    private String password;

    /**
     * Splits the configured {@code hosts} value on commas.
     *
     * @return a new mutable list of the trimmed, non-empty entries; empty when
     *         {@code hosts} is {@code null} or empty
     */
    public List<String> getHostList() {
        List<String> entries = new ArrayList<>();
        if (hosts == null || hosts.isEmpty()) {
            return entries;
        }
        for (String rawEntry : hosts.split(",")) {
            String entry = rawEntry.trim();
            if (entry.isEmpty()) {
                // Skip blanks produced by stray or trailing commas.
                continue;
            }
            entries.add(entry);
        }
        return entries;
    }

    /** @return the configured URL scheme */
    public String getScheme() {
        return scheme;
    }

    /** @return the configured user name */
    public String getUsername() {
        return username;
    }

    /** @return the configured password */
    public String getPassword() {
        return password;
    }
}
ESHttpUtil.java 查询工具类
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.http.HttpEntity;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.entity.ContentType;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;
import org.springframework.stereotype.Component;import javax.annotation.Resource;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;@Component
public class ESHttpUtil {private static final ObjectMapper MAPPER = new ObjectMapper();// 添加新的查询方法public ESResponse queryLogsAsObject(String indexName, String queryJson) throws IOException {String result = queryLogs(indexName, queryJson);return MAPPER.readValue(result, ESResponse.class);}@Resourceprivate ESConfig esConfig;public String queryLogs(String indexName, String queryJson) throws IOException {List<String> hostList = esConfig.getHostList();if (hostList == null || hostList.isEmpty()) {throw new RuntimeException("当前未配置ES服务节点.");}for (String host : hostList) {try {return executeQuery(host, indexName, queryJson);} catch (IOException e) {// 如果当前节点不可用,记录日志并继续尝试下一个节点System.err.println("无法连接到ES节点: " + host + ". 正在尝试下一节点...");}}// 如果所有节点都不可用,抛出异常throw new IOException("所有节点均无法连接.");}private String executeQuery(String host, String indexName, String queryJson) throws IOException {String[] split = host.split(":");if (split.length != 2) {throw new IOException("ES服务节点host配置异常: " + host);}String url = String.format("%s://%s:%d/%s/_search",esConfig.getScheme(),split[0],Integer.parseInt(split[1]),indexName);CredentialsProvider credentialsProvider = new BasicCredentialsProvider();credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(esConfig.getUsername(), esConfig.getPassword()));try (CloseableHttpClient httpClient = HttpClients.custom().setDefaultCredentialsProvider(credentialsProvider).build()) {HttpPost httpPost = new HttpPost(url);httpPost.setEntity(new StringEntity(queryJson, ContentType.APPLICATION_JSON));try (CloseableHttpResponse response = httpClient.execute(httpPost)) {HttpEntity entity = response.getEntity();if (entity != null) {return EntityUtils.toString(entity);} else {throw new IOException("ES节点服务异常: " + host);}}}}public static class ESQueryBuilder {private final List<Map<String, Object>> mustConditions = new ArrayList<>();private final List<Map<String, Object>> shouldConditions = new ArrayList<>();private final List<Map<String, 
Object>> mustNotConditions = new ArrayList<>();private Integer from;private Integer size;private List<Map<String, Object>> sort;public ESQueryBuilder matchEqual(String field, Object value) {if (value != null) {Map<String, Object> matchQuery = new HashMap<>();Map<String, Object> match = new HashMap<>();match.put(field, value);matchQuery.put("match", match);mustConditions.add(matchQuery);}return this;}public ESQueryBuilder dateRange(String field, String startTime, String endTime) {if (startTime != null || endTime != null) {Map<String, Object> rangeQuery = new HashMap<>();Map<String, Object> range = new HashMap<>();Map<String, Object> timestamp = new HashMap<>();timestamp.put("format", "yyyy-MM-dd HH:mm:ss.SSS");if (startTime != null) {timestamp.put("gte", startTime);}if (endTime != null) {timestamp.put("lte", endTime);}range.put(field, timestamp);rangeQuery.put("range", range);mustConditions.add(rangeQuery);}return this;}public ESQueryBuilder from(int from) {this.from = from;return this;}public ESQueryBuilder size(int size) {this.size = size;return this;}public ESQueryBuilder addSort(String field, String order) {if (sort == null) {sort = new ArrayList<>();}Map<String, Object> sortItem = new HashMap<>();Map<String, Object> sortField = new HashMap<>();sortField.put("order", order.toLowerCase());sortItem.put(field, sortField);sort.add(sortItem);return this;}public ESQueryBuilder should(Map<String, Object> condition) {if (condition != null) {shouldConditions.add(condition);}return this;}public String build() {Map<String, Object> root = new HashMap<>();Map<String, Object> query = new HashMap<>();Map<String, Object> bool = new HashMap<>();if (!mustConditions.isEmpty()) {bool.put("must", mustConditions);}if (!shouldConditions.isEmpty()) {bool.put("should", shouldConditions);}if (!mustNotConditions.isEmpty()) {bool.put("must_not", mustNotConditions);}query.put("bool", bool);root.put("query", query);if (size != null) {root.put("size", size);}if (sort != null && 
!sort.isEmpty()) {root.put("sort", sort);}try {return MAPPER.writeValueAsString(root);} catch (Exception e) {throw new RuntimeException("构建查询JSON失败", e);}}}
}
ESResponse.java 响应实体
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;
import lombok.Data;import java.util.List;@Data
@JsonIgnoreProperties(ignoreUnknown = true)
public class ESResponse {

    private int took;

    /** Mapped from the JSON property {@code timed_out}. */
    @JsonProperty("timed_out")
    private boolean timedOut;

    /** Mapped from the JSON property {@code _shards}. */
    @JsonProperty("_shards")
    private Shards shards;

    private Hits hits;

    /** Document body of a hit, mapped from {@code _source}; unknown properties are ignored. */
    @Data
    @JsonIgnoreProperties(ignoreUnknown = true)
    public static class LogData {
        private String timestamp;
        private String path;
        private String parentEventId;
        private String eventId;
        private String serviceName;
        private String appName;
        private String nodeName;
        private String startTime;
        private String endTime;
        private String elapsed;
        private String reqParam;
        private String resParam;
        private String globalId;
        private String level;
        private String message;
    }

    /** One entry of the inner {@code hits} array. */
    @Data
    @JsonIgnoreProperties(ignoreUnknown = true)
    public static class Hit {

        /** Mapped from the JSON property {@code _index}. */
        @JsonProperty("_index")
        private String index;

        /** Mapped from the JSON property {@code _id}. */
        @JsonProperty("_id")
        private String id;

        /** Mapped from the JSON property {@code _score}. */
        @JsonProperty("_score")
        private String score;

        /** Mapped from the JSON property {@code _source}. */
        @JsonProperty("_source")
        private LogData source;
    }

    /** The {@code total} object nested inside {@code hits}. */
    @Data
    @JsonIgnoreProperties(ignoreUnknown = true)
    public static class Total {
        private int value;
        private String relation;
    }

    /** The outer {@code hits} object of the response. */
    @Data
    @JsonIgnoreProperties(ignoreUnknown = true)
    public static class Hits {

        private Total total;

        /** Mapped from the JSON property {@code max_score}. */
        @JsonProperty("max_score")
        private String maxScore;

        private List<Hit> hits;
    }

    /** The {@code _shards} object of the response. */
    @Data
    @JsonIgnoreProperties(ignoreUnknown = true)
    public static class Shards {
        private int total;
        private int successful;
        private int skipped;
        private int failed;
    }
}