SpringBoot 集成 ELK 实现系统操作日志存储方案
一、方案概述
ELK 是 Elasticsearch(ES)、Logstash、Kibana 三者的组合,是日志收集、存储、分析、可视化的经典解决方案。在 SpringBoot 项目中,通过以下流程实现操作日志管理:
- 日志产生:SpringBoot 项目通过 AOP 拦截系统操作(如接口调用、按钮点击),生成标准化操作日志;
- 日志传输:利用 Logback/Log4j2 将日志输出到文件,或直接通过 TCP/UDP 发送给 Logstash;
- 日志处理:Logstash 接收日志后,进行过滤、格式化(如解析 JSON、提取关键字段);
- 日志存储:Logstash 将处理后的日志写入 Elasticsearch,利用 ES 的全文检索和高吞吐特性实现日志高效存储;
- 日志可视化:通过 Kibana 连接 ES,创建索引模式、仪表盘,实现日志的查询、筛选、统计和可视化展示。
该方案适用于高并发系统的日志管理,支持海量日志的快速检索和分析,同时降低 SpringBoot 项目与 ES 的直接耦合(通过 Logstash 作为中间层)。
二、环境准备
1. 组件版本选型(兼容性优先)
组件 | 推荐版本 | 说明 |
---|---|---|
SpringBoot | 2.7.x / 3.1.x | 与 ES 客户端(Spring Data Elasticsearch)兼容性良好 |
Elasticsearch | 7.17.x | 稳定版,支持高并发写入,与 Logstash、Kibana 7.x 完全兼容 |
Logstash | 7.17.x | 与 ES 版本一致,避免数据格式兼容问题 |
Kibana | 7.17.x | 与 ES 版本一致,可视化界面适配性最佳 |
JDK | 11+ | ES 7.x 要求 JDK 11+,SpringBoot 3.x 需 JDK 17+ |
2. 组件部署(以 Docker 快速部署为例)
(1)创建 Docker Compose 配置文件(docker-compose.yml)
version: '3.8'
services:# Elasticsearch 服务elasticsearch:image: elasticsearch:7.17.10container_name: esenvironment:- discovery.type=single-node # 单节点模式(生产环境需集群)- ES_JAVA_OPTS=-Xms1g -Xmx1g # JVM 内存配置(根据服务器调整)- xpack.security.enabled=false # 关闭安全认证(测试环境,生产需开启)ports:- "9200:9200" # ES HTTP 端口(用于 API 访问)- "9300:9300" # ES TCP 端口(用于集群通信)volumes:- es-data:/usr/share/elasticsearch/data # 数据持久化restart: always# Logstash 服务logstash:image: logstash:7.17.10container_name: logstashdepends_on:- elasticsearch # 依赖 ES 启动environment:- LS_JAVA_OPTS=-Xms512m -Xmx512mvolumes:- ./logstash/pipeline:/usr/share/logstash/pipeline # 挂载配置文件- ./logstash/logs:/usr/share/logstash/logs # 日志持久化ports:- "5044:5044" # Filebeat 采集端口- "9600:9600" # Logstash 监控端口- "5000:5000" # TCP 输入端口(接收 SpringBoot 日志)restart: always# Kibana 服务kibana:image: kibana:7.17.10container_name: kibanadepends_on:- elasticsearchenvironment:- ELASTICSEARCH_HOSTS=http://elasticsearch:9200 # 连接 ES 地址ports:- "5601:5601" # Kibana 可视化端口restart: alwaysvolumes:es-data: # ES 数据卷
(2)配置 Logstash 管道(./logstash/pipeline/logstash.conf)
Logstash 核心配置:定义输入(接收 SpringBoot 日志)、过滤(格式化日志)、输出(写入 ES)。
# 输入配置:接收 SpringBoot 发送的 TCP 日志(JSON 格式)
input {tcp {port => 5000 # 对应 SpringBoot 日志输出的端口codec => json_lines # 日志格式为 JSON 行}# 可选:同时接收文件日志(如 SpringBoot 本地日志文件)file {path => "/usr/share/logstash/logs/spring-boot-*.log" # 挂载的 SpringBoot 日志路径start_position => "beginning"sincedb_path => "/dev/null" # 每次重启重新读取所有日志(测试用)}
}# 过滤配置:解析日志字段,格式化数据(可选但推荐)
filter {json {source => "message" # 若日志外层有 message 字段,解析其内部 JSON(根据实际日志格式调整)remove_field => "message" # 解析后删除原始 message 字段}# 提取关键字段(如操作人、模块、操作类型等)mutate {rename => {"operateUser" => "operate_user" # 下划线命名规范(ES 推荐)"operateModule" => "operate_module""operateType" => "operate_type""operateTime" => "operate_time"}convert => {"responseTime" => "integer" # 将响应时间转为整数类型"status" => "integer" # 操作状态(成功/失败)转为整数}}# 时间格式化(使用日志中的操作时间作为 ES 文档时间戳)date {match => ["operateTime", "yyyy-MM-dd HH:mm:ss.SSS"] # 匹配日志中的时间格式target => "@timestamp" # 覆盖 ES 默认的 @timestamp 字段timezone => "Asia/Shanghai" # 时区}
}# 输出配置:将处理后的日志写入 Elasticsearch
output {elasticsearch {hosts => ["http://elasticsearch:9200"] # ES 地址(Docker 内部服务名)index => "spring-boot-operation-logs-%{+YYYY.MM.dd}" # 按日期创建索引(如 2024.05.20)document_type => "_doc" # ES 7.x 仅支持单类型 _doc}# 可选:输出到控制台(测试用)stdout {codec => rubydebug}
}
(3)启动 ELK 集群
在 docker-compose.yml
所在目录执行命令:
# 启动所有服务
docker-compose up -d# 查看启动状态
docker-compose ps# 查看 Logstash 日志(验证配置是否生效)
docker logs -f logstash
(4)验证组件可用性
- ES 验证:访问
http://localhost:9200
,返回如下信息说明启动成功:{"name" : "es","cluster_name" : "docker-cluster","version" : { "number" : "7.17.10" },"tagline" : "You Know, for Search" }
- Kibana 验证:访问
http://localhost:5601
,进入 Kibana 控制台(首次启动需等待几分钟初始化)。
三、SpringBoot 项目改造
1. 核心依赖(pom.xml)
引入日志框架(Logback)、AOP(拦截操作)、ES 客户端(可选,用于手动操作 ES):
<!-- SpringBoot 核心依赖 -->
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId>
</dependency><!-- AOP 依赖:用于拦截系统操作生成日志 -->
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-aop</artifactId>
</dependency><!-- Logback 依赖:日志输出到 Logstash -->
<dependency><groupId>ch.qos.logback</groupId><artifactId>logback-classic</artifactId>
</dependency>
<dependency><groupId>net.logstash.logback</groupId><artifactId>logstash-logback-encoder</artifactId><version>7.0.1</version> <!-- 与 Logback 兼容 -->
</dependency><!-- 可选:Spring Data Elasticsearch(用于手动操作 ES,如删除日志) -->
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-elasticsearch</artifactId>
</dependency><!-- Lombok:简化实体类代码 -->
<dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional>
</dependency>
2. 定义操作日志实体类
标准化日志字段,确保与 Logstash 过滤规则一致:
import lombok.Data;
import java.time.LocalDateTime;/*** 系统操作日志实体*/
@Data
public class OperationLog {// 日志唯一标识(可选,ES 会自动生成 _id)private String id;// 操作人账号/姓名private String operateUser;// 操作模块(如:用户管理、订单管理)private String operateModule;// 操作类型(如:新增、修改、删除、查询)private String operateType;// 操作IP地址private String operateIp;// 操作URL(接口地址)private String operateUrl;// 操作参数(JSON格式)private String operateParams;// 操作结果(成功/失败)private Integer status; // 1-成功,0-失败// 响应信息(如失败原因)private String responseMsg;// 操作耗时(毫秒)private Long responseTime;// 操作时间private String operateTime; // 格式:yyyy-MM-dd HH:mm:ss.SSS// 操作人所属部门(可选)private String deptName;
}
3. 用 AOP 拦截操作生成日志
通过自定义注解 @OperationLogAnnotation
标记需要记录日志的方法,AOP 切面拦截方法执行,生成日志并输出到 Logstash。
(1)自定义日志注解
import java.lang.annotation.*;/*** 操作日志注解(标记需要记录日志的方法)*/
@Target({ElementType.METHOD}) // 仅作用于方法
@Retention(RetentionPolicy.RUNTIME) // 运行时生效
@Documented
public @interface OperationLogAnnotation {// 操作模块(如:用户管理)String module() default "";// 操作类型(如:新增)String type() default "";
}
(2)AOP 切面实现
import lombok.extern.slf4j.Slf4j;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Pointcut;
import org.aspectj.lang.reflect.MethodSignature;
import org.springframework.stereotype.Component;
import org.springframework.web.context.request.RequestContextHolder;
import org.springframework.web.context.request.ServletRequestAttributes;
import javax.servlet.http.HttpServletRequest;
import java.lang.reflect.Method;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;/*** 操作日志 AOP 切面*/
@Aspect
@Component
@Slf4j
public class OperationLogAspect {// 定义切入点:拦截所有带 @OperationLogAnnotation 注解的方法@Pointcut("@annotation(com.example.demo.annotation.OperationLogAnnotation)")public void operationLogPointcut() {}// 环绕通知:在方法执行前后记录日志@Around("operationLogPointcut()")public Object around(ProceedingJoinPoint joinPoint) throws Throwable {// 1. 初始化日志对象OperationLog logVo = new OperationLog();// 2. 获取请求上下文ServletRequestAttributes attributes = (ServletRequestAttributes) RequestContextHolder.getRequestAttributes();HttpServletRequest request = attributes.getRequest();// 3. 填充日志基础信息(请求相关)logVo.setOperateIp(getIpAddress(request)); // 获取IPlogVo.setOperateUrl(request.getRequestURI()); // 获取URLlogVo.setOperateParams(getRequestParams(joinPoint)); // 获取请求参数logVo.setOperateUser(getCurrentUser()); // 获取当前操作人(需结合权限框架,如Spring Security)logVo.setDeptName(getCurrentDept()); // 获取操作人部门(可选)// 4. 从注解中获取模块和操作类型MethodSignature signature = (MethodSignature) joinPoint.getSignature();Method method = signature.getMethod();OperationLogAnnotation annotation = method.getAnnotation(OperationLogAnnotation.class);logVo.setOperateModule(annotation.module());logVo.setOperateType(annotation.type());// 5. 记录方法执行时间和结果long startTime = System.currentTimeMillis(); // 开始时间Object result;try {// 执行目标方法result = joinPoint.proceed();// 方法执行成功logVo.setStatus(1);logVo.setResponseMsg("操作成功");} catch (Exception e) {// 方法执行失败logVo.setStatus(0);logVo.setResponseMsg("操作失败:" + e.getMessage());throw e; // 继续抛出异常,不影响业务} finally {// 计算耗时long responseTime = System.currentTimeMillis() - startTime;logVo.setResponseTime(responseTime);// 设置操作时间(格式与 Logstash 配置一致)logVo.setOperateTime(LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS")));// 6. 输出日志(Logback 会将日志发送到 Logstash)log.info("operation_log: {}", logVo);}return result;}// ------------------- 工具方法 -------------------/*** 获取请求IP地址*/private String getIpAddress(HttpServletRequest request) {String ip = request.getHeader("x-forwarded-for");if (ip == null || ip.length() == 0 || "unknown".equalsIgnoreCase(ip)) {ip = request.getHeader("Proxy-Client-IP");}if (ip == null || ip.length() == 0 || "unknown".equalsIgnoreCase(ip)) {ip = request.getHeader("WL-Proxy-Client-IP");}if (ip == null || ip.length() == 0 || "unknown".equalsIgnoreCase(ip)) {ip = request.getRemoteAddr();}return ip;}/*** 获取请求参数(JSON格式)*/private String getRequestParams(ProceedingJoinPoint joinPoint) {Object[] args = joinPoint.getArgs();try {return new com.alibaba.fastjson.JSONObject().toJSONString(args); // 需引入 fastjson 依赖} catch (Exception e) {return "参数解析失败";}}/*** 获取当前操作人(示例:实际需从 Session/Spring Security 中获取)*/private String getCurrentUser() {// 伪代码:String username = SecurityContextHolder.getContext().getAuthentication().getName();return "admin";}/*** 获取当前操作人部门(示例)*/private String getCurrentDept() {return "技术部";}
}
4. 配置 Logback 输出日志到 Logstash
在 src/main/resources
下创建 logback-spring.xml
,配置日志输出格式和目标(TCP 发送到 Logstash):
<?xml version="1.0" encoding="UTF-8"?>
<configuration><include resource="org/springframework/boot/logging/logback/defaults.xml"/><include resource="org/springframework/boot/logging/logback/console-appender.xml"/><!-- 配置 Logstash 输出 --><appender name="LOGSTASH" class="net.logstash.logback.appender.LogstashTcpSocketAppender"><!-- Logstash TCP 输入端口(对应 logstash.conf 中的 tcp.port=5000) --><destination>localhost:5000</destination><!-- 日志编码:JSON 格式 --><encoder class="net.logstash.logback.encoder.LogstashEncoder"><!-- 自定义字段:区分应用(多应用时用于筛选) --><customFields>{"appName":"spring-boot-elk-demo"}</customFields><!-- 忽略默认字段,仅保留自定义日志内容 --><includeMdcKeyName>false</includeMdcKeyName><includeCallerData>false</includeCallerData></encoder></appender><!-- 根日志配置 --><root level="INFO"><appender-ref ref="CONSOLE"/> <!-- 输出到控制台(测试用) --><appender-ref ref="LOGSTASH"/> <!-- 输出到 Logstash --></root><!-- 自定义日志器:仅记录操作日志(可选,避免其他日志干扰) --><logger name="com.example.demo.aspect.OperationLogAspect" level="INFO" additivity="false"><appender-ref ref="LOGSTASH"/></logger>
</configuration>
5. 测试日志生成
在 Controller 方法上添加 @OperationLogAnnotation
注解,启动 SpringBoot 项目并访问接口:
import com.example.demo.annotation.OperationLogAnnotation;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;@RestController
public class TestController {// 标记该方法需要记录操作日志@OperationLogAnnotation(module = "用户管理", type = "查询")@GetMapping("/user/query")public String queryUser(@RequestParam String username) {return "查询用户:" + username;}@OperationLogAnnotation(module = "用户管理", type = "新增")@GetMapping("/user/add")public String addUser(@RequestParam String username) {// 模拟异常if ("error".equals(username)) {throw new RuntimeException("用户名已存在");}return "新增用户成功:" + username;}
}
访问接口:
- 成功案例:
http://localhost:8080/user/query?username=test
- 失败案例:
http://localhost:8080/user/add?username=error
四、Kibana 日志可视化配置
1. 创建索引模式
- 访问 Kibana 控制台(
http://localhost:5601
); - 左侧菜单进入 Management → Stack Management → Index Patterns;
- 点击 Create index pattern,输入索引名称(如
spring-boot-operation-logs-*
,匹配 Logstash 配置的按日期索引); - 选择时间字段(
@timestamp
),点击 Create index pattern。
2. 日志查询与筛选
- 左侧菜单进入 Discover;
- 顶部选择创建的索引模式,即可查看所有操作日志;
- 支持通过字段筛选(如
operate_module:用户管理
)、时间范围筛选; - 可保存常用查询(如 “失败操作日志”)。
3. 创建日志仪表盘
- 左侧菜单进入 Dashboard → Create dashboard;
- 添加可视化组件(如:饼图展示操作成功 / 失败占比、柱状图展示各模块操作次数、表格展示最新操作日志);
- 保存仪表盘,用于日常日志监控。
五、关键优化与注意事项
1. 性能优化
- ES 索引优化:
- 按日期分片(如每天一个索引),避免单索引过大;
- 关闭索引副本(测试环境),或设置合理副本数(生产环境建议 1-2 个);
- 对高频查询字段(如
operate_user
、operate_module
)设置为 keyword 类型,提高检索速度。
- Logstash 优化:
- 调整 JVM 内存(根据服务器配置,建议 1-2G);
- 启用 Logstash 管道批处理(在
logstash.conf
中添加pipeline.workers: 4
、pipeline.batch.size: 1000
)。
- SpringBoot 日志优化:
- 避免日志冗余(如仅记录关键操作,不记录调试日志);
- 批量输出日志(高并发场景下,使用异步日志输出,避免阻塞业务线程)。
2. 生产环境注意事项
- ES 集群部署:单节点仅用于测试,生产环境需部署 3 个以上节点,确保高可用;
- 安全认证:开启 ES/Kibana 的用户名密码认证(xpack.security.enabled=true),避免未授权访问;
- 日志轮转:SpringBoot 本地日志需配置轮转(如 Logback 按大小 / 日期切割),避免日志文件过大;
- 数据清理:通过 ES 索引生命周期管理(ILM)设置日志过期时间(如保留 30 天),自动删除旧日志。
3. 常见问题排查
- Logstash 无法连接 ES:检查 ES 地址是否正确(Docker 内部用服务名
elasticsearch
,外部用 IP),ES 端口是否开放; - Kibana 看不到日志:
- 检查 Logstash 日志(
docker logs -f logstash
),是否有报错; - 检查 ES 索引是否创建(访问
http://localhost:9200/_cat/indices?v
); - 确认 Kibana 索引模式匹配 ES 索引名称。
- 检查 Logstash 日志(
- 日志字段解析异常:确保 Logstash 的
filter
配置与 SpringBoot 日志字段格式一致(如时间格式、字段名称)。
六、扩展功能
1. 手动操作 ES 日志(如删除、导出)
通过 Spring Data Elasticsearch 操作 ES 索引,示例:
import org.elasticsearch.index.query.QueryBuilders;
import org.springframework.data.elasticsearch.core.ElasticsearchRestTemplate;
import org.springframework.data.elasticsearch.core.query.NativeSearchQuery;
import org.springframework.data.elasticsearch.core.query.NativeSearchQueryBuilder;
import org.springframework.stereotype.Service;import javax.annotation.Resource;@Service
public class OperationLogService {@Resourceprivate ElasticsearchRestTemplate elasticsearchRestTemplate;/*** 删除指定时间前的日志(如 30 天前)*/public void deleteExpiredLogs(String expireTime) {NativeSearchQuery query = new NativeSearchQueryBuilder().withQuery(QueryBuilders.rangeQuery("@timestamp").lt(expireTime)).build();// 删除匹配的日志(需注意:ES 删除操作会触发段合并,建议通过 ILM 管理)elasticsearchRestTemplate.delete(query, OperationLog.class);}
}
2. 集成 Filebeat(替代 SpringBoot 直接连接 Logstash)
高并发场景下,建议用 Filebeat 采集 SpringBoot 本地日志文件,再传输给 Logstash,减少 SpringBoot 与 Logstash 的直接耦合:
- 部署 Filebeat,配置采集 SpringBoot 日志文件;
- Logstash 输入改为
beats { port => 5044 }
,接收 Filebeat 数据。