当前位置: 首页 > news >正文

统计客户端使用情况,使用es存储数据,实现去重以及计数

这篇文件的重点在tshark、filebeat、和logstash。

需求:统计客户使用的客户端版本

实现工具:tshark 1.10.14,filebeat 8.17.0,logstash 8.17.0,elasticsearch 8.17.0,kibana 8.17.0

总体设计:使用tshark抓包,并以格式化的文本形式(字段中间以|分隔)输出到txt文件,txt文件每10M分割一个新文件,通过filebeat采集输出到logstash(由于filebeat功能过于简单,无法实现_id重复时更新操作,以及提取复杂字符串的需求,使用logstash作为中间处理层),最后使用es存储,kibana查看。

1.首先编写脚本实现网络抓取,分割文件,定期清理旧文件的功能,脚本如下

#!/bin/bash# TShark 持续抓包脚本# 配置参数
INTERFACE="any"          # 监听的网卡接口
OUTPUT_DIR="/home/vagrant/tshark"  # 输出目录
FILE_PREFIX="capture"     # 文件名前缀
MAX_FILE_SIZE=1000         # 单个文件最大大小(MB)
MAX_FILE_COUNT=10         # 最大文件数量(超过将删除最旧文件)
CAPTURE_FILTER=""         # 捕获过滤器(如"port 80")
BUFFER_SIZE=20            # 内存缓冲区大小(MB)
SNAPLEN=0                 # 抓包长度(0表示完整数据包)
PROCOTOL=http             # 协议# 创建输出目录
mkdir -p "$OUTPUT_DIR"# 生成带时间戳的文件名
current_file="${OUTPUT_DIR}/${FILE_PREFIX}_$(date +'%Y%m%d_%H%M%S')"# 清理旧文件函数
cleanup_old_files() {echo "[$(date)] 接收到清理信号,开始清理..."# 保留最新的 MAX_FILE_COUNT 个文件ls -t "${OUTPUT_DIR}/${FILE_PREFIX}"_*.txt 2>/dev/null | tail -n +3 | xargs -r rm -f
}# 捕获终止信号
trap 'echo "捕获终止..."; cleanup_old_files; exit 0' INT TERMecho "开始捕获网络流量..."
echo "输出文件: $current_file"# 开始捕获
tshark -i "$INTERFACE" \-T fields  -e ip.src -e ip.dst  -e tcp.dstport -e http.host -e http.user_agent -e http.authorization -e frame.time_epoch -E separator="|" | split -d -b 10M --additional-suffix=.txt - "${OUTPUT_DIR}/${FILE_PREFIX}_$(date +'%Y%m%d_%H%M%S')_" &
TSHARK_PID=$!echo "[$(date)] Tshark已启动 (PID: $TSHARK_PID)"
echo "[$(date)] 开始定期清理,每小时执行一次..."while kill -0 $TSHARK_PID 2>/dev/null; do  # 检查tshark是否仍在运行cleanup_old_filessleep 3600  # 每小时执行一次
doneecho "[$(date)] Tshark已停止,清理脚本退出"

2.使用filebeat采集,主要配置如下:

#提取各字段,删除无用字段,当user_agent为空时,放弃这个事件
processors:- dissect:tokenizer: "%{src_ip}|%{dst_ip}|%{dst_port}|%{host}|%{user_agent}|%{auth}|%{timestamp}"field: "message"- drop_fields:fields: ["ecs", "host", "input", "log", "agent", "dissect.timestamp", "message"]- drop_event:when:equals:dissect.user_agent: ""

3.logstash主要配置如下:

input {beats {port => 5044}
}filter {mutate {replace => { "temp_auth" => "%{[dissect][auth]}" }gsub => ["temp_auth", "OPEN-BODY-SIG ", ""]}#kv提取键值对内容kv {source => "temp_auth"field_split => ","value_split => "="prefix => "auth_"remove_field => "temp_auth"}mutate {rename => { "auth_AppId" => "app_id" }rename => { "auth_ClientId" => "app_id" }}fingerprint {source => ["[dissect][user_agent]", "app_id"]method => "SHA256"  # 也可以使用 MURMUR3, SHA1, MD5 等concatenate_sources => true  # 将多个字段值连接后再哈希target => "[@metadata][_id]"}
}output {elasticsearch {hosts => ["http://xxxxxx:9200"] # 根据你的ES地址修改index => "httppackmsg"document_id => "%{[@metadata][_id]}"action => "update"doc_as_upsert => truescript => "ctx._source.count = (ctx._source.count ?: 0) + 1"script_type => "inline"script_lang => "painless"}# 调试用(可选)stdout {codec => rubydebug}
}

logstash中使用kv提取auth中的键值对内容,并给提取出的key设置prefix,rename将字段修改为自建想要的名字。在fingerprint 中通过将user_agent和appid连接,使用SHA256生成_id.

相关文章:

  • java中的Servlet4.x详解
  • GitHub排名第一的开源ERP项目:Odoo生产计划与执行的功能概述
  • Git初始化本地已有项目,并推送到远端Git仓库完整操作指南
  • Linux《自主Shell命令行解释器》
  • 昆仑通态MCGSpro,自定义配方功能
  • 用 CodeBuddy 搭建「MiniGoal 小目标打卡器」:一次流畅的 UniApp 开发体验
  • Grafana当前状态:SingleStat面板
  • Windows系统编译Qt使用的kafka(librdkafka)
  • 精益数据分析(68/126):数据透视表实战与解决方案验证——从问卷分析到产品落地的关键跨越
  • python学习day1
  • MTK zephyr平台:系统休眠流程
  • Golang的网络安全策略实践
  • 数据库连接池技术与 Druid 连接工具类实现
  • Golang中的runtime.LockOSThread 和 runtime.UnlockOSThread
  • RabbitMQ通信模式(Simplest)Python示例
  • 百度飞桨OCR(PP-OCRv4_server_det|PP-OCRv4_server_rec_doc)文本识别-Java项目实践
  • Python函数——万字详解
  • 算法题(150):拼数
  • 用Python将 PDF 中的表格提取为 Excel/CSV
  • OpenCV计算机视觉实战(6)——经典计算机视觉算法
  • 美国前总统拜登确诊前列腺癌
  • 83岁山水花鸟画家、书法家吴静山离世,系岭南画派代表人物
  • 九江宜春领导干部任前公示,3人拟提名为县(市、区)长候选人
  • 《大风杀》导演张琪:为了不算计观众,拍了部不讨好的警匪片
  • 第一集|好饭不怕晚,折腰若如初见
  • 奥古斯都时代的历史学家李维