linux线程互斥和同步
目录
互斥量封装
封装
线程同步
条件变量
同步概念与竞态条件
条件变量函数
初始化
销毁
等待
唤醒等待
⽣产者消费者模型
基于BlockingQueue的⽣产者消费者模型
互斥量封装
先前说,我们可以不使用全局变量锁,我们可以封装一个面对对象的锁。
代码:
class ThreadData
{
public:ThreadData(const std::string& name,pthread_mutex_t* lock):_name(name),_lock(lock){}std::string _name;pthread_mutex_t* _lock;
};
int ticket = 100;
void *routine(void *agrs)
{ThreadData* td=static_cast<ThreadData*>(agrs);while (1){pthread_mutex_lock(td->_lock); // 加锁if (ticket > 0){usleep(1000);printf("%s buy ticket:%d\n",td->_name.c_str(),ticket);ticket--;pthread_mutex_unlock(td->_lock); // 解锁}else{pthread_mutex_unlock(td->_lock); // 解锁break;}}return nullptr;
}
int main()
{pthread_mutex_t lock;pthread_mutex_init(&lock,nullptr);//初始化//创建新线程pthread_t tid1, tid2, tid3, tid4;ThreadData* td1=new ThreadData("thread-1",&lock);pthread_create(&tid1, nullptr, routine, td1);ThreadData* td2=new ThreadData("thread-2",&lock);pthread_create(&tid2, nullptr, routine, td2);ThreadData* td3=new ThreadData("thread-3",&lock);pthread_create(&tid3, nullptr, routine, td3);ThreadData* td4=new ThreadData("thread-4",&lock);pthread_create(&tid4, nullptr, routine, td4);// 回收pthread_join(tid1, nullptr);pthread_join(tid2, nullptr);pthread_join(tid3, nullptr);pthread_join(tid4, nullptr);pthread_mutex_destroy(&lock);//回收return 0;
}
封装
封装一把锁:
namespace MutexModule
{class Mutex{public:Mutex(){pthread_mutex_init(&_mutex,nullptr);}void Lock(){pthread_mutex_lock(&_mutex);}void UnLock(){pthread_mutex_unlock(&_mutex);}~Mutex(){pthread_mutex_destroy(&_mutex);}private:pthread_mutex_t _mutex;};
}
用面对对象使用这把锁:
using namespace MutexModule;
//数据
class ThreadData
{
public:ThreadData(const std::string &name, Mutex *lock): _name(name), _lock(lock){}std::string _name;Mutex *_lock;
};
int ticket = 100;
void *routine(void *agrs)
{ThreadData *td = static_cast<ThreadData *>(agrs);while (1){td->_lock->Lock(); // 上锁if (ticket > 0){usleep(1000);printf("%s buy ticket:%d\n", td->_name.c_str(), ticket);ticket--;td->_lock->UnLock(); // 解锁}else{td->_lock->UnLock(); // 解锁break;}}return nullptr;
}
int main()
{Mutex lock; // 这样就已经初始化锁了// 创建新线程pthread_t tid1, tid2, tid3, tid4;ThreadData *td1 = new ThreadData("thread-1", &lock);//初始化数据pthread_create(&tid1, nullptr, routine, td1);ThreadData *td2 = new ThreadData("thread-2", &lock);//初始化数据pthread_create(&tid2, nullptr, routine, td2);ThreadData *td3 = new ThreadData("thread-3", &lock);//初始化数据pthread_create(&tid3, nullptr, routine, td3);ThreadData *td4 = new ThreadData("thread-4", &lock);//初始化数据pthread_create(&tid4, nullptr, routine, td4);// 回收pthread_join(tid1, nullptr);pthread_join(tid2, nullptr);pthread_join(tid3, nullptr);pthread_join(tid4, nullptr);//回收自己会Mutex类自己会析构!return 0;
}
这样不优雅,我们可以继续改进一下:
我们增加一个类:
class LockGuard{public:LockGuard(Mutex& mutex):_mutex(mutex){_mutex.Lock();}~LockGuard(){_mutex.UnLock();}private:Mutex& _mutex;};
调用:
// 数据
class ThreadData
{
public:ThreadData(const std::string &name, Mutex *lock): _name(name), _lock(lock){}std::string _name;Mutex *_lock;
};
int ticket = 100;
void *routine(void *agrs)
{ThreadData *td = static_cast<ThreadData *>(agrs);while (1){LockGuard guard(*(td->_lock));if (ticket > 0){usleep(1000);printf("%s buy ticket:%d\n", td->_name.c_str(), ticket);ticket--; }else{break;}}return nullptr;
}
int main()
{Mutex lock;// 创建新线程pthread_t tid1, tid2, tid3, tid4;ThreadData *td1 = new ThreadData("thread-1", &lock); // 初始化数据pthread_create(&tid1, nullptr, routine, td1);ThreadData *td2 = new ThreadData("thread-2", &lock); // 初始化数据pthread_create(&tid2, nullptr, routine, td2);ThreadData *td3 = new ThreadData("thread-3", &lock); // 初始化数据pthread_create(&tid3, nullptr, routine, td3);ThreadData *td4 = new ThreadData("thread-4", &lock); // 初始化数据pthread_create(&tid4, nullptr, routine, td4);// 回收pthread_join(tid1, nullptr);pthread_join(tid2, nullptr);pthread_join(tid3, nullptr);pthread_join(tid4, nullptr);return 0;
}
这样就不需要自己写解锁了,每次循环结束都会解锁,LockGuard类析构解锁,因为guard变量在循环内是局部变量,每次循环解释都会释放(析构)。
当然我们可以将LockGuard类写成指针:
class LockGuard{public:LockGuard(Mutex* mutex):_mutex(mutex){_mutex->Lock();}~LockGuard(){_mutex->UnLock();}private:Mutex* _mutex;};
互斥解决了线程安全问题,但是不高效,对其他线程不公平,所以引入线程同步来解决这个问题。
线程同步让所有执行流访问临界资源,按照一定顺序进行访问。
线程同步
条件变量
- 当⼀个线程互斥地访问某个变量时,它可能发现在其它线程改变状态之前,它什么也做不了。
- 例如⼀个线程访问队列时,发现队列为空,它只能等待,只到其它线程将⼀个节点添加到队列 中。这种情况就需要⽤到条件变量。
同步概念与竞态条件
- 同步:在保证数据安全的前提下,让线程能够按照某种特定的顺序访问临界资源,从⽽有效避免 饥饿问题,叫做同步
- 竞态条件:因为时序问题,⽽导致程序异常,我们称之为竞态条件。在线程场景下,这种问题也 不难理解
条件变量函数
初始化
销毁
等待
唤醒等待
我们继续看买票的例子,加入条件变量:
//全局变量
pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
int num = 5;
int ticket = 20;
void *routine(void *agrs)
{char *name = static_cast<char *>(agrs);while (1){pthread_mutex_lock(&lock);if (ticket > 0){pthread_cond_wait(&cond, &lock);ticket--;printf("%s 计算ticket:%d\n", name, ticket);pthread_mutex_unlock(&lock);}else{pthread_mutex_unlock(&lock);break;}}return nullptr;
}
int main()
{std::vector<pthread_t> threads;for (int i = 0; i < num; i++){pthread_t tid;char *name = new char[64];snprintf(name, 64, "Thread-%d", i);int n = pthread_create(&tid, nullptr, routine, name);if (n != 0)continue;threads.push_back(tid);}while (1){std::cout<<"唤醒全部线程:"<<std::endl;pthread_cond_broadcast(&cond);// std::cout << "唤醒一个线程..." << std::endl;// pthread_cond_signal(&cond);sleep(1);}for (auto &thread : threads){pthread_join(thread, nullptr);}return 0;
}
理解:
先前知道,上锁,只会有一个线程进入,如果满足条件(pthread_cond_wait),这个线程会被挂起等待,等待之前会释放锁,这也是为什么pthread_cond_wait需要传锁的原因(要释放),所以,这个线程挂起等待,其他线程继续竞争锁,继续进入临界区继续等待,而当被唤醒broadcast或signal,会重新申请锁(竞争锁),如果锁被其他线程申请了或申请失败了呢?这个线程就会在锁上阻塞等待,等待自己申请到了锁,就继续向下执行!
⽣产者消费者模型
为何要使⽤⽣产者消费者模型?
⽣产者消费者模式就是通过⼀个容器来解决⽣产者和消费者的强耦合问题。⽣产者和消费者彼此之间 不直接通讯,⽽通过阻塞队列来进⾏通讯,所以⽣产者⽣产完数据之后不⽤等待消费者处理,直接扔 给阻塞队列,消费者不找⽣产者要数据,⽽是直接从阻塞队列⾥取,阻塞队列就相当于⼀个缓冲区, 平衡了⽣产者和消费者的处理能⼒。这个阻塞队列就是⽤来给⽣产者和消费者解耦
的。
⽣产者消费者模型优点?
1.生产过程和消费过程解耦。
2.支持忙闲不均
3.提高效率,体现在将来消费者消费的时候可以并发,同时生产者也可以生产,都是并发的。
4.
3种关系,2个角色,1个交易场所。
生产者之间直接是互斥关系,消费者之间是互斥关系,生产者和消费者之间是互斥和同步关系。
有生产者和消费者俩种角色(线程承担)。
交易场所是以特定结构构成的一个内存空间。
基于BlockingQueue的⽣产者消费者模型
在多线程编程中阻塞队列(BlockingQueue)是⼀种常⽤于实现⽣产者和消费者模型的数据结构。其与 普通的队列区别在于,当队列为空时,从队列获取元素的操作将会被阻塞,直到队列中被放⼊了元 素;当队列满时,往队列⾥存放元素的操作也会被阻塞,直到有元素被从队列中取出(以上的操作都是 基于不同的线程来说的,线程在对阻塞队列进程操作时会被阻塞)
我们来写一个。
封装
代码:
const int defaultcap = 5;
template <typename T>
class BlocQueue
{bool IsFull() { return _q.size() >= _cal; }bool IsEmpty() { return _q.empty(); }public:BlocQueue(int cal = defaultcap): _cal(cal),_psleep_num(0),_csleep_num(0){pthread_mutex_init(&_mutex,nullptr);pthread_cond_init(&_full_cond,nullptr);pthread_cond_init(&_empty_cond,nullptr);}void Enqueue(const T &in){pthread_mutex_lock(&_mutex);// 生产者调用while (IsFull()){_psleep_num++;pthread_cond_wait(&_full_cond, &_mutex);_psleep_num--;}_q.push(in);if (_csleep_num > 0)pthread_cond_signal(&_empty_cond);pthread_mutex_unlock(&_mutex);}T Pop(){// 消费者调用pthread_mutex_lock(&_mutex);while (IsEmpty()){_csleep_num++;pthread_cond_wait(&_empty_cond, &_mutex);_csleep_num--;}T data = _q.front();_q.pop();if (_psleep_num > 0)pthread_cond_signal(&_full_cond);pthread_mutex_unlock(&_mutex);return data;}~BlocQueue(){pthread_mutex_destroy(&_mutex);pthread_cond_destroy(&_full_cond);pthread_cond_destroy(&_empty_cond);}private:std::queue<T> _q;int _cal; // 大小pthread_mutex_t _mutex;pthread_cond_t _full_cond;pthread_cond_t _empty_cond;int _csleep_num; // 消费者休眠的个数int _psleep_num; // 生产者休眠的个数
};
讲解:
1.
每次生产和消费都只能有一个线程进入临界资源。
2.
假设消费者先跑,消费者只会挂起等待(解锁),生产者申请锁,向队列写入,同时判断唤醒消费者,假设唤醒了,还是拿不到锁,生产者一直生产,队列就会满,挂起等待(解锁),然后消费者消费。
3.
问题:什么时候唤醒?为什么判断要用循环?
生产者生产了(push了),队列就一定有东西,就可以唤醒消费者了,消费者消费了,队列就一定有空位,就可以唤醒生产者了。
当挂起等待的时候,可能挂起等待失败,所以要循环再次判断一次。
调用执行:
我们可以以任务的形式来消费和生产任务:
以类为例:
// 消费者
void *consumer(void *agrs)
{BlocQueue<Task> *bq = static_cast<BlocQueue<Task> *>(agrs);while (1){sleep(1);Task t = bq->Pop();int sum = t.Execut();std::cout << "消费了一个任务..." << sum << std::endl;}
}
// 生产者
void *productor(void *agrs)
{BlocQueue<Task> *bq = static_cast<BlocQueue<Task> *>(agrs);while (1){sleep(1);std::cout << "生产了一个任务..." << std::endl;Task t;bq->Enqueue(t);}
}
int main()
{BlocQueue<Task> *bq = new BlocQueue<Task>();pthread_t c, p;pthread_create(&c, nullptr, consumer, bq);pthread_create(&p, nullptr, productor, bq);pthread_join(c, nullptr);pthread_join(p, nullptr);return 0;
}
不断地消费和生产任务。
我们下期见!