Tasklet 介绍

Linux 内核提供的四种中断下半部中 softirq(软中断)、tasklet(小任务)、workqueue(工作队列) 、request thread(中断线程)中的其中一种,其效率仅次于软中断,但远高于request thread 和 workqueue。

  • 软中断(softirq) 之所以性能高的原因,在 SMP 系统下多个 cpu 同时并发处理

如网卡的 fifo 半满中断触发,被 cpu0 处理,cpu0 会在关闭中断后,将数据从网卡的 fifo 拷贝到 ram 之后触发软中断,再打开中断,基于谁触发谁处理原则,cpu0 会继续执行软中断服务函数。若网卡的 fifo 全满中断有再次触发,就会被 cpu1 处理,同样是关闭中断后拷贝数据再开启中断,再去触发和执行软中断进行网卡数据包处理。若此时 cpu0\cpu1 都还在软中断处理数据,网卡再次产生中断,那么 cpu2 就会继续相同的流程。由此可见,软中断充分利用的多 cpu 进行并发处理,因此性能非常高,但也同时因为并发的存在,就需要考虑临界区的问题。

  • 小任务(tasklet) 之所以性能较软中断差,是因为同一种小任务在多个 cpu 上不会并发执行

由于 tasklet 基于 softirq 的基础实现,为了易用性考虑,同一种 tasklet 在多个 cpu 上不会并行执行,因此不存在并发问题,在使用上就可以少一些顾虑。但也正是因为不存在并发,导致了性能较之 softirq 差一些。

  • tasklet 之所以比 workqueue 和 request thread 性能高

原因是因为前者是在软中断上下文件工作(意味着不能调用任何阻塞的接口),而后两者是在进程上下文工作(实质上是在内核线程里面执行)。

使用示例模版

#include <linux/module.h>
#include <linux/init.h>
#include <linux/kernel.h>
#include <linux/interrupt.h>

static struct tasklet_struct my_tasklet;

static void my_tasklet_handle(unsigned long data)
{
	printk("tasklet handle running...\n");
}

static irqreturn_t xxx_interrupt(int irq, void *dev_id)
{
    // 调度 tasklet
    tasklet_schedule(&my_tasklet);
}

static int __init demo_driver_init(void)
{
    // 初始化一个 tasklet ,关联处理函数
	tasklet_init(&my_tasklet, my_tasklet_handle, 0);
    request_irq(xxx, xxx_interrupt, IRQF_SHARED, xxx, xxx);
	return 0;
}

static void __exit demo_driver_exit(void)
{
	tasklet_kill(&my_tasklet);
	return ;
}

module_init(demo_driver_init);
module_exit(demo_driver_exit);
MODULE_LICENSE("GPL v2");

内核源码分析:

Linux 内核被启动后,会执行 start_kernel() 函数,tasklet 是基于 softirq 实现的,会在 softirq_init() 里面进行必要的初始化,主要是初始化 tasklet 链表和与相应的软中断号建立关联。

tasklet_hi_action 是高优先级的 tasklet,tasklet_action 是普通的 tasklet,两者实现原理都一样。

// kernel\linux-4.9\init\main.c
asmlinkage __visible void __init start_kernel(void)
{
	...
	softirq_init();
	...
}

// kernel\linux-4.9\kernel\softirq.c
void __init softirq_init(void)
{
	int cpu;

    // 初始化链表
	for_each_possible_cpu(cpu) {
		per_cpu(tasklet_vec, cpu).tail =
			&per_cpu(tasklet_vec, cpu).head;
		per_cpu(tasklet_hi_vec, cpu).tail =
			&per_cpu(tasklet_hi_vec, cpu).head;
	}

    // 建立TASKLET_SOFTIRQ、HI_SOFTIRQ软中断号的对应服务接口
	open_softirq(TASKLET_SOFTIRQ, tasklet_action);
	open_softirq(HI_SOFTIRQ, tasklet_hi_action);
}

当调用 tasklet_schedule() 时,如果该 tasklet 没有被设置 TASKLET_STATE_SCHED 标记时,才会加入链表内,如果已经设置了 TASKLET_STATE_SCHED 了,那么就会忽略此次的 tasklet _schedule(),这就意味着如果在极短的时间内调用 tasklet_schedule() 只会触发一次(这里可能会存在丢失中断事件的情况)。之后会通过 raise_softirq_irqoff() 启用 TASKLET_SOFTIRQ 软中断。

哪颗 cpu 受理该软中断,就将 tasklet 加入到该 cpu 的 tasklet 链表内,由于相同的软中断可以同时被其他 cpu 触发执行,因此会出现 cpu0\cpu1 的 tasklet 链表内有同一个 tasklet 的情况。

// kernel\linux-4.9\include\interrupt.h
static inline void tasklet_schedule(struct tasklet_struct *t)
{
	if (!test_and_set_bit(TASKLET_STATE_SCHED, &t->state))
		__tasklet_schedule(t);
}

