【Linux系统与网络编程】13:线程同步
线程同步
OVERVIEW
- 线程同步
- 线程同步
- 线程同步-锁
- 互斥锁
- 读写锁
- 死锁
- 线程同步-条件变量
- 条件变量condition
- 生产者消费者模型
- 线程同步-信号量
- 信号量semaphore
- 生产者消费者模型
- 实际应用案例
- 1.多线程求和实现
- 2.多线程处理任务队列
- mylog
- task_queue
- task_queue_test
- make_task_queue.sh
- 结果校验
- PushMessage.log
- PopMessage.log
线程同步
线程间竞争:
- 产生原因:同个进程的线程共享内存空间独立调度,并发执行时重叠执行,以不可预期的顺序执行。
- 产生条件:两个或更多的线程对 共享资源非同步访问。
- 产生位置:需要同步的代码区称为临界区
线程间同步:
-
线程同步:线程间协同步调,按预定的先后次序运行。
某个线程发出某功能调用时,在没有得到返回前该调用不返回,同时其他线程为保证数据一致性,不调用该功能。
-
线程同步是解决竞争的主要方式,即让临界资源的访问顺序变得确定。
-
同步方式:在临界区前加上互斥锁,在临界区后释放互斥锁,某个线程占用互斥锁时,其他线程就会被阻塞。
线程同步-锁
建议锁,对公共数据进行保护,所有线程在访问公共数据之前应该先拿到锁再进行访问,锁不具有强制性。
- 如果公共数据存放在栈空间上,线程间不可互相访问(栈空间各线程独立),
- 如果公共数据存放在堆区,线程间可互相访问
- 如果公共数据存放在全局数据区,线程间可互相访问
互斥锁
linux中提供了互斥锁/互斥量 pthread_mutex_t
,
- pthread_mutex_init
- pthread_mutex_destroy
- pthread_mutex_lock:阻塞等锁
- pthread_mutex_trylock:不阻塞等锁
- pthread_mutex_unlock
尽量保证锁的粒度越小越好,访问资源前加锁,访问结束后立即解锁。
- 互斥锁本质上是结构体(看成整数初值为1)
- 加锁:互斥量 -1,并将其他线程阻塞
- 解锁:互斥量 +1,并唤醒阻塞在锁上的其他线程
- try锁:尝试加锁,成功 -1,失败返回并设置errno为EBUSY
#include <stdio.h>
#include <unistd.h>
#include <string.h>
#include <stdlib.h>
#include <stdbool.h>
#include <pthread.h>
#include <sys/types.h>pthread_mutex_t mutex_for_stdout;void *pfunc()
{srand(time(NULL));while (true) {pthread_mutex_lock(&mutex_for_stdout);printf("hello ");sleep(rand() % 3);printf("world\n");pthread_mutex_unlock(&mutex_for_stdout);sleep(rand() % 3);}
}int main()
{pthread_t tid;srand(time(NULL));int ret = pthread_mutex_init(&mutex_for_stdout, NULL);if (ret != 0) {fprintf(stderr, "pthread_mutex_init error: %s\n", strerror(ret));exit(1);}pthread_create(&tid, NULL, pfunc, NULL);while (true) {pthread_mutex_lock(&mutex_for_stdout);printf("HELLO ");sleep(rand() % 3);printf("WORLD\n");pthread_mutex_unlock(&mutex_for_stdout);sleep(rand() % 3);}pthread_join(tid, NULL);pthread_mutex_destroy(&mutex_for_stdout);return 0;
}
读写锁
linux中提供了读写锁 pthread_rwlock_t
,
- pthread_rwlock_init
- pthread_rwlock_destroy
- pthread_rwlock_rdlock:读模式加锁
- pthread_rwlock_wrlock:写模式加锁
- pthread_rwlock_tryrdlock:不阻塞等锁
- pthread_rwlock_trywrlock:不阻塞等锁
- pthread_rwlock_unlock
读写锁非常适合于对数据结构读的次数,远大于写次数的情况。
读写锁也叫共享独占锁(写独占\读共享),当读写锁以读模式锁住时,其是以共享模式锁住的,当读写锁以写模式锁住时,其是以独占模式锁住。
- 当读写锁为写模式加锁时:
- 解锁前所有对该锁,进行加锁的线程都会被阻塞
- 当读写锁为读模式加锁时:
- 如果线程以读模式对其加锁会成功,以写模式对其加锁会阻塞
- 如果同时有读模式\写模式进行加锁的线程,读写锁会阻塞随后的读模式锁请求,优先满足写模式锁请求(写锁优先级更高)。
注:相较于互斥锁而言,读写锁只有在读线程较多的时候,可以提高访问效率。
#include <stdio.h>
#include <unistd.h>
#include <string.h>
#include <stdlib.h>
#include <stdbool.h>
#include <pthread.h>
#include <sys/types.h>int g_counter = 0;
pthread_rwlock_t g_rwlock;void* thread_write(void* arg)
{int i = (int)arg;while (true) {pthread_rwlock_wrlock(&g_rwlock);int tmp = g_counter;usleep(1000);printf("+++thread_write[%d: %lu]: counter=%d, ++counter=%d.\n", i, pthread_self(), tmp, ++g_counter);pthread_rwlock_unlock(&g_rwlock);usleep(10 * 1000);}return NULL;
}void* thread_read(void* arg)
{int i = (int)arg;while (true) {pthread_rwlock_rdlock(&g_rwlock);printf("---thread_read[%d: %lu]: counter=%d.\n", i, pthread_self(), g_counter);pthread_rwlock_unlock(&g_rwlock);usleep(2 * 1000);}return NULL;
}int main()
{pthread_t tid_arr[8] = { 0 };pthread_rwlock_init(&g_rwlock, NULL);// 3个线程不定时写同个全局资源for (int i = 0; i < 3; ++i)pthread_create(&tid_arr[i], NULL, thread_write, (void*)i);// 5个线程不定时读同个全局资源for (int i = 3; i < 8; ++i)pthread_create(&tid_arr[i], NULL, thread_read, (void*)i);// 线程加入for (int i = 0; i < 8; ++i)pthread_join(tid_arr[i], NULL);pthread_rwlock_destroy(&g_rwlock);return 0;
}
死锁
在使用锁不恰当时会出现死锁的现象,有如下几种情况:
- 单个线程反复加锁
- 两个线程互相等待锁
#include <stdio.h>
#include <unistd.h>
#include <string.h>
#include <stdlib.h>
#include <stdbool.h>
#include <pthread.h>
#include <semaphore.h>
#include <sys/types.h>int g_vara = 1;
int g_varb = 5;
pthread_mutex_t g_mutex_a, g_mutex_b;
void* dead_lock_tfunc(void* arg)
{int i = (int)arg;if (i == 1) {pthread_mutex_lock(&g_mutex_a); // 先获取变量ag_vara = 11;sleep(1);pthread_mutex_lock(&g_mutex_b);g_varb = 51;pthread_mutex_unlock(&g_mutex_a);pthread_mutex_unlock(&g_mutex_b);} else if (i == 2) {pthread_mutex_lock(&g_mutex_b); // 先获取变量bg_vara = 12;sleep(1);pthread_mutex_lock(&g_mutex_a);g_varb = 52;pthread_mutex_unlock(&g_mutex_a);pthread_mutex_unlock(&g_mutex_b);}printf("thread[%d] finished.\n", i);pthread_exit(NULL);return NULL;
}void dead_lock()
{pthread_t tid1, tid2;pthread_mutex_init(&g_mutex_a, NULL);pthread_mutex_init(&g_mutex_b, NULL);pthread_create(&tid1, NULL, dead_lock_tfunc, (void*)1);pthread_create(&tid2, NULL, dead_lock_tfunc, (void*)2);sleep(3);printf("current: g_vara = %d, g_varb = %d.\n", g_vara, g_varb);int ret1 = pthread_mutex_destroy(&g_mutex_a);int ret2 = pthread_mutex_destroy(&g_mutex_b);if (ret1 == 0 && ret2 == 0)printf("destroy mutex finished.\n");pthread_join(tid1, NULL);pthread_join(tid2, NULL);
}int g_num = 100;
void dead_dlock()
{pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;pthread_mutex_lock(&mutex);g_num = 777;pthread_mutex_lock(&mutex);pthread_mutex_unlock(&mutex);printf("dead_lock: g_num = %d.", g_num);pthread_mutex_destroy(&mutex);
}int main()
{dead_lock();// dead_dlock();return 0;
}
线程同步-条件变量
条件变量condition
条件变量不是锁,但其也可以造成线程阻塞,通常配合互斥锁同时使用,为多线程提供会合的场所。
linux中提供了条件变量 pthread_cond_t
,
- pthread_cond_init:动态初始化条件变量
- pthread_cond_wait(pthread_cond_t* restrict cond, pthread_mutex_t* restrict mutex):
- 阻塞线程,等待条件变量cond满足
- 对互斥锁解锁,等价于
pthread_mutex_unlock(&mutex);
- 解除线程阻塞,当条件被满足后
- 对互斥锁加锁,等价于
pthread_mutex_lock(&mutex);
- pthread_cond_timedwait:阻塞
- pthread_cond_signal:通知
- pthread_cond_broadcast:通知
- pthread_cond_destroy
生产者消费者模型
通过条件变量实现生产者、消费者模型。
消费者行为:
- 创建并初始化互斥锁,
- 互斥锁上锁
pthread_mutex_lock(&mutex)
- 等待条件满足:
pthread_cond_wait(&cond, &mutex)
- 阻塞等条件变量
- 解锁unlock
- 被唤醒后加锁
- 开始进行消费
生产者行为:
- 生产数据
- 加锁
pthread_mutex_lock(&mutex)
- 将生产的数据放置到公共区域中
- 解锁
pthread_mutex_unlock(&mutex)
- 通知阻塞在条件变量上的线程
- 循环生产后续数据
#include <stdio.h>
#include <unistd.h>
#include <string.h>
#include <stdlib.h>
#include <stdbool.h>
#include <pthread.h>
#include <sys/types.h>void err_thread(int ret, char* str)
{if (ret != 0) {fprintf(stderr, "%s:%s\n", str, strerror(ret));pthread_exit(NULL);}
}typedef struct node_st NODE, *PNODE;
typedef struct node_st {int data_;PNODE next_;
}NODE, *PNODE;
// 链表作为临界区
// typedef stuct link_list_st {
// int size_;
// PNODE head_;
// pthread_cond_t cond_;
// pthread_mutex_t mutex_;
// }LINK_LIST, *PLINK_LIST;
PNODE head_ = NULL;
pthread_cond_t cond_ = PTHREAD_COND_INITIALIZER;
pthread_mutex_t mutex_ = PTHREAD_MUTEX_INITIALIZER;// 消费者
void* func_consumer(void* p)
{PNODE tmp_node;for (;;) {pthread_mutex_lock(&mutex_);while (head_ == NULL) {pthread_cond_wait(&cond_, &mutex_);}// 消费操作tmp_node = head_;head_ = tmp_node->next_;pthread_mutex_unlock(&mutex_);printf("Consumer[%lu]: consume a product[%d].\n", pthread_self(), tmp_node->data_);free(tmp_node);sleep(rand()%3);}
}// 生产者
void* func_producer()
{PNODE tmp_node;for (;;) {// 生产操作tmp_node = malloc(sizeof(NODE));tmp_node->data_ = rand()%1000 + 1;// 头插法pthread_mutex_lock(&mutex_);tmp_node->next_ = head_;head_ = tmp_node;pthread_mutex_unlock(&mutex_);pthread_cond_signal(&cond_);printf("Producer[%lu]: produce a product[%d].\n", pthread_self(), tmp_node->data_);sleep(rand()%3);}
}int main()
{int ret = 0;srand(time(NULL));// 生产者进程pthread_t pid;ret = pthread_create(&pid, NULL, func_producer, NULL);if (ret != 0) {err_thread(ret, "pthread_create pid error");}// 消费者进程pthread_t cid_arr[5];for (int i = 0; i < 5; ++i) {ret = pthread_create(&cid_arr[i], NULL, func_consumer, NULL);if (ret != 0)err_thread(ret, "pthread_create cid error");}pthread_join(pid, NULL);for (int i = 0; i < 5; ++i)pthread_join(cid_arr[i], NULL);return 0;
}
线程同步-信号量
信号量semaphore
使用互斥量后多线程对临界资源的访问顺序,变成了串行,这没有发挥多线程并发执行的优势、降低了执行效率。
以及借助信号量来处理该问题:信号量相当于初始化值为N的 互斥量,N表示可以同时访问共享数据区的线程数量(占用信号量的个数),
linux中提供了信号量 sem_t
,
-
sem_init(sem_t* sem, int pshared, unsigned int value):
- pshared:0线程间同步、1进程间同步
- value:信号量初始值 N
-
sem_wait:加锁,每次调用信号量 -1,直到为0时线程阻塞
-
sem_trywait:加锁,每次调用信号量 -1,直到为0时线程阻塞
-
sem_timedwait:加锁,每次调用信号量 -1,直到为0时线程阻塞
-
sem_post:解锁
-
sem_destroy:
关于PV操作:
PV操作是操作系统中用于进程同步和互斥的一种机制,主要用来管理对共享资源的访问,避免多个进程同时访问共享资源。PV操作基于信号量Semaphore,信号量是一种特殊的变量,只能通过两种原子操作来访问:P操作(也称为wait、down、lock或减操作)和V操作(也称为signal、up、unlock或加操作)。
- P操作 Proberen测试:将信号量值减1;如果信号量值变为负数,则执行P操作的进程被阻塞,加入等待队列,直到其他进程执行了V操作后才有可能被唤醒。
- V操作 Verhogen增加:将信号量值加1;如果信号量的值小于等于0,表示有进程正在等待此信号量,那么系统会从等待队列中选择一个进程将其唤醒。
生产者消费者模型
借助信号量来实现生产者、消费者模型:
#include <stdio.h>
#include <unistd.h>
#include <string.h>
#include <stdlib.h>
#include <stdbool.h>
#include <pthread.h>
#include <semaphore.h>
#include <sys/types.h>#define QUEUE_SIZE 5int g_queue[QUEUE_SIZE] = { 0 };// 环形队列
sem_t g_blank_number; // 空格信号量
sem_t g_product_number; // 产品信号量void* producer(void* arg)
{int i = 0;while (true) {sem_wait(&g_blank_number); // 生产者将空格数 -1 为0则阻塞等待g_queue[i] = rand()%1000 + 1;printf("Producer[%lu]: produce a product[%d].\n", pthread_self(), g_queue[i]);sem_post(&g_product_number);i = (i + 1) % QUEUE_SIZE; // 借助下标 实现环形队列sleep(rand()%1);}
}void* consumer(void* arg)
{int i = 0;while (true) {sem_wait(&g_product_number);printf("Consumer[%lu]: consume a product[%d].\n", pthread_self(), g_queue[i]);g_queue[i] = 0;sem_post(&g_blank_number);i = (i + 1) % QUEUE_SIZE; // 借助下标 实现环形队列sleep(rand()%3);}
}int main()
{pthread_t pid, cid;sem_init(&g_blank_number, 0, QUEUE_SIZE);sem_init(&g_product_number, 0, 0);pthread_create(&pid, NULL, producer, NULL);pthread_create(&cid, NULL, consumer, NULL);pthread_join(pid, NULL);pthread_join(cid, NULL);sem_destroy(&g_blank_number);sem_destroy(&g_product_number);return 0;
}
实际应用案例
1.多线程求和实现
多线程实现前n项求和,利用互斥量保证多线程环境下,数据访问的安全性。
#include <stdio.h>
#include <unistd.h>
#include <string.h>
#include <stdlib.h>
#include <pthread.h>
#include <sys/types.h>
#include <stdbool.h>long long g_cur = 0;
long long g_sum = 0;
pthread_mutex_t g_mutex = PTHREAD_MUTEX_INITIALIZER;void *culculate(void *arg) {int num = *(int*)arg;while(true) {if (g_cur > num)break;g_sum += g_cur;g_cur++;}return NULL;
}// 多线程环境下需要加锁
void *safe_culculate(void *arg) {int num = *(int *)arg;while(true) {pthread_mutex_lock(&g_mutex);if (g_cur > num) {pthread_mutex_unlock(&g_mutex);break;}g_sum += g_cur;g_cur++;pthread_mutex_unlock(&g_mutex);}return NULL;
}//实现多线程计算
//./a.out -i thread_num -n num
int main(int argc, char *argv[]) {//1.命令行解析int opt;int thread_num = 5;//线程个数int num = 100;//求和的数字while ((opt = getopt(argc, argv, "i:n:")) != -1) {switch (opt) {case 'i':thread_num = atoi(optarg);break;case 'n':num = atoi(optarg);break;default:fprintf(stderr, "Usage : %s -i thread_num -n num\n", argv[0]);exit(1);}}//2.多线程计算pthread_t *threadarr = calloc(thread_num, sizeof(pthread_t));for (int i = 0; i < thread_num; ++i)pthread_create(&threadarr[i], NULL, safe_culculate, &num);for (int i = 0; i < thread_num; ++i)pthread_join(threadarr[i], NULL);printf("main : now = %lld, sum = %lld\n", g_cur - 1, g_sum); free(threadarr);return 0;
}
多线程编程中的竞态条件(race condition):
如果不使用互斥锁,多个线程同时访问和修改全局变量 g_cur
和 g_sum
,由于缺乏同步机制可能会发生以下情况:
- 读取和写入顺序:A线程读取了
g_cur
,B线程也读取了相同的值,接着两个线程分别增加g_sum
并更新了g_cur
,导致g_sum
被重复增加同个值。 - 状态不一致:A线程正在更新
g_cur
,B线程在同时间开始检查g_cur
是否大于num
,这可能导致某些线程错过终止条件,继续执行不必要的操作。
2.多线程处理任务队列
对于线程来说,处理请求的时间是非常短的,
如果操作系统的请求数量非常大,为每个请求都创建线程处理后进行线程销毁,这是一种非常消耗内存的操作。
这时应该使用线程池,线程工作完成后不进行销毁操作,以减小内存开销。
mylog
#ifndef _MY_LOG_H
#define _MY_LOG_Hvoid initLogFile();
void closeLogFile();
void logPushMessage(pthread_t thread_id, const char *str);#endif
#include <stdio.h>
#include <unistd.h>
#include <string.h>
#include <stdlib.h>
#include <pthread.h>
#include <sys/types.h>#include "mylog.h"FILE *log_file = NULL;
pthread_mutex_t log_mutex = PTHREAD_MUTEX_INITIALIZER;// 初始化日志文件
void initLogFile() {pthread_mutex_lock(&log_mutex);log_file = fopen("PushMessage.log", "a");if (log_file == NULL) {perror("Failed to open log file");}pthread_mutex_unlock(&log_mutex);
}// 关闭日志文件
void closeLogFile() {pthread_mutex_lock(&log_mutex);if (log_file != NULL) {fclose(log_file);log_file = NULL;}pthread_mutex_unlock(&log_mutex);
}// 线程安全的日志打印函数
void logPushMessage(pthread_t thread_id, const char *str) {pthread_mutex_lock(&log_mutex);if (log_file != NULL) {fprintf(log_file, "<Pop>(Thread %ld) : %s", thread_id, str);fflush(log_file);} else {fprintf(stderr, "Log file is not initialized!\n");}pthread_mutex_unlock(&log_mutex);
}
task_queue
- 利用队列、锁、条件变量实现模拟线程池的调度过程:
#ifndef _THREAD_POOL_H
#define _THREAD_POOL_Hstruct taskQueue {int size;//总容量int count;//已使用容量int head;//头指针int tail;//尾指针void **data;//模拟任务pthread_mutex_t mutex;pthread_cond_t cond;
};void taskQueueInit(struct taskQueue *taskq, int size);//队列初始化
void taskQueuePush(struct taskQueue *taskq, char *str);//入队
char *taskQueuePop(struct taskQueue *taskq);//出队#endif
#include <stdio.h>
#include <unistd.h>
#include <string.h>
#include <stdlib.h>
#include <pthread.h>
#include <sys/types.h>#include "mylog.h"
#include "task_queue.h"//队列初始化
void taskQueueInit(struct taskQueue *taskqtest, int size) {taskqtest->head = 0;taskqtest->tail = 0;taskqtest->count = 0;taskqtest->size = size;taskqtest->data = calloc(size, sizeof(void *));pthread_mutex_init(&taskqtest->mutex, NULL);pthread_cond_init(&taskqtest->cond, NULL);
}//入队
void taskQueuePush(struct taskQueue *taskq, char *str) {pthread_mutex_lock(&taskq->mutex);if (taskq->count == taskq->size) {printf("taskq is full!\n");pthread_mutex_unlock(&taskq->mutex);return;}// 进行入队操作logPushMessage(pthread_self(), str);// printf("<Push>(Thread %ld) : %s\n", pthread_self(), str);// taskq->data[taskq->tail] = str;taskq->data[taskq->tail] = strdup(str);//拷贝taskq->count++;taskq->tail++;if (taskq->tail == taskq->size)//循环队列taskq->tail = 0;pthread_cond_signal(&taskq->cond);pthread_mutex_unlock(&taskq->mutex);
}//出队
char *taskQueuePop(struct taskQueue *taskq) {pthread_mutex_lock(&taskq->mutex);while (taskq->count == 0) {// printf("taskq is empty\n");pthread_cond_wait(&taskq->cond, &taskq->mutex);//wait可以进行入队操作}//进行出队操作char *str = taskq->data[taskq->head];taskq->count--;taskq->head++;printf("<Pop>(Thread %ld) : %s", pthread_self(), str);if (taskq->head == taskq->size)//循环队列taskq->head = 0;pthread_mutex_unlock(&taskq->mutex);return str;
}
task_queue_test
#include <stdio.h>
#include <unistd.h>
#include <string.h>
#include <stdlib.h>
#include <stdbool.h>
#include <pthread.h>
#include <sys/types.h>#include "task_queue/mylog.h"
#include "task_queue/task_queue.h"#define INS 5 //线程数
#define QUEUE_SIZE 100 //任务队列大小// 字符串打印
void *doWork(void *arg) {pthread_detach(pthread_self());struct taskQueue *taskq = (struct taskQueue *)arg;while(true) {char *str = taskQueuePop(taskq);free(str);//printf("%ld : %s\n", pthread_self(), str);}
}int main() {initLogFile();// 任务队列初始化struct taskQueue taskq;taskQueueInit(&taskq, QUEUE_SIZE);// struct taskQueue *taskq = calloc(QUEUE_SIZE + 5, sizeof(struct taskQueue));// taskQueueInit(taskq, QUEUE_SIZE);// 多线程处理任务队列pthread_t threadarr[INS + 5];for (int i = 0; i < INS; ++i)pthread_create(&threadarr[i], NULL, doWork, (void *)&taskq);// 读取文件每行数据 并将其加入任务队列中int nTimes = 5;while(nTimes--) {// 打开文件FILE *fp = fopen("./task_queue_test.c", "r");if (fp == NULL) {perror("fopen");exit(1);}// 读取文件中的每行数据并Push到taskQueue中int idx = 0;char buff[QUEUE_SIZE][2048] = {0};while(fgets(buff[idx], 2048, fp) != NULL) {taskQueuePush(&taskq, buff[idx]);//如果i++之后等于QUEUE_SIZE 则表示临时的二维数组满了//将i置为0重复利用二维数组if (idx++ == QUEUE_SIZE)idx = 0;//任务队列已满 自旋等待if (taskq.size == taskq.count) {while(true) {if (taskq.count < taskq.size)break;usleep(1000);}}}fclose(fp);}usleep(1000);closeLogFile();return 0;
}
make_task_queue.sh
#!/bin/bash
rm -f a.out
rm -f PopMessage.log
rm -f PushMessage.log
gcc -g task_queue_test.c ./task_queue/*.c -I ./task_queue -lpthread -o a.out
./a.out > PopMessage.log
结果校验
线程池的最佳大小取决于处理器的数目,或者队列中任务的性质,一般线程的数量设置为略多于处理器核心数量。
检查PopMessage.log文件与PushMessage.log文件,进行内容对比发现线程池运行正常。