Linux系统编程 | 线程池
1、线程池的基础
线程池(Thread Pool)是一种多线程处理模式,是一种管理线程的机制,通过预先创建一组可复用的线程来执行任务,旨在优化资源利用、提升响应速度并降低系统开销。
一个进程中的线程就好比是一家公司里的员工,员工的数目应该根据公司的业务多少来定,太少了忙不过来,但是太多了也浪费资源,最理想的情况是:让进程有一些初始数目的线程(所谓的线程池),当没有任务的时候这些线程自动进入睡眠,有了任务他们会立即执行任务,不断循环。进程还应该可以根据自身任务的繁重与否来增删线程的数目,当所有的任务都完成了之后,所有的线程还能妥当地收官走人,不带走一片云彩。
线程池中有这么几点:
(1)、任务队列中刚开始没有任何任务,是一个具有头结点的空链队列。
(2)、使用互斥锁来保护这个队列。
(3)、使用条件变量来代表任务队列中的任务个数的变化——将来如果主线程往队列中投放任务,那么可以通过条件变量来唤醒那些睡着了的线程。
(4)、通过一个公共开关——shutdown,来控制线程退出,进而销毁整个线程池。如果有更好的 idea,可以扩展该设计,但就目前而言,一个相互协作的多线程组织已经初具雏形。
2、线程池API
(1)、初始化线程池
初始化线程池,包括互斥锁、条件变量、任务链表、线程ID数组,并创建指定数量的线程。
/* pool:线程池结构体的地址,threads_number:一开始初始化的线程数量 */
bool init_pool(thread_pool *pool, unsigned int threads_number)
{pthread_mutex_init(&pool->lock, NULL); //初始化互斥锁 pthread_cond_init(&pool->cond, NULL); //初始化条件变量 pool->shutdown = false; //开启线程池的标志false:开 pool->task_list = malloc(sizeof(struct task)); //申请任务链表头 pool->tids = malloc(sizeof(pthread_t) * MAX_ACTIVE_THREADS);//申请最大线程数量的ID内存 if(pool->task_list == NULL || pool->tids == NULL) //判断两个申请的内存是否成功 {perror("allocate memory error");return false;}pool->task_list->next = NULL; //初始化链表头,将下一个节点指向NULL pool->waiting_tasks = 0; //将等待运行的任务数量置0 pool->active_threads = threads_number; //登记当前的线程数量 int i;for(i=0; i<pool->active_threads; i++) //创建线程池里面的线程 {if(pthread_create(&((pool->tids)[i]), NULL,routine, (void *)pool) != 0) //每一个线程都去跑routine这个函数的内容{perror("create threads error");return false;}}return true;
}
程序代码分析:
①、同步机制初始化: 初始化互斥锁 lock和条件变量 cond。
②、线程池状态设置: 设置 shutdown为 false,表示线程池处于运行状态。 分配任务链表的头节点 task_list,并将其 next指针置为 NULL,表示初始时没有任务。
③、线程ID数组分配: 分配一个大小为 MAX_ACTIVE_THREADS的线程ID数组 tids。
④、内存分配检查: 如果 task_list或 tids分配失败,输出错误信息并返回 false。
⑤、创建线程: 循环创建指定数量 (threads_number) 的线程,每个线程执行 routine函数,并将线程池地址作为参数传递。
(2)、增加线程池线程数量
动态增加线程池中的线程数量。
/* 添加线程到线程池中 pool:线程池结构体地址, additional_threads:添加的线程的数量 */
int add_thread(thread_pool *pool, unsigned additional_threads)
{if(additional_threads == 0)return 0;unsigned total_threads =pool->active_threads + additional_threads; //将总数记录在这个变量中int i, actual_increment = 0;for(i = pool->active_threads;i < total_threads && i < MAX_ACTIVE_THREADS;i++){if(pthread_create(&((pool->tids)[i]),NULL, routine, (void *)pool) != 0) //新建线程{perror("add threads error");if(actual_increment == 0)return -1;break;}actual_increment++; //记录成功添加了多少条线程到线程池中}pool->active_threads += actual_increment; //将最后成功添加到线程池的线程总数记录线程池中return actual_increment; //返回新建了多少条线程
}
程序代码分析:
①、参数检查: 如果 additional_threads为 0,直接返回 0,表示没有新增线程。
②、计算总线程数: 计算新增后的总线程数 total_threads。
③、循环创建线程: 从当前活跃线程数 pool->active_threads开始,尝试创建新的线程,直到达到 total_threads或 MAX_ACTIVE_THREADS的限制。 如果某一线程创建失败: 如果尚未成功创建任何线程 (actual_increment == 0),返回 -1表示失败。 否则,停止创建更多线程。
④、更新活跃线程数: 将成功创建的线程数 actual_increment加到 pool->active_threads上。
⑤、返回实际新增的线程数。
(3)、减少线程池线程数量
动态减少线程池中的线程数量。
int remove_thread(thread_pool *pool, unsigned int removing_threads)
{if(removing_threads == 0)return pool->active_threads;int remain_threads = pool->active_threads - removing_threads; //将移除线程之后的线程数量登记下来remain_threads = remain_threads>0 ? remain_threads:1; //如果这个数量不大于0,则把它置1int i;for(i=pool->active_threads-1; i>remain_threads-1; i--) //从id的最后一位线程开始取消{errno = pthread_cancel(pool->tids[i]);if(errno != 0)break;}if(i == pool->active_threads-1) //判断是否取消掉要求的数量return -1;elsepool->active_threads = i+1; //将新的线程数量登记active_threadsreturn pool->active_threads; //返回剩下多少条线程在线程池中
}
程序代码分析:
①、参数检查: 如果 removing_threads为 0,直接返回当前活跃线程数。
②、计算剩余线程数: 计算移除指定数量线程后的剩余线程数 remain_threads,确保其不小于 1(即至少保留一个线程)。
③、取消线程: 从最后一个活跃线程开始,向前遍历,尝试使用 pthread_cancel取消指定数量的线程。 如果 pthread_cancel返回非零值(表示取消失败),则停止取消更多线程。
④、更新活跃线程数: 如果成功取消指定数量的线程,更新 pool->active_threads为 i+1(因为循环从 pool->active_threads-1开始,到 remain_threads-1结束)。如果未能取消任何线程(即 i未改变),返回 -1表示失败。
⑤、返回剩余线程数。
(4)、添加新任务到线程池
向线程池的任务队列中添加新任务。
/* 投放任务:pool:线程池地址;task:任务需要运行的内容的函数指针; arg:传入给task函数的参数 */
bool add_task(thread_pool *pool,void *(*task)(void *arg), void *arg)
{struct task *new_task = malloc(sizeof(struct task)); //新建一个任务节点if(new_task == NULL){perror("allocate memory error");return false;}new_task->task = task; //将任务需要做的函数存进task指针中new_task->arg = arg; //将任务函数参数记录在arg里面new_task->next = NULL; //将任务节点的下一个位置指向NULLpthread_mutex_lock(&pool->lock); //上锁if(pool->waiting_tasks >= MAX_WAITING_TASKS) //判断任务数量有没有超标{pthread_mutex_unlock(&pool->lock); //解锁fprintf(stderr, "too many tasks.\n"); //反馈太多任务了free(new_task); //释放掉刚才登记的任务节点return false; //返回添加不了任务到任务链表中}struct task *tmp = pool->task_list; //将线程池中任务链表的头节点登记到tmpwhile(tmp->next != NULL) //将tmp指向最后的节点的位置tmp = tmp->next;tmp->next = new_task; //将新建的任务节点插入到链表中pool->waiting_tasks++; //将等待的任务数量+1pthread_mutex_unlock(&pool->lock); //解锁pthread_cond_signal(&pool->cond); //唤醒正在睡眠中的线程return true; //返回添加成功
}
程序代码分析:
①、任务节点分配: 分配一个新的任务节点 new_task,并初始化其 task、arg和 next指针。
②、加锁保护共享资源: 使用互斥锁 lock保护对任务链表和 waiting_tasks的访问。
③、任务队列满检查: 如果当前等待的任务数量 waiting_tasks已达到 MAX_WAITING_TASKS,则解锁、输出错误信息、释放任务节点内存,并返回 false。
④、插入任务节点: 遍历任务链表,找到链表的末尾,将新任务节点插入到链表尾部。 增加 waiting_tasks计数。
⑤、解锁并通知线程: 解锁互斥锁,使用 pthread_cond_signal唤醒一个正在等待的线程来处理新任务。
(5)、销毁线程池
销毁线程池,包括等待所有线程结束、释放资源。
bool destroy_pool(thread_pool *pool)
{pool->shutdown = true; //使能线程池的退出开关pthread_cond_broadcast(&pool->cond); //将所有的线程全部唤醒int i;for(i=0; i<pool->active_threads; i++) //开始接合线程{errno = pthread_join(pool->tids[i], NULL);if(errno != 0){printf("join tids[%d] error: %s\n",i, strerror(errno));}elseprintf("[%u] is joined\n", (unsigned)pool->tids[i]);}free(pool->task_list); //释放掉任务头节点free(pool->tids); //释放掉线程ID内存return true;
}
程序代码分析:
①、设置关闭标志: 将 pool->shutdown设置为 true,通知所有线程线程池即将关闭。
②、唤醒所有等待线程: 使用 pthread_cond_broadcast唤醒所有正在等待条件变量的线程,使它们退出等待状态并检查关闭标志。
③、等待线程结束: 循环调用 pthread_join等待每一个活跃线程结束。 如果 pthread_join失败,输出错误信息;否则,输出线程已加入的信息。
④、释放资源: 释放任务链表的头节点 task_list。 释放线程ID数组 tids。
⑤、返回成功。
(6)、线程的执行函数
线程的执行函数,负责从任务队列中取出任务并执行。
void *routine(void *arg)
{thread_pool *pool = (thread_pool *)arg; //将线程池的地址存放进去poolstruct task *p; //定义一个缓冲指针,后期任务队列遍历的时候使用while(1){pthread_cleanup_push(handler, (void *)&pool->lock); //提前登记线程被取消后需要处理的事情pthread_mutex_lock(&pool->lock); //由于需要操作线程池中的共有资源,所以加锁while(pool->waiting_tasks == 0 && !pool->shutdown) //判断是否没有需要运行的任务{pthread_cond_wait(&pool->cond, &pool->lock); //让线程睡眠}if(pool->waiting_tasks == 0 && pool->shutdown == true) //判断线程池是否没有任务并且需要关闭 {pthread_mutex_unlock(&pool->lock); //解锁pthread_exit(NULL); //退出线程 }p = pool->task_list->next; //让p登记需要运行的任务节点 pool->task_list->next = p->next; //将此任务节点从链表中删除 pool->waiting_tasks--; //将等待运行的任务队列-1 pthread_mutex_unlock(&pool->lock); //解锁 pthread_cleanup_pop(0); //解除登记取消线程之后所做的函数 pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL); //忽略线程的取消操作(p->task)(p->arg); //函数调用 pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL); //重启线程取消操作free(p); //释放任务节点}pthread_exit(NULL);
}void handler(void *arg)
{pthread_mutex_unlock((pthread_mutex_t *)arg); //防止死锁,所以在这里添加解锁操作
}
程序代码分析:
①、线程循环: 线程在 while(1)循环中不断检查是否有任务需要执行。
②、同步与等待: 使用 pthread_mutex_lock保护共享资源(任务链表)。 当没有任务且线程池未关闭时,调用 pthread_cond_wait让线程进入等待状态,释放锁并挂起线程,直到被唤醒。
③、任务获取与执行: 从任务链表中取出一个任务节点,更新链表和任务计数。 在执行任务前,通过 pthread_setcancelstate禁止线程被取消,确保任务执行的原子性。 执行任务函数 p->task(p->arg)。 执行完毕后,重新允许线程被取消,并释放任务节点内存。
④、清理处理: 使用 pthread_cleanup_push和 pthread_cleanup_pop注册和注销清理函数 handler,以防止线程在取消时导致死锁(例如,在持有锁的情况下被取消)。
3、线程池完整代码
(1)、thread_pool.h
#ifndef _THREAD_POOL_H_
#define _THREAD_POOL_H_#include <stdio.h>
#include <stdbool.h>
#include <unistd.h>
#include <stdlib.h>
#include <string.h>
#include <strings.h>#include <errno.h>
#include <pthread.h>#define MAX_WAITING_TASKS 1000 //最大的等待任务数量
#define MAX_ACTIVE_THREADS 20 //最大的线程数量struct task //任务链表结构体
{void *(*task)(void *arg); //任务做什么事情的函数指针void *arg; //传入函数的参数struct task *next; //链表的位置结构体指针
};typedef struct thread_pool
{pthread_mutex_t lock; //用于线程池同步互斥的互斥锁pthread_cond_t cond; //用于让线程池里面的线程睡眠的条件变量struct task *task_list; //线程池的执行任务链表pthread_t *tids; //线程池里面线程的ID登记处unsigned waiting_tasks; //等待的任务数量,也就是上面任务链表的长度unsigned active_threads; //当前已经创建的线程数bool shutdown; //线程池的开关
}thread_pool;bool
init_pool(thread_pool *pool,unsigned int threads_number); //初始化线程池bool
add_task(thread_pool *pool,void *(*task)(void *arg),void *arg); //往线程池里面添加任务节点int
add_thread(thread_pool *pool,unsigned int additional_threads_number); //添加线程池中线程的数量int
remove_thread(thread_pool *pool,unsigned int removing_threads_number); //移除线程bool destroy_pool(thread_pool *pool); //销毁线程池
void *routine(void *arg); //线程池里面线程的执行函数#endif
(2)、thread_pool.c
#include "thread_pool.h"void handler(void *arg)
{pthread_mutex_unlock((pthread_mutex_t *)arg); //防止死锁,所以在这里添加解锁操作
}/* 每一个线程池中的线程所执行的内容, arg就是线程池的地址 */
void *routine(void *arg)
{thread_pool *pool = (thread_pool *)arg; //将线程池的地址存放进去poolstruct task *p; //定义一个缓冲指针,后期任务队列遍历的时候使用while(1){pthread_cleanup_push(handler, (void *)&pool->lock); //提前登记线程被取消后需要处理的事情pthread_mutex_lock(&pool->lock); //由于需要操作线程池中的共有资源,所以加锁while(pool->waiting_tasks == 0 && !pool->shutdown) //判断是否没有需要运行的任务{pthread_cond_wait(&pool->cond, &pool->lock); //让线程睡眠}if(pool->waiting_tasks == 0 && pool->shutdown == true) //判断线程池是否没有任务并且需要关闭 {pthread_mutex_unlock(&pool->lock); //解锁pthread_exit(NULL); //退出线程 }p = pool->task_list->next; //让p登记需要运行的任务节点 pool->task_list->next = p->next; //将此任务节点从链表中删除 pool->waiting_tasks--; //将等待运行的任务队列-1 pthread_mutex_unlock(&pool->lock); //解锁 pthread_cleanup_pop(0); //解除登记取消线程之后所做的函数 pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL); //忽略线程的取消操作(p->task)(p->arg); //函数调用 pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL); //重启线程取消操作free(p); //释放任务节点}pthread_exit(NULL);
}/* pool:线程池结构体的地址,threads_number:一开始初始化的线程数量 */
bool init_pool(thread_pool *pool, unsigned int threads_number)
{pthread_mutex_init(&pool->lock, NULL); //初始化互斥锁 pthread_cond_init(&pool->cond, NULL); //初始化条件变量 pool->shutdown = false; //开启线程池的标志false:开 pool->task_list = malloc(sizeof(struct task)); //申请任务链表头 pool->tids = malloc(sizeof(pthread_t) * MAX_ACTIVE_THREADS);//申请最大线程数量的ID内存 if(pool->task_list == NULL || pool->tids == NULL) //判断两个申请的内存是否成功 {perror("allocate memory error");return false;}pool->task_list->next = NULL; //初始化链表头,将下一个节点指向NULL pool->waiting_tasks = 0; //将等待运行的任务数量置0 pool->active_threads = threads_number; //登记当前的线程数量 int i;for(i=0; i<pool->active_threads; i++) //创建线程池里面的线程 {if(pthread_create(&((pool->tids)[i]), NULL,routine, (void *)pool) != 0) //每一个线程都去跑routine这个函数的内容{perror("create threads error");return false;}}return true;
}/* 投放任务:pool:线程池地址;task:任务需要运行的内容的函数指针; arg:传入给task函数的参数 */
bool add_task(thread_pool *pool,void *(*task)(void *arg), void *arg)
{struct task *new_task = malloc(sizeof(struct task)); //新建一个任务节点if(new_task == NULL){perror("allocate memory error");return false;}new_task->task = task; //将任务需要做的函数存进task指针中new_task->arg = arg; //将任务函数参数记录在arg里面new_task->next = NULL; //将任务节点的下一个位置指向NULLpthread_mutex_lock(&pool->lock); //上锁if(pool->waiting_tasks >= MAX_WAITING_TASKS) //判断任务数量有没有超标{pthread_mutex_unlock(&pool->lock); //解锁fprintf(stderr, "too many tasks.\n"); //反馈太多任务了free(new_task); //释放掉刚才登记的任务节点return false; //返回添加不了任务到任务链表中}struct task *tmp = pool->task_list; //将线程池中任务链表的头节点登记到tmpwhile(tmp->next != NULL) //将tmp指向最后的节点的位置tmp = tmp->next;tmp->next = new_task; //将新建的任务节点插入到链表中pool->waiting_tasks++; //将等待的任务数量+1pthread_mutex_unlock(&pool->lock); //解锁pthread_cond_signal(&pool->cond); //唤醒正在睡眠中的线程return true; //返回添加成功
}/* 添加线程到线程池中 pool:线程池结构体地址, additional_threads:添加的线程的数量 */
int add_thread(thread_pool *pool, unsigned additional_threads)
{if(additional_threads == 0)return 0;unsigned total_threads =pool->active_threads + additional_threads; //将总数记录在这个变量中int i, actual_increment = 0;for(i = pool->active_threads;i < total_threads && i < MAX_ACTIVE_THREADS;i++){if(pthread_create(&((pool->tids)[i]),NULL, routine, (void *)pool) != 0) //新建线程{perror("add threads error");if(actual_increment == 0)return -1;break;}actual_increment++; //记录成功添加了多少条线程到线程池中}pool->active_threads += actual_increment; //将最后成功添加到线程池的线程总数记录线程池中return actual_increment; //返回新建了多少条线程
}int remove_thread(thread_pool *pool, unsigned int removing_threads)
{if(removing_threads == 0)return pool->active_threads;int remain_threads = pool->active_threads - removing_threads; //将移除线程之后的线程数量登记下来remain_threads = remain_threads>0 ? remain_threads:1; //如果这个数量不大于0,则把它置1int i;for(i=pool->active_threads-1; i>remain_threads-1; i--) //从id的最后一位线程开始取消{errno = pthread_cancel(pool->tids[i]);if(errno != 0)break;}if(i == pool->active_threads-1) //判断是否取消掉要求的数量return -1;elsepool->active_threads = i+1; //将新的线程数量登记active_threadsreturn pool->active_threads; //返回剩下多少条线程在线程池中
}bool destroy_pool(thread_pool *pool)
{pool->shutdown = true; //使能线程池的退出开关pthread_cond_broadcast(&pool->cond); //将所有的线程全部唤醒int i;for(i=0; i<pool->active_threads; i++) //开始接合线程{errno = pthread_join(pool->tids[i], NULL);if(errno != 0){printf("join tids[%d] error: %s\n",i, strerror(errno));}elseprintf("[%u] is joined\n", (unsigned)pool->tids[i]);}free(pool->task_list); //释放掉任务头节点free(pool->tids); //释放掉线程ID内存return true;
}
(3)、main.c
#include "thread_pool.h"void *mytask1(void *arg)
{long n = (long)arg;printf("任务1准备执行,执行线程为%u, 执行 %ld 秒...\n",(unsigned)pthread_self(), n);sleep(n);printf("任务1执行完毕,完成线程为%d\n",(unsigned)pthread_self());return NULL;
}void *mytask2(void *arg)
{long n = (long)arg;printf("任务2准备执行,执行线程为%u, 执行 %ld 秒...\n",(unsigned)pthread_self(), n);sleep(n);printf("任务2执行完毕,完成线程为%d\n",(unsigned)pthread_self());return NULL;
}void *mytask3(void *arg)
{long n = (long)arg;printf("任务3准备执行,执行线程为%u, 执行 %ld 秒...\n",(unsigned)pthread_self(), n);sleep(n);printf("任务3执行完毕,完成线程为%d\n",(unsigned)pthread_self());return NULL;
}void *count_time(void *arg)
{int count=0;while(1){sleep(1);printf("现在是第%d秒\n", count++);}
}int main(void)
{pthread_t a;pthread_create(&a, NULL, count_time, NULL);//只是用来创建一条用于计时的线程,跟线程池一点关系都没有// 1, initialize the poolthread_pool pool;init_pool(&pool, 2);// 2, throw tasksprintf("throwing 3 tasks...\n");add_task(&pool, mytask1, (void *)((long)rand()%10));add_task(&pool, mytask2, (void *)((long)rand()%10));add_task(&pool, mytask3, (void *)((long)rand()%10));// 7, destroy the pooldestroy_pool(&pool);return 0;
}
上述线程池测试用例运行结果如下: