当前位置: 首页 > news >正文

基础组件(六):网络缓冲区设计 和 定时器方案

文章目录

  • 一、用户态网络缓冲区
    • 1. Linux系统如何收发数据包?
      • 接收网络数据包的流程
      • 发送网络数据包的流程(tcp)
    • 2. 网络缓冲区相关问题
      • Q1.为什么为每条连接需要读写缓冲区?
      • Q2.udp和tcp协议是否影响用户态缓冲区设计?
      • Q3.不同网络模型是否影响用户态缓冲区设计?
      • Q4.ip层分片了,为什么tcp还要分段?
    • 3. 网络缓冲区实现
      • 定长buffer(结构简单但频繁数据腾挪)
      • ring buffer (解决数据频繁移动)
      • chain buffer (块状链表)
  • 二、定时器模块
    • 1. 定时器构成
      • 容器(存放定时任务的数据结构)
      • 检测触发机制(看PDF)
    • 2. 定时器优化思路
      • (1)红黑树(平衡二叉搜索树)
      • (2)最小堆(完全二叉树)
      • (3)时间轮(多轮指针)
      • Q: 有关时间轮的一些问题


一、用户态网络缓冲区

1. Linux系统如何收发数据包?

协议栈的构成
对着思维导图/流程图 反复叙述(收/发网络包流程)
在这里插入图片描述
各层数据包表示
在这里插入图片描述

接收网络数据包的流程

在这里插入图片描述

  1. 网卡收到数据包,通过DMA将数据写入内存(ringbuffer结构)
  2. 网卡向CPU发起硬件中断,CPU收到中断请求,根据中断表查找中断处理函数,调用中断处理函数
  3. 中断处理函数将屏蔽中断,发起软件中断
    避免CPU频繁被网卡中断
    使用软中断处理耗时操作,避免执行时间过长,导致CPU没法响应其他硬件中断
  4. 内核ksoftirqd线程负责软中断处理,该线程从ringbuffer中逐个取出数据帧到sk_buff
  5. 从帧头取出IP协议,判断是IPv4还是IPv6,去掉帧头,帧尾
  6. 从IP头看上一层协议是tcp还是udp,根据五元组找到socket,并将数据提取出来放到socket的接受缓冲区
    软中断处理结束后开启硬件中断
  7. 应用程序通过系统调用将socket的接收缓冲区的数据拷贝到应用层缓冲区

发送网络数据包的流程(tcp)

在这里插入图片描述

  1. 应用程序通过系统调用将用户数据拷贝sk_buffer并放到socket的发送缓冲区(udp不使用发送缓冲区)
  2. 网络协议栈从socket的发送缓冲区取出sk_buff,并克隆出一个新的sk_buffer(tcp支持丢失重传)
  3. 向下传递依次增加TCP/UDP头部,IP头部,帧头(MAC头部),帧尾(tcp分段,ip分片)
    TCP 分段是为了根据应用的需要,控制数据流的传输。
    IP 分片是为了保证数据包大小符合网络层的传输限制。
  4. 触发软中断通知网卡驱动程序,有新的网络包需要发送
  5. 网卡驱动程序从发送队列依次取出sk_buff写ringbuffer(内存DMA区域)
  6. 触发网卡发送,发送成功,触发硬件中断,释放sk_buff和ringbuffer内存
    tcp对应的是克隆而来的
    udp对应的是原始的
  7. 当收到tcp报文的ack应答时,将释放原始的sk_buff

2. 网络缓冲区相关问题

Q1.为什么为每条连接需要读写缓冲区?

  1. 读缓冲区
    从读缓冲区取出的数据不一定包含一个完整的数据包。
    生产者的速度 > 消费者的速度。
  2. 写缓冲区
    数据包不能一次性全部传输。
    生产者的速度大于消费者的速度。
    请添加图片描述

不同网络模型是否影响用户态缓冲区设计?

Q2.udp和tcp协议是否影响用户态缓冲区设计?

不会影响, 两者粘包处理方式不同
在这里插入图片描述

Q3.不同网络模型是否影响用户态缓冲区设计?

