当前位置: 首页 > news >正文

Linux工作队列workqueue的实现

一、work_struct 结构体和初始化

struct work_struct {unsigned long pending;struct list_head entry;void (*func)(void *);void *data;void *wq_data;struct timer_list timer;
};#define __WORK_INITIALIZER(n, f, d) {                           \.entry  = { &(n).entry, &(n).entry },                   \.func = (f),                                            \.data = (d),                                            \.timer = TIMER_INITIALIZER(NULL, 0, 0),                 \}#define DECLARE_WORK(n, f, d)                                   \struct work_struct n = __WORK_INITIALIZER(n, f, d)/* * initialize a work-struct's func and data pointers:*/
#define PREPARE_WORK(_work, _func, _data)                       \do {                                                    \(_work)->func = _func;                          \(_work)->data = _data;                          \} while (0)/*              * initialize all of a work-struct:*/
#define INIT_WORK(_work, _func, _data)                          \do {                                                    \INIT_LIST_HEAD(&(_work)->entry);                \(_work)->pending = 0;                           \PREPARE_WORK((_work), (_func), (_data));        \init_timer(&(_work)->timer);                    \} while (0)

1. work_struct 结构体

struct work_struct {unsigned long pending;struct list_head entry;void (*func)(void *);void *data;void *wq_data;struct timer_list timer;
};

pending:挂起标志

  • 表示该work是否已经在工作队列中排队
  • 防止同一个work被多次加入队列

entry:链表节点

  • 用于将work连接到工作队列的链表中
  • struct list_head是内核通用双向循环链表结构

func:工作函数指针

  • work实际要执行的函数
  • 参数为void *类型,通过data传递

data:工作函数参数

  • 传递给func函数的参数
  • 通常是包含上下文信息的结构体指针

wq_data:工作队列数据

  • 指向所属工作队列的内部数据结构
  • 用于work与特定工作队列的关联

timer:定时器

  • 用于实现延迟工作(delayed work)
  • 可以调度work在指定的时间后执行

2. __WORK_INITIALIZER 初始化宏

#define __WORK_INITIALIZER(n, f, d) {                           \.entry  = { &(n).entry, &(n).entry },                       \.func = (f),                                                \.data = (d),                                                \.timer = TIMER_INITIALIZER(NULL, 0, 0),                     \
}

.entry = { &(n).entry, &(n).entry }

  • 初始化双向链表,前后指针都指向自己
  • 表示一个空的链表节点

.func = (f)

  • 设置工作函数指针

.data = (d)

  • 设置工作函数参数

.timer = TIMER_INITIALIZER(NULL, 0, 0)

  • 初始化定时器,回调函数为NULL,到期时间为0
  • 表示定时器未激活状态

3. DECLARE_WORK 声明宏

#define DECLARE_WORK(n, f, d)                                   \struct work_struct n = __WORK_INITIALIZER(n, f, d)

使用示例:

DECLARE_WORK(my_work, my_work_function, &my_data);

展开结果:

struct work_struct my_work = {.entry  = { &my_work.entry, &my_work.entry },.func = my_work_function,.data = &my_data,.timer = TIMER_INITIALIZER(NULL, 0, 0),
};

作用:静态声明并初始化一个work结构体

4. PREPARE_WORK 准备宏

#define PREPARE_WORK(_work, _func, _data)                       \do {                                                        \(_work)->func = _func;                                  \(_work)->data = _data;                                  \} while (0)

用于重新配置已存在的work结构体

使用示例:

struct work_struct my_work;// 重新配置work
PREPARE_WORK(&my_work, new_function, new_data);

5. INIT_WORK 完全初始化宏

#define INIT_WORK(_work, _func, _data)                          \do {                                                        \INIT_LIST_HEAD(&(_work)->entry);                        \(_work)->pending = 0;                                   \PREPARE_WORK((_work), (_func), (_data));                \init_timer(&(_work)->timer);                            \} while (0)

INIT_LIST_HEAD(&(_work)->entry)

  • 初始化链表头,前后指针指向自己

(_work)->pending = 0

  • 清除挂起标志,表示work不在队列中

PREPARE_WORK((_work), (_func), (_data))

  • 设置工作函数和参数

init_timer(&(_work)->timer)

  • 初始化定时器结构

6. 与tasklet的关键区别

workqueue vs tasklet:

特性workqueuetasklet
执行上下文进程上下文软中断上下文
是否可以睡眠
调度方式内核线程软中断
优先级普通较高
适用场景长时间工作短时间工作

二、创建工作队列create_workqueue

#define create_workqueue(name) __create_workqueue((name), 0)
struct workqueue_struct *__create_workqueue(const char *name,int singlethread)
{int cpu, destroy = 0;struct workqueue_struct *wq;struct task_struct *p;BUG_ON(strlen(name) > 10);wq = kmalloc(sizeof(*wq), GFP_KERNEL);if (!wq)return NULL;memset(wq, 0, sizeof(*wq));wq->name = name;/* We don't need the distraction of CPUs appearing and vanishing. */lock_cpu_hotplug();if (singlethread) {INIT_LIST_HEAD(&wq->list);p = create_workqueue_thread(wq, 0);if (!p)destroy = 1;elsewake_up_process(p);} else {spin_lock(&workqueue_lock);list_add(&wq->list, &workqueues);spin_unlock(&workqueue_lock);for_each_online_cpu(cpu) {p = create_workqueue_thread(wq, cpu);if (p) {kthread_bind(p, cpu);wake_up_process(p);} elsedestroy = 1;}}unlock_cpu_hotplug();/** Was there any error during startup? If yes then clean up:*/if (destroy) {destroy_workqueue(wq);wq = NULL;}return wq;
}

1. 宏定义和函数声明

#define create_workqueue(name) __create_workqueue((name), 0)
  • 定义宏 create_workqueue,它调用 __create_workqueue 函数,第二个参数为 0
struct workqueue_struct *__create_workqueue(const char *name, int singlethread)
  • 函数声明,返回工作队列结构体指针
  • 参数:
    • name: 工作队列名称
    • singlethread: 标识是否创建单线程工作队列

