C++基础组件
一、定时器
初识定时器
定时器是什么?
定时器是一个项目中,组织管理延时任务的模块,帮助延时任务高效的度过延迟的那段时间,而非sleep阻塞等待。
为什么要有定时器?
如果没有定时器的话,想要3s后给用户弹出一个通知,调用sleep(3),那这3s线程就会被阻塞,也无法去处理其他任务,这是低效的行为。
定时器是怎么“高效”的?
参考epoll的事件通知方法,如果能够做到延迟的这段时间线程去处理其他任务,在时间结束后可以给到线程一个通知,就可以最大限度利用cpu资源。
定时器的实现架构?
基于上面高效的理解,定时器需要一个数据结构存储延时事件 + 一个触发机制。
实现定时器的容器是什么?
对于这个存储的数据结构,需要能够根据插入事件触发的时间自动进行排序,所以可以是红黑树实现的multiset / multimap,或者是最小堆。还可以根据执行顺序进行组织,比如时间轮。
时间轮是什么?
通过空间换时间。将时间分成三级,时、分、秒,每一级都有对应的格子数,每个格子存储的是一个链表,当时间指针走到对应格子的时候执行链表中的所有事件。如果要添加一个1h5min20s后执行的任务,就先挂到(当前小时 + 1) % 24的格子下面,当走到这个格子下面的时候,将他重新映射到(当前分钟 + 5) % 60的格子下面,当分针也走到这个格子的时候,再降级到对应秒针格子下面。再走到对应格子的时候,执行对应事件。
实现定时器的触发机制是什么?
将定时器的超时处理转为io处理,Linux内核提供了timerfd,以此来在io多路复用中注册一个读事件,比如select的FD_SET、epoll的epoll_ctl,在发起io时系统调用陷入内核态处理,在io响应时返回用户态。
手写定时器
思路:需要实现一个定时器timer类,里面有容器存储定时事件,每个定时事件也是一个类(结构体),有自己的执行时间timeout,有自己的回调函数callback。通过timer类下WaitTime函数计算容器中所有事件里最小的等待时间diff,作为epoll的第四个参数,这样epoll就会规定在diff时间后返回。而将io处理转为事件处理的是一个timer类下的Handle函数,在epoll等待最小时间返回后,表明有事件可以执行了,那么调用Handle函数,在函数内调用事件的回调函数,容器也删除这个事件。
代码:
#include <sys/epoll.h>
#include <sys/timerfd.h>
#include <time.h>
#include <unistd.h>
#include <functional>
#include <chrono>
#include <set>
#include <memory>
#include <iostream>class TimerNode {
public:friend class Timer;TimerNode(uint64_t timeout, std::function<void()> callback):timeout_(timeout),callback_(std::move(callback)){}//move避免拷贝
private:uint64_t timeout_;std::function<void()> callback_;
};
class Timer {
public:~Timer() {}static uint64_t GetCurrentTime() {using namespace std::chrono;return duration_cast<milliseconds>(steady_clock::now.time_since_epoch()).count();//steady_clock不受系统时间调整影响}TimerNode* AddTimerNode(uint64_t diff, std::function<void()> cb) {TimerNode* node = new TimerNode(GetCurrentTime + diff, std::move(cb));//node的类型是指针if (timer_map_.empty() == 0 && node->timeout_ > timer_map_.rbegin()->first) {auto it = timer_map_.emplace_hint(timer_map_.crbegin().base(), std::make_pair(node->timeout_, std::move(node)));return it->second;}else {auto it = timer_map_.insert(std::make_pair(node->timeout_,node));return it->second;}}void DelTimeout(TimerNode* node) {auto it = timer_map_.equal_range(node->timeout_);//先找值相同的 比直接遍历更快for (auto iter = it.first; iter != it.second; iter++) {if (iter->second == node) {timer_map_.erase(iter);break;}}}int WaitTime() {//最小等待时间auto iter = timer_map_.begin();if (iter == timer_map_.end()) {return -1;}uint64_t diff = iter->first - GetCurrentTime();return diff > 0 ? diff : 0;}void HandleTimeout() {auto iter = timer_map_.begin();while (iter != timer_map_.end() && iter->first <= GetCurrentTime()) {iter->second->callback_();//指向事件的指针 调用事件函数iter = timer_map_.erase(iter);}}
private:std::multimap<uint64_t, TimerNode*> timer_map_;
};int main() {int epfd = epoll_create(0);if (epfd == -1) {std::cerr << "epoll create error: " << errno << std::endl;return -1;}Timer timer;int i;timer.AddTimerNode(1000, [&]() {//[&]只会捕获 lambda 体内实际使用的变量std::cout << "timeout 1 second: " << i++ << std::endl;});epoll_event evs[512];while (1) {int n = epoll_wait(epfd, evs, 512, timer.WaitTime());if (n == -1) {std::cerr << "epoll wait error: " << errno << std::endl;break;}timer.HandleTimeout();//epoll与定时器容器的 “链接手段”}return 0;
}
二、死锁检测组件
理论实现
问题引入:有A、B、C、D、E五个线程,mtx1~5,五个临界资源。如何检测这五个线程间是否存在死锁?
问题解决:如果能够用一条边(A->2)的形式表示资源请求;且知道2在被占用情况下,对应占用的线程编号(假设是B),那么就可以构建一条(A->B)边,就可以通过判断是否存在环,检测是否存在死锁情况。
1.涉及到资源请求的代码表示,就是图的一条边。
2.涉及到资源占有的线程查询,就是查询 <mtx,threadid>结构体数组表,找到对应资源的占用线程id。
代码实现
1.原生的临界资源上锁/解锁 pthread_mutex_lock/unlock
,不提供上面我们解决问题需要的线程对资源的请求和持有关系,所以需要hook改写。对于每个线程之间可能要构建边,所以在创建线程的时候,也要创建一个v结点
int pthread_mutex_lock(pthread_mutex_t *mutex) {pthread_t selfid = pthread_self();lock_before((uint64_t)selfid, (uint64_t)mutex);查找结构体数组,看mutex是否已经被占有,以及占有的线程id是多少当前selfid 与 占有线程id 两个点构建一个图的边pthread_mutex_lock_f(mutex); //真正的加锁lock_after((uint64_t)selfid, (uint64_t)mutex);在结构体数组中,增加一个当前selfid与mutex的 item在图中 删除selfid 与 之前的占有线程id 两个点构建的边}int pthread_mutex_unlock(pthread_mutex_t *mutex) {pthread_mutex_unlock_f(mutex);pthread_t selfid = pthread_self();unlock_after((uint64_t)selfid, (uint64_t)mutex); 删除结构体数组中 <mutex ,占用线程id>的item}int pthread_create(pthread_t *restrict thread, const pthread_attr_t *restrict attr,void *(*start_routine)(void *), void *restrict arg) {pthread_create_f(thread, attr, start_routine, arg);构建一个图的结点struct source_type v1;v1.id = *thread;v1.type = PROCESS;add_vertex(v1);
}// init
void init_hook(void) {if (!pthread_mutex_lock_f)pthread_mutex_lock_f = dlsym(RTLD_NEXT, "pthread_mutex_lock");if (!pthread_mutex_unlock_f)pthread_mutex_unlock_f = dlsym(RTLD_NEXT, "pthread_mutex_unlock");if (!pthread_create_f) {pthread_create_f = dlsym(RTLD_NEXT, "pthread_create");}}
2.lock_before 、 lock_after、unlock_after的实现,与结构体数组和图都有关。结构体数组需要提供三个接口:增、删、查;图需要提供三个接口:增(点、边)、删(边)、dfs判环。
表提供的接口:
struct rela_node_s {pthread_mutex_t *mtx;pthread_t thid;
};struct rela_node_s rela_table[MAX] = {0};pthread_t search_rela_table(pthread_mutex_t *mtx) {int i = 0;for (i = 0;i < MAX;i ++) {if (mtx == rela_table[i].mtx) {return rela_table[i].thid;}}return 0;
} int del_rela_table(pthread_mutex_t *mtx, pthread_t tid) {int i = 0;for (i = 0;i < MAX;i ++) {if ((mtx == rela_table[i].mtx) && (tid == rela_table[i].thid)) {rela_table[i].mtx = NULL;rela_table[i].thid = 0;return 0;}}return -1;
}int add_rela_table(pthread_mutex_t *mtx, pthread_t tid) {int i = 0;for (i = 0;i < MAX;i ++) {if ((rela_table[i].mtx == NULL) && (rela_table[i].thid == 0)) {rela_table[i].mtx = mtx;rela_table[i].thid = tid;return 0;}}return -1;
}三个函数的实现,理解上份代码注释即可
三、分布式锁
分布式锁是什么?
分布式锁和普通的mutex锁作用一样,也是限制对临界资源的访问,只是管理的范围更大,比如不同网段的分布式集群要对同一个资源进行互斥访问,则需要一个存储在数据中心(如数据库)上的分布式锁,实现互斥。Redis,mysql中都有分布式锁的实现。
Redis是什么?
Redis是一个基于存储,集可持久化、分布式与一体的键值对存储系统,不是Mysql那样的关系型数据库,没有复杂的表连接,优势是高速读写和灵活的数据结构。
分布式锁的特性?
锁超时处理,因为是结点间进程通信,需要通过tcp/udp网络连接,当发起锁请求的结点宕机,在超时的时候有一个“超权限”的结点,来帮助解锁。
互斥性,通过比如一个owner字段,设置持有锁的对象id,可以默认为0时,锁空闲。
可用性,如果存储锁的结点宕机了,要保证锁的可用性。开多个备份点,主结点宕机进行主从切换。
容错性,即多个备份点间的数据一致性,可以通过Raft一致性算法。或者redlock红锁。
redlock是什么?
一个基于 Redis 的分布式锁算法,通过向多个独立节点申请锁,并遵循“多数派”原则来提升可靠性,规避单点故障风险。优点是保障了容错性,缺点是访问多个节点性能下降,部署多个Redis复杂。
Redis分布式锁的实现?
互斥性的实现:由于是key-value型,setnx rlock 10001即对rlock加锁,别的用户再执行setnx rlock 10002会返回失败。
超时处理实现:SET rlock 10001 NX EX 10,NX是当当前键不存在的时候设置,EX 10设置锁过期时间为10s,10s内不DEL rlock,也会自动删除锁。
可用性实现:Redis本身就是集群,有备份
容错性:即redlock的核心思想。部署多个独立的Redis节点,这些节点独立运行没有主从关系;客户端申请锁必须向所有节点申请,只有超过一半的节点都返回成功,才算获得锁成功。
四、内存泄漏检测组件
理论
内存泄漏的原因是什么?
根本原因都是malloc了,但是没有free.
如何才能判断是否存在内存泄漏?有内存泄漏了又如何确定在代码的哪一行?
程序的运行结果不会指示内存泄漏信息,只会显示业务信息。我们需要宏定义改写实现自己的malloc和free函数(获得行号的关键)。在原始的每一次malloc,都会创建一个ptr指针指向分配空间,而我们的malloc,需要创建一个以分配地址为名的文件,并且将void *ptr = malloc(size);在代码中的行号通过编译器自带的__LINE__ 写入到文件里,代码运行的文件名也可以通过__FILE__写入进去。对应的,在我们的free(ptr)函数中,每次调用都要删除名为ptr所指向地址的文件。最终程序运行结束即可通过是否存在文件判断是否存在内存泄漏,通过文件内容确定内存泄漏在代码具体行数。
代码
#define _GNU_SOURCE
#include <dlfcn.h>
#include <link.h>
#include <stdlib.h>
#include <stdio.h>
#include <unistd.h>//#define malloc(size) nMalloc(size)
//#define free(ptr) nFree(ptr)void *nMalloc(size_t size, const char *filename, const char *funcname, int line) {void *ptr = malloc(size);char buff[128] = {0};snprintf(buff, 128, "./block/%p.mem", ptr);FILE* fp = fopen(buff, "w");if (!fp) {free(ptr);return NULL;}fprintf(fp, "[+][%s:%s:%d] %p: %ld malloc\n", filename, funcname, line, ptr, size);fflush(fp);fclose(fp);return ptr;
}void nFree(void *ptr, const char *filename, const char *funcname, int line) {char buff[128] = {0};snprintf(buff, 128, "./block/%p.mem", ptr);if (unlink(buff) < 0) { // no existprintf("double free: %p\n", ptr);return ;}return free(ptr);}#define malloc(size) nMalloc(size, __FILE__, __func__, __LINE__)
#define free(ptr) nFree(ptr, __FILE__, __func__, __LINE__)int main() {size_t size = 5;void *p1 = malloc(size);void *p2 = malloc(size * 2);void *p3 = malloc(size * 3);free(p1);free(p3);}