当前位置: 首页 > news >正文

3.2-C++基础组件

目录

一、原子操作与锁

        原子操作原子性的实现

        锁的理解

二、无锁队列设计

三、定时器

        初识定时器

        手写定时器

四、死锁检测组件

        理论实现

        代码实现

五、分布式锁

六、内存泄漏检测组件

        理论

        代码


一、原子操作与锁

原子操作原子性的实现

单核单处理情况下,只需要屏蔽中断,即可保证原子性。

多核多处理器情况下,在硬件提供原子指令从指令层杜绝中断以外,通过MESI协议确保在执行原子指令的瞬间,只有一个核心拥有数据的独占修改权,其他核心的副本全部失效。

MESI协议小剧场:

角色介绍:MESI 的四种状态

首先,复习一下四个状态,这是协议的语言:

  • M (Modified)​​:脏数据。只有本核心有副本,且与主内存不一致。拥有“独占修改权”。

  • E (Exclusive)​​:干净数据。只有本核心有副本,但与主内存一致。也拥有“独占权”,但还未修改。

  • S (Shared)​​:干净数据。多个核心都有副本,都与主内存一致。大家只有“读取权”,没有“修改权”。

  • I (Invalid)​​:无效数据。副本已过期,不能使用。相当于没有副本。

战场:CPU缓存、缓存总线和嗅探器

每个CPU核心都有自己的缓存。所有缓存都连接到一个共享的总线上。每个缓存都有一个嗅探器(Snooper)​,它的任务是时刻监视总线上的所有通信消息。

战斗目标:获取独占权(E或M状态)

假设有两个核心:​Core 1​ 和 ​Core 2。它们缓存中都有变量 X 的副本,且状态均为 ​S (Shared)​。现在,​Core 1​ 要执行一个原子操作(如 X++)。

为了原子性地修改 X,Core 1 ​必须先将它的缓存行状态从 ​S​ 提升为 ​E​ 或 ​M。MESI协议通过以下步骤确保这一点:


战斗过程:一次完整的“独占权夺取”

第1步:Core 1 发出“读请求”并寻求独占(Read For Ownership, RFO)​

Core 1 需要修改 X,但它当前的缓存副本是 ​S​ 状态,没有修改权。于是,它在总线上广播一条消息:

​“我要读地址A的数据,并且我想要独占权限!”(Read-For-Ownership for address A)​

这条RFO请求是夺取独占权的发令枪

第2步:其他核心的嗅探器拦截请求并采取行动

总线上所有其他核心(这里主要是Core 2)的嗅探器都看到了这条RFO消息。

  • Core 2 的嗅探器​ 检查自己是否有地址A的缓存。

    • 有,且状态为S​:Core 2 知道自己有一个共享副本,但现在另一个核心要求独占权。

    • 行动​:Core 2 ​必须将自己的这个缓存行状态立即从 S 降级为 I (Invalid)​。这意味着Core 2 的 X 副本作废了。

    • 响应​:Core 2 通过总线回复一个确认(Acknowledge)​​ 消息,意思是:“收到命令,我的副本已无效化。”

第3步:Core 1 收集响应并完成升级

  • Core 1 等待所有其他核心的确认响应

  • 一旦收到所有确认(表明世界上再也没有其他核心拥有 X 的有效副本了),Core 1 的夺取行动就成功了

  • 现在,Core 1 可以将自己缓存中 X 的状态从 ​S​ 提升为 ​E (Exclusive)​

此时此刻,在取得独占权的这个瞬间:​

  • Core 1​:是全世界唯一一个拥有 X 有效缓存副本的核心,状态为 ​E。它获得了宝贵的独占修改权

  • Core 2​:它的 X 副本状态为 ​I,已经完全失效。如果Core 2 现在试图读 X,它会产生缓存未命中,必须重新向总线发起请求。

  • 主内存​:数据依然是旧值。

执行原子操作

