当前位置: 首页 > news >正文

Linux线程

目录

一·线程概念

1.Linux中线程该如何理解

2.重新定义进程和线程

3.重谈地址空间 4 

4.Linux线程周边的概念 

线程的优点 

线程的缺点 

线程异常 

 线程用途

二. 线程控制

POSIX线程库

创建线程

线程终止 

线程等待 

c++11中的多线程 

线程ID及进程地址空间布局

 小总结

 创建多线程(部分代码)

分离线程 

 三、线程互斥

 进程线程间的互斥相关背景概念

互斥量mutex 

互斥量的接口 

初始化互斥量

销毁互斥量

互斥量加锁和解锁

 代码

 互斥量实现原理探究

锁的封装

四、可重入VS线程安全 

 概念

 常见的线程不安全的情况

 常见的线程安全的情况

 常见不可重入的情况

 常见可重入的情况

五、死锁 

死锁四个必要条件

避免死锁

六、 Linux线程同步 

同步概念与竞态条件

条件变量

快速提出解决方案

条件变量函数 

 初始化

销毁

等待条件满足

唤醒等待

条件变量函数的应用

七、生产者消费者模型 

为何要使用生产者消费者模型

 生产者消费者模型优点

快速实现CP模型 

基于BlockingQueue的生产者消费者模型 

 C++ queue模拟阻塞队列的生产消费模型

阻塞队列代码 

生产者消费者模型内容补充 

1.生产者消费者模型到底为什么高效

2.线程伪唤醒问题 

3.多生产多消费代码 

 八、POSIX信号量

信号量接口 

 基于环形队列的生产消费模型

信号量 环形队列代码 

九、线程池

线程池代码 (单例模式)

 ThreadPool.hpp

十、封装原生线程

封装原生线程代码

十一、 线程安全的单例模式 

 什么是单例模式

 什么是设计模式

单例模式的特点 

饿汉实现方式和懒汉实现方式 

知识补充

饿汉方式实现单例模式

懒汉方式实现单例模式

懒汉方式实现单例模式(线程安全版本)

十二、 STL,智能指针和线程安全

 STL中的容器是否是线程安全的?

智能指针是否是线程安全的? 

十三、其他常见的各种锁 

自旋锁 

十四、 读者写者问题

 读写锁

读写锁接口 

设置读写优先

初始化

 销毁

加锁和解锁


一·线程概念

线程:是进程内的一个执行分支,线程的执行粒度要比进程要细

1.Linux中线程该如何理解

        在一个程序里的一个执行路线就叫做线程(thread)。更准确的定义是:线程是“一个进程内部的控制序列”
        一切进程至少都有一个执行线程
        线程在进程内部运行,本质是在进程地址空间内运行
        在Linux系统中,在CPU眼中,看到的PCB都要比传统的进程更加轻量化
        透过进程虚拟地址空间,可以看到进程的大部分资源,将进程资源合理分配给每个执行流,就形成了线程执行流 

 

        我们描述线程是复用进程的相关代码,我们通过描述进程的内核数据结构来描述线程。 

        而且cpu也不需要辨别进程线程,cpu只有调度执行流的概念,所以这些进程线程我们可以统称为不同的执行流。

        上图所示的才是真正的进程,我们之前学习的进程实际上是特殊的,只有一个线程的进程。

        那我们怎么理解之前的进程呢?

        操作系统以进程为单位给我们分配资源,而我们当前进程内部只有一个执行流!任何执行流要执行都需要资源,地址空间是进程资源的窗口。

2.重新定义进程和线程

什么是线程?我们认为,线程是操作系统调度的基本单位

 重新理解进程 内核观点:我们认为进程是承担分配系统资源的基本实体

执行流是资源吗?线程是进程内部的执行流资源!

3.重谈地址空间 4 

         我们之前谈到页表映射,我们的印象可能是这样子的,虚拟地址,物理地址还有标志位在一行进行对应。但是我们的物理内存有4GB,用这种映射方法所需要的空间实在太大了,实际上是不合理的。

(除了CR3寄存器指向页表,我们这里再介绍一个寄存器,即CR2寄存器,这个寄存器会存储引起缺页中断以及异常的虚拟地址,比如我们缺页中断了,然后操作系统重新建立虚拟地址和物理地址的映射,等处理完了返回的时候,通过这个寄存器我们就可以知道我们上一次是在哪个位置) 

虚拟地址转换到物理地址的实际过程如下

我们把一个32位的地址分成10 + 10 + 12

        第一个十字节形成了一个页目录,里面的每个位置里面的内容我们称为页目录表项,里面存储的是二级页表地址,(这个十字节转化成的十进制数我们可以看成数组下标),每一个二级页表的大小也和页目录一样大 ,但是里面存储的是页框的起始地址,这样子就能定位到所有的页框。最后的十二个字节作为页框的偏移量,这正好是一个页框的大小,因此加上偏移量可以定位到页框中的任意位置。

        但是我们取到的地址永远是起始地址,我们知道,例如整型是四个字节大小,那么我们怎么知道呢?这就是我们需要类型的原因,cpu是与硬件直接相连的,因此只要找到变量的起始地址,cpu自己是天然知道对应的类型的。

        

最后,线程分配资源的本质就是分配地址空间范围! 

4.Linux线程周边的概念 

 

        (cache是cpu中的一个硬件级别的缓存 )

        线程比进程是要更加轻量化的,为什么呢?

        因为线程的切换是不需要切换进程上下文,页表等,同时线程间的切换不需要重新cache数据,而一旦我们切换进程,cache里面缓存的热数据就会 被丢弃。

        同时我们也需要知道线程之间也不是完全相同的,我们也有主线程,即我们第一个创建的线程 ,例如我们给某个进程4ms的时间片,4个线程每个线程分配了1ms,时间片每次被消耗对应的线程里面相应的记录也会减,而主线程不仅会记录自己的时间片,也会记录整个进程总的时间片。

线程的优点 

        创建一个新线程的代价要比创建一个新进程小得多
        与进程之间的切换相比,线程之间的切换需要操作系统做的工作要少很多
        线程占用的资源要比进程少很多
        能充分利用多处理器的可并行数量
        在等待慢速I/O操作结束的同时,程序可执行其他的计算任务
        计算密集型应用(加密,解密,压缩等等),为了能在多处理器系统上运行,将计算分解到多个线程中实现(如果是单cpu,我们处理计算密集型应用只需要创建一个线程即可,这样是最快的,因为连线程间切换的时间都省了)
        I/O密集型应用(拷贝,网络传输,网络通信等等),为了提高性能,将I/O操作重叠。线程可以同时等待不同的I/O操作 

线程的缺点 

1.性能损失
        一个很少被外部事件阻塞的计算密集型线程往往无法与共它线程共享同一个处理器。如果计算密集型线程的数量比可用的处理器多,那么可能会有较大的性能损失,这里的性能损失指的是增加了额外的同步和调度开销,而可用的资源不变。
2.健壮性降低
        编写多线程需要更全面更深入的考虑,在一个多线程程序里,因时间分配上的细微偏差或者因共享了不该共享的变量而造成不良影响的可能性是很大的,换句话说线程之间是缺乏保护的。
3.缺乏访问控制
        进程是访问控制的基本粒度,在一个线程中调用某些OS函数会对整个进程造成影响。
4.编程难度提高
        编写与调试一个多线程程序比单线程程序困难得多 

线程异常 

         单个线程如果出现除零,野指针问题导致线程崩溃,进程也会随着崩溃
        线程是进程的执行分支,线程出异常,就类似进程出异常,进而触发信号机制,终止进程,进程终止,该进程内的所有线程也就随即退出

 线程用途

        合理的使用多线程,能提高CPU密集型程序的执行效率
        合理的使用多线程,能提高IO密集型程序的用户体验(如生活中我们一边写代码一边下载开发工具,就是多线程运行的一种表现) 

二. 线程控制

        内核中有没有很明确的线程的概念呢?没有,只有轻量级进程的概念。

         因此,系统不会给我提供线程的系统调用,只会给我们提供轻量级进程的系统调用。

        可是我们用户需要线程的接口!

        因此Linux程序员开发出了pthread线程库----这是一个应用层的动态库---它将轻量级进程的接口进行封装以满足我们的需求。几乎所有的Linux平台都会默认自带这个库,但是由于它是一个第三方库我们在编译的时候必须指明该库

POSIX线程库

 1.与线程有关的函数构成了一个完整的系列,绝大多数函数的名字都是以“pthread_”打头的
2.要使用这些函数库,要通过引入头文件<pthread.h>
3.链接这些线程函数库时要使用编译器命令的“-lpthread”选项

创建线程

功能:创建一个新的线程
原型
int pthread_create(pthread_t *thread, const pthread_attr_t *attr, void *
(*start_routine)(void*), void *arg);
参数
thread:返回线程ID
attr:设置线程的属性,attr为NULL表示使用默认属性
start_routine:是个函数地址,线程启动后要执行的函数
arg:传给线程启动函数的参数
返回值:成功返回0;失败返回错误码

错误检查:
1.传统的一些函数是,成功返回0,失败返回-1,并且对全局变量errno赋值以指示错误。
2.pthreads函数出错时不会设置全局变量errno(而大部分其他POSIX函数会这样做)。而是将错误代码通过返回值返回
3.pthreads同样也提供了线程内的errno变量,以支持其它使用errno的代码。对于pthreads函数的错误,建议通过返回值业判定,因为读取返回值要比读取线程内的errno变量的开销更小 

        这个函数的返回值,如果创建的是成功的返回0否则返回错误码,没有使用errno,而是采用返回值的形式

        ( arg参数并不仅仅可以传入一个变量,它还能传入类。 ) 

        代码示例1.

        代码示例2:传递单个参数,调用pthread_create传参的时候需要把参数强转成void*,在函数里用的时候需要再强转回来。

代码示例3:传类。我们传入和传出的还可以是类。同时由此我们也可以,堆区对于这些线程来说也是共享的。

查看用户的所有轻量级进程

ps -aL 

         LWP即light weight process 轻量级进程,因此操作系统可以根据PID和LWP是否相等来判断该进程是否是主线程

 全局变量以及函数等等,所有线程都是可见的,下面代码示例

任何一个线程被干掉了,它对应的进程也会被干掉

线程终止 

 如果需要只终止某个线程而不终止整个进程,可以有三种方法:
        1. 从线程函数return。这种方法对主线程不适用,从main函数return相当于调用exit。
        2. 线程可以调用pthread_ exit终止自己
        3. 一个线程可以调用pthread_ cancel终止同一进程中的另一个线程

pthread_exit函数

功能:线程终止
原型
void pthread_exit(void *value_ptr);
参数
value_ptr:value_ptr不要指向一个局部变量。
返回值:无返回值,跟进程一样,线程结束的时候无法返回到它的调用者(自身)

         需要注意,pthread_exit或者return返回的指针所指向的内存单元必须是全局的或者是用malloc分配的,不能在线程函数的栈上分配,因为当其它线程得到这个返回指针时线程函数已经退出了

pthread_cancel函数

功能:取消一个执行中的线程
原型
int pthread_cancel(pthread_t thread);
参数
thread:线程ID
返回值:成功返回0;失败返回错误码

线程等待 

为什么需要线程等待?
1.已经退出的线程,其空间没有被释放,仍然在进程的地址空间内。(即需要防止内存泄露)
2.创建新的线程不会复用刚才退出线程的地址空间。 

3.如果需要,可以获取其退出结果

功能:等待线程结束
原型
int pthread_join(pthread_t thread, void **value_ptr);
参数
thread:线程ID
value_ptr:它指向一个指针,后者指向线程的返回值
    返回值:成功返回0;失败返回错误码

        value_ptr比较特殊,是一个二级指针,不过我们只需要把void*看成是和int一样的类型就好理解了,同理,指针也是会产生临时变量的 。我们就是通过这个二级指针把我们所需要的返回值带出来的

         调用该函数的线程将挂起等待,直到id为thread的线程终止。thread线程以不同的方法终止,通过pthread_join得到的终止状态是不同的,总结如下:

1. 如果thread线程通过return返回,value_ ptr所指向的单元里存放的是thread线程函数的返回值。我们这里返回1,它实际上是被视为一个地址的。我们线程成功等待后要打印还需要强转一下。(由于是64位机器,这里指针大小位8字节,直接用int强转会发生截断,所以我们使用long long int)


2. 如果thread线程被别的线程调用pthread_ cancel异常终掉,value_ ptr所指向的单元里存放的是常数PTHREAD_ CANCELED(即-1)。(不常见)


3. 如果thread线程是自己调用pthread_exit终止的,value_ptr所指向的单元存放的是传给pthread_exit的参数。(如果在我们创建的进程里面调用exit(),和异常类似,线程调exit()也就是代表进程调,整个进程都会退出,因此有了pthread_exit来退出线程)


4. 如果对thread线程的终止状态不感兴趣,可以传NULL给value_ ptr参数。 

        可能会有人好奇,为什么线程等待没有向进程等待一样, 会去考虑异常,毕竟只要是人写的代码,那就一定有可能出现问题。这是因为做不到,如果我们某个线程要是出异常了,整个进程都会退出。

c++11中的多线程 

c++11支持多线程,但是c++11中的线程库 实际上还是封装了原生线程库。

不过我们还是更建议使用c++11中的多线程,因为它具有更好的移植性。

线程ID及进程地址空间布局


1.pthread_ create函数会产生一个线程ID(tid),存放在第一个参数指向的地址中。该线程ID和前面说的线程ID(LWP)不是一回事。
2.前面讲的线程ID属于进程调度的范畴。也就是内核进行辨认的标识,与PID的理解类似。因为线程是轻量级进程,是操作系统调度器的最小单位,所以需要一个数值来唯一表示该线程。
3.pthread_ create函数第一个参数指向一个虚拟内存单元,该内存单元的地址即为新创建线程的线程ID,属于NPTL线程库的范畴。线程库的后续操作,就是根据该线程ID来操作线程的。
4.线程库NPTL提供了pthread_ self函数,可以获得线程自身的ID。

线程库中会开辟一块空间用作该线程的栈,也就是说我们新创建的进程的栈实际上在共享区 。

        我这个线程要执行的回调方法的地址是什么, 这个线程的时间片是多少,等等(即线程的众多属性),操作系统是不关心,也不知道的,因为操作系统实际上并没有线程的概念,因此这些“线程”概念实际上是库给我们维护的。

        那我们的线程库要不要管理这些线程呢?要的!我们把线程库级别的线程称为线程控制块,里面包含了线程的很多很多属性例如回调函数的地址,线程的独立栈在哪里,以及LWD对应的是操作系统里的哪个执行流。所以我们只需要找到这个线程控制块,就能控制这个线程了。

基于以上,我们再理解一下 pthread_t

        方框框起来的可以看成线程在用户层面的tcb 

        我们Linux的线程就=用户级线程+内核的LWP。我们有时候会听到用户级线程和内核级线程的概念,他们就是在告诉我们,线程是在用户层实现还是在操作系统内部实现

 小总结

        每一个执行流的本质是一个调用链,我们把一个个宏观的调用链所对应的栈帧结构在栈上进行开辟,我们每次定义变量,比如在main函数中,或者在什么函数中,都是在这些函数自身的栈帧结构中进行定义的。栈结构它的本质是为了支持我们在应用层来完成整个调用链所对应的临时变量的空间的开辟和释放。每一个线程都有自己的调用链,因此他们也需要有一个独立的栈结构,让自己的调用链不受别人的干扰

 创建多线程(部分代码)

实际上代码逻辑和创建多个子进程类似,通过一个循环。 

        我们发现我们每个线程执行的函数都是一样的(因为我们上面的代码传的都是这个函数)

        而且发现代码的运行结果中test_i每次都从0开始增长,这说明不同线程的栈是相互独立的。但是这并不意味着绝对的独立,我们要访问某个线程的栈中的变量,只需要把相应地址带出来即可,只要有地址,其它任何的线程也能访问。

        全局变量是被所有的线程同时看到并访问的 。

        那如果我想要一个私有的全局变量呢?

        用 __thread 修饰,全局变量用__thread 修饰之后每个线程人均一个。对于一些变量的获取可以显著减少系统调用的次数 。比如说一个线程的线程执行函数还可能再继续调用其它函数,这时候如果该函数想要获取进程tid等,就不需要再进行系统调用或者传参,而是可以直接拿

分离线程 

         默认情况下,新创建的线程是joinable的,线程退出后,需要对其进行pthread_join操作,否则无法释放资源,从而造成系统泄漏。
        如果不关心线程的返回值,join是一种负担,这个时候,我们可以告诉系统,当线程退出时,自动释放线程资源

int pthread_detach(pthread_t thread);

可以是线程组内其他线程对目标线程进行分离,也可以是线程自己分离:

pthread_detach(pthread_self());

joinable和分离是冲突的,一个线程不能既是joinable又是分离的。即两种状态 不能共存

 三、线程互斥

引子:抢票代码 

#include <iostream>
#include <cstdio>
#include <cstring>
#include <vector>
#include <unistd.h>
#include <pthread.h>

using namespace std;

#define NUM 4

class threadData
{
public:
    threadData(int number)
    {
        threadname = "thread-" + to_string(number);
    }

public:
    string threadname;
};

int tickets = 1000; // 用多线程,模拟一轮抢票

void *getTicket(void *args)
{
    threadData *td = static_cast<threadData *>(args);
    const char *name = td->threadname.c_str();
    while (true)
    {
        if(tickets > 0)
        {
            usleep(1000);
            printf("who=%s, get a ticket: %d\n", name, tickets); // ?
            tickets--;
        }
        else
            break;
    }
    printf("%s ... quit\n", name);
    return nullptr;
}

int main()
{
    vector<pthread_t> tids;
    vector<threadData *> thread_datas;
    for (int i = 1; i <= NUM; i++)
    {
        pthread_t tid;
        threadData *td = new threadData(i);
        thread_datas.push_back(td);
        pthread_create(&tid, nullptr, getTicket, thread_datas[i - 1]);
        tids.push_back(tid);
    }

    for (auto thread : tids)
    {
        pthread_join(thread, nullptr);
    }

    for (auto td : thread_datas)
    {
        delete td;
    }
    return 0;
}

 

我们会抢到小于0的票数,这是怎么回事? 

共享数据---->数据不一致问题! 这肯定和多线程并发访问是有关系的

1.对一个全局变量进行多线程并发++/--是否是安全的?并不安全!

--实际上有三步

        进程在切换的时候,会带走自己的上下文! 同理,切换回来的时候会把自己的上下文带回来。 

        寄存器并不等于寄存器中的内容,cpu中的寄存器只有一套,因此 进程切换的时候相应的上下文会被带走。

        我们这里线程在执行的时候将共享数据加载到cpu寄存器中的本质是:把数据的内容变成了自己的上下文-----即以拷贝的方式给自己单独拿了一份。

        假设我们有两个线程,第一个线程的时间片非常短,而第二个线程时间片非常长,由于上述--的操作并不是原子的,有可能当我们刚刚把数据读到寄存器中,线程就被切换了,然后一直调度第二个线程,第二个线程有可能一直--,例如一直减到10,可是这时候线程又切换了,第一个线程进行第二步操作,这时候首先要进行上下文恢复,这时候票数还是1000,进行--之后,票数就变成999,这时候将结果写回内存时就发现,999将原来的结果10给覆盖了!

2.比较运算的问题

        虽然--也会出问题,但是这个票数问题主要并不是在--,而是在比较,比较也是一种运算,而能进行运算的只有cpu,这时候有可能我们票数为1了,但是刚判断完线程就被切走,每个线程都是判断完之后被切走,等到再切回来的时候,进行的已经是--操作了,第一个线程--结束后刷新内存,票数变为0,可是这时候由于线程都已经进行过比较运算了,只会继续--并且刷新内存,因此就出现了负数票数的情况

        由于上述问题,我们需要引进线程互斥的概念,以及锁。 

要解决以上问题,需要做到三点:
        1.代码必须要有互斥行为:当代码进入临界区执行时,不允许其他线程进入该临界区。
        2.如果多个线程同时要求执行临界区的代码,并且临界区没有线程在执行,那么只能允许一个线程进入该临界区。
        3.如果线程不在临界区中执行,那么该线程不能阻止其他线程进入临界区。
要做到这三点,本质上就是需要一把锁。Linux上提供的这把锁叫互斥量。 

 进程线程间的互斥相关背景概念

        临界资源:多线程执行流共享的资源就叫做临界资源
        临界区:每个线程内部,访问临界资源的代码,就叫做临界区
        互斥:任何时刻,互斥保证有且只有一个执行流进入临界区,访问临界资源,通常对临界资源起保护作用
        原子性(后面讨论如何实现):不会被任何调度机制打断的操作,该操作只有两态,要么完成要么未完成 

互斥量mutex 

         1.大部分情况,线程使用的数据都是局部变量,变量的地址空间在线程栈空间内,这种情况,变量归属单个线程,其他线程无法获得这种变量。
        2.但有时候,很多变量都需要在线程间共享,这样的变量称为共享变量,可以通过数据的共享,完成线程之间的交互。
        3.多个线程并发的操作共享变量,会带来一些问题。

互斥量的接口 

初始化互斥量


初始化互斥量有两种方法:

方法1,静态分配:

pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER

(后面的是一个宏) 

 方法2,动态分配:

int pthread_mutex_init(pthread_mutex_t *restrict mutex, const pthread_mutexattr_t *restrict
attr);
参数:
mutex:要初始化的互斥量
attr:NULL

销毁互斥量


销毁互斥量需要注意:
1.使用 PTHREAD_ MUTEX_ INITIALIZER 初始化的互斥量不需要销毁
2.不要销毁一个已经加锁的互斥量
3.已经销毁的互斥量,要确保后面不会有线程再尝试加锁

int pthread_mutex_destroy(pthread_mutex_t *mutex);

互斥量加锁和解锁

int pthread_mutex_lock(pthread_mutex_t *mutex);
int pthread_mutex_unlock(pthread_mutex_t *mutex);
返回值:成功返回0,失败返回错误号

调用 pthread_ lock 时,可能会遇到以下情况: 

 1.互斥量处于未锁状态,该函数会将互斥量锁定,同时返回成功
2.发起函数调用时,其他线程已经锁定互斥量,或者存在其他线程同时申请互斥量,但没有竞争到互斥量,那么pthread_ lock调用会陷入阻塞(执行流被挂起),等待互斥量解锁。

 代码

 

        线程对于锁的竞争能力可能会不同,这里的usleep()用于模拟抢票的后续动作 ,如果我们不添加这段代码,就会出现一个线程把票抢完的现象,因为如果没有这个usleep,某个线程一释放锁就又拿到了锁,其它线程在该线程访问临界资源时都是被挂起的,被唤醒需要时间,所以其它进程肯定都是竞争不过这个线程的。

        类似这种,在纯互斥环境下,如果锁分配不够合理,容易导致其它线程的饥饿问题

但是这种饥饿问题不是必然的,不是说有互斥必有饥饿问题,只是需要合适的环境。 

      我们也可以联想到生活中其它的类似情形,例如我是一个大学生,学校里有一个vip自习室,里面只允许一个人 自习。假设某时刻我不想自习了,我就拿上自习室的钥匙出门,把钥匙挂回门口,但是我一看门口乌泱泱一大群人在外面等,我顿时又想自习了,因为我离门近,所以他们竞争钥匙竞争不过我,我这样进进出出,但始终占着钥匙,就引发了其它人的“饥饿问题”。

        此时自习室管理员看不下去了,就进行如下规定,这就是同步!

 

        同时我们还要知道,在临界区中,线程也是可以被切换的,具有锁的线程是不怕切换的,即使切换线程,锁依然在那个线程手里,其它线程依然不能进入临界区访问临界资源。依然用上面的那个自习室例子,假如我要出去上厕所,要离开自习室,但是我可以把钥匙也带在身上,其它人也是进不去自习室的。因此其它线程只关心这个线程是持有锁的状态还是已经释放了锁,要和他们一起竞争的状态。所以对于其它线程来说,那个持有锁的线程访问临界区内的临界资源的行为就是原子的。

        通过上面的学习,我们是不是也意识到了,锁本身也是共享资源!所以申请和释放锁本身就被设计成了原子性的。 

 互斥量实现原理探究

        1.经过上面的例子,大家已经意识到单纯的 i++ 或者 ++i 都不是原子的,有可能会有数据一致性问题
        2.为了实现互斥锁操作,大多数体系结构都提供了swap或exchange指令,该指令的作用是把寄存器和内存单元的数据相交换,由于只有一条指令,保证了原子性,即使是多处理器平台,访问内存的 总线周期也有先后,一个处理器上的交换指令执行时另一个处理器的交换指令只能等待总线周期。 (意思就是即使有多个cpu,这个过程还是原子的)

        下面是一段伪代码

        关于这里的al,经典的英特尔8086的cpu内的这个寄存器不叫eax,而是叫ax,是16位,(eax里面的e就是extern,加强的意思eax是扩展了的,是32位),所以eax里面分成两部分一部分就是ah,另一部分就是al。

lock做的工作

1,把al内的数据清成0

 2.把al和mutex内的数据作交换

unlock的工作

把mutex内的数据改成1

 (这一工作不管是持有锁还是未持有锁的线程都能做这个工作,原因后续再细说,可以用于解决死锁相关的问题) 

所以谁拿到了这个1,谁就拿到了锁。所以这个过程中最重要的是交换的过程。

  

对于锁,其中有许多各种各样的问题。 

例如

1.别的线程申请了锁,其它的线程去把它释放了。

2.例如我四个线程,其中三个要加锁解锁,另外一个不加锁解锁,想怎么访问就怎么访问,不遵守锁的规则。

代码是程序员写的,这种代码当然能写,但是我们需要为自己的代码负责,上述这两种情况就是属于代码bug了

锁的封装

RAII风格的锁

用一个临时变量,出了相应的域就会自动销毁,然后调用相应函数释放锁。

 

四、可重入VS线程安全 

 概念


1.线程安全:多个线程并发同一段代码时,不会出现不同的结果,就称为线程安全。常见对全局变量或者静态变量进行操作,并且没有锁保护的情况下,导致其它线程崩溃等,就会出现线程安全问题。
2.重入:同一个函数被不同的执行流调用,当前一个流程还没有执行完,就有其他的执行流再次进入,我们称之为重入。一个函数在重入的情况下,运行结果不会出现任何不同或者任何问题,则该函数被称为可重入函数,否则,是不可重入函数

 常见的线程不安全的情况

 1.不保护共享变量的函数
2.函数状态随着被调用,状态发生变化的函数
3.返回指向静态变量指针的函数
4.调用线程不安全函数的函数

 常见的线程安全的情况

 1.每个线程对全局变量或者静态变量只有读取的权限,而没有写入的权限,一般来说这些线程是安全的
2.类或者接口对于线程来说都是原子操作
3.多个线程之间的切换不会导致该接口的执行结果存在二义性

 常见不可重入的情况

1. 调用了malloc/free函数,因为malloc函数是用全局链表来管理堆的
2.调用了标准I/O库函数,标准I/O库的很多实现都以不可重入的方式使用全局数据结构
3.可重入函数体内使用了静态的数据结构

 常见可重入的情况

1.不使用全局变量或静态变量
2.不使用用malloc或者new开辟出的空间
3.不调用不可重入函数
4.不返回静态或全局数据,所有数据都有函数的调用者提供
5.使用本地数据,或者通过制作全局数据的本地拷贝来保护全局数据 

五、死锁 

        死锁是指在一组进程中的各个进程均占有不会释放的资源,但因互相申请被其他进程所站用不会释放的资源而处于的一种永久等待状态。 

死锁四个必要条件


        互斥条件:一个资源每次只能被一个执行流使用(前提)
        请求与保持条件:一个执行流因请求资源而阻塞时,对已获得的资源保持不放(原则)
        不剥夺条件:一个执行流已获得的资源,在末使用完之前,不能强行剥夺(原则)
        循环等待条件:若干执行流之间形成一种头尾相接的循环等待资源的关系 (重要条件)

避免死锁


1.破坏死锁的四个必要条件
2.加锁顺序一致
3.避免锁未释放的场景
4.资源一次性分配 

一个锁实际上也可能出现死锁问题,例如一个持有锁的线程在没释放锁的情况下又申请锁。

六、 Linux线程同步 

同步概念与竞态条件

同步:同步是在保证数据安全的情况下 ,让我们的线程访问临界资源有一定的顺序性。

 竞态条件:因为时序问题,而导致程序异常,我们称之为竞态条件。

条件变量


        当一个线程互斥地访问某个变量时,它可能发现在其它线程改变状态之前,它什么也做不了。例如一个线程访问队列时,发现队列为空,它只能等待,只到其它线程将一个节点添加到队列中。这种情况就需要用到条件变量。 

快速提出解决方案

 我们根据上面的抢票问题我们快速提出解决方案

        线程先尝试申请资源,如果申请失败了,我们就可以将这个线程放入等待队列里。而当某个线程释放了锁,那么这个线程先会被放到等待队列的队尾,然后再通过一个“铃铛”来唤醒等待队列中的线程,可以唤醒一个,也可以把所有线程都唤醒。被唤醒的线程去申请锁。

        条件变量需要给我们提供一种简单的通知机制和一个等待队列

条件变量函数 

 初始化

cond即condition 

int pthread_cond_init(pthread_cond_t *restrict cond,const pthread_condattr_t *restrict
attr);
参数:
cond:要初始化的条件变量
attr:NULL

销毁

int pthread_cond_destroy(pthread_cond_t *cond)

等待条件满足

int pthread_cond_wait(pthread_cond_t *restrict cond,pthread_mutex_t *restrict mutex);
参数:
cond:要在这个条件变量上等待
mutex:互斥量,后面详细解释

唤醒等待

int pthread_cond_broadcast(pthread_cond_t *cond);//唤醒所有线程
int pthread_cond_signal(pthread_cond_t *cond);//唤醒一个线程

条件变量函数的应用

1.我们怎么知道我们要让线程去休眠?

一定是临界资源不就绪!临界资源也是有状态的。

2.我们怎么知道临界资源就绪不就绪?

我们自行写代码判断的,例如一个队列空了,满了,要我们自己去检测 。而判断临界资源也是对临界资源的访问,因此条件变量的等待函数是放在加锁之后的。

3.条件变量的等待是会释放锁的,因为如果不释放锁,锁会跟着那个线程一起去休眠,其它的线程也将一直都获取不到锁。而在线程被唤醒并返回时,它又会重新持有锁。所以这个条件等待的函数在执行时会释放锁,在返回时会让线程重新去竞争锁。

七、生产者消费者模型 

为何要使用生产者消费者模型

         生产者消费者模式就是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。这个阻塞队列就是用来给生产者和消费者解耦的。

 生产者消费者模型优点

        1.解耦(我们这里具体介绍一下耦合的方式和解耦的方式。例如我们现在有一个main函数,我们要进行加法操作,于是我们在main函数里面调用了add函数,我们main函数向add函数传参不就相当于生产数据吗,add函数接收参数进行运算并返回不就相当于消费数据吗,在这种单一线程的处理模式下,在我们调用add函数时,main函数是在等待的,因此这种模型相对生产者消费者模型是耦合度高的。而如果我们采用生产者消费者模型,将main函数和add函数拆成两个线程,由main函数向add函数提供数据,而add函数定期处理数据,这样的处理模式就属于耦合度低的模型)
        2.支持并发(多线程自然是支持并发的)
        3.支持忙闲不均 (由于缓存的存在,可以有效处理生产者和消费者因为速度不一致产生的效率问题,如果没有缓存,消费者每次需要买东西都要到供货商那里申请,由于速度不同,一定会有一方浪费大量时间等待。而有了缓存,可以使生产者预先生产好一批的数据,然后消费者统一到缓存里面拿)

 

本质就是执行流在做通信 ---不过我们现在研究的重点是如何安全高效地通信

        但是只要是共享资源,那就可能出现并发问题,因此我们需要 明确并发场景,找到这几个主体之间的关系。同时我们需要明确,无论是生产者还是消费者,他们都是由线程来扮演的。

        1.生产者之间相互竞争,比如一家厂家给商店提供了某种货物,那它肯定是希望在它供货时没有其它厂家供货

        2.而消费者和消费者之间也是互斥关系。我们日常在商店里买东西不需要和其它人竞争的原因实际上是因为商品足够多,也就是资源足够多。当商品数量为1时,每个消费者自然希望自己购买东西的时候其他人不能买,即互斥

        3.生产者和消费者之间有互斥和同步两种关系。

        不过互斥关系主要是保障安全问题,防止进行生产和消费行为时出现不确定性。例如此时商店中没有货物了,这时候生产者正好提供货物,消费者正好购买货物,此时消费者究竟购买到货物了吗?由于操作系统调度线程的随机性,的谁也不知道先消费还是先提供,此时具有不确定性。同时我们希望消费者购买商品,生产者提供商品具有一定的顺序,因此也需要同步

快速实现CP模型 

生产者消费者模型是一个操作系统上的概念 

基于BlockingQueue的生产者消费者模型 

        在多线程编程中阻塞队列(Blocking Queue)是一种常用于实现生产者和消费者模型的数据结构。其与普通的队列区别在于,当队列为空时,从队列获取元素的操作将会被阻塞,直到队列中被放入了元素当队列满时,往队列里存放元素的操作也会被阻塞,直到有元素被从队列中取出(以上的操作都是基于不同的线程来说的,线程在对阻塞队列进程操作时会被阻塞) 

 C++ queue模拟阻塞队列的生产消费模型

        为了便于理解,我们以单生产者,单消费者,来进行讲解 

        首先我们补充一个高低水位的概念,例如我们的数据量要是大于高水位,那么消费者就一直消费,如果我们的数据量小于低水位,那么生产者就生产数据。

BlockQueue.hpp 

main.cc

但是谁说这里只能传递整数?

我们还可以传递任务! 

阻塞队列代码 

 BlockQueue.hpp

#pragma once

#include <iostream>
#include <queue>
#include <pthread.h>

template <class T>
class BlockQueue
{
    static const int defalutnum = 20;
public:
    BlockQueue(int maxcap = defalutnum):maxcap_(maxcap)
    {
        pthread_mutex_init(&mutex_, nullptr);
        pthread_cond_init(&c_cond_, nullptr);
        pthread_cond_init(&p_cond_, nullptr);
        // low_water_ = maxcap_/3;
        // high_water_ = (maxcap_*2)/3;
    }

