Linux工作队列workqueue的实现
一、work_struct 结构体和初始化
struct work_struct {unsigned long pending;struct list_head entry;void (*func)(void *);void *data;void *wq_data;struct timer_list timer;
};#define __WORK_INITIALIZER(n, f, d) { \.entry = { &(n).entry, &(n).entry }, \.func = (f), \.data = (d), \.timer = TIMER_INITIALIZER(NULL, 0, 0), \}#define DECLARE_WORK(n, f, d) \struct work_struct n = __WORK_INITIALIZER(n, f, d)/* * initialize a work-struct's func and data pointers:*/
#define PREPARE_WORK(_work, _func, _data) \do { \(_work)->func = _func; \(_work)->data = _data; \} while (0)/* * initialize all of a work-struct:*/
#define INIT_WORK(_work, _func, _data) \do { \INIT_LIST_HEAD(&(_work)->entry); \(_work)->pending = 0; \PREPARE_WORK((_work), (_func), (_data)); \init_timer(&(_work)->timer); \} while (0)
1. work_struct 结构体
struct work_struct {unsigned long pending;struct list_head entry;void (*func)(void *);void *data;void *wq_data;struct timer_list timer;
};
pending
:挂起标志
- 表示该work是否已经在工作队列中排队
- 防止同一个work被多次加入队列
entry
:链表节点
- 用于将work连接到工作队列的链表中
struct list_head
是内核通用双向循环链表结构
func
:工作函数指针
- work实际要执行的函数
- 参数为
void *
类型,通过data
传递
data
:工作函数参数
- 传递给
func
函数的参数 - 通常是包含上下文信息的结构体指针
wq_data
:工作队列数据
- 指向所属工作队列的内部数据结构
- 用于work与特定工作队列的关联
timer
:定时器
- 用于实现延迟工作(delayed work)
- 可以调度work在指定的时间后执行
2. __WORK_INITIALIZER 初始化宏
#define __WORK_INITIALIZER(n, f, d) { \.entry = { &(n).entry, &(n).entry }, \.func = (f), \.data = (d), \.timer = TIMER_INITIALIZER(NULL, 0, 0), \
}
.entry = { &(n).entry, &(n).entry }
- 初始化双向链表,前后指针都指向自己
- 表示一个空的链表节点
.func = (f)
- 设置工作函数指针
.data = (d)
- 设置工作函数参数
.timer = TIMER_INITIALIZER(NULL, 0, 0)
- 初始化定时器,回调函数为NULL,到期时间为0
- 表示定时器未激活状态
3. DECLARE_WORK 声明宏
#define DECLARE_WORK(n, f, d) \struct work_struct n = __WORK_INITIALIZER(n, f, d)
使用示例:
DECLARE_WORK(my_work, my_work_function, &my_data);
展开结果:
struct work_struct my_work = {.entry = { &my_work.entry, &my_work.entry },.func = my_work_function,.data = &my_data,.timer = TIMER_INITIALIZER(NULL, 0, 0),
};
作用:静态声明并初始化一个work结构体
4. PREPARE_WORK 准备宏
#define PREPARE_WORK(_work, _func, _data) \do { \(_work)->func = _func; \(_work)->data = _data; \} while (0)
用于重新配置已存在的work结构体
使用示例:
struct work_struct my_work;// 重新配置work
PREPARE_WORK(&my_work, new_function, new_data);
5. INIT_WORK 完全初始化宏
#define INIT_WORK(_work, _func, _data) \do { \INIT_LIST_HEAD(&(_work)->entry); \(_work)->pending = 0; \PREPARE_WORK((_work), (_func), (_data)); \init_timer(&(_work)->timer); \} while (0)
INIT_LIST_HEAD(&(_work)->entry)
- 初始化链表头,前后指针指向自己
(_work)->pending = 0
- 清除挂起标志,表示work不在队列中
PREPARE_WORK((_work), (_func), (_data))
- 设置工作函数和参数
init_timer(&(_work)->timer)
- 初始化定时器结构
6. 与tasklet的关键区别
workqueue vs tasklet:
特性 | workqueue | tasklet |
---|---|---|
执行上下文 | 进程上下文 | 软中断上下文 |
是否可以睡眠 | 是 | 否 |
调度方式 | 内核线程 | 软中断 |
优先级 | 普通 | 较高 |
适用场景 | 长时间工作 | 短时间工作 |
二、创建工作队列create_workqueue
#define create_workqueue(name) __create_workqueue((name), 0)
struct workqueue_struct *__create_workqueue(const char *name,int singlethread)
{int cpu, destroy = 0;struct workqueue_struct *wq;struct task_struct *p;BUG_ON(strlen(name) > 10);wq = kmalloc(sizeof(*wq), GFP_KERNEL);if (!wq)return NULL;memset(wq, 0, sizeof(*wq));wq->name = name;/* We don't need the distraction of CPUs appearing and vanishing. */lock_cpu_hotplug();if (singlethread) {INIT_LIST_HEAD(&wq->list);p = create_workqueue_thread(wq, 0);if (!p)destroy = 1;elsewake_up_process(p);} else {spin_lock(&workqueue_lock);list_add(&wq->list, &workqueues);spin_unlock(&workqueue_lock);for_each_online_cpu(cpu) {p = create_workqueue_thread(wq, cpu);if (p) {kthread_bind(p, cpu);wake_up_process(p);} elsedestroy = 1;}}unlock_cpu_hotplug();/** Was there any error during startup? If yes then clean up:*/if (destroy) {destroy_workqueue(wq);wq = NULL;}return wq;
}
1. 宏定义和函数声明
#define create_workqueue(name) __create_workqueue((name), 0)
- 定义宏
create_workqueue
,它调用__create_workqueue
函数,第二个参数为 0
struct workqueue_struct *__create_workqueue(const char *name, int singlethread)
- 函数声明,返回工作队列结构体指针
- 参数:
name
: 工作队列名称singlethread
: 标识是否创建单线程工作队列
2. 变量声明
int cpu, destroy = 0;
struct workqueue_struct *wq;
struct task_struct *p;
cpu
: 循环计数器,用于遍历CPUdestroy
: 错误标志,如果创建过程中出错则设为1wq
: 工作队列结构体指针p
: 任务结构体指针(代表内核线程)
3. 输入验证和内存分配
BUG_ON(strlen(name) > 10);
- 内核调试宏,如果工作队列名称长度超过10个字符,会触发内核bug
- 这是为了确保名称不会过长
wq = kmalloc(sizeof(*wq), GFP_KERNEL);
if (!wq)return NULL;
- 为工作队列结构体分配内核内存
GFP_KERNEL
: 标准的内核内存分配标志- 如果分配失败,返回NULL
memset(wq, 0, sizeof(*wq));
- 将分配的内存清零,初始化所有字段为0
4. 工作队列初始化
wq->name = name;
- 设置工作队列的名称
lock_cpu_hotplug();
- 锁定CPU热插拔
- 防止在创建工作队列过程中有CPU被加入或移除,保证操作的原子性
5. 单线程工作队列创建
if (singlethread) {INIT_LIST_HEAD(&wq->list);p = create_workqueue_thread(wq, 0);if (!p)destroy = 1;elsewake_up_process(p);
}
- 如果要求创建单线程工作队列:
INIT_LIST_HEAD(&wq->list)
: 初始化链表头create_workqueue_thread(wq, 0)
: 创建工作队列线程,绑定到任意CPU- 如果创建线程失败,设置
destroy = 1
- 如果成功,
wake_up_process(p)
: 唤醒创建的线程使其开始运行
6. 多线程工作队列创建
else {spin_lock(&workqueue_lock);list_add(&wq->list, &workqueues);spin_unlock(&workqueue_lock);
- 多线程工作队列的情况:
- 获取工作队列锁
- 将新工作队列添加到全局工作队列链表
workqueues
中 - 释放锁
for_each_online_cpu(cpu) {p = create_workqueue_thread(wq, cpu);if (p) {kthread_bind(p, cpu);wake_up_process(p);} elsedestroy = 1;}
}
for_each_online_cpu(cpu)
: 遍历所有在线的CPU核心- 为每个CPU核心创建一个工作线程:
create_workqueue_thread(wq, cpu)
: 创建工作队列线程kthread_bind(p, cpu)
: 将线程绑定到特定的CPU核心wake_up_process(p)
: 唤醒线程- 如果任何线程创建失败,设置
destroy = 1
7. 清理和返回
unlock_cpu_hotplug();
- 解锁CPU热插拔,允许CPU状态变化
if (destroy) {destroy_workqueue(wq);wq = NULL;
}
return wq;
- 检查是否有错误发生:
- 如果有错误(
destroy == 1
),销毁已创建的工作队列,返回NULL - 如果没有错误,返回创建的工作队列指针
- 如果有错误(
8. 关键点总结
-
两种工作队列:
- 单线程:只有一个工作线程处理所有工作
- 多线程:每个CPU核心都有一个工作线程
-
CPU绑定:多线程工作队列中,每个线程绑定到特定CPU,有利于缓存局部性
-
错误处理:采用"全部成功或全部回滚"的策略
-
并发安全:使用CPU热插拔锁和工作队列锁保证操作原子性
三、创建工作队列线程create_workqueue_thread
static struct task_struct *create_workqueue_thread(struct workqueue_struct *wq,int cpu)
{struct cpu_workqueue_struct *cwq = wq->cpu_wq + cpu;struct task_struct *p;spin_lock_init(&cwq->lock);cwq->wq = wq;cwq->thread = NULL;cwq->insert_sequence = 0;cwq->remove_sequence = 0;INIT_LIST_HEAD(&cwq->worklist);init_waitqueue_head(&cwq->more_work);init_waitqueue_head(&cwq->work_done);if (is_single_threaded(wq))p = kthread_create(worker_thread, cwq, "%s", wq->name);elsep = kthread_create(worker_thread, cwq, "%s/%d", wq->name, cpu);if (IS_ERR(p))return NULL;cwq->thread = p;return p;
}
1. 函数声明和变量定义
static struct task_struct *create_workqueue_thread(struct workqueue_struct *wq, int cpu)
static
: 函数只在当前文件中可见- 返回
struct task_struct *
: 指向新创建的内核线程的指针 - 参数:
wq
: 工作队列结构体指针cpu
: 该线程要绑定的CPU编号
struct cpu_workqueue_struct *cwq = wq->cpu_wq + cpu;
struct task_struct *p;
cwq
: 指向特定CPU的工作队列结构体wq->cpu_wq
是struct cpu_workqueue_struct
数组+ cpu
通过CPU编号计算偏移量,获取对应CPU的工作队列
p
: 将要创建的内核线程的任务结构体指针
2. 初始化CPU工作队列结构体
spin_lock_init(&cwq->lock);
- 初始化自旋锁,用于保护这个CPU工作队列的并发访问
- 这个锁会保护
worklist
和其他字段的访问
cwq->wq = wq;
- 设置指向父工作队列的指针,建立反向引用关系
cwq->thread = NULL;
- 初始时线程指针设为NULL,创建成功后再赋值
cwq->insert_sequence = 0;
cwq->remove_sequence = 0;
- 初始化序列号计数器:
insert_sequence
: 工作项插入序列号,用于跟踪工作提交顺序remove_sequence
: 工作项执行序列号,用于跟踪工作完成顺序
- 这些序列号可用于调试和统计
3. 初始化链表和等待队列
INIT_LIST_HEAD(&cwq->worklist);
- 初始化工作项链表头
- 所有提交到该CPU工作队列的工作项都会链接到这个链表
init_waitqueue_head(&cwq->more_work);
- 初始化"有更多工作"等待队列
- 当工作线程空闲时,会在这个队列上睡眠,等待新工作项的加入
init_waitqueue_head(&cwq->work_done);
- 初始化"工作完成"等待队列
- 用于同步操作,当需要等待特定工作项完成时使用
4. 创建内核线程
if (is_single_threaded(wq))p = kthread_create(worker_thread, cwq, "%s", wq->name);
elsep = kthread_create(worker_thread, cwq, "%s/%d", wq->name, cpu);
is_single_threaded(wq)
: 判断是否是单线程工作队列- 单线程工作队列:
- 线程命名格式:
"%s"
,只使用工作队列名称 - 例如:
"events"
- 线程命名格式:
- 多线程工作队列:
- 线程命名格式:
"%s/%d"
,包含工作队列名称和CPU编号 - 例如:
"kblockd/0"
,"kblockd/1"
- 线程命名格式:
kthread_create
参数:worker_thread
: 线程的主函数(工作线程的入口点)cwq
: 传递给线程函数的参数- 名称格式字符串和参数
5. 错误检查和线程指针设置
if (IS_ERR(p))return NULL;
IS_ERR(p)
: 检查线程创建是否失败kthread_create
失败时返回错误指针,这里转换为返回NULL
cwq->thread = p;
return p;
- 将新创建的线程指针保存到CPU工作队列结构体中
- 返回新创建的线程指针
四、工作队列线程的主函数worker_thread
static int worker_thread(void *__cwq)
{struct cpu_workqueue_struct *cwq = __cwq;DECLARE_WAITQUEUE(wait, current);struct k_sigaction sa;sigset_t blocked;current->flags |= PF_NOFREEZE;set_user_nice(current, -10);/* Block and flush all signals */sigfillset(&blocked);sigprocmask(SIG_BLOCK, &blocked, NULL);flush_signals(current);/* SIG_IGN makes children autoreap: see do_notify_parent(). */sa.sa.sa_handler = SIG_IGN;sa.sa.sa_flags = 0;siginitset(&sa.sa.sa_mask, sigmask(SIGCHLD));do_sigaction(SIGCHLD, &sa, (struct k_sigaction *)0);set_current_state(TASK_INTERRUPTIBLE);while (!kthread_should_stop()) {add_wait_queue(&cwq->more_work, &wait);if (list_empty(&cwq->worklist))schedule();else__set_current_state(TASK_RUNNING);remove_wait_queue(&cwq->more_work, &wait);if (!list_empty(&cwq->worklist))run_workqueue(cwq);set_current_state(TASK_INTERRUPTIBLE);}__set_current_state(TASK_RUNNING);return 0;
}
1. 函数声明和变量定义
static int worker_thread(void *__cwq)
static
: 函数只在当前文件中可见- 返回
int
: 线程退出状态码 - 参数
__cwq
: 传递给线程的参数,实际上是struct cpu_workqueue_struct *
struct cpu_workqueue_struct *cwq = __cwq;
DECLARE_WAITQUEUE(wait, current);
struct k_sigaction sa;
sigset_t blocked;
cwq
: 将参数转换为CPU工作队列结构体指针DECLARE_WAITQUEUE(wait, current)
: 声明并初始化等待队列条目wait
: 等待队列条目名称current
: 当前线程的任务结构体
sa
: 信号处理动作结构体blocked
: 信号集,用于阻塞信号
2. 线程特性设置
current->flags |= PF_NOFREEZE;
- 设置不冻结标志:
PF_NOFREEZE
: 防止线程在系统休眠时被冻结- 工作线程需要持续运行来处理工作,不能被挂起
set_user_nice(current, -10);
- 设置线程优先级:
nice值 = -10
,表示较高的优先级- nice值范围:-20(最高优先级)到19(最低优先级)
- 工作线程需要及时响应工作请求,所以设置较高优先级
3. 信号处理设置
sigfillset(&blocked);
sigprocmask(SIG_BLOCK, &blocked, NULL);
flush_signals(current);
sigfillset(&blocked)
: 填充信号集,包含所有信号sigprocmask(SIG_BLOCK, &blocked, NULL)
: 阻塞所有信号- 工作线程不应该被普通信号中断
flush_signals(current)
: 清空待处理的信号
sa.sa.sa_handler = SIG_IGN;
sa.sa.sa_flags = 0;
siginitset(&sa.sa.sa_mask, sigmask(SIGCHLD));
do_sigaction(SIGCHLD, &sa, (struct k_sigaction *)0);
- 忽略SIGCHLD信号:
SIG_IGN
: 忽略信号处理SIGCHLD
: 子进程状态改变信号- 设置SIGCHLD的处理动作为忽略,使得子进程自动回收
- 工作线程不应该处理子进程,所以忽略这个信号
4. 主循环 - 等待和工作处理
set_current_state(TASK_INTERRUPTIBLE);
- 设置当前线程状态为 可中断的睡眠
- 线程可以接收信号并被唤醒
while (!kthread_should_stop()) {
- 主循环条件:
kthread_should_stop()
检查是否收到停止信号- 当调用
kthread_stop()
时,这个函数返回true - 循环继续运行,直到收到停止请求
- 当调用
5. 等待机制
add_wait_queue(&cwq->more_work, &wait);
if (list_empty(&cwq->worklist))schedule();
else__set_current_state(TASK_RUNNING);
remove_wait_queue(&cwq->more_work, &wait);
-
加入等待队列:
add_wait_queue(&cwq->more_work, &wait);
- 将当前线程添加到
more_work
等待队列 - 当有新工作加入时,会唤醒这个队列上的线程
- 将当前线程添加到
-
检查工作列表:
if (list_empty(&cwq->worklist))schedule(); else__set_current_state(TASK_RUNNING);
- 如果工作列表为空:调用
schedule()
让出CPU,线程进入睡眠 - 如果工作列表不为空:设置状态为运行,继续处理工作
- 如果工作列表为空:调用
-
移除等待队列:
remove_wait_queue(&cwq->more_work, &wait);
- 从等待队列中移除当前线程
6. 工作处理
if (!list_empty(&cwq->worklist))run_workqueue(cwq);
- 再次检查工作列表
- 如果列表不为空,调用
run_workqueue(cwq)
执行所有待处理的工作
set_current_state(TASK_INTERRUPTIBLE);
- 处理完工作后,重新设置状态为可中断睡眠,准备下一次等待
7. 线程退出处理
__set_current_state(TASK_RUNNING);
return 0;
- 当收到停止信号退出循环时:
- 设置线程状态为运行(清理状态)
- 返回0表示正常退出
五、工作队列的执行函数run_workqueue
static inline void run_workqueue(struct cpu_workqueue_struct *cwq)
{unsigned long flags;/** Keep taking off work from the queue until* done.*/spin_lock_irqsave(&cwq->lock, flags);cwq->run_depth++;if (cwq->run_depth > 3) {/* morton gets to eat his hat */printk("%s: recursion depth exceeded: %d\n",__FUNCTION__, cwq->run_depth);dump_stack();}while (!list_empty(&cwq->worklist)) {struct work_struct *work = list_entry(cwq->worklist.next,struct work_struct, entry);void (*f) (void *) = work->func;void *data = work->data;list_del_init(cwq->worklist.next);spin_unlock_irqrestore(&cwq->lock, flags);BUG_ON(work->wq_data != cwq);clear_bit(0, &work->pending);f(data);spin_lock_irqsave(&cwq->lock, flags);cwq->remove_sequence++;wake_up(&cwq->work_done);}cwq->run_depth--;spin_unlock_irqrestore(&cwq->lock, flags);
}
1. 函数声明和变量
static inline void run_workqueue(struct cpu_workqueue_struct *cwq)
void
: 无返回值- 参数
cwq
: CPU工作队列结构体指针
unsigned long flags;
- 用于保存中断状态的变量,在加锁时使用
2. 加锁和递归深度检查
spin_lock_irqsave(&cwq->lock, flags);
- 加锁并保存中断状态:
- 获取自旋锁保护工作队列
- 禁用本地CPU中断
- 保存当前中断状态到
flags
cwq->run_depth++;
- 增加运行深度计数器:
- 记录当前递归深度
- 用于检测可能的递归调用问题
if (cwq->run_depth > 3) {/* morton gets to eat his hat */printk("%s: recursion depth exceeded: %d\n",__FUNCTION__, cwq->run_depth);dump_stack();
}
- 递归深度检查:
- 如果递归深度超过3,认为可能出现了递归调用问题
- 打印错误信息和递归深度
dump_stack()
: 输出调用栈用于调试
3. 工作处理循环
while (!list_empty(&cwq->worklist)) {
- 循环条件:当工作链表不为空时继续循环
- 处理所有待执行的工作项
4. 获取工作项信息
struct work_struct *work = list_entry(cwq->worklist.next,struct work_struct, entry);
- 从链表中获取工作项:
cwq->worklist.next
: 链表中的第一个工作项list_entry
: 通过链表指针获取包含它的work_struct
结构体entry
: 是work_struct
中的链表节点字段名
void (*f) (void *) = work->func;
void *data = work->data;
- 提取工作函数和数据:
f
: 工作函数的函数指针data
: 传递给工作函数的参数数据
5. 从链表中移除工作项
list_del_init(cwq->worklist.next);
- 从链表中删除工作项:
- 将工作项从工作链表中移除
_init
版本会同时初始化链表节点,使其处于未链接状态
spin_unlock_irqrestore(&cwq->lock, flags);
- 临时释放锁:
- 在真正执行工作函数前释放锁
- 恢复中断状态
- 这样其他CPU可以继续向工作队列添加新工作
6. 执行工作函数
BUG_ON(work->wq_data != cwq);
- 调试检查:
- 验证工作项确实属于当前工作队列
- 如果不匹配,触发内核BUG
clear_bit(0, &work->pending);
- 清除pending标志:
- 将工作项的pending位清零
- 表示这个工作项正在执行或已经完成
f(data);
- 执行实际的工作函数:
- 调用工作项注册的函数
- 传递数据参数
- 这是工作队列的核心功能
7. 重新加锁和后续处理
spin_lock_irqsave(&cwq->lock, flags);
- 重新加锁:
- 工作函数执行完成后,重新获取锁
- 准备更新队列状态
cwq->remove_sequence++;
- 增加移除序列号:
- 每次完成一个工作项就递增
- 用于跟踪工作完成的顺序
wake_up(&cwq->work_done);
- 唤醒等待者:
- 如果有线程在等待工作完成,唤醒它们
- 用于实现同步工作提交
循环结束和清理
cwq->run_depth--;
- 减少运行深度:恢复递归深度计数器
spin_unlock_irqrestore(&cwq->lock, flags);
- 最终释放锁:处理完成,释放锁并恢复中断
六、清理工作队列线程cleanup_workqueue_thread
static void cleanup_workqueue_thread(struct workqueue_struct *wq, int cpu)
{struct cpu_workqueue_struct *cwq;unsigned long flags;struct task_struct *p;cwq = wq->cpu_wq + cpu;spin_lock_irqsave(&cwq->lock, flags);p = cwq->thread;cwq->thread = NULL;spin_unlock_irqrestore(&cwq->lock, flags);if (p)kthread_stop(p);
}
1. 函数声明和变量定义
static void cleanup_workqueue_thread(struct workqueue_struct *wq, int cpu)
static
: 表示这个函数只在当前文件中可见void
: 函数没有返回值- 参数:
wq
: 要清理的工作队列结构体指针cpu
: 指定要清理哪个CPU对应的工作线程
struct cpu_workqueue_struct *cwq;
unsigned long flags;
struct task_struct *p;
cwq
: 指向特定CPU的工作队列结构体flags
: 用于保存中断状态,在加锁时使用p
: 指向内核线程任务结构体的指针
2. 获取CPU工作队列指针
cwq = wq->cpu_wq + cpu;
wq->cpu_wq
是一个struct cpu_workqueue_struct
数组+ cpu
通过CPU编号计算偏移量,获取对应CPU的工作队列结构体
3. 安全地获取并清除线程指针
spin_lock_irqsave(&cwq->lock, flags);
- 加锁并保存中断状态:
spin_lock_irqsave
: 获取自旋锁,同时禁用本地CPU的中断,并保存当前中断状态到flags
- 这是为了防止在操作线程指针时被中断或其他CPU核心干扰
p = cwq->thread;
cwq->thread = NULL;
p = cwq->thread
: 保存当前线程指针到局部变量p
cwq->thread = NULL
: 立即将工作队列中的线程指针设为NULL- 这样做是为了避免竞争条件:在停止线程的过程中,不会有新的工作被加入到队列
spin_unlock_irqrestore(&cwq->lock, flags);
- 解锁并恢复中断状态:
- 释放自旋锁
- 恢复之前保存的中断状态(重新启用中断)
- 现在锁保护的范围结束
4. 停止工作线程
if (p)kthread_stop(p);
if (p)
: 检查是否真的存在要停止的线程(避免空指针)kthread_stop(p)
: 停止内核线程- 这个函数会发送停止信号给线程
- 等待线程完成当前工作并退出
- 是同步操作,会阻塞直到线程完全停止
七、销毁工作队列destroy_workqueue
void destroy_workqueue(struct workqueue_struct *wq)
{int cpu;flush_workqueue(wq);/* We don't need the distraction of CPUs appearing and vanishing. */lock_cpu_hotplug();if (is_single_threaded(wq))cleanup_workqueue_thread(wq, 0);else {for_each_online_cpu(cpu)cleanup_workqueue_thread(wq, cpu);spin_lock(&workqueue_lock);list_del(&wq->list);spin_unlock(&workqueue_lock);}unlock_cpu_hotplug();kfree(wq);
}
1. 函数声明和变量
void destroy_workqueue(struct workqueue_struct *wq)
void
: 无返回值- 参数
wq
: 要销毁的工作队列结构体指针
int cpu;
- 循环计数器,用于遍历所有CPU
2. 刷新工作队列
flush_workqueue(wq);
- 等待所有未完成的工作项执行完毕:
- 这个函数会阻塞,直到工作队列中所有待处理的工作项都完成执行
- 确保在销毁工作队列前,所有已提交的工作都处理完毕
- 防止工作项在队列销毁后仍尝试执行
3. 锁定CPU热插拔
lock_cpu_hotplug();
- 锁定CPU热插拔:
- 防止在销毁过程中有CPU被加入或移除系统
- 确保CPU状态的稳定性,避免竞争条件
4. 单线程工作队列清理
if (is_single_threaded(wq))cleanup_workqueue_thread(wq, 0);
- 判断工作队列类型:
is_single_threaded(wq)
检查是否为单线程工作队列 - 单线程工作队列清理:
- 调用
cleanup_workqueue_thread(wq, 0)
清理工作线程
- 调用
5. 多线程工作队列清理
else {for_each_online_cpu(cpu)cleanup_workqueue_thread(wq, cpu);
- 多线程工作队列:为每个在线CPU创建工作线程
- 遍历所有在线CPU:
for_each_online_cpu(cpu)
宏遍历系统中所有活跃的CPU - 清理每个CPU的工作线程:对每个CPU调用
cleanup_workqueue_thread(wq, cpu)
spin_lock(&workqueue_lock);list_del(&wq->list);spin_unlock(&workqueue_lock);
}
- 从全局工作队列列表中移除:
spin_lock(&workqueue_lock)
: 获取工作队列全局锁list_del(&wq->list)
: 从全局工作队列链表workqueues
中删除当前工作队列spin_unlock(&workqueue_lock)
: 释放全局锁
- 为什么需要这个操作:
- 多线程工作队列在创建时被添加到全局链表
- 销毁时必须从链表中移除,防止后续被错误访问
6. 解锁和内存释放
unlock_cpu_hotplug();
- 解锁CPU热插拔:
- 允许CPU状态再次变化
- 与前面的
lock_cpu_hotplug()
配对使用
kfree(wq);
- 释放工作队列内存:
kfree()
: 内核内存释放函数- 释放之前分配的
struct workqueue_struct
内存空间 - 这是销毁过程的最后一步
7. 关键设计原理
7.1. 安全销毁流程
1. 等待所有工作完成 (flush_workqueue)
2. 锁定系统状态 (lock_cpu_hotplug)
3. 清理工作线程 (cleanup_workqueue_thread)
4. 从全局结构移除 (list_del)
5. 解锁系统状态 (unlock_cpu_hotplug)
6. 释放内存 (kfree)
八、刷新工作队列flush_workqueue
void fastcall flush_workqueue(struct workqueue_struct *wq)
{might_sleep();if (is_single_threaded(wq)) {/* Always use cpu 0's area. */flush_cpu_workqueue(wq->cpu_wq + 0);} else {int cpu;lock_cpu_hotplug();for_each_online_cpu(cpu)flush_cpu_workqueue(wq->cpu_wq + cpu);unlock_cpu_hotplug();}
}
1. 函数声明
void fastcall flush_workqueue(struct workqueue_struct *wq)
void
: 无返回值fastcall
: 函数调用约定,表示通过寄存器传递参数(x86架构优化)- 参数
wq
: 要刷新的工作队列结构体指针
2. 睡眠检查
might_sleep();
- 可能睡眠的声明:
- 这是一个调试辅助宏,告诉内核这个函数可能会睡眠(阻塞)
- 如果在原子上下文(如中断处理)中调用,会发出警告
- 因为
flush_workqueue
需要等待工作完成,可能会长时间阻塞
3. 单线程工作队列处理
if (is_single_threaded(wq)) {/* Always use cpu 0's area. */flush_cpu_workqueue(wq->cpu_wq + 0);
}
- 判断工作队列类型:
is_single_threaded(wq)
检查是否为单线程工作队列 - 单线程工作队列刷新:
- 总是使用CPU 0的区域
wq->cpu_wq + 0
: 获取CPU 0的工作队列结构体flush_cpu_workqueue()
: 刷新单个CPU的工作队列,等待该CPU上所有工作完成
4. 多线程工作队列处理
else {int cpu;lock_cpu_hotplug();
- 多线程工作队列:进入else分支
- 声明CPU计数器
- 锁定CPU热插拔:
- 防止在刷新过程中有CPU被加入或移除
- 确保CPU列表的稳定性
for_each_online_cpu(cpu)flush_cpu_workqueue(wq->cpu_wq + cpu);
- 遍历所有在线CPU:
for_each_online_cpu(cpu)
宏遍历系统中所有活跃的CPU - 刷新每个CPU的工作队列:
wq->cpu_wq + cpu
: 获取对应CPU的工作队列结构体flush_cpu_workqueue()
: 刷新指定CPU的工作队列
unlock_cpu_hotplug();
}
- 解锁CPU热插拔:允许CPU状态再次变化
九、刷新单个CPU工作队列flush_cpu_workqueue
static void flush_cpu_workqueue(struct cpu_workqueue_struct *cwq)
{if (cwq->thread == current) {/** Probably keventd trying to flush its own queue. So simply run* it by hand rather than deadlocking.*/run_workqueue(cwq);} else {DEFINE_WAIT(wait);long sequence_needed;spin_lock_irq(&cwq->lock);sequence_needed = cwq->insert_sequence;while (sequence_needed - cwq->remove_sequence > 0) {prepare_to_wait(&cwq->work_done, &wait,TASK_UNINTERRUPTIBLE);spin_unlock_irq(&cwq->lock);schedule();spin_lock_irq(&cwq->lock);}finish_wait(&cwq->work_done, &wait);spin_unlock_irq(&cwq->lock);}
}
1. 函数声明
static void flush_cpu_workqueue(struct cpu_workqueue_struct *cwq)
static
: 函数只在当前文件中可见void
: 无返回值- 参数
cwq
: 要刷新的CPU工作队列结构体指针
2. 特殊情况:当前线程就是工作线程
if (cwq->thread == current) {/** Probably keventd trying to flush its own queue. So simply run* it by hand rather than deadlocking.*/run_workqueue(cwq);
}
cwq->thread == current
-
检查是否在工作线程内部调用刷新:
cwq->thread
: 该工作队列的工作线程current
: 当前正在执行的线程- 如果相等,说明在工作线程内部尝试刷新自己的工作队列
-
注释说明:可能是
keventd
(内核事件守护进程)在刷新自己的队列 -
直接执行工作队列:
run_workqueue(cwq)
-
避免死锁:如果等待自己完成工作,会导致死锁
3. 正常情况:外部线程刷新
else {DEFINE_WAIT(wait);long sequence_needed;
DEFINE_WAIT(wait)
: 定义并初始化等待队列条目sequence_needed
: 需要等待的序列号
spin_lock_irq(&cwq->lock);
sequence_needed = cwq->insert_sequence;
- 加锁:保护工作队列状态
- 获取当前插入序列号:
cwq->insert_sequence
- 序列号语义:
insert_sequence
: 工作项插入时的序列号(每次插入递增)remove_sequence
: 工作项完成时的序列号(每次完成递增)- 如果
insert_sequence > remove_sequence
,说明有未完成的工作
4. 等待循环
while (sequence_needed - cwq->remove_sequence > 0) {
sequence_needed - cwq->remove_sequence > 0
- 表示还有工作项未完成:
- 我们在
sequence_needed
时刻记录插入序列号 - 只有当
remove_sequence
达到这个值时,所有之前的工作才完成
- 我们在
5. 等待机制
prepare_to_wait(&cwq->work_done, &wait, TASK_UNINTERRUPTIBLE);
spin_unlock_irq(&cwq->lock);
schedule();
spin_lock_irq(&cwq->lock);
-
准备等待:
prepare_to_wait(&cwq->work_done, &wait, TASK_UNINTERRUPTIBLE);
- 将当前线程添加到
work_done
等待队列 TASK_UNINTERRUPTIBLE
: 不可中断的睡眠状态(不能被信号唤醒)
- 将当前线程添加到
-
释放锁并调度:
spin_unlock_irq(&cwq->lock); schedule();
- 释放锁,让工作线程可以继续执行和更新状态
schedule()
: 主动让出CPU,进入睡眠状态
-
被唤醒后重新加锁:
spin_lock_irq(&cwq->lock);
- 当工作线程完成工作并唤醒等待者时,重新获取锁
- 检查条件是否满足
6. 清理等待状态
finish_wait(&cwq->work_done, &wait);
spin_unlock_irq(&cwq->lock);
- 完成等待:
finish_wait()
从等待队列中移除当前线程 - 最终释放锁:处理完成,释放锁
十、提交工作queue_work
static void __queue_work(struct cpu_workqueue_struct *cwq,struct work_struct *work)
{unsigned long flags;spin_lock_irqsave(&cwq->lock, flags);work->wq_data = cwq;list_add_tail(&work->entry, &cwq->worklist);cwq->insert_sequence++;wake_up(&cwq->more_work);spin_unlock_irqrestore(&cwq->lock, flags);
}
int fastcall queue_work(struct workqueue_struct *wq, struct work_struct *work)
{int ret = 0, cpu = get_cpu();if (!test_and_set_bit(0, &work->pending)) {if (unlikely(is_single_threaded(wq)))cpu = 0;BUG_ON(!list_empty(&work->entry));__queue_work(wq->cpu_wq + cpu, work);ret = 1;}put_cpu();return ret;
}
1. __queue_work
函数 - 实际工作入队操作
static void __queue_work(struct cpu_workqueue_struct *cwq,struct work_struct *work)
1.1. 变量声明和加锁
unsigned long flags;
spin_lock_irqsave(&cwq->lock, flags);
- 保存中断状态并加锁,保护工作队列的并发访问
1.2. 设置工作项属性
work->wq_data = cwq;
- 建立工作项与工作队列的关联:
wq_data
指向所属的CPU工作队列- 用于后续验证和工作执行时的上下文信息
list_add_tail(&work->entry, &cwq->worklist);
- 将工作项添加到队列尾部:
&work->entry
: 工作项中的链表节点&cwq->worklist
: CPU工作队列的工作链表_tail
表示添加到链表末尾
1.3. 更新序列号和唤醒工作线程
cwq->insert_sequence++;
- 递增插入序列号:
- 用于跟踪工作提交的顺序
- 在
flush_workqueue
中用于确定需要等待的工作范围
wake_up(&cwq->more_work);
- 唤醒工作线程:
- 如果工作线程在
more_work
等待队列上睡眠,唤醒它 - 使得工作线程可以立即开始处理新加入的工作
- 如果工作线程在
spin_unlock_irqrestore(&cwq->lock, flags);
- 释放锁并恢复中断状态
2. queue_work
函数 - 外部接口
int fastcall queue_work(struct workqueue_struct *wq, struct work_struct *work)
2.1. 变量声明和CPU绑定
int ret = 0, cpu = get_cpu();
ret
: 返回值,0表示失败,1表示成功cpu = get_cpu()
: 禁用内核抢占并获取当前CPU编号- 防止在操作过程中被调度到其他CPU
- 确保操作的原子性
2.2. 工作项状态检查
if (!test_and_set_bit(0, &work->pending)) {
- 原子地测试并设置pending标志:
test_and_set_bit
: 原子操作,测试位0的值并设置为1- 如果原来就是1,说明工作项已经在队列中,返回0(失败)
- 如果原来是0,设置为1并返回0,进入if块
2.3. 单线程工作队列处理
if (unlikely(is_single_threaded(wq)))cpu = 0;
- 单线程工作队列特殊处理:
- 单线程工作队列总是使用CPU 0的工作队列
- 覆盖之前获取的当前CPU编号
2.4. 工作项状态验证
BUG_ON(!list_empty(&work->entry));
- 调试检查:验证工作项不在任何链表中
- 如果
work->entry
不为空,说明工作项已被使用,触发内核BUG - 防止重复提交同一个工作项
2.5. 实际入队操作
__queue_work(wq->cpu_wq + cpu, work);
ret = 1;
- 调用内部函数将工作项加入队列
- 设置返回值为1,表示成功
2.6. 清理和返回
put_cpu();
return ret;
put_cpu()
: 重新启用内核抢占- 与前面的
get_cpu()
配对使用 - 返回操作结果
十一、提交延迟工作queue_delayed_work
int fastcall queue_delayed_work(struct workqueue_struct *wq,struct work_struct *work, unsigned long delay)
{int ret = 0;struct timer_list *timer = &work->timer;if (!test_and_set_bit(0, &work->pending)) {BUG_ON(timer_pending(timer));BUG_ON(!list_empty(&work->entry));/* This stores wq for the moment, for the timer_fn */work->wq_data = wq;timer->expires = jiffies + delay;timer->data = (unsigned long)work;timer->function = delayed_work_timer_fn;add_timer(timer);ret = 1;}return ret;
}
1. 函数声明
int fastcall queue_delayed_work(struct workqueue_struct *wq,struct work_struct *work, unsigned long delay)
fastcall
: 函数调用约定优化(x86架构)- 返回
int
: 操作结果(1表示成功,0表示失败) - 参数:
wq
: 目标工作队列work
: 要延迟执行的工作项delay
: 延迟时间(以jiffies为单位)
2. 变量声明
int ret = 0;
struct timer_list *timer = &work->timer;
ret
: 返回值,初始化为0(失败)timer
: 指向工作项内嵌的定时器结构
3. 工作项状态检查
if (!test_and_set_bit(0, &work->pending)) {
- 原子地测试并设置pending标志:
test_and_set_bit(0, &work->pending)
: 原子操作- 如果原来pending位是0(工作项空闲),设置为1并返回0,进入if块
- 如果原来是1(工作项已提交),返回1,跳过if块,直接返回0
4. 调试断言检查
BUG_ON(timer_pending(timer));
- 检查定时器状态:
timer_pending(timer)
: 检查定时器是否已经在等待队列中- 如果定时器已激活,触发内核BUG
BUG_ON(!list_empty(&work->entry));
- 检查工作项链表状态:
- 验证工作项不在任何工作队列的链表中
- 如果工作项已在链表中,触发内核BUG
- 确保工作项处于干净状态
5. 设置工作项临时数据
work->wq_data = wq;
- 临时存储工作队列指针:
- 这个字段在定时器回调函数中会用到
- 注意:这里存储的是
workqueue_struct
,而不是cpu_workqueue_struct
- 在定时器到期时,需要根据这个信息找到正确的工作队列
6. 配置定时器
timer->expires = jiffies + delay;
- 设置定时器到期时间:
jiffies
: 系统启动以来的时钟滴答数delay
: 延迟的时钟滴答数
timer->data = (unsigned long)work;
- 设置定时器回调数据:
- 将工作项指针转换为unsigned long存储
- 在定时器回调函数中,通过这个数据找到对应的工作项
timer->function = delayed_work_timer_fn;
- 设置定时器回调函数:
delayed_work_timer_fn
是处理延迟工作的标准函数- 当定时器到期时,这个函数会被调用
7. 激活定时器
add_timer(timer);
- 将定时器添加到系统定时器队列:
- 从此时开始,定时器开始倒计时
- 当
jiffies
达到expires
值时,回调函数会被调用
8. 设置返回值并返回
ret = 1;
- 设置返回值为1,表示成功提交延迟工作
return ret;
- 返回操作结果
十二、延迟工作定时器的回调函数delayed_work_timer_fn
static void delayed_work_timer_fn(unsigned long __data)
{struct work_struct *work = (struct work_struct *)__data;struct workqueue_struct *wq = work->wq_data;int cpu = smp_processor_id();if (unlikely(is_single_threaded(wq)))cpu = 0;__queue_work(wq->cpu_wq + cpu, work);
}
1. 函数声明
static void delayed_work_timer_fn(unsigned long __data)
static
: 函数只在当前文件中可见void
: 无返回值- 参数
__data
: 定时器回调数据,实际上是指向work_struct
的指针
2. 获取工作项指针
struct work_struct *work = (struct work_struct *)__data;
- 从定时器数据恢复工作项指针:
__data
是在queue_delayed_work
中设置的:timer->data = (unsigned long)work
- 通过类型转换将
unsigned long
恢复为struct work_struct *
- 这样就找到了对应的延迟工作项
3. 获取工作队列信息
struct workqueue_struct *wq = work->wq_data;
- 从工作项中获取工作队列指针:
wq_data
是在queue_delayed_work
中设置的:work->wq_data = wq
- 这个字段临时存储了目标工作队列的信息
- 在定时器到期时,需要这个信息来知道将工作提交到哪个队列
4. 确定当前CPU
int cpu = smp_processor_id();
- 获取当前执行定时器的CPU编号:
smp_processor_id()
: 返回当前代码正在执行的CPU编号- 定时器回调可能在任何一个CPU上执行(取决于定时器到期时的调度)
- 这个信息用于确定将工作提交到哪个CPU的工作队列
5. 单线程工作队列处理
if (unlikely(is_single_threaded(wq)))cpu = 0;
- 检查工作队列类型:
is_single_threaded(wq)
: 判断是否为单线程工作队列
- 单线程工作队列特殊处理:
- 如果是单线程工作队列,强制使用CPU 0
- 覆盖之前获取的当前CPU编号
- 单线程工作队列只有一个工作线程,通常绑定到CPU 0
6. 实际提交工作到队列
__queue_work(wq->cpu_wq + cpu, work);
- 将工作项提交到指定CPU的工作队列:
wq->cpu_wq + cpu
: 计算目标CPU的工作队列结构体地址work
: 要提交的工作项__queue_work
: 内部函数,实际执行工作入队操作
十三、取消延迟工作cancel_delayed_work
static inline int cancel_delayed_work(struct work_struct *work)
{int ret;ret = del_timer_sync(&work->timer);if (ret)clear_bit(0, &work->pending);return ret;
}
1. 函数声明
static inline int cancel_delayed_work(struct work_struct *work)
static inline
: 内联函数,减少函数调用开销- 返回
int
: 操作结果(1表示成功取消,0表示失败) - 参数
work
: 要取消的延迟工作项指针
2. 变量声明
int ret;
ret
: 用于存储取消操作的返回值
3. 取消定时器
ret = del_timer_sync(&work->timer);
- 同步删除定时器:
del_timer_sync(&work->timer)
: 安全地删除工作项关联的定时器_sync
版本保证:当函数返回时,定时器处理程序不会在任何CPU上运行- 返回值:
1
: 成功删除了一个活跃的定时器(定时器尚未到期)0
: 定时器不活跃(可能已经到期或已被删除)
4. 清理工作项状态
if (ret)clear_bit(0, &work->pending);
- 条件清理pending标志:
- 只有在成功取消定时器时(
ret == 1
)才清理pending标志 clear_bit(0, &work->pending)
: 将pending标志位清零- 表示工作项不再处于等待执行状态
- 只有在成功取消定时器时(
5. 返回结果
return ret;
- 返回取消操作的结果:
1
: 成功取消了尚未执行的延迟工作0
: 取消失败(工作可能已经在执行或已完成)