2. 变量声明

int cpu, destroy = 0;
struct workqueue_struct *wq;
struct task_struct *p;
  • cpu: 循环计数器,用于遍历CPU
  • destroy: 错误标志,如果创建过程中出错则设为1
  • wq: 工作队列结构体指针
  • p: 任务结构体指针(代表内核线程)

3. 输入验证和内存分配

BUG_ON(strlen(name) > 10);
  • 内核调试宏,如果工作队列名称长度超过10个字符,会触发内核bug
  • 这是为了确保名称不会过长
wq = kmalloc(sizeof(*wq), GFP_KERNEL);
if (!wq)return NULL;
  • 为工作队列结构体分配内核内存
  • GFP_KERNEL: 标准的内核内存分配标志
  • 如果分配失败,返回NULL
memset(wq, 0, sizeof(*wq));
  • 将分配的内存清零,初始化所有字段为0

4. 工作队列初始化

wq->name = name;
  • 设置工作队列的名称
lock_cpu_hotplug();
  • 锁定CPU热插拔
  • 防止在创建工作队列过程中有CPU被加入或移除,保证操作的原子性

5. 单线程工作队列创建

if (singlethread) {INIT_LIST_HEAD(&wq->list);p = create_workqueue_thread(wq, 0);if (!p)destroy = 1;elsewake_up_process(p);
}
  • 如果要求创建单线程工作队列:
    • INIT_LIST_HEAD(&wq->list): 初始化链表头
    • create_workqueue_thread(wq, 0): 创建工作队列线程,绑定到任意CPU
    • 如果创建线程失败,设置 destroy = 1
    • 如果成功,wake_up_process(p): 唤醒创建的线程使其开始运行

6. 多线程工作队列创建

else {spin_lock(&workqueue_lock);list_add(&wq->list, &workqueues);spin_unlock(&workqueue_lock);
  • 多线程工作队列的情况:
    • 获取工作队列锁
    • 将新工作队列添加到全局工作队列链表 workqueues
    • 释放锁
    for_each_online_cpu(cpu) {p = create_workqueue_thread(wq, cpu);if (p) {kthread_bind(p, cpu);wake_up_process(p);} elsedestroy = 1;}
}
  • for_each_online_cpu(cpu): 遍历所有在线的CPU核心
  • 为每个CPU核心创建一个工作线程:
    • create_workqueue_thread(wq, cpu): 创建工作队列线程
    • kthread_bind(p, cpu): 将线程绑定到特定的CPU核心
    • wake_up_process(p): 唤醒线程
    • 如果任何线程创建失败,设置 destroy = 1

7. 清理和返回

unlock_cpu_hotplug();
  • 解锁CPU热插拔,允许CPU状态变化
if (destroy) {destroy_workqueue(wq);wq = NULL;
}
return wq;
  • 检查是否有错误发生:
    • 如果有错误(destroy == 1),销毁已创建的工作队列,返回NULL
    • 如果没有错误,返回创建的工作队列指针

8. 关键点总结

  1. 两种工作队列

    • 单线程:只有一个工作线程处理所有工作
    • 多线程:每个CPU核心都有一个工作线程
  2. CPU绑定:多线程工作队列中,每个线程绑定到特定CPU,有利于缓存局部性

  3. 错误处理:采用"全部成功或全部回滚"的策略

  4. 并发安全:使用CPU热插拔锁和工作队列锁保证操作原子性

三、创建工作队列线程create_workqueue_thread

static struct task_struct *create_workqueue_thread(struct workqueue_struct *wq,int cpu)
{struct cpu_workqueue_struct *cwq = wq->cpu_wq + cpu;struct task_struct *p;spin_lock_init(&cwq->lock);cwq->wq = wq;cwq->thread = NULL;cwq->insert_sequence = 0;cwq->remove_sequence = 0;INIT_LIST_HEAD(&cwq->worklist);init_waitqueue_head(&cwq->more_work);init_waitqueue_head(&cwq->work_done);if (is_single_threaded(wq))p = kthread_create(worker_thread, cwq, "%s", wq->name);elsep = kthread_create(worker_thread, cwq, "%s/%d", wq->name, cpu);if (IS_ERR(p))return NULL;cwq->thread = p;return p;
}

1. 函数声明和变量定义

static struct task_struct *create_workqueue_thread(struct workqueue_struct *wq, int cpu)
  • static: 函数只在当前文件中可见
  • 返回 struct task_struct *: 指向新创建的内核线程的指针
  • 参数:
    • wq: 工作队列结构体指针
    • cpu: 该线程要绑定的CPU编号
struct cpu_workqueue_struct *cwq = wq->cpu_wq + cpu;
struct task_struct *p;
  • cwq: 指向特定CPU的工作队列结构体
    • wq->cpu_wqstruct cpu_workqueue_struct 数组
    • + cpu 通过CPU编号计算偏移量,获取对应CPU的工作队列
  • p: 将要创建的内核线程的任务结构体指针

2. 初始化CPU工作队列结构体

spin_lock_init(&cwq->lock);
  • 初始化自旋锁,用于保护这个CPU工作队列的并发访问
  • 这个锁会保护 worklist 和其他字段的访问
cwq->wq = wq;
  • 设置指向父工作队列的指针,建立反向引用关系
cwq->thread = NULL;
  • 初始时线程指针设为NULL,创建成功后再赋值
cwq->insert_sequence = 0;
cwq->remove_sequence = 0;
  • 初始化序列号计数器:
    • insert_sequence: 工作项插入序列号,用于跟踪工作提交顺序
    • remove_sequence: 工作项执行序列号,用于跟踪工作完成顺序
  • 这些序列号可用于调试和统计

3. 初始化链表和等待队列

INIT_LIST_HEAD(&cwq->worklist);
  • 初始化工作项链表头
  • 所有提交到该CPU工作队列的工作项都会链接到这个链表
init_waitqueue_head(&cwq->more_work);
  • 初始化"有更多工作"等待队列
  • 当工作线程空闲时,会在这个队列上睡眠,等待新工作项的加入
