Linux线程互斥与同步
多个线程能看到的资源叫共享资源,但是我们需要对一部分共享资源进行保护,因此引入互斥和同步两种方法。
线程互斥
相关概念
临界资源:多线程执行流共享的资源就叫做临界资源
临界区:每个线程内部,访问临界资源的代码,就叫做临界区
互斥:任何时刻,互斥保证有且只有一个执行流进入临界区,访问临界资源,通常对临界资源起保护作用
原子性(后面讨论如何实现):不会被任何调度机制打断的操作,该操作只有两态,要么完成,要么未完 成
抢票代码:
#pragma
#include<pthread.h>
#include<iostream>
typedef void (*func_t)(std::string);
class Thread
{
public:Thread(std::string name,func_t func){this->name = name;this->func = func;}void Excute(){is_running = true;func(name);}std::string Status(){if(is_running)return "running";elsereturn "sleep";}static void* ThreadRoutime(void* args){Thread *self = static_cast<Thread *>(args);self->Excute();return nullptr;}bool Start(){int n=::pthread_create(&tid, nullptr, ThreadRoutime, this);if(n!=0)return false;elsereturn true;}void Stop(){if(is_running){is_running = false;::pthread_cancel(tid);}}void Join(){if(is_running){::pthread_join(tid, nullptr);}}~Thread(){Stop();}private:std::string name;pthread_t tid;bool is_running;func_t func;
};
#include"Thread.hpp"
#include<iostream>
#include<unistd.h>
using namespace std;
int tickets = 10;
void Run(string name)
{int ct = 0;while (1){if(tickets>0){usleep(1000);tickets--;ct++;}elsebreak;}cout << name << " get tickets:" << ct << endl;
}
int main()
{Thread t("syx", Run);Thread t1("syx1", Run);Thread t2("syx2", Run);t.Start();t1.Start();t2.Start();t.Join();t1.Join();t2.Join();return 0;
}
最后发现票数加起来竟然超过原有的10张:
因此需要对tickets进行保护.
加锁
接口:
#include<pthread.h>
int pthread_mutex_destroy(pthread_mutex_t *mutex);
//局部锁
int pthread_mutex_init(pthread_mutex_t *restrict mutex,const pthread_mutexattr_t *restrict mutex);
//全局定义锁
pthread_mutex_t mutex=PTHREAD_MUTEX_INITALIZER;
pthread_mutex_t是互斥锁类型,同一时间只有一个能访问资源,全局锁会自动释放,局部锁要destroy。
加锁:
int pthread_mutex_lock(pthread_mutex_t *mutex);
int pthread_mutex_trylock(pthread_mutex_t *mutex);
int pthread_mutex_unlock(pthread_mutex_t *mutex);
把访问临界资源的代码叫临界区,对临界资源的保护,本质是对临界区代码进行保护。
每次只允许一个线程访问临界区.
加锁原则:
1.加锁的范围、粒度一定要小
2.任何线程,要进行抢票都得先申请锁,不能有例外
3.所有线程申请锁,都得看到这把锁,锁本身也是共享资源,加锁的过程必须是原子的
4.原子性:要么不做,要么做完,没有中间状态
5.锁申请成功了会继续往后运行,失败了会被阻塞
6.执行临界区代码可以被切换走,但是其他线程无法进入,因为锁没被释放
访问临界区的线程对于其他线程就是原子的,因此上述抢票代码可以改成:
//Thread.hpp
#pragma
#include<pthread.h>
#include<iostream>class ThreadDate
{
public:ThreadDate(std::string name, pthread_mutex_t *lock):_name(name),_lock(lock){}std::string _name;pthread_mutex_t *_lock;
};
typedef void (*func_t)(ThreadDate* td,std::string);class Thread
{
public:Thread(std::string name,func_t func,ThreadDate*td){this->name = name;this->func = func;this->_td = td;}void Excute(){is_running = true;func(_td,name);}std::string Status(){if(is_running)return "running";elsereturn "sleep";}static void* ThreadRoutime(void* args){Thread *self = static_cast<Thread *>(args);self->Excute();return nullptr;}bool Start(){int n=::pthread_create(&tid, nullptr, ThreadRoutime, this);if(n!=0)return false;elsereturn true;}void Stop(){if(is_running){is_running = false;::pthread_cancel(tid);}}void Join(){if(is_running){::pthread_join(tid, nullptr);}}~Thread(){Stop();}private:std::string name;pthread_t tid;bool is_running;func_t func;ThreadDate* _td;
};
//LockGuard.hpp
#pragma
#include"Thread.hpp"
class LockGuard
{
public:LockGuard(ThreadDate* td):_td(td){pthread_mutex_lock(_td->_lock);}~LockGuard(){pthread_mutex_unlock(_td->_lock);}private:ThreadDate *_td;
};
#include"LockGuard.hpp"
#include<iostream>
#include<unistd.h>
#include<vector>
using namespace std;
volatile int tickets = 10;
void Run(ThreadDate* td,string name)
{int ct = 0;while (1){LockGuard ld(td);//RAII风格锁if (tickets > 0){usleep(1000);tickets--;ct++;}else{break;}}cout << name << " get tickets:" << ct << endl;
}
int main()
{pthread_mutex_t mutex;pthread_mutex_init(&mutex, nullptr);vector<Thread> v;int n = 3;for (int i = 0; i < n; i++){string name = "syx" + to_string(i + 1);ThreadDate *td = new ThreadDate(name, &mutex);v.emplace_back(name, Run, td);}for(auto &i:v){i.Start();}for(auto &i:v){i.Join();}cout << tickets << endl;pthread_mutex_destroy(&mutex);return 0;
}
从原理角度理解:
申请锁成功就是pthread_mutex_lock()会返回,失败就是函数不返回,线程被阻塞,pthread_mutex_lock内部被唤醒,就重新申请锁。
从实践角度理解:
为了实现互斥锁操作,大多数体系结构都提供了swap或exchange指令,该指令的作用是把寄存器和内存单 元的数据相交换,由于只有一条指令,保证了原子性,即使是多处理器平台,访问内存的 总线周期也有先后,一 个处理器上的交换指令执行时另一个处理器的交换指令只能等待总线周期。
把数据从内存一道CPU寄存器中,本质是把数据从共享变为线程私有(寄存器里的数据是每个线程私有的,但是内存中的数据是共享的),mutex在内存中,一个线程抢到锁,其他线程看到的mutex都是0。
线程同步
上述抢票我们会发现,一号线程获得票最多,这是因为竞争能力不同,同步就是为了让访问资源具有合理性,顺序性,一号线程解锁后就要重新排队。
条件变量
条件变量需要一个线程队列,而且要有通知机制。
创建接口:
//局部
int pthread_cond_init(pthread_cond_t *restrict cond,const pthread_condattr_t *restrict
attr);
int pthread_cond_destroy(pthread_cond_t *cond);
//全局
pthread_cond_t cond=PTHREAD_COND_INITIALIZER;
等待:
int pthread_cond_wait(pthread_cond_t *restrict cond,pthread_mutex_t *restrict mutex);
唤醒:
//所有
int pthread_cond_broadcast(pthread_cond_t *cond);
//一个
int pthread_cond_signal(pthread_cond_t *cond);
实例代码:
#include<iostream>
using namespace std;
#include<vector>
#include<pthread.h>
#include<unistd.h>
pthread_mutex_t gmutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t gcond = PTHREAD_COND_INITIALIZER;
void *Run(void *args)
{string name = static_cast<char *>(args);while(1){pthread_mutex_lock(&gmutex);pthread_cond_wait(&gcond, &gmutex);cout << name << endl;pthread_mutex_unlock(&gmutex);usleep(10000);}
}
int main()
{int n = 5;vector<pthread_t> v;for (int i = 0; i < n; i++){pthread_t id;char *name = new char[128];snprintf(name, 128, "thread %d", i + 1);pthread_create(&id, nullptr, Run, (void*)name);v.emplace_back(id);}while(1){pthread_cond_signal(&gcond);//主线程唤醒cout << "唤醒" << endl;usleep(10000);}for(auto i:v){pthread_join(i,nullptr);}
}
结果是唤醒的线程有顺序性。
生产消费模型
超市就像一个缓存,是消费者拿到商品的过渡,实际上是一个多执行流并发的模型。该模型优点:协调忙闲不均,效率高,解耦。
原则:
1.一个交易场所(特定数据结构形式存在的一段内存空间)
2.两种角色(生产角色,消费角色),生产线程和消费现场
3.三种关系(生产和生产,消费和消费,生产和消费),互斥关系,但是生产和消费有一定的同步关系
基于BlockingQueue的生产消费模型
在多线程编程中阻塞队列(Blocking Queue)是一种常用于实现生产者和消费者模型的数据结构。其与普通的队列区别 在于,当队列为空时,从队列获取元素的操作将会被阻塞,直到队列中被放入了元素;当队列满时,往队列里存放元 素的操作也会被阻塞,直到有元素被从队列中取出(以上的操作都是基于不同的线程来说的,线程在对阻塞队列进程 操作时会被阻塞).
//BlockQueue.hpp
#pragma
#include<iostream>
#include<queue>
#include<string>
using namespace std;
#include<pthread.h>
const static int defaultcap = 5;
template <typename T>
class BlockQueue
{
public:BlockQueue(int cap = defaultcap):_max_cap(cap){pthread_mutex_init(&_mutex, nullptr);pthread_cond_init(&_pcond, nullptr);pthread_cond_init(&_ccond, nullptr);}void Equeue(const T& in){pthread_mutex_lock(&_mutex);while(ifFull()){//被调用时,除了让自己继续排队,还会自己释放传入的锁,返回时必须参与锁的竞争,重新加上锁pthread_cond_wait(&_pcond, &_mutex);}//需要补充了_block_queue.push(in);pthread_mutex_unlock(&_mutex);//通知消费者可以消费pthread_cond_signal(&_ccond);}void Pop(T* out){pthread_mutex_lock(&_mutex);while(ifEmpty()){pthread_cond_wait(&_ccond, &_mutex);}//有货,可以买*out = _block_queue.front();_block_queue.pop();pthread_mutex_unlock(&_mutex);pthread_cond_signal(&_pcond);}private:bool ifFull(){return _block_queue.size() == _max_cap;}bool ifEmpty(){return _block_queue.empty();}queue<T> _block_queue;int _max_cap;pthread_mutex_t _mutex;pthread_cond_t _pcond;pthread_cond_t _ccond;
};
//Main.cpp
#include"BlockQueue.hpp"
#include<time.h>
#include<unistd.h>
void* Buy(void* argv)
{BlockQueue<int> *bq = static_cast<BlockQueue<int> *>(argv);while(1){int data = 0;bq->Pop(&data);cout << "Consumer->" << data << endl;sleep(1);}
}
void* Put(void* argv)
{srand(time(nullptr));BlockQueue<int> *bq = static_cast<BlockQueue<int> *>(argv);while(1){int data = rand() % 10 + 1;bq->Equeue(data);cout << "Productor->" << data << endl;}
}
int main()
{BlockQueue<int> *bq = new BlockQueue<int>();pthread_t c, p;pthread_create(&c, nullptr, Buy, bq);pthread_create(&p, nullptr, Put, bq);pthread_join(c,nullptr);pthread_join(p,nullptr);return 0;
}
当然,货物可以不只是整数,还可以是别的,比如封装成结构体的任务,然后让消费者处理任务就行。