linux内核中kfifo实现队列

发布于:2025-07-29 ⋅ 阅读:(14) ⋅ 点赞:(0)

提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档


前言

提示:这里可以添加本文要记录的大概内容:
linux内核中kfifo实现队列
内核版本
5.10.92


提示:以下是本篇文章正文内容,下面案例可供参考

一、kfifo是什么?

kfifo 是 Linux 内核中的一个数据结构,用于实现先进先出(First In First Out, FIFO)队列。它特别适用于需要高效处理数据流的场景,比如在内核模块间传递数据、网络协议栈等。kfifo 提供了无锁编程的支持,在某些情况下可以显著提高性能,因为它们允许在生产者和消费者之间同步而不需要使用传统的锁机制。

kfifo申请的是字节大小(必须是 2 的次幂,不是可能被内核自动修改成符合条件的)。

虽然 kfifo 可以避免一些锁定问题(单生产者和消费者不用加锁),但在多生产者或多消费者场景下,可能仍然需要适当的同步机制来防止竞争条件。

二、使用步骤

代码如下(示例):

接口文件:可以做api用

#include <linux/kfifo.h>
#include <linux/slab.h>
#include <linux/mutex.h>
#include <linux/wait.h>
#include <linux/fs.h>
#include <linux/miscdevice.h>
#include <linux/poll.h>
#include "cq_vgpu_kfifo.h"

/**
 * Initialize a kfifo queue for a misc device
 * @param queue: pointer to vgpu_kfifo_queue structure
 * @param item_size: size of each data item
 * @param max_items: maximum number of items in queue
 * @return 0 on success, negative error code on failure
 */
int vgpu_kfifo_queue_create(struct vgpu_kfifo_queue *queue,
                           unsigned int item_size,
                           unsigned int max_items)
{
    int ret;

    if (!queue)
        return -EINVAL;

    printk(KERN_INFO "vgpu_kfifo_queue_create: item_size=%u, max_items=%u\n", item_size, max_items);
    // Initialize the kfifo
    ret = kfifo_alloc(&queue->fifo, item_size * max_items, GFP_KERNEL);
    if (ret) {
        printk(KERN_ERR "Failed to allocate kfifo: %d\n", ret);
        return ret;
    }

    // Initialize synchronization primitives
    mutex_init(&queue->lock);
    init_waitqueue_head(&queue->read_wait);
    init_waitqueue_head(&queue->write_wait);

    // Set queue parameters
    queue->max_items = max_items;
    queue->item_size = item_size;
    queue->is_open = true;

    return 0;
}

/**
 * Destroy and free a kfifo queue
 * @param queue: pointer to vgpu_kfifo_queue structure
 */
void vgpu_kfifo_queue_destroy(struct vgpu_kfifo_queue *queue)
{
    if (!queue)
        return;

    queue->is_open = false;

    // Wake up any waiting processes
    wake_up_interruptible(&queue->read_wait);
    wake_up_interruptible(&queue->write_wait);

    // Free the kfifo buffer
    kfifo_free(&queue->fifo);

    // Clean up synchronization primitives
    mutex_destroy(&queue->lock);
}

/**
 * Put data into the queue (producer) - blocking mode
 * @param queue: pointer to vgpu_kfifo_queue structure
 * @param data: pointer to data to be queued
 * @return number of items queued (1) or negative error code
 */
int vgpu_kfifo_enqueue(struct vgpu_kfifo_queue *queue, const void *data)
{
    int ret;

    if (!queue || !data || !queue->is_open)
        return -EINVAL;

    // Acquire lock
    if (mutex_lock_interruptible(&queue->lock))
        return -ERESTARTSYS;

    // Wait while queue is full
    while (kfifo_is_full(&queue->fifo)) {
        mutex_unlock(&queue->lock);

        if(!queue->is_open) {
            printk("vgpu_kfifo_dequeue !queue->is_open\n");
            return -1;
        }

        printk(KERN_INFO "vgpu_kfifo_enqueue: queue is full, queue->is_open = %d\n", queue->is_open);
        // Wait for space to become available
        if (wait_event_interruptible(queue->write_wait, !kfifo_is_full(&queue->fifo) || !queue->is_open)){
            printk(KERN_ERR "vgpu_kfifo_enqueue: wait_event_interruptible failed\n");
            return -ERESTARTSYS;
        }
        // Reacquire lock
        if (mutex_lock_interruptible(&queue->lock))
            return -ERESTARTSYS;
    }

    // Put data into queue
    ret = kfifo_in(&queue->fifo, data, queue->item_size);

    // Release lock
    mutex_unlock(&queue->lock);

    // Wake up any waiting consumers
    if (ret > 0)
        wake_up_interruptible(&queue->read_wait);

    printk(KERN_ERR "vgpu_kfifo_enqueue: exit ret = %d\n", ret);
    return ret ? 1 : -EIO;
}

/**
 * Put data into the queue (producer) - non-blocking mode
 * @param queue: pointer to vgpu_kfifo_queue structure
 * @param data: pointer to data to be queued
 * @return number of items queued (1), 0 if queue full, or negative error code
 */
int vgpu_kfifo_enqueue_nonblock(struct vgpu_kfifo_queue *queue, const void *data)
{
    int ret;

    if (!queue || !data || !queue->is_open)
        return -EINVAL;

    // Acquire lock
    if (mutex_lock_interruptible(&queue->lock))
        return -ERESTARTSYS;

    // Check if queue is full
    if (kfifo_is_full(&queue->fifo)) {
        mutex_unlock(&queue->lock);
        return 0; // Queue is full, but non-blocking
    }

    // Put data into queue
    ret = kfifo_in(&queue->fifo, data, queue->item_size);

    // Release lock
    mutex_unlock(&queue->lock);

    // Wake up any waiting consumers
    if (ret > 0)
        wake_up_interruptible(&queue->read_wait);

    return ret ? 1 : -EIO;
}

/**
 * Get data from the queue (consumer) - blocking mode
 * @param queue: pointer to vgpu_kfifo_queue structure
 * @param data: pointer to buffer for dequeued data
 * @return number of items dequeued (1) or negative error code
 */
int vgpu_kfifo_dequeue(struct vgpu_kfifo_queue *queue, void *data)
{
    int ret;

    if (!queue || !data || !queue->is_open)
        return -EINVAL;

    // Acquire lock
    if (mutex_lock_interruptible(&queue->lock))
        return -ERESTARTSYS;

        //dump_stack();
    // Wait while queue is empty
    while (kfifo_is_empty(&queue->fifo)) {
        mutex_unlock(&queue->lock);

        if(!queue->is_open) {
            printk("vgpu_kfifo_dequeue !queue->is_open\n");
            return -1;
        }
        printk(KERN_INFO "vgpu_kfifo_dequeue: queue is empty, queue->is_open = %d\n", queue->is_open);
        // Wait for data to become available
        if (wait_event_interruptible(queue->read_wait, !kfifo_is_empty(&queue->fifo) || !queue->is_open)) {
            printk(KERN_ERR "vgpu_kfifo_dequeue: wait_event_interruptible failed\n");
            return -ERESTARTSYS;
        }
        // Reacquire lock
        if (mutex_lock_interruptible(&queue->lock))
            return -ERESTARTSYS;
    }

    // Get data from queue
    ret = kfifo_out(&queue->fifo, data, queue->item_size);

    // Release lock
    mutex_unlock(&queue->lock);

    // Wake up any waiting producers
    if (ret > 0)
        wake_up_interruptible(&queue->write_wait);

    printk(KERN_ERR "vgpu_kfifo_dequeue: exit ret = %d\n", ret);
    return ret ? 1 : -EIO;
}

/**
 * Get data from the queue (consumer) - non-blocking mode
 * @param queue: pointer to vgpu_kfifo_queue structure
 * @param data: pointer to buffer for dequeued data
 * @return number of items dequeued (1), 0 if empty, or negative error code
 */
int vgpu_kfifo_dequeue_nonblock(struct vgpu_kfifo_queue *queue, void *data)
{
    int ret;

    if (!queue || !data || !queue->is_open)
        return -EINVAL;

    // Acquire lock
    if (mutex_lock_interruptible(&queue->lock))
        return -ERESTARTSYS;

    // Check if queue is empty
    if (kfifo_is_empty(&queue->fifo)) {
        mutex_unlock(&queue->lock);
        return 0; // Queue is empty, but non-blocking
    }

    // Get data from queue
    ret = kfifo_out(&queue->fifo, data, queue->item_size);

    // Release lock
    mutex_unlock(&queue->lock);

    // Wake up any waiting producers
    if (ret > 0)
        wake_up_interruptible(&queue->write_wait);

    return ret ? 1 : -EIO;
}

/**
 * Get the number of items in the queue
 * @param queue: pointer to vgpu_kfifo_queue structure
 * @return number of items in queue
 */
unsigned int vgpu_kfifo_count(struct vgpu_kfifo_queue *queue)
{
    unsigned int count;

    if (!queue)
        return 0;

    if (mutex_lock_interruptible(&queue->lock))
        return 0;

    count = kfifo_len(&queue->fifo) / queue->item_size;
    mutex_unlock(&queue->lock);

    return count;
}

/**
 * Check if queue is empty
 * @param queue: pointer to vgpu_kfifo_queue structure
 * @return true if empty, false otherwise
 */
bool vgpu_kfifo_is_empty(struct vgpu_kfifo_queue *queue)
{
    bool empty;

    if (!queue)
        return true;

    if (mutex_lock_interruptible(&queue->lock))
        return true;

    empty = kfifo_is_empty(&queue->fifo);
    mutex_unlock(&queue->lock);

    return empty;
}