现在,Core 1 可以安全地执行原子操作了:​

  1. 它执行底层的原子指令(例如,一条硬件的 LOCK INC 指令)。

  2. 它将 X 的值加1。

  3. 因为这个修改,它缓存行的状态从 ​E​ 变为 ​M (Modified)​。现在数据是“脏”的,与主内存不一致。

在整个原子指令执行的极短时间内,由于缓存行处于 E/M 状态,缓存协议确保了绝对的数据独占性。​​ 其他核心根本无法访问这个数据,因为它们的副本是 ​I​(无效的),任何访问尝试都会导致缓存未命中,而总线又被Core 1的原子操作所“锁定”或协调。

后续:数据同步

当Core 2 之后需要读取 X 时:

  1. Core 2 发现自己的副本是 ​I,于是在总线上发出“读请求”。

  2. Core 1 的嗅探器看到这个请求,发现自己有最新的 ​M​ 状态数据。

  3. Core 1 ​拦截这个读请求,将自己缓存中的最新数据直接通过总线写回给Core 2,同时也写回主内存以更新。

  4. 现在,Core 1 和 Core 2 的缓存行状态都变为 ​S (Shared)​,数据保持一致。

锁的理解

自旋锁和互斥锁的区别:

1)等待策略:自旋锁会忙等待,检测到临界资源被加了自旋锁会while(lock->flag)

                        互斥锁会进入内核态休眠,检测到临界资源不可用会通过futex系统调用进入内核态

2)上下文切换:自旋锁一直都在占用当前核心,无上下文切换。

                           互斥锁有。

3)使用场景:使用过程都是 加锁-操作-解锁。所以当对临界资源的操作时间长的时候,选择互斥锁。当对临界资源操作时间短的时候,选择自旋锁,减少线程切换开销。

        操作时间长短的定义:以陷入内核态+切换线程+切回用户态的总时间为标准。

二、无锁队列设计

有锁队列是什么?

通过互斥锁或其他同步机制保证线程安全的队列,比如之前实现的线程池。

有锁队列中锁的局限有:线程切换、死锁、性能瓶颈。

无锁队列是什么?

属于非阻塞队列,通过原子操作来实现线程安全的队列。

无锁队列的类型有:

1.lock-free(无锁),贴近我们的理解,每次至少一个线程成功,其他申请的线程重试,依赖的是cas等原子操作。

CAS是什么?

核心机制,包含三个操作数:​内存位置(V)、预期原值(A)、新值(B)​。

只有当位置 V 的值等于预期原值 A 时,才会将位置 V 的值更新为新值 B。每次都返回V值,如果等于A说明修改成功,如果不等于,说明在这个期间有其他线程修改了V,操作失败。

2.wait-free(无等待),所有的线程必成功,无重试,依赖exchange操作。

有锁队列和无锁队列的直观区别?

相同:每次都只能一个线程访问临界区的资源。

不同:

1.有阻塞与无阻塞:有锁队列的没有进入临界区的线程,只能阻塞等待。且如果访问临界区的线程崩溃了,就会造成死锁。无锁队列每个线程随时都可以尝试CAS操作,没有进入临界区的线程可以去先干别的事,循环重试。

2.锁与原子操作的区别。

无锁队列的实现

1.volatile关键字:保证可见性,某线程修改变量后能被其他线程看见。确保的是当前线程内代码与“外部代理”(硬件、中断、信号)之间的正确交互。禁止编译器优化掉指令,编译器会在每次读到volatile修饰的变量的时候都重新加载数据,不因为本线程内没有修改该变量的代码而不再读取。

2.内存屏障:由于CPU为了性能的提升,会重排代码指令,但是如果是多线程的情况下直接重排代码,会发生异常行为。内存屏障强制在屏障处建立一个“同步点”(最常见的是 ​Release-Acquire​ 配对),屏障前的操作必须全部完成且可见后,才能执行屏障后的操作,从而在多线程中维护了逻辑的正确性。

3.原子操作:上面两项技术都不保证原子性,需要原子操作来保证竞争合法。

