当前位置: 首页 > news >正文

Day30 多线程编程 同步与互斥 任务队列调度

day30 多线程编程 同步与互斥 任务队列调度

主线程与工作线程协作模型

主线程负责初始化数据结构和创建线程,工作线程执行具体计算任务。主线程负责回收线程资源并汇总显示结果。

#include <stdio.h>
#include <pthread.h> 
#include <stdlib.h>
#include <string.h>
#include <unistd.h>// 线程函数1:随机执行1-5次打印操作
void* th1(void* arg) {int i = rand() % 5 + 1;  // 生成1-5的随机循环次数while (i--) {printf("th1 ,tid:%lu\n", pthread_self());  // 打印线程IDsleep(1);  // 休眠1秒}return NULL;
}// 线程函数2:随机执行1-5次打印操作(错误地使用了th1的打印信息)
void* th2(void* arg) {int i = rand() % 5 + 1;  // 生成1-5的随机循环次数while (i--) {printf("th1 ,tid:%lu\n", pthread_self());  // 打印线程ID(应为th2)sleep(1);  // 休眠1秒}return NULL;
}// 线程函数3:随机执行1-5次打印操作
void* th3(void* arg) {int i = rand() % 5 + 1;  // 生成1-5的随机循环次数while (i--) {printf("th2 ,tid:%lu\n", pthread_self());  // 打印线程IDsleep(1);  // 休眠1秒}return NULL;
}// 函数指针类型定义,指向线程处理函数
typedef void* (*PFUN)(void* arg);int main(int argc, char **argv) {pthread_t tid[10];  // 线程ID数组// 线程函数指针数组,按指定顺序排列PFUN th[] = {th1, th3, th2, th1, th3, th2, th1, th3, th2, th1};int i = 0;// 创建10个线程for (i = 0; i < 10; i++) {pthread_create(&tid[i], NULL, th[i], NULL); }// 等待所有线程结束for (i = 0; i < 10; i++) {pthread_join(tid[i], NULL);}printf("main th will exit...\n");  // 主线程退出提示return 0;
}

代码运行理想结果

th1 ,tid:140735900123456
th2 ,tid:140735891730752
th1 ,tid:140735883338048
th1 ,tid:140735874945344
th2 ,tid:140735866552640
th1 ,tid:140735858159936
th1 ,tid:140735849767232
th2 ,tid:140735841374528
th1 ,tid:140735832981824
th1 ,tid:140735824589120
main th will exit...

注:实际输出顺序和线程ID会因随机数和系统调度而不同,但会包含10组线程输出,最后显示主线程退出信息


线程控制:互斥与同步

互斥机制

概念

  • 互斥:在多线程环境中对临界资源的排他性访问控制
  • 互斥锁:实现临界资源访问控制的核心机制

互斥锁操作框架