    // 谁来唤醒呢?
    T pop()
    {
        pthread_mutex_lock(&mutex_);
        while(q_.size() == 0) // 因为判断临界资源调试是否满足,也是在访问临界资源!判断资源是否就绪,是通过再临界资源内部判断的。
        {
            // 如果线程wait时,被误唤醒了呢??
            pthread_cond_wait(&c_cond_, &mutex_); // 你是持有锁的!!1. 调用的时候,自动释放锁,因为唤醒而返回的时候,重新持有锁
        }
        
        T out = q_.front(); // 你想消费,就直接能消费吗?不一定。你得先确保消费条件满足
        q_.pop();

        // if(q_.size()<low_water_) pthread_cond_signal(&p_cond_);
        pthread_cond_signal(&p_cond_); // pthread_cond_broadcast
        pthread_mutex_unlock(&mutex_);

        return out;
    }

    void push(const T &in)
    {
        pthread_mutex_lock(&mutex_);
        while(q_.size() == maxcap_){ // 做到防止线程被伪唤醒的情况
            // 伪唤醒情况
            pthread_cond_wait(&p_cond_, &mutex_); //1. 调用的时候,自动释放锁 2.?
        }
        // 1. 队列没满 2.被唤醒 
        q_.push(in);                    // 你想生产,就直接能生产吗?不一定。你得先确保生产条件满足
        // if(q_.size() > high_water_) pthread_cond_signal(&c_cond_);
        pthread_cond_signal(&c_cond_);
        pthread_mutex_unlock(&mutex_);
    }

    ~BlockQueue()
    {
        pthread_mutex_destroy(&mutex_);
        pthread_cond_destroy(&c_cond_);
        pthread_cond_destroy(&p_cond_);
    }
private:
    std::queue<T> q_; // 共享资源, q被当做整体使用的,q只有一份,加锁。但是共享资源也可以被看做多份!
    //int mincap_;
    int maxcap_;      // 极值
    pthread_mutex_t mutex_;
    pthread_cond_t c_cond_;
    pthread_cond_t p_cond_;

    // int low_water_;
    // int high_water_;
};

Task.hpp

#pragma once
#include <iostream>
#include <string>

std::string opers="+-*/%";

enum{
    DivZero=1,
    ModZero,
    Unknown
};

class Task
{
public:
    Task(int x, int y, char op) : data1_(x), data2_(y), oper_(op), result_(0), exitcode_(0)
    {
    }
    void run()
    {
        switch (oper_)
        {
        case '+':
            result_ = data1_ + data2_;
            break;
        case '-':
            result_ = data1_ - data2_;
            break;
        case '*':
            result_ = data1_ * data2_;
            break;
        case '/':
            {
                if(data2_ == 0) exitcode_ = DivZero;
                else result_ = data1_ / data2_;
            }
            break;
        case '%':
           {
                if(data2_ == 0) exitcode_ = ModZero;
                else result_ = data1_ % data2_;
            }            break;
        default:
            exitcode_ = Unknown;
            break;
        }
    }
    void operator ()()
    {
        run();
    }
    std::string GetResult()
    {
        std::string r = std::to_string(data1_);
        r += oper_;
        r += std::to_string(data2_);
        r += "=";
        r += std::to_string(result_);
        r += "[code: ";
        r += std::to_string(exitcode_);
        r += "]";

        return r;
    }
    std::string GetTask()
    {
        std::string r = std::to_string(data1_);
        r += oper_;
        r += std::to_string(data2_);
        r += "=?";
        return r;
    }
    ~Task()
    {
    }

private:
    int data1_;
    int data2_;
    char oper_;

    int result_;
    int exitcode_;
};

 main.cc

#include "BlockQueue.hpp"
#include "Task.hpp"
#include <unistd.h>
#include <ctime>

void *Consumer(void *args)
{
    BlockQueue<Task> *bq = static_cast<BlockQueue<Task> *>(args);

    while (true)
    {
        // 消费
        Task t = bq->pop();

        // 计算
        // t.run();
        t();

        std::cout << "处理任务: " << t.GetTask() << " 运算结果是: " << t.GetResult() << " thread id: " << pthread_self() << std::endl;
        // t.run();
        // sleep(1);
    }
}

void *Productor(void *args)
{
    int len = opers.size();
    BlockQueue<Task> *bq = static_cast<BlockQueue<Task> *>(args);
    int x = 10;
    int y = 20;
    while (true)
    {
        // 模拟生产者生产数据
        int data1 = rand() % 10 + 1; // [1,10]
        usleep(10);
        int data2 = rand() % 10;
        char op = opers[rand() % len];
        Task t(data1, data2, op);

        // 生产
        bq->push(t);
        std::cout << "生产了一个任务: " << t.GetTask() << " thread id: " << pthread_self() << std::endl;
        sleep(1);
    }
}

int main()
{
    srand(time(nullptr));

    // 因为 321 原则
    // BlockQueue 内部可不可以传递其他数据,比如对象?比如任务???
    BlockQueue<Task> *bq = new BlockQueue<Task>();
    pthread_t c[3], p[5];
    for (int i = 0; i < 3; i++)
    {
        pthread_create(c + i, nullptr, Consumer, bq);
    }

    for (int i = 0; i < 5; i++)
    {
        pthread_create(p + i, nullptr, Productor, bq);
    }

    for (int i = 0; i < 3; i++)
    {
        pthread_join(c[i], nullptr);
    }
    for (int i = 0; i < 5; i++)
    {
        pthread_join(p[i], nullptr);
    }
    delete bq;
    return 0;
}

Makefile

blockqueue:main.cc
	g++ -o $@ $^ -std=c++11 -lpthread
.PHONY:clean
clean:
	rm -f blockqueue

生产者消费者模型内容补充 

1.生产者消费者模型到底为什么高效

 

        我们生产者的数据从哪里来呢?从我们的用户或者网络来,同时,生产者生产的数据也是需要花时间获取的!

        而我们消费者消费了数据之后,后续的加工处理也需要花费时间。

        我们 不能仅仅看生产和消费,要整体地去看待。

        在生产和消费的逻辑中,为了保障线程安全,需要加锁,以及需要同步,此时生产和消费的动作实际上是串型的,这个过程并不高效,那么为什么生产者消费者模型是高效的呢?因为我们还需要看获取数据和加工处理数据的过程。当消费者消费数据时,生产者可以获取数据,这样,生产者访问非临界资源而消费者访问临界资源,这一过程是并发执行的,因此高效。

2.线程伪唤醒问题 

        当我们有多个生产者和消费者的时候,就有可能出现线程伪唤醒问题,这里以多生产者为例。 

        假设我们现在有3个生产者,1个消费者。最开始的时候阻塞队列已经满了,因此三个生产者经过判断都进入了休眠。现在消费者正好消费了一次,阻塞队列有一个空位,那么此时消费者应该唤醒休眠的生产者线程,可是此时消费者线程误把全部的生产者线程都唤醒了(或者多次执行唤醒单个线程的代码)。这时候三个线程中先有一个拿到了锁,释放锁后,它正常唤醒消费者线程,可是我们先前已经唤醒了另外的两个生产者进程,他们有可能先竞争到锁并向后执行,可是此时是不满足push的条件的。这种就属于伪唤醒的情况。为了解决这种情况,我们判断时不采用if判断,而是采用while进行循环判断。

3.多生产多消费代码 

        我们只需要加一个for循环即可,为什么这样就可以了呢?本质是因为321原则,这些线程都是共用一个阻塞队列的,不需要更改,生产者和消费者两方也没有变动,并且我们也维持了三组关系,由于我们使用的是同一个锁,所以三组关系内部的互斥和同步还是被维持的 。综上我们并不需要改变其它东西

 八、POSIX信号量
 

信号量接口 

         POSIX信号量和SystemV信号量作用相同,都是用于同步操作,达到无冲突的访问共享资源目的。 但POSIX可以用于线程间同步

 初始化信号量

#include <semaphore.h>
int sem_init(sem_t *sem, int pshared, unsigned int value);
参数:
pshared:0表示线程间共享,非零表示进程间共享
value:信号量初始值

销毁信号量

int sem_destroy(sem_t *sem);

等待信号量

功能:等待信号量,会将信号量的值减1
int sem_wait(sem_t *sem); //P()

发布信号量

功能:发布信号量,表示资源使用完毕,可以归还资源了。将信号量值加1。
int sem_post(sem_t *sem);//V()

        上一节生产者-消费者的例子是基于queue的,其空间可以动态分配,现在基于固定大小的环形队列重写这个程序(POSIX信号量):

 基于环形队列的生产消费模型

 

         要弄清楚P和C关注的分别是什么资源!

信号量 环形队列代码 

Task.hpp

#pragma once
#include <iostream>
#include <string>

std::string opers="+-*/%";

enum{
    DivZero=1,
    ModZero,
    Unknown
};

class Task
{
public:
    Task()
    {}
    Task(int x, int y, char op) : data1_(x), data2_(y), oper_(op), result_(0), exitcode_(0)
    {
    }
    void run()
    {
        switch (oper_)
        {
        case '+':
            result_ = data1_ + data2_;
            break;
        case '-':
            result_ = data1_ - data2_;
            break;
        case '*':
            result_ = data1_ * data2_;
            break;
        case '/':
            {
                if(data2_ == 0) exitcode_ = DivZero;
                else result_ = data1_ / data2_;
            }
            break;
        case '%':
           {
                if(data2_ == 0) exitcode_ = ModZero;
                else result_ = data1_ % data2_;
            }            break;
        default:
            exitcode_ = Unknown;
            break;
        }
    }
    void operator ()()
    {
        run();
    }
    std::string GetResult()
    {
        std::string r = std::to_string(data1_);
        r += oper_;
        r += std::to_string(data2_);
        r += "=";
        r += std::to_string(result_);
        r += "[code: ";
        r += std::to_string(exitcode_);
        r += "]";

        return r;
    }
    std::string GetTask()
    {
        std::string r = std::to_string(data1_);
        r += oper_;
        r += std::to_string(data2_);
        r += "=?";
        return r;
    }
    ~Task()
    {
    }

private:
    int data1_;
    int data2_;
    char oper_;

    int result_;
    int exitcode_;
};

 RingQueue.hpp

#pragma once
#include <iostream>
#include <vector>
#include <semaphore.h>
#include <pthread.h>

const static int defaultcap = 5;

template<class T>
class RingQueue{
private:
    void P(sem_t &sem)
    {
        sem_wait(&sem);
    }
    void V(sem_t &sem)
    {
        sem_post(&sem);
    }
    void Lock(pthread_mutex_t &mutex)
    {
        pthread_mutex_lock(&mutex);
    }
    void Unlock(pthread_mutex_t &mutex)
    {
        pthread_mutex_unlock(&mutex);
    }
public:
    RingQueue(int cap = defaultcap)
    :ringqueue_(cap), cap_(cap), c_step_(0), p_step_(0)
    {
        sem_init(&cdata_sem_, 0, 0);
        sem_init(&pspace_sem_, 0, cap);

        pthread_mutex_init(&c_mutex_, nullptr);
        pthread_mutex_init(&p_mutex_, nullptr);
    }
    void Push(const T &in) // 生产
    {
        P(pspace_sem_);

        Lock(p_mutex_); // ?
        ringqueue_[p_step_] = in;
        // 位置后移,维持环形特性
        p_step_++;
        p_step_ %= cap_;
        Unlock(p_mutex_); 

        V(cdata_sem_);

    }
    void Pop(T *out)       // 消费
    {
        P(cdata_sem_);

        Lock(c_mutex_); // ?
        *out = ringqueue_[c_step_];
        // 位置后移,维持环形特性
        c_step_++;
        c_step_ %= cap_;
        Unlock(c_mutex_); 

        V(pspace_sem_);
    }
    ~RingQueue()
    {
        sem_destroy(&cdata_sem_);
        sem_destroy(&pspace_sem_);

        pthread_mutex_destroy(&c_mutex_);
        pthread_mutex_destroy(&p_mutex_);
    }
private:
    std::vector<T> ringqueue_;
    int cap_;

    int c_step_;       // 消费者下标
    int p_step_;       // 生产者下标

    sem_t cdata_sem_;  // 消费者关注的数据资源
    sem_t pspace_sem_; // 生产者关注的空间资源

    pthread_mutex_t c_mutex_;
    pthread_mutex_t p_mutex_;
};

Makefile

RingQueueTest:Main.cpp
	g++ -o $@ $^ -std=c++11 -lpthread
.PHONY:clean
clean:
	rm -f RingQueueTest

 Main.cpp

#include <iostream>
#include <pthread.h>
#include <unistd.h>
#include <ctime>
#include "RingQueue.hpp"
#include "Task.hpp"

using namespace std;

struct ThreadData
{
    RingQueue<Task> *rq;
    std::string threadname;
};

void *Productor(void *args)
{
    // sleep(3);
    ThreadData *td = static_cast<ThreadData*>(args);
    RingQueue<Task> *rq = td->rq;
    std::string name = td->threadname;
    int len = opers.size();
    while (true)
    {
        // 1. 获取数据
        int data1 = rand() % 10 + 1;
        usleep(10);
        int data2 = rand() % 10;
        char op = opers[rand() % len];
        Task t(data1, data2, op);

        // 2. 生产数据
        rq->Push(t);
        cout << "Productor task done, task is : " << t.GetTask() << " who: " << name << endl;

        sleep(1);
    }
    return nullptr;
}

void *Consumer(void *args)
{
    ThreadData *td = static_cast<ThreadData*>(args);
    RingQueue<Task> *rq = td->rq;
    std::string name = td->threadname;

    while (true)
    {
        // 1. 消费数据
        Task t;
        rq->Pop(&t);
       
        // 2. 处理数据
        t();
        cout << "Consumer get task, task is : " << t.GetTask() << " who: " << name << " result: " << t.GetResult() << endl;
        // sleep(1);

    }
    return nullptr;
}

int main()
{
    srand(time(nullptr) ^ getpid());
    RingQueue<Task> *rq = new RingQueue<Task>(50);

    pthread_t c[5], p[3];

    for (int i = 0; i < 1; i++)
    {
        ThreadData *td = new ThreadData();
        td->rq = rq;
        td->threadname = "Productor-" + std::to_string(i);

        pthread_create(p + i, nullptr, Productor, td);
    }
    for (int i = 0; i < 1; i++)
    {
        ThreadData *td = new ThreadData();
        td->rq = rq;
        td->threadname = "Consumer-" + std::to_string(i);

        pthread_create(c + i, nullptr, Consumer, td);
    }

    for (int i = 0; i < 1; i++)
    {
        pthread_join(p[i], nullptr);
    }
    for (int i = 0; i < 1; i++)
    {
        pthread_join(c[i], nullptr);
    }

    return 0;
}

        单生产者单消费者的情况下,信号量是可以帮我们维护同步和互斥的。 

        在多生产者消费者的情况下,为了维护线程安全,以及生产者消费者模型,我们需要让各生产者之间保持互斥关系,同时各消费者之间也要保持互斥关系。因此我们需要两把锁来各自维护生产者和消费者的互斥关系。

        现在我们来思考一下,到底是信号量的获取放在锁的获取释放里面,还是锁的获取释放在信号量的里面。

        把锁的获取释放放在里面是更优的,原因有如下几个方面

        1.信号量本身的获取是原子的,并不需要被保护,并且锁内的代码量应该越短越好。

         2.如果我们把信号量的获取放在锁内,那么每个线程都需要先获取锁,然后才能获取信号量,是一个串型的过程,而如果把信号量的获取放在外面,则其它线程可以在持有锁的线程访问临界资源时,进行信号量的获取,可以提高并发度。

        P(pspace_sem_);

        Lock(p_mutex_); // ?
        ringqueue_[p_step_] = in;
        // 位置后移,维持环形特性
        p_step_++;
        p_step_ %= cap_;
        Unlock(p_mutex_); 

        V(cdata_sem_);

九、线程池

一个小注意点

        这里的HandlerTask如果放在类内的话,会有一个隐藏的this指针,参数类型就对不上了,所以我们需要用static修饰,这样它就不是成员函数了,就是静态成员函数了,就没有this指针了。

        但是静态的函数不能直接访问类内的成员函数,所以我们同时需要将this指针传进去。 

线程池代码 (单例模式)

Task.hpp

#pragma once
#include <iostream>
#include <string>

std::string opers="+-*/%";

enum{
    DivZero=1,
    ModZero,
    Unknown
};

class Task
{
public:
    Task()
    {}
    Task(int x, int y, char op) : data1_(x), data2_(y), oper_(op), result_(0), exitcode_(0)
    {
    }
    void run()
    {
        switch (oper_)
        {
        case '+':
            result_ = data1_ + data2_;
            break;
        case '-':
            result_ = data1_ - data2_;
            break;
        case '*':
            result_ = data1_ * data2_;
            break;
        case '/':
            {
                if(data2_ == 0) exitcode_ = DivZero;
                else result_ = data1_ / data2_;
            }
            break;
        case '%':
           {
                if(data2_ == 0) exitcode_ = ModZero;
                else result_ = data1_ % data2_;
            }            break;
        default:
            exitcode_ = Unknown;
            break;
        }
    }
    void operator ()()
    {
        run();
    }
    std::string GetResult()
    {
        std::string r = std::to_string(data1_);
        r += oper_;
        r += std::to_string(data2_);
        r += "=";
        r += std::to_string(result_);
        r += "[code: ";
        r += std::to_string(exitcode_);
        r += "]";

        return r;
    }
    std::string GetTask()
    {
        std::string r = std::to_string(data1_);
        r += oper_;
        r += std::to_string(data2_);
        r += "=?";
        return r;
    }
    ~Task()
    {
    }

private:
    int data1_;
    int data2_;
    char oper_;