/**
 * Check if queue is full
 * @param queue: pointer to vgpu_kfifo_queue structure
 * @return true if full, false otherwise
 */
bool vgpu_kfifo_is_full(struct vgpu_kfifo_queue *queue)
{
    bool full;

    if (!queue)
        return false;

    if (mutex_lock_interruptible(&queue->lock))
        return false;

    full = kfifo_is_full(&queue->fifo);
    mutex_unlock(&queue->lock);

    return full;
}

unsigned int vgpu_kfifo_poll(struct file *file, struct poll_table_struct *wait)
{
    struct vgpu_kfifo_queue *queue = file->private_data;
    unsigned int mask = 0;

    poll_wait(file, &queue->read_wait, wait);
    poll_wait(file, &queue->write_wait, wait);

    if (!vgpu_kfifo_is_empty(queue))
        mask |= POLLIN | POLLRDNORM;

    if (!vgpu_kfifo_is_full(queue))
        mask |= POLLOUT | POLLWRNORM;

    return mask;
}

使用两个内核线程进行生产和消费,rmmod时候退出,可自行修改sleep时间实现 队列满或者空的情况。
测试代码:

#include <linux/module.h>
#include <linux/kernel.h>
#include <linux/kthread.h>
#include <linux/delay.h>
#include <linux/slab.h>
#include "cq_vgpu_kfifo.h"

#define ITEM_SIZE   sizeof(int)
#define MAX_ITEMS   10

static struct vgpu_kfifo_queue test_queue;
static struct task_struct *producer_thread;
static struct task_struct *consumer_thread;

// 生产者线程:每 0.5 秒写入一个整数
static int producer_fn(void *data)
{
    int value = 0;

    while (!kthread_should_stop()) {
        int ret = vgpu_kfifo_enqueue(&test_queue, &value);
        if (ret < 0) {
            printk(KERN_INFO "Producer: Failed to enqueue %d, error: %d\n", value, ret);
        } else {
            printk(KERN_INFO "Producer: Enqueued %d\n", value);
        }
        value++;
        msleep(500); // 每 0.5 秒生产一个
    }

    printk("producer_fn exit\n");
    return 0;
}

// 消费者线程:每 1 秒读取一个整数
static int consumer_fn(void *data)
{
    while (!kthread_should_stop()) {
        int value;
        int ret = vgpu_kfifo_dequeue(&test_queue, &value);
        if (ret < 0) {
            printk(KERN_INFO "Consumer: Failed to dequeue, error: %d\n", ret);
        } else {
            printk(KERN_INFO "Consumer: Dequeued %d\n", value);
        }
        msleep(1500); // 每 1 秒消费一个
    }

    printk("consumer_fn exit\n");
    return 0;
}

// 模块初始化
static int __init kfifo_test_init(void)
{
    int ret;

    printk(KERN_INFO "Initializing kfifo test module\n");

    // 初始化队列
    ret = vgpu_kfifo_queue_create(&test_queue, ITEM_SIZE, MAX_ITEMS);
    if (ret) {
        printk(KERN_ERR "Failed to create kfifo queue: %d\n", ret);
        return ret;
    }

    // 创建生产者线程
    producer_thread = kthread_run(producer_fn, NULL, "kfifo_producer");
    if (IS_ERR(producer_thread)) {
        printk(KERN_ERR "Failed to create producer thread\n");
        vgpu_kfifo_queue_destroy(&test_queue);
        return PTR_ERR(producer_thread);
    }

    // 创建消费者线程
    consumer_thread = kthread_run(consumer_fn, NULL, "kfifo_consumer");
    if (IS_ERR(consumer_thread)) {
        printk(KERN_ERR "Failed to create consumer thread\n");
        kthread_stop(producer_thread);
        vgpu_kfifo_queue_destroy(&test_queue);
        return PTR_ERR(consumer_thread);
    }

    return 0;
}

// 模块卸载
static void __exit kfifo_test_exit(void)
{
    printk(KERN_INFO "Cleaning up kfifo test module\n");

    test_queue.is_open = false;
    // 停止线程
    if (producer_thread) {
        printk(KERN_INFO "Stopping producer thread\n");
        kthread_stop(producer_thread);
        producer_thread = NULL;
    }
    if (consumer_thread) {
        printk(KERN_INFO "Stopping consumer thread\n");
        kthread_stop(consumer_thread);
        consumer_thread = NULL;
    }

    printk(KERN_INFO "Destroying kfifo queue\n");
    // 销毁队列
    vgpu_kfifo_queue_destroy(&test_queue);
}

module_init(kfifo_test_init);
module_exit(kfifo_test_exit);

MODULE_LICENSE("GPL");
MODULE_AUTHOR("Your Name");
MODULE_DESCRIPTION("Test module for kfifo queue");


总结

以上讲解了kfifo实现队列的示例,支持阻塞和非阻塞模式。


网站公告

今日签到

点亮在社区的每一天
去签到