linux tasklet

发布于:2024-04-09 ⋅ 阅读:(113) ⋅ 点赞:(0)

软中断、tasklet 以及工作队列,均是 linux 中将任务推后执行的机制。其中工作队列与用户态使用的线程池类似。

什么是任务推后执行呢 ?

可以借助于开发应用时经常使用的线程池来理解。任务推后执行,就是任务本该执行的时候没有立即执行,而是将任务放到任务容器(标志位或者任务队列)中,相当于生产者;任务容器还有消费者,消费者从容器中取出任务来执行。这就是推后执行,其中有 3 个组成元素,生产者,任务容器和消费者。

1 tasklet 通过软中断实现

tasklet 是通过软中断来实现的,相当于在软中断的基础上又扩展了一层。有一种场景的思路和 tasklet 与软中断的关系比较类似,在开发网络应用时,由于 tcp 端口是有限的,如果系统的规格下 tcp 端口号不够用的话,在应用层有时会做一层虚拟化,用户使用的连接是虚拟连接,并不是独占一个 tcp 端口,而是多个虚拟连接共享一个 tcp 端口。tasklet 对软中断的使用类似于使用 tcp 时的端口复用。

enum
{
	HI_SOFTIRQ=0,
	TIMER_SOFTIRQ,
	NET_TX_SOFTIRQ,
	NET_RX_SOFTIRQ,
	BLOCK_SOFTIRQ,
	IRQ_POLL_SOFTIRQ,
	TASKLET_SOFTIRQ,
	SCHED_SOFTIRQ,
	HRTIMER_SOFTIRQ,
	RCU_SOFTIRQ,    /* Preferable RCU should always be the last softirq */

	NR_SOFTIRQS
};

tasklet 使用了两个软中断:HI_SOFTIRQ 和 TASKLET_SOFTIRQ。两者优先级不同,前者比后者优先级高。软中断的优先级是怎么实现的呢 ?不同的软中断触发,就是设置软中断的标志位,类似于一个 bitmap, 软中断数值越小,对应的优先级就越高,因为在函数 __do_softirq() 中遍历软中断时,是从小向大进行遍历的,这样保证了数值小的软中断被先处理。软中断的优先级类似于调度策略中的 rt 调度策略,rt 调度策略在内核中的优先级取值范围是 [0, 98],也使用了一个 bitmap 来标志这个优先级下有没有待执行的线程,优先级数值越小,越早被遍历,表示优先级越高。

这两个软中断在函数 softirq_init() 中进行注册。软中断处理函数分别是 tasklet_action 和 tasklet_hi_action。

void __init softirq_init(void)
{
    int cpu;

    for_each_possible_cpu(cpu) {
        per_cpu(tasklet_vec, cpu).tail =
            &per_cpu(tasklet_vec, cpu).head;
        per_cpu(tasklet_hi_vec, cpu).tail =
            &per_cpu(tasklet_hi_vec, cpu).head;
    }

    open_softirq(TASKLET_SOFTIRQ, tasklet_action);
    open_softirq(HI_SOFTIRQ, tasklet_hi_action);
}

2 tasklet 数据结构和 api

2.1 struct tasklet_struct

struct tasklet_struct 表示一个 tasklet。其中最主要的成员是这个 tasklet 的处理函数,处理函数有两个选择,一个是 func,func 的入参是用户自定义的参数,data 在 struct tasklet_struct 中的 data 成员中保存;一个是 callback,callback 的参数是 tasklet 本身,当使用 callback 的时候需要将 use_callback 设置为 true。

struct tasklet_struct
{
	struct tasklet_struct *next;
	unsigned long state;
	atomic_t count;
	bool use_callback;
	union {
		void (*func)(unsigned long data);
		void (*callback)(struct tasklet_struct *t);
	};
	unsigned long data;
};

state 字段表示 tasklet 当前的状态。状态可取如下两个值:

enum
{
    // 说明 tasklet 已经被提交了,等待被执行
	TASKLET_STATE_SCHED,	/* Tasklet is scheduled for execution */
    // 说明 tasklet 当前正在被执行
    // 该状态只在多处理器系统上才会生效
    // 单处理器系统上不需要对 tasklet 做是不是 running 的判断
	TASKLET_STATE_RUN	/* Tasklet is running (SMP only) */
};


// 只用定义了 CONFIG_SMP,TASKLET_STATE_RUN 才会被使用
#ifdef CONFIG_SMP
static inline int tasklet_trylock(struct tasklet_struct *t)
{
	return !test_and_set_bit(TASKLET_STATE_RUN, &(t)->state);
}

static inline void tasklet_unlock(struct tasklet_struct *t)
{
	smp_mb__before_atomic();
	clear_bit(TASKLET_STATE_RUN, &(t)->state);
}

static inline void tasklet_unlock_wait(struct tasklet_struct *t)
{
	while (test_bit(TASKLET_STATE_RUN, &(t)->state)) { barrier(); }
}
#else
#define tasklet_trylock(t) 1
#define tasklet_unlock_wait(t) do { } while (0)
#define tasklet_unlock(t) do { } while (0)
#endif

count 是 tasklet 的计数器,当 count 不是 0,tasklet 会被禁止,不允许执行;只有当 count 为 0 的时候,tasklet 才可以被执行。与之对应的是 tasklet_disable() 和 tasklet_enable() 两个函数,分别是禁用 tasklet 和使能 tasklet,在 tasklet_disable() 中增加 count 的值,在 tasklet_enable() 中减小 count 的值。

static inline void tasklet_disable(struct tasklet_struct *t)
{
	tasklet_disable_nosync(t);
	tasklet_unlock_wait(t);
	smp_mb();
}

static inline void tasklet_enable(struct tasklet_struct *t)
{
	smp_mb__before_atomic();
	atomic_dec(&t->count);
}

2.2 tasklet 初始化

初始化一个 tasklet,就是定义一个 struct tasklet_struct 结构体,然后初始化其中的成员。可以通过宏 DECLARE_TASKLET 来定义一个 tasklet,name 是 struct tasklet_struct 的名字,_callback 是 tasklet 的处理函数;tasklet_setup() 可以初始化一个 tasklet,处理函数是 callback,tasklet_init() 也可以初始化一个 tasklet,处理函数是 func。

#define DECLARE_TASKLET(name, _callback)		\
struct tasklet_struct name = {				\
	.count = ATOMIC_INIT(0),			\
	.callback = _callback,				\
	.use_callback = true,				\
}

void tasklet_setup(struct tasklet_struct *t,
           void (*callback)(struct tasklet_struct *))
{
    t->next = NULL;
    t->state = 0;
    atomic_set(&t->count, 0);
    t->callback = callback;
    t->use_callback = true;
    t->data = 0;
}
EXPORT_SYMBOL(tasklet_setup);

void tasklet_init(struct tasklet_struct *t,
          void (*func)(unsigned long), unsigned long data)
{
    t->next = NULL;
    t->state = 0;
    atomic_set(&t->count, 0);
    t->func = func;
    t->use_callback = false;
    t->data = data;
}
EXPORT_SYMBOL(tasklet_init);

2.3 tasklet_schedule

使用 TASKLET_SOFTIRQ 的 tasklet 通过 task_schedule 来调度;使用 HI_SOFTIRQ 的 tasklet 通过 tasklet_hi_schedule 来调度。

tasklet_schedule() 这样的命名并不是太好理解,触发软中断的时候使用的关键字是 raise,而触发 tasklet 的时候使用的关键字是 schedule。意思都是将任务设置到待执行的状态,可以执行了。

① 判断 tasklet 是不是处于 SCHED 状态

test_and_set_bit() 将这一 bit 设置为 1,并返回设置之前的值,如果设置之前是 0 那么就会调用__tasklet_schedule() 将 tasklet 设置到待执行状态;否则的话,说明 tasklet 已经调度了,直接返回。

static inline void tasklet_schedule(struct tasklet_struct *t)
{
	if (!test_and_set_bit(TASKLET_STATE_SCHED, &t->state))
		__tasklet_schedule(t);
}

② __tasklet_schedule() 最终调用到函数 __tasklet_schedule_common(),在这个函数中主要做了两件事,一个是将这个 tasklet 加入到 tasklet_vec 链表中,这样在函数 tasklet_action 中就能遍历到这个 tasklet 并进行处理;第二个工作是触发 tasklet 软中断。

struct tasklet_head {
    struct tasklet_struct *head;
    struct tasklet_struct **tail;
};

static DEFINE_PER_CPU(struct tasklet_head, tasklet_vec);
static DEFINE_PER_CPU(struct tasklet_head, tasklet_hi_vec);

static void __tasklet_schedule_common(struct tasklet_struct *t,
                      struct tasklet_head __percpu *headp,
                      unsigned int softirq_nr)
{
    struct tasklet_head *head;
    unsigned long flags;

    local_irq_save(flags);
    head = this_cpu_ptr(headp);
    t->next = NULL;
    *head->tail = t;
    head->tail = &(t->next);
    raise_softirq_irqoff(softirq_nr);
    local_irq_restore(flags); tasklet_schedule
}

tasklet_vec 和 tasklet_hi_vec 是 per cpu 的变量,每个 cpu 都有一个对应的链表。这两个链表的维护非常绕,有两个成员,head 和 tail,head 是 struct tasklet_struct 指针,tail 是 struct task_struct 二级指针,在初始化的时候,使  tail 指向了 head。

在 tasklet_schedule() 中将 tasklet 加到链表中,只需要维护 tail 就可以了。