定义互斥锁 → 初始化锁 → 加锁 → 解锁 → 销毁锁
互斥锁核心操作
操作步骤函数原型功能说明返回值
定义pthread_mutex_t mutex;声明互斥锁变量-
初始化int pthread_mutex_init(pthread_mutex_t *mutex, const pthread_mutexattr_t *attr);初始化互斥锁(attr通常为NULL)成功返回0,失败返回非0
加锁int pthread_mutex_lock(pthread_mutex_t *mutex);获取锁并阻塞直至成功(原子操作开始)成功返回0,失败返回非0
解锁int pthread_mutex_unlock(pthread_mutex_t *mutex);释放锁(原子操作结束)成功返回0,失败返回非0
销毁int pthread_mutex_destroy(pthread_mutex_t *mutex);销毁不再使用的互斥锁成功返回0,失败返回非0
非阻塞加锁int pthread_mutex_trylock(pthread_mutex_t *mutex);尝试获取锁(立即返回,不阻塞)成功返回0,失败返回EBUSY
互斥锁应用示例:计数器保护
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>pthread_mutex_t mutex;  // 定义互斥锁
int count = 0;  // 共享资源(临界资源)// 线程函数:对共享计数器进行5000次递增
void* th(void* arg) {int i = 5000;while (i--) {pthread_mutex_lock(&mutex);  // 加锁(进入临界区)int tmp = count;  // 读取当前值printf("count:%d\n", tmp + 1);  // 打印新值count = tmp + 1;  // 更新计数器pthread_mutex_unlock(&mutex);  // 解锁(退出临界区)}return NULL;
}int main(int argc, char** argv) {pthread_t tid1, tid2;pthread_mutex_init(&mutex, NULL);  // 初始化互斥锁// 创建两个线程pthread_create(&tid1, NULL, th, NULL);pthread_create(&tid2, NULL, th, NULL);// 等待线程结束pthread_join(tid1, NULL);pthread_join(tid2, NULL);pthread_mutex_destroy(&mutex);  // 销毁互斥锁return 0;
}

代码运行理想结果

count:1
count:2
...
count:10000

注:输出严格按1-10000顺序递增,无重复或跳号,证明互斥锁有效保护了共享资源

互斥锁应用示例:资源池管理
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <time.h>
#include <unistd.h>int WIN = 3;  // 可用资源数量(临界资源)
pthread_mutex_t mutex;  // 互斥锁// 线程函数:模拟资源获取与释放
void* th(void* arg) {while (1) {pthread_mutex_lock(&mutex);  // 加锁if (WIN > 0) {  // 检查资源可用性WIN--;  // 占用资源pthread_mutex_unlock(&mutex);  // 解锁(允许其他线程检查)printf("get win...\n");  // 获取资源成功sleep(rand() % 5 + 1);  // 模拟资源使用时间(1-5秒)printf("release win...\n");  // 释放资源pthread_mutex_lock(&mutex);  // 重新加锁WIN++;  // 释放资源pthread_mutex_unlock(&mutex);  // 解锁break;} else {pthread_mutex_unlock(&mutex);  // 无资源时释放锁}}return NULL;
}int main(int argc, char** argv) {pthread_t tid[10] = {0};srand(time(NULL));  // 初始化随机种子pthread_mutex_init(&mutex, NULL);  // 初始化互斥锁// 创建10个线程for (int i = 0; i < 10; i++) {pthread_create(&tid[i], NULL, th, NULL);}// 等待所有线程结束for (int i = 0; i < 10; i++) {pthread_join(tid[i], NULL);}pthread_mutex_destroy(&mutex);  // 销毁互斥锁return 0;
}

代码运行理想结果

get win...
get win...
get win...
release win...
get win...
release win...
release win...
get win...
...
release win...

注:任意时刻最多3个线程同时持有资源,资源获取/释放顺序可能不同,但不会出现超过3个"get win"连续出现


线程同步:信号量机制

同步机制

概念

  • 同步:对资源的有顺序要求的排他性访问
  • 信号量:实现线程同步的核心机制(POSIX标准)
  • 与互斥区别:互斥仅保证排他访问,同步在互斥基础上增加执行顺序控制

在这里插入图片描述

信号量操作框架

定义信号量 → 初始化 → P操作(申请) → V操作(释放) → 销毁

在这里插入图片描述

信号量核心操作
操作步骤函数原型功能说明返回值
定义sem_t sem;声明信号量变量-
初始化int sem_init(sem_t *sem, int pshared, unsigned int value);初始化信号量(pshared=0表示线程间)成功返回0,失败返回-1
P操作int sem_wait(sem_t *sem);申请资源(值减1,若≤0则阻塞)成功返回0,失败返回-1
V操作int sem_post(sem_t *sem);释放资源(值加1,唤醒等待线程)成功返回0,失败返回-1
销毁int sem_destroy(sem_t *sem);销毁不再使用的信号量成功返回0,失败返回-1
信号量应用示例:线程顺序控制
#include <pthread.h>
#include <semaphore.h>
#include <stdio.h>
#include <unistd.h>sem_t sem_H, sem_W;  // 定义两个信号量// 线程1:打印"hello, "
void* th1(void* arg) {int i = 10;while (i--) {sem_wait(&sem_H);   // P操作:等待sem_H信号量printf("hello, ");fflush(stdout);  // 立即刷新缓冲区sem_post(&sem_W);   // V操作:释放sem_W信号量}return NULL;
}// 线程2:打印"world\n"
void* th2(void* arg) {int i = 10;while (i--) {sem_wait(&sem_W);  // P操作:等待sem_W信号量printf("world\n");sleep(1);  // 模拟处理延迟sem_post(&sem_H);  // V操作:释放sem_H信号量}return NULL;
}int main(int argc, char **argv) {pthread_t tid1, tid2;sem_init(&sem_H, 0, 1);  // 初始化sem_H=1(初始允许th1运行)sem_init(&sem_W, 0, 0);  // 初始化sem_W=0(初始阻塞th2)// 创建线程pthread_create(&tid1, NULL, th1, NULL);pthread_create(&tid2, NULL, th2, NULL);// 等待线程结束pthread_join(tid1, NULL);pthread_join(tid2, NULL);// 销毁信号量sem_destroy(&sem_H);sem_destroy(&sem_W);return 0;
}

代码运行理想结果

hello, world
hello, world
hello, world
...
hello, world

注:严格交替输出"hello, world",共10组,无乱序或重复

信号量应用示例:资源池管理(计数信号量)
#include <pthread.h>
#include <semaphore.h>
#include <stdio.h>
#include <stdlib.h>
#include <time.h>
#include <unistd.h>sem_t sem_WIN;  // 定义信号量// 线程函数:使用信号量控制资源访问
void* th(void* arg) {sem_wait(&sem_WIN);  // P操作:申请资源(阻塞直至有资源)printf("get win...\n");sleep(rand() % 5 + 1);  // 模拟资源使用时间(1-5秒)printf("release win...\n");sem_post(&sem_WIN);  // V操作:释放资源return NULL;
}int main(int argc, char** argv) {pthread_t tid[10] = {0};srand(time(NULL));  // 初始化随机种子sem_init(&sem_WIN, 0, 3);  // 初始化信号量=3(3个可用资源)// 创建10个线程for (int i = 0; i < 10; i++) {pthread_create(&tid[i], NULL, th, NULL);}// 等待所有线程结束for (int i = 0; i < 10; i++) {pthread_join(tid[i], NULL);}sem_destroy(&sem_WIN);  // 销毁信号量return 0;
}

代码运行理想结果

get win...
get win...
get win...
release win...
get win...
release win...
release win...
get win...
...
release win...

注:任意时刻最多3个"get win"连续出现,资源释放后立即有新线程获取


综合应用:任务队列调度系统

顺序队列实现(头文件)

#ifndef __SEQQUE__H__
#define __SEQQUE__H__// 任务数据类型
typedef struct {char task_name[50];  // 任务名称int task_time;       // 任务执行时间(秒)
} DATATYPE;// 顺序队列结构
typedef struct {DATATYPE* array;  // 任务数组int head;         // 队头索引int tail;         // 队尾索引int tlen;         // 队列总长度
} SeqQue;// 队列操作函数声明
SeqQue* CreateSeqQue(int len);          // 创建队列
int EnterSeqQue(SeqQue* sq, DATATYPE* data);  // 入队
int QuitSeqQue(SeqQue* sq);             // 出队
DATATYPE* GetHeadSeqQue(SeqQue* sq);    // 获取队头
int IsEmptySeqQue(SeqQue* sq);          // 队空判断
int IsFullSeqQue(SeqQue* sq);           // 队满判断
int DestroySeqQue(SeqQue* sq);          // 销毁队列#endif  // !__SEQQUE__H__

顺序队列实现(源文件)

#include "seqque.h"
#include <stdio.h>
#include <stdlib.h>
#include <string.h>// 创建指定长度的队列
SeqQue* CreateSeqQue(int len) {SeqQue* sq = malloc(sizeof(SeqQue));  // 分配队列结构内存if (NULL == sq) {perror("CreateSeqQue malloc");return NULL;}sq->array = malloc(sizeof(DATATYPE) * len);  // 分配任务数组内存if (NULL == sq->array) {perror("CreateSeqQue malloc2");free(sq);return NULL;}sq->head = 0;     // 初始化队头sq->tail = 0;     // 初始化队尾sq->tlen = len;   // 设置队列长度return sq;
}// 任务入队
int EnterSeqQue(SeqQue* sq, DATATYPE* data) {if (IsFullSeqQue(sq)) {  // 检查队列是否已满printf("queue is full\n");return 1;}memcpy(&sq->array[sq->tail], data, sizeof(DATATYPE));  // 复制任务数据sq->tail = (sq->tail + 1) % sq->tlen;  // 循环队列:队尾后移return 0;
}// 任务出队
int QuitSeqQue(SeqQue* sq) {if (IsEmptySeqQue(sq)) {  // 检查队列是否为空printf("queue is empty\n");return 1;}sq->head = (sq->head + 1) % sq->tlen;  // 循环队列:队头后移return 0;
}// 获取队头任务
DATATYPE* GetHeadSeqQue(SeqQue* sq) {return &sq->array[sq->head];  // 返回队头指针
}// 队列空判断
int IsEmptySeqQue(SeqQue* sq) {return sq->head == sq->tail;  // 头尾指针重合表示空队列
}// 队列满判断
int IsFullSeqQue(SeqQue* sq) {// 循环队列:队尾+1模长度等于队头表示满return (sq->tail + 1) % sq->tlen == sq->head;
}// 销毁队列
int DestroySeqQue(SeqQue* sq) {free(sq->array);  // 释放任务数组free(sq);         // 释放队列结构return 0;
}

任务调度系统

在这里插入图片描述

#include <pthread.h>
#include <semaphore.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include "seqque.h"sem_t sem_task;         // 任务信号量
pthread_mutex_t mutex;  // 队列互斥锁// 预定义任务列表
DATATYPE data[] = {{"cooking", 5},      // 烹饪任务(5秒){"washing", 9},      // 洗涤任务(9秒){"dohomewoing", 3},  // 作业任务(3秒){"over", 3},         // 结束任务
};// 任务选择界面
int show_task() {int i = 0;for (i = 0; i < 4; i++) {printf("%d %s\n", i, data[i].task_name);  // 显示任务列表}char tmp[5] = {0};fgets(tmp, sizeof(tmp), stdin);  // 读取用户输入int num = atoi(tmp);  // 转换为整数return num;
}// 工作线程:处理任务队列
void* th(void* arg) {SeqQue* sq = (SeqQue*)arg;  // 获取队列指针DATATYPE task;while (1) {bzero(&task, sizeof(task));  // 清空任务结构sem_wait(&sem_task);  // P操作:等待任务信号量pthread_mutex_lock(&mutex);  // 加锁保护队列DATATYPE* tmp = GetHeadSeqQue(sq);  // 获取队头任务memcpy(&task, tmp, sizeof(DATATYPE));  // 复制任务数据QuitSeqQue(sq);  // 任务出队pthread_mutex_unlock(&mutex);  // 解锁// 检查结束任务if (0 == strcmp(task.task_name, "over")) {break;}// 执行任务(每秒打印一次)while (task.task_time--) {printf("i'm proccessing %s \n", task.task_name);sleep(1);}}return NULL;
}int main(int argc, char** argv) {pthread_t tid;SeqQue* sq = CreateSeqQue(10);  // 创建任务队列sem_init(&sem_task, 0, 0);      // 初始化任务信号量=0pthread_mutex_init(&mutex, NULL);  // 初始化队列互斥锁pthread_create(&tid, NULL, th, sq);  // 创建工作线程// 主线程:接收用户任务输入while (1) {int num = show_task();  // 显示任务菜单if (num < 0 || num > 3) {  // 无效输入处理continue;}pthread_mutex_lock(&mutex);  // 加锁保护队列EnterSeqQue(sq, &data[num]);  // 任务入队pthread_mutex_unlock(&mutex);  // 解锁sem_post(&sem_task);  // V操作:增加任务信号量// 检查结束任务if (3 == num) {break;}}// 清理资源pthread_join(tid, NULL);sem_destroy(&sem_task);pthread_mutex_destroy(&mutex);DestroySeqQue(sq);return 0;
}

代码运行理想结果

0 cooking
1 washing
2 dohomewoing
3 over
0
i'm proccessing cooking 
i'm proccessing cooking 
i'm proccessing cooking 
i'm proccessing cooking 
i'm proccessing cooking 
0 cooking
1 washing
2 dohomewoing
3 over
1
i'm proccessing washing 
i'm proccessing washing 
...(持续9秒)...
0 cooking
1 washing
2 dohomewoing
3 over
3

注:用户选择任务后,工作线程按队列顺序执行任务,每个任务按预设时间逐秒输出执行状态


核心概念回顾

线程控制机制对比

特性互斥锁 (Mutex)信号量 (Semaphore)
核心目的保证临界资源的排他性访问实现有顺序要求的资源访问
资源控制仅保证单一资源访问权可管理多个资源实例(计数信号量)
操作主体同一线程必须完成加锁/解锁申请§和释放(V)可由不同线程执行
典型场景保护共享数据结构(如计数器)控制执行顺序(如生产者-消费者)
资源数量本质是二值信号量(0/1)支持多值(0-N)
原子区域应尽可能短小可包含一定延迟操作

互斥与同步的本质区别

  1. 顺序要求:同步机制强制执行顺序(如A必须在B前),互斥仅保证单一访问
  2. 操作主体:互斥锁要求同一线程加锁/解锁;信号量中P/V操作可由不同线程执行
  3. 资源管理
    • 二值信号量用于同步:申请和释放由不同线程完成
    • 计数信号量用于互斥:申请和释放由同一线程完成

死锁产生的四个必要条件

  1. 互斥条件:资源一次仅能被一个线程使用
  2. 请求与保持条件:线程阻塞时仍持有已获取资源
  3. 不剥夺条件:已分配资源不能被系统强行回收
  4. 循环等待条件:线程间形成环形资源等待链

关键提示:避免死锁需破坏至少一个必要条件,常见策略包括资源有序分配、超时重试机制等。在实际编程中,应确保锁的获取顺序一致,并限制临界区执行时间。

http://www.dtcms.com/a/354505.html

相关文章:

  • ArcGIS学习-12 实战-综合案例
  • Unity游戏打包——iOS打包pod的重装和使用
  • Flutter:ios打包ipa,证书申请,Xcode打包,完整流程
  • Intern-S1-mini模型结构
  • SpringBoot系列之实现高效批量写入数据
  • 专项智能练习(图形图像基础)
  • 文本处理与模型对比:BERT, Prompt, Regex, TF-IDF
  • 高精度惯性导航IMU价格与供应商
  • [sys-BlueChi] docs | BluechiCtl命令行工具
  • 【C#/Cpp】CLR项目搭建的内联和托管两选项
  • IPv4和IPv6的主要区别,以及常见的过渡策略有哪些
  • OpenCV计算机视觉实战(22)——图像拼接详解
  • 机器视觉学习-day07-图像镜像旋转
  • 【开题答辩全过程】以 基于Spring Boot农产品运输服务平台为例,包含答辩的问题和答案
  • MapStruct用法和实践
  • 【笔记ing】大模型算法架构
  • android studio 同步慢问题解决
  • Logstash数据迁移之mysql-to-kafka.conf两种路由决策对比
  • WebRTC音频QoS方法五(音频变速算法之Accelerate、FastAccelerate、PreemptiveExpand算法实现)
  • Kafka、RabbitMQ 与 RocketMQ 在高并发场景下的高可用与性能对比分析
  • 游戏使用云手机在线运行怎么样?
  • 小白成长之路-k8s原理(二)
  • 【在 macOS 系统上使用 Docker 启动 Kafka 的完整指南】
  • 点评项目(Redis中间件)第二部分Redis基础
  • ArtCAM 2008安装教程
  • React 业务场景使用相关封装(hooks 使用)
  • 【AI自动化】VSCode+Playwright+codegen+nodejs自动化脚本生成
  • Git 删除文件
  • WINTRUST!_ExplodeMessag函数中的pCatAdd
  • 【大前端】React useEffect 详解:从入门到进阶