
Introduction to the ELK Log Management Framework

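The ELK log management framework came up in 小铃铛's graduation project. Research at the time turned up few good Chinese-language introductions to ELK, so this article gives a fairly detailed walkthrough of a working implementation.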

Background

Introduction to the ELK stack

ELK is a popular open-source log management stack made up of three core components:

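Core components of the ELK stack

Component        Features
Elasticsearch    - Distributed search and analytics engine
                 - Provides real-time search and data analysis
                 - Built on Lucene and highly scalable
Logstash         - Server-side data processing pipeline
                 - Collects, parses, and transforms log data
                 - Supports many input sources and output destinations
Kibana           - Data visualization platform
                 - Provides rich charts and dashboards
                 - Lets users explore data interactively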

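As the ecosystem evolved, several variants of ELK appeared:

  • EFK: replaces Logstash with Fluentd

  • ELK+Beats: adds the Beats family of lightweight data shippers

  • Elastic Stack: the official new name for the ELK stack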

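Key features

- Real-time analysis: processes and analyzes data in near real time
- Scalability: scales horizontally to handle large data volumes
- Flexibility: supports many data sources and formats
- Powerful search: offers both full-text and structured search

Common use cases

- Centralized log management and analysis
- Application performance monitoring
- Security information and event management (SIEM)
- Business intelligence analysis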

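In practice

小铃铛's system uses the ELK Stack. The data flow is: application (JSON log files written through Logback) → Filebeat → Logstash → Elasticsearch → Kibana.

The rest of this article uses the collection of article (post) log data as the running example.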

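Step 1. Design the log template

First decide which fields the logs should capture. You can define a custom index template and place it in Logstash's template directory, i.e. the /templates folder, for example the following file:

post-metrics-template.json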

{
  "index_patterns": ["blog-post-metrics-*"],
  "version": 3,
  "priority": 400,
  "template": {
    "settings": {
      "number_of_shards": 1,
      "number_of_replicas": 1,
      "index.lifecycle.name": "blog-policy",
      "index.lifecycle.rollover_alias": "blog-post-metrics"
    },
    "mappings": {
      "properties": {
        "@timestamp": {"type": "date"},
        "log_type": {"type": "keyword"},
        "service_name": {"type": "keyword"},
        "level": {"type": "keyword"},
        "operation": {"type": "keyword"},
        "post_id": {"type": "long"},
        "post_title": {"type": "keyword"},
        "user_id": {"type": "long"},
        "success": {"type": "boolean"},
        "duration": {"type": "long"},
        "error_message": {
          "type": "text",
          "fields": {
            "keyword": {"type": "keyword", "ignore_above": 256}
          }
        },
        "hostname": {"type": "keyword"},
        "os_name": {"type": "keyword"},
        "os_version": {"type": "keyword"},
        "environment": {"type": "keyword"},
        "service": {"type": "keyword"},
        "view_count": {"type": "long"}
      }
    }
  }
}
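The template only applies to indices whose names match its index_patterns entry, and the Logstash pipeline later in this article names its indices with a daily suffix (blog-post-metrics-%{+YYYY.MM.dd}). As a rough sanity check, the sketch below builds such a daily name and tests it against the pattern. The class and method names are hypothetical and not part of the original project, and the matcher only handles the simple "*" wildcard used here.

```java
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.util.regex.Pattern;

/** Hypothetical helper for illustration; not part of the original project. */
final class IndexPatternCheck {

    // Same date layout as the %{+YYYY.MM.dd} suffix used in logstash.conf.
    private static final DateTimeFormatter DAILY = DateTimeFormatter.ofPattern("yyyy.MM.dd");

    /** Builds a daily index name such as blog-post-metrics-2024.01.15. */
    static String dailyIndexName(String prefix, LocalDate day) {
        return prefix + "-" + day.format(DAILY);
    }

    /** Returns true when indexName matches a pattern in which '*' stands for any characters. */
    static boolean matches(String indexPattern, String indexName) {
        StringBuilder regex = new StringBuilder();
        String[] literals = indexPattern.split("\\*", -1);
        for (int i = 0; i < literals.length; i++) {
            if (i > 0) {
                regex.append(".*"); // each '*' in the pattern becomes ".*"
            }
            regex.append(Pattern.quote(literals[i])); // everything else is matched literally
        }
        return Pattern.matches(regex.toString(), indexName);
    }

    public static void main(String[] args) {
        String index = dailyIndexName("blog-post-metrics", LocalDate.of(2024, 1, 15));
        System.out.println(index);
        System.out.println(matches("blog-post-metrics-*", index));
    }
}
```

If the name produced by the pipeline does not match the pattern, the mappings above are simply not applied and Elasticsearch falls back to dynamic mapping.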

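Step 2. Decide when logs are collected

As I understand it, the backend emits logs in two situations: when a method of a class in a particular location is called (static, targeted by package path), and when a particular method of a class is called (dynamic, chosen by placing an annotation). This summary may not be precise, so the examples below show what I mean.

When a method of a class in a particular location is called

The AOP pointcut currently logs whenever a method under the "com.kitty.blog.application" or "com.kitty.blog.interfaces" package is called, excluding methods under "com.kitty.blog.infrastructure.security.filter".

(The example below was not written specifically for collecting article logs; it is enough to understand the principle.)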

import java.time.Instant;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

import lombok.extern.slf4j.Slf4j;
import org.aspectj.lang.JoinPoint;
import org.aspectj.lang.annotation.After;
import org.aspectj.lang.annotation.AfterThrowing;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Before;
import org.springframework.stereotype.Component;
// LogConstants is the project's own constants class; import it from its package.

@Aspect
@Component
@Slf4j
public class BackendLoggingAspect {

    // (application OR interfaces) AND NOT security filter.
    // The parentheses make the intended grouping explicit.
    private static final String LOGGED_METHODS =
            "(execution(* com.kitty.blog.application..*.*(..))" +
            " || execution(* com.kitty.blog.interfaces..*.*(..)))" +
            " && !execution(* com.kitty.blog.infrastructure.security.filter..*.*(..))";

    @Before(LOGGED_METHODS)
    public void before(JoinPoint joinPoint) {
        Map<String, Object> logData = new HashMap<>();
        // Instant.now() is the real UTC instant. LocalDateTime.now().toInstant(ZoneOffset.UTC)
        // would relabel local wall-clock time as UTC and be off by the zone offset.
        logData.put("@timestamp", Instant.now().toString());
        logData.put("log_type", LogConstants.LogType.API_METRICS);
        logData.put("application", LogConstants.APPLICATION_NAME);
        logData.put("phase", "method_start");
        logData.put("class", joinPoint.getTarget().getClass().getName());
        logData.put("method", joinPoint.getSignature().getName());
        logData.put("args", Arrays.toString(joinPoint.getArgs()));
        addTags(logData);
        log.info("Method Execution: {}",
                net.logstash.logback.argument.StructuredArguments.entries(logData));
    }

    @After(LOGGED_METHODS)
    public void after(JoinPoint joinPoint) {
        Map<String, Object> logData = new HashMap<>();
        logData.put("@timestamp", Instant.now().toString());
        logData.put("log_type", LogConstants.LogType.API_METRICS);
        logData.put("application", LogConstants.APPLICATION_NAME);
        logData.put("phase", "method_end");
        logData.put("class", joinPoint.getTarget().getClass().getName());
        logData.put("method", joinPoint.getSignature().getName());
        addTags(logData);
        log.info("Method Execution: {}",
                net.logstash.logback.argument.StructuredArguments.entries(logData));
    }

    @AfterThrowing(pointcut = LOGGED_METHODS, throwing = "ex")
    public void afterThrowing(JoinPoint joinPoint, Throwable ex) {
        Map<String, Object> logData = new HashMap<>();
        logData.put("@timestamp", Instant.now().toString());
        logData.put("log_type", LogConstants.LogType.ERROR);
        logData.put("application", LogConstants.APPLICATION_NAME);
        logData.put("phase", "method_error");
        logData.put("class", joinPoint.getTarget().getClass().getName());
        logData.put("method", joinPoint.getSignature().getName());
        logData.put("error_message", ex.getMessage());
        logData.put("stack_trace", Arrays.toString(ex.getStackTrace()));
        addTags(logData);
        log.error("Method Execution Error: {}",
                net.logstash.logback.argument.StructuredArguments.entries(logData));
    }

    // Add tags: host, service, and environment.
    private void addTags(Map<String, Object> logData) {
        try {
            logData.put("host", java.net.InetAddress.getLocalHost().getHostName());
            logData.put("service", LogConstants.APPLICATION_NAME);
            // Reads the JVM system property only; falls back to "dev" when it is not set.
            logData.put("environment", System.getProperty("spring.profiles.active", "dev"));
        } catch (Exception e) {
            log.warn("Failed to add tags to log data", e);
        }
    }
}
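Each event carries an ISO-8601 @timestamp that is meant to be UTC. The sketch below is not part of the original project; it contrasts two ways of turning a local wall-clock time into such a timestamp, using Asia/Shanghai (the zone that appears in the Logstash date filter later on) as an assumed server time zone. The class and method names are made up for illustration.

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZoneOffset;

/** Hypothetical demo class; not part of the original project. */
final class TimestampDemo {

    /** Relabels a local wall-clock time as UTC without shifting it. */
    static Instant relabelAsUtc(LocalDateTime local) {
        return local.toInstant(ZoneOffset.UTC);
    }

    /** Converts a local wall-clock time in the given zone to the real UTC instant. */
    static Instant toUtc(LocalDateTime local, ZoneId zone) {
        return local.atZone(zone).toInstant();
    }

    public static void main(String[] args) {
        LocalDateTime wallClock = LocalDateTime.of(2024, 1, 15, 20, 0);
        ZoneId shanghai = ZoneId.of("Asia/Shanghai"); // assumed server zone, UTC+8

        System.out.println(relabelAsUtc(wallClock));    // 2024-01-15T20:00:00Z (8 hours late)
        System.out.println(toUtc(wallClock, shanghai)); // 2024-01-15T12:00:00Z
        System.out.println(Instant.now());              // current instant, already in UTC
    }
}
```

On a JVM whose default zone is UTC the two conversions agree, which is why this kind of offset tends to show up only after deployment to a machine in another zone.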

When a particular method of a class is called

Here the AOP advice logs whenever a method annotated with @LogPostMetrics is called. The annotation can be custom-defined or a predefined one.

import java.time.Instant;
import java.util.HashMap;
import java.util.Map;

import com.kitty.blog.common.annotation.LogPostMetrics;
import lombok.extern.slf4j.Slf4j;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.reflect.MethodSignature;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
// Post, PostRepository, and LogConstants are the project's own classes; import them from their packages.

@Aspect
@Component
@Slf4j
public class PostMetricsAspect {

    @Autowired
    private PostRepository postRepository;

    @Around("@annotation(com.kitty.blog.common.annotation.LogPostMetrics)")
    public Object logPostMetrics(ProceedingJoinPoint joinPoint) throws Throwable {
        long startTime = System.currentTimeMillis();
        String operation = getOperationType(joinPoint);
        Long postId = getPostId(joinPoint);
        try {
            Object result = joinPoint.proceed();
            long duration = System.currentTimeMillis() - startTime;
            recordPostMetrics(operation, postId, true, null, duration);
            return result;
        } catch (Exception e) {
            long duration = System.currentTimeMillis() - startTime;
            recordPostMetrics(operation, postId, false, e.getMessage(), duration);
            throw e;
        }
    }

    // The operation name is the value given to @LogPostMetrics on the target method.
    private String getOperationType(ProceedingJoinPoint joinPoint) {
        MethodSignature signature = (MethodSignature) joinPoint.getSignature();
        LogPostMetrics annotation = signature.getMethod().getAnnotation(LogPostMetrics.class);
        return annotation.value();
    }

    // Takes the first argument that is a Long, an Integer, or a numeric String as the post id.
    private Long getPostId(ProceedingJoinPoint joinPoint) {
        Object[] args = joinPoint.getArgs();
        for (Object arg : args) {
            if (arg instanceof Long) {
                return (Long) arg;
            } else if (arg instanceof Integer) {
                return ((Integer) arg).longValue();
            } else if (arg instanceof String) {
                try {
                    return Long.parseLong((String) arg);
                } catch (NumberFormatException e) {
                    log.warn("Failed to parse post id from string: {}", arg);
                }
            }
        }
        return null;
    }

    private void recordPostMetrics(String operation, Long postId, boolean success,
                                   String errorMessage, long duration) {
        try {
            Map<String, Object> metrics = new HashMap<>();

            // Base fields
            metrics.put("@timestamp", Instant.now().toString()); // real UTC instant
            metrics.put("log_type", LogConstants.LogType.POST_METRICS);
            metrics.put("service", LogConstants.APPLICATION_NAME);
            metrics.put("level", "INFO");

            // Operation details
            metrics.put("operation", operation);
            metrics.put("post_id", postId);

            // user_id is mapped as long in the index template, so it is left unset
            // when unknown instead of being filled with a non-numeric placeholder.
            if (postId != null) {
                try {
                    Post post = postRepository.findById(postId.intValue()).orElse(null);
                    if (post != null) {
                        metrics.put("post_title", post.getTitle());
                        metrics.put("user_id", post.getUserId());
                        metrics.put("view_count", post.getViews());
                    } else {
                        metrics.put("post_title", "post not found");
                        log.error("Post does not exist: {}", postId);
                    }
                } catch (Exception e) {
                    log.error("Failed to load post details: {}", e.getMessage(), e);
                    metrics.put("post_title", "failed to load post");
                }
            } else {
                metrics.put("post_title", "no post id");
                log.error("Post id is null");
            }

            metrics.put("success", success);
            metrics.put("duration", duration);
            if (errorMessage != null) {
                metrics.put("error_message", errorMessage);
                log.error("Operation failed: {}", errorMessage);
            }

            // Add tags
            metrics.put("host", java.net.InetAddress.getLocalHost().getHostName());
            metrics.put("environment", System.getProperty("spring.profiles.active", "dev"));

            log.info("Post Metrics: {}",
                    net.logstash.logback.argument.StructuredArguments.entries(metrics));
        } catch (Exception e) {
            log.error("Failed to record post metrics", e);
        }
    }
}
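The aspect finds the post id by scanning the method arguments and taking the first one that looks numeric. The standalone sketch below reproduces that scanning rule without Spring so its behavior is easy to try out; the class name and the sample arguments are hypothetical.

```java
/** Hypothetical standalone copy of the argument-scanning rule; not part of the original project. */
final class PostIdExtraction {

    /** Returns the first Long, Integer, or numeric String argument as a Long, or null if none exists. */
    static Long extractPostId(Object[] args) {
        for (Object arg : args) {
            if (arg instanceof Long) {
                return (Long) arg;
            }
            if (arg instanceof Integer) {
                return ((Integer) arg).longValue();
            }
            if (arg instanceof String) {
                try {
                    return Long.parseLong((String) arg);
                } catch (NumberFormatException e) {
                    // Not a number: keep scanning the remaining arguments.
                }
            }
        }
        return null;
    }

    public static void main(String[] args) {
        System.out.println(extractPostId(new Object[] {"draft", 42})); // 42
        System.out.println(extractPostId(new Object[] {"17"}));        // 17
        System.out.println(extractPostId(new Object[] {true}));        // null
        // Pitfall: the first numeric argument wins, even if it is, say, a page number.
        System.out.println(extractPostId(new Object[] {5, 99L}));      // 5
    }
}
```

Because the first numeric argument wins, annotated methods are safest when the post id is their first numeric parameter.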

The custom annotation is used just like a predefined one: put @LogPostMetrics on the target method.

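import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface LogPostMetrics {
    String value() default "";
}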
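To show how the annotation value becomes the operation field, here is a small sketch that reads it back through plain reflection, which is roughly what getOperationType does via the join point's method signature. It declares its own local copy of the annotation so it compiles on its own. The DemoPostService class, its viewPost method, and the "view" label are hypothetical.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;

/** Hypothetical demo; not part of the original project. */
final class AnnotationLookupDemo {

    /** Local copy of the article's annotation so the sketch is self-contained. */
    @Target(ElementType.METHOD)
    @Retention(RetentionPolicy.RUNTIME)
    @interface LogPostMetrics {
        String value() default "";
    }

    /** Illustrative service with one annotated method. */
    static class DemoPostService {
        @LogPostMetrics("view")
        public String viewPost(Long postId) {
            return "post-" + postId;
        }
    }

    /** Reads the operation label from the annotation, or "" when the method is not annotated. */
    static String operationOf(Method method) {
        LogPostMetrics annotation = method.getAnnotation(LogPostMetrics.class);
        return annotation == null ? "" : annotation.value();
    }

    /** Looks up the demo method and returns its operation label. */
    static String demoOperation() {
        try {
            Method method = DemoPostService.class.getMethod("viewPost", Long.class);
            return operationOf(method);
        } catch (NoSuchMethodException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demoOperation()); // view
    }
}
```

The RUNTIME retention is what makes this lookup possible; with the default CLASS retention, getAnnotation would return null.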

Step 3. Configure log output with Logback

logback-spring.xml mainly specifies that the log rolls over to a new file every day, which makes the files easier to manage.

<?xml version="1.0" encoding="UTF-8"?>
<configuration>
    <timestamp key="CURRENT_DATE" datePattern="yyyy-MM-dd"/>
    <property name="LOG_PATH" value="./logs"/>
    <property name="LOG_PATTERN" value="%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n"/>

    <!-- Console output -->
    <appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
        <encoder>
            <pattern>${LOG_PATTERN}</pattern>
        </encoder>
    </appender>

    <!-- Blog post metrics log -->
    <appender name="POST_METRICS_FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
        <file>${LOG_PATH}/blog-post-metrics-${CURRENT_DATE}.json</file>
        <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
            <fileNamePattern>${LOG_PATH}/blog-post-metrics-%d{yyyy.MM.dd}.json</fileNamePattern>
            <maxHistory>7</maxHistory>
        </rollingPolicy>
        <encoder class="net.logstash.logback.encoder.LogstashEncoder">
            <customFields>{"application":"blog-system","log_type":"post-metrics"}</customFields>
            <timestampPattern>yyyy-MM-dd'T'HH:mm:ss.SSSZZ</timestampPattern>
            <includeMdcData>true</includeMdcData>
            <includeContext>true</includeContext>
            <fieldNames>
                <tags>[ignore]</tags>
                <hostname>host</hostname>
            </fieldNames>
            <provider class="net.logstash.logback.composite.loggingevent.LogstashMarkersJsonProvider"/>
            <provider class="net.logstash.logback.composite.loggingevent.MdcJsonProvider"/>
        </encoder>
    </appender>

    <!-- Logger configuration -->
    <!-- The ERROR_FILE appender referenced below is not shown in this excerpt; it must be defined in the full file. -->
    <root level="INFO">
        <appender-ref ref="CONSOLE"/>
        <appender-ref ref="ERROR_FILE"/>
    </root>
    <logger name="com.kitty.blog.common.aspect.PostMetricsAspect" level="INFO" additivity="false">
        <appender-ref ref="POST_METRICS_FILE"/>
        <appender-ref ref="CONSOLE"/>
    </logger>
</configuration>

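Step 4. Set up the ELK environment

Now that the log file location is configured, we need to make sure Filebeat can read it. For example, here is the Filebeat configuration in my Docker environment.

docker-compose.yml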

  filebeat:
    image: elastic/filebeat:8.11.1
    user: root
    volumes:
      - ./config/filebeat/filebeat.yml:/usr/share/filebeat/filebeat.yml
      - ../../../logs:/var/log/blog:ro
      - /var/lib/docker/containers:/var/lib/docker/containers:ro
      - /var/run/docker.sock:/var/run/docker.sock:ro
    depends_on:
      logstash:
        condition: service_healthy
    environment:
      - ENVIRONMENT=dev
      - ELASTIC_USERNAME=elastic
      - ELASTIC_PASSWORD=123456
    networks:
      - elk-network
    command: >
      bash -c "chmod go-w /usr/share/filebeat/filebeat.yml &&
      chown root:root /usr/share/filebeat/filebeat.yml &&
      chmod 0644 /usr/share/filebeat/filebeat.yml &&
      filebeat -e -strict.perms=false"

filebeat.yml 

filebeat.inputs:
- type: log
  enabled: true
  paths:
    - /var/log/blog/blog-api-metrics-*.json
  json.keys_under_root: true
  json.add_error_key: true
  json.ignore_decoding_error: true
  fields:
    app: blog-system
    environment: ${ENVIRONMENT:-dev}
    log_type: api-metrics
  fields_under_root: true

- type: log
  enabled: true
  paths:
    - /var/log/blog/blog-error-*.json
  json.keys_under_root: true
  json.add_error_key: true
  json.ignore_decoding_error: true
  fields:
    app: blog-system
    environment: ${ENVIRONMENT:-dev}
    log_type: error
  fields_under_root: true

- type: log
  enabled: true
  paths:
    - /var/log/blog/blog-post-metrics-*.json
  json.keys_under_root: true
  json.add_error_key: true
  json.ignore_decoding_error: true
  fields:
    app: blog-system
    environment: ${ENVIRONMENT:-dev}
    log_type: post-metrics
  fields_under_root: true

- type: log
  enabled: true
  paths:
    - /var/log/blog/blog-system-metrics-*.json
  json.keys_under_root: true
  json.add_error_key: true
  json.ignore_decoding_error: true
  fields:
    app: blog-system
    environment: ${ENVIRONMENT:-dev}
    log_type: system-metrics
  fields_under_root: true

- type: log
  enabled: true
  paths:
    - /var/log/blog/blog-user-activity-*.json
  json.keys_under_root: true
  json.add_error_key: true
  json.ignore_decoding_error: true
  fields:
    app: blog-system
    environment: ${ENVIRONMENT:-dev}
    log_type: user-activity
  fields_under_root: true

processors:
  - add_host_metadata: ~
  - add_docker_metadata: ~

output.logstash:
  hosts: ["logstash:5000"]
  loadbalance: true
  bulk_max_size: 2048

logging.level: info
logging.to_files: true
logging.files:
  path: /var/log/filebeat
  name: filebeat.log
  rotateeverybytes: 10485760  # 10MB
  keepfiles: 7
  permissions: 0644

setup.ilm.enabled: false
setup.template.enabled: false

Filebeat now collects the logs and forwards them to Logstash, which is configured as follows.

docker-compose.yml

  logstash:
    image: logstash:8.11.1
    depends_on:
      elasticsearch:
        condition: service_healthy  # wait for the Elasticsearch health check
    ports:
      - "5000:5000"
    volumes:
      - ./config/logstash/logstash.yml:/usr/share/logstash/config/logstash.yml:ro  # mounted read-only
      - ./config/logstash/logstash.conf:/usr/share/logstash/pipeline/logstash.conf:ro
      - ./config/logstash/templates:/usr/share/logstash/templates:ro
    environment:
      - "LS_JAVA_OPTS=-Xms256m -Xmx256m"
      - ELASTICSEARCH_USERNAME=elastic
      - ELASTICSEARCH_PASSWORD=123456
    networks:
      - elk-network
    healthcheck:
      test: ["CMD-SHELL", "curl -s -f http://localhost:9600 || exit 1"]
      interval: 30s
      timeout: 10s
      retries: 3
      start_period: 40s

logstash.conf

input {
  beats {
    port => 5000
    host => "0.0.0.0"
  }
}

filter {
  json {
    source => "message"
    skip_on_invalid_json => true
    target => "parsed_json"
  }

  # Handle the host field
  mutate {
    add_field => {
      "hostname" => "%{[host][name]}"
      "os_name" => "%{[host][os][name]}"
      "os_version" => "%{[host][os][version]}"
    }
    remove_field => ["host"]
  }

  # Treat host, environment, and service as standalone fields
  mutate {
    add_field => {
      "environment" => "%{[parsed_json][environment]}"
      "service" => "%{[parsed_json][service]}"
    }
    remove_field => ["tags"]
  }

  # Debug: print the current event
  ruby {
    code => "
      require 'logger'
      logger = Logger.new(STDOUT)
      logger.info('Current event:')
      logger.info(event.to_hash.inspect)
    "
  }

  # Choose the target index from log_type
  if [log_type] == "user-activity" {
    mutate {
      add_field => { "[@metadata][index]" => "blog-user-activity-%{+YYYY.MM.dd}" }
      add_field => { "[@metadata][template_name]" => "user-activity-template" }
    }
  } else if [log_type] == "post-metrics" {
    mutate {
      add_field => { "[@metadata][index]" => "blog-post-metrics-%{+YYYY.MM.dd}" }
      add_field => { "[@metadata][template_name]" => "post-metrics-template" }
    }
    # Extract view_count from parsed JSON
    if [parsed_json][view_count] {
      mutate {
        add_field => { "view_count" => "%{[parsed_json][view_count]}" }
      }
      mutate {
        convert => { "view_count" => "integer" }
      }
    }
  } else if [log_type] == "api-metrics" {
    mutate {
      add_field => { "[@metadata][index]" => "blog-api-metrics-%{+YYYY.MM.dd}" }
      add_field => { "[@metadata][template_name]" => "blog-template" }
    }
  } else if [log_type] == "error" {
    mutate {
      add_field => { "[@metadata][index]" => "blog-error-logs-%{+YYYY.MM.dd}" }
      add_field => { "[@metadata][template_name]" => "blog-template" }
    }
  } else if [log_type] == "system-metrics" {
    mutate {
      add_field => { "[@metadata][index]" => "blog-system-metrics-%{+YYYY.MM.dd}" }
      add_field => { "[@metadata][template_name]" => "blog-template" }
    }
  }

  # If no type matched, fall back to a default index
  if ![@metadata][index] {
    mutate {
      add_field => { "[@metadata][index]" => "blog-unknown-1" }
      add_field => { "[@metadata][template_name]" => "blog" }
      add_field => { "error_message" => "Unknown log_type: %{[log_type]}" }
    }
  }

  # Make sure the timestamp is in the right format
  date {
    match => [ "@timestamp", "ISO8601" ]
    target => "@timestamp"
    timezone => "Asia/Shanghai"
  }

  # Remove fields that are not needed
  mutate {
    remove_field => ["message", "tags", "beat", "input", "prospector", "agent"]
  }
}

output {
  if [@metadata][index] {
    if [@metadata][template_name] == "post-metrics-template" {
      elasticsearch {
        hosts => ["elasticsearch:9200"]
        user => "${ELASTICSEARCH_USERNAME}"
        password => "${ELASTICSEARCH_PASSWORD}"
        index => "%{[@metadata][index]}"
        template => "/usr/share/logstash/templates/post-metrics-template.json"
        template_name => "post-metrics-template"
        template_overwrite => true
        data_stream => false
      }
    } else if [@metadata][template_name] == "user-activity-template" {
      elasticsearch {
        hosts => ["elasticsearch:9200"]
        user => "${ELASTICSEARCH_USERNAME}"
        password => "${ELASTICSEARCH_PASSWORD}"
        index => "%{[@metadata][index]}"
        template => "/usr/share/logstash/templates/user-activity-template.json"
        template_name => "user-activity-template"
        template_overwrite => true
        data_stream => false
      }
    } else if [@metadata][template_name] == "blog-template" {
      elasticsearch {
        hosts => ["elasticsearch:9200"]
        user => "${ELASTICSEARCH_USERNAME}"
        password => "${ELASTICSEARCH_PASSWORD}"
        index => "%{[@metadata][index]}"
        template => "/usr/share/logstash/templates/blog-template.json"
        template_name => "blog-template"
        template_overwrite => true
        data_stream => false
      }
    }
  }
}
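The routing rule in the filter section is easier to reason about when written out as a plain function. The Java sketch below mirrors the log_type to index mapping from logstash.conf; it is only a model for checking expectations, not something Logstash runs, and the class and method names are hypothetical. Logstash formats the date suffix from the event's @timestamp, so the sketch takes the event day as a parameter.

```java
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.util.Map;

/** Hypothetical model of the log_type routing in logstash.conf; not part of the original project. */
final class LogTypeRouting {

    private static final DateTimeFormatter DAILY = DateTimeFormatter.ofPattern("yyyy.MM.dd");

    // log_type -> index prefix, copied from the branches of the filter section.
    private static final Map<String, String> PREFIXES = Map.of(
            "user-activity", "blog-user-activity",
            "post-metrics", "blog-post-metrics",
            "api-metrics", "blog-api-metrics",
            "error", "blog-error-logs",
            "system-metrics", "blog-system-metrics");

    /** Returns the index an event with this log_type is routed to. */
    static String targetIndex(String logType, LocalDate eventDay) {
        String prefix = (logType == null) ? null : PREFIXES.get(logType);
        if (prefix == null) {
            return "blog-unknown-1"; // fallback index for unmatched log types
        }
        return prefix + "-" + eventDay.format(DAILY);
    }

    public static void main(String[] args) {
        LocalDate day = LocalDate.of(2024, 1, 15);
        System.out.println(targetIndex("post-metrics", day)); // blog-post-metrics-2024.01.15
        System.out.println(targetIndex("error", day));        // blog-error-logs-2024.01.15
        System.out.println(targetIndex("something-else", day)); // blog-unknown-1
    }
}
```

One thing worth noticing when reading the configuration side by side with this model: the fallback branch sets template_name to "blog", and none of the branches in the output section match that value, so as written events with an unknown log_type appear to get an index name but are never sent to Elasticsearch.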

logstash.yml 

http.host: "0.0.0.0"
xpack.monitoring.enabled: false

path.config: /usr/share/logstash/pipeline
path.logs: /var/log/logstash

pipeline.workers: 2
pipeline.batch.size: 125
pipeline.batch.delay: 50

queue.type: memory
queue.max_bytes: 1024mb

log.level: info

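Next the data is stored in Elasticsearch. Extra functionality, such as a Chinese analyzer, requires downloading the plugin yourself.

docker-compose.yml

Make sure an account and password are available so that Logstash and Kibana can connect. The two can use different accounts, but the Logstash user needs at least write privileges and the Kibana user at least read privileges.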

  elasticsearch:
    image: elasticsearch:8.11.1
    environment:
      - discovery.type=single-node
      - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
      - xpack.security.enabled=true
      - ELASTIC_PASSWORD=123456
      - bootstrap.memory_lock=true
      - cluster.name=docker-cluster
      - network.host=0.0.0.0
      - xpack.security.transport.ssl.enabled=false
      - xpack.security.http.ssl.enabled=false
      - "ELASTIC_USERNAME=elastic"
      - KIBANA_SYSTEM_PASSWORD=kibana123
    ports:
      - "9201:9200"
      - "9300:9300"
    volumes:
      - elasticsearch_data:/usr/share/elasticsearch/data
      - ./config/elasticsearch/plugins/analysis-ik:/usr/share/elasticsearch/plugins/analysis-ik
      - ./config/elasticsearch/setup:/usr/share/elasticsearch/setup
    networks:
      - elk-network
    healthcheck:
      test: ["CMD-SHELL", "curl -s -u elastic:123456 http://localhost:9200/_cluster/health || exit 1"]
      interval: 30s
      timeout: 10s
      retries: 5
      start_period: 60s
    ulimits:  # raise system limits
      memlock:
        soft: -1
        hard: -1
      nofile:
        soft: 65536
        hard: 65536

Next, configure Kibana:

docker-compose.yml

  kibana:
    image: kibana:8.11.1
    depends_on:
      elasticsearch:
        condition: service_healthy
    ports:
      - "5601:5601"
    environment:
      - ELASTICSEARCH_HOSTS=http://elasticsearch:9200
      - ELASTICSEARCH_USERNAME=kibana_system
      - ELASTICSEARCH_PASSWORD=kibana123
      - SERVER_NAME=kibana
      - SERVER_HOST=0.0.0.0
      - XPACK_REPORTING_ENABLED=false
    healthcheck:
      test: ["CMD-SHELL", "curl -s -I http://localhost:5601 | grep -q 'HTTP/1.1 302 Found'"]
      interval: 10s
      timeout: 10s
      retries: 120
    networks:
      - elk-network

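Kibana is served at http://localhost:5601 by default; the administrator user name and password are elastic and 123456. From here you can analyze the data visually and manage cluster state, among other things.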