关系1:原子操作往往内置了内存屏障,如:

std::atomic<int> counter(0);// 线程 1
counter.store(42, std::memory_order_release); // 写操作,包含一个 release 屏障// 线程 2
int value = counter.load(std::memory_order_acquire); // 读操作,包含一个 acquire 屏障

关系2:volatile与他们几乎无关,唯一与原子操作结合的地方是在“需要操作的变量本身是内存映射的硬件寄存器,并且需要在多个线程之间以原子方式访问它。”

  • volatile 用于告诉编译器:“别优化,这个地址是硬件,每次都必须读写内存”。
  • atomic 用于告诉 CPU:“这个操作需要原子性和多线程同步语义”。

三、定时器

初识定时器

定时器是什么?

定时器是一个项目中,组织管理延时任务的模块,帮助延时任务高效的度过延迟的那段时间,而非sleep阻塞等待。

为什么要有定时器?

如果没有定时器的话,想要3s后给用户弹出一个通知,调用sleep(3),那这3s线程就会被阻塞,也无法去处理其他任务,这是低效的行为。

定时器是怎么“高效”的?

参考epoll的事件通知方法,如果能够做到延迟的这段时间线程去处理其他任务,在时间结束后可以给到线程一个通知,就可以最大限度利用cpu资源。

定时器的实现架构?

基于上面高效的理解,定时器需要一个数据结构存储延时事件 + 一个触发机制。

实现定时器的容器是什么?

对于这个存储的数据结构,需要能够根据插入事件触发的时间自动进行排序,所以可以是红黑树实现的multiset / multimap,或者是最小堆。还可以根据执行顺序进行组织,比如时间轮。

时间轮是什么?

通过空间换时间。将时间分成三级,时、分、秒,每一级都有对应的格子数,每个格子存储的是一个链表,当时间指针走到对应格子的时候执行链表中的所有事件。如果要添加一个1h5min20s后执行的任务,就先挂到(当前小时 + 1) % 24的格子下面,当走到这个格子下面的时候,将他重新映射到(当前分钟 + 5) % 60的格子下面,当分针也走到这个格子的时候,再降级到对应秒针格子下面。再走到对应格子的时候,执行对应事件。

实现定时器的触发机制是什么?

将定时器的超时处理转为io处理,Linux内核提供了timerfd,以此来在io多路复用中注册一个读事件,比如select的FD_SET、epoll的epoll_ctl,在发起io时系统调用陷入内核态处理,在io响应时返回用户态。

手写定时器

思路:需要实现一个定时器timer类,里面有容器存储定时事件,每个定时事件也是一个类(结构体),有自己的执行时间timeout,有自己的回调函数callback。通过timer类下WaitTime函数计算容器中所有事件里最小的等待时间diff,作为epoll的第四个参数,这样epoll就会规定在diff时间后返回。而将io处理转为事件处理的是一个timer类下的Handle函数,在epoll等待最小时间返回后,表明有事件可以执行了,那么调用Handle函数,在函数内调用事件的回调函数,容器也删除这个事件。

代码:

