【学习记录】c完整线程池实现
前言
c实现线程池,初步测试无误,采用双端队列,和任务偷取,任务优先加入自身队列。为了避免极端情况下,某一个线程的两队列中任务过于多,而空闲线程又无事可做。加入了空闲队列和任务偷取,提交任务的时候不仅唤醒自身,还会唤醒一个空闲线程,让空闲线程尝试取偷取高负载的线程任务。
正常情况,提交任务是会平均的分配给几个线程队列,目前我能想到的是同时初始化很多线程,线程中提交任务,提交完之后关闭。这样确实有可能导致任务积压到一个线程里面(不过无需担心可以有任务偷取缓解)。
如果错误欢迎斧正
头文件
//
// Created by Administrator on 2025/8/22.
//#ifndef THREADPOOL_H
#define THREADPOOL_H
#ifndef MAXTHREADNUM
#define MAXTHREADNUM 64
#endif// 任务函数类型
typedef void *(*task_func_t)(void *);// 前向声明
typedef struct thread_pool thread_pool_t;// 任务结构
typedef struct {task_func_t func;void *arg;
} task_t;// 本地任务队列(双端队列)
typedef struct {task_t *queue;int capacity;int head; // 从 head 取任务(本地线程用)int tail; // 从 tail 放任务(本地线程用)pthread_mutex_t lock; // 窃取时用pthread_cond_t notify; // 窃取通知
} local_queue_t;// 线程池
struct thread_pool {local_queue_t *queues; // 每个线程的本地队列int num_workers;pthread_t *threads;int shutdown;// 空闲线程管理int idle_workers[MAXTHREADNUM]; // 存储空闲 worker 的索引(最大支持 MAXTHREADNUM 个线程)int idle_count; // 当前空闲线程数量pthread_mutex_t idle_lock; // 保护 idle_workers 和 idle_count
};typedef struct {int index;thread_pool_t * pool;
}thread_pool_c;// 初始化线程池
thread_pool_t* thread_pool_init(int num_workers);
// 提交任务放入某个线程的本地队列
int thread_pool_submit(thread_pool_t *pool, task_func_t func, void *arg);
// 销毁线程池
void thread_pool_destroy(thread_pool_t *pool);
#endif //THREADPOOL_H
源文件
//
// Created by Administrator on 2025/8/22.
//
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <string.h>
#include <stdint.h>
#include <unistd.h>
#include "threadpool.h"// === 双端队列操作 ===// 队列是否为空
static int queue_empty(local_queue_t *q) {return q->head == q->tail;
}// 队列是否满
static int queue_full(local_queue_t *q) {return (q->tail + 1) % q->capacity == q->head;
}// 非线程安全,仅用于本线程查询自己的队列(由本线程调用)
int local_queue_size_fast(local_queue_t *q) {if (!q) return -1;int size = (q->tail - q->head + q->capacity) % q->capacity;return size;
}// 向本地队列尾部添加任务(由本线程调用)
static int queue_push_tail(local_queue_t *q, task_t task) {if (queue_full(q)) return -1;q->queue[q->tail] = task;q->tail = (q->tail + 1) % q->capacity;return 0;
}// 从本地队列头部获取任务(由本线程调用)
static int queue_pop_head(local_queue_t *q, task_t *task) {if (queue_empty(q)) return -1;*task = q->queue[q->head];q->head = (q->head + 1) % q->capacity;return 0;
}// 从本地队列尾部窃取任务(由其他线程调用)
static int queue_steal(local_queue_t *q, task_t *task) {pthread_mutex_lock(&q->lock);if (queue_empty(q)) {pthread_mutex_unlock(&q->lock);return -1;}// 从 tail-1 处窃取(队尾)int tail = q->tail;if (tail == q->head) {pthread_mutex_unlock(&q->lock);return -1;}// *task = q->queue[(tail - 1) % q->capacity];// q->tail = (tail - 1) % q->capacity;int new_tail = (tail - 1 + q->capacity) % q->capacity;*task = q->queue[new_tail];q->tail = new_tail;// 唤醒可能等待的线程pthread_cond_signal(&q->notify);pthread_mutex_unlock(&q->lock);return 0;
}// === 线程池实现 ===// 工作者线程主函数
void* worker_routine(void *arg) {thread_pool_c *poolc = (thread_pool_c*)arg;thread_pool_t *pool = poolc->pool;int self_id = poolc->index; //自身的序号free(poolc);int num_tries = 0;const int max_tries = pool->num_workers * 2;// printf("线程%d启动成功 id=%lu\n", self_id,pthread_self());while (!pool->shutdown) {task_t task;int executed = 0;// printf("线程[%d] 非空等待任务剩余 %d... \n", self_id,local_queue_size_fast(&pool->queues[self_id]));// 1. 先尝试执行自己的任务if (queue_pop_head(&pool->queues[self_id], &task) == 0) {// printf("线程[%d] 开始执行自身任务 %s \n", self_id, (char*)task.arg);task.func(task.arg);executed = 1;}// 2. 自己的空了,去偷别人的else {for (int i = 0; i < pool->num_workers && !executed && num_tries < max_tries; i++) {int thief_id = (self_id + i + 1) % pool->num_workers;if (thief_id == self_id) continue;if (queue_steal(&pool->queues[thief_id], &task) == 0) {// printf("线程[%d] 开始执行他人[%d]任务 %s \n", self_id,thief_id, (char*)task.arg);task.func(task.arg);executed = 1;}num_tries++;}}// 3. 都没有任务,等待if (!executed) {// === 注册自己为“空闲” ===pthread_mutex_lock(&pool->idle_lock);if (!pool->shutdown && pool->idle_count < MAXTHREADNUM) { // MAXTHREADNUM 是上限pool->idle_workers[pool->idle_count++] = self_id;}pthread_mutex_unlock(&pool->idle_lock);pthread_mutex_lock(&pool->queues[self_id].lock);// 再次检查,避免遗漏if (queue_empty(&pool->queues[self_id]) && !pool->shutdown) {// struct timespec ts;// clock_gettime(CLOCK_REALTIME, &ts);// ts.tv_sec += 1; // 每1秒醒来一次检查pthread_cond_wait(&pool->queues[self_id].notify, &pool->queues[self_id].lock);// pthread_cond_timedwait(&pool->queues[self_id].notify, &pool->queues[self_id].lock, &ts);}pthread_mutex_unlock(&pool->queues[self_id].lock);}if(executed){num_tries = 0;}}return NULL;
}// 初始化线程池
thread_pool_t* thread_pool_init(int num_workers) {if (num_workers <= 0) return NULL;thread_pool_t *pool = calloc(1, sizeof(thread_pool_t));if (!pool) goto fail;pool->num_workers = num_workers;pool->threads = calloc(num_workers, sizeof(pthread_t));pool->queues = calloc(num_workers, sizeof(local_queue_t));if (!pool->threads || !pool->queues) {goto fail;}if (pthread_mutex_init(&pool->idle_lock, NULL) != 0) {goto fail;}pool->idle_count = 0;// 初始化每个本地队列for (int i = 0; i < num_workers; i++) {local_queue_t *q = &pool->queues[i];q->capacity = MAXTHREADNUM;q->queue = calloc(q->capacity, sizeof(task_t));q->head = q->tail = 0;if (pthread_mutex_init(&q->lock, NULL) != 0 ||pthread_cond_init(&q->notify, NULL) != 0 ||!q->queue) {goto fail;}}pool->shutdown = 0;// 创建线程for (int i = 0; i < num_workers; i++) {thread_pool_c *pool_c = calloc(1,sizeof(thread_pool_c));if(!pool_c) {pool->shutdown = 1;for (int j = 0; j < i; j++) {pthread_cond_signal(&pool->queues[j].notify); // 唤醒等待中的线程}for (int j = 0; j < i; j++) {pthread_join(pool->threads[j], NULL);}goto fail;}pool_c->pool = pool;pool_c->index = i;if (pthread_create(&pool->threads[i], NULL, worker_routine, pool_c) != 0) {pool->shutdown = 1;for (int j = 0; j < i; j++) {pthread_cond_signal(&pool->queues[j].notify); // 唤醒等待中的线程}for (int j = 0; j < i; j++) {pthread_join(pool->threads[j], NULL);}goto fail;}}return pool;fail:// if(pool_c) free(pool_c);if (pool) {for (int i = 0; i < num_workers; i++) {if (pool->queues && pool->queues[i].queue) free(pool->queues[i].queue);pthread_mutex_destroy(&pool->queues[i].lock);pthread_cond_destroy(&pool->queues[i].notify);}pthread_mutex_destroy(&pool->idle_lock);free(pool->queues);free(pool->threads);free(pool);}return NULL;
}// 提交任务放入某个线程的本地队列
int thread_pool_submit(thread_pool_t *pool, task_func_t func, void *arg) {if (!pool || !func) return -1;task_t task = { .func = func, .arg = arg };// 简单轮询分配static __thread int last_idx = 0; // 每个线程有自己的 last_idxint start = last_idx;int idx = start;do {local_queue_t *q = &pool->queues[idx];pthread_mutex_lock(&q->lock);if (queue_push_tail(&pool->queues[idx], task) == 0) {last_idx = (idx + 1) % pool->num_workers;// 唤醒目标线程(如果它在等)pthread_cond_signal(&pool->queues[idx].notify);pthread_mutex_unlock(&q->lock);// === 尝试唤醒一个空闲线程 ===pthread_mutex_lock(&pool->idle_lock);if (pool->idle_count > 0) {pool->idle_count-- ; //例如 1的时候应该取 0int target_id = pool->idle_workers[pool->idle_count];// 唤醒那个空闲线程,让它去偷任务pthread_cond_signal(&pool->queues[target_id].notify);}pthread_mutex_unlock(&pool->idle_lock);// printf("任务 %s 放入队列[%d]成功\n", (char*)arg, idx);return 0;}else{// printf("任务%d 队列已满\n", idx);}pthread_mutex_unlock(&q->lock);idx = (idx + 1) % pool->num_workers;} while (idx != start);// printf("Task rejected: all local queues are full\n");return -1;
}// 销毁线程池
void thread_pool_destroy(thread_pool_t *pool) {if (!pool) return;pool->shutdown = 1;// 唤醒所有线程for (int i = 0; i < pool->num_workers; i++) {pthread_cond_broadcast(&pool->queues[i].notify);}for (int i = 0; i < pool->num_workers; i++) {pthread_join(pool->threads[i], NULL);}// 清理资源for (int i = 0; i < pool->num_workers; i++) {free(pool->queues[i].queue);pthread_mutex_destroy(&pool->queues[i].lock);pthread_cond_destroy(&pool->queues[i].notify);}free(pool->queues);free(pool->threads);free(pool);
}// void* test_task(void *arg) {
// printf("Task executed by thread %lu: %s\n", pthread_self(), (char*)arg);
// free(arg);
// usleep(100000);
// return NULL;
// }
// int main() {
// thread_pool_t *pool = thread_pool_init(4);
// if (!pool) return 1;
// sleep(1);
// for (int i = 0; i < 500; i++) {
// char *msg = malloc(32);
// sprintf(msg, "Task %d",i);
// int ret = thread_pool_submit(pool, test_task, msg);
// // if (ret!=0){
// // printf("任务%d提交失败\n", i);
// // }else{
// // printf("任务%d提交成功\n", i);
// // }
// }
//
// sleep(30);
// thread_pool_destroy(pool);
// return 0;
// }