线程池111
线程池框图
C语言线程池详解:从基础到实现
通俗理解线程池
想象你开了一家快递站,每天要处理很多包裹派送:
- 没有线程池:每来一个包裹就雇一个新快递员,送完就解雇
- 问题:频繁招聘解雇成本高(线程创建销毁开销大)
- 有线程池:固定雇佣5个快递员,包裹来了就分配给空闲的快递员
- 优点:快递员复用,效率高,管理方便
线程池就是这样的"快递员管理系统",预先创建一组线程,有任务时分配给空闲线程执行。
完整C语言线程池实现(带详细注释)
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <unistd.h>#define THREAD_NUM 5 // 线程池中线程数量// 任务结构体(相当于快递站的"包裹")
typedef struct {void (*function)(void *); // 任务函数指针void *arg; // 函数参数
} Task;// 线程池结构体(相当于"快递站")
typedef struct {Task *task_queue; // 任务队列(存放待处理的任务)int queue_capacity; // 队列容量int queue_size; // 当前队列中任务数量int queue_front; // 队首索引int queue_rear; // 队尾索引pthread_t *threads; // 工作线程数组(相当于"快递员")pthread_mutex_t mutex; // 互斥锁,保护任务队列pthread_cond_t cond; // 条件变量,线程等待信号int shutdown; // 线程池关闭标志
} ThreadPool;// 创建线程池
ThreadPool* thread_pool_create(int capacity) {ThreadPool *pool = (ThreadPool *)malloc(sizeof(ThreadPool));// 初始化任务队列pool->queue_capacity = capacity;pool->task_queue = (Task *)malloc(sizeof(Task) * capacity);pool->queue_size = 0;pool->queue_front = 0;pool->queue_rear = 0;// 初始化线程数组pool->threads = (pthread_t *)malloc(sizeof(pthread_t) * THREAD_NUM);// 初始化互斥锁和条件变量pthread_mutex_init(&pool->mutex, NULL);pthread_cond_init(&pool->cond, NULL);// 设置线程池运行状态pool->shutdown = 0;// 创建工作线程for (int i = 0; i < THREAD_NUM; i++) {pthread_create(&pool->threads[i], NULL, worker, (void *)pool);}return pool;
}// 工作线程函数(相当于"快递员的工作流程")
void* worker(void* arg) {ThreadPool *pool = (ThreadPool *)arg;while (1) {pthread_mutex_lock(&pool->mutex); // 加锁保护任务队列// 当任务队列为空且线程池未关闭时,线程等待while (pool->queue_size == 0 && !pool->shutdown) {pthread_cond_wait(&pool->cond, &pool->mutex); // 等待条件变量}// 如果线程池已关闭且任务已处理完,线程退出if (pool->shutdown && pool->queue_size == 0) {pthread_mutex_unlock(&pool->mutex);pthread_exit(NULL);}// 从任务队列中取出一个任务Task task;task.function = pool->task_queue[pool->queue_front].function;task.arg = pool->task_queue[pool->queue_front].arg;// 更新队列状态pool->queue_front = (pool->queue_front + 1) % pool->queue_capacity;pool->queue_size--;pthread_mutex_unlock(&pool->mutex); // 解锁// 执行任务(快递员开始派送包裹)printf("Thread %lu start working...\n", pthread_self());task.function(task.arg); // 调用任务函数printf("Thread %lu finish work!\n", pthread_self());}return NULL;
}// 添加任务到线程池
void thread_pool_add(ThreadPool *pool, void (*func)(void *), void *arg) {pthread_mutex_lock(&pool->mutex); // 加锁保护任务队列// 如果队列已满,等待(这里简单处理,实际可以扩容或返回错误)while (pool->queue_size == pool->queue_capacity && !pool->shutdown) {pthread_cond_wait(&pool->cond, &pool->mutex);}if (pool->shutdown) {pthread_mutex_unlock(&pool->mutex);return;}// 添加任务到队尾pool->task_queue[pool->queue_rear].function = func;pool->task_queue[pool->queue_rear].arg = arg;pool->queue_rear = (pool->queue_rear + 1) % pool->queue_capacity;pool->queue_size++;pthread_cond_signal(&pool->cond); // 唤醒一个等待的线程pthread_mutex_unlock(&pool->mutex); // 解锁
}// 销毁线程池
void thread_pool_destroy(ThreadPool *pool) {if (pool == NULL) return;// 设置关闭标志pool->shutdown = 1;// 唤醒所有等待的线程pthread_cond_broadcast(&pool->cond);// 等待所有线程退出for (int i = 0; i < THREAD_NUM; i++) {pthread_join(pool->threads[i], NULL);}// 释放资源free(pool->threads);free(pool->task_queue);pthread_mutex_destroy(&pool->mutex);pthread_cond_destroy(&pool->cond);free(pool);
}// 示例任务函数1
void task_function1(void *arg) {int num = *(int *)arg;printf("Task1 processing number: %d\n", num);sleep(1); // 模拟耗时操作
}// 示例任务函数2
void task_function2(void *arg) {char *str = (char *)arg;printf("Task2 processing string: %s\n", str);sleep(2); // 模拟耗时操作
}int main() {// 创建线程池,任务队列容量为10ThreadPool *pool = thread_pool_create(10);// 添加任务到线程池for (int i = 0; i < 10; i++) {int *num = (int *)malloc(sizeof(int));*num = i;thread_pool_add(pool, task_function1, (void *)num);}// 添加不同类型的任务char *str = "Hello ThreadPool";thread_pool_add(pool, task_function2, (void *)str);// 等待所有任务完成sleep(5);// 销毁线程池thread_pool_destroy(pool);return 0;
}
关键组件详细解释
1. 任务队列(Task Queue)
typedef struct {void (*function)(void *); // 函数指针void *arg; // 函数参数
} Task;
- 作用:存储待执行的任务
- 原理:使用环形队列实现,避免频繁内存分配
- 操作:
- 队尾添加任务
- 队首取出任务
2. 线程池管理(ThreadPool)
typedef struct {Task *task_queue; // 任务队列int queue_capacity; // 队列容量int queue_size; // 当前任务数// ...其他成员
} ThreadPool;
- 作用:管理线程和任务队列
- 关键成员:
threads
:工作线程数组mutex
:保护任务队列的互斥锁cond
:线程间通信的条件变量
3. 工作线程(Worker Thread)
void* worker(void* arg) {while (1) {// 1. 加锁并检查任务队列// 2. 无任务时等待条件变量// 3. 取出任务并执行// 4. 解锁}
}
- 工作流程:
- 检查任务队列
- 无任务则等待
- 有任务则取出执行
- 循环处理
4. 任务添加(thread_pool_add)
void thread_pool_add(ThreadPool *pool, void (*func)(void *), void *arg) {// 1. 加锁// 2. 检查队列状态// 3. 添加任务到队列// 4. 唤醒一个等待线程// 5. 解锁
}
- 作用:向线程池提交新任务
- 关键点:
- 队列满时等待
- 添加后唤醒工作线程
5. 线程池销毁(thread_pool_destroy)
void thread_pool_destroy(ThreadPool *pool) {// 1. 设置关闭标志// 2. 唤醒所有线程// 3. 等待线程退出// 4. 释放资源
}
- 作用:安全关闭线程池
- 关键点:
- 先通知所有线程
- 等待线程自然退出
线程池工作流程图
主线程:[创建线程池] → [添加任务] → [销毁线程池]| |↓ ↓
任务队列: [任务1][任务2][任务3]...|↓
工作线程:[线程1取任务] → [执行] → [取下一个任务][线程2取任务] → [执行] → [取下一个任务]...
为什么需要这些机制?
互斥锁(mutex):
- 防止多个线程同时访问任务队列导致数据混乱
条件变量(cond):
- 当任务队列为空时,让工作线程休眠等待
- 当有新任务时唤醒线程,避免忙等待
环形队列:
- 高效利用内存,避免频繁内存分配
- 先进先出(FIFO)的任务处理顺序
线程复用:
- 避免频繁创建销毁线程的开销
- 控制并发数量,防止系统过载
实际应用场景
- 网络服务器:处理大量客户端请求
- 文件处理:批量处理大量文件
- 数据计算:并行计算任务
- GUI应用:后台耗时任务处理
扩展改进建议
- 动态调整线程数:根据负载自动增减线程
- 任务优先级:支持高优先级任务插队
- 任务取消:支持取消已提交但未执行的任务
- 超时机制:设置任务执行超时时间
- 任务结果获取:提供获取任务执行结果的机制