线程同步和互斥
线程互斥
进程间、线程间的互斥相关背景概念
互斥量mutex
• ⼤部分情况,线程使⽤的数据都是局部变量,变量的地址在线程栈的空间内,这种情况,变量
归属单个线程,其他线程⽆法访问这些变量。
• 但有时候,很多变量都需要在线程间共享,这样的变量称为共享变量,又叫临界资源,包括全局变量和静态变量,可以通过数据的共享,完成线程之间的交互。
• 多个线程同时进入临界区,执行临界区的代码操作临界资源,会带来并发问题
案例:
#include <pthread.h>
#include <unistd.h>
#include <stdio.h>int ticket=1000;
void* routine(void* args)
{char* id= (char*)args;while(1){if(ticket >0){usleep(1000);--ticket;printf("%s 买了个票,还剩 %d 张\n",id, ticket);}else{break;}}return nullptr;}int main()
{pthread_t t1,t2,t3,t4;pthread_create(&t1, nullptr, routine, (void*)"thread 1");pthread_create(&t2, nullptr, routine, (void*)"thread 2");pthread_create(&t3, nullptr, routine, (void*)"thread 3");pthread_create(&t4, nullptr, routine, (void*)"thread 4");pthread_join(t1, NULL);pthread_join(t2, NULL);pthread_join(t3, NULL);pthread_join(t4, NULL);return 0;
}
当剩下一张票的时候,四个线程可能会同时判断条件if(ticket >0),然后进入临界区,最后票变成负的,这就有问题
线程并发问题
线程并发问题是由于多个线程同时进入临界区,执行代码操作共享资源导致的问题
在上面的示例中,由于cpu自身切换调度线程,还有usleep休眠,以及对共享资源--ticket操作是多指令的,这三个因素都导致了临界区代码不可能一下子执行完,于是一个线程在临界区执行代码时,另一个线程就有很大概率也在此时进入临界区,从而有并发问题
如何理解 --ticket不是原子的?
取出--ticket部分的汇编代码
objdump -d a.out > test.objdump
152 40064b: 8b 05 e3 04 20 00 mov 0x2004e3(%rip),%eax #
600b34 <ticket>
153 400651: 83 e8 01 sub $0x1,%eax
154 400654: 89 05 da 04 20 00 mov %eax,0x2004da(%rip) #
600b34 <ticket>从这里我们可以看到--ticket是由多条指令组成的,所以一个线程执行--ticket时就可能被cpu线程切换调度机制给打断,所以--ticket不是原子操作。
当然了,原子操作如果操作的是共享资源,那么多个线程都执行原子操作也是会有并发问题的,也就是原子操作和并发问题没关系
单指令操作肯定是原子操作,这是cpu切换调度机制决定的,当然也有多指令的原子操作
为避免线程并发问题,linux提供了名叫互斥量的锁,来保证最多只有一个线程可以进入临界区

互斥量的接⼝
初始化互斥量
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER
int pthread_mutex_init(pthread_mutex_t *restrict mutex, const
pthread_mutexattr_t *restrict attr);
参数:
mutex:要初始化的互斥量
attr:互斥量属性为什么互斥量需要初始化,因为互斥量也有对应的资源,比如互斥量的等待队列
int pthread_mutex_destroy(pthread_mutex_t *mutex);
int pthread_mutex_lock(pthread_mutex_t *mutex);
int pthread_mutex_unlock(pthread_mutex_t *mutex);
返回值:成功返回0,失败返回错误码调用pthread_mutex_unlock时,会将互斥量解锁,并唤醒互斥量等待队列上的线程
以上的理解仅仅是从锁的角度的理解,也是我们常用的
案例:
#include <pthread.h>
#include <unistd.h>
#include <stdio.h>int ticket=1000;
pthread_mutex_t mutex;void* routine(void* args)
{char* id= (char*)args;while(1){pthread_mutex_lock(&mutex);if(ticket >0){usleep(1000);--ticket;printf("%s 买了个票,还剩 %d 张\n",id, ticket);pthread_mutex_unlock(&mutex);}else{pthread_mutex_unlock(&mutex);break;}}return nullptr;
}int main()
{pthread_t t1,t2,t3,t4;pthread_mutex_init(&mutex, nullptr);pthread_create(&t1, nullptr, routine, (void*)"thread 1");pthread_create(&t2, nullptr, routine, (void*)"thread 2");pthread_create(&t3, nullptr, routine, (void*)"thread 3");pthread_create(&t4, nullptr, routine, (void*)"thread 4");pthread_join(t1, NULL);pthread_join(t2, NULL);pthread_join(t3, NULL);pthread_join(t4, NULL);pthread_mutex_destroy(&mutex);return 0;
}
我们使用互斥量来保护临界区代码,使进入临界区的线程最多只能有1个,这样就不会有多个线程进入临界区操作共享资源,引发并发问题
互斥量实现原理
• 注意,原子操作和并发问题没有关系,哪怕给互斥量加锁的操作是个原子操作,那么多个线程执行这个原子操作,同样会有多线程访问共享资源互斥量的并发问题,所谓原子操作,是不会因为cpu线程切换调度机制而被打断,而不是执行操作时没有访问共享资源的并发问题