不会影响, 但两者对内核缓冲区数据读取出来方式不同

  • 接收缓冲区:
    阻塞 IO:线程阻塞,等待数据到达。
    Reactor:通过 IO 多路复用检测缓冲区就绪,使用事件驱动模型通知用户态调用 read。
    Proactor:异步投递请求,内核拷贝数据后,通过完成通知回调用户态。
  • 发送缓冲区:
    阻塞 IO:write() 调用阻塞直到数据写入内核的发送缓冲区。
    Reactor:通过 epoll 监听可写事件,避免缓冲区满时阻塞。
    Proactor:异步 WriteFile() 提交请求,内核完成数据拷贝后通过完成通知回调。

Q4.ip层分片了,为什么tcp还要分段?

if tcp层不分段 + ip层分片的话,当数据传输到ip层,一个小包丢失整个包都要重传。

tcp层分段,ip层分片 的话只需 重传丢失的这小段数据包即可.
udp就不需要分段, 因为udp不提供可靠传输,无重传机制,所以一个小包丢失就是整个包丢弃。


3. 网络缓冲区实现

网络缓冲区设计 : 生产消费者模型, 用队列结构
请添加图片描述
在这里插入图片描述

定长buffer(结构简单但频繁数据腾挪)

实现简单,因为每次取出数据后需要将数据腾挪到头部,都需要对齐,
但空间浪费,空间不足还会引起数据频繁移动。
在这里插入图片描述

ring buffer (解决数据频繁移动)

针对解决:无锁队列ring buffer 、环形缓冲区避免数据频繁移动 √,
但未解决空间伸缩容问题❌
​因为没有腾挪数据可能造成离散数据,会增加系统调用❌
请添加图片描述
头尾指针解决数据导入时的对齐问题
取余操作 > 位运算操作 : m % 2^n == m & (2^n -1)
ring buffer设计
> 缓冲区读取不涉及多线程, 采取编译器屏障
do{asm volatile(‘’ ‘’ :::‘’ memory)}while (0);
CPU屏障 mb( ) rmb( ) wmb( )

chain buffer (块状链表)

  • 特点:
    不需要腾挪数据,避免数据频繁移动√,
    动态扩缩容且无需数据拷贝,也解决空间伸缩容问题√
    造成不连续空间,可能引发多次系统调用❌
    请添加图片描述
    代码实现
    misalign:从开头以及取走的长度,下次从开头加上misalign就是数据开始点
    offset:有效数据长度

二、定时器模块

1. 定时器构成

定时器模块 = 容器组织大量定时任务 + 检测触发机制
作用:组织管理大量延时任务的模块,不过度占用线程,高效处理定时任务。没有定时器系统只能被动等待或手动轮询,会极大浪费 CPU性能 或使任务无法按时触发。

容器(存放定时任务的数据结构)

定时器管理结构(调度容器):存放所有定时任务的核心数据结构,用于高效地插入、删除、查询最近即将触发的任务。

  • 触发时间排序的结构 : 红黑树STL(map, set, multimap, multiset) 、最小堆(priority_queue)
  • 执行序排列的结构: 时间轮(针对当前时间指针做偏移)

检测触发机制(看PDF)

定时任务触发机制(驱动器):是实际触发到期任务的机制,有点类似“闹钟”。
在定时器的管理结构中,虽然我们知道每个任务的目标触发时间,但还必须解决:“如何及时、高效地检测并触发已经到期的任务?”
现在的检测触发方式主要有以下两种设计理念:

  • 采用io多路复用的epoll_awit最后一个参数timeout触发(eg. redis、nginx)
int ret = epoll_wait(epfd, events, max_events,3000); //最多等3秒

这里为等待三秒。使用这些 timeout 参数,可以无需额外定时器结构,也能被动检测“时间是否过去”,这样做可避免轮询带来的cpu消耗;但这种方式也具有一定的缺点:不够精确(毫秒级);如果需要动态更新超时时间(如堆顶任务改变),需不断传入新 timeout;适合小型服务或简化实现。

  • 采用timerfd将定时事件转化为io处理, 让io多路复用进行检测 (eg. workflow)

