统计客户端使用情况,使用es存储数据,实现去重以及计数
这篇文件的重点在tshark、filebeat、和logstash。
需求:统计客户使用的客户端版本
实现工具:tshark 1.10.14,filebeat 8.17.0,logstash 8.17.0,elasticsearch 8.17.0,kibana 8.17.0
总体设计:使用tshark抓包,并以格式化的文本形式(字段中间以|分隔)输出到txt文件,txt文件每10M分割一个新文件,通过filebeat采集输出到logstash(由于filebeat功能过于简单,无法实现_id重复时更新操作,以及提取复杂字符串的需求,使用logstash作为中间处理层),最后使用es存储,kibana查看。
1.首先编写脚本实现网络抓取,分割文件,定期清理旧文件的功能,脚本如下
#!/bin/bash# TShark 持续抓包脚本# 配置参数
INTERFACE="any" # 监听的网卡接口
OUTPUT_DIR="/home/vagrant/tshark" # 输出目录
FILE_PREFIX="capture" # 文件名前缀
MAX_FILE_SIZE=1000 # 单个文件最大大小(MB)
MAX_FILE_COUNT=10 # 最大文件数量(超过将删除最旧文件)
CAPTURE_FILTER="" # 捕获过滤器(如"port 80")
BUFFER_SIZE=20 # 内存缓冲区大小(MB)
SNAPLEN=0 # 抓包长度(0表示完整数据包)
PROCOTOL=http # 协议# 创建输出目录
mkdir -p "$OUTPUT_DIR"# 生成带时间戳的文件名
current_file="${OUTPUT_DIR}/${FILE_PREFIX}_$(date +'%Y%m%d_%H%M%S')"# 清理旧文件函数
cleanup_old_files() {echo "[$(date)] 接收到清理信号,开始清理..."# 保留最新的 MAX_FILE_COUNT 个文件ls -t "${OUTPUT_DIR}/${FILE_PREFIX}"_*.txt 2>/dev/null | tail -n +3 | xargs -r rm -f
}# 捕获终止信号
trap 'echo "捕获终止..."; cleanup_old_files; exit 0' INT TERMecho "开始捕获网络流量..."
echo "输出文件: $current_file"# 开始捕获
tshark -i "$INTERFACE" \-T fields -e ip.src -e ip.dst -e tcp.dstport -e http.host -e http.user_agent -e http.authorization -e frame.time_epoch -E separator="|" | split -d -b 10M --additional-suffix=.txt - "${OUTPUT_DIR}/${FILE_PREFIX}_$(date +'%Y%m%d_%H%M%S')_" &
TSHARK_PID=$!echo "[$(date)] Tshark已启动 (PID: $TSHARK_PID)"
echo "[$(date)] 开始定期清理,每小时执行一次..."while kill -0 $TSHARK_PID 2>/dev/null; do # 检查tshark是否仍在运行cleanup_old_filessleep 3600 # 每小时执行一次
doneecho "[$(date)] Tshark已停止,清理脚本退出"
2.使用filebeat采集,主要配置如下:
#提取各字段,删除无用字段,当user_agent为空时,放弃这个事件
processors:- dissect:tokenizer: "%{src_ip}|%{dst_ip}|%{dst_port}|%{host}|%{user_agent}|%{auth}|%{timestamp}"field: "message"- drop_fields:fields: ["ecs", "host", "input", "log", "agent", "dissect.timestamp", "message"]- drop_event:when:equals:dissect.user_agent: ""
3.logstash主要配置如下:
input {beats {port => 5044}
}filter {mutate {replace => { "temp_auth" => "%{[dissect][auth]}" }gsub => ["temp_auth", "OPEN-BODY-SIG ", ""]}#kv提取键值对内容kv {source => "temp_auth"field_split => ","value_split => "="prefix => "auth_"remove_field => "temp_auth"}mutate {rename => { "auth_AppId" => "app_id" }rename => { "auth_ClientId" => "app_id" }}fingerprint {source => ["[dissect][user_agent]", "app_id"]method => "SHA256" # 也可以使用 MURMUR3, SHA1, MD5 等concatenate_sources => true # 将多个字段值连接后再哈希target => "[@metadata][_id]"}
}output {elasticsearch {hosts => ["http://xxxxxx:9200"] # 根据你的ES地址修改index => "httppackmsg"document_id => "%{[@metadata][_id]}"action => "update"doc_as_upsert => truescript => "ctx._source.count = (ctx._source.count ?: 0) + 1"script_type => "inline"script_lang => "painless"}# 调试用(可选)stdout {codec => rubydebug}
}
logstash中使用kv提取auth中的键值对内容,并给提取出的key设置prefix,rename将字段修改为自建想要的名字。在fingerprint 中通过将user_agent和appid连接,使用SHA256生成_id.