#include <sys/epoll.h>
#include <sys/timerfd.h>
#include <time.h> 
#include <unistd.h> 
#include <functional>
#include <chrono>
#include <set>
#include <memory>
#include <iostream>class TimerNode {
public:friend class Timer;TimerNode(uint64_t timeout, std::function<void()> callback):timeout_(timeout),callback_(std::move(callback)){}//move避免拷贝
private:uint64_t timeout_;std::function<void()> callback_;
};
class Timer {
public:~Timer() {}static uint64_t GetCurrentTime() {using namespace std::chrono;return duration_cast<milliseconds>(steady_clock::now.time_since_epoch()).count();//steady_clock不受系统时间调整影响}TimerNode* AddTimerNode(uint64_t diff, std::function<void()> cb) {TimerNode* node = new TimerNode(GetCurrentTime + diff, std::move(cb));//node的类型是指针if (timer_map_.empty() == 0 && node->timeout_ > timer_map_.rbegin()->first) {auto it = timer_map_.emplace_hint(timer_map_.crbegin().base(), std::make_pair(node->timeout_, std::move(node)));return it->second;}else {auto it = timer_map_.insert(std::make_pair(node->timeout_,node));return it->second;}}void DelTimeout(TimerNode* node) {auto it = timer_map_.equal_range(node->timeout_);//先找值相同的 比直接遍历更快for (auto iter = it.first; iter != it.second; iter++) {if (iter->second == node) {timer_map_.erase(iter);break;}}}int WaitTime() {//最小等待时间auto iter = timer_map_.begin();if (iter == timer_map_.end()) {return -1;}uint64_t diff = iter->first - GetCurrentTime();return diff > 0 ? diff : 0;}void HandleTimeout() {auto iter = timer_map_.begin();while (iter != timer_map_.end() && iter->first <= GetCurrentTime()) {iter->second->callback_();//指向事件的指针 调用事件函数iter = timer_map_.erase(iter);}}
private:std::multimap<uint64_t, TimerNode*> timer_map_;
};int main() {int epfd = epoll_create(0);if (epfd == -1) {std::cerr << "epoll create error: " << errno << std::endl;return -1;}Timer timer;int i;timer.AddTimerNode(1000, [&]() {//[&]只会捕获 lambda 体内实际使用的变量std::cout << "timeout 1 second: " << i++ << std::endl;});epoll_event evs[512];while (1) {int n = epoll_wait(epfd, evs, 512, timer.WaitTime());if (n == -1) {std::cerr << "epoll wait error: " << errno << std::endl;break;}timer.HandleTimeout();//epoll与定时器容器的 “链接手段”}return 0;
}

四、死锁检测组件

理论实现

问题引入:有A、B、C、D、E五个线程,mtx1~5,五个临界资源。如何检测这五个线程间是否存在死锁?

问题解决:如果能够用一条边(A->2)的形式表示资源请求;且知道2在被占用情况下,对应占用的线程编号(假设是B),那么就可以构建一条(A->B)边,就可以通过判断是否存在环,检测是否存在死锁情况。

1.涉及到资源请求的代码表示,就是的一条边。

2.涉及到资源占有的线程查询,就是查询 <mtx,threadid>结构体数组表,找到对应资源的占用线程id。

代码实现

1.原生的临界资源上锁/解锁 pthread_mutex_lock/unlock,不提供上面我们解决问题需要的线程对资源的请求和持有关系,所以需要hook改写。对于每个线程之间可能要构建边,所以在创建线程的时候,也要创建一个v结点

int pthread_mutex_lock(pthread_mutex_t *mutex) {pthread_t selfid = pthread_self();lock_before((uint64_t)selfid, (uint64_t)mutex);查找结构体数组,看mutex是否已经被占有,以及占有的线程id是多少当前selfid 与 占有线程id  两个点构建一个图的边pthread_mutex_lock_f(mutex);    //真正的加锁lock_after((uint64_t)selfid, (uint64_t)mutex);在结构体数组中,增加一个当前selfid与mutex的 item在图中   删除selfid 与 之前的占有线程id  两个点构建的边}int pthread_mutex_unlock(pthread_mutex_t *mutex) {pthread_mutex_unlock_f(mutex);pthread_t selfid = pthread_self();unlock_after((uint64_t)selfid, (uint64_t)mutex);    删除结构体数组中 <mutex ,占用线程id>的item}int pthread_create(pthread_t *restrict thread, const pthread_attr_t *restrict attr,void *(*start_routine)(void *), void *restrict arg) {pthread_create_f(thread, attr, start_routine, arg);构建一个图的结点struct source_type v1;v1.id = *thread;v1.type = PROCESS;add_vertex(v1);
}// init
void init_hook(void) {if (!pthread_mutex_lock_f)pthread_mutex_lock_f = dlsym(RTLD_NEXT, "pthread_mutex_lock");if (!pthread_mutex_unlock_f)pthread_mutex_unlock_f = dlsym(RTLD_NEXT, "pthread_mutex_unlock");if (!pthread_create_f) {pthread_create_f = dlsym(RTLD_NEXT, "pthread_create");}}

2.lock_before 、 lock_after、unlock_after的实现,与结构体数组和图都有关。结构体数组需要提供三个接口:增、删、查;图需要提供三个接口:增(点、边)、删(边)、dfs判环。

表提供的接口:
struct rela_node_s {pthread_mutex_t *mtx;pthread_t thid;
};struct rela_node_s rela_table[MAX] = {0};pthread_t search_rela_table(pthread_mutex_t *mtx) {int i = 0;for (i = 0;i < MAX;i ++) {if (mtx == rela_table[i].mtx) {return rela_table[i].thid;}}return 0;
} int del_rela_table(pthread_mutex_t *mtx, pthread_t tid) {int i = 0;for (i = 0;i < MAX;i ++) {if ((mtx == rela_table[i].mtx) && (tid == rela_table[i].thid)) {rela_table[i].mtx = NULL;rela_table[i].thid = 0;return 0;}}return -1;
}int add_rela_table(pthread_mutex_t *mtx, pthread_t tid) {int i = 0;for (i = 0;i < MAX;i ++) {if ((rela_table[i].mtx == NULL) && (rela_table[i].thid == 0)) {rela_table[i].mtx = mtx;rela_table[i].thid = tid;return 0;}}return -1;
}三个函数的实现,理解上份代码注释即可

五、分布式锁

分布式锁是什么?

分布式锁和普通的mutex锁作用一样,也是限制对临界资源的访问,只是管理的范围更大,比如不同网段的分布式集群要对同一个资源进行互斥访问,则需要一个存储在数据中心(如数据库)上的分布式锁,实现互斥。Redis,mysql中都有分布式锁的实现。

Redis是什么?

Redis是一个基于存储,集可持久化、分布式与一体的键值对存储系统,不是Mysql那样的关系型数据库,没有复杂的表连接,优势是高速读写和灵活的数据结构。

分布式锁的特性?

锁超时处理,因为是结点间进程通信,需要通过tcp/udp网络连接,当发起锁请求的结点宕机,在超时的时候有一个“超权限”的结点,来帮助解锁。

互斥性,通过比如一个owner字段,设置持有锁的对象id,可以默认为0时,锁空闲。

可用性,如果存储锁的结点宕机了,要保证锁的可用性。开多个备份点,主结点宕机进行主从切换。

容错性,即多个备份点间的数据一致性,可以通过Raft一致性算法。或者redlock红锁。

redlock是什么?

一个基于 Redis 的分布式锁算法,通过向多个独立节点申请锁,并遵循“多数派”原则来提升可靠性,规避单点故障风险。优点是保障了容错性,缺点是访问多个节点性能下降,部署多个Redis复杂。

Redis分布式锁的实现?

互斥性的实现:由于是key-value型,setnx rlock 10001即对rlock加锁,别的用户再执行setnx rlock 10002会返回失败。

超时处理实现:SET rlock 10001 NX EX 10,NX是当当前键不存在的时候设置,EX 10设置锁过期时间为10s,10s内不DEL rlock,也会自动删除锁。

可用性实现:Redis本身就是集群,有备份

容错性:即redlock的核心思想。部署多个独立的Redis节点,这些节点独立运行没有主从关系;客户端申请锁必须向所有节点申请,只有超过一半的节点都返回成功,才算获得锁成功。

六、内存泄漏检测组件

理论

内存泄漏的原因是什么?

根本原因都是malloc了,但是没有free.

如何才能判断是否存在内存泄漏?有内存泄漏了又如何确定在代码的哪一行?

程序的运行结果不会指示内存泄漏信息,只会显示业务信息。我们需要宏定义改写实现自己的malloc和free函数(获得行号的关键)。在原始的每一次malloc,都会创建一个ptr指针指向分配空间,而我们的malloc,需要创建一个以分配地址为名的文件,并且将void *ptr = malloc(size);在代码中的行号通过编译器自带的__LINE__ 写入到文件里,代码运行的文件名也可以通过__FILE__写入进去。对应的,在我们的free(ptr)函数中,每次调用都要删除名为ptr所指向地址的文件。最终程序运行结束即可通过是否存在文件判断是否存在内存泄漏,通过文件内容确定内存泄漏在代码具体行数

代码

#define _GNU_SOURCE
#include <dlfcn.h>
#include <link.h>
#include <stdlib.h>
#include <stdio.h>
#include <unistd.h>//#define malloc(size) nMalloc(size)
//#define free(ptr)	 nFree(ptr)void *nMalloc(size_t size, const char *filename, const char *funcname, int line) {void *ptr = malloc(size);char buff[128] = {0};snprintf(buff, 128, "./block/%p.mem", ptr);FILE* fp = fopen(buff, "w");if (!fp) {free(ptr);return NULL;}fprintf(fp, "[+][%s:%s:%d] %p: %ld malloc\n", filename, funcname, line, ptr, size);fflush(fp);fclose(fp);return ptr;
}void nFree(void *ptr, const char *filename, const char *funcname, int line) {char buff[128] = {0};snprintf(buff, 128, "./block/%p.mem", ptr);if (unlink(buff) < 0) { // no existprintf("double free: %p\n", ptr);return ;}return free(ptr);}#define malloc(size) nMalloc(size, __FILE__, __func__, __LINE__)
#define free(ptr)	 nFree(ptr, __FILE__, __func__, __LINE__)int main() {size_t size = 5;void *p1 = malloc(size);void *p2 = malloc(size * 2);void *p3 = malloc(size * 3);free(p1);free(p3);}

http://www.dtcms.com/a/363925.html

相关文章:

  • Kafka面试精讲 Day 5:Broker集群管理与协调机制
  • 深度学习-----通过本地数据实现图片识别的训练
  • PS痕迹检测器:基于深度学习的图像篡改检测
  • 撤销修改 情况⼀:对于⼯作区的代码,还没有 add
  • 浏览器内存 (JavaScript运行时内存)存储的优劣分析
  • linux(cut,sort,uniq ,tr,sed,awk)命令介绍
  • 贝叶斯定理:理解概率更新与实际场景应用
  • 在VS Code中直接操控浏览器
  • 预算紧张?这5款低代码平台免费还好用!
  • 光储充一体化智慧能源平台助力某能投公司绿色能源转型
  • 【面试场景题】如何理解设计模式
  • three.js手机端的4种旋转方式
  • 有鹿巡扫机器人:智慧清洁时代的多面手
  • (四)Python控制结构(条件结构)
  • MMORPG 游戏战斗系统架构
  • 2025互联网大厂Java后端面试:3-5年经验必问核心考点解析
  • 机器学习辅助的Backtrader资产配置优化策略
  • 【vue2】vue2.7x的项目中集成tailwind.css真的不要太香
  • Python 类的方法类型详解
  • 企业如何实现零工用工零风险?盖雅全自动化合规管控
  • 望获实时Linux:亚微秒级系统响应的实现方法
  • Qt中字节对齐问题和数据的序列化和反序列化的问题
  • springboot2.x集成swagger api(springdoc-openapi-ui)
  • 开源企业级快速开发平台(JeecgBoot)
  • python - ( js )object对象、json对象、字符串对象的相关方法、数组对象的相关方法、BOM对象、BOM模型中 Navigator 对象
  • 人工智能与强化学习:使用OpenAI Gym进行项目开发
  • Scikit-learn从入门到实践:Scikit-learn入门与实践
  • Scikit-learn从入门到实践:Scikit-learn入门-安装与基础操作
  • SQLynx VS DBeaver:数据库管理工具的两种思路
  • 京东科技大模型RAG岗三轮面试全复盘:从八股到开放题的通关指南