Linux时间轮定时器
前言
该文主要实现一个基于Linux的时间轮定时器,可以理解成一个定时任务管理器,我们可以往里面添加一些定时任务,任务将被在固定时间后执行,同时也可以刷新现有的定时任务达到延时执行任务的效果。
- 功能:定时任务模块,让任务能在指定时间后执行
- 意义:组件内部用于释放非活跃连接(希望非活跃连接在N秒后被释放 )
- 功能设计:
- 添加定时任务
- 刷新定时任务(使定时任务重新开始计时 )
- 取消定时任务
可以用于HTTP服务器对于非活跃链接的超时释放。
创建定时器
timerfd_create
timerfd_create
函数用于创建一个新的定时器对象,并返回一个与之关联的文件描述符。Linux 系统下的系统调用函数
timerfd_create
,用于创建一个定时器对象,返回一个文件描述符。 定时器到期时,系统自动向其文件描述符写入 8 字节数据(uint64_t 类型表示 “距离上一次读取到现在超时次数”)。
#include <sys/timerfd.h>
int timerfd_create(int clockid, int flags);
参数:
-
clockid
:指定定时器所基于的时钟源。主要有两个选项CLOCK_REALTIME
:系统实时时间。它的基准时间是 1970年1月1日(UTC),并且会随着系统时间的修改(例如,由系统管理员或NTP同步)而改变。适用于需要与系统日历时间保持同步的定时任务。CLOCK_MONOTONIC
:系统单调时间。它从某个未指定的起点(通常是系统启动时间)开始计算,且不受系统时间修改的影响。这是大多数定时任务的推荐选择,因为它能保证时间的单调递增,避免因系统时间调整导致定时器行为异常。
-
flags
:用于设置文件描述符的属性。可以是以下值的按位或运算结果-
0
:使用默认的阻塞模式。 -
TFD_NONBLOCK
:为文件描述符设置非阻塞(Non-blocking)模式。 -
TFD_CLOEXEC
:设置执行时关闭(Close-on-exec)标志,这意味着当程序调用exec
系列函数时,这个文件描述符会自动被关闭,防止它被新程序继承。
-
返回值:
- 成功:返回一个新创建的文件描述符。
- 失败:返回 -1,并设置相应的
errno
。
理解阻塞:
用 timerfd_create
创建的文件描述符,默认是阻塞模式。
比如用 read(tfd, ...)
读定时器到期事件时:
- 若定时器没到期,
read
会卡住(阻塞),线程/进程暂停执行,直到定时器到期、有数据可读才返回。
这种阻塞方式是比较适合这里的需要的,所以使用默认设置即可。
timerfd_settime
timerfd_settime
函数用于启动、停止或重新配置由timerfd_create
创建的定时器。
#include <sys/timerfd.h>
int timerfd_settime(int fd, int flags, const struct itimerspec *new_value, struct itimerspec *old_value);
参数:
-
fd
:由timerfd_create
返回的文件描述符。 -
flags
:控制定时器的类型。0
:表示new_value
中设置的时间是相对时间(例如,从当前时刻开始计算 5 秒后超时)。
相对时间:是以当前时间为起始点来计算定时器的超时时间,如:在 12 点 0 分 0 秒调用 timerfd_settime ,那么定时器会已该时间作为基准判断是否超时。
TFD_TIMER_ABSTIME
:表示new_value
中设置的时间是绝对时间(例如,在某个特定的 Unix 时间戳超时)。使用绝对时间可以避免某些潜在的计时差。
-
new_value
:指向struct itimerspec
结构的指针,用于指定新的定时器设置。这个结构体定义如下:struct timespec {time_t tv_sec; /* 秒 */long tv_nsec; /* 纳秒 */ }; struct itimerspec {struct timespec it_interval; /* 定时器的间隔周期(用于周期性定时器)*/struct timespec it_value; /* 定时器的第一次超时时间 */ };
-
it_value
:指定定时器第一次超时的时间。如果其两个字段(tv_sec
和tv_nsec
)都为 0,则停止现有的定时器。如果非零,则启动或重新配置定时器。 -
it_interval
:指定定时器第一次超时后的间隔周期。如果其两个字段都为 0,则该定时器是一次性的,只会超时一次。如果非零,则定时器会以该间隔周期性地触发。
首次超时指启动定时器后,经过设定时长即判定为超时;周期间隔指首次超时发生后,每隔设定时长就判定一次超时 。 比如 12 点 0 分 0 秒调用函数,若首次超时设为 3 秒、周期间隔设为 2 秒:定时器会在 12 点 0 分 3 秒触发第一次超时,此后每隔 2 秒(12 点 0 分 5 秒、12 点 0 分 7 秒…… )触发一次超时 。
-
-
old_value
:如果不为NULL
,函数会将定时器之前的设置存入这个指针指向的结构中。如果不需要,可以设置为NULL
。
返回值:
- 成功:返回 0。
- 失败:返回 -1,并设置相应的
errno
简单使用
#include <iostream>
#include <unistd.h>
#include <stdint.h>
#include <sys/timerfd.h>int main()
{//创建定时器int timefd = timerfd_create(CLOCK_MONOTONIC,0);//设置定时时间struct itimerspec itm;//设置初始超时时间itm.it_value.tv_sec = 1;itm.it_value.tv_nsec = 0; //纳秒部分//设置周期超时时间itm.it_interval.tv_sec = 1;itm.it_interval.tv_nsec = 0; //纳秒部分//启动定时器timerfd_settime(timefd,0,&itm,NULL);while(true){uint64_t tmp;//读取8字节数据int ret = read(timefd,&tmp,sizeof(tmp));if(ret < 0){return -1;}std::cout << "距离上次读取超时了" << tmp << "次" << std::endl;sleep(1);}return 0;
}
运行结果:
因为每次循环都会读取一遍,所以每次读取都距离上次读取时的超时时间都是1次。
管理定时任务
该时间轮采用循环队列,大致原理图如下:
如何添加定时任务?
例如要添加一个n秒后执行的定时任务,我们只需在时间轮数组下标为(tick + n)% capacity,加入该任务即可。(capacity为时间轮长度)
如何执行定时任务?
tick指针每秒往后走一步,走到哪就执行哪定时任务。
如果定时时间很长,例如有好几年,一年就有 31,536,000秒,难道要开这么大的时间轮数组?
对于更长的时间轮,采用如下图方案:
图中只列举到时级时间轮,如果有更大的需求可以同理添加。
-
时级时间轮:走到哪就把哪的任务根据剩余时间添加到分级时间轮。
-
分级时间轮:走到哪就把哪的任务根据剩余时间添加到秒级时间轮。
-
秒级时间轮:走到哪就执行哪的任务
举个例子:
现在有一个两小时28分钟11秒的定时任务,首先它别添加到时级时间轮,设置定时时间两小时。时级时间轮的tick走两步后找到该任务,发现该任务还剩28分钟11秒,然后将它添加到分级时间轮,设置定时时间28分钟。分级时间轮的tick走28步后找到该任务,发现该任务还剩11秒,然后将他添加到秒级时间轮,设置定时时间11秒。秒级时间轮的tick走11秒后找到该任务,已无剩余时间,执行该任务。
这样就可以极大的减少空间开销达到更大的计时目的。
延时任务
在HTTP服务器中每有一个客户端建立连接,服务器就会开始一个定时任务,如果在定时任务期间该客户端没有任何事件到来,我们就可以销毁连接。但是如果定时任务期间客户端有I/O事件到来,我们就需要刷新定时任务的时间,重新计时。
在我们的时间轮中如何实现?
遍历找到任务,销毁再重新添加吗?
效率太慢了,而且不够优雅!!!
通过 类的析构函数 +
shared_ptr
智能指针 实现定时任务的延时控制。利用智能指针的引用计数和析构时机,间接让定时任务“延迟生效”。
将定时任务封装为一个类TimerTask
,该类主要包含了定时任务的回调函数。
最重要的是该类的析构函数是执行定时任务回调函数,并且时间轮中不再存储任务,而是存储任务类TimerTask
的shared_ptr
智能指针。
有何妙处?
首先我们需要了解shared_ptr
指针指向的对象,只有当所有指向这个对象的shared_ptr
指针被销毁,这个对象才会被真正销毁,并调用析构函数。
当我们需要刷新一个定时任务时我们无需销毁之前的定时任务,只需要直接添加一个新的定时任务,因为我们添加的是shared_ptr
指针,所以此时就有两个shared_ptr
指针指向该任务,如下图:
不难看出我们的时间轮其实是一个二维数组,当tick走到第一个任务1所在时间轮下标1时,直接调用wheel[tick].clear(),这样就可以O(1)地销毁当前下标下的所有shared_ptr
指针,但是此时任务1并没有被销毁,因为下标4还有根指针指针指向任务1,只有当tick走到4时,销毁所有智能指针后才会调用任务1TimerTask
的析构函数,也就是执行定时任务,这样就巧妙地做到了延时任务地目的。
注意,我们刷新任务时,需要根据已有的定时任务shared_ptr
创建一个新的shared_ptr
指针指向该任务类,我们肯定不能遍历去找该任务,效率太慢。
所以引入任务id,对于每个任务添加一个唯一性标识任务id,再引入一个哈希表建立id-weak_ptr的映射,这里一定不能建立id-shared_ptr的映射,因为这个shared_ptr
是会增加引用计数的,也就是说这个引用计数会一直在,定时任务将永远不会得到执行,而weak_ptr
不会增加引用计数,我们还可以根据weak_ptr
构建shared_ptr
来添加延时任务。
所以现在刷新任务,我们只需根据任务id找到对应的weak_ptr
构建shared_ptr
加入时间轮,以达到延时任务的目的。
源代码:
#include <iostream>
#include <functional>
#include <unistd.h>
#include <memory>
#include <cstdint>
#include <vector>
#include <unordered_map>using TaskFunc = std::function<void()>;
using ReleaseFunc = std::function<void()>;
class TimerTask
{
private:uint64_t _id; // 定时器的iduint32_t _timeout; // 定时任务的超时时间bool _canceled; // 定时任务是否被取消TaskFunc _task_cb; // 定时器对象要执行的定时任务ReleaseFunc _release; // 用于删除TimerWheel中保存的定时器对象信息public:TimerTask(uint64_t id,uint32_t timeout, const TaskFunc& cb):_id(id),_timeout(timeout),_task_cb(cb),_canceled(false){}~TimerTask(){//执行对应的定时任务if(_canceled == false) _task_cb();_release();}void Cancel(){_canceled = true;}void SetRelease(const ReleaseFunc& cb){_release = cb;}uint32_t DelayTime(){return _timeout;}
};class TimerWheel
{
private:using WeakTask = std::weak_ptr<TimerTask>;using PtrTask = std::shared_ptr<TimerTask>;int _tick; //当前的秒针,秒针走到哪,就释放哪,执行哪的任务int _capacity; // 时间轮的最大容量,也就是最大延迟时间std::vector<std::vector<PtrTask>> _wheel; // 时间轮,一个二维数组std::unordered_map<uint64_t,WeakTask> _timers; // id to Weaktask 的hash映射//删除定时任务void RemoveTimer(uint64_t id){auto it = _timers.find(id);if(it != _timers.end()){_timers.erase(it);} }public:TimerWheel():_capacity(60),_tick(0),_wheel(_capacity){}//添加定时任务void TimerAdd(uint64_t id,uint32_t timeout,const TaskFunc& cb){//构建shared_ptr指针PtrTask ptr(new TimerTask(id,timeout,cb));ptr->SetRelease(std::bind(&TimerWheel::RemoveTimer,this,id));//找到需要存放的位置int pos = (_tick + timeout) % _capacity;//尾插定时任务指针 _wheel[pos].push_back(ptr);//更新hash_timers[id] = WeakTask(ptr);}//刷新某定时任务void TimerRefresh(uint64_t id){//通过timers中的WeakTask构造一个shareed_ptr出来,添加到轮子的新位置中auto it = _timers.find(id);if(it == _timers.end()){//定时任务不存在,也就不需要刷新了return;}//更具weak_ptr转化shared_ptrPtrTask ptr = it->second.lock();//找到新位置,插入int timeout = ptr->DelayTime();int pos = (_tick + timeout) % _capacity;_wheel[pos].push_back(ptr);}//取消某定时任务void TimerCancel(uint64_t id){auto it = _timers.find(id);if(it == _timers.end()){return;}PtrTask ptr = it->second.lock();if(ptr) ptr->Cancel();}//秒针往后走一部,这个函数应该每秒执行一次;void RunTimerTask(){_tick = (_tick + 1) % _capacity;// 清空指定位置的数组,就把在该位置的所有shared_ptr全部释放掉了_wheel[_tick].clear();}
};
// 测试代码
// class Test
// {
// public:
// Test()
// {
// std::cout << "构造" << std::endl;
// }
// ~Test()
// {
// std::cout << "析构" << std::endl;
// }
// };// void DelTest(Test* t)
// {
// delete t;
// }// int main()
// {
// TimerWheel tw;// Test* t = new Test();// tw.TimerAdd(888,5,std::bind(DelTest,t));// for(int i = 0;i < 5;i++)
// {
// sleep(1);
// tw.TimerRefresh(888);//刷新任务
// tw.RunTimerTask();//秒针向后走
// std::cout << "刷新了一下定时任务,5秒钟后销毁!" << std::endl;
// }// while(true)
// {
// sleep(1);
// std::cout << "---------------------------" << std::endl;
// tw.RunTimerTask();//秒针往后走,但不刷新任务
// }// return 0;
// }
测试代码运行结果: