【linux】多线程
一. 线程概念
- 线程是进程内的一个分支。线程的执行粒度要比进程细,粒度可理解为执行单元的规模或精细程度,线程置执行进程中的一部分代码,编译时代码会被分配地址,所以线程能明确知道执行哪一部分代码
- 进程具有独立性,创建进程时会给每个进程分配pcb、地址空间和页表;线程不具有独立性,创建时只会为其创建pcb,在其所属的进程下与进程和其他线程共享地址空间、页表等资源
- 进程不管是用户态还是内核态都是在地址空间内执行代码,所以地址空间是进程的资源窗口!,任何执行流执行都要有资源,线程在进程的地址空间内运行
- 有了很多线程,操作系统就需要管理,就需要先组织再描述,不同的操作系统会有不同的解决方案,比如windows会为线程再创建一个struct tcb来管理所有线程,但Linux中认为线程和进程本质上都是执行流都是一样的,所以就复用了进程的数据结构和管理算法来管理线程(struct tast_struct),Linux中没有真正一一上的线程,而是用进程的内核数据结构模拟的线程
- 为了避免混淆,可以把进程和线程都看作为执行流,在cpu调度时对其而言线程和进程都是执行流,Linux中的执行流是轻量级进程,执行流的轻量级程度(线程<=执行流<=进程)
1.重新定义线程和进程
-
线程是操作系统调度的基本单位
-
重新理解进程:
之前认为进程=内核数据结构+代码和数据,先阶段以内核中角度来理解,进程是承担分配系统资源的基本实体 -
线程是进程内部的执行流资源,执行流本省不是资源,但它是对资源的使用者+管理对象
2.重谈进程地址空间(4)
- 线程目前分配资源,本质就是分配地址空间范围
- 理解页表:cpu地址总线一次能传输的位数决定了可表示的地址数量,以32位架构(x86)系统下为例,虚拟地址空间大小为2^32字节=4GB,若页表需要完整表示地址空间范围那么就需要(存储每一个地址信息的大小*可表示总地址数大小),这样一个页表所占用的空间大小超过4GB,这还只是一个进程所占用的空间,内核中有那么多进程,那么页表总共所占用的资源超出内存范围,别说还有其他资源也需要空间
- 这就证明页表肯定不是这样设计的!32位系统下虚拟地址也是32位,会将它分为三组,10、10、12位分别,页表也随之分为页目录和二级页表两个部分
所以在语言层面中取变量地址都是最低的起始地址原因相同,都是通过起始地址+类型偏移量来定位物理地址
3.线程周边概念
- 线程比进程更加轻量化:
1.创建和释放更加轻量化:都只需要对线程的pcb进行调度即可
2.切换更加轻量化:在cpu中有一个寄存器cache,专门存储经常访问到的数据,因为进程和同属线程是共享资源,所以cache缓存的是热数据,线程间的切换不需要重新cache数据,因为同属于进程,不需要切换上下文,进程间切换不仅需要重新cache数据还需要带走进程上下文。at /proc/cpuinfo 查看cache size - 线程其他优点:
1.能充分利用多处理器的可并行数量
2.在等待慢速I/O操作结束的同时,程序可执行其他的计算任务
3.计算密集型应用,为了能在多处理器系统上运行,将计算分解到多个线程中实现。注意;该情况不一定多线程最好,因为频繁切换也需要消耗,一般来说线程数量跟cpu数量对应效率最好
4.I/O密集型应用,为了提高性能,将I/O操作重叠。线程可以同时等待不同的I/O操作。 - 线程缺点:
1.性能损失:
一个很少被外部事件阻塞的计算密集型线程往往无法与其他线程共享同一个处理器。如果计算密集型线程的数量比可用的处理器多,那么可能会有较大的性能损失,这里的性能损失指的是增加了额外的同步和调度开销,而可用的资源不变。
2.健壮性降低:
编写多线程需要更全面更深入的考虑,在一个多线程程序里,因时间分配上的细微偏差或者因共享了不该共享的变量而造成不良影响的可能性是很大的,换句话说线程之间是缺乏保护的。
3.缺乏访问控制:
进程是访问控制的基本粒度,在一个线程中调用某些OS函数会对整个进程造成影响。比如可重入函数
4.编程难度提高:
编写与调试一个多线程程序比单线程程序困难得多 - 线程异常:
进程出现异常时,其下所有线程都会退出,因为进程是线程的生存环境,线程是进程内的执行单元,它的存在和运行完全依赖于进程的资源和上下文。
一个线程异常,若影响了共享资源其他线程也都要退出 - 线程共享进程数据,但也拥有自己的一部分数据:
线程ID
一组寄存器
栈(存储运行产生的临时变量)
errno
信号屏蔽字
调度优先级
- 进程的多个线程共享 同一地址空间,代码段、数据段都是共享的,如果定义一个函数,在各线程中都可以调用,如果定义一个全局变量,在各线程中都可以访问到,除此之外,各线程还共享以下进程资源和环境:
文件描述符表(关键)
每种信号的处理方式(SIG_ IGN、SIG_ DFL或者自定义的信号处理函数)
当前工作目录
用户id和组id
4.线程id和进程地址空间布局
- clone是创建轻量级进程的系统调用,由于参数多并且有些参数用不了的原因,用户不直接使用,clone就被线程库封装了,将clone接口封装在底层,向上层提供ptread系列接口
- 每创建一个线程,都要在底层调用clone,在库中都要为线程开辟维护一个栈
- 线程在底层对应的是一个个执行流(pcb),但用户所需要关心的字段都由线程库来维护,操作系统知道知道这些线程信息嘛?不,因为操作系统中没有线程的概念
- 每个线程都有自己的调用链,所以需要独立的栈结构来存储调用过程中产生的临时变量
- 地址空间中的栈区域,是真进程(主线程)的栈,其他线程的栈区在线程库中,虽然都是各自有一份独立的栈(注意区分独立和私有),但只要知道地址其他线程依旧能访问,因为本质还是在进程地址空间内
- 线程创建从进程加载线程库开始,经过线程库创建线程控制块、准备参数,再到内核通过clone系统调用创建轻量级进程,最后由线程库完成收尾工作,实现了用户空间和内核空间的协同工作,使得新线程能够作为进程内的一个执行单元独立运行,同时又能与其他线程高效共享进程资源。
- 区分TID与LWP:
关联:TID 是 LWP 的唯一内核标识,每个 LWP 对应一个 TID,二者一一绑定
区别:LWP 是内核中可调度的线程实体,TID 是标识这个实体的整数编号。
与用户态的关系:用户空间通过 pthread_t 操作线程,内核通过 TID 管理 LWP,线程库负责二者的映射。 - 区分TID与pthread_tid:
pthread_t tid是用户态线程操作的 “接口标识,值为对应tcb的起始地址,是线程库暴露给用户的 “线程身份证”,仅用于用户态代码对线程的管理操作比如使用pthread_系列函数
TID 是内核为每个轻量级进程LWP分配的唯一整数,仅用于内核对线程的底层管理,存储在线程控制块tcb中 - 验证pthread_t tid为地址值
string toHex(pthread_t tid)//将地址转化为16进制
{char hex[64];snprintf(hex,sizeof(hex),"%p",tid);return hex;
}
void* threadRoutine(void* args)
{while(true){cout<<"thread id:"<<toHex(pthread_self())<<endl;//printf("%p",pthread_self());//也可以手动控制sleep(1);}
}
int main()
{pthread_t tid;pthread_create(&tid,nullptr,threadRoutine,(void*)"thread 1");cout << "main thread create thead done, new thread id : " << toHex(tid) << endl;pthread_join(tid, nullptr);return 0;
}
- 在不同操作系统和 pthread 库的实现中,pthread_t 有不同的底层定义, 并且在打印输出时,我们通常会根据其底层实现,将其转换为合适的进制展示,但 pthread_self() 本身获取的标识符没有固定的进制属性
二. 线程控制
- 由于内核中没有明确的线程概念,只有轻量级进程的概念,所以内核不会直接提供线程的系统调用,只会提供轻量级进程的系统调用clone(下文介绍),作为用户,当然希望能直接使用线程的接口,所以Linux开发者们设计了pthread线程库,在应用层上堆轻量级进程接口进行封装,为用户提供直接线程的接口,几乎所有Linux平台都自带这个库,在Linux中编写多线程代码时需要使用第三方pthread库
- 使用注意:
1.与线程有关的函数构成了一个完整的系列,绝大多数函数的名字都是“pthread_”打头的
2.要使用这些函数库,要通过引入头文<pthread.h>
3.链接这些线程函数库时要使用编译器命令的“-lpthread”选项,类似于动态库的链接原理,都是运行时加载到内存中然后通过页表映射到每一个进程或线程共享的地址空间中,区别在于链接时动态库还需要-L -I的选项显示路径,但pthread库已在Linux中安装好,只需表明链接哪一个库即可(-I选项) - 区分gettid与pthread_self
- pthread_t pthread_self(void)是线程库中的函数,工作在用户空间。它返回的是当前线程在用户空间的标识符,用于线程库对线程进行管理,运用pthread_系列函数
- gettid:是一个系统调用,直接与内核交互。它用于获取当前线程在内核中的线程 ID,即轻量级进程 ID(LWP )
1.创建线程
- pthread_create
功能:创建一个新的线程
原型
int pthread_create(pthread_t *thread, const pthread_attr_t *attr, void *(start_routine)(void), void *arg);
参数
thread:返回线程ID
attr:设置线程的属性,attr为NULL表示使用默认属性
start_routine:是个函数地址,线程启动后要执行的函数
arg:传给线程启动函数的参数
返回值:成功返回0;失败返回错误码,线程返回的错误码不设置系统全局变量中的errno,而是将错误码通过返回值返回
- 代码测试:
#include<iostream>
#include<string>
#include<pthread.h>
#include<cstdlib>
#include<unistd.h>using namespace std;int g_val=10;//定义全局变量void* threadRoutine(void* args)
{const char* name=(const char*)args;while(true){printf("%s, pid:%d, g_val:%d, &g_val:%p\n",name,getpid(),g_val,&g_val);sleep(1);}
}
int main()
{pthread_t tid;pthread_create(&tid,nullptr,threadRoutine,(void*)"thread 1");while(true){printf("main thread pid: %d, g_val: %d, &g_val: %p, create new thread:%p\n",getpid(),g_val,&g_val,tid);sleep(1);g_val++;}return 0;
}mythread:mythread.ccg++ -o $@ $^ -std=c++11 -lpthread
.PHONY:clean
clean:rm mythread
运行结果发现两个执行流分别打印语句,证明线程被创建,是进程的执行分支,同时能共享全局变量。ps -aL查看轻量级进程,LWP与PID相等的为主线程也就是进程,LWP(light weight process轻量级进程)是内核中线程的标识符
2.线程等待
- 为什么需要线程等待?参考进程
已经退出的线程,其空间没有被释放,仍然在进程的地址空间内。
创建新的线程不会复用刚才退出线程的地址空间。
功能:等待线程结束
原型
int pthread_join(pthread_t thread, void value_ptr);
参数
thread:线程ID
value_ptr:它指向一个指针,后者指向线程的返回值,作为输出型参数,代表指针的地址所以用二级指针。不关心返回值时可以设为nullptr。类比:若要在函数中修改一个 int 变量,需传递 int;同理,若要修改一个 void 指针(即接收线程返回的 void* 地址),需传递 void**
返回值:成功返回0;失败返回错误码
通过pthread_join得到的终止状态是不同的,总结如下:
- 如果thread线程通过return返回,value_ ptr所指向的单元里存放的是thread线程函数的返回值。
- 如果thread线程被别的线程调用pthread_ cancel异常终掉,value_ ptr所指向的单元里存放的是常数PTHREAD_ CANCELED(宏定义为-1)。
- 如果thread线程是自己调用pthread_exit终止的,value_ptr所指向的单元存放的是传给pthread_exit的参数。
- 如果对thread线程的终止状态不感兴趣,可以传NULL给value_ ptr参数。
注意:主线程调用时是默认等待的,只需要考虑退出信息,不需要考虑异常,因为一旦出现异常整个进程都退出了,没有意义
3.线程终止
如果需要只终止某个线程而不终止整个进程,可以有三种方法:
- 从线程函数return,线程退出不影响进程。这种方法对主线程不适用,从main函数return相当于调用exit。
- 线程可以调用pthread_ exit终止自己。
- 一个线程可以调用pthread_ cancel终止同一进程中的另一个线程。
注意:不能直接调用exit,exit是用来直接终止进程的,不能用来终止线程
- pthread_exit函数
功能:线程终止
原型:
void pthread_exit(void *value_ptr);
参数:
value_ptr:value_ptr不要指向一个局部变量。需要注意,pthread_exit或者return返回的指针所指向的内存单元必须是全局的或者是用malloc分配的,不能在线程函数的栈上分配,因为当其它线程得到这个返回指针时线程函数已经退出了。
返回值:
无返回值,跟进程一样,线程结束的时候无法返回到它的调用者(自身)
4.取消线程
- pthread_cancel函数
功能:取消一个执行中的线程
原型
int pthread_cancel(pthread_t thread);
参数
thread:线程ID
返回值:成功返回0;失败返回错误码
5.代码测试
class Request
{
public:Request(int start,int end,const string &threadname): start_(start),end_(end),threadname_(threadname){}
public:int start_;int end_;string threadname_;
};class Response
{
public:Response(int result,int exitcode):result_(result),exitcode_(exitcode){}
public:int result_;int exitcode_;
};void* sumCount(void *args)// 线程的参数和返回值,不仅仅可以用来进行传递一般参数,也可以传递对象!!
{Request *rq=static_cast<Request*>(args);Response *rsp=new Response(0,0);//在堆上开辟空间,确保退出函数还可以继续访问for(int i=rq->start_;i<=rq->end_;i++){cout<<rq->threadname_<<"is runing,calling "<<i<<endl;rsp->result_+=1;usleep(100000);//微秒级别}delete rq;//pthread_exit((void*)100);//主动退出,这里也只是在合适的位置做演示,若退出会导致创建的rsp资源未传出去导致段错误return rsp;//线程默认退出方式
}
int main()
{pthread_t tid;Request *rq=new Request(1,100,"thread 1");//在sumCount函数中被释放资源pthread_create(&tid,nullptr,sumCount,rq);//pthread_cancel(tid);若此时终止进程会导致段错误,没有及时释放资源void *ret;pthread_join(tid,&ret);//主线程阻塞等待Response *rsp=static_cast<Response *> (ret);cout<<"rsp->result: "<<rsp->result_<<",exitcode"<<rsp->exitcode_<<endl;delete rsp;return 0;
}
- c++11后支持多线程
#include<thread>
using namespace std;void threadrun()
{while(true){cout<<"I am a new thread for c++"<<endl;sleep(1);}
}int main()
{thread t1(threadrun);t1.join();return 0;
}
- 通过验证c++也可以创建线程,在Linux中封装了原生线程库,函数名都很熟悉,记得要包含头文件
- c++的线程,底层封装原生线程库,具有跨平台性,windows下c++底层封装的Windows中线程库,对于上层使用者来说编写代码没有区别,差异在底层封装
6.分离线程
- 默认情况下,新创建的线程是joinable的,线程退出后,需要对其进行pthread_join操作,否则无法释放资源,从而造成系统泄漏。
- 如果不关心返回信息,可以告诉系统线程退出时自动释放线程资源,主线程就不再管了
- 可以在主线程中实现分离,也可以在子线程中实现自己的分离
-分离线程只会自动回收以下资源;
内核级资源:轻量级进程(LWP)的 PCB、寄存器上下文、内核栈等内核管理的资源。
线程库管理的用户态资源:线程库维护的线程控制块(TCB)、线程私有栈(如果由线程库分配)等。
其余资源需要手动释放,如堆上创建的资源。若等到进程退出再由系统回收会造成资源泄漏
#define NUM 3struct threadData
{string threadname;
};string toHex(pthread_t tid)//
{char buffer[128];snprintf(buffer,sizeof(buffer),"0x%x",tid);return buffer;
}
void InitThreadData(threadData*td,int number)
{td->threadname="thread-"+to_string(number);
}void* threadRoutine(void *args)//所有线程都执行该函数,可重入函数
{pthread_detach(pthread_self());//自己分离线程threadData*td=static_cast<threadData*>(args);string tid=toHex(pthread_self());int pid=getpid();int i=0;while(i<NUM){printf("threadname;%s,tid:%s,pid:%d\n",td->threadname.c_str(),tid.c_str(),pid);//sleep(1);i++;}delete td;return nullptr;
}
int main()
{vector<pthread_t>tids;//记录每个线程的标识符信息for(int i=0;i<NUM;i++)//创建多个线程{pthread_t tid;threadData*td=new threadData;InitThreadData(td,i);pthread_create(&tid,nullptr,threadRoutine,td);tids.push_back(tid);}sleep(3
);//确保线程都创建成功// for(auto i:tids)// {// pthread_detach(i);//主线程来分离线程// }for(int i=0;i<tids.size();i++)//等待回收多个线程{int n=pthread_join(tids[i],nullptr);printf("n=%d,who=0x%x,why;%s\n",n,tids[i],strerror(n));}return 0;
}
注意:线程分离和线程等待只能二选一,若pthread_detach后还pthread_join会产生错误,后者的返回值会报错Invalid argument
7.周边概念验证
- 堆是共享的
#include<iostream>
#include<cstdio>
#include<cstring>
#include<vector>
#include<unistd.h>
#include<pthread.h>using namespace std;#define NUM 3struct threadData
{string threadname;
};string toHex(pthread_t tid)//
{char buffer[128];snprintf(buffer,sizeof(buffer),"0x%x",tid);return buffer;
}
void InitThreadData(threadData*td,int number)
{td->threadname="thread-"+to_string(number);
}void* threadRoutine(void *args)//所有线程都执行该函数,可重入函数
{threadData*td=static_cast<threadData*>(args);string tid=toHex(pthread_self());int pid=getpid();int i=0;while(i<NUM){printf("threadname;%s,tid:%s,pid:%d\n",td->threadname.c_str(),tid.c_str(),pid);sleep(1);i++;}delete td;return nullptr;
}
int main()
{vector<pthread_t>tids;//记录每个线程的标识符信息for(int i=0;i<NUM;i++)//创建多个线程{pthread_t tid;threadData*td=new threadData;//在堆上开辟空间,每一个线程都有自己一个独立的空间InitThreadData(td,i);//这里td会将地址拷贝给线程,当这个临时指针变量出栈后被销毁,但其所指向的地址空间还在,避免了非法访问,这就证明了堆空间也是被线程共享的,只是这里让每个线程有了自己的堆空间,知道起始地址就可以互相访问pthread_create(&tid,nullptr,threadRoutine,td);tids.push_back(tid);}for(int i=0;i<tids.size();i++)//等待回收多个线程{int n=pthread_join(tids[i],nullptr);printf("n=%d,who=0x%x,why;%s\n",n,tids[i],strerror(n));}return 0;
}mythread:mythread.ccg++ -o $@ $^ -std=c++11 -lpthread
.PHONY:clean
clean:rm mythread
每一个被创建的线程都会执行NUM次调用函数,都可以访问栈上创建的变量td所指向的堆空间,每个线程独享一份,但知道地址就可以访问
线程的局部存储
#define NUM 3
__thread int g_val=100;
int *p=nullptr;//用来记录堆空间地址struct threadData
{string threadname;
};
//__thread threadData td;局部存储支支持内置类型
string toHex(pthread_t tid)//
{char buffer[128];snprintf(buffer,sizeof(buffer),"0x%x",tid);return buffer;
}
void InitThreadData(threadData*td,int number)
{td->threadname="thread-"+to_string(number);
}void* threadRoutine(void *args)//所有线程都执行该函数,可重入函数
{pthread_detach(pthread_self());//自己分离线程threadData*td=static_cast<threadData*>(args);if(td->threadname=="thread-2") p=&g_val;//访问线程2的局部变量string tid=toHex(pthread_self());int pid=getpid();int i=0;while(i<NUM){printf("threadname;%s,tid:%s,pid:%d\n,g_val:%d,&g_val:%p\n",td->threadname.c_str(),tid.c_str(),pid,g_val,&g_val);//sleep(1);i++;}delete td;return nullptr;
}
int main()
{vector<pthread_t>tids;//记录每个线程的标识符信息for(int i=0;i<NUM;i++)//创建多个线程{pthread_t tid;threadData*td=new threadData;InitThreadData(td,i);pthread_create(&tid,nullptr,threadRoutine,td);tids.push_back(tid);}sleep(1);//确保线程都创建成功cout<<"main thread get a local value, val:"<<*p<<",&val:"<<p<<endl;// for(auto i:tids)// {// pthread_detach(i);//主线程来分离线程// }for(int i=0;i<tids.size();i++)//等待回收多个线程{int n=pthread_join(tids[i],nullptr);printf("n=%d,who=0x%x,why;%s\n",n,tids[i],strerror(n));}return 0;
}
__thread是编译器的编译选项,称为线程的局部存储,已知全局变量是被所有的线程同时看到并访问的,局部存储就相当于一个私有的全局变量。通过运行结果可得,全局变量g_val被每个线程都私有化了一份,同时如果知道地址就能直接访问每个线程的局部变量。
应用场景:
当创建线程时,可以创建一个局部变量,作为每个线程私有的全局变量,提前存储好线程的相关信息,这样后续可以减少系统调用次数,直接使用局部变量,提高一些效率。
三.线程互斥
- 简单模拟一个抢票器
using namespace std;
#define NUM 3
class threadData
{
public:threadData(int number){threadname="thread-"+to_string(number);}
public:string threadname;
};
int tickets=100;
void *getTickets(void *args)
{threadData *td=static_cast<threadData*>(args);const char*name=td->threadname.c_str();while(true){if(tickets>0){usleep(1000);printf("who=%s,get a ticket: %d\n",name,tickets);tickets--;}else break;}printf("%s ...quit\n",name);return nullptr;
}int main()
{vector<pthread_t> tids;vector<threadData *> thread_datas;for(int i=0;i<NUM;i++)//创建多个线程{pthread_t tid;threadData*td=new threadData(i);thread_datas.push_back(td);pthread_create(&tid,nullptr,getTickets,thread_datas[i]);tids.push_back(tid);}for(auto thread:tids)//统一等待{pthread_join(thread,nullptr);}for(auto td:thread_datas)//统一释放{delete td;}return 0;
}
运行结果如图,已知抢票只能从一张开始,没有0张票,所以出现了数据不一致问题,与多线程的并发访问有关系
- 分析其中操作tickets–:
1.先将tickets读入到cpu寄存器中
2.cpu内部进行–操作
3.将计算结果协会内存
每一步都对应一条汇编操作,在执行这些汇编时,线程是可以被切换的,所以可能导致数据不一致问题。
- 辨析概念:
寄存器中内容不等于寄存器内容,线程子啊执行时候,将共享数据,加载到cpu寄存器本质:把数据内容,变成自己的上下文,以拷贝的方式,给自己单独拿了一份。
cpu中会进行逻辑运算和算数运算,if条件判断就是在cpu中进行逻辑运算,将自己内存中存储的全局变量-票数加载到内存中,但后续进行打印和加减操作时哟要重新从内存中读取数据,这就可能一个线程在执行完if判断后的过程中发生进程切换导致已经计算过的数据没有及时保存,当下一个线程运行时会重新从自己内存中读取数据,这样就造成了数据不一致问题 - 解决方法:对共享数据的任何访问,保证任何时候只有一个执行流访问,这就是互斥(目标方式)—锁(工具),Linux上提供的这把锁叫互斥量mutex,是锁的一种具体实现
1.互斥量的接口
-
mutex介绍:
在多线程编程中,mutex(全称为 mutual exclusion,互斥体) 是一种同步机制,用于保证多个线程对共享资源的互斥访问,即同一时间只允许一个线程操作共享资源,从而避免数据竞争和不一致的问题。 -
1.静态分配:pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER,不需要手动销毁
-
2.动态分配:int pthread_mutex_init(pthread_mutex_t *restrict mutex, const pthread_mutexattr_t *restrictattr);
参数:mutex:要初始化的互斥量 ;attr:attr 用于定制互斥锁的行为(类型、共享范围等),传入 NULL 则使用默认属性(普通锁,仅进程内私有)
- 3.销毁互斥量:int pthread_mutex_destroy(pthread_mutex_t *mutex); 不要销毁一个已经加锁的互斥量。已经销毁的互斥量,要确保后面不会有线程再尝试加锁
- 4.互斥量的枷锁和解锁:
- int pthread_mutex_lock(pthread_mutex_t *mutex);
返回值:成功返回0,失败返回错误号
可能会遇到以下情况:返回值:成功返回0,失败返回错误号,互斥量处于未锁状态,该函数会将互斥量锁定,同时返回成功
发起函数调用时,其他线程已经锁定互斥量,或者存在其他线程同时申请互斥量,但没有竞争到互斥量,那么pthread_ lock调用会陷入阻塞(执行流被挂起),等待互斥量解锁。
- int pthread_mutex_unlock(pthread_mutex_t *mutex);
- 代码测试:
using namespace std;
//pthread_mutex_t lock=PTHREAD_MUTEX_INITIALIZER;//全局变量方式
pthread_mutex_t lock;
#define NUM 3
class threadData
{
public:threadData(int number,pthread_mutex_t *mutex){threadname="thread-"+to_string(number);lock=mutex;}
public:string threadname;pthread_mutex_t *lock;
};
int tickets=1000;
void *getTickets(void *args)
{threadData *td=static_cast<threadData*>(args);const char*name=td->threadname.c_str();while(true){//pthread_mutex_lock(&lock);//全局变量方式pthread_mutex_lock(td->lock);if(tickets>0){usleep(1000);printf("who=%s,get a ticket: %d\n",name,tickets);tickets--;//pthread_mutex_unlock(&lock);//全局变量方式pthread_mutex_unlock(td->lock);}else {//pthread_mutex_unlock(&lock);//全局变量方式pthread_mutex_unlock(td->lock);break;//要在解锁后}}printf("%s ...quit\n",name);return nullptr;
}int main()
{pthread_mutex_init(&lock,nullptr);vector<pthread_t> tids;vector<threadData *> thread_datas;for(int i=0;i<NUM;i++)//创建多个线程{pthread_t tid;threadData*td=new threadData(i,&lock);thread_datas.push_back(td);pthread_create(&tid,nullptr,getTickets,thread_datas[i]);tids.push_back(tid);}for(auto thread:tids)//统一等待{pthread_join(thread,nullptr);}for(auto td:thread_datas)//统一释放{delete td;}pthread_mutex_destroy(&lock);return 0;
}
对临界区加锁后,受票系统可以正常工作
2.锁的相关概念
枷锁的本质:是用时间来换取安全
枷锁的表现:线程对于临界区代码串,行执行
加锁原则:尽量要保证临界区代码越少越好,这样其他线程饥饿的时间就少
- 锁本身就是共享资源,所以申请锁和释放锁本身就被设计为了原子性操作,与共享内存中的信号量原理相同
- 申请锁成功,才能往后执行,不成功阻塞等待,执行完临界区中代码后需要释放锁,其他线程才能继续访问临界区
- 在临界区中,线程可以被切换的,在线程被切出去的时候,是持有锁被切走的,我不在期间照样没有人能进入临界区访问临界资源
- 对于其他线程来讲,一个线程要么没有锁要么释放锁,当前线程访问临界区的过程对于其他线程是原子的
- 生活场景理解:
可以认为有一个单人自习室,外面挂 着一把钥匙,只有拿着钥匙才能进去,进去后其他人只能排队等待。为了确保资源使用的有序性,要求外面来的,必须排队,同时出来的人,不能立马重新申请锁,必须排到队列的尾部。纯互斥场景,如果锁分配不够合理,容易导致其他线程的饥饿问题,所以适合纯互斥的场景就用互斥。
3.锁的原理
-
背景知识:
为了实现互斥锁操作,大多数体系结构都提供了swap或exchange指令,该指令的作用是把寄存器和内存单元的数据相交换,由于只有一条指令,保证了原子性,即使是多处理器平台,访问内存的 总线周期也有先后,个处理器上的交换指令执行时另一个处理器的交换指令只能等待总线周期。
-
lock就是pthread_mutex_lock原理:
核心就是第二条xchqb交换语句,交换的本质是把内存中的数据交换到线程的硬件上下文中,也就是把一个共享的锁,让一个线程以一条汇编的方式,交换到自己的上下文中,线程的硬件上下文和栈都是私有的,当前线程就持有锁了 -
举例子理解:
已知寄存器内容不等于寄存器,假如这里有一个线程1和2,线程1先执行汇编,执行完第一条汇编将寄存器中值置0后被线程二切换,线程1会带走cpu中硬件上下文信息,记得下次回来继续执行第二条汇编;此时线程2来执行,执行第二条汇编后将内存中mutex值换到cpu中,申请锁成功,同时内存中mutex被置0,线程2再被线程1切换回来,继续执行刚刚保存的第二条汇编,进行cpu与内存交换发现值为0,申请锁失败,挂起等待。此时锁在线程2手中等到释放其他线程才能继续使用 -
unlock解锁:
为什么第一条汇编语句不是将某个线程的硬件上下文,即将cpu中保存的值1交换到内存mutex中去呢,因为这样就限定了只能拥有锁的人解锁,万一死锁了就所有线程无法访问资源,这时就需要其他线程来帮忙解锁,所以不能这样交换,应该将内存nutex值直接写为1,这样也不会影响申请锁的操作,因为申请锁操作中第一条汇编就是将寄存器置0,不管里面有没有值
4.锁的应用–封装
5.可重入与线程安全
-
概念:
1.线程安全:
多个线程并发同一段代码时,不会出现不同的结果。常见对全局变量或者静态变量进行操作,并且没有锁保护的情况下,会出现该问题。
2.重入:
同一个函数被不同的执行流调用,当前一个流程还没有执行完,就有其他的执行流再次进入,我们称之为重入。一个函数在重入的情况下,运行结果不会出现任何不同或者任何问题,则该函数被称为可重入函数,否则,是不可重入函数。 -
常见的线程不安全的情况
1.不保护共享变量的函数
2.函数状态随着被调用,状态发生变化的函数(比如模拟的抢票系统)
3.返回指向静态变量指针的函数(可能引发数据竞争问题)
4.调用线程不安全函数的函数 -
常见的线程安全的情况
1.每个线程对全局变量或者静态变量只有读取的权限,而没有写入的权限,一般来说这些线程是安全的
2.类或者接口对于线程来说都是原子操作
3.多个线程之间的切换不会导致该接口的执行结果存在二义性 -
常见不可重入的情况
1.调用了malloc/free函数,因为malloc函数是用全局链表来管理堆的
2.调用了标准I/O库函数,标准I/O库的很多实现都以不可重入的方式使用全局数据结构
3.可重入函数体内使用了静态的数据结构 -
常见可重入情况
1.不使用全局变量或静态变量
2.不使用用malloc或者new开辟出的空间
3.不调用不可重入函数
4.不返回静态或全局数据,所有数据都有函数的调用者提供
5.使用本地数据,或者通过制作全局数据的本地拷贝来保护全局数据 -
可重入与线程安全联系
1.函数是可重入的,那就是线程安全的
2.函数是不可重入的,那就不能由多个线程使用,有可能引发线程安全问题
3.如果一个函数中有全局变量,那么这个函数既不是线程安全也不是可重入的。 -
可重入与线程安全区别
1.可重入函数是线程安全函数的一种
2.线程安全不一定是可重入的,而可重入函数则一定是线程安全的。
3.如果将对临界资源的访问加上锁,则这个函数是线程安全的,但如果这个重入函数若锁还未释放则会产生死锁,因此是不可重入的
6.锁的封装
#pragma once#include<pthread.h>
class Mutex
{
public:Mutex(pthread_mutex_t *lock) :lock_(lock) {}void Lock(){pthread_mutex_lock(lock_);}void unLock(){pthread_mutex_unlock(lock_);}
private:pthread_mutex_t *lock_;
};class LockGuard
{
public:LockGuard(pthread_mutex_t *lock):mutex_(lock){mutex_.Lock();}~LockGuard(){mutex_.unLock();}
private:
Mutex mutex_;
};
- Mutex类将 pthread_mutex_lock 和 pthread_mutex_unlock 这两个 C 函数封装为类的成员函数 Lock() 和 Unlock(),使代码更符合 C++ 的面向对象风格,使用更简洁。后续若需要修改锁的行为(如添加日志、错误处理),只需修改 Mutex 类的实现,无需改动所有调用处
- LockGuard类,利用RALL自动管理锁的生命周期,当LockGuard对象被创建时。构造函数会自动加锁,当出作用域对象销毁时析构函数会自动解锁
代码中只要有定义变量和创建对象两行代码,就能完成上述锁的初始化以及申请释放工作
pthread_mutex_t lock;
LockGuard lockguard(&lock);
7.死锁
- 概念:
死锁是指在一组进程中的各个进程均占有不会释放的资源,但因互相申请被其他进程所站用不会释放的资源而处于的一种永久等待状态。 - 死锁的四个必要条件:
1.互斥条件:一个资源每次只能被一个执行流使用
2.请求与保持条件:一个执行流因请求资源而阻塞时,对已获得的资源保持不放
3.不剥夺条件:一个执行流已获得的资源,在末使用完之前,不能强行剥夺
4.循环等待条件:若干执行流之间形成一种头尾相接的循环等待资源的关系 - 如何避免死锁:
1.破坏死锁的四个必要条件
2.加锁顺序一致
3.避免锁未释放的场景
4.资源一次性分配
四.线程同步
- 条件变量概念:
当一个线程互斥地访问某个变量时,它可能发现在其它线程改变状态之前,它什么也做不了。
例如一个线程访问队列时,发现队列为空,它只能等待,只到其它线程将一个节点添加到队列中。这种情况就需要用到条件变量。 - 什么是同步:
在保证数据安全的前提下,让线程能够按照某种特定的顺序访问临界资源,从而有效避免饥饿问题,叫做同步 - 了解竞态条件:
因为时序问题,而导致程序异常,我们称之为竞态条件。比如在线程场景下,同时向输出流文件写入数据
1.条件变量函数
- 初始化与销毁
参数:
cond:要初始化的条件变量
attr:自定义条件变量的属性,NULL代表默认属性
全局变量的定义方式和mutex互斥量一样,不需要我们初始化和手动释放了 - 等待条件满足–线程去休眠
- pthread_cond_wait
程调用该函数后,会原子性地:释放传入的互斥锁 mutex,避免饥饿问题;进入等待状态,直到被 pthread_cond_signal(唤醒一个等待线程)或 pthread_cond_broadcast(唤醒所有等待线程)通知。被唤醒后,重新获取 mutex 并继续执行。
几个问题:
1.为什么需要互斥锁?
条件的检查(是否满足)和等待操作必须是原子的,否则可能出现 条件刚被满足,但线程还没开始等待,就被切换了进程错过了通知的情况。互斥锁保证了 “检查条件 → 进入等待” 的原子性
2.如何得知要让一个线程去休眠?
去休眠一定是临界资源不就绪,临界资源也是有状态的,临界资源是否就绪是我们判断出来的,对于判断来说也是访问临界资源,所以判断必须发生在加锁之后,所以等待函数必须在加锁之后等待
- pthread_cond_timedwait
与 pthread_cond_wait 类似,但增加了超时机制:线程等待条件满足,直到被通知或超过指定的绝对时间 abstime。如果超时,函数返回错误码(ETIMEDOUT),线程不再等待避免永久挂起
参数:
abstime:一个 timespec 结构体,指定绝对超时时间 - 唤醒等待的线程
五.生产者消费模型
生产者消费者模式就是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。这个阻塞队列就是用来给生产者和消费者解耦的。
- 优势:
解耦、支持并发、支持忙闲不均 - 记忆方式:
321原则
3.三种关系:生产者与生产者的互斥关系、消费者与消费者的互斥关系、生产者与消费者的互斥同步关系
2.两种角色-生产和消费
1.一个交易场所-特定结构的内存空间
不能只看到中间生产消费部分,还要注意生产前的准备和消费后的处理工作,这才是消费者模型多线程并发高效的原因所在,因为生产消费过程为同步互斥关系,一个交互容器(阻塞队列)只能由一个线程访问,所以其他线程要么等待要么处理自己的准备善后工作,实现高效访问
1.阻塞队列
在多线程编程中阻塞队列(Blocking Queue)是一种常用于实现生产者和消费者模型的数据结构。其与普通的队列区别在于,当队列为空时,从队列获取元素的操作将会被阻塞,直到队列中被放入了元素;当队列满时,往队列里存放元
素的操作也会被阻塞,直到有元素被从队列中取出(以上的操作都是基于不同的线程来说的,线程在对阻塞队列进行操作时会被阻塞)
管道也是类似原理
2.代码实现
- BlockQueue.hpp
#pragma once
#include<queue>
#include<iostream>
#include<cstdio>
#include<pthread.h>
using namespace std;
template<class T>
class BlockQueue
{static const int defaultnum=10;
public:BlockQueue(int num=defaultnum):maxnum(num){pthread_mutex_init(&mutex_,nullptr);pthread_cond_init(&c_cond,nullptr);pthread_cond_init(&p_cond,nullptr);}T Pop(){pthread_mutex_lock(&mutex_);while(q.size()==0) //若阻塞队列为0.进入等待{pthread_cond_wait(&c_cond,&mutex_);}T tmp=q.front();//满足消费条件才能消费q.pop();pthread_cond_signal(&p_cond);//消费完叫生产者进行生产pthread_mutex_unlock(&mutex_);return tmp;}void push(const T &num)//注意传参方式,左右值都能接收,不能只用引用否则传入临时变量会有未定义行为{pthread_mutex_lock(&mutex_);while(q.size()==maxnum)//阻塞队列满了,进入等待{pthread_cond_wait(&p_cond,&mutex_);}q.push(num);pthread_cond_signal(&c_cond);//生产完,叫消费者来消费pthread_mutex_unlock(&mutex_);}~BlockQueue(){pthread_mutex_destroy(&mutex_);pthread_cond_destroy(&c_cond);pthread_cond_destroy(&p_cond);}
private:queue<T> q;//共享资源,这里被当作整体使用,也可以被看作多份int maxnum;//阻塞队列最大资源数量pthread_mutex_t mutex_;pthread_cond_t c_cond;//消费者条件变量pthread_cond_t p_cond;//生产者
};
- main.cc
#include"BlockQueue.hpp"
#include"Task.hpp"
#include<unistd.h>
#include<ctime>void *Consumer(void *args)
{BlockQueue<Task>*bq=static_cast<BlockQueue<Task>*>(args);while(true){//进行消费Task t=bq->Pop();//执行任务t();cout<<"处理任务:"<<t.GetTask()<<"运算结果是:"<<t.GetResult()<<endl;}
}
void *Productor(void*args)
{BlockQueue<Task>*bq=static_cast<BlockQueue<Task>*>(args);int len=opers.size();while(true){//模拟生产者获取数据int data1=rand()%10+1;usleep(1000);int data2=rand()%10;char op=opers[rand()%len];Task t(data1,data2,op);//生产bq->push(t);cout<<"生产了一个任务: "<<t.GetTask()<<"thread id:"<<pthread_self()<<endl;sleep(1);}
}int main()
{srand(time(nullptr));BlockQueue<Task> *bq=new BlockQueue<Task>();//内部可以传自定义类型和内置类型pthread_t c[3],p[5];//多线程场景for(int i=0;i<3;i++)//创建消费者线程{pthread_create(c+i,nullptr,Consumer,bq);}for(int i=0;i<5;i++)//创建生产者线程{pthread_create(p+i,nullptr,Productor,bq);}for(int i=0;i<3;i++)//等待回收{pthread_join(c[i],nullptr);}for(int i=0;i<5;i++){pthread_join(p[i],nullptr);}delete bq;return 0;
}
- Task.hpp
#pragma once
#include<iostream>
#include<string>using namespace std;
string opers="+-*/%";//集中存储合法的运算符,通过find接口快速检查enum{DivZero=1,ModZero,Unknown
};class Task
{
public:Task(int x,int y,char c):data1(x),data2(y),op(c),result(0),exitcode(0){}void run(){switch(op){case '+':result=data1+data2;break;case '-':result=data1-data2;break;case '*':result=data1*data2;break;case '/':{if(data2==0) exitcode=DivZero;else result=data1/data2;}break;case '%':{if(data2==0) exitcode=ModZero;else result=data1%data2;}break;default:exitcode=Unknown;break;}}void operator()()//函数重载,让类的对象可以像函数一样被调用{run();}string GetResult(){string r=to_string(data1);r+=op;r+=to_string(data2);r+='=';r+=to_string(result);r+="[code:]";r+=to_string(exitcode);r+=']';return r;}string GetTask(){string r=to_string(data1);r+=op;r+=to_string(data2);r+="=?";return r;}~Task(){}
private:int data1;int data2;char op;int result;int exitcode;
};
用BlockQueue.hpp来实现阻塞队列,对互斥量和条件变量的库函数进行封装以及阻塞队列细节的实现,用Task》hpp进行任务类的封装,main.cc进行线程控制,整体实现生产者消费模型的多线程并发实现
伪唤醒情况
伪唤醒的后果是,可能多个消费者或生产者被唤醒,但阻塞队列中资源已经为空或者为满,但依然回去访问,这就造成了非法访问的问题,if条件判断调用信号量等待会有问题,因为多个线程被唤醒是在被调用的原来位置,直接向下执行,所以需要while循环,被唤醒后再判断一次阻塞队列中资源是否满足情况
3.POSIX信号量
POSIX信号量和SystemV信号量作用相同,都是用于同步操作,达到无冲突的访问共享资源目的。 但POSIX可以用于线程间同步。
- 初始化:
pshared:0表示线程间共享,非零表示进程间共享
value:信号量初始值 - 销毁信号量
- 等待信号量
- wait:
功能:对信号量 sem 执行 “P 操作”(即申请资源)。如果信号量的值大于 0,则将其值减 1 并立即返回;如果信号量的值为 0,则调用线程会被阻塞,直到信号量的值大于 0 或者线程被中断。
2.trywait
尝试对信号量 sem 执行 “P 操作”。与 sem_wait 不同的是,若信号量的值为 0,它不会阻塞线程,而是直接返回错误(errno 设为 EAGAIN)
3.timedwait:
在指定的绝对时间 abs_timeout 内,对信号量 sem 执行 “P 操作”。如果在超时时间内信号量的值仍为 0,则返回错误(errno 设为 ETIMEDOUT) - 发布信号量
会将信号量 sem 的值加 1,在生产消费者模型中,生产者生产完资源后,通过sem_post释放信号,通知消费者可以来消费了
六.基于环形队列的生产消费模型
- 当队列为空或满的时候head和tail指向同一个位置,所以无法判断是空还是满,可以采用预留一个位置来判断空的情况。通过图来理解,若head为生产者,生产了一圈阻塞队列满了,无法再生产,回到起始位置与tail相同。到消费者进行消费,黑色圆圈代表消费者路径,每次消费当前位置后前移一位,把最后一个位置消费完后发现前面位置为空,则证明阻塞队列为空不能再消费了
- 当不空和不满时生产者与消费者一定指向不同位置,可以同时访问。有以下原则:1.指向同一个位置时只能一个人访问 2.消费者不能超过生产者,即绕圈时不能超过它,不能将它套圈
- 信号量的PV操作
信号量 S 的值可以是负数,其绝对值表示 “因信号量不足而阻塞的进程 / 线程数量”
P:将信号量的值减1,然后检查信号量值,大于0说明由可用资源,可以进入临界区访问共享资源,若小于0则进入该信号量的等待队列中,等待其他线程释放资源
V:将信号量的值加1,然后检查信号量,>0说明没有因该信号量阻塞的线程,直接返回。若小于0表示等待队列中有线程因为申请该资源而处于阻塞状态,此时需要从等待队列中唤醒一个阻塞的进程或线程,让其有机会重新尝试执行 P 操作,获取资源。
-
生产者关注阻塞队列中还有多少剩余空间spacesem,消费者关注阻塞队列中还有多少剩余数据dataseem,spacesem初始化为N最大可用空间大小,dataseem初始化为0,这就保证了开始一定时生产者先生产数据 .
-
生产前对spacesem进行P操作(可用空间减1),生产后对datasem进行V操作(资源数量加1)
-
消费前对datasem减1(可消费资源减1),消费后对spacesem加1(资源消耗了,空间就空出来了,加1)
1.代码实现
-
生产者的 P 操作是 “等待空闲空间”,当空间不足时阻塞; 消费者的 V 操作是 “释放空闲空间”,当释放后发现有等待的生产者时,唤醒它。
这种 “阻塞 - 唤醒” 的联动,本质是通过信号量的数值变化(pspace_sem 的增减)传递 “空间资源是否可用”的状态,确保生产者只在有空间时生产,消费者只在有数据时消费,最终实现供需平衡。保证了生产者与消费者的同步互斥关系,再配合互斥锁保护临界区(队列下标更新、数据读写),最终实现线程安全的环形队列的生产者消费者模型。 -
相较于普通的基于阻塞队列的生产者消费模型,信号量本身实现了 “等待 - 唤醒” 逻辑,无需手动判断队列空 / 满(使用条件变量pthread_cond_wait),且通过 P/V 操作的原子性,保证了多线程下资源计数的准确性。
-
RingQueue.hpp
#pragma once
#include <iostream>
#include<vector>
#include<semaphore.h>//POSIX头文件
#include<pthread.h>
using namespace std;
const static int defaultcap=5;template<class T>
class RingQueue
{
private://对信号量的pv操作和互斥量加锁解锁进行封装void P(sem_t &sem){sem_wait(&sem);}void V(sem_t &sem){sem_post(&sem);}void Lock(pthread_mutex_t &mutex){pthread_mutex_lock(&mutex);}void UnLock(pthread_mutex_t &mutex){pthread_mutex_unlock(&mutex);}
public:RingQueue(int cap=defaultcap):cap_(cap),ringqueue_(cap),c_step(0),p_step(0){pthread_mutex_init(&c_mutex,nullptr);pthread_mutex_init(&p_mutex,nullptr);sem_init(&cdata_sem,0,0);//空间中数据资源初始化为0sem_init(&pspace_sem,0,cap);//空间资源初始化为满}void push(const T& in)//生产者生产{//这里先申请信号量在加锁可以提高效率,申请到信号量在去申请锁,当解锁后就可以直接//申请锁不用再等待申请信号量,若信号量申请完了直接去休眠等待唤醒P(pspace_sem);Lock(p_mutex);ringqueue_[p_step++]=in;p_step%=cap_;//保证数据一直在环形队列中;UnLock(p_mutex);V(cdata_sem);}void Pop(T*out){P(cdata_sem);Lock(c_mutex);*out=ringqueue_[c_step++];c_step%=cap_;UnLock(c_mutex);V(pspace_sem);}~RingQueue(){pthread_mutex_destroy(&c_mutex);pthread_mutex_destroy(&p_mutex);sem_destroy(&cdata_sem);sem_destroy(&pspace_sem);}
private:vector<T> ringqueue_;//后面加_都代表为类内元素int cap_;int c_step;//消费者下标int p_step;//生产者下标sem_t cdata_sem;//消费者关注的数据资源sem_t pspace_sem;//生产者关注的空间资源pthread_mutex_t c_mutex;pthread_mutex_t p_mutex;
};
- main.cpp
#include<iostream>
#include<pthread.h>
#include<unistd.h>
#include<ctime>
#include "RingQueue.hpp"
#include "Task.hpp"using namespace std;
struct ThreadData//为线程函数封装传递多参数数据,线程只允许接收单参数
{RingQueue<Task>*rq;//指向环形队列的指针string threadname;//线程名称
};void *Productor(void* args)
{ThreadData* td=static_cast<ThreadData*>(args);//RingQueue<Task> *rq=td->rq;//这样子也行int len=opers.size();while(true){//生产者获取数据int data1=rand()%10+1;int data2=rand()%10;char op=opers[rand()%len];Task t(data1,data2,op);//生产数据td->rq->push(t);//rq->push(t);cout<<"Productor task done, task is:"<<t.GetTask()<<" who: "<<td->threadname<<endl;sleep(1);}return nullptr;
}void *Consumer(void*args)
{ThreadData* td=static_cast<ThreadData*>(args);RingQueue<Task> *rq=td->rq;while(true){Task t;//为接收队列中生产者的任务,提前准备的 “空载体”rq->Pop(&t);//进行消费t();//处理数据cout<<"Consumer get task, task is: "<<t.GetTask()<<" name:"<<td->threadname<<" result is: "<<t.GetResult()<<endl;sleep(1);}return nullptr;
}
int main()
{srand(time(nullptr));RingQueue<Task> *rq=new RingQueue<Task>(20);pthread_t c[5],p[3];for(int i=0;i<3;i++)//创建生产者线程{ThreadData*td=new ThreadData();td->rq=rq;td->threadname="Productor-"+to_string(i);pthread_create(p+i,nullptr,Productor,td);}for(int i=0;i<5;i++)//创建消费者线程{ThreadData*td=new ThreadData();td->rq=rq;td->threadname="Consumer-"+to_string(i);pthread_create(c+i,nullptr,Consumer,td);}for(int i=0;i<3;i++)//回收生产者线程{pthread_join(p[i],nullptr);}for(int i=0;i<5;i++)//回收消费者线程{pthread_join(c[i],nullptr);}return 0;
}
- Task.hpp于五.生产消费模型代码实现中相同
ringqueue:main.cppg++ -o $@ $^ -std=c++11 -lpthread
.PHONY:clean
clean:rm -f ringqueue;
七.线程池
类似于之前实现过的进程池。是一种使用模式。本质是以空间换时间,每次只能向任务队列中加入一个任务,由一个线程池中线程去其中拿一个运行,任务队列中可能有多个任务按顺序存放好,务队列支持并发添加多个任务,工作线程并发竞争执行任务,两者都由调度、锁和条件变量决定。线程池本质上就是生产消费模型。
- 线程池:
一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。 - 线程池的应用场景:
- 需要大量的线程来完成任务,且完成任务的时间比较短。 WEB服务器完成网页请求这样的任务,使用线程池技术是非常合适的。因为单个任务小,而任务数量巨大,你可以想象一个热门网站的点击次数。 但对于长时间的任务,比如一个Telnet连接请求,线程池的优点就不明显了。因为Telnet会话时间比线程的创建时间大多了。
- 对性能要求苛刻的应用,比如要求服务器迅速响应客户请求。
*3. 接受突发性的大量请求,但不至于使服务器因此产生大量线程的应用。突发性大量客户请求,在没有线程池情况下,将产生大量线程,虽然理论上大部分操作系统线程数目最大值不是问题,短时间内产生大量线程可能使内存到达极限,
出现错误.
1.代码实现
基于线程池的任务调度系统
- ThreadPool.hpp
用于定义线程池相关的类和函数,它的作用是实现对线程的管理和复用,提高程序执行效率,避免频繁创建和销毁线程带来的开销。类包含了线程池的各种成员变量和成员函数。
#pragma once
#include<iostream>
#include<vector>
#include<string>
#include<queue>
#include<pthread.h>
#include<unistd.h>
using namespace std;struct ThreadInfo
{pthread_t tid;string name;
};static const int defaultnum=5;
template<class T>
class ThreadPool
{
public://默认函数、条件变量与互斥量的接口封装ThreadPool(int num=defaultnum):threads_(num){pthread_mutex_init(&mutex_,nullptr);pthread_cond_init(&cond_,nullptr);}~ThreadPool(){pthread_mutex_destroy(&mutex_);pthread_cond_destroy(&cond_);}void Lock(){pthread_mutex_lock(&mutex_);}void Unlock(){pthread_mutex_unlock(&mutex_);}void Wakeup(){pthread_cond_signal(&cond_);}void ThreadSleep(){pthread_cond_wait(&cond_,&mutex_);}bool IsQueueEmpty(){return tasks_.empty();}string GetThreadName(pthread_t tid){for(const auto &t:threads_){if(t.tid==tid) return t.name;}return "None";}
public:void Push(const T&in)//生产资源,创建任务,插入任务队列由线程池中排队的线程执行{Lock();tasks_.push(in);Wakeup();Unlock();}T Pop()//消费资源,由于Handler函数中加锁覆盖了Pop的整个生命周期所以不需要加锁这里;也就是执行任务,从任务队列中读取{T t=tasks_.front();tasks_.pop();return t;}void Start()//创建并运行线程{int num=threads_.size();for(int i=0;i<num;i++){//类内成员函数第一个参数默认为this指针,是隐式的,相当于参数默认+1,与规定参数数量的函数不符;//Handler函数传入this指针就是传入当前对象实例的地址,通过其可以访问类内成员与函数,实现对线程池的操作threads_[i].name="thread-"+to_string(i);pthread_create(&threads_[i].tid,nullptr,HandlerTask,this);}}//pthread_create 要求线程函数是普通的全局函数或者静态成员函数,而 HandlerTask //是类的非静态成员函数。非静态成员函数有一个隐藏的 this 指针参数static void *HandlerTask(void*args){//static_cast 仅在编译时检查转换是否 “语法合法”,不保证运行时安全ThreadPool<T>*tp=static_cast<ThreadPool<T>*>(args);string name=tp->GetThreadName(pthread_self());while(true){tp->Lock();//while循环防止误唤醒while(tp->IsQueueEmpty()) tp->ThreadSleep();//代替Pop处理判空问题T t=tp->Pop();tp->Unlock();t();cout<<name<<" [run, "<<"result:"<<t.GetResult()<<endl;}}
private:vector<ThreadInfo> threads_;queue<T> tasks_;pthread_mutex_t mutex_;pthread_cond_t cond_;};
- main.cpp
#include<iostream>
#include<ctime>
#include "ThreadPool.hpp"
#include "Task.hpp"int main()
{ThreadPool<Task>*tp=new ThreadPool<Task>(5);tp->Start();srand(time(nullptr));while(true){//构建任务int x=rand()%10+1;usleep(1000);int y =rand()%5;int len=opers.size();char op=opers[rand()%len];Task t(x,y,op);tp->Push(t);//交给线程池处理cout<<"main thread make a task: "<<t.GetTask()<<endl;sleep(1);}return 0;
}
ThreadPool<…>:指定线程池是一个模板类实例,(5):调用线程池的带参构造函数,传入参数 5,表示初始化线程池时会创建 5 个工作线程,tp 是指向新创建线程池对象的指针,后续可以通过 tp 调用线程池的成员函数,实现对线程池的控制和任务提交。
主要功能是模拟创建任务插入任务队列中交由线程池中线程运行。
- Task.hpp与前文代码实现一致
2.线程的面向对象封装
3.基于C++的多线程任务池
与1.代码实现中的基于线程池的任务调度系统不同的是,1.中用vector模拟的队列存储的是线程的信息,线程的操作函数需要外部来管理,而vector threads_存储的一个类,封装了完整生命周期和行为,运用接口将线程操作函数封装在了内部,外部直接调用接口使用更加方便。
- ThreadPool.hpp
管理一个vector来存储所有工作线程,同时维护一个queue任务队列,存储待执行的运算任务,提供生产接口和消费调度接口
#pragma once
#include<iostream>
#include<vector>
#include<string>
#include<queue>
#include<pthread.h>
#include<unistd.h>
using namespace std;
#include"Thread.hpp"// struct ThreadInfo
// {
// pthread_t tid;
// string name;
// };static const int defaultnum=5;
template<class T>
class ThreadPool
{
public://默认函数、条件变量与互斥量的接口封装ThreadPool(int num=defaultnum):threads_(num){pthread_mutex_init(&mutex_,nullptr);pthread_cond_init(&cond_,nullptr);}~ThreadPool(){pthread_mutex_destroy(&mutex_);pthread_cond_destroy(&cond_);//等待所有线程结束for(auto& thread:threads_){//回收正在运行的if(thread.IsRunning()) thread.Join();}}void Lock(){pthread_mutex_lock(&mutex_);}void Unlock(){pthread_mutex_unlock(&mutex_);}void Wakeup(){pthread_cond_signal(&cond_);}void ThreadSleep(){pthread_cond_wait(&cond_,&mutex_);}bool IsQueueEmpty(){return tasks_.empty();}string GetThreadName(pthread_t tid){for( auto t:threads_){if(t.Gettid()==tid) return t.Name();}return "None";}void Push(const T&in)//生产资源{Lock();tasks_.push(in);Wakeup();Unlock();}T Pop()//消费资源,由于Handler函数中加锁覆盖了Pop的整个生命周期所以不需要加锁这里{T t=tasks_.front();tasks_.pop();return t;}void Start()//创建并运行线程{int num=threads_.size();for(int i=0;i<num;i++){//1.设置线程名称threads_[i].SetName("Thread-"+to_string(i));//2.绑定线程的任务处理函数,threads_ 中的 Thread 对象是默认构造的(未绑定任何入口函数)threads_[i]=Thread(HandlerTask,this);threads_[i].Run();}}static void *HandlerTask(void*args){ThreadPool<T>*tp=static_cast<ThreadPool<T>*>(args);string name=tp->GetThreadName(pthread_self());while(true){tp->Lock();//while循环防止误唤醒,线程在此处被唤醒后会重新判断条件是否成立while(tp->IsQueueEmpty()) tp->ThreadSleep();//代替Pop处理判空问题T t=tp->Pop();tp->Unlock();t();cout<<name<<" [run, "<<"result:"<<t.GetResult()<<endl;}}
private:vector<Thread> threads_;queue<T> tasks_;pthread_mutex_t mutex_;pthread_cond_t cond_;};
- Thread.hpp
封装线程的操作接口,支持绑定任务回调函数,管理线程的相关属性
#pragma once
#include<iostream>
#include<string>
#include<ctime>
#include<pthread.h>using namespace std;
//回调函数的本质是 “将函数作为参数传递,让其他代码在合适时机调用”,核心价值是解耦和灵活扩展
// 修正:回调函数需要接收线程池指针(否则无法访问线程池资源)
typedef void* (*callback_t)(void*args);//定义函数指针
static int num=1;class Thread
{
public:static void*Routine(void*args){Thread* td=static_cast<Thread*>(args);td->Entery();return nullptr;}//函数重载默认构造Thread():cb_(nullptr),name_(""),isrunning_(false),start_timestamp_(0),tid_(0){}//Thread 类需要同时接收回调函数和回调参数(即线程池指针)Thread(callback_t cb,void*args):cb_(cb),cb_args(args),name_(""),isrunning_(0),start_timestamp_(0),tid_(0){}void Run(){//Thread*td=new Thread();//name_="Thread-"+to_string(num++);//避免与线程池中业务层的命名冲突start_timestamp_=time(nullptr);isrunning_=true;pthread_create(&tid_,nullptr,Routine,this);}void Join(){if(isrunning_){isrunning_=false;pthread_join(tid_,nullptr);}}uint64_t StartTimeStamp(){return start_timestamp_;}bool IsRunning(){return isrunning_;}void Entery(){if(cb_){cb_(cb_args);// 回调时传入当前Thread实例(供线程池获取线程信息)}}pthread_t Gettid(){return tid_;}string Name()//获取名称{return name_;}void SetName(const string&name)//设置名称{name_=name;}~Thread(){}
private:pthread_t tid_;string name_;uint64_t start_timestamp_;bool isrunning_;callback_t cb_;void *cb_args;
};
- main.cc
main函数创建线程池对象,通过指针来直接管理,同时不断生产任务放入队列中等待执行
#include<iostream>
#include<unistd.h>
#include<vector>
#include"Thread.hpp"
#include"ThreadPool.hpp"
#include"Task.hpp"using namespace std;
int main() {ThreadPool<Task>* tp = new ThreadPool<Task>(5);tp->Start(); // Start 方法中为每个线程绑定 HandlerTask 并启动srand(time(nullptr));while (true) {// 生成任务并推入线程池int x = rand() % 10 + 1;usleep(1000);int y = rand() % 5;int len = opers.size();char op = opers[rand() % len];Task t(x, y, op);tp->Push(t);cout << "main thread make a task: " << t.GetTask() << endl;sleep(1);}return 0;
}
- makefile(改一个可执行文件即可)、Task.hpp与前文代码相同
八.设计模式
- 设计模式是解决软件设计问题的成熟思路,不是强制规则而是一种工具,简单来说就是针对一些经典的常见场景,给定了一些对应的解决方案。
1.单例模式
-
某些类, 只应该具有一个对象(实例), 就称之为单例.例如一个男人只能有一个媳妇.在很多服务器开发场景中, 经常需要让服务器加载很多的数据 (上百G) 到内存中. 此时往往要用一个单例的类来管理这些数据.
-
饿汉方式
在程序启动(类加载阶段)就创建单例实例,后续无论是否使用该实例,实例都已存在。类比 “提前做好饭,饿了直接吃”。
template <typename T>
class Singleton {
static T data;
public:
static T* GetInstance() {
return &data;
}
};
优点是线程安全,程序启动时只会执行一次静态成员初始化,不存在多线程竞争问题,也无需处理线程同步,返回已存在的实例快。缺点是当未使用程序启动时创建好的单例,可能造成资源浪费
- 懒汉方式
直到第一次调用GetInstance()时,才创建单例实例,后续调用直接返回已创建的实例。类比 “饿了才做饭,不饿不做”。
懒汉模式是将申请和使用的过程分开了,整体使用时间没变,但是各过程的时间配比发生变化,在局部上效率提高了
template <typename T>
class Singleton {
static T* inst;
public:
static T* GetInstance() {
if (inst == NULL) {
inst = new T();
}
return inst;
}
};
优点是按需创建,不占用资源避免内存浪费,缺点是线程天然不安全,需要加锁处理双重检查问题
安全版本:
// 懒汉模式, 线程安全
template <typename T>
class Singleton {
volatile static T* inst; // 需要设置 volatile 关键字, 否则可能被编译器优化.
static std::mutex lock;
public:
static T* GetInstance() {
if (inst == NULL) { // 双重判定空指针, 降低锁冲突的概率, 提高性能.
lock.lock(); // 使用互斥锁, 保证多线程情况下也只调用一次 new.
if (inst == NULL) {
inst = new T();
}
lock.unlock();
}
return inst;
}
};
注意事项:- 加锁解锁的位置- 双重 if 判定, 避免不必要的锁竞争- volatile关键字防止过度优化
2.单例模式下的线程池
- ThreadPool.hpp
#pragma once
#include<iostream>
#include<vector>
#include<string>
#include<queue>
#include<pthread.h>
#include<unistd.h>
using namespace std;struct ThreadInfo
{pthread_t tid;string name;
};static const int defaultnum=5;
template<class T>
class ThreadPool
{
public:void Lock(){pthread_mutex_lock(&mutex_);}void Unlock(){pthread_mutex_unlock(&mutex_);}void Wakeup(){pthread_cond_signal(&cond_);}void ThreadSleep(){pthread_cond_wait(&cond_,&mutex_);}bool IsQueueEmpty(){return tasks_.empty();}string GetThreadName(pthread_t tid){for(const auto &t:threads_){if(t.tid==tid) return t.name;}return "None";}
public:void Push(const T&in)//生产资源{Lock();tasks_.push(in);Wakeup();Unlock();}T Pop()//消费资源,由于Handler函数中加锁覆盖了Pop的整个生命周期所以不需要加锁这里{T t=tasks_.front();tasks_.pop();return t;}void Start()//创建并运行线程{int num=threads_.size();for(int i=0;i<num;i++){//类内成员函数第一个参数默认为this指针,是隐式的,相当于参数默认+1,与规定参数数量的函数不符;//Handler函数传入this指针就是传入当前对象实例的地址,通过其可以访问类内成员与函数,实现对线程池的操作threads_[i].name="thread-"+to_string(i);pthread_create(&threads_[i].tid,nullptr,HandlerTask,this);}}//pthread_create 要求线程函数是普通的全局函数或者静态成员函数,而 HandlerTask //是类的非静态成员函数。非静态成员函数有一个隐藏的 this 指针参数static void *HandlerTask(void*args){ThreadPool<T>*tp=static_cast<ThreadPool<T>*>(args);string name=tp->GetThreadName(pthread_self());while(true){tp->Lock();//while循环防止误唤醒while(tp->IsQueueEmpty()) tp->ThreadSleep();//代替Pop处理判空问题T t=tp->Pop();tp->Unlock();t();cout<<name<<" [run, "<<"result:"<<t.GetResult()<<endl;}}static ThreadPool<T> *GetInstance()//单例模式{if(nullptr==tp_)//避免每次调用都加锁,只有第一次需要加锁,后面直接返回{pthread_mutex_lock(&lock_);if(nullptr==tp_){cout<<"singleton create done first!"<<endl;tp_=new ThreadPool<T>();}pthread_mutex_unlock(&lock_);}return tp_;}
private://单例模式ThreadPool(int num=defaultnum):threads_(num){pthread_mutex_init(&mutex_,nullptr);pthread_cond_init(&cond_,nullptr);}~ThreadPool(){pthread_mutex_destroy(&mutex_);pthread_cond_destroy(&cond_);}ThreadPool(const ThreadPool<T>&) =delete;const ThreadPool<T>& operator=(const ThreadPool<T>&)=delete;
private:vector<ThreadInfo> threads_;queue<T> tasks_;pthread_mutex_t mutex_;pthread_cond_t cond_;//单例模式懒汉方式实现static ThreadPool<T> *tp_;static pthread_mutex_t lock_;
};template<class T>
ThreadPool<T> *ThreadPool<T>::tp_=nullptr;template<class T>
pthread_mutex_t ThreadPool<T>::lock_=PTHREAD_MUTEX_INITIALIZER;
注意:
1.类中静态成员属于类,不属于对象,全局就只有一份
2.全局变量或静态变量在加载时就已经存在了
3.全局变量的全局性就体现在创建时进程地址空间中已经加载好,随进程结束释放
4. c++知识复习,静态成员在类外初始化,静态成员函数只能访问静态成员变量
- main.cpp
#include<iostream>
#include<ctime>
#include "ThreadPool.hpp"
#include "Task.hpp" int main()
{//ThreadPool<Task>*tp=new ThreadPool<Task>(5);//tp->Start();ThreadPool<Task>::GetInstance()->Start();//单例模式srand(time(nullptr));while(true){//构建任务int x=rand()%10+1;usleep(1000);int y =rand()%5;int len=opers.size();char op=opers[rand()%len];Task t(x,y,op);ThreadPool<Task>::GetInstance()->Push(t);//交给线程池处理cout<<"main thread make a task: "<<t.GetTask()<<endl;sleep(1);}return 0;
}
Task.hpp与makefile都与上文相同
区别:
3.STL容器是否为线程安全的
不是:
原因是, STL 的设计初衷是将性能挖掘到极致, 而一旦涉及到加锁保证线程安全, 会对性能造成巨大的影响.而且对于不同的容器, 加锁方式的不同, 性能可能也不同(例如hash表的锁表和锁桶).因此 STL 默认不是线程安全. 如果需要在多线程环境下使用, 往往需要调用者自行保证线程安全.
4.智能指针是否为线程安全的
- 对于 unique_ptr, 由于只是在当前代码块范围内生效, 因此不涉及线程安全问题.
- 对于 shared_ptr, 多个对象需要共用一个引用计数变量, 所以会存在线程安全问题. 但是标准库实现的时候考虑到了这 个问题, 基于原子操作(CAS)的方式保证 shared_ptr 能够高效, 原子的操作引用计数.
注意:引用计数的原子操作仅保证 shared_ptr自身管理的线程安全(即 “对象何时释放” 是安全的),shared_ptr 指向的对象本身的访问仍需手动同步(如加锁),标准库不负责保护对象的线程安全;
九.其他常见的各种锁
- 悲观锁:
在每次取数据时,总是担心数据会被其他线程修改,所以会在取数据前先加锁(读锁,写锁,行锁等),当其他线程想要访问数据时,被阻塞挂起。
- 乐观锁:
每次取数据时候,总是乐观的认为数据不会被其他线程修改,因此不上锁。但是在更新数据前,会判断其他数据在更新前有没有对数据进行修改。主要采用两种方式:版本号机制和CAS操作。
- CAS操作:
当需要更新数据时,判断当前内存值和之前取得的值是否相等。如果相等则用新值更新。若不等则失败,失败则重试,一般是一个自旋的过程,即不断重试。
- 自旋锁:
核心特点是线程获取锁失败时不会被挂起,而是在用户态循环重试(忙等) 直到获取锁成功。它适用于 “锁持有时间极短” 的场景,能避免线程内核态切换的开销。
从等待行为角度来看,锁可以分为挂起等待锁和自旋锁,具体使用取决于线程执行临界区的时长
- 接口介绍:
pthread_spin_lock:
适合在临界区执行时间非常短的场景下使用,因为自旋等待的时间很短,避免了线程上下文切换带来的开销。但如果临界区执行时间较长,会造成 CPU 资源的浪费。
pthread_spin_trylock:
适用于不确定锁是否可用,不想因等待锁而阻塞的场景。比如在多线程程序中,某些代码块希望在锁可用时才执行临界区操作,不可用时则继续执行其他操作,此时pthread_spin_trylock就比较合适。
pthread_mutex_trylock:
操作的是互斥锁。当尝试获取的互斥锁已被占用时,调用线程不会等待,而是直接返回一个表示获取失败的错误码(通常是EBUSY) ,线程继续执行后续代码,不会像自旋锁那样持续占用 CPU 进行等待。
其中init第二个参数代表自旋锁的共享属性:PTHREAD_PROCESS_PRIVATE:表示自旋锁仅在当前进程内的线程间共享(私有锁),宏定义的值为 0。
PTHREAD_PROCESS_SHARED:表示自旋锁可在多个进程的线程间共享(共享锁),宏定义的值通常为 1(具体依赖系统实现,但一定非 0)。
1.读者写者问题
在编写多线程的时候,有一种情况是十分常见的。那就是,有些公共数据修改的机会比较少。相比较改写,它们读的机会反而高的多。通常而言,在读的过程中,往往伴随着查找的操作,中间耗时很长。给这种代码段加锁,会极大地降低我们程序的效率。那么有没有一种方法,可以专门处理这种多读少写的情况呢? 有,那就是读写锁。
总共3种关系:写写(互斥竞争)、写读(互斥同步)、读读(共享,因为不会修改资源)
2种角色:读者、写者
1个交易场所:数据交换地点
口诀:写独占,读共享,读锁优先级高
写独占保证了写操作的安全性(修改时无干扰);
读共享提高了读操作的并发度(多个读线程同时工作);
读锁优先级高避免了 “写锁饥饿读操作”(即写操作频繁时,读操作长期无法行)
- 简单了解操作: