当前位置: 首页 > news >正文

【网络入侵检测】基于Suricata源码分析FlowWorker实现

【作者主页】只道当时是寻常

【专栏介绍】Suricata入侵检测。专注网络、主机安全,欢迎关注与评论。

1.概要

👋 本文主要基于 Suricata 开源项目源码,深入剖析其核心模块之一 ——流处理模块 FlowWorker的完整实现流程。作为网络流量分析与入侵检测的关键组件,FlowWorker 承担着连接跟踪、状态管理、数据重组及检测调度等核心功能。

2. 源码解析

2.1 流(flow)

在 Suricata 中,流(Flows)非常重要。流有点类似于连接,只不过流的概念更具普遍性。所有具有相同元组(协议、源 IP 地址、目的 IP 地址、源端口、目的端口)的数据包都属于同一个流

属于某个流的数据包在内部是相互关联的,如下图所示,在同一条流上,两段都有数据包传输。

2.1.1 获取流对象

Suricata 接收并解码数据包后,会通过 FlowHandlePacket 函数依据当前数据包信息,判断它是否属于已创建的某个流。若不属于任何已创建的流,即可能是新流的首个数据包,此时便会创建该数据包所在的流对象。

flow_hash 是存储流对象的全局哈希表,其定义如下所示:

FlowBucket *flow_hash;

流哈希表的 key 存储在 Packet 结构中的 flow_hash 变量中,在对数据包进行解码操作时,通过调用 FlowGetHash 函数计算当前数据包的 flow 哈希 key。计算哈希 key 需要以下九个元素值:

  • 源IP地址;

  • 目的IP地址;

  • 源端口号;

  • 目的端口号;

  • 协议类型;

  • 递归层级(适用于隧道(tunnel)类型的网络包,需要递归解码报文头);

  • 网卡设备标识符(livedev ids),在 Suricata 中可通过 livedev.use-for-tracking 选项控制是否将网卡id作为哈希key计算因素。此选项处于启用状态,意味着在哈希计算中会包含网卡设备的标识符,使得来自不同网卡的相同流量被视为不同的流。如果关闭,则不同网卡收到的同一流量视为同一流。

  • vlan id;

  • 随机数(32bit),该随机数是在程序启动初期流引擎初始化时生成,在程序的整个生命周期过程中不变。

2.1.2 更新流和包信息

2.1.1 章节已介绍 Suricata 获取网络包后,根据包信息从流哈希表中找到对应流对象。本章节介绍,Suricata 拿到流对象后,会结合流对象和包中的信息,更新两者的标志位信息。

FlowUpdate 函数是入口函数,该函数代码实现如下所示:

/** \brief handle flow for packet**  Handle flow creation/lookup*/
static inline TmEcode FlowUpdate(ThreadVars *tv, FlowWorkerThreadData *fw, Packet *p)
{FlowHandlePacketUpdate(p->flow, p, tv, fw->dtv);int state = p->flow->flow_state;switch (state) {
#ifdef CAPTURE_OFFLOADcase FLOW_STATE_CAPTURE_BYPASSED: {StatsAddUI64(tv, fw->both_bypass_pkts, 1);StatsAddUI64(tv, fw->both_bypass_bytes, GET_PKT_LEN(p));Flow *f = p->flow;FlowDeReference(&p->flow);FLOWLOCK_UNLOCK(f);return TM_ECODE_DONE;}
#endifcase FLOW_STATE_LOCAL_BYPASSED: {StatsAddUI64(tv, fw->local_bypass_pkts, 1);StatsAddUI64(tv, fw->local_bypass_bytes, GET_PKT_LEN(p));Flow *f = p->flow;FlowDeReference(&p->flow);FLOWLOCK_UNLOCK(f);return TM_ECODE_DONE;}default:return TM_ECODE_OK;}
}

下面主要详细介绍 FlowHandlePacketUpdate 函数的逻辑。我将该函数拆解成多个代码块来介绍 Suricata 如何更新流和包的标志位信息。

(1)FlowGetPacketDirection 函数通过比较端口号大小或者比较IP地址的方法推算出当前报文的方向,即 TOSERVER 还是 TOCLIENT

const int pkt_dir = FlowGetPacketDirection(f, p);

(2)CAPTURE_OFFLOAD 宏是流量捕获卸载的开关。开启该模式后,Suricata 可按策略将部分流量处理工作卸载到硬件或内核模块,以提升高流量时的性能。例如流量被标记为 bypass 时,Suricata 能通过下发 eBPF/XDP 规则实现底层导流。下面代码块用于更新流对象的 lastts 变量,该变量记录当前流最新包的时间戳(即当前包的时间)。

#ifdef CAPTURE_OFFLOADint state = f->flow_state;if (state != FLOW_STATE_CAPTURE_BYPASSED) {
#endif/* update the last seen timestamp of this flow */if (SCTIME_CMP_GT(p->ts, f->lastts)) {f->lastts = p->ts;const uint32_t timeout_at = (uint32_t)SCTIME_SECS(f->lastts) + f->timeout_policy;if (timeout_at != f->timeout_at) {f->timeout_at = timeout_at;}}
#ifdef CAPTURE_OFFLOAD} else {/* still seeing packet, we downgrade to local bypass */if (SCTIME_SECS(p->ts) - SCTIME_SECS(f->lastts) > FLOW_BYPASSED_TIMEOUT / 2) {SCLogDebug("Downgrading flow to local bypass");f->lastts = p->ts;FlowUpdateState(f, FLOW_STATE_LOCAL_BYPASSED);} else {/* In IPS mode the packet could come from the other interface so it would* need to be bypassed */if (EngineModeIsIPS()) {BypassedFlowUpdate(f, p);}}}
#endif

(3)根据报文方向(TOSERVER/TOCLIENT)更新计数器和标志位:统计流中两个方向的报文数量、字节总数;依据 Flow 对象标志位设置 Packet 标志位状态,并更新流的最小和最大 TTL 值。

/* update flags and counters */if (pkt_dir == TOSERVER) {f->todstpktcnt++;f->todstbytecnt += GET_PKT_LEN(p);p->flowflags = FLOW_PKT_TOSERVER;if (!(f->flags & FLOW_TO_DST_SEEN)) { // 如果在这个流中这个方向没有看到过数据包,那么这是第一个数据包if (FlowUpdateSeenFlag(p)) {f->flags |= FLOW_TO_DST_SEEN;p->flowflags |= FLOW_PKT_TOSERVER_FIRST;}}/* xfer proto detect ts flag to first packet in ts dir */// 当flow(流)已经完成ToServer方向的协议检测(FLOW_PROTO_DETECT_TS_DONE标志被设置)时// 将这个状态从flow结构转移到当前数据包(packet)中// 通过设置PKT_PROTO_DETECT_TS_DONE标志来标记该数据包if (f->flags & FLOW_PROTO_DETECT_TS_DONE) {f->flags &= ~FLOW_PROTO_DETECT_TS_DONE;p->flags |= PKT_PROTO_DETECT_TS_DONE;}FlowUpdateEthernet(tv, dtv, f, p->ethh, true);/* update flow's ttl fields if needed */// 更新流的最小和最大TTL值if (PKT_IS_IPV4(p)) {FlowUpdateTtlTS(f, p, IPV4_GET_IPTTL(p));} else if (PKT_IS_IPV6(p)) {FlowUpdateTtlTS(f, p, IPV6_GET_HLIM(p));}} else {f->tosrcpktcnt++;f->tosrcbytecnt += GET_PKT_LEN(p);p->flowflags = FLOW_PKT_TOCLIENT;if (!(f->flags & FLOW_TO_SRC_SEEN)) {if (FlowUpdateSeenFlag(p)) {f->flags |= FLOW_TO_SRC_SEEN;p->flowflags |= FLOW_PKT_TOCLIENT_FIRST;}}/* xfer proto detect tc flag to first packet in tc dir */if (f->flags & FLOW_PROTO_DETECT_TC_DONE) {f->flags &= ~FLOW_PROTO_DETECT_TC_DONE;p->flags |= PKT_PROTO_DETECT_TC_DONE;}FlowUpdateEthernet(tv, dtv, f, p->ethh, false);/* update flow's ttl fields if needed */if (PKT_IS_IPV4(p)) {FlowUpdateTtlTC(f, p, IPV4_GET_IPTTL(p));} else if (PKT_IS_IPV6(p)) {FlowUpdateTtlTC(f, p, IPV6_GET_HLIM(p));}}

(4)根据当前流状态更新 Packet 状态,流状态包含 FLOW_STATE_NEWFLOW_STATE_ESTABLISHEDFLOW_STATE_CLOSEDFLOW_STATE_LOCAL_BYPASSED 四种。其中 FLOW_STATE_ESTABLISHED 表示流两端会话已建立:若为 TCP 协议,通过 TcpSession 状态是否为 TCP_ESTABLISHED 判断;若为其他协议,通过两端是否都发送数据(即 FLOW_TO_DST_SEEN FLOW_TO_SRC_SEEN 标志位均置位)判断。

if (f->flow_state == FLOW_STATE_ESTABLISHED) {SCLogDebug("pkt %p FLOW_PKT_ESTABLISHED", p);p->flowflags |= FLOW_PKT_ESTABLISHED;} else if (f->proto == IPPROTO_TCP) {TcpSession *ssn = (TcpSession *)f->protoctx;if (ssn != NULL && ssn->state >= TCP_ESTABLISHED) {p->flowflags |= FLOW_PKT_ESTABLISHED;}} else if ((f->flags & (FLOW_TO_DST_SEEN|FLOW_TO_SRC_SEEN)) ==(FLOW_TO_DST_SEEN|FLOW_TO_SRC_SEEN)) {SCLogDebug("pkt %p FLOW_PKT_ESTABLISHED", p);p->flowflags |= FLOW_PKT_ESTABLISHED;if (
#ifdef CAPTURE_OFFLOAD(f->flow_state != FLOW_STATE_CAPTURE_BYPASSED) &&
#endif(f->flow_state != FLOW_STATE_LOCAL_BYPASSED)) {FlowUpdateState(f, FLOW_STATE_ESTABLISHED);}}

(5)如果当前流设置了 FLOW_ACTION_DROP 标志位,则说明这条流上的所有包都应该丢弃,PacketDrop 函数通过设置 Packet 中的标志位标识当前包要丢弃。

if (f->flags & FLOW_ACTION_DROP) {PacketDrop(p, ACTION_DROP, PKT_DROP_REASON_FLOW_DROP);
}

(6)如果当前流设置了FLOW_NOPACKET_INSPECTION(当前流的所有包不再检查) 和 FLOW_NOPAYLOAD_INSPECTION(当前流的所有包应用数据不再检查)标志位置位,则同步更新到 Pakcet 中。

 /*set the detection bypass flags*/
if (f->flags & FLOW_NOPACKET_INSPECTION) {SCLogDebug("setting FLOW_NOPACKET_INSPECTION flag on flow %p", f);DecodeSetNoPacketInspectionFlag(p);
}
if (f->flags & FLOW_NOPAYLOAD_INSPECTION) {SCLogDebug("setting FLOW_NOPAYLOAD_INSPECTION flag on flow %p", f);DecodeSetNoPayloadInspectionFlag(p);
}

2.2 应用数据解析

Suricata 获取流对象并更新 Flow Packet 状态后,按传输层协议进行数据重组和应用数据解析。如果是 TCP 协议则需要通过 stream 引擎进行数据重组操作等,如果是UDP协议则无需重组。最后根据协议类型分别调用 AppLayerHandleTCPData 函数和 AppLayerHandleUdp 函数解析应用层数据。

2.3 报文检测

截止2.2章节应用数据解析后,当前报文的包头和应用数据均解析完毕,状态也同步更新,接下来则执行检测操作。

 /* handle Detect */
DEBUG_ASSERT_FLOW_LOCKED(p->flow);
SCLogDebug("packet %"PRIu64" calling Detect", p->pcap_cnt);
if (detect_thread != NULL) {FLOWWORKER_PROFILING_START(p, PROFILE_FLOWWORKER_DETECT);Detect(tv, p, detect_thread);FLOWWORKER_PROFILING_END(p, PROFILE_FLOWWORKER_DETECT);
}

2.4 日志输出

2.3 章节中,报文经检测引擎检测完成后,需通过指定方式输出检测过程中产生的告警日志,该操作通过调用 OutputLoggerLog 函数实现。

TmEcode OutputLoggerLog(ThreadVars *tv, Packet *p, void *thread_data)
{LoggerThreadStore *thread_store = (LoggerThreadStore *)thread_data;RootLogger *logger = TAILQ_FIRST(&active_loggers);LoggerThreadStoreNode *thread_store_node = TAILQ_FIRST(thread_store);while (logger && thread_store_node) {logger->LogFunc(tv, p, thread_store_node->thread_data);logger = TAILQ_NEXT(logger, entries);thread_store_node = TAILQ_NEXT(thread_store_node, entries);}return TM_ECODE_OK;
}

2.5 资源释放

报文检测和日志输出后则需要释放过期的数据和资源,重置标志位等操作。

 /*  Release tcp segments. Done here after alerting can use them. */if (p->flow != NULL) {DEBUG_ASSERT_FLOW_LOCKED(p->flow);if (FlowIsBypassed(p->flow)) {FlowCleanupAppLayer(p->flow);if (p->proto == IPPROTO_TCP) {StreamTcpSessionCleanup(p->flow->protoctx);}} else if (p->proto == IPPROTO_TCP && p->flow->protoctx) {FramesPrune(p->flow, p);FLOWWORKER_PROFILING_START(p, PROFILE_FLOWWORKER_TCPPRUNE);StreamTcpPruneSession(p->flow, p->flowflags & FLOW_PKT_TOSERVER ?STREAM_TOSERVER : STREAM_TOCLIENT);FLOWWORKER_PROFILING_END(p, PROFILE_FLOWWORKER_TCPPRUNE);} else if (p->proto == IPPROTO_UDP) {FramesPrune(p->flow, p);}if ((PKT_IS_PSEUDOPKT(p)) ||(p->flow->flags & (FLOW_TS_APP_UPDATED | FLOW_TC_APP_UPDATED))) {if (PKT_IS_TOSERVER(p)) {if (PKT_IS_PSEUDOPKT(p) || (p->flow->flags & (FLOW_TS_APP_UPDATED))) {AppLayerParserTransactionsCleanup(p->flow, STREAM_TOSERVER);p->flow->flags &= ~FLOW_TS_APP_UPDATED;}} else {if (PKT_IS_PSEUDOPKT(p) || (p->flow->flags & (FLOW_TC_APP_UPDATED))) {AppLayerParserTransactionsCleanup(p->flow, STREAM_TOCLIENT);p->flow->flags &= ~FLOW_TC_APP_UPDATED;}}} else {SCLogDebug("not pseudo, no app update: skip");}if (p->flow->flags & FLOW_ACTION_DROP) {SCLogDebug("flow drop in place: remove app update flags");p->flow->flags &= ~(FLOW_TS_APP_UPDATED | FLOW_TC_APP_UPDATED);}Flow *f = p->flow;FlowDeReference(&p->flow);FLOWLOCK_UNLOCK(f);}

2.6 流清理维护

Suricata 的 FlowWorker 模块处理完上述操作后,通过 FlowWorkerProcessInjectedFlows 函数从线程共享的流队列 tv->flow_queue 中提取其他线程(如流管理线程)注入的流对象。这些流可能因超时而未检测,或因哈希表空间不足被强制清理,均需进行强制重组检测。

FlowWorker 模块将这些流对象放入本地流队列 fw->fls.work_queue,由 FlowWorkerProcessLocalFlows 函数处理。该函数并非一次性处理所有流对象,而是在每次处理 Packet 时顺带处理 2 个。但若系统处于退出状态(需处理完所有流后退出)或抓包超时,则会一次性处理完本地队列中的所有流对象。

2.7 总结

Suricata 处理数据时按接收数据包、解码数据包、流处理(FlowWorker)的顺序进行,这三个模块可由单个线程同步调用或多个线程异步调用。

运行模式和接收解码参考下面:

【网络入侵检测】基于Suricata源码分析运行模式(Runmode)https://blog.csdn.net/qq_29490749/article/details/148028102?spm=1001.2014.3001.5501【网络入侵检测】基于源码分析Suricata的解码模块https://blog.csdn.net/qq_29490749/article/details/147623664?spm=1001.2014.3001.5501【网络入侵检测】基于源码分析Suricata的PCAP模式https://blog.csdn.net/qq_29490749/article/details/147517295?spm=1001.2014.3001.5501本文主要介绍 FlowWorker 模块对数据包的处理流程,即当前数据包已被 Suricata 接收并完成解码。FlowWorker 模块的主要工作包括:根据数据包获取 Flow 对象、解析应用数据、执行报文检测、输出告警日志、释放过期流资源,以及维护超时或退出状态下的流资源等。

相关文章:

  • 智能仓储落地:机器人如何通过自动化减少仓库操作失误?
  • DeepSeek - 尝试一下GitHub Models中的DeepSeek
  • EasyRTC音视频实时通话助力微信小程序:打造低延迟、高可靠的VoIP端到端呼叫解决方案
  • 【ConvLSTM第二期】模拟视频帧的时序建模(Python代码实现)
  • Text-to-SQL评估体系:从Spider 1.0数据集到2.0框架的跨越与革新
  • HOW - 简历和求职面试宝典(八)
  • 【春秋云镜】CVE-2022-26965 靶场writeup
  • 江西某石灰石矿边坡自动化监测
  • 【Hive 运维实战】一键管理 Hive 服务:Metastore 与 HiveServer2 控制脚本开发与实践
  • LeetCode Hot100(多维动态规划)
  • 基于vue框架的独居老人上门护理小程序的设计r322q(程序+源码+数据库+调试部署+开发环境)带论文文档1万字以上,文末可获取,系统界面在最后面。
  • 前端面试核心考点全解析
  • 华为OD机试真题——告警抑制(2025A卷:100分)Java/python/JavaScript/C/C++/GO最佳实现
  • .NET 开源工业视觉系统 OpenIVS 快速搭建自动化检测平台
  • ST-GCN
  • 表单请求为什么需要进行 URL 编码?—详解application/x-www-form-urlencoded的正确用法
  • 自动化立体仓库堆垛机SRM控制系统FC19手动控制功能块开发
  • 制造业的未来图景:超自动化与劳动力转型的双重革命
  • 数据结构(7)树-二叉树-堆
  • <el-date-picker>组件传参时,选中时间和传参偏差8小时
  • 公安局网站建设方案/市场营销案例分析及解答
  • 龙岗网站建设 信科网络/游戏推广员拉人技巧
  • 网站设计一般包括什么/百度联盟项目看广告挣钱
  • 做网站外国的服务器/seo公司上海牛巨微
  • 朝阳做网站的公司/长沙百度搜索排名
  • 福州网站设计哪家靠谱/成都比较靠谱的seo