timerfd是 Linux 提供的一种定时器机制,它的本质是一个文件描述符(fd),可以表示一个定时器;到期后变为 可读 状态(就像 socket 一样);可与 epoll / select 等 I/O 多路复用接口集成;支持一次性或周期性触发;精度高,可达 纳秒级;
主要接口:
1.timerfd_create() 创建定时器,生成一个 fd
2.timerfd_settime() 设置超时时间
3.epoll_ctl() 监听 timerfd 的 超时事件,统一交给 epoll_wait 处理

简单实现

#include <sys/epoll.h>
#include <sys/timerfd.h>
#include <time.h>  // 用于 timespec 和 itimerspec
#include <unistd.h> // close()
#include <functional>
#include <chrono>
#include <set>
#include <memory>
#include <iostream>using namespace std;// 定时器基础结构体,存储任务的唯一 ID 和触发时间,用于排序和查找
struct TimerNodeBase {time_t expire;  // 过期时间(毫秒级)uint64_t id;    // 唯一 ID
};// 继承自 TimerNodeBase,添加回调函数,构成完整的定时任务
struct TimerNode : public TimerNodeBase {using Callback = std::function<void(const TimerNode &node)>;Callback func;  // 任务回调函数// 构造函数,设置任务的 id、到期时间与执行函数TimerNode(int64_t id, time_t expire, Callback func) : func(func) {this->expire = expire;this->id = id;}
};// 定义小于运算符,用于 std::set 排序,先按到期时间排,再按 id 排
bool operator < (const TimerNodeBase &lhd, const TimerNodeBase &rhd) {if (lhd.expire < rhd.expire) {return true;} else if (lhd.expire > rhd.expire) {return false;} else return lhd.id < rhd.id;
}// 定时器管理器类,负责定时任务的添加、删除、触发检测与执行
class Timer {
public:// 获取当前时间(单位:毫秒)static inline time_t GetTick() {return chrono::duration_cast<chrono::milliseconds>(chrono::steady_clock::now().time_since_epoch()).count();}// 添加定时任务,生成唯一 ID,根据到期时间插入到排序容器中TimerNodeBase AddTimer(int msec, TimerNode::Callback func) {time_t expire = GetTick() + msec;// 插入时若早于现有最大时间,用普通 emplace;否则使用 emplace_hint 优化if (timeouts.empty() || expire <= timeouts.crbegin()->expire) {auto pairs = timeouts.emplace(GenID(), expire, std::move(func));return static_cast<TimerNodeBase>(*pairs.first);}auto ele = timeouts.emplace_hint(timeouts.crbegin().base(), GenID(), expire, std::move(func));return static_cast<TimerNodeBase>(*ele);}// 删除定时任务(若存在),通过 node 信息匹配void DelTimer(TimerNodeBase &node) {auto iter = timeouts.find(node);if (iter != timeouts.end())timeouts.erase(iter);}// 处理所有已到期任务:按时间顺序遍历,执行并移除void HandleTimer(time_t now) {auto iter = timeouts.begin();while (iter != timeouts.end() && iter->expire <= now) {iter->func(*iter);  // 执行任务回调iter = timeouts.erase(iter);  // 移除已执行任务}}// 根据最早过期任务更新 timerfd,确保 epoll 能及时返回virtual void UpdateTimerfd(const int fd) {struct timespec abstime;auto iter = timeouts.begin();if (iter != timeouts.end()) {abstime.tv_sec = iter->expire / 1000;abstime.tv_nsec = (iter->expire % 1000) * 1000000;} else {abstime.tv_sec = 0;abstime.tv_nsec = 0;}struct itimerspec its = {.it_interval = {},.it_value = abstime};timerfd_settime(fd, TFD_TIMER_ABSTIME, &its, nullptr);}private:// 全局递增 ID 生成器,用于唯一标识每个定时任务static inline uint64_t GenID() {return gid++;}static uint64_t gid;// 使用 std::set 按过期时间排序任务,便于快速获取最早任务set<TimerNode, std::less<>> timeouts;
};
uint64_t Timer::gid = 0;// 主函数,注册 timerfd 到 epoll,添加定时任务,并进入事件循环
int main() {// 创建 epoll 实例,用于监听 timerfd 事件int epfd = epoll_create(1);// 创建 timerfd(Linux 提供的高精度定时器),用于精确唤醒int timerfd = timerfd_create(CLOCK_MONOTONIC, 0);struct epoll_event ev = {.events=EPOLLIN | EPOLLET};// 将 timerfd 注册到 epoll,使用边缘触发epoll_ctl(epfd, EPOLL_CTL_ADD, timerfd, &ev);unique_ptr<Timer> timer = make_unique<Timer>();int i = 0;// 添加定时任务,测试 1s、2.1s、3s 三个任务触发效果timer->AddTimer(1000, [&](const TimerNode &node) {cout << Timer::GetTick() << " node id:" << node.id << " revoked times:" << ++i << endl;});timer->AddTimer(3000, [&](const TimerNode &node) {cout << Timer::GetTick() << " node id:" << node.id << " revoked times:" << ++i << endl;});auto node = timer->AddTimer(2100, [&](const TimerNode &node) {cout << Timer::GetTick() << " node id:" << node.id << " revoked times:" << ++i << endl;});// 删除其中一个定时任务,用于测试删除功能是否生效timer->DelTimer(node);struct epoll_event evs[64] = {0};while (true) {// 每次 epoll_wait 前,更新 timerfd,设置最近任务触发时间timer->UpdateTimerfd(timerfd);int n = epoll_wait(epfd, evs, 64, -1);time_t now = Timer::GetTick();// epoll 触发后处理已过期任务,执行对应回调函数timer->HandleTimer(now);}close(timerfd);close(epfd);return 0;
}