init_waitqueue_head(&cwq->work_done);
  • 初始化"工作完成"等待队列
  • 用于同步操作,当需要等待特定工作项完成时使用

4. 创建内核线程

if (is_single_threaded(wq))p = kthread_create(worker_thread, cwq, "%s", wq->name);
elsep = kthread_create(worker_thread, cwq, "%s/%d", wq->name, cpu);
  • is_single_threaded(wq): 判断是否是单线程工作队列
  • 单线程工作队列
    • 线程命名格式:"%s",只使用工作队列名称
    • 例如:"events"
  • 多线程工作队列
    • 线程命名格式:"%s/%d",包含工作队列名称和CPU编号
    • 例如:"kblockd/0", "kblockd/1"
  • kthread_create 参数:
    • worker_thread: 线程的主函数(工作线程的入口点)
    • cwq: 传递给线程函数的参数
    • 名称格式字符串和参数

5. 错误检查和线程指针设置

if (IS_ERR(p))return NULL;
  • IS_ERR(p): 检查线程创建是否失败
  • kthread_create 失败时返回错误指针,这里转换为返回 NULL
cwq->thread = p;
return p;
  • 将新创建的线程指针保存到CPU工作队列结构体中
  • 返回新创建的线程指针

四、工作队列线程的主函数worker_thread

static int worker_thread(void *__cwq)
{struct cpu_workqueue_struct *cwq = __cwq;DECLARE_WAITQUEUE(wait, current);struct k_sigaction sa;sigset_t blocked;current->flags |= PF_NOFREEZE;set_user_nice(current, -10);/* Block and flush all signals */sigfillset(&blocked);sigprocmask(SIG_BLOCK, &blocked, NULL);flush_signals(current);/* SIG_IGN makes children autoreap: see do_notify_parent(). */sa.sa.sa_handler = SIG_IGN;sa.sa.sa_flags = 0;siginitset(&sa.sa.sa_mask, sigmask(SIGCHLD));do_sigaction(SIGCHLD, &sa, (struct k_sigaction *)0);set_current_state(TASK_INTERRUPTIBLE);while (!kthread_should_stop()) {add_wait_queue(&cwq->more_work, &wait);if (list_empty(&cwq->worklist))schedule();else__set_current_state(TASK_RUNNING);remove_wait_queue(&cwq->more_work, &wait);if (!list_empty(&cwq->worklist))run_workqueue(cwq);set_current_state(TASK_INTERRUPTIBLE);}__set_current_state(TASK_RUNNING);return 0;
}

1. 函数声明和变量定义

static int worker_thread(void *__cwq)
  • static: 函数只在当前文件中可见
  • 返回 int: 线程退出状态码
  • 参数 __cwq: 传递给线程的参数,实际上是 struct cpu_workqueue_struct *
struct cpu_workqueue_struct *cwq = __cwq;
DECLARE_WAITQUEUE(wait, current);
struct k_sigaction sa;
sigset_t blocked;
  • cwq: 将参数转换为CPU工作队列结构体指针
  • DECLARE_WAITQUEUE(wait, current): 声明并初始化等待队列条目
    • wait: 等待队列条目名称
    • current: 当前线程的任务结构体
  • sa: 信号处理动作结构体
  • blocked: 信号集,用于阻塞信号

2. 线程特性设置

current->flags |= PF_NOFREEZE;
  • 设置不冻结标志
    • PF_NOFREEZE: 防止线程在系统休眠时被冻结
    • 工作线程需要持续运行来处理工作,不能被挂起
set_user_nice(current, -10);
  • 设置线程优先级
    • nice值 = -10,表示较高的优先级
    • nice值范围:-20(最高优先级)到19(最低优先级)
    • 工作线程需要及时响应工作请求,所以设置较高优先级

3. 信号处理设置

sigfillset(&blocked);
sigprocmask(SIG_BLOCK, &blocked, NULL);
flush_signals(current);
  • sigfillset(&blocked): 填充信号集,包含所有信号
  • sigprocmask(SIG_BLOCK, &blocked, NULL): 阻塞所有信号
    • 工作线程不应该被普通信号中断
  • flush_signals(current): 清空待处理的信号
sa.sa.sa_handler = SIG_IGN;
sa.sa.sa_flags = 0;
siginitset(&sa.sa.sa_mask, sigmask(SIGCHLD));
do_sigaction(SIGCHLD, &sa, (struct k_sigaction *)0);
  • 忽略SIGCHLD信号
    • SIG_IGN: 忽略信号处理
    • SIGCHLD: 子进程状态改变信号
    • 设置SIGCHLD的处理动作为忽略,使得子进程自动回收
    • 工作线程不应该处理子进程,所以忽略这个信号

4. 主循环 - 等待和工作处理

set_current_state(TASK_INTERRUPTIBLE);
  • 设置当前线程状态为 可中断的睡眠
    • 线程可以接收信号并被唤醒