    int result_;
    int exitcode_;
};

 ThreadPool.hpp

#pragma once

#include <iostream>
#include <vector>
#include <string>
#include <queue>
#include <pthread.h>
#include <unistd.h>

struct ThreadInfo
{
    pthread_t tid;
    std::string name;
};

static const int defalutnum = 5;

template <class T>
class ThreadPool
{
public:
    void Lock()
    {
        pthread_mutex_lock(&mutex_);
    }
    void Unlock()
    {
        pthread_mutex_unlock(&mutex_);
    }
    void Wakeup()
    {
        pthread_cond_signal(&cond_);
    }
    void ThreadSleep()
    {
        pthread_cond_wait(&cond_, &mutex_);
    }
    bool IsQueueEmpty()
    {
        return tasks_.empty();
    }
    std::string GetThreadName(pthread_t tid)
    {
        for (const auto &ti : threads_)
        {
            if (ti.tid == tid)
                return ti.name;
        }
        return "None";
    }

public:
    static void *HandlerTask(void *args)
    {
        ThreadPool<T> *tp = static_cast<ThreadPool<T> *>(args);
        std::string name = tp->GetThreadName(pthread_self());
        while (true)
        {
            tp->Lock();

            while (tp->IsQueueEmpty())
            {
                tp->ThreadSleep();
            }
            T t = tp->Pop();
            tp->Unlock();

            t();
            std::cout << name << " run, "
                      << "result: " << t.GetResult() << std::endl;
        }
    }
    void Start()
    {
        int num = threads_.size();
        for (int i = 0; i < num; i++)
        {
            threads_[i].name = "thread-" + std::to_string(i + 1);
            pthread_create(&(threads_[i].tid), nullptr, HandlerTask, this);
        }
    }
    T Pop()
    {
        T t = tasks_.front();
        tasks_.pop();
        return t;
    }
    void Push(const T &t)
    {
        Lock();
        tasks_.push(t);
        Wakeup();
        Unlock();
    }
    static ThreadPool<T> *GetInstance()
    {
        if (nullptr == tp_) // ???
        {
            pthread_mutex_lock(&lock_);
            if (nullptr == tp_)
            {
                std::cout << "log: singleton create done first!" << std::endl;
                tp_ = new ThreadPool<T>();
            }
            pthread_mutex_unlock(&lock_);
        }

        return tp_;
    }

private:
    ThreadPool(int num = defalutnum) : threads_(num)
    {
        pthread_mutex_init(&mutex_, nullptr);
        pthread_cond_init(&cond_, nullptr);
    }
    ~ThreadPool()
    {
        pthread_mutex_destroy(&mutex_);
        pthread_cond_destroy(&cond_);
    }
    ThreadPool(const ThreadPool<T> &) = delete;
    const ThreadPool<T> &operator=(const ThreadPool<T> &) = delete; // a=b=c
private:
    std::vector<ThreadInfo> threads_;
    std::queue<T> tasks_;

    pthread_mutex_t mutex_;
    pthread_cond_t cond_;

    static ThreadPool<T> *tp_;
    static pthread_mutex_t lock_;
};

template <class T>
ThreadPool<T> *ThreadPool<T>::tp_ = nullptr;

template <class T>
pthread_mutex_t ThreadPool<T>::lock_ = PTHREAD_MUTEX_INITIALIZER;

这个代码处

  static ThreadPool<T> *GetInstance()
    {
        if (nullptr == tp_) // ???
        {
            pthread_mutex_lock(&lock_);
            if (nullptr == tp_)
            {
                std::cout << "log: singleton create done first!" << std::endl;
                tp_ = new ThreadPool<T>();
            }
            pthread_mutex_unlock(&lock_);
        }

        return tp_;
    }

        我们需要加锁保证多线程同时获取单例的时候的线程安全,但是从此之后,每次其它线程进入进行判断时,都会不断进行加锁解锁,这给系统造成的负担比较大。因此外面把最外面也加一个if的条件判断,让不满足条件的线程不需要加锁解锁,而是直接返回,而此时即使有多线程并发访问,最后获取锁并且new一块空间的只有一个线程能做,其它线程也就能并发获取单例了。

 Makefile

ThreadPool:Main.cpp
	g++ -o $@ $^ -std=c++11 -lpthread
.PHONY:clean
clean:
	rm -f ThreadPool

Main.cpp

#include <iostream>
#include <ctime>
#include "ThreadPool.hpp"
#include "Task.hpp"


pthread_spinlock_t slock;


int main()
{
    // pthread_spin_init(&slock, 0);
    // pthread_spin_destroy(&slock);

    // 如果获取单例对象的时候,也是多线程获取的呢?
    std::cout << "process runn..." << std::endl;
    sleep(3);
    // ThreadPool<Task> *tp = new ThreadPool<Task>(5);
    ThreadPool<Task>::GetInstance()->Start();
    srand(time(nullptr) ^ getpid());

    while(true)
    {
        //1. 构建任务
        int x = rand() % 10 + 1;
        usleep(10);
        int y = rand() % 5;
        char op = opers[rand()%opers.size()];

        Task t(x, y, op);
        ThreadPool<Task>::GetInstance()->Push(t);
        //2. 交给线程池处理
        std::cout << "main thread make task: " << t.GetTask() << std::endl;

        sleep(1);
    }
}

十、封装原生线程

封装原生线程代码

 Thread.hpp

#pragma once

#include <iostream>
#include <string>
#include <ctime>
#include <pthread.h>

typedef void (*callback_t)();
static int num = 1;

class Thread
{
public:
    static void *Routine(void *args)
    {
        Thread* thread = static_cast<Thread*>(args);
        thread->Entery();
        return nullptr;
    }
public:
    Thread(callback_t cb):tid_(0), name_(""), start_timestamp_(0), isrunning_(false),cb_(cb)
    {}
    void Run()
    {
        name_ = "thread-" + std::to_string(num++);
        start_timestamp_ = time(nullptr);
        isrunning_ = true;
        pthread_create(&tid_, nullptr, Routine, this);
    }
    void Join()
    {
        pthread_join(tid_, nullptr);
        isrunning_ = false;
    }
    std::string Name()
    {
        return name_;
    }
    uint64_t StartTimestamp()
    {
        return start_timestamp_;
    }
    bool IsRunning()
    {
        return isrunning_;
    }
    void Entery()
    {
        cb_();
    }
    ~Thread()
    {}
private:
    pthread_t tid_;
    std::string name_;
    uint64_t start_timestamp_;
    bool isrunning_;

    callback_t cb_;
};

Makefile

Thread:Main.cc
	g++ -o $@ $^ -std=c++11 -lpthread
.PHONY:clean
clean:
	rm -f Thread

Main.cc

#include <iostream>
#include <unistd.h>
#include <vector>
#include "Thread.hpp"

using namespace std;

void Print()
{
    while(true)
    {
        printf("haha, 我是一个封装的线程...\n");
        sleep(1);
    }
}

int main()
{
    std::vector<Thread> threads;

    for(int i = 0 ;i < 10; i++)
    {
        threads.push_back(Thread(Print));
    }

    for(auto &t : threads)
    {
        t.Run();
    }

    
    for(auto &t : threads)
    {
        t.Join();
    }
    // Thread t(Print);
    // t.Run();

    // cout << "是否启动成功: " << t.IsRunning() << endl;
    // cout << "启动成功时间戳: " << t.StartTimestamp() << endl;
    // cout << "线程的名字: " << t.Name() << endl;

    // t.Join();

    return 0;
}

十一、 线程安全的单例模式 

 什么是单例模式

 单例模式是一种 "经典的, 常用的, 常考的" 设计模式.

 什么是设计模式

大佬们针对一些经典的常见的场景, 给定了一些对应的解决方案, 这个就是 设计模式 

单例模式的特点 

         某些类, 只应该具有一个对象(实例), 就称之为单例.
        例如一个男人只能有一个媳妇.
        在很多服务器开发场景中, 经常需要让服务器加载很多的数据 (上百G) 到内存中. 此时往往要用一个单例的类来管理这些数据.

饿汉实现方式和懒汉实现方式 

[洗完的例子]
吃完饭, 立刻洗碗, 这种就是饿汉方式. 因为下一顿吃的时候可以立刻拿着碗就能吃饭.
吃完饭, 先把碗放下, 然后下一顿饭用到这个碗了再洗碗, 就是懒汉方式. 

懒汉方式最核心的思想是 "延时加载". 从而能够优化服务器的启动速度 

总的消耗的时间不会改变,但是会改变时间分配结构。 

知识补充

        全局变量,静态变量,代码编译之后,只要刚变成进程,这些全局变量肯定就都已经有了,不像栈区,你需要的时候才有。全局变量,静态变量,加载的时候就已经有了

 

饿汉方式实现单例模式

 

template <typename T>
class Singleton {
static T data;
public:
static T* GetInstance() {
return &data;
}
};

只要通过 Singleton 这个包装类来使用 T 对象, 则一个进程中只有一个 T 对象的实例.

懒汉方式实现单例模式

 

template <typename T>
class Singleton {
static T* inst;
public:
static T* GetInstance() {
if (inst == NULL) {
inst = new T();
}
return inst;
}
};

存在一个严重的问题, 线程不安全.
第一次调用 GetInstance 的时候, 如果两个线程同时调用, 可能会创建出两份 T 对象的实例.
但是后续再次调用, 就没有问题了.

懒汉方式实现单例模式(线程安全版本)

// 懒汉模式, 线程安全
template <typename T>
class Singleton {
volatile static T* inst; // 需要设置 volatile 关键字, 否则可能被编译器优化.
static std::mutex lock;
public:
static T* GetInstance() {
if (inst == NULL) { // 双重判定空指针, 降低锁冲突的概率, 提高性能.
lock.lock(); // 使用互斥锁, 保证多线程情况下也只调用一次 new.
if (inst == NULL) {
inst = new T();
}
lock.unlock();
}
return inst;
}
};

 注意事项:
1. 加锁解锁的位置
2. 双重 if 判定, 避免不必要的锁竞争
3. volatile关键字防止过度优化

十二、 STL,智能指针和线程安全

 STL中的容器是否是线程安全的?

 不是.
        原因是, STL 的设计初衷是将性能挖掘到极致, 而一旦涉及到加锁保证线程安全, 会对性能造成巨大的影响.
        而且对于不同的容器, 加锁方式的不同, 性能可能也不同(例如hash表的锁表和锁桶).
        因此 STL 默认不是线程安全. 如果需要在多线程环境下使用, 往往需要调用者自行保证线程安全.

智能指针是否是线程安全的? 

1.对于 unique_ptr, 由于只是在当前代码块范围内生效, 因此不涉及线程安全问题.
2.对于 shared_ptr, 多个对象需要共用一个引用计数变量, 所以会存在线程安全问题. 但是标准库实现的时候考虑到了这个问题, 基于原子操作(CAS)的方式保证 shared_ptr 能够高效, 原子的操作引用计数. 

十三、其他常见的各种锁 

1.悲观锁:在每次取数据时,总是担心数据会被其他线程修改,所以会在取数据前先加锁(读锁,写锁,行锁等),当其他线程想要访问数据时,被阻塞挂起。
2.乐观锁:每次取数据时候,总是乐观的认为数据不会被其他线程修改,因此不上锁。但是在更新数据前,会判断其他数据在更新前有没有对数据进行修改。主要采用两种方式:版本号机制和CAS操作。
3.CAS操作:当需要更新数据时,判断当前内存值和之前取得的值是否相等。如果相等则用新值更新。若不等则失败,失败则重试,一般是一个自旋的过程,即不断重试。
4.自旋锁,公平锁,非公平锁? 

自旋锁 

我们可以先回忆一下我们之前学习过的互斥锁

        使用lock时如果申请锁不成功,就会挂起,而trylock如果申请锁不成功的话就会出错返回 。所以如果我想基于应用层实现这个自选锁,我们可以打一个while死循环,然后再trylock申请锁。

 

不过系统实际上已经给我们设计好了

 

 

 

        系统的自旋锁接口和互斥锁是高度类似的,使用方法也一样。 

        它也有阻塞(lock)和非阻塞(trylock)两种实现方式。

        如果是lock,它在申请失败的时候是不断继续申请的,也就是说它在函数里面已经封装了一个while循环了,而trylock则和互斥锁里面的trylock一样,如果没有申请到,那就直接返回。

十四、 读者写者问题

 读写锁

        在编写多线程的时候,有一种情况是十分常见的。那就是,有些公共数据修改的机会比较少。相比较改写,它们读的机会反而高的多。通常而言,在读的过程中,往往伴随着查找的操作,中间耗时很长。给这种代码段加锁,会极大地降低我们程序的效率。那么有没有一种方法,可以专门处理这种多读少写的情况呢? 有,那就是读写锁。 

 

 注意:写独占,读共享,读锁优先级高

读写锁接口 

设置读写优先

 

int pthread_rwlockattr_setkind_np(pthread_rwlockattr_t *attr, int pref);
/*
pref 共有 3 种选择
PTHREAD_RWLOCK_PREFER_READER_NP (默认设置) 读者优先,可能会导致写者饥饿情况
PTHREAD_RWLOCK_PREFER_WRITER_NP 写者优先,目前有 BUG,导致表现行为和
PTHREAD_RWLOCK_PREFER_READER_NP 一致
PTHREAD_RWLOCK_PREFER_WRITER_NONRECURSIVE_NP 写者优先,但写者不能递归加锁
*/

        默认是读者优先,但是可能会导致写者饥饿情况,但是实际上这本来就是正常的情况,因为一般情况 下读的需求本就远远大于写的需求。

初始化

int pthread_rwlock_init(pthread_rwlock_t *restrict rwlock,const pthread_rwlockattr_t
*restrict attr);

 销毁

int pthread_rwlock_destroy(pthread_rwlock_t *rwlock);

加锁和解锁

int pthread_rwlock_rdlock(pthread_rwlock_t *rwlock);
int pthread_rwlock_wrlock(pthread_rwlock_t *rwlock);
int pthread_rwlock_unlock(pthread_rwlock_t *rwlock);

解锁是共用接口的

 

 

         读者写者问题和消费者生产者问题比较类似,唯一比较不同的是读者之间是共享的,为什么消费者之间是互斥的,而读者之间是共享的呢?

        因为消费者会将数据拿走,而读者仅仅只读数据。 

下面有一段伪代码,帮助我们理解读者优先。

        读者优先就是如果读者和写者一起来了,那么读者先进入,等到前面所有读者读完,然后写者才去写。

        写者优先则是,如果读者和写者一起来了,那么读者和写者都先不要进入,等到前面所有读者读完,写者先写,然后读者再进去读。

1.如果开始的时候有读者要来读,就给写者加个锁, 不过往后再有读者来读就不需要加锁了。

此时写者有两种情况,一种是写者还没申请锁,将要申请锁,那么写者申请的时候就会被阻塞,一直得等到所有读者都读完,读者才会把写者的锁释放,写者才能获取锁,进行写操作

另一种是写者此时已经申请了锁了,那么这时候这第一个读者就会被阻塞住,一直等到写着写完,它才立马申请锁,继续执行下去。

2.在中间读取数据部分,各读者是并发执行的

3.在读者count--操作时,也要加锁保证原子。

相关文章:

  • Tauri+React+Ant Design跨平台开发环境搭建指南
  • Maven
  • B3DM转换成PLY
  • Spark之数据倾斜调优
  • 【后端】Flask vs Django vs Node.js 对比分析
  • Linux系统(以Ubuntu为例)安装高版本nodejs
  • 爬虫:一文掌握 Celery 分布式爬虫,及对应实战案例
  • 《AI模型变形记:从绿巨人到Hello Kitty的魔幻减肥营》
  • 【计算机网络——概述】
  • 【3D格式转换SDK】HOOPS Exchange技术概览(二):3D数据处理高级功能
  • 如何合理设置请求间隔?
  • 如何优化百度下拉框?下拉框展示规则是怎样的?
  • JavaEE--计算机是如何工作的
  • 优选算法的智慧之光:滑动窗口专题(二)
  • 如何将一台服务器的pip环境迁移到另一个机器?
  • 【开源-常用C/C++命令行解析库对比】
  • jvm内存不够,怎么重新分配
  • 蓝桥杯4T平台(串口打印电压值)
  • 【Prometheus】prometheus如何监控k8s集群
  • 工程化与框架系列(16)--前端路由实现
  • 印度最新发声:对所有敌对行动均予以反击和回应,不会升级冲突
  • 85后清华博士黄佐财任湖北咸宁市咸安区委副书记、代区长
  • 习近平《在庆祝中华全国总工会成立100周年暨全国劳动模范和先进工作者表彰大会上的讲话》单行本出版
  • 公积金利率降至历史最低!多项房地产利好政策落地,购房者置业成本又降了
  • 于东来再次回应玉石质疑:邀请前往胖东来深入考察,随时欢迎各方调查
  • 演员扎堆音乐节,是丰富了舞台还是流量自嗨?