2. 定时器优化思路

(1)红黑树(平衡二叉搜索树)

请添加图片描述
第一个head红节点本身不存数据, 有三个指针: 左连begin() 红黑树最小元素、中连root、右连最右侧元素.end() 失效迭代器不存数据

右连end()不是 红黑树最大元素=crbegin()=end()-1才是倒叙数第一个元素

平衡 二叉 搜索树 : 维持增删改查操作 时间复杂度在O(log n),使每次比较都能排除一半的节点(二分查找)

平衡左右子树高度 ==> 从root出发左右子树 黑节点个数一样

红黑树特征:有序、平衡、黑节点高度一致。
红黑树操作:增删改查,左旋转右旋转,重新着色。
红黑树在定时器中的应用:组织定时任务,保持平衡。

clock-timer.h   #时间表盘的实现timewheel.h     #时间轮的实现   mh-timer        #最小堆rbt-timer       #红黑树timer_with_timerfd.cc  # 面试推荐的方法timerNodeBase

面试题: 现场写个 定时器, 建议使用 multimap

(2)最小堆(完全二叉树)

请添加图片描述
请添加图片描述
最小堆操作:添加节点和删除节点,时间复杂度为O(log n)
最小堆在定时器中的应用:快速找到最小值,驱动定时器运行

(3)时间轮(多轮指针)

时间轮通常用于多线程环境, 红黑树、最小堆用于单线程

时间轮 : 时间指针 + 最小精度+ 最大范围
时间指针 :按照最小精度进行移动, 添加定时任务就是对时间指针的偏移
最小精度1s : 能接受的误差
最大范围 :  能支持的最大间隔时间任务

请添加图片描述
为什么设计多个时间轮 : 时间指针 + 最小精度1s + 最大范围层级 : 解决空间浪费问题
请添加图片描述
时间轮就像一个时钟表盘,将时间划分为等长的“槽(slot)”,每个槽代表一个固定的时间区间。例如:一共 60 个槽,每个槽表示 100ms,那么一圈表示 6 秒;定时任务被放入某个槽位中,等待“时针”转动到这个槽时触发。
(1)基本结构
时间轮使用指针数组存储任务,每个格子(Slot)对应一个任务链表,存储在同一时间触发的任务。

struct TimerTask {int rotations;      // 还有几圈后触发std::function<void()> callback;TimerTask* next;
};std::vector<TimerTask*> slots;  // 每个槽是一个链表
int current_slot = 0;           // 当前指针位置

(2)任务添加

将一个设定延迟时间的任务放入时间轮的合适位置,等待将来触发。
变量:current_slot: 当前时间轮指针位置;interval: 每个槽代表的时间间隔;slot_count: 时间轮的总槽数;timeout: 任务延迟时间;

ticks = timeout / interval //计算出需要延后的位置
slot_index = (current_slot + ticks) % slot_count //计算出放置任务的位置
rotations = ticks / slot_count   // 表示再转多少圈后才能触发