当多个线程同时用互斥量加锁时,线程会将互斥量的值和0交换,如果得到的互斥量的值不是0,那说明互斥量还没有被其他线程加锁,那此时该线程用0和互斥量交换值,就相当于加锁了,然后线程继续执行,不会被阻塞。
因为pthread_mutex_lock不是原子操作,所以会被cpu切换调度线程给打断,我们假设一个线程用0交换互斥量后被切换调度,然后另一个线程被调度,然后这个线程交换过后得到的互斥量的值很显然是0,所以该线程pcb会挂起到互斥量等待队列上,这样,我们就保证了加锁操作虽然是并发的,但最终只会有一个线程会继续执行,其他线程都会挂起阻塞
用互斥量解锁的本质其实就是将互斥量的值赋值为1,然后唤醒互斥量等待队列上的线程
总结:
1、互斥量本身就可以被多个线程操作,互斥量本身就是共享资源
2、互斥量的加锁操作使用了原子的交换指令,从而保证加锁操作虽然是并发的,但却不会有问题
3、原子操作不会因为cpu的线程切换调度机制而被打断,但原子操作可能会访问共享资源,所以原子操作的执行并不保证没有并发问题
4、cpu的线程切换调度机制是当一条指令执行到中断周期时,可能会有时钟中断需要处理,然后线程pcb中的时间片更新耗尽,cpu就会切换调度线程,在此背景下,可以保证一条指令的操作绝对是原子操作
互斥量的封装
#include <pthread.h>namespace LockModule
{class Mutex{public:Mutex(const Mutex&)= delete;Mutex& operator=(const Mutex&)= delete;Mutex(){pthread_mutex_init(&_mutex, nullptr);//pthread_mutex_init初始化的互斥量必须使用pthread_mutex_destroy销毁} void Lock(){pthread_mutex_lock(&_mutex);}void Unlock(){pthread_mutex_unlock(&_mutex);}pthread_mutex_t* GetMutex(){return &_mutex;}~Mutex(){pthread_mutex_destroy(&_mutex);}private:pthread_mutex_t _mutex;};template<class T>class LockGuard{public:LockGuard(T& mutex):_mutex(mutex){_mutex.Lock();}~LockGuard(){_mutex.Unlock();}private:T& _mutex;};
}
线程同步
条件变量
同步概念
条件变量函数
方法一:
pthread_cond_t cond= PTHREAD_COND_INITIALIZER;方法二:int pthread_cond_init(pthread_cond_t *restrict cond,const pthread_condattr_t
*restrict attr);
参数:
cond:要初始化的条件变量
attr:条件变量的属性,一般设nullptr
如果使用pthread_cond_init函数初始化的条件变量则需要使用该函数销毁int pthread_cond_destroy(pthread_cond_t *cond)
int pthread_cond_wait(pthread_cond_t *restrict cond,pthread_mutex_t *restrict
mutex);
参数:
cond:要在这个条件变量上等待
mutex:互斥量,保证线程挂载到条件变量等待队列上前,条件不会被修改。如果没有互斥量,
那么假如条件不满足,线程要挂载到条件变量等待队列上,结果其他线程在挂载之前将条件直接修改
然后唤醒条件变量等待队列上的线程,此时线程还未挂载到等待队列上,于是就错过了唤醒,
可能会导致后面永远在条件变量上阻塞pthread_cond_wait是条件变量的核心函数,调用线程的pcb会挂载到条件变量的等待队列上,然后释放互斥量,之后线程阻塞等待唤醒。该函数时使用场景是,首先线程使用互斥量加锁,防止其他线程提前唤醒,然后判断条件,满足就直接操作临界资源,不满足就调用pthread_cond_wait,挂载线程pcb到等待队列上,然后释放互斥量,等到被唤醒后,会先尝试获取互斥量,加锁后操作临界资源,最后解锁
条件变量的阻塞等待有两个细节:
1、线程调用条件变量的阻塞等待函数前需要先获取互斥量,互斥量是用来保证线程挂载到条件变量等待队列上之前,其他线程不会提前唤醒,导致该线程错过了唤醒,最后永久阻塞。因为有互斥量的加入,其他线程需要先获取互斥量,然后修改条件,然后才能进行唤醒
2、线程判断条件需要使用while而不是if,因为线程被唤醒时可能是虚假唤醒,也就是其他线程进行了唤醒,但是条件实际上并不满足,因此线程被唤醒并获取互斥量后,需要再判断一次条件,如果确实满足才开始操作临界资源,否则继续挂在到条件变量等待队列上
我们可以看到,单单的pthread_cond_wait的作用其实很弱,还需要程序员做好很多额外工作,才能更好的使用条件变量,从而在保证线程执行临界区代码是互斥的前提下,避免线程的饥饿问题,实现同步
int pthread_cond_broadcast(pthread_cond_t *cond);
int pthread_cond_signal(pthread_cond_t *cond);案例:
#include <pthread.h>
#include <unistd.h>
#include <stdio.h>pthread_mutex_t mutex= PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t cond= PTHREAD_COND_INITIALIZER;void* routine(void* args)
{char* id= (char*)args;while(1){pthread_mutex_lock(&mutex);pthread_cond_wait(&cond,&mutex);printf("%s 被唤醒\n",id);pthread_mutex_unlock(&mutex);}
}int main()
{pthread_t t1,t2,t3,t4;pthread_create(&t1, nullptr, routine, (void*)"thread 1");pthread_create(&t2, nullptr, routine, (void*)"thread 2");sleep(1);while(1){pthread_cond_broadcast(&cond);sleep(1);}pthread_join(t1, NULL);pthread_join(t2, NULL);return 0;
}
⽣产者消费者模型
321原则
"3" - 三种关系
1. 生产者 vs 生产者:互斥关系
2. 消费者 vs 消费者:互斥关系
3. 生产者 vs 消费者:同步关系
"2" - 两种角色
1. 生产者(Producer)
2. 消费者(Consumer)
"1" - 一个交易场所
阻塞队列
为何要使⽤⽣产者消费者模型
⽣产者消费者模型优点

基于BlockingQueue的⽣产者消费者模型

基于阻塞队列的生产者消费者模型
#include <queue>
#include <pthread.h>using namespace std;template<class T>
class BlockQueue
{
public:BlockQueue(int cap):_cap(cap),_product_wait_num(0),_consum_wait_num(0){pthread_mutex_init(&_mutex,nullptr);pthread_cond_init(&_product_cond,nullptr);pthread_cond_init(&_consum_cond,nullptr);}void Enqueue(const T& in){pthread_mutex_lock(&_mutex);while(_q.size()== _cap){_product_wait_num++;pthread_cond_wait(&_product_cond, &_mutex);_product_wait_num--;}_q.push(in);if(_consum_wait_num> 0){pthread_cond_signal(&_consum_cond);}pthread_mutex_unlock(&_mutex);}void Pop(T* out){pthread_mutex_lock(&_mutex);while(_q.empty()){_consum_wait_num++;pthread_cond_wait(&_consum_cond, &_mutex);_consum_wait_num--;}*out= _q.front();_q.pop();if(_product_wait_num> 0){pthread_cond_signal(&_product_cond);}pthread_mutex_unlock(&_mutex);}~BlockQueue(){pthread_mutex_destroy(&_mutex);pthread_cond_destroy(&_product_cond);pthread_cond_destroy(&_consum_cond);}private:queue<T> _q;int _cap;//阻塞队列上限pthread_mutex_t _mutex;pthread_cond_t _product_cond;pthread_cond_t _consum_cond;int _product_wait_num;int _consum_wait_num;
};
#include "BlockQueue.hpp"
#include <functional>
#include <iostream>
#include <unistd.h>using namespace std;using task_t= function<void()>;
BlockQueue<task_t> bq(10);void task1()
{printf("处理数据库任务\n");
}void* routine1(void* args)
{char* p= (char*)args;while(1){sleep(1);printf("%s 往阻塞队列添加任务\n",p);bq.Enqueue(task1);}
}void* routine2(void* args)
{char* p= (char*)args;while(1){sleep(1);printf("%s ",p);task_t t;bq.Pop(&t);t();}
}
int main()
{pthread_t t1,t2,t3,t4;pthread_create(&t1, nullptr, routine1, (void*)"thread 1");pthread_create(&t2, nullptr, routine1, (void*)"thread 2");pthread_create(&t3, nullptr, routine2, (void*)"thread 3");pthread_create(&t4, nullptr, routine2, (void*)"thread 4");while(1){sleep(5);}return 0;
}
为什么 pthread_cond_wait 需要互斥量?
条件变量的错误使用
pthread_mutex_lock(&mutex);
while (condition_is_false)
{pthread_mutex_unlock(&mutex);//解锁之后,其他线程可以获取互斥量,然后修改共享资源并唤醒条件变量的等待队列上的阻塞线程,//但是此时该线程可能还没有挂载到等待队列上,导致唤醒可能被错过,使该线程永久阻塞,因此在//该线程挂载到等待队列上之前要一直持有锁,该种写法是不行的pthread_cond_wait(&cond);pthread_mutex_lock(&mutex);
}
pthread_mutex_unlock(&mutex);条件变量使⽤规范
pthread_mutex_lock(&mutex);
while (条件为假)pthread_cond_wait(cond, mutex);修改共享变量;
pthread_mutex_unlock(&mutex);
pthread_mutex_lock(&mutex);
修改共享变量;
pthread_cond_signal(cond);
pthread_mutex_unlock(&mutex);POSIX信号量
信号量和条件变量一样,也是在线程互斥的前提下,保证线程进入临界区的顺序,实现线程同步,其底层其实可以使用互斥量和条件变量来实现
posix信号量函数
#include <semaphore.h>
int sem_init(sem_t *sem, int pshared, unsigned int value);
参数:
pshared:0表⽰线程间共享,⾮零表⽰进程间共享
value:信号量初始值
int sem_destroy(sem_t *sem);
功能:如果信号量的计数器不为0,那就将计数器减1,如果信号量的计数器为0,那就
线程挂载到信号量的等待队列上,等待唤醒
int sem_wait(sem_t *sem); //P()
功能:将信号量的计数器加1,并且唤醒信号量等待队列上的最前面的一个线程
int sem_post(sem_t *sem);//V()
struct semaphore {int count;pthread_mutex_t mutex;pthread_cond_t cond;
};void sem_init(semaphore *sem, int initial_count) {sem->count = initial_count;pthread_mutex_init(&sem->mutex, NULL);pthread_cond_init(&sem->cond, NULL);
}void sem_wait(semaphore *sem) {pthread_mutex_lock(&sem->mutex);while (sem->count <= 0) { // 必须用while循环pthread_cond_wait(&sem->cond, &sem->mutex);}sem->count--;pthread_mutex_unlock(&sem->mutex);
}void sem_post(semaphore *sem) {pthread_mutex_lock(&sem->mutex);sem->count++;pthread_cond_signal(&sem->cond); // 或broadcastpthread_mutex_unlock(&sem->mutex);
}信号量的计数器有自己的锁来保护,所以我们在使用信号量进行P(-)和V(+)操作时,不用再自己加锁保护了
基于环形队列的⽣产消费模型

实现代码:
#include <vector>
#include <pthread.h>
#include <semaphore.h>using namespace std;template <class T>
class RingQueue
{
public:RingQueue(int cap):_cap(cap){_q.resize(_cap);_product_step=0;_consum_step=0;pthread_mutex_init(&_product_mutex, nullptr);pthread_mutex_init(&_consum_mutex, nullptr);sem_init(&_room_sem, 0, _cap);//第二个参数,0是线程间共享,非0是进程间共享sem_init(&_data_sem,0, 0);}void Enqueue(const T& in){ sem_wait(&_room_sem);//信号量自个有锁保护计数器pthread_mutex_lock(&_product_mutex);_q[_product_step++]= in;_product_step%= _cap;pthread_mutex_unlock(&_product_mutex);sem_post(&_data_sem);}void Pop(T* out){sem_wait(&_data_sem);//为什么需要锁来保护环形队列,因为初始情况data_sem为cap,//所以会有很多进程同时能在data_sem的wait下不阻塞,同时访问环形队列,因此必须加保护pthread_mutex_lock(&_consum_mutex);*out= _q[_consum_step++];_consum_step%= _cap;pthread_mutex_unlock(&_consum_mutex);sem_post(&_room_sem);}~RingQueue(){pthread_mutex_destroy(&_product_mutex);pthread_mutex_destroy(&_consum_mutex);}private:vector<T> _q;int _cap;//环形队列的大小int _product_step;int _consum_step;pthread_mutex_t _product_mutex;//环形队列的队列本身还有下标是多个线程的共享资源,所以要用锁保护pthread_mutex_t _consum_mutex;sem_t _room_sem;sem_t _data_sem;//3个关系,两个互斥的关系,一个同步的关系
};
#include "RingQueue.hpp"
#include <functional>
#include <iostream>
#include <unistd.h>using namespace std;using task_t= function<void()>;
RingQueue<task_t> bq(2);void task1()
{printf("处理数据库任务\n");
}void task2()
{printf("处理网络任务\n");
}void* routine1(void* args)
{char* p= (char*)args;while(1){sleep(1);printf("%s 往阻塞队列添加任务\n",p);bq.Enqueue(task1);}
}void* routine3(void* args)
{char* p= (char*)args;while(1){sleep(1);printf("%s 往阻塞队列添加任务\n",p);bq.Enqueue(task2);}
}void* routine2(void* args)
{char* p= (char*)args;while(1){sleep(1);printf("%s ",p);task_t t;bq.Pop(&t);t();}
}
int main()
{pthread_t t1,t2,t3,t4;pthread_create(&t1, nullptr, routine1, (void*)"thread 1");pthread_create(&t2, nullptr, routine3, (void*)"thread 2");pthread_create(&t3, nullptr, routine2, (void*)"thread 3");pthread_create(&t4, nullptr, routine2, (void*)"thread 4");while(1){sleep(5);}return 0;
}
总结一下:
关于生产者消费者模型,上面一共展示了两种实现方式
1、以条件变量和互斥锁实现普通阻塞队列作为交易场所
2、以信号量和互斥锁实现环形阻塞队列作为交易场所
设计模式
设计模式是对常见问题的经典解决方案,比如策略模式、工厂模式、单例模式
策略模式的日志设计
⽇志认识
策略模式日志的代码实现
1、内部类写法:
#include <mutex>
#include <string>
#include <time.h>
#include <iostream>
#include <fstream>
#include <sstream>
#include <memory>
using namespace std;namespace LogModule
{enum class LogLevel{DEBUG,INFO,WARNNING,ERROR,FATAL};string LogLevel2String(LogLevel level){switch(level){case LogLevel::DEBUG:return "DEBUG";case LogLevel::INFO:return "INFO";case LogLevel::WARNNING:return "WARNNING";case LogLevel::ERROR:return "ERROR";case LogLevel::FATAL:return "FATAL";}return "UNKNOWN";}string GetCurrTime(){time_t t= time(nullptr);struct tm time;//localtime(&t); 该函数只有一个参数,以返回值的形式返回struct tm*,//其返回的struct tm是一个静态变量,每次调用localtime都会覆盖该静态变量的值//所以多线程使用localtime是有并发读写问题的localtime_r(&t, &time);//localtime_r是可重入函数,因为需要你自己传入struct tm,//这样就不会多线程共享资源了char buffer[80];snprintf(buffer, sizeof(buffer), "%02d:%02d:%02d",\time.tm_hour, time.tm_min, time.tm_sec);return buffer;}class LogStrategy{public:virtual ~LogStrategy()= default;virtual void SyncLog(const string& message)= 0;//同步日志到控制台或文件};class ConSoleLogStrategy : public LogStrategy{public:void SyncLog(const string& message) override{lock_guard<mutex> lg(_mutex);cerr<< message<< endl;}~ConSoleLogStrategy(){}private:mutex _mutex;//ConsoleLogStrategy对象只会有一份作为多线程的共享资源,//如果某个线程要使用同步日志方法,那就需要先获取锁};class FileLogStrategy : public LogStrategy{public:FileLogStrategy(){}void SyncLog(const string& message) override{lock_guard<mutex> lg(_mutex);string log= _path+ _filename;ofstream out(log.c_str(), ios::app);out<<message<<endl;out.close();}~FileLogStrategy(){}private:string _path= "./";string _filename= "log.txt";mutex _mutex;//日志文件就是共享资源,需要加锁保护};class Logger{public:void UseConsoleLogStrategy(){_strategy= make_unique<ConSoleLogStrategy>();}void UseFileLogStrategy(){_strategy= make_unique<FileLogStrategy>();}Logger(){ UseConsoleLogStrategy();}class LogMessage{public:LogMessage(LogLevel level, string file, int line, const Logger& lg):_level(LogLevel2String(level)),_time(GetCurrTime()),_file(move(file)),_line(line),_lg(lg){stringstream ss;ss<<"["<<_time<<"] "<<"["<<_level<<"] "<<"["<<_file<<":"<<_line<<"] ";_info= ss.str();}template<class T>LogMessage& operator<<(const T& t){stringstream ss;ss<<t;_info+= ss.str();return *this;}~LogMessage(){//允许内部类访问外部类的private成员,因为内部类是外部类天然的友元类//但内部类成员函数没有外部类的this指针,所以必须先获取到外部类对象,才能接着访问其private成员if(_lg._strategy )_lg._strategy->SyncLog(_info);}private:string _time;string _level;string _file;int _line;string _info;const Logger& _lg;};LogMessage operator()(LogLevel level){return LogMessage(level,__FILE__, __LINE__,*this);}private:unique_ptr<LogStrategy> _strategy;};Logger lg;#define LOG(level) lg(level) #define ENABLE_CONSOLE_LOG_STRATEGY() lg.UseConsoleLogStrategy()
#define ENABLE_FILE_LOG_STRATEGY() lg.UseFileLogStrategy()
}
#include "Log.hpp"using namespace LogModule;int main()
{ENABLE_FILE_LOG_STRATEGY();LOG(LogLevel::DEBUG)<<"测试一下日志能不能跑";LOG(LogLevel::INFO)<<"啦啦啦";return 0;
}

#include <mutex>
#include <string>
#include <time.h>
#include <iostream>
#include <fstream>
#include <sstream>
#include <memory>
using namespace std;namespace LogModule
{enum class LogLevel{DEBUG,INFO,WARNNING,ERROR,FATAL};string LogLevel2String(LogLevel level){switch(level){case LogLevel::DEBUG:return "DEBUG";case LogLevel::INFO:return "INFO";case LogLevel::WARNNING:return "WARNNING";case LogLevel::ERROR:return "ERROR";case LogLevel::FATAL:return "FATAL";}return "UNKNOWN";}string GetCurrTime(){time_t t= time(nullptr);struct tm time;//localtime(&t); 该函数只有一个参数,以返回值的形式返回struct tm*,//其返回的struct tm是一个静态变量,每次调用localtime都会覆盖该静态变量的值//所以多线程使用localtime是有并发读写问题的localtime_r(&t, &time);//localtime_r是可重入函数,因为需要你自己传入struct tm,//这样就不会多线程共享资源了char buffer[80];snprintf(buffer, sizeof(buffer), "%02d:%02d:%02d",\time.tm_hour, time.tm_min, time.tm_sec);return buffer;}class LogStrategy{public:virtual ~LogStrategy()= default;virtual void SyncLog(const string& message)= 0;//同步日志到控制台或文件};class ConSoleLogStrategy : public LogStrategy{public:void SyncLog(const string& message) override{lock_guard<mutex> lg(_mutex);cerr<< message<< endl;}~ConSoleLogStrategy(){}private:mutex _mutex;//ConsoleLogStrategy对象只会有一份作为多线程的共享资源,//如果某个线程要使用同步日志方法,那就需要先获取锁};class FileLogStrategy : public LogStrategy{public:FileLogStrategy(){}void SyncLog(const string& message) override{lock_guard<mutex> lg(_mutex);string log= _path+ _filename;ofstream out(log.c_str(), ios::app);out<<message<<endl;out.close();}~FileLogStrategy(){}private:string _path= "./";string _filename= "log.txt";mutex _mutex;//日志文件就是共享资源,需要加锁保护};class Logger;class LogMessage{public:LogMessage(LogLevel level, string file, int line, Logger& lg):_level(LogLevel2String(level)),_time(GetCurrTime()),_file(move(file)),_line(line),_lg(lg){stringstream ss;ss<<"["<<_time<<"] "<<"["<<_level<<"] "<<"["<<_file<<":"<<_line<<"] ";_info= ss.str();}template<class T>LogMessage& operator<<(const T& t) {stringstream ss;ss<<t;_info+= ss.str();return *this;}~LogMessage();private:string _time;string _level;string _file;int _line;string _info;Logger& _lg;};class Logger{friend class LogMessage;//声明logmessage是该类的友元,所以Logmessage就可以访问该类的private成员public:void UseConsoleLogStrategy(){_strategy= make_unique<ConSoleLogStrategy>();}void UseFileLogStrategy(){_strategy= make_unique<FileLogStrategy>();}Logger(){ UseConsoleLogStrategy();}LogMessage operator()(LogLevel level){return LogMessage(level,__FILE__, __LINE__,*this);}private:unique_ptr<LogStrategy> _strategy;};LogMessage::~LogMessage(){//有Logger的完整定义才可以访问Logger的成员if(_lg._strategy )_lg._strategy->SyncLog(_info);}Logger lg;#define LOG(level) lg(level) #define ENABLE_CONSOLE_LOG_STRATEGY() lg.UseConsoleLogStrategy()
#define ENABLE_FILE_LOG_STRATEGY() lg.UseFileLogStrategy()
}
#include "Log.hpp"using namespace LogModule;int main()
{LOG(LogLevel::DEBUG)<<"测试一下日志能不能跑";LOG(LogLevel::INFO)<<"啦啦啦";return 0;
}
线程池

线程封装
#include <pthread.h>
#include <string>
#include <functional>
#include <sstream>
using namespace std;namespace ThreadModule
{using func_t= function<void()>;int cnt= 1;class Thread{static void* Func(void* args){Thread* p= (Thread*)args;pthread_setname_np(pthread_self(), p->_name.c_str());p->_task();return nullptr;}public:Thread(func_t task)//构造函数只是创建一下上层Thread结构,设置一下:_task(task){stringstream ss;ss<<"thread "<<cnt++;_name= ss.str();}void Start(){pthread_create(&_t, nullptr, Func, this);}void Join(){pthread_join(_t,nullptr);}private://这些数据都在Thread中,所以线程创建需要将Thread作为参数传过去string _name;pthread_t _t;func_t _task;};
}简简单单封装一下,注意Thread构造函数中不会直接启动线程,而是初始化好线程的名字和要执行的函数
线程池实现:
#include "Thread.hpp"
#include <vector>
#include <queue>
#include <mutex>
#include "Log.hpp"using namespace LogModule;
using namespace ThreadModule;using task_t= function<void()>;class ThreadPool
{void HandleTask(){char name[80];pthread_getname_np(pthread_self(), name, 80);LOG(LogLevel::INFO)<<name<<" 开始运行"; while(1){pthread_mutex_lock(&_mutex);if(_q.empty() && _isrunning){//任务队列为空,但线程池还在跑,那该线程就在条件变量上等_waitnum++;pthread_cond_wait(&_cond, &_mutex);_waitnum--;}if(_q.empty() && !_isrunning){pthread_mutex_unlock(&_mutex);break;}//任务队列不为空,那不管线程池跑不跑了都得执行完任务,所以该线程取任务,并执行task_t t= _q.front();_q.pop();pthread_mutex_unlock(&_mutex);t();}}public:ThreadPool(int threadnum):_threadnum(threadnum){LOG(LogLevel::DEBUG)<<"线程池初始化线程的名字,还有要运行的函数";for(int i=0;i<_threadnum;i++)_pool.push_back({bind(&ThreadPool::HandleTask, this)});}void Start(){LOG(LogLevel::DEBUG)<<"线程池开始让线程运行";pthread_mutex_lock(&_mutex);_isrunning= true;for(int i=0;i<_threadnum;i++)_pool[i].Start();pthread_mutex_unlock(&_mutex);}void Join(){LOG(LogLevel::DEBUG)<<"线程池开始回收线程";for(int i=0;i<_threadnum;i++){_pool[i].Join();}}void Stop(){LOG(LogLevel::DEBUG)<<"线程池停止添加任务";pthread_mutex_lock(&_mutex);_isrunning= false;if(_waitnum> 0){pthread_cond_broadcast(&_cond);//唤醒等待的线程}pthread_mutex_unlock(&_mutex);}void Enqueue(task_t t){pthread_mutex_lock(&_mutex);if(_isrunning== false)//读共享资源需要加锁,不然刚读完就被改了,执行流就不对了{pthread_mutex_unlock(&_mutex);return ;}_q.push(t);if(_waitnum> 0){pthread_cond_signal(&_cond);//保持线程同步}pthread_mutex_unlock(&_mutex);}private:int _threadnum;vector<Thread> _pool;queue<task_t> _q;int _waitnum= 0;//在条件变量上等待获取任务的线程个数pthread_mutex_t _mutex= PTHREAD_MUTEX_INITIALIZER;pthread_cond_t _cond= PTHREAD_COND_INITIALIZER;bool _isrunning;
};测试线程池:
#include "ThreadPool.hpp"
#include <stdlib.h>
#include <unistd.h>void task1()
{char name[80];pthread_getname_np(pthread_self(), name, 80);LOG(LogLevel::INFO) << name << " 处理数据库任务";
}void task2()
{char name[80];pthread_getname_np(pthread_self(), name, 80);LOG(LogLevel::INFO) << name << " 处理网络任务";}void task3()
{char name[80];pthread_getname_np(pthread_self(), name, 80);LOG(LogLevel::INFO) << name << " 处理系统任务";}void task4()
{char name[80];pthread_getname_np(pthread_self(), name, 80);LOG(LogLevel::INFO) << name << " 处理web任务";}int main()
{ThreadPool tp(5);tp.Start();int cnt = 20;task_t a[5];a[0] = task1, a[1] = task2, a[2] = task3, a[3] = task4;srand(time(nullptr));while (cnt--){int t = rand() % 4;tp.Enqueue(a[t]);sleep(1);}tp.Stop();tp.Join();return 0;
}
线程是同步的,符合预期
线程安全的单例模式
单例模式
有些类只能实例出一个对象,这就是单例
单例模式有两种实现方式
1、饿汉实现方式
程序在启动时就已经实例好对象了
#include "Thread.hpp"
#include <vector>
#include <queue>
#include <mutex>
#include "Log.hpp"using namespace LogModule;
using namespace ThreadModule;using task_t= function<void()>;class ThreadPool
{void HandleTask(){char name[80];pthread_getname_np(pthread_self(), name, 80);LOG(LogLevel::INFO)<<name<<" 开始运行"; while(1){pthread_mutex_lock(&_mutex);if(_q.empty() && _isrunning){//任务队列为空,但线程池还在跑,那该线程就在条件变量上等_waitnum++;pthread_cond_wait(&_cond, &_mutex);_waitnum--;}if(_q.empty() && !_isrunning){pthread_mutex_unlock(&_mutex);break;}//任务队列不为空,那不管线程池跑不跑了都得执行完任务,所以该线程取任务,并执行task_t t= _q.front();_q.pop();pthread_mutex_unlock(&_mutex);t();}}public:ThreadPool(int threadnum):_threadnum(threadnum){LOG(LogLevel::DEBUG)<<"线程池初始化线程的名字,还有要运行的函数";for(int i=0;i<_threadnum;i++)_pool.push_back({bind(&ThreadPool::HandleTask, this)});}ThreadPool(const ThreadPool&)= delete;ThreadPool& operator=(const ThreadPool&)= delete;void Start(){LOG(LogLevel::DEBUG)<<"线程池开始让线程运行";pthread_mutex_lock(&_mutex);_isrunning= true;for(int i=0;i<_threadnum;i++)_pool[i].Start();pthread_mutex_unlock(&_mutex);}void Join(){LOG(LogLevel::DEBUG)<<"线程池开始回收线程";for(int i=0;i<_threadnum;i++){_pool[i].Join();}}void Stop(){LOG(LogLevel::DEBUG)<<"线程池停止添加任务";pthread_mutex_lock(&_mutex);_isrunning= false;if(_waitnum> 0){pthread_cond_broadcast(&_cond);//唤醒等待的线程}pthread_mutex_unlock(&_mutex);}void Enqueue(task_t t){pthread_mutex_lock(&_mutex);if(_isrunning== false)//读共享资源需要加锁,不然刚读完就被改了,执行流就不对了{pthread_mutex_unlock(&_mutex);return ;}_q.push(t);if(_waitnum> 0){pthread_cond_signal(&_cond);//保持线程同步}pthread_mutex_unlock(&_mutex);}static ThreadPool* GetInstance(){return &tp;}
private:int _threadnum;vector<Thread> _pool;queue<task_t> _q;int _waitnum= 0;//在条件变量上等待获取任务的线程个数pthread_mutex_t _mutex= PTHREAD_MUTEX_INITIALIZER;pthread_cond_t _cond= PTHREAD_COND_INITIALIZER;bool _isrunning;static ThreadPool tp;//类内声明
}; ThreadPool ThreadPool::tp(5);//类外定义
#include "ThreadPool.hpp"
#include <stdlib.h>
#include <unistd.h>void task1()
{char name[80];pthread_getname_np(pthread_self(), name, 80);LOG(LogLevel::INFO) << name << " 处理数据库任务";
}void task2()
{char name[80];pthread_getname_np(pthread_self(), name, 80);LOG(LogLevel::INFO) << name << " 处理网络任务";}void task3()
{char name[80];pthread_getname_np(pthread_self(), name, 80);LOG(LogLevel::INFO) << name << " 处理系统任务";}void task4()
{char name[80];pthread_getname_np(pthread_self(), name, 80);LOG(LogLevel::INFO) << name << " 处理web任务";}int main()
{ThreadPool* p= ThreadPool::GetInstance();ThreadPool& tp= *p;tp.Start();int cnt = 20;task_t a[5];a[0] = task1, a[1] = task2, a[2] = task3, a[3] = task4;srand(time(nullptr));while (cnt--){int t = rand() % 4;tp.Enqueue(a[t]);sleep(1);}tp.Stop();tp.Join();return 0;
}
因为饿汉方式实现单例模式,在程序运行前对象就已经实例好了,所以GetInstance返回值一直都是一样的,哪怕是多线程的生产者,访问线程池也没什么问题,不用加锁保护
2、懒汉实现方式
懒汉方式实现单例模式,单例对象等到第一次被使用时再创建
#include "Thread.hpp"
#include <vector>
#include <queue>
#include <mutex>
#include "Log.hpp"using namespace LogModule;
using namespace ThreadModule;using task_t= function<void()>;class ThreadPool
{void HandleTask(){char name[80];pthread_getname_np(pthread_self(), name, 80);LOG(LogLevel::INFO)<<name<<" 开始运行"; while(1){pthread_mutex_lock(&_mutex);if(_q.empty() && _isrunning){//任务队列为空,但线程池还在跑,那该线程就在条件变量上等_waitnum++;pthread_cond_wait(&_cond, &_mutex);_waitnum--;}if(_q.empty() && !_isrunning){pthread_mutex_unlock(&_mutex);break;}//任务队列不为空,那不管线程池跑不跑了都得执行完任务,所以该线程取任务,并执行task_t t= _q.front();_q.pop();pthread_mutex_unlock(&_mutex);t();}}public:ThreadPool(int threadnum):_threadnum(threadnum){LOG(LogLevel::DEBUG)<<"线程池初始化线程的名字,还有要运行的函数";for(int i=0;i<_threadnum;i++)_pool.push_back({bind(&ThreadPool::HandleTask, this)});}ThreadPool(const ThreadPool&)= delete;//单例模式禁用拷贝ThreadPool& operator=(const ThreadPool&)= delete;void Start(){LOG(LogLevel::DEBUG)<<"线程池开始让线程运行";pthread_mutex_lock(&_mutex);_isrunning= true;for(int i=0;i<_threadnum;i++)_pool[i].Start();pthread_mutex_unlock(&_mutex);}void Join(){LOG(LogLevel::DEBUG)<<"线程池开始回收线程";for(int i=0;i<_threadnum;i++){_pool[i].Join();}}void Stop(){LOG(LogLevel::DEBUG)<<"线程池停止添加任务";pthread_mutex_lock(&_mutex);_isrunning= false;if(_waitnum> 0){pthread_cond_broadcast(&_cond);//唤醒等待的线程}pthread_mutex_unlock(&_mutex);}void Enqueue(task_t t){pthread_mutex_lock(&_mutex);if(_isrunning== false)//读共享资源需要加锁,不然刚读完就被改了,执行流就不对了{pthread_mutex_unlock(&_mutex);return ;}_q.push(t);if(_waitnum> 0){pthread_cond_signal(&_cond);//保持线程同步}pthread_mutex_unlock(&_mutex);}static ThreadPool* GetInstance(){pthread_mutex_lock(&mut);if(tp== nullptr){tp =new ThreadPool(5);}pthread_mutex_unlock(&mut);return tp;}
private:int _threadnum;vector<Thread> _pool;queue<task_t> _q;int _waitnum= 0;//在条件变量上等待获取任务的线程个数pthread_mutex_t _mutex= PTHREAD_MUTEX_INITIALIZER;pthread_cond_t _cond= PTHREAD_COND_INITIALIZER;bool _isrunning;static ThreadPool* tp;//类内声明static pthread_mutex_t mut;
}; ThreadPool* ThreadPool::tp= nullptr;
pthread_mutex_t ThreadPool::mut= PTHREAD_MUTEX_INITIALIZER;
#include "ThreadPool.hpp"
#include <stdlib.h>
#include <unistd.h>void task1()
{char name[80];pthread_getname_np(pthread_self(), name, 80);LOG(LogLevel::INFO) << name << " 处理数据库任务";
}void task2()
{char name[80];pthread_getname_np(pthread_self(), name, 80);LOG(LogLevel::INFO) << name << " 处理网络任务";}void task3()
{char name[80];pthread_getname_np(pthread_self(), name, 80);LOG(LogLevel::INFO) << name << " 处理系统任务";}void task4()
{char name[80];pthread_getname_np(pthread_self(), name, 80);LOG(LogLevel::INFO) << name << " 处理web任务";}int main()
{ThreadPool* p= ThreadPool::GetInstance();ThreadPool& tp= *p;tp.Start();int cnt = 20;task_t a[5];a[0] = task1, a[1] = task2, a[2] = task3, a[3] = task4;srand(time(nullptr));while (cnt--){int t = rand() % 4;tp.Enqueue(a[t]);sleep(1);}tp.Stop();tp.Join();return 0;
}
懒汉方式实现的单例模式,由于是生产者第一次获取线程池时才会创建线程池,所以当多个生产者一开始同时获取线程池时,可能会有并发问题,导致线程池最后创建了好几份,所以必须加锁保护,当然在上面的例子中由于测试使用的生产者也就是添加任务的线程只有一个,所以完全不会有并发问题
常见锁概念
死锁

假设现在进入临界区需要获取两把锁,也就是锁1和锁2都要获取

如果线程A已经获取了锁1,线程B已经获取了锁2,现在各自去申请对方的锁,导致的结果就是两个线程pcb各自挂载到对方锁的等待队列上,最终永久阻塞
死锁四个必要条件
避免死锁
一个避免死锁的方法就是要么一次获取所有锁,要么一把锁都不获取,直接阻塞等待
可以使用<mutex>中的std::lock来实现同时加多把锁