// kernel\linux-4.9\kernel\softirq.c
void __tasklet_schedule(struct tasklet_struct *t)
{
	unsigned long flags;

    // 将指定的 tasklet 加入到链表内并设置软中断
	local_irq_save(flags);
	t->next = NULL;
	*__this_cpu_read(tasklet_vec.tail) = t;
	__this_cpu_write(tasklet_vec.tail, &(t->next));
	raise_softirq_irqoff(TASKLET_SOFTIRQ);
	local_irq_restore(flags);
}

在初始化的时候已经为  TASKLET_SOFTIRQ 软中断与 tasklet_action() 建立关联的关系,因此软中断触发时,就会调用 tasklet_action(),这里是精髓部分。

多次触发软中断时,当前 cpu 只能同一时间执行一次相同的软中断,但是如果有多个 cpu 的话,那么会有多个 cpu 并发执行软中断,所以下面的 tasklet_action() 要想到这一点。

1. tasklet_action() 屏蔽当前 CPU 中断,获取当前 CPU 的 tasklet 链表并清空原有的链表,在恢复中断。避免在操作链表的过程中,被硬件中断打断。

2. 开始遍历链表取出 tasklet,先 TASKLET_STATE_RUN 标记确定该 tasklet 是否已经被其它 cpu 执行,因为该 tasklet 在执行的过程中,又被加入到当前的 cpu 的 tasklet 链表内。

3. 如果没有被执行,就继续检查该 tasklet 是否被 tasklet_disable(),如果有被 disable 就清除 TASKLET_STATE_RUN 标记,这样可以重新被添加会当前 tasklet 链表内,等待再次执行。如果有被执行,即使又被设置了 TASKLET_STATE_RUN 标记,也会在第 5 步执行完成后,会清除掉该标记。

4. 如果没有被 disable,那么就清除 TASKLET_STATE_SCHED 标记,该标记一旦被清除,就意味着在 tasklet 执行期间,tasklet_schedule() 可以继续添加新的 tasklet 其他 cpu  的 tasklet 链表内。如果当前 cpu 已经完成了 tasklet_action() ,新的 tasklet 也可能会重新添加到当前的  tasklet 链表。

5. 这里就会执行通过 tasklet_init() 绑定的 func,也就是示例中的 my_tasklet_handle(),执行完成后再清除 TASKLET_STATE_RUN 标记,继续下一个 tasklet。

6. 能走到这一步,会有两种情况,一种情况是即将执行 tasklet ,发现已经被 disable 掉了,另外一种情况是 tasklet 已经在其它 CPU 上执行中。无论哪种情况,都会将当前的 tasklet 重新放回到当前 cpu 的 tasklet 链表内,并调用 __raise_softirq_irqoff() 重新触发软中断(应该是启用该软中断)。

注意,以上获取 TASKLET_STATE_RUN 和 TASKLET_STATE_SCHED 标记都是位原子操作,所以不会出现因并发引发的问题。

// kernel\linux-4.9\kernel\softirq.c
// 某 CPU 要调度各个 tasklet 的实现
static __latent_entropy void tasklet_action(struct softirq_action *a)
{
	struct tasklet_struct *list;

    // 1 ------------------------------------------------------
	local_irq_disable();
	list = __this_cpu_read(tasklet_vec.head); // 获取 tasklet 链表
	__this_cpu_write(tasklet_vec.head, NULL); // 清空 tasklet 链表
	__this_cpu_write(tasklet_vec.tail, this_cpu_ptr(&tasklet_vec.head));
	local_irq_enable();

    // 如果存在 tasklet 就会进入循环
	while (list) 
    {
		struct tasklet_struct *t = list;
		list = list->next;

        // 2 ------------------------------------------------------
        // TASKLET_STATE_SCHED: 表示该 tasklet 已经被挂接到某个 CPU 上
        // TASKLET_STATE_RUN:   表示该 tasklet 正在某个 CPU 上执行
        // 检查并设置 TASKLET_STATE_RUN 标记
        // 返回 1: 表示 tasklet 没有被执行  返回 0: 表示 tasklet 已经被执行
		if (tasklet_trylock(t)) 
        {
            
            // 3 ------------------------------------------------------
            // 如果当前 tasklet 没有被 tasklet_disable()
			if (!atomic_read(&t->count))  
            {
                // 4 ----------------------------------------------
                // 清除 TASKLET_STATE_SCHED 状态,便于该 tasklet 可以再次被触发
				if (!test_and_clear_bit(TASKLET_STATE_SCHED, &t->state))
					BUG();
                // 5 ----------------------------------------------
                // 这期间,该 tasklet 可以被 tasklet_schedule(),从而引出下面第二种情况
				t->func(t->data);        // 如果没有执行且没有 disable 则执行
				tasklet_unlock(t);        // 清理 TASKLET_STATE_RUN 标记
				continue;                 // 继续下一个 tasklet
			}

            // 如果当前已经被 disable 了,那就清理 TASKLET_STATE_RUN 标记
			tasklet_unlock(t);
		}

        // 6 -----------------------------------------------------
        // 有两种情况下,会将该 tasklet 再挂接回链表内,并重新触发,等待下一次执行的机会
        // 1. 如果没有被执行,但是被调用 tasklet_disable() 接口 disable 了
        // 2. 当前 tasklet 已经在其它 CPU 正在执行 func 这时候 tasklet 又会被挂回在
        //    原来的链表中,为了满足同一种类型的 tasklet 只能在一个 CPU 上执行的设计
        //    因此此次不执行 tasklet,挂入链表后等待下一次被执行的时机执行
		local_irq_disable();
		t->next = NULL;
		*__this_cpu_read(tasklet_vec.tail) = t; 
		__this_cpu_write(tasklet_vec.tail, &(t->next));
		__raise_softirq_irqoff(TASKLET_SOFTIRQ);
		local_irq_enable();
	}
}

贴出判断和标记和清除 tasklet 运行的 TASKLET_STATE_RUN  标记代码 

// kernel\linux-4.9\include\interrupt.h
static inline int tasklet_trylock(struct tasklet_struct *t)
{
	return !test_and_set_bit(TASKLET_STATE_RUN, &(t)->state);
}

// kernel\linux-4.9\include\interrupt.h
static inline void tasklet_unlock(struct tasklet_struct *t)
{
	smp_mb__before_atomic();
	clear_bit(TASKLET_STATE_RUN, &(t)->state);
}

 贴出关闭 tasklet 执行的代码

// kernel\linux-4.9\include\interrupt.h
static inline void tasklet_disable(struct tasklet_struct *t)
{
	tasklet_disable_nosync(t);
	tasklet_unlock_wait(t);
	smp_mb();
}

// kernel\linux-4.9\include\interrupt.h
// 关闭 tasklet(实际上应该理解为暂停调度执行)
static inline void tasklet_disable_nosync(struct tasklet_struct *t)
{
    // 这里对整型原子操作 count 自增了,对应 tasklet_action() 里面的 atomic_read()
	atomic_inc(&t->count);
	smp_mb__after_atomic();
}

// kernel\linux-4.9\include\interrupt.h
static inline void tasklet_enable(struct tasklet_struct *t)
{
    // 这里对整型原子操作 count 自减了,对应 tasklet_action() 里面的 atomic_read()
	smp_mb__before_atomic();
	atomic_dec(&t->count);
}

tasklet_init() 实现与用户函数关联的接口,也是很简单 

// kernel\linux-4.9\kernel\softirq.c
void tasklet_init(struct tasklet_struct *t,
		  void (*func)(unsigned long), unsigned long data)
{
	t->next = NULL;
	t->state = 0;
	atomic_set(&t->count, 0);
	t->func = func;
	t->data = data;
}

 tasklet_kill() 主要实现是尽可能快的让 tasklet 得到执行,等待执行完成后再退出。

1. 判断该 tasklet 是否已经被挂接到某个 cpu 的 tasklet 链表内,如果有挂接到,那么就立即让出 cpu,直至 tasklet 清除 TASKLET_STATE_SCHED 标记(参考 tasklet_action() 第 4 个步骤),进入运行状态。

2. 如果 tasklet 没有挂接或者一旦进入到执行状态,那么就会不停的检测 TASKLET_STATE_RUN 是否有被清除,被清除的话说明已经运行完成(参考 tasklet_action() 第5个步骤),可以放心的退出了。

// kernel\linux-4.9\kernel\softirq.c
void tasklet_kill(struct tasklet_struct *t)
{
	if (in_interrupt())
		pr_notice("Attempt to kill tasklet from interrupt\n");

    // 1 ----------------------------------------------
	while (test_and_set_bit(TASKLET_STATE_SCHED, &t->state)) {
		do {
			yield();
		} while (test_bit(TASKLET_STATE_SCHED, &t->state));
	}
    // 2 ----------------------------------------------
	tasklet_unlock_wait(t);
	clear_bit(TASKLET_STATE_SCHED, &t->state);
}

// kernel\linux-4.9\include\interrupt.h
static inline void tasklet_unlock_wait(struct tasklet_struct *t)
{
	while (test_bit(TASKLET_STATE_RUN, &(t)->state)) { barrier(); }
}

至此,tasklet 关键的实现原理分析完成,实际上还应联合 softirq,才算完整的了解整个机制。

Tasklet 机制总结

1. 每颗 cpu 都有自己的 tasklet 链表,这样可以将 tasklet 分布在各个 cpu 上,可实现并发不同的 tasklet。

2. 相同的 tasklet 只能在某一颗 cpu 上串行执行,其它 cpu 会暂时避让,在此情况下,不需要考虑并发问题(即不需要加锁)。

3. tasklet_schedule() 接口调用时,如果 tasklet 还未被执行,或者处于 disable 期间,指定的 tasklet 不会被加入链表内,即该请求不会被受理。

4. tasklet_disable() 接口只是暂时停止指定的 tasklet 执行,依然会被加回待执行链表内。而在 disable 期间,相同的 tasklet 将无法被加入链表调度。

参考资料:http://www.wowotech.net/irq_subsystem/tasklet.html