【网络入侵检测】基于Suricata源码分析FlowWorker实现
【作者主页】只道当时是寻常
【专栏介绍】Suricata入侵检测。专注网络、主机安全,欢迎关注与评论。
1.概要
👋 本文主要基于 Suricata 开源项目源码,深入剖析其核心模块之一 ——流处理模块 FlowWorker的完整实现流程。作为网络流量分析与入侵检测的关键组件,FlowWorker 承担着连接跟踪、状态管理、数据重组及检测调度等核心功能。
2. 源码解析
2.1 流(flow)
在 Suricata 中,流(Flows)非常重要。流有点类似于连接,只不过流的概念更具普遍性。所有具有相同元组(协议、源 IP 地址、目的 IP 地址、源端口、目的端口)的数据包都属于同一个流。
属于某个流的数据包在内部是相互关联的,如下图所示,在同一条流上,两段都有数据包传输。
2.1.1 获取流对象
Suricata 接收并解码数据包后,会通过 FlowHandlePacket 函数依据当前数据包信息,判断它是否属于已创建的某个流。若不属于任何已创建的流,即可能是新流的首个数据包,此时便会创建该数据包所在的流对象。
flow_hash 是存储流对象的全局哈希表,其定义如下所示:
FlowBucket *flow_hash;
流哈希表的 key 存储在 Packet 结构中的 flow_hash 变量中,在对数据包进行解码操作时,通过调用 FlowGetHash 函数计算当前数据包的 flow 哈希 key。计算哈希 key 需要以下九个元素值:
-
源IP地址;
-
目的IP地址;
-
源端口号;
-
目的端口号;
-
协议类型;
-
递归层级(适用于隧道(tunnel)类型的网络包,需要递归解码报文头);
-
网卡设备标识符(livedev ids),在 Suricata 中可通过 livedev.use-for-tracking 选项控制是否将网卡id作为哈希key计算因素。此选项处于启用状态,意味着在哈希计算中会包含网卡设备的标识符,使得来自不同网卡的相同流量被视为不同的流。如果关闭,则不同网卡收到的同一流量视为同一流。
-
vlan id;
-
随机数(32bit),该随机数是在程序启动初期流引擎初始化时生成,在程序的整个生命周期过程中不变。
2.1.2 更新流和包信息
2.1.1 章节已介绍 Suricata 获取网络包后,根据包信息从流哈希表中找到对应流对象。本章节介绍,Suricata 拿到流对象后,会结合流对象和包中的信息,更新两者的标志位信息。
FlowUpdate 函数是入口函数,该函数代码实现如下所示:
/** \brief handle flow for packet** Handle flow creation/lookup*/
static inline TmEcode FlowUpdate(ThreadVars *tv, FlowWorkerThreadData *fw, Packet *p)
{FlowHandlePacketUpdate(p->flow, p, tv, fw->dtv);int state = p->flow->flow_state;switch (state) {
#ifdef CAPTURE_OFFLOADcase FLOW_STATE_CAPTURE_BYPASSED: {StatsAddUI64(tv, fw->both_bypass_pkts, 1);StatsAddUI64(tv, fw->both_bypass_bytes, GET_PKT_LEN(p));Flow *f = p->flow;FlowDeReference(&p->flow);FLOWLOCK_UNLOCK(f);return TM_ECODE_DONE;}
#endifcase FLOW_STATE_LOCAL_BYPASSED: {StatsAddUI64(tv, fw->local_bypass_pkts, 1);StatsAddUI64(tv, fw->local_bypass_bytes, GET_PKT_LEN(p));Flow *f = p->flow;FlowDeReference(&p->flow);FLOWLOCK_UNLOCK(f);return TM_ECODE_DONE;}default:return TM_ECODE_OK;}
}
下面主要详细介绍 FlowHandlePacketUpdate 函数的逻辑。我将该函数拆解成多个代码块来介绍 Suricata 如何更新流和包的标志位信息。
(1)FlowGetPacketDirection 函数通过比较端口号大小或者比较IP地址的方法推算出当前报文的方向,即 TOSERVER 还是 TOCLIENT。
const int pkt_dir = FlowGetPacketDirection(f, p);
(2)CAPTURE_OFFLOAD 宏是流量捕获卸载的开关。开启该模式后,Suricata 可按策略将部分流量处理工作卸载到硬件或内核模块,以提升高流量时的性能。例如流量被标记为 bypass 时,Suricata 能通过下发 eBPF/XDP 规则实现底层导流。下面代码块用于更新流对象的 lastts 变量,该变量记录当前流最新包的时间戳(即当前包的时间)。
#ifdef CAPTURE_OFFLOADint state = f->flow_state;if (state != FLOW_STATE_CAPTURE_BYPASSED) {
#endif/* update the last seen timestamp of this flow */if (SCTIME_CMP_GT(p->ts, f->lastts)) {f->lastts = p->ts;const uint32_t timeout_at = (uint32_t)SCTIME_SECS(f->lastts) + f->timeout_policy;if (timeout_at != f->timeout_at) {f->timeout_at = timeout_at;}}
#ifdef CAPTURE_OFFLOAD} else {/* still seeing packet, we downgrade to local bypass */if (SCTIME_SECS(p->ts) - SCTIME_SECS(f->lastts) > FLOW_BYPASSED_TIMEOUT / 2) {SCLogDebug("Downgrading flow to local bypass");f->lastts = p->ts;FlowUpdateState(f, FLOW_STATE_LOCAL_BYPASSED);} else {/* In IPS mode the packet could come from the other interface so it would* need to be bypassed */if (EngineModeIsIPS()) {BypassedFlowUpdate(f, p);}}}
#endif
(3)根据报文方向(TOSERVER/TOCLIENT)更新计数器和标志位:统计流中两个方向的报文数量、字节总数;依据 Flow 对象标志位设置 Packet 标志位状态,并更新流的最小和最大 TTL 值。
/* update flags and counters */if (pkt_dir == TOSERVER) {f->todstpktcnt++;f->todstbytecnt += GET_PKT_LEN(p);p->flowflags = FLOW_PKT_TOSERVER;if (!(f->flags & FLOW_TO_DST_SEEN)) { // 如果在这个流中这个方向没有看到过数据包,那么这是第一个数据包if (FlowUpdateSeenFlag(p)) {f->flags |= FLOW_TO_DST_SEEN;p->flowflags |= FLOW_PKT_TOSERVER_FIRST;}}/* xfer proto detect ts flag to first packet in ts dir */// 当flow(流)已经完成ToServer方向的协议检测(FLOW_PROTO_DETECT_TS_DONE标志被设置)时// 将这个状态从flow结构转移到当前数据包(packet)中// 通过设置PKT_PROTO_DETECT_TS_DONE标志来标记该数据包if (f->flags & FLOW_PROTO_DETECT_TS_DONE) {f->flags &= ~FLOW_PROTO_DETECT_TS_DONE;p->flags |= PKT_PROTO_DETECT_TS_DONE;}FlowUpdateEthernet(tv, dtv, f, p->ethh, true);/* update flow's ttl fields if needed */// 更新流的最小和最大TTL值if (PKT_IS_IPV4(p)) {FlowUpdateTtlTS(f, p, IPV4_GET_IPTTL(p));} else if (PKT_IS_IPV6(p)) {FlowUpdateTtlTS(f, p, IPV6_GET_HLIM(p));}} else {f->tosrcpktcnt++;f->tosrcbytecnt += GET_PKT_LEN(p);p->flowflags = FLOW_PKT_TOCLIENT;if (!(f->flags & FLOW_TO_SRC_SEEN)) {if (FlowUpdateSeenFlag(p)) {f->flags |= FLOW_TO_SRC_SEEN;p->flowflags |= FLOW_PKT_TOCLIENT_FIRST;}}/* xfer proto detect tc flag to first packet in tc dir */if (f->flags & FLOW_PROTO_DETECT_TC_DONE) {f->flags &= ~FLOW_PROTO_DETECT_TC_DONE;p->flags |= PKT_PROTO_DETECT_TC_DONE;}FlowUpdateEthernet(tv, dtv, f, p->ethh, false);/* update flow's ttl fields if needed */if (PKT_IS_IPV4(p)) {FlowUpdateTtlTC(f, p, IPV4_GET_IPTTL(p));} else if (PKT_IS_IPV6(p)) {FlowUpdateTtlTC(f, p, IPV6_GET_HLIM(p));}}
(4)根据当前流状态更新 Packet 状态,流状态包含 FLOW_STATE_NEW、FLOW_STATE_ESTABLISHED、FLOW_STATE_CLOSED、FLOW_STATE_LOCAL_BYPASSED 四种。其中 FLOW_STATE_ESTABLISHED 表示流两端会话已建立:若为 TCP 协议,通过 TcpSession 状态是否为 TCP_ESTABLISHED 判断;若为其他协议,通过两端是否都发送数据(即 FLOW_TO_DST_SEEN 和 FLOW_TO_SRC_SEEN 标志位均置位)判断。
if (f->flow_state == FLOW_STATE_ESTABLISHED) {SCLogDebug("pkt %p FLOW_PKT_ESTABLISHED", p);p->flowflags |= FLOW_PKT_ESTABLISHED;} else if (f->proto == IPPROTO_TCP) {TcpSession *ssn = (TcpSession *)f->protoctx;if (ssn != NULL && ssn->state >= TCP_ESTABLISHED) {p->flowflags |= FLOW_PKT_ESTABLISHED;}} else if ((f->flags & (FLOW_TO_DST_SEEN|FLOW_TO_SRC_SEEN)) ==(FLOW_TO_DST_SEEN|FLOW_TO_SRC_SEEN)) {SCLogDebug("pkt %p FLOW_PKT_ESTABLISHED", p);p->flowflags |= FLOW_PKT_ESTABLISHED;if (
#ifdef CAPTURE_OFFLOAD(f->flow_state != FLOW_STATE_CAPTURE_BYPASSED) &&
#endif(f->flow_state != FLOW_STATE_LOCAL_BYPASSED)) {FlowUpdateState(f, FLOW_STATE_ESTABLISHED);}}
(5)如果当前流设置了 FLOW_ACTION_DROP 标志位,则说明这条流上的所有包都应该丢弃,PacketDrop 函数通过设置 Packet 中的标志位标识当前包要丢弃。
if (f->flags & FLOW_ACTION_DROP) {PacketDrop(p, ACTION_DROP, PKT_DROP_REASON_FLOW_DROP);
}
(6)如果当前流设置了FLOW_NOPACKET_INSPECTION(当前流的所有包不再检查) 和 FLOW_NOPAYLOAD_INSPECTION(当前流的所有包应用数据不再检查)标志位置位,则同步更新到 Pakcet 中。
/*set the detection bypass flags*/
if (f->flags & FLOW_NOPACKET_INSPECTION) {SCLogDebug("setting FLOW_NOPACKET_INSPECTION flag on flow %p", f);DecodeSetNoPacketInspectionFlag(p);
}
if (f->flags & FLOW_NOPAYLOAD_INSPECTION) {SCLogDebug("setting FLOW_NOPAYLOAD_INSPECTION flag on flow %p", f);DecodeSetNoPayloadInspectionFlag(p);
}
2.2 应用数据解析
Suricata 获取流对象并更新 Flow 和 Packet 状态后,按传输层协议进行数据重组和应用数据解析。如果是 TCP 协议则需要通过 stream 引擎进行数据重组操作等,如果是UDP协议则无需重组。最后根据协议类型分别调用 AppLayerHandleTCPData 函数和 AppLayerHandleUdp 函数解析应用层数据。
2.3 报文检测
截止2.2章节应用数据解析后,当前报文的包头和应用数据均解析完毕,状态也同步更新,接下来则执行检测操作。
/* handle Detect */
DEBUG_ASSERT_FLOW_LOCKED(p->flow);
SCLogDebug("packet %"PRIu64" calling Detect", p->pcap_cnt);
if (detect_thread != NULL) {FLOWWORKER_PROFILING_START(p, PROFILE_FLOWWORKER_DETECT);Detect(tv, p, detect_thread);FLOWWORKER_PROFILING_END(p, PROFILE_FLOWWORKER_DETECT);
}
2.4 日志输出
2.3 章节中,报文经检测引擎检测完成后,需通过指定方式输出检测过程中产生的告警日志,该操作通过调用 OutputLoggerLog 函数实现。
TmEcode OutputLoggerLog(ThreadVars *tv, Packet *p, void *thread_data)
{LoggerThreadStore *thread_store = (LoggerThreadStore *)thread_data;RootLogger *logger = TAILQ_FIRST(&active_loggers);LoggerThreadStoreNode *thread_store_node = TAILQ_FIRST(thread_store);while (logger && thread_store_node) {logger->LogFunc(tv, p, thread_store_node->thread_data);logger = TAILQ_NEXT(logger, entries);thread_store_node = TAILQ_NEXT(thread_store_node, entries);}return TM_ECODE_OK;
}
2.5 资源释放
报文检测和日志输出后则需要释放过期的数据和资源,重置标志位等操作。
/* Release tcp segments. Done here after alerting can use them. */if (p->flow != NULL) {DEBUG_ASSERT_FLOW_LOCKED(p->flow);if (FlowIsBypassed(p->flow)) {FlowCleanupAppLayer(p->flow);if (p->proto == IPPROTO_TCP) {StreamTcpSessionCleanup(p->flow->protoctx);}} else if (p->proto == IPPROTO_TCP && p->flow->protoctx) {FramesPrune(p->flow, p);FLOWWORKER_PROFILING_START(p, PROFILE_FLOWWORKER_TCPPRUNE);StreamTcpPruneSession(p->flow, p->flowflags & FLOW_PKT_TOSERVER ?STREAM_TOSERVER : STREAM_TOCLIENT);FLOWWORKER_PROFILING_END(p, PROFILE_FLOWWORKER_TCPPRUNE);} else if (p->proto == IPPROTO_UDP) {FramesPrune(p->flow, p);}if ((PKT_IS_PSEUDOPKT(p)) ||(p->flow->flags & (FLOW_TS_APP_UPDATED | FLOW_TC_APP_UPDATED))) {if (PKT_IS_TOSERVER(p)) {if (PKT_IS_PSEUDOPKT(p) || (p->flow->flags & (FLOW_TS_APP_UPDATED))) {AppLayerParserTransactionsCleanup(p->flow, STREAM_TOSERVER);p->flow->flags &= ~FLOW_TS_APP_UPDATED;}} else {if (PKT_IS_PSEUDOPKT(p) || (p->flow->flags & (FLOW_TC_APP_UPDATED))) {AppLayerParserTransactionsCleanup(p->flow, STREAM_TOCLIENT);p->flow->flags &= ~FLOW_TC_APP_UPDATED;}}} else {SCLogDebug("not pseudo, no app update: skip");}if (p->flow->flags & FLOW_ACTION_DROP) {SCLogDebug("flow drop in place: remove app update flags");p->flow->flags &= ~(FLOW_TS_APP_UPDATED | FLOW_TC_APP_UPDATED);}Flow *f = p->flow;FlowDeReference(&p->flow);FLOWLOCK_UNLOCK(f);}
2.6 流清理维护
Suricata 的 FlowWorker 模块处理完上述操作后,通过 FlowWorkerProcessInjectedFlows 函数从线程共享的流队列 tv->flow_queue 中提取其他线程(如流管理线程)注入的流对象。这些流可能因超时而未检测,或因流哈希表空间不足被强制清理,均需进行强制重组检测。
FlowWorker 模块将这些流对象放入本地流队列 fw->fls.work_queue,由 FlowWorkerProcessLocalFlows 函数处理。该函数并非一次性处理所有流对象,而是在每次处理 Packet 时顺带处理 2 个。但若系统处于退出状态(需处理完所有流后退出)或抓包超时,则会一次性处理完本地队列中的所有流对象。
2.7 总结
Suricata 处理数据时按接收数据包、解码数据包、流处理(FlowWorker)的顺序进行,这三个模块可由单个线程同步调用或多个线程异步调用。
运行模式和接收解码参考下面:
【网络入侵检测】基于Suricata源码分析运行模式(Runmode)https://blog.csdn.net/qq_29490749/article/details/148028102?spm=1001.2014.3001.5501【网络入侵检测】基于源码分析Suricata的解码模块
https://blog.csdn.net/qq_29490749/article/details/147623664?spm=1001.2014.3001.5501【网络入侵检测】基于源码分析Suricata的PCAP模式
https://blog.csdn.net/qq_29490749/article/details/147517295?spm=1001.2014.3001.5501本文主要介绍 FlowWorker 模块对数据包的处理流程,即当前数据包已被 Suricata 接收并完成解码。FlowWorker 模块的主要工作包括:根据数据包获取 Flow 对象、解析应用数据、执行报文检测、输出告警日志、释放过期流资源,以及维护超时或退出状态下的流资源等。