while (!kthread_should_stop()) {
  • 主循环条件kthread_should_stop() 检查是否收到停止信号
    • 当调用 kthread_stop() 时,这个函数返回true
    • 循环继续运行,直到收到停止请求

5. 等待机制

add_wait_queue(&cwq->more_work, &wait);
if (list_empty(&cwq->worklist))schedule();
else__set_current_state(TASK_RUNNING);
remove_wait_queue(&cwq->more_work, &wait);
  1. 加入等待队列

    add_wait_queue(&cwq->more_work, &wait);
    
    • 将当前线程添加到 more_work 等待队列
    • 当有新工作加入时,会唤醒这个队列上的线程
  2. 检查工作列表

    if (list_empty(&cwq->worklist))schedule();
    else__set_current_state(TASK_RUNNING);
    
    • 如果工作列表为空:调用 schedule() 让出CPU,线程进入睡眠
    • 如果工作列表不为空:设置状态为运行,继续处理工作
  3. 移除等待队列

    remove_wait_queue(&cwq->more_work, &wait);
    
    • 从等待队列中移除当前线程

6. 工作处理

if (!list_empty(&cwq->worklist))run_workqueue(cwq);
  • 再次检查工作列表
  • 如果列表不为空,调用 run_workqueue(cwq) 执行所有待处理的工作
set_current_state(TASK_INTERRUPTIBLE);
  • 处理完工作后,重新设置状态为可中断睡眠,准备下一次等待

7. 线程退出处理

__set_current_state(TASK_RUNNING);
return 0;
  • 当收到停止信号退出循环时:
    • 设置线程状态为运行(清理状态)
    • 返回0表示正常退出

五、工作队列的执行函数run_workqueue

static inline void run_workqueue(struct cpu_workqueue_struct *cwq)
{unsigned long flags;/** Keep taking off work from the queue until* done.*/spin_lock_irqsave(&cwq->lock, flags);cwq->run_depth++;if (cwq->run_depth > 3) {/* morton gets to eat his hat */printk("%s: recursion depth exceeded: %d\n",__FUNCTION__, cwq->run_depth);dump_stack();}while (!list_empty(&cwq->worklist)) {struct work_struct *work = list_entry(cwq->worklist.next,struct work_struct, entry);void (*f) (void *) = work->func;void *data = work->data;list_del_init(cwq->worklist.next);spin_unlock_irqrestore(&cwq->lock, flags);BUG_ON(work->wq_data != cwq);clear_bit(0, &work->pending);f(data);spin_lock_irqsave(&cwq->lock, flags);cwq->remove_sequence++;wake_up(&cwq->work_done);}cwq->run_depth--;spin_unlock_irqrestore(&cwq->lock, flags);
}

1. 函数声明和变量

static inline void run_workqueue(struct cpu_workqueue_struct *cwq)
  • void: 无返回值
  • 参数 cwq: CPU工作队列结构体指针
unsigned long flags;
  • 用于保存中断状态的变量,在加锁时使用

2. 加锁和递归深度检查

spin_lock_irqsave(&cwq->lock, flags);
  • 加锁并保存中断状态
    • 获取自旋锁保护工作队列
    • 禁用本地CPU中断
    • 保存当前中断状态到 flags
cwq->run_depth++;
  • 增加运行深度计数器
    • 记录当前递归深度
    • 用于检测可能的递归调用问题
if (cwq->run_depth > 3) {/* morton gets to eat his hat */printk("%s: recursion depth exceeded: %d\n",__FUNCTION__, cwq->run_depth);dump_stack();
}
  • 递归深度检查
    • 如果递归深度超过3,认为可能出现了递归调用问题
    • 打印错误信息和递归深度
    • dump_stack(): 输出调用栈用于调试

3. 工作处理循环

while (!list_empty(&cwq->worklist)) {
  • 循环条件:当工作链表不为空时继续循环
  • 处理所有待执行的工作项

4. 获取工作项信息

struct work_struct *work = list_entry(cwq->worklist.next,struct work_struct, entry);
  • 从链表中获取工作项
    • cwq->worklist.next: 链表中的第一个工作项
    • list_entry: 通过链表指针获取包含它的 work_struct 结构体
    • entry: 是 work_struct 中的链表节点字段名
void (*f) (void *) = work->func;
void *data = work->data;
  • 提取工作函数和数据
    • f: 工作函数的函数指针
    • data: 传递给工作函数的参数数据

5. 从链表中移除工作项

list_del_init(cwq->worklist.next);
  • 从链表中删除工作项
    • 将工作项从工作链表中移除
    • _init 版本会同时初始化链表节点,使其处于未链接状态
spin_unlock_irqrestore(&cwq->lock, flags);
  • 临时释放锁
    • 在真正执行工作函数前释放锁
    • 恢复中断状态
    • 这样其他CPU可以继续向工作队列添加新工作

6. 执行工作函数

BUG_ON(work->wq_data != cwq);
  • 调试检查
    • 验证工作项确实属于当前工作队列
    • 如果不匹配,触发内核BUG
clear_bit(0, &work->pending);
  • 清除pending标志
    • 将工作项的pending位清零
    • 表示这个工作项正在执行或已经完成
f(data);
  • 执行实际的工作函数
    • 调用工作项注册的函数
    • 传递数据参数
    • 这是工作队列的核心功能

7. 重新加锁和后续处理

spin_lock_irqsave(&cwq->lock, flags);
  • 重新加锁
    • 工作函数执行完成后,重新获取锁
    • 准备更新队列状态
cwq->remove_sequence++;
  • 增加移除序列号
    • 每次完成一个工作项就递增
    • 用于跟踪工作完成的顺序
wake_up(&cwq->work_done);
  • 唤醒等待者
    • 如果有线程在等待工作完成,唤醒它们
    • 用于实现同步工作提交

循环结束和清理

cwq->run_depth--;
  • 减少运行深度:恢复递归深度计数器
spin_unlock_irqrestore(&cwq->lock, flags);
  • 最终释放锁:处理完成,释放锁并恢复中断

六、清理工作队列线程cleanup_workqueue_thread

static void cleanup_workqueue_thread(struct workqueue_struct *wq, int cpu)
{struct cpu_workqueue_struct *cwq;unsigned long flags;struct task_struct *p;cwq = wq->cpu_wq + cpu;spin_lock_irqsave(&cwq->lock, flags);p = cwq->thread;cwq->thread = NULL;spin_unlock_irqrestore(&cwq->lock, flags);if (p)kthread_stop(p);
}

1. 函数声明和变量定义

static void cleanup_workqueue_thread(struct workqueue_struct *wq, int cpu)
  • static: 表示这个函数只在当前文件中可见
  • void: 函数没有返回值
  • 参数:
    • wq: 要清理的工作队列结构体指针
    • cpu: 指定要清理哪个CPU对应的工作线程
struct cpu_workqueue_struct *cwq;
unsigned long flags;
struct task_struct *p;
  • cwq: 指向特定CPU的工作队列结构体
  • flags: 用于保存中断状态,在加锁时使用
  • p: 指向内核线程任务结构体的指针

2. 获取CPU工作队列指针

cwq = wq->cpu_wq + cpu;
  • wq->cpu_wq 是一个 struct cpu_workqueue_struct 数组
  • + cpu 通过CPU编号计算偏移量,获取对应CPU的工作队列结构体

3. 安全地获取并清除线程指针

spin_lock_irqsave(&cwq->lock, flags);
  • 加锁并保存中断状态
    • spin_lock_irqsave: 获取自旋锁,同时禁用本地CPU的中断,并保存当前中断状态到 flags
    • 这是为了防止在操作线程指针时被中断或其他CPU核心干扰
p = cwq->thread;
cwq->thread = NULL;
  • p = cwq->thread: 保存当前线程指针到局部变量 p
  • cwq->thread = NULL: 立即将工作队列中的线程指针设为NULL
  • 这样做是为了避免竞争条件:在停止线程的过程中,不会有新的工作被加入到队列
spin_unlock_irqrestore(&cwq->lock, flags);
  • 解锁并恢复中断状态
    • 释放自旋锁
    • 恢复之前保存的中断状态(重新启用中断)
    • 现在锁保护的范围结束

4. 停止工作线程

if (p)kthread_stop(p);
  • if (p): 检查是否真的存在要停止的线程(避免空指针)
  • kthread_stop(p): 停止内核线程
    • 这个函数会发送停止信号给线程
    • 等待线程完成当前工作并退出
    • 同步操作,会阻塞直到线程完全停止

七、销毁工作队列destroy_workqueue

void destroy_workqueue(struct workqueue_struct *wq)
{int cpu;flush_workqueue(wq);/* We don't need the distraction of CPUs appearing and vanishing. */lock_cpu_hotplug();if (is_single_threaded(wq))cleanup_workqueue_thread(wq, 0);else {for_each_online_cpu(cpu)cleanup_workqueue_thread(wq, cpu);spin_lock(&workqueue_lock);list_del(&wq->list);spin_unlock(&workqueue_lock);}unlock_cpu_hotplug();kfree(wq);
}

1. 函数声明和变量

void destroy_workqueue(struct workqueue_struct *wq)
  • void: 无返回值
  • 参数 wq: 要销毁的工作队列结构体指针
int cpu;
  • 循环计数器,用于遍历所有CPU

2. 刷新工作队列

flush_workqueue(wq);
  • 等待所有未完成的工作项执行完毕
    • 这个函数会阻塞,直到工作队列中所有待处理的工作项都完成执行
    • 确保在销毁工作队列前,所有已提交的工作都处理完毕
    • 防止工作项在队列销毁后仍尝试执行

3. 锁定CPU热插拔

lock_cpu_hotplug();
  • 锁定CPU热插拔
    • 防止在销毁过程中有CPU被加入或移除系统
    • 确保CPU状态的稳定性,避免竞争条件

4. 单线程工作队列清理

if (is_single_threaded(wq))cleanup_workqueue_thread(wq, 0);
  • 判断工作队列类型is_single_threaded(wq) 检查是否为单线程工作队列
  • 单线程工作队列清理
    • 调用 cleanup_workqueue_thread(wq, 0) 清理工作线程

5. 多线程工作队列清理

else {for_each_online_cpu(cpu)cleanup_workqueue_thread(wq, cpu);
  • 多线程工作队列:为每个在线CPU创建工作线程
  • 遍历所有在线CPUfor_each_online_cpu(cpu) 宏遍历系统中所有活跃的CPU
  • 清理每个CPU的工作线程:对每个CPU调用 cleanup_workqueue_thread(wq, cpu)
    spin_lock(&workqueue_lock);list_del(&wq->list);spin_unlock(&workqueue_lock);
}
  • 从全局工作队列列表中移除
    • spin_lock(&workqueue_lock): 获取工作队列全局锁
    • list_del(&wq->list): 从全局工作队列链表 workqueues 中删除当前工作队列
    • spin_unlock(&workqueue_lock): 释放全局锁
  • 为什么需要这个操作
    • 多线程工作队列在创建时被添加到全局链表
    • 销毁时必须从链表中移除,防止后续被错误访问

6. 解锁和内存释放

unlock_cpu_hotplug();
  • 解锁CPU热插拔
    • 允许CPU状态再次变化
    • 与前面的 lock_cpu_hotplug() 配对使用
kfree(wq);
  • 释放工作队列内存
    • kfree(): 内核内存释放函数
    • 释放之前分配的 struct workqueue_struct 内存空间
    • 这是销毁过程的最后一步

7. 关键设计原理

7.1. 安全销毁流程

1. 等待所有工作完成 (flush_workqueue)
2. 锁定系统状态 (lock_cpu_hotplug)
3. 清理工作线程 (cleanup_workqueue_thread)
4. 从全局结构移除 (list_del)
5. 解锁系统状态 (unlock_cpu_hotplug)
6. 释放内存 (kfree)

八、刷新工作队列flush_workqueue

void fastcall flush_workqueue(struct workqueue_struct *wq)
{might_sleep();if (is_single_threaded(wq)) {/* Always use cpu 0's area. */flush_cpu_workqueue(wq->cpu_wq + 0);} else {int cpu;lock_cpu_hotplug();for_each_online_cpu(cpu)flush_cpu_workqueue(wq->cpu_wq + cpu);unlock_cpu_hotplug();}
}

1. 函数声明

void fastcall flush_workqueue(struct workqueue_struct *wq)
  • void: 无返回值
  • fastcall: 函数调用约定,表示通过寄存器传递参数(x86架构优化)
  • 参数 wq: 要刷新的工作队列结构体指针

2. 睡眠检查

might_sleep();
  • 可能睡眠的声明
    • 这是一个调试辅助宏,告诉内核这个函数可能会睡眠(阻塞)
    • 如果在原子上下文(如中断处理)中调用,会发出警告
    • 因为 flush_workqueue 需要等待工作完成,可能会长时间阻塞

3. 单线程工作队列处理

if (is_single_threaded(wq)) {/* Always use cpu 0's area. */flush_cpu_workqueue(wq->cpu_wq + 0);
}
  • 判断工作队列类型is_single_threaded(wq) 检查是否为单线程工作队列
  • 单线程工作队列刷新
    • 总是使用CPU 0的区域
    • wq->cpu_wq + 0: 获取CPU 0的工作队列结构体
    • flush_cpu_workqueue(): 刷新单个CPU的工作队列,等待该CPU上所有工作完成

4. 多线程工作队列处理

else {int cpu;lock_cpu_hotplug();
  • 多线程工作队列:进入else分支
  • 声明CPU计数器
  • 锁定CPU热插拔
    • 防止在刷新过程中有CPU被加入或移除
    • 确保CPU列表的稳定性
    for_each_online_cpu(cpu)flush_cpu_workqueue(wq->cpu_wq + cpu);
  • 遍历所有在线CPUfor_each_online_cpu(cpu) 宏遍历系统中所有活跃的CPU
  • 刷新每个CPU的工作队列
    • wq->cpu_wq + cpu: 获取对应CPU的工作队列结构体
    • flush_cpu_workqueue(): 刷新指定CPU的工作队列
    unlock_cpu_hotplug();
}
  • 解锁CPU热插拔:允许CPU状态再次变化

九、刷新单个CPU工作队列flush_cpu_workqueue

static void flush_cpu_workqueue(struct cpu_workqueue_struct *cwq)
{if (cwq->thread == current) {/** Probably keventd trying to flush its own queue. So simply run* it by hand rather than deadlocking.*/run_workqueue(cwq);} else {DEFINE_WAIT(wait);long sequence_needed;spin_lock_irq(&cwq->lock);sequence_needed = cwq->insert_sequence;while (sequence_needed - cwq->remove_sequence > 0) {prepare_to_wait(&cwq->work_done, &wait,TASK_UNINTERRUPTIBLE);spin_unlock_irq(&cwq->lock);schedule();spin_lock_irq(&cwq->lock);}finish_wait(&cwq->work_done, &wait);spin_unlock_irq(&cwq->lock);}
}

1. 函数声明

static void flush_cpu_workqueue(struct cpu_workqueue_struct *cwq)
  • static: 函数只在当前文件中可见
  • void: 无返回值
  • 参数 cwq: 要刷新的CPU工作队列结构体指针

2. 特殊情况:当前线程就是工作线程

if (cwq->thread == current) {/** Probably keventd trying to flush its own queue. So simply run* it by hand rather than deadlocking.*/run_workqueue(cwq);
}
cwq->thread == current
  • 检查是否在工作线程内部调用刷新

    • cwq->thread: 该工作队列的工作线程
    • current: 当前正在执行的线程
    • 如果相等,说明在工作线程内部尝试刷新自己的工作队列
  • 注释说明:可能是 keventd(内核事件守护进程)在刷新自己的队列

  • 直接执行工作队列run_workqueue(cwq)

  • 避免死锁:如果等待自己完成工作,会导致死锁

3. 正常情况:外部线程刷新

else {DEFINE_WAIT(wait);long sequence_needed;
  • DEFINE_WAIT(wait): 定义并初始化等待队列条目
  • sequence_needed: 需要等待的序列号
spin_lock_irq(&cwq->lock);
sequence_needed = cwq->insert_sequence;
  • 加锁:保护工作队列状态
  • 获取当前插入序列号cwq->insert_sequence
  • 序列号语义
    • insert_sequence: 工作项插入时的序列号(每次插入递增)
    • remove_sequence: 工作项完成时的序列号(每次完成递增)
    • 如果 insert_sequence > remove_sequence,说明有未完成的工作

4. 等待循环

while (sequence_needed - cwq->remove_sequence > 0) {
  • sequence_needed - cwq->remove_sequence > 0
  • 表示还有工作项未完成:
    • 我们在 sequence_needed 时刻记录插入序列号
    • 只有当 remove_sequence 达到这个值时,所有之前的工作才完成

5. 等待机制

prepare_to_wait(&cwq->work_done, &wait, TASK_UNINTERRUPTIBLE);
spin_unlock_irq(&cwq->lock);
schedule();
spin_lock_irq(&cwq->lock);
  1. 准备等待

    prepare_to_wait(&cwq->work_done, &wait, TASK_UNINTERRUPTIBLE);
    
    • 将当前线程添加到 work_done 等待队列
    • TASK_UNINTERRUPTIBLE: 不可中断的睡眠状态(不能被信号唤醒)
  2. 释放锁并调度

    spin_unlock_irq(&cwq->lock);
    schedule();
    
    • 释放锁,让工作线程可以继续执行和更新状态
    • schedule(): 主动让出CPU,进入睡眠状态
  3. 被唤醒后重新加锁

    spin_lock_irq(&cwq->lock);
    
    • 当工作线程完成工作并唤醒等待者时,重新获取锁
    • 检查条件是否满足

6. 清理等待状态

finish_wait(&cwq->work_done, &wait);
spin_unlock_irq(&cwq->lock);
  • 完成等待finish_wait() 从等待队列中移除当前线程
  • 最终释放锁:处理完成,释放锁

十、提交工作queue_work

static void __queue_work(struct cpu_workqueue_struct *cwq,struct work_struct *work)
{unsigned long flags;spin_lock_irqsave(&cwq->lock, flags);work->wq_data = cwq;list_add_tail(&work->entry, &cwq->worklist);cwq->insert_sequence++;wake_up(&cwq->more_work);spin_unlock_irqrestore(&cwq->lock, flags);
}
int fastcall queue_work(struct workqueue_struct *wq, struct work_struct *work)
{int ret = 0, cpu = get_cpu();if (!test_and_set_bit(0, &work->pending)) {if (unlikely(is_single_threaded(wq)))cpu = 0;BUG_ON(!list_empty(&work->entry));__queue_work(wq->cpu_wq + cpu, work);ret = 1;}put_cpu();return ret;
}

1. __queue_work 函数 - 实际工作入队操作

static void __queue_work(struct cpu_workqueue_struct *cwq,struct work_struct *work)

1.1. 变量声明和加锁

unsigned long flags;
spin_lock_irqsave(&cwq->lock, flags);
  • 保存中断状态并加锁,保护工作队列的并发访问

1.2. 设置工作项属性

work->wq_data = cwq;
  • 建立工作项与工作队列的关联
    • wq_data 指向所属的CPU工作队列
    • 用于后续验证和工作执行时的上下文信息
list_add_tail(&work->entry, &cwq->worklist);
  • 将工作项添加到队列尾部
    • &work->entry: 工作项中的链表节点
    • &cwq->worklist: CPU工作队列的工作链表
    • _tail 表示添加到链表末尾

1.3. 更新序列号和唤醒工作线程

cwq->insert_sequence++;
  • 递增插入序列号
    • 用于跟踪工作提交的顺序
    • flush_workqueue 中用于确定需要等待的工作范围
wake_up(&cwq->more_work);
  • 唤醒工作线程
    • 如果工作线程在 more_work 等待队列上睡眠,唤醒它
    • 使得工作线程可以立即开始处理新加入的工作
spin_unlock_irqrestore(&cwq->lock, flags);
  • 释放锁并恢复中断状态

2. queue_work 函数 - 外部接口

int fastcall queue_work(struct workqueue_struct *wq, struct work_struct *work)

2.1. 变量声明和CPU绑定

int ret = 0, cpu = get_cpu();
  • ret: 返回值,0表示失败,1表示成功
  • cpu = get_cpu(): 禁用内核抢占并获取当前CPU编号
    • 防止在操作过程中被调度到其他CPU
    • 确保操作的原子性

2.2. 工作项状态检查

if (!test_and_set_bit(0, &work->pending)) {
  • 原子地测试并设置pending标志
    • test_and_set_bit: 原子操作,测试位0的值并设置为1
    • 如果原来就是1,说明工作项已经在队列中,返回0(失败)
    • 如果原来是0,设置为1并返回0,进入if块

2.3. 单线程工作队列处理

if (unlikely(is_single_threaded(wq)))cpu = 0;
  • 单线程工作队列特殊处理
    • 单线程工作队列总是使用CPU 0的工作队列
    • 覆盖之前获取的当前CPU编号

2.4. 工作项状态验证

BUG_ON(!list_empty(&work->entry));
  • 调试检查:验证工作项不在任何链表中
  • 如果 work->entry 不为空,说明工作项已被使用,触发内核BUG
  • 防止重复提交同一个工作项

2.5. 实际入队操作

__queue_work(wq->cpu_wq + cpu, work);
ret = 1;
  • 调用内部函数将工作项加入队列
  • 设置返回值为1,表示成功

2.6. 清理和返回

put_cpu();
return ret;
  • put_cpu(): 重新启用内核抢占
  • 与前面的 get_cpu() 配对使用
  • 返回操作结果

十一、提交延迟工作queue_delayed_work

int fastcall queue_delayed_work(struct workqueue_struct *wq,struct work_struct *work, unsigned long delay)
{int ret = 0;struct timer_list *timer = &work->timer;if (!test_and_set_bit(0, &work->pending)) {BUG_ON(timer_pending(timer));BUG_ON(!list_empty(&work->entry));/* This stores wq for the moment, for the timer_fn */work->wq_data = wq;timer->expires = jiffies + delay;timer->data = (unsigned long)work;timer->function = delayed_work_timer_fn;add_timer(timer);ret = 1;}return ret;
}

1. 函数声明

int fastcall queue_delayed_work(struct workqueue_struct *wq,struct work_struct *work, unsigned long delay)
  • fastcall: 函数调用约定优化(x86架构)
  • 返回 int: 操作结果(1表示成功,0表示失败)
  • 参数:
    • wq: 目标工作队列
    • work: 要延迟执行的工作项
    • delay: 延迟时间(以jiffies为单位)

2. 变量声明

int ret = 0;
struct timer_list *timer = &work->timer;
  • ret: 返回值,初始化为0(失败)
  • timer: 指向工作项内嵌的定时器结构

3. 工作项状态检查

if (!test_and_set_bit(0, &work->pending)) {
  • 原子地测试并设置pending标志
    • test_and_set_bit(0, &work->pending): 原子操作
    • 如果原来pending位是0(工作项空闲),设置为1并返回0,进入if块
    • 如果原来是1(工作项已提交),返回1,跳过if块,直接返回0

4. 调试断言检查

BUG_ON(timer_pending(timer));
  • 检查定时器状态
    • timer_pending(timer): 检查定时器是否已经在等待队列中
    • 如果定时器已激活,触发内核BUG
BUG_ON(!list_empty(&work->entry));
  • 检查工作项链表状态
    • 验证工作项不在任何工作队列的链表中
    • 如果工作项已在链表中,触发内核BUG
    • 确保工作项处于干净状态

5. 设置工作项临时数据

work->wq_data = wq;
  • 临时存储工作队列指针
    • 这个字段在定时器回调函数中会用到
    • 注意:这里存储的是 workqueue_struct,而不是 cpu_workqueue_struct
    • 在定时器到期时,需要根据这个信息找到正确的工作队列

6. 配置定时器

timer->expires = jiffies + delay;
  • 设置定时器到期时间
    • jiffies: 系统启动以来的时钟滴答数
    • delay: 延迟的时钟滴答数
timer->data = (unsigned long)work;
  • 设置定时器回调数据
    • 将工作项指针转换为unsigned long存储
    • 在定时器回调函数中,通过这个数据找到对应的工作项
timer->function = delayed_work_timer_fn;
  • 设置定时器回调函数
    • delayed_work_timer_fn 是处理延迟工作的标准函数
    • 当定时器到期时,这个函数会被调用

7. 激活定时器

add_timer(timer);
  • 将定时器添加到系统定时器队列
    • 从此时开始,定时器开始倒计时
    • jiffies 达到 expires 值时,回调函数会被调用

8. 设置返回值并返回

ret = 1;
  • 设置返回值为1,表示成功提交延迟工作
return ret;
  • 返回操作结果

十二、延迟工作定时器的回调函数delayed_work_timer_fn

static void delayed_work_timer_fn(unsigned long __data)
{struct work_struct *work = (struct work_struct *)__data;struct workqueue_struct *wq = work->wq_data;int cpu = smp_processor_id();if (unlikely(is_single_threaded(wq)))cpu = 0;__queue_work(wq->cpu_wq + cpu, work);
}

1. 函数声明

static void delayed_work_timer_fn(unsigned long __data)
  • static: 函数只在当前文件中可见
  • void: 无返回值
  • 参数 __data: 定时器回调数据,实际上是指向 work_struct 的指针

2. 获取工作项指针

struct work_struct *work = (struct work_struct *)__data;
  • 从定时器数据恢复工作项指针
    • __data 是在 queue_delayed_work 中设置的:timer->data = (unsigned long)work
    • 通过类型转换将 unsigned long 恢复为 struct work_struct *
    • 这样就找到了对应的延迟工作项

3. 获取工作队列信息

struct workqueue_struct *wq = work->wq_data;
  • 从工作项中获取工作队列指针
    • wq_data 是在 queue_delayed_work 中设置的:work->wq_data = wq
    • 这个字段临时存储了目标工作队列的信息
    • 在定时器到期时,需要这个信息来知道将工作提交到哪个队列

4. 确定当前CPU

int cpu = smp_processor_id();
  • 获取当前执行定时器的CPU编号
    • smp_processor_id(): 返回当前代码正在执行的CPU编号
    • 定时器回调可能在任何一个CPU上执行(取决于定时器到期时的调度)
    • 这个信息用于确定将工作提交到哪个CPU的工作队列

5. 单线程工作队列处理

if (unlikely(is_single_threaded(wq)))cpu = 0;
  • 检查工作队列类型
    • is_single_threaded(wq): 判断是否为单线程工作队列
  • 单线程工作队列特殊处理
    • 如果是单线程工作队列,强制使用CPU 0
    • 覆盖之前获取的当前CPU编号
    • 单线程工作队列只有一个工作线程,通常绑定到CPU 0

6. 实际提交工作到队列

__queue_work(wq->cpu_wq + cpu, work);
  • 将工作项提交到指定CPU的工作队列
    • wq->cpu_wq + cpu: 计算目标CPU的工作队列结构体地址
    • work: 要提交的工作项
    • __queue_work: 内部函数,实际执行工作入队操作

十三、取消延迟工作cancel_delayed_work

static inline int cancel_delayed_work(struct work_struct *work)
{int ret;ret = del_timer_sync(&work->timer);if (ret)clear_bit(0, &work->pending);return ret;
}

1. 函数声明

static inline int cancel_delayed_work(struct work_struct *work)
  • static inline: 内联函数,减少函数调用开销
  • 返回 int: 操作结果(1表示成功取消,0表示失败)
  • 参数 work: 要取消的延迟工作项指针

2. 变量声明

int ret;
  • ret: 用于存储取消操作的返回值

3. 取消定时器

ret = del_timer_sync(&work->timer);
  • 同步删除定时器
    • del_timer_sync(&work->timer): 安全地删除工作项关联的定时器
    • _sync 版本保证:当函数返回时,定时器处理程序不会在任何CPU上运行
    • 返回值:
      • 1: 成功删除了一个活跃的定时器(定时器尚未到期)
      • 0: 定时器不活跃(可能已经到期或已被删除)

4. 清理工作项状态

if (ret)clear_bit(0, &work->pending);
  • 条件清理pending标志
    • 只有在成功取消定时器时(ret == 1)才清理pending标志
    • clear_bit(0, &work->pending): 将pending标志位清零
    • 表示工作项不再处于等待执行状态

5. 返回结果

return ret;
  • 返回取消操作的结果:
    • 1: 成功取消了尚未执行的延迟工作
    • 0: 取消失败(工作可能已经在执行或已完成)
http://www.dtcms.com/a/465947.html

相关文章:

  • 模板建站和开发网站区别wordpress 页面瀑布流
  • [C# starter-kit] 身份验证与授权 Identity JWT
  • C#通讯关键类的API
  • 网站开发说明书天元建设集团有限公司申请破产了吗
  • 分布式单例模式在微服务架构中的关键作用与实践
  • 网站footer模板建设银行甘肃省行网站
  • 网站建设和电商区别wordpress创建角色
  • 网站开发软件公司网站建设运营公司
  • Java开发环境搭建及基础练习
  • 【三维重建】即插即用的3DGS的PDE优化:高质量渲染和重建
  • TDS:连接器漫谈之可信空间中的沉默契约与隐秘通道
  • AI Compass前沿速览:DeepSeek-V3.2、Sora 2、Imagine v0.9、LONGLIVE–英伟达、xLLM、OpenAgents
  • 为什么要建设图书馆网站网站建设情况的报告
  • linux之 remoteproc 内核实现源码分析
  • vue 识别一个高亮组件全局
  • RFID与机械臂协同:构建智能产品溯源新范式
  • 生活馆网站开发背景网站制作如皋
  • 第二章数据预处理:公式Python代码实现
  • CSS的重绘和重排是什么?如何减少css的重绘和重排?
  • 哪个网站可以做专业兼职程序员用的编程软件
  • 国内云服务器免费优化培训学校
  • Linux多进程:查看当前哪个进程正在使用共享数据区的互斥锁文件
  • 【MySQL SQL语句实战】设计表,建表语句,数据插入,实战案例包括:简单查询、汇总统计、分组分析、多表关联、窗口函数
  • 系统设计-高频面试题(更新中...)
  • IntelliJ IDEA使用经验(十五):SQL脚本文件转为数据库控制台
  • 【实时Linux实战系列】内核跟踪点(Tracepoints)与用户态探针(UST)的协同调试
  • Linux 进程通信——消息队列与信号量
  • 备案ip 查询网站查询网站小说一键生成动漫
  • 做养生产品哪个网站好嘉兴网站建设网址
  • Vue3中实现全局双向绑定变量