Day30 多线程编程 同步与互斥 任务队列调度
day30 多线程编程 同步与互斥 任务队列调度
主线程与工作线程协作模型
主线程负责初始化数据结构和创建线程,工作线程执行具体计算任务。主线程负责回收线程资源并汇总显示结果。
#include <stdio.h>
#include <pthread.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>// 线程函数1:随机执行1-5次打印操作
void* th1(void* arg) {int i = rand() % 5 + 1; // 生成1-5的随机循环次数while (i--) {printf("th1 ,tid:%lu\n", pthread_self()); // 打印线程IDsleep(1); // 休眠1秒}return NULL;
}// 线程函数2:随机执行1-5次打印操作(错误地使用了th1的打印信息)
void* th2(void* arg) {int i = rand() % 5 + 1; // 生成1-5的随机循环次数while (i--) {printf("th1 ,tid:%lu\n", pthread_self()); // 打印线程ID(应为th2)sleep(1); // 休眠1秒}return NULL;
}// 线程函数3:随机执行1-5次打印操作
void* th3(void* arg) {int i = rand() % 5 + 1; // 生成1-5的随机循环次数while (i--) {printf("th2 ,tid:%lu\n", pthread_self()); // 打印线程IDsleep(1); // 休眠1秒}return NULL;
}// 函数指针类型定义,指向线程处理函数
typedef void* (*PFUN)(void* arg);int main(int argc, char **argv) {pthread_t tid[10]; // 线程ID数组// 线程函数指针数组,按指定顺序排列PFUN th[] = {th1, th3, th2, th1, th3, th2, th1, th3, th2, th1};int i = 0;// 创建10个线程for (i = 0; i < 10; i++) {pthread_create(&tid[i], NULL, th[i], NULL); }// 等待所有线程结束for (i = 0; i < 10; i++) {pthread_join(tid[i], NULL);}printf("main th will exit...\n"); // 主线程退出提示return 0;
}
代码运行理想结果:
th1 ,tid:140735900123456
th2 ,tid:140735891730752
th1 ,tid:140735883338048
th1 ,tid:140735874945344
th2 ,tid:140735866552640
th1 ,tid:140735858159936
th1 ,tid:140735849767232
th2 ,tid:140735841374528
th1 ,tid:140735832981824
th1 ,tid:140735824589120
main th will exit...
注:实际输出顺序和线程ID会因随机数和系统调度而不同,但会包含10组线程输出,最后显示主线程退出信息
线程控制:互斥与同步
互斥机制
概念:
- 互斥:在多线程环境中对临界资源的排他性访问控制
- 互斥锁:实现临界资源访问控制的核心机制
互斥锁操作框架:
定义互斥锁 → 初始化锁 → 加锁 → 解锁 → 销毁锁
互斥锁核心操作
操作步骤 | 函数原型 | 功能说明 | 返回值 |
---|---|---|---|
定义 | pthread_mutex_t mutex; | 声明互斥锁变量 | - |
初始化 | int pthread_mutex_init(pthread_mutex_t *mutex, const pthread_mutexattr_t *attr); | 初始化互斥锁(attr通常为NULL) | 成功返回0,失败返回非0 |
加锁 | int pthread_mutex_lock(pthread_mutex_t *mutex); | 获取锁并阻塞直至成功(原子操作开始) | 成功返回0,失败返回非0 |
解锁 | int pthread_mutex_unlock(pthread_mutex_t *mutex); | 释放锁(原子操作结束) | 成功返回0,失败返回非0 |
销毁 | int pthread_mutex_destroy(pthread_mutex_t *mutex); | 销毁不再使用的互斥锁 | 成功返回0,失败返回非0 |
非阻塞加锁 | int pthread_mutex_trylock(pthread_mutex_t *mutex); | 尝试获取锁(立即返回,不阻塞) | 成功返回0,失败返回EBUSY |
互斥锁应用示例:计数器保护
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>pthread_mutex_t mutex; // 定义互斥锁
int count = 0; // 共享资源(临界资源)// 线程函数:对共享计数器进行5000次递增
void* th(void* arg) {int i = 5000;while (i--) {pthread_mutex_lock(&mutex); // 加锁(进入临界区)int tmp = count; // 读取当前值printf("count:%d\n", tmp + 1); // 打印新值count = tmp + 1; // 更新计数器pthread_mutex_unlock(&mutex); // 解锁(退出临界区)}return NULL;
}int main(int argc, char** argv) {pthread_t tid1, tid2;pthread_mutex_init(&mutex, NULL); // 初始化互斥锁// 创建两个线程pthread_create(&tid1, NULL, th, NULL);pthread_create(&tid2, NULL, th, NULL);// 等待线程结束pthread_join(tid1, NULL);pthread_join(tid2, NULL);pthread_mutex_destroy(&mutex); // 销毁互斥锁return 0;
}
代码运行理想结果:
count:1
count:2
...
count:10000
注:输出严格按1-10000顺序递增,无重复或跳号,证明互斥锁有效保护了共享资源
互斥锁应用示例:资源池管理
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <time.h>
#include <unistd.h>int WIN = 3; // 可用资源数量(临界资源)
pthread_mutex_t mutex; // 互斥锁// 线程函数:模拟资源获取与释放
void* th(void* arg) {while (1) {pthread_mutex_lock(&mutex); // 加锁if (WIN > 0) { // 检查资源可用性WIN--; // 占用资源pthread_mutex_unlock(&mutex); // 解锁(允许其他线程检查)printf("get win...\n"); // 获取资源成功sleep(rand() % 5 + 1); // 模拟资源使用时间(1-5秒)printf("release win...\n"); // 释放资源pthread_mutex_lock(&mutex); // 重新加锁WIN++; // 释放资源pthread_mutex_unlock(&mutex); // 解锁break;} else {pthread_mutex_unlock(&mutex); // 无资源时释放锁}}return NULL;
}int main(int argc, char** argv) {pthread_t tid[10] = {0};srand(time(NULL)); // 初始化随机种子pthread_mutex_init(&mutex, NULL); // 初始化互斥锁// 创建10个线程for (int i = 0; i < 10; i++) {pthread_create(&tid[i], NULL, th, NULL);}// 等待所有线程结束for (int i = 0; i < 10; i++) {pthread_join(tid[i], NULL);}pthread_mutex_destroy(&mutex); // 销毁互斥锁return 0;
}
代码运行理想结果:
get win...
get win...
get win...
release win...
get win...
release win...
release win...
get win...
...
release win...
注:任意时刻最多3个线程同时持有资源,资源获取/释放顺序可能不同,但不会出现超过3个"get win"连续出现
线程同步:信号量机制
同步机制
概念:
- 同步:对资源的有顺序要求的排他性访问
- 信号量:实现线程同步的核心机制(POSIX标准)
- 与互斥区别:互斥仅保证排他访问,同步在互斥基础上增加执行顺序控制
信号量操作框架:
定义信号量 → 初始化 → P操作(申请) → V操作(释放) → 销毁
信号量核心操作
操作步骤 | 函数原型 | 功能说明 | 返回值 |
---|---|---|---|
定义 | sem_t sem; | 声明信号量变量 | - |
初始化 | int sem_init(sem_t *sem, int pshared, unsigned int value); | 初始化信号量(pshared=0表示线程间) | 成功返回0,失败返回-1 |
P操作 | int sem_wait(sem_t *sem); | 申请资源(值减1,若≤0则阻塞) | 成功返回0,失败返回-1 |
V操作 | int sem_post(sem_t *sem); | 释放资源(值加1,唤醒等待线程) | 成功返回0,失败返回-1 |
销毁 | int sem_destroy(sem_t *sem); | 销毁不再使用的信号量 | 成功返回0,失败返回-1 |
信号量应用示例:线程顺序控制
#include <pthread.h>
#include <semaphore.h>
#include <stdio.h>
#include <unistd.h>sem_t sem_H, sem_W; // 定义两个信号量// 线程1:打印"hello, "
void* th1(void* arg) {int i = 10;while (i--) {sem_wait(&sem_H); // P操作:等待sem_H信号量printf("hello, ");fflush(stdout); // 立即刷新缓冲区sem_post(&sem_W); // V操作:释放sem_W信号量}return NULL;
}// 线程2:打印"world\n"
void* th2(void* arg) {int i = 10;while (i--) {sem_wait(&sem_W); // P操作:等待sem_W信号量printf("world\n");sleep(1); // 模拟处理延迟sem_post(&sem_H); // V操作:释放sem_H信号量}return NULL;
}int main(int argc, char **argv) {pthread_t tid1, tid2;sem_init(&sem_H, 0, 1); // 初始化sem_H=1(初始允许th1运行)sem_init(&sem_W, 0, 0); // 初始化sem_W=0(初始阻塞th2)// 创建线程pthread_create(&tid1, NULL, th1, NULL);pthread_create(&tid2, NULL, th2, NULL);// 等待线程结束pthread_join(tid1, NULL);pthread_join(tid2, NULL);// 销毁信号量sem_destroy(&sem_H);sem_destroy(&sem_W);return 0;
}
代码运行理想结果:
hello, world
hello, world
hello, world
...
hello, world
注:严格交替输出"hello, world",共10组,无乱序或重复
信号量应用示例:资源池管理(计数信号量)
#include <pthread.h>
#include <semaphore.h>
#include <stdio.h>
#include <stdlib.h>
#include <time.h>
#include <unistd.h>sem_t sem_WIN; // 定义信号量// 线程函数:使用信号量控制资源访问
void* th(void* arg) {sem_wait(&sem_WIN); // P操作:申请资源(阻塞直至有资源)printf("get win...\n");sleep(rand() % 5 + 1); // 模拟资源使用时间(1-5秒)printf("release win...\n");sem_post(&sem_WIN); // V操作:释放资源return NULL;
}int main(int argc, char** argv) {pthread_t tid[10] = {0};srand(time(NULL)); // 初始化随机种子sem_init(&sem_WIN, 0, 3); // 初始化信号量=3(3个可用资源)// 创建10个线程for (int i = 0; i < 10; i++) {pthread_create(&tid[i], NULL, th, NULL);}// 等待所有线程结束for (int i = 0; i < 10; i++) {pthread_join(tid[i], NULL);}sem_destroy(&sem_WIN); // 销毁信号量return 0;
}
代码运行理想结果:
get win...
get win...
get win...
release win...
get win...
release win...
release win...
get win...
...
release win...
注:任意时刻最多3个"get win"连续出现,资源释放后立即有新线程获取
综合应用:任务队列调度系统
顺序队列实现(头文件)
#ifndef __SEQQUE__H__
#define __SEQQUE__H__// 任务数据类型
typedef struct {char task_name[50]; // 任务名称int task_time; // 任务执行时间(秒)
} DATATYPE;// 顺序队列结构
typedef struct {DATATYPE* array; // 任务数组int head; // 队头索引int tail; // 队尾索引int tlen; // 队列总长度
} SeqQue;// 队列操作函数声明
SeqQue* CreateSeqQue(int len); // 创建队列
int EnterSeqQue(SeqQue* sq, DATATYPE* data); // 入队
int QuitSeqQue(SeqQue* sq); // 出队
DATATYPE* GetHeadSeqQue(SeqQue* sq); // 获取队头
int IsEmptySeqQue(SeqQue* sq); // 队空判断
int IsFullSeqQue(SeqQue* sq); // 队满判断
int DestroySeqQue(SeqQue* sq); // 销毁队列#endif // !__SEQQUE__H__
顺序队列实现(源文件)
#include "seqque.h"
#include <stdio.h>
#include <stdlib.h>
#include <string.h>// 创建指定长度的队列
SeqQue* CreateSeqQue(int len) {SeqQue* sq = malloc(sizeof(SeqQue)); // 分配队列结构内存if (NULL == sq) {perror("CreateSeqQue malloc");return NULL;}sq->array = malloc(sizeof(DATATYPE) * len); // 分配任务数组内存if (NULL == sq->array) {perror("CreateSeqQue malloc2");free(sq);return NULL;}sq->head = 0; // 初始化队头sq->tail = 0; // 初始化队尾sq->tlen = len; // 设置队列长度return sq;
}// 任务入队
int EnterSeqQue(SeqQue* sq, DATATYPE* data) {if (IsFullSeqQue(sq)) { // 检查队列是否已满printf("queue is full\n");return 1;}memcpy(&sq->array[sq->tail], data, sizeof(DATATYPE)); // 复制任务数据sq->tail = (sq->tail + 1) % sq->tlen; // 循环队列:队尾后移return 0;
}// 任务出队
int QuitSeqQue(SeqQue* sq) {if (IsEmptySeqQue(sq)) { // 检查队列是否为空printf("queue is empty\n");return 1;}sq->head = (sq->head + 1) % sq->tlen; // 循环队列:队头后移return 0;
}// 获取队头任务
DATATYPE* GetHeadSeqQue(SeqQue* sq) {return &sq->array[sq->head]; // 返回队头指针
}// 队列空判断
int IsEmptySeqQue(SeqQue* sq) {return sq->head == sq->tail; // 头尾指针重合表示空队列
}// 队列满判断
int IsFullSeqQue(SeqQue* sq) {// 循环队列:队尾+1模长度等于队头表示满return (sq->tail + 1) % sq->tlen == sq->head;
}// 销毁队列
int DestroySeqQue(SeqQue* sq) {free(sq->array); // 释放任务数组free(sq); // 释放队列结构return 0;
}
任务调度系统
#include <pthread.h>
#include <semaphore.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include "seqque.h"sem_t sem_task; // 任务信号量
pthread_mutex_t mutex; // 队列互斥锁// 预定义任务列表
DATATYPE data[] = {{"cooking", 5}, // 烹饪任务(5秒){"washing", 9}, // 洗涤任务(9秒){"dohomewoing", 3}, // 作业任务(3秒){"over", 3}, // 结束任务
};// 任务选择界面
int show_task() {int i = 0;for (i = 0; i < 4; i++) {printf("%d %s\n", i, data[i].task_name); // 显示任务列表}char tmp[5] = {0};fgets(tmp, sizeof(tmp), stdin); // 读取用户输入int num = atoi(tmp); // 转换为整数return num;
}// 工作线程:处理任务队列
void* th(void* arg) {SeqQue* sq = (SeqQue*)arg; // 获取队列指针DATATYPE task;while (1) {bzero(&task, sizeof(task)); // 清空任务结构sem_wait(&sem_task); // P操作:等待任务信号量pthread_mutex_lock(&mutex); // 加锁保护队列DATATYPE* tmp = GetHeadSeqQue(sq); // 获取队头任务memcpy(&task, tmp, sizeof(DATATYPE)); // 复制任务数据QuitSeqQue(sq); // 任务出队pthread_mutex_unlock(&mutex); // 解锁// 检查结束任务if (0 == strcmp(task.task_name, "over")) {break;}// 执行任务(每秒打印一次)while (task.task_time--) {printf("i'm proccessing %s \n", task.task_name);sleep(1);}}return NULL;
}int main(int argc, char** argv) {pthread_t tid;SeqQue* sq = CreateSeqQue(10); // 创建任务队列sem_init(&sem_task, 0, 0); // 初始化任务信号量=0pthread_mutex_init(&mutex, NULL); // 初始化队列互斥锁pthread_create(&tid, NULL, th, sq); // 创建工作线程// 主线程:接收用户任务输入while (1) {int num = show_task(); // 显示任务菜单if (num < 0 || num > 3) { // 无效输入处理continue;}pthread_mutex_lock(&mutex); // 加锁保护队列EnterSeqQue(sq, &data[num]); // 任务入队pthread_mutex_unlock(&mutex); // 解锁sem_post(&sem_task); // V操作:增加任务信号量// 检查结束任务if (3 == num) {break;}}// 清理资源pthread_join(tid, NULL);sem_destroy(&sem_task);pthread_mutex_destroy(&mutex);DestroySeqQue(sq);return 0;
}
代码运行理想结果:
0 cooking
1 washing
2 dohomewoing
3 over
0
i'm proccessing cooking
i'm proccessing cooking
i'm proccessing cooking
i'm proccessing cooking
i'm proccessing cooking
0 cooking
1 washing
2 dohomewoing
3 over
1
i'm proccessing washing
i'm proccessing washing
...(持续9秒)...
0 cooking
1 washing
2 dohomewoing
3 over
3
注:用户选择任务后,工作线程按队列顺序执行任务,每个任务按预设时间逐秒输出执行状态
核心概念回顾
线程控制机制对比
特性 | 互斥锁 (Mutex) | 信号量 (Semaphore) |
---|---|---|
核心目的 | 保证临界资源的排他性访问 | 实现有顺序要求的资源访问 |
资源控制 | 仅保证单一资源访问权 | 可管理多个资源实例(计数信号量) |
操作主体 | 同一线程必须完成加锁/解锁 | 申请§和释放(V)可由不同线程执行 |
典型场景 | 保护共享数据结构(如计数器) | 控制执行顺序(如生产者-消费者) |
资源数量 | 本质是二值信号量(0/1) | 支持多值(0-N) |
原子区域 | 应尽可能短小 | 可包含一定延迟操作 |
互斥与同步的本质区别
- 顺序要求:同步机制强制执行顺序(如A必须在B前),互斥仅保证单一访问
- 操作主体:互斥锁要求同一线程加锁/解锁;信号量中P/V操作可由不同线程执行
- 资源管理:
- 二值信号量用于同步:申请和释放由不同线程完成
- 计数信号量用于互斥:申请和释放由同一线程完成
死锁产生的四个必要条件
- 互斥条件:资源一次仅能被一个线程使用
- 请求与保持条件:线程阻塞时仍持有已获取资源
- 不剥夺条件:已分配资源不能被系统强行回收
- 循环等待条件:线程间形成环形资源等待链
关键提示:避免死锁需破坏至少一个必要条件,常见策略包括资源有序分配、超时重试机制等。在实际编程中,应确保锁的获取顺序一致,并限制临界区执行时间。