linux内核中kfifo实现队列
提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档
文章目录
- 前言
- 一、kfifo是什么?
- 二、使用步骤
- 总结
前言
提示:这里可以添加本文要记录的大概内容:
linux内核中kfifo实现队列
内核版本
5.10.92
提示:以下是本篇文章正文内容,下面案例可供参考
一、kfifo是什么?
kfifo 是 Linux 内核中的一个数据结构,用于实现先进先出(First In First Out, FIFO)队列。它特别适用于需要高效处理数据流的场景,比如在内核模块间传递数据、网络协议栈等。kfifo 提供了无锁编程的支持,在某些情况下可以显著提高性能,因为它们允许在生产者和消费者之间同步而不需要使用传统的锁机制。
kfifo申请的是字节大小(必须是 2 的次幂,不是可能被内核自动修改成符合条件的)。
虽然 kfifo 可以避免一些锁定问题(单生产者和消费者不用加锁),但在多生产者或多消费者场景下,可能仍然需要适当的同步机制来防止竞争条件。
二、使用步骤
代码如下(示例):
接口文件:可以做api用
#include <linux/kfifo.h>
#include <linux/slab.h>
#include <linux/mutex.h>
#include <linux/wait.h>
#include <linux/fs.h>
#include <linux/miscdevice.h>
#include <linux/poll.h>
#include "cq_vgpu_kfifo.h"/*** Initialize a kfifo queue for a misc device* @param queue: pointer to vgpu_kfifo_queue structure* @param item_size: size of each data item* @param max_items: maximum number of items in queue* @return 0 on success, negative error code on failure*/
int vgpu_kfifo_queue_create(struct vgpu_kfifo_queue *queue,unsigned int item_size,unsigned int max_items)
{int ret;if (!queue)return -EINVAL;printk(KERN_INFO "vgpu_kfifo_queue_create: item_size=%u, max_items=%u\n", item_size, max_items);// Initialize the kfiforet = kfifo_alloc(&queue->fifo, item_size * max_items, GFP_KERNEL);if (ret) {printk(KERN_ERR "Failed to allocate kfifo: %d\n", ret);return ret;}// Initialize synchronization primitivesmutex_init(&queue->lock);init_waitqueue_head(&queue->read_wait);init_waitqueue_head(&queue->write_wait);// Set queue parametersqueue->max_items = max_items;queue->item_size = item_size;queue->is_open = true;return 0;
}/*** Destroy and free a kfifo queue* @param queue: pointer to vgpu_kfifo_queue structure*/
void vgpu_kfifo_queue_destroy(struct vgpu_kfifo_queue *queue)
{if (!queue)return;queue->is_open = false;// Wake up any waiting processeswake_up_interruptible(&queue->read_wait);wake_up_interruptible(&queue->write_wait);// Free the kfifo bufferkfifo_free(&queue->fifo);// Clean up synchronization primitivesmutex_destroy(&queue->lock);
}/*** Put data into the queue (producer) - blocking mode* @param queue: pointer to vgpu_kfifo_queue structure* @param data: pointer to data to be queued* @return number of items queued (1) or negative error code*/
int vgpu_kfifo_enqueue(struct vgpu_kfifo_queue *queue, const void *data)
{int ret;if (!queue || !data || !queue->is_open)return -EINVAL;// Acquire lockif (mutex_lock_interruptible(&queue->lock))return -ERESTARTSYS;// Wait while queue is fullwhile (kfifo_is_full(&queue->fifo)) {mutex_unlock(&queue->lock);if(!queue->is_open) {printk("vgpu_kfifo_dequeue !queue->is_open\n");return -1;}printk(KERN_INFO "vgpu_kfifo_enqueue: queue is full, queue->is_open = %d\n", queue->is_open);// Wait for space to become availableif (wait_event_interruptible(queue->write_wait, !kfifo_is_full(&queue->fifo) || !queue->is_open)){printk(KERN_ERR "vgpu_kfifo_enqueue: wait_event_interruptible failed\n");return -ERESTARTSYS;}// Reacquire lockif (mutex_lock_interruptible(&queue->lock))return -ERESTARTSYS;}// Put data into queueret = kfifo_in(&queue->fifo, data, queue->item_size);// Release lockmutex_unlock(&queue->lock);// Wake up any waiting consumersif (ret > 0)wake_up_interruptible(&queue->read_wait);printk(KERN_ERR "vgpu_kfifo_enqueue: exit ret = %d\n", ret);return ret ? 1 : -EIO;
}/*** Put data into the queue (producer) - non-blocking mode* @param queue: pointer to vgpu_kfifo_queue structure* @param data: pointer to data to be queued* @return number of items queued (1), 0 if queue full, or negative error code*/
int vgpu_kfifo_enqueue_nonblock(struct vgpu_kfifo_queue *queue, const void *data)
{int ret;if (!queue || !data || !queue->is_open)return -EINVAL;// Acquire lockif (mutex_lock_interruptible(&queue->lock))return -ERESTARTSYS;// Check if queue is fullif (kfifo_is_full(&queue->fifo)) {mutex_unlock(&queue->lock);return 0; // Queue is full, but non-blocking}// Put data into queueret = kfifo_in(&queue->fifo, data, queue->item_size);// Release lockmutex_unlock(&queue->lock);// Wake up any waiting consumersif (ret > 0)wake_up_interruptible(&queue->read_wait);return ret ? 1 : -EIO;
}/*** Get data from the queue (consumer) - blocking mode* @param queue: pointer to vgpu_kfifo_queue structure* @param data: pointer to buffer for dequeued data* @return number of items dequeued (1) or negative error code*/
int vgpu_kfifo_dequeue(struct vgpu_kfifo_queue *queue, void *data)
{int ret;if (!queue || !data || !queue->is_open)return -EINVAL;// Acquire lockif (mutex_lock_interruptible(&queue->lock))return -ERESTARTSYS;//dump_stack();// Wait while queue is emptywhile (kfifo_is_empty(&queue->fifo)) {mutex_unlock(&queue->lock);if(!queue->is_open) {printk("vgpu_kfifo_dequeue !queue->is_open\n");return -1;}printk(KERN_INFO "vgpu_kfifo_dequeue: queue is empty, queue->is_open = %d\n", queue->is_open);// Wait for data to become availableif (wait_event_interruptible(queue->read_wait, !kfifo_is_empty(&queue->fifo) || !queue->is_open)) {printk(KERN_ERR "vgpu_kfifo_dequeue: wait_event_interruptible failed\n");return -ERESTARTSYS;}// Reacquire lockif (mutex_lock_interruptible(&queue->lock))return -ERESTARTSYS;}// Get data from queueret = kfifo_out(&queue->fifo, data, queue->item_size);// Release lockmutex_unlock(&queue->lock);// Wake up any waiting producersif (ret > 0)wake_up_interruptible(&queue->write_wait);printk(KERN_ERR "vgpu_kfifo_dequeue: exit ret = %d\n", ret);return ret ? 1 : -EIO;
}/*** Get data from the queue (consumer) - non-blocking mode* @param queue: pointer to vgpu_kfifo_queue structure* @param data: pointer to buffer for dequeued data* @return number of items dequeued (1), 0 if empty, or negative error code*/
int vgpu_kfifo_dequeue_nonblock(struct vgpu_kfifo_queue *queue, void *data)
{int ret;if (!queue || !data || !queue->is_open)return -EINVAL;// Acquire lockif (mutex_lock_interruptible(&queue->lock))return -ERESTARTSYS;// Check if queue is emptyif (kfifo_is_empty(&queue->fifo)) {mutex_unlock(&queue->lock);return 0; // Queue is empty, but non-blocking}// Get data from queueret = kfifo_out(&queue->fifo, data, queue->item_size);// Release lockmutex_unlock(&queue->lock);// Wake up any waiting producersif (ret > 0)wake_up_interruptible(&queue->write_wait);return ret ? 1 : -EIO;
}/*** Get the number of items in the queue* @param queue: pointer to vgpu_kfifo_queue structure* @return number of items in queue*/
unsigned int vgpu_kfifo_count(struct vgpu_kfifo_queue *queue)
{unsigned int count;if (!queue)return 0;if (mutex_lock_interruptible(&queue->lock))return 0;count = kfifo_len(&queue->fifo) / queue->item_size;mutex_unlock(&queue->lock);return count;
}/*** Check if queue is empty* @param queue: pointer to vgpu_kfifo_queue structure* @return true if empty, false otherwise*/
bool vgpu_kfifo_is_empty(struct vgpu_kfifo_queue *queue)
{bool empty;if (!queue)return true;if (mutex_lock_interruptible(&queue->lock))return true;empty = kfifo_is_empty(&queue->fifo);mutex_unlock(&queue->lock);return empty;
}/*** Check if queue is full* @param queue: pointer to vgpu_kfifo_queue structure* @return true if full, false otherwise*/
bool vgpu_kfifo_is_full(struct vgpu_kfifo_queue *queue)
{bool full;if (!queue)return false;if (mutex_lock_interruptible(&queue->lock))return false;full = kfifo_is_full(&queue->fifo);mutex_unlock(&queue->lock);return full;
}unsigned int vgpu_kfifo_poll(struct file *file, struct poll_table_struct *wait)
{struct vgpu_kfifo_queue *queue = file->private_data;unsigned int mask = 0;poll_wait(file, &queue->read_wait, wait);poll_wait(file, &queue->write_wait, wait);if (!vgpu_kfifo_is_empty(queue))mask |= POLLIN | POLLRDNORM;if (!vgpu_kfifo_is_full(queue))mask |= POLLOUT | POLLWRNORM;return mask;
}
使用两个内核线程进行生产和消费,rmmod时候退出,可自行修改sleep时间实现 队列满或者空的情况。
测试代码:
#include <linux/module.h>
#include <linux/kernel.h>
#include <linux/kthread.h>
#include <linux/delay.h>
#include <linux/slab.h>
#include "cq_vgpu_kfifo.h"#define ITEM_SIZE sizeof(int)
#define MAX_ITEMS 10static struct vgpu_kfifo_queue test_queue;
static struct task_struct *producer_thread;
static struct task_struct *consumer_thread;// 生产者线程:每 0.5 秒写入一个整数
static int producer_fn(void *data)
{int value = 0;while (!kthread_should_stop()) {int ret = vgpu_kfifo_enqueue(&test_queue, &value);if (ret < 0) {printk(KERN_INFO "Producer: Failed to enqueue %d, error: %d\n", value, ret);} else {printk(KERN_INFO "Producer: Enqueued %d\n", value);}value++;msleep(500); // 每 0.5 秒生产一个}printk("producer_fn exit\n");return 0;
}// 消费者线程:每 1 秒读取一个整数
static int consumer_fn(void *data)
{while (!kthread_should_stop()) {int value;int ret = vgpu_kfifo_dequeue(&test_queue, &value);if (ret < 0) {printk(KERN_INFO "Consumer: Failed to dequeue, error: %d\n", ret);} else {printk(KERN_INFO "Consumer: Dequeued %d\n", value);}msleep(1500); // 每 1 秒消费一个}printk("consumer_fn exit\n");return 0;
}// 模块初始化
static int __init kfifo_test_init(void)
{int ret;printk(KERN_INFO "Initializing kfifo test module\n");// 初始化队列ret = vgpu_kfifo_queue_create(&test_queue, ITEM_SIZE, MAX_ITEMS);if (ret) {printk(KERN_ERR "Failed to create kfifo queue: %d\n", ret);return ret;}// 创建生产者线程producer_thread = kthread_run(producer_fn, NULL, "kfifo_producer");if (IS_ERR(producer_thread)) {printk(KERN_ERR "Failed to create producer thread\n");vgpu_kfifo_queue_destroy(&test_queue);return PTR_ERR(producer_thread);}// 创建消费者线程consumer_thread = kthread_run(consumer_fn, NULL, "kfifo_consumer");if (IS_ERR(consumer_thread)) {printk(KERN_ERR "Failed to create consumer thread\n");kthread_stop(producer_thread);vgpu_kfifo_queue_destroy(&test_queue);return PTR_ERR(consumer_thread);}return 0;
}// 模块卸载
static void __exit kfifo_test_exit(void)
{printk(KERN_INFO "Cleaning up kfifo test module\n");test_queue.is_open = false;// 停止线程if (producer_thread) {printk(KERN_INFO "Stopping producer thread\n");kthread_stop(producer_thread);producer_thread = NULL;}if (consumer_thread) {printk(KERN_INFO "Stopping consumer thread\n");kthread_stop(consumer_thread);consumer_thread = NULL;}printk(KERN_INFO "Destroying kfifo queue\n");// 销毁队列vgpu_kfifo_queue_destroy(&test_queue);
}module_init(kfifo_test_init);
module_exit(kfifo_test_exit);MODULE_LICENSE("GPL");
MODULE_AUTHOR("Your Name");
MODULE_DESCRIPTION("Test module for kfifo queue");
总结
以上讲解了kfifo实现队列的示例,支持阻塞和非阻塞模式。