void __init softirq_init(void)
{
    int cpu;

    for_each_possible_cpu(cpu) {
        per_cpu(tasklet_vec, cpu).tail =
            &per_cpu(tasklet_vec, cpu).head;
        per_cpu(tasklet_hi_vec, cpu).tail =
            &per_cpu(tasklet_hi_vec, cpu).head;
    }

    open_softirq(TASKLET_SOFTIRQ, tasklet_action);
    open_softirq(HI_SOFTIRQ, tasklet_hi_action);
}

2.4 tasklet 执行

tasklet 的执行在函数 tasklet_action() 中进行。最终调用到函数 tasklet_action_common()。

(1) tl_head 是 tasklet_vec 链表,首先从 tasklet_vec 中把所有的 tasklet 取下来,然后将 tasklet_vec 链表设置为空

(2) 处理 tasklet 链表中的任务

① tasklet_trylock() 会判断当前 tasklet 是不是出于 RUN 状态,如果是的话那么不处理这个 tasklet

② atomic_read 读取 count 的值,来判断这个 tasklet 是不是被禁用,如果被禁用,则不处理

③ 清除 SCHED 标志,清除之后,可以再次调度这个 tasklet 了

④ 如果这个 tasklet 没有被处理,那么会被加回到 tasklet_vec 链表中

static void tasklet_action_common(struct softirq_action *a,
                  struct tasklet_head *tl_head,
                  unsigned int softirq_nr)
{
    struct tasklet_struct *list;

    local_irq_disable();
    list = tl_head->head;
    tl_head->head = NULL;
    tl_head->tail = &tl_head->head;
    local_irq_enable();

    while (list) {
        struct tasklet_struct *t = list;

        list = list->next;
    
        if (tasklet_trylock(t)) {
            if (!atomic_read(&t->count)) {
                if (!test_and_clear_bit(TASKLET_STATE_SCHED,
                            &t->state))
                    BUG();
                if (t->use_callback)
                    t->callback(t);
                else
                    t->func(t->data);
                tasklet_unlock(t);
                continue;
            }
            tasklet_unlock(t);
        }
        
        local_irq_disable();
        t->next = NULL;
        *tl_head->tail = t;
        tl_head->tail = &t->next;
        __raise_softirq_irqoff(softirq_nr);
        local_irq_enable();
    }
}

3 tasklet 使用示例

如下是在一个内核模块中使用 tasklet。

① 定义了一个 tasklet my_tasklet,在模块初始化函数中通过 tasklet_init() 进行初始化,

tasklet 的处理函数是 tasklet_handler() 在该函数中打印了一条日志 "Tasklet executed successfully!\n"。

② 在内核模块中创建了一个线程,在该线程中每隔一秒通过 tasklet_schedule() 调度一次。

#include <linux/module.h>
#include <linux/kernel.h>
#include <linux/kthread.h>
#include <linux/delay.h>
#include <linux/interrupt.h>

// 定义 Tasklet 处理函数
void tasklet_handler(unsigned long data);

// 声明一个 Tasklet 结构体
static struct tasklet_struct my_tasklet;

// 内核线程执行函数
static int thread_func(void *data)
{
    while (!kthread_should_stop()) {
        // 调度 Tasklet 执行
        tasklet_schedule(&my_tasklet);

        // 等待 1s
        msleep(1000);
    }
    return 0;
}

// 模拟的 Tasklet 处理函数
void tasklet_handler(unsigned long data)
{
    printk(KERN_INFO "Tasklet executed successfully!\n");
}

// 模块初始化函数
static int __init tasklet_example_init(void)
{
    // 初始化 Tasklet 结构体
    tasklet_init(&my_tasklet, tasklet_handler, 0);

    // 创建内核线程
    kthread_run(thread_func, NULL, "tasklet_thread");

    printk(KERN_INFO "Tasklet example module initialized\n");
    return 0;
}

// 模块清理函数
static void __exit tasklet_example_exit(void)
{
    // 停止 Tasklet 的调度
    tasklet_kill(&my_tasklet);

    printk(KERN_INFO "Tasklet example module exited\n");
}

// 注册模块初始化和清理函数
module_init(tasklet_example_init);
module_exit(tasklet_example_exit);

MODULE_LICENSE("GPL");
MODULE_AUTHOR("wyl");

4 软中断,tasklet 的区别

软中断和 tasklet 的最主要区别,同一个软中断可以在多个 cpu 上并发处理,而对于一个 tasklet 来说,虽然也可以调度到不同的 cpu 上,但是对于同一个 tasklet 不会并发处理。

从 tasklet 的处理函数 tasklet_action_common() 中也可以看出来,如果当前这个 tasklet 处于 RUN 状态,那么就不会处理它。


网站公告

今日签到

点亮在社区的每一天
去签到