(3)任务触发
在时间轮每“跳动”一格(即经过一个 interval 时间)时,判断当前槽位中哪些任务可以执行。每次 tick() 时,指针移动到下一个槽位 current_slot++;遍历当前槽位的链表中所有任务。对每个任务:若 rotations == 0:立即触发执行;否则:rotations–,等待下一轮。
请添加图片描述
(4)任务的重新映射
当时间轮转满一圈:
单层时间轮:超时任务需要重新映射到新槽位
多层时间轮:
高层时间轮(如分钟层)每次移动一格(触发低层重新映射)
任务触发时间减去当前时间,计算新槽位存放


Q: 有关时间轮的一些问题

(1)时间轮怎么保证线程安全?

  • 时间轮通常涉及多个线程(定时线程、任务添加线程、任务执行线程),需要处理并发问题:
    1.任务添加同步:
    使用锁(mutex、spinlock)或 无锁数据结构(lock-free queue)确保线程安全
    或者使用事件队列(event queue),让时间轮线程自行处理任务添加
    2.任务检测并发:
    读写锁(RWLock):防止多个线程同时访问任务链表
    CAS(Compare-And-Swap):用于无锁操作
    3.任务执行并发:
    任务执行通常由线程池处理,避免阻塞时间轮

(2)时间轮线程为什么不直接执行任务?

  • 时间轮的核心职责是检测任务是否超时,直接执行任务会有以下问题:
    1.任务可能耗时较长,导致时间指针无法及时移动,影响任务触发的准确性
    2.影响高精度定时,多个任务需要在不同时间执行,可能造成任务堆积
    3.更好的线程管理:时间轮线程 只负责任务检测,将任务放入 任务队列 或 线程池 处理任务执行线程 由 线程池 管理,提高并发执行能力。
    在这里插入图片描述

优秀笔记:
1.1 用户态网络缓冲区
1.2 用户态网络缓冲区设计
2.1 基于红黑树,时间轮,最小堆的定时器方案
2.2 定时器方案红黑树,时间轮,最小堆
参考学习:https://github.com/0voice

http://www.dtcms.com/a/308192.html

相关文章:

  • TTS语音合成|GPT-SoVITS语音合成服务器部署,实现http访问
  • Vue3+Vite项目如何简单使用tsx
  • nl2sql grpo强化学习训练,加大数据量和轮数后,准确率没提升,反而下降了,如何调整
  • PostgreSQL dblink 与 Spring Boot @Transactional 的事务整合
  • Text2SQL 智能问答系统开发-预定义模板(二)
  • docker离线安装mysql镜像
  • 记录几个SystemVerilog的语法——覆盖率
  • 基于MATLAB的GUI来对不同的(彩色或灰色)图像进行图像增强
  • 【国内电子数据取证厂商龙信科技】内存取证
  • 法式基因音响品牌SK(SINGKING AUDIO)如何以硬核科技重塑专业音频版图
  • 防御保护第一次作业
  • AI Gateway 分析:OpenRouter vs Higress
  • python基础语法3,组合数据类型(简单易上手的python语法教学)(课后习题)
  • BFT平台:打造科研教育“最强机器人矩阵”
  • 自动驾驶控制算法——PID算法
  • 蓝桥杯----DS18B20温度传感器
  • vue3+arcgisAPI4示例:轨迹点模拟移动(附源码下载)
  • InfluxDB 与 Python 框架结合:Django 应用案例(二)
  • ASIC芯片简介
  • LangGraph认知篇-Send机制
  • TypeScript 基础介绍(二)
  • QT6 Python UI文件转换PY文件的方法
  • 如何为C#加入EPPlus 包
  • 【Flask基础②】 | 路由、响应与异常处理
  • 微服务快速集成 TraceId
  • 企业智脑1.3.2版本发布,设备管理+智能体OS双核驱动,重构数字生产力边界
  • 【车联网kafka】Kafka核心架构与实战经验(第二篇)
  • 网络与信息安全有哪些岗位:(4)应急响应工程师
  • 【MySQL集群架构与实践3】使用Dcoker实现读写分离
  • VuePress 使用详解