当前位置: 首页 > news >正文

【中间件】brpc_基础_TimerThread

文章目录

  • TimerThread
    • 1 简介
    • 2 主要设计点
      • 2.1 数据结构:分层时间轮(Hierarchical Timing Wheel)
      • 2.2 线程模型
      • 2.3 任务管理
    • 3 关键代码分析
      • 3.1 类定义(`timer_thread.h`)
      • 3.2 时间轮初始化(`timer_thread.cpp`)
      • 3.3 任务调度(`timer_thread.cpp`)
      • 3.4 线程主循环(`timer_thread.cpp`)
    • 4 性能优化
    • 5 关键参数与配置
    • 6 应用场景
    • 7 总结

TimerThread

1 简介

BRPC 中用于 高效管理定时任务 的组件,负责调度延时任务(如 RPC 超时处理、周期性任务)。

设计目标:支持高并发、低延迟的定时任务管理,

适用场景:大规模分布式系统中的超时控制和任务调度。


2 主要设计点

2.1 数据结构:分层时间轮(Hierarchical Timing Wheel)

  • 时间轮层级:BRPC 使用 多层时间轮(例如秒级、毫秒级),每层时间轮覆盖不同的时间精度和范围,减少任务迁移频率。
  • 时间槽(Bucket):每个时间轮分为多个槽位,任务按到期时间分布到对应槽位中。
  • 任务迁移:高层时间轮的任务在到期后迁移到低层时间轮,逐步细化调度精度。

2.2 线程模型

  • 独立线程TimerThread 运行在单独的线程中,通过 bthread 异步唤醒usleep 轮询 检查任务到期。
  • 低延迟唤醒:使用 butexepoll 实现高效休眠唤醒机制,减少 CPU 空转。

2.3 任务管理

  • 任务结构
    struct TimerTask {int64_t expiration_ms;  // 到期时间(绝对时间戳)void (*fn)(void*);      // 回调函数void* arg;              // 回调参数TimerTask* next;        // 链表指针
    };
    
  • 任务插入:根据到期时间计算所属时间轮层级和槽位,插入对应链表。
  • 任务执行:到期槽位的任务链表被批量取出并执行回调。

3 关键代码分析

3.1 类定义(timer_thread.h

class TimerThread {
public:TimerThread();~TimerThread();// 启动定时器线程int start(const TimerThreadOptions* options);// 提交定时任务(绝对时间 expiration_ms)void schedule(void (*fn)(void*), void* arg, int64_t expiration_ms);// 停止线程void stop();private:// 时间轮层级定义struct Wheel {int64_t tick_ms;       // 本层时间轮每个槽位的时间跨度int num_slots;         // 槽位数量TimerTask** buckets;   // 槽位任务链表int64_t current_tick;  // 当前指向的槽位};std::vector<Wheel> _wheels;// 线程控制bthread_t _tid;std::atomic<bool> _stop;// 任务队列同步butil::Mutex _mutex;butil::ConditionVariable _cond;
};

3.2 时间轮初始化(timer_thread.cpp

int TimerThread::start(const TimerThreadOptions* options) {// 初始化多层时间轮(例如:毫秒级、秒级)_wheels.emplace_back(Wheel{1, 64});    // 1ms per tick, 64 slots (64ms range)_wheels.emplace_back(Wheel{64, 64});   // 64ms per tick, 64 slots (~4s range)_wheels.emplace_back(Wheel{4096, 64}); // 4096ms per tick, 64 slots (~4min range)// 启动线程return bthread_start_background(&_tid, nullptr, TimerThread::run, this);
}

3.3 任务调度(timer_thread.cpp

void TimerThread::schedule(void (*fn)(void*), void* arg, int64_t expiration_ms) {TimerTask* task = new TimerTask{expiration_ms, fn, arg, nullptr};butil::MutexLockGuard lock(_mutex);// 计算任务所属的时间轮层级和槽位int level = find_wheel_level(expiration_ms);int slot = calculate_slot(expiration_ms, level);// 插入对应槽位链表task->next = _wheels[level].buckets[slot];_wheels[level].buckets[slot] = task;
}

3.4 线程主循环(timer_thread.cpp

void* TimerThread::run(void* arg) {TimerThread* t = static_cast<TimerThread*>(arg);while (!t->_stop.load(std::memory_order_relaxed)) {int64_t now_ms = butil::gettimeofday_ms();// 处理所有到期任务for (auto& wheel : t->_wheels) {int64_t current_tick = now_ms / wheel.tick_ms;int slot = current_tick % wheel.num_slots;TimerTask* head = wheel.buckets[slot];while (head != nullptr) {TimerTask* next = head->next;if (head->expiration_ms <= now_ms) {head->fn(head->arg);  // 执行回调delete head;} else {// 重新插入更精确的时间轮t->reschedule(head);}head = next;}wheel.buckets[slot] = nullptr;}// 休眠至下一个检查点usleep(next_check_interval_ms * 1000);}return nullptr;
}

4 性能优化

  • 分层时间轮:减少任务迁移次数,提升插入和删除效率。
  • 批量处理:每次处理一个槽位的所有任务,减少锁竞争。
  • 惰性删除:任务未到期时重新插入更合适的时间轮,避免重复计算。
  • 无锁任务队列:使用 bthread 的原子操作减少同步开销。

5 关键参数与配置

  • 时间轮层级:通过 TimerThreadOptions 可配置层级数量和每层参数。
  • 检查间隔:动态计算下一个最近任务的到期时间,优化休眠时长。
  • 线程优先级:通过 bthread_attr_t 设置定时器线程的调度优先级。

6 应用场景

  1. RPC 超时控制:为每个 RPC 请求设置超时任务,超时后取消请求。
  2. 心跳检测:周期性发送心跳包,维护连接活性。
  3. 缓存刷新:定时刷新本地缓存,保证数据一致性。

7 总结

TimerThread 通过分层时间轮和高效线程调度机制,实现了高吞吐、低延迟的定时任务管理。其核心优势包括:

  • 高效任务插入:平均时间复杂度 O(1)。
  • 低调度开销:通过分层减少任务迁移。
  • 线程安全:结合 bthread 的原子操作和锁机制。

改进方向

  • 支持高精度定时:如微秒级定时(需调整时间轮层级)。
  • 任务取消接口:允许动态取消已提交的任务。
  • 资源限制:防止恶意任务耗尽内存。

相关文章:

  • Visual Studio 项目转Qt项目
  • 抖音生活服务“五一”数据:小城游火爆,“食住”消费增速显著
  • LeetCode:返回倒数第k个结点
  • 【论文阅读】Joint Deep Modeling of Users and Items Using Reviews for Recommendation
  • 部署GM DC Monitor 一体化监控预警平台
  • CGAL:Circular_kernel_2内核
  • 设计模式-基础概念学习总结(继承、多态、虚方法、方法重写)
  • 策略模式(Strategy Pattern)
  • ansible基础-优化
  • 路由器详细讲解
  • 驱动开发硬核特训 · Day 28(上篇):pinctrl 子系统详解与实战分析
  • 【阿里云大模型高级工程师ACP习题集】2.9 大模型应用生产实践(下篇)
  • 详解 FFMPEG 交叉编译 `FLAGS` 和 `INCLUDES` 的作用
  • graphviz和dot绘制流程图
  • Debezium TableSchemaBuilder详解
  • 从Excel到高级工具:数据分析进阶指南
  • Android Compose 中 CompositionLocal 的全面解析与最佳实践
  • 把Android设备变成“国标摄像头”:GB28181移动终端实战接入指南
  • Gradio全解20——Streaming:流式传输的多媒体应用(5)——基于WebRTC的摄像头实时目标检测
  • jwt身份验证和基本的利用方式
  • 正荣地产:前4个月销售14.96亿元,控股股东已获委任联合清盘人
  • 多人称华为手机忽现拍照模糊疑存缺陷,售后回应:主摄像头故障
  • 丰田汽车:美国关税或导致4、5月损失1800亿日元,新财年净利润下滑三成
  • 韩正出席庆祝中国欧盟建交50周年招待会并致辞
  • 旧宫新语|瑞琦:再探《古玩图》——清宫艺术品的前世与今生
  • 日本来信|劳动者的书信④