自旋锁简单说明:

自旋锁主要解决在竞态并发下,保护执行时间很短的临界区。它只允许一个执行单位进入临界区,在该执行单位离开前,其它的执行单位将会在进入临界区前不停的循环等待(即所谓的自旋),直至该执行单位离开临界区后,最先等待的一个执行单位会立即进入临界区。此方式不涉及到上下文切换,因此效率极高。

出现并发的场景: 

  • 硬中断触发打断当前进程、softirq、tasklet、timer等形成的并发
  • softirq(软中断)、tasklet(小任务)、timer(内核定时器) 触发打断 当前进程(或内核线程)形成的并发
  • 在 SMP 系统下,多次触发 softirq 之间形成的并发(同一个 softirq 可在多个 cpu 并发执行)
  • 在 SMP 系统下,不同 tasklet、timer 之间的并发(同一个 tasklet 和 timer 不会并发执行)
  • 在内核抢占的调度机制形成高低优先级进程之间(或内核线程)的并发

额外的注意事项:

一、软中断在同一个cpu下并不会并发,但是在多个cpu下是可以并发的,因此性能很高。

如网卡接受数据,产生一个中断后,被 cpu0 处理,关闭中断后,将数据从网卡的 fifo 拷贝到 ram 之后触发软中断,再打开中断,基于谁触发谁处理原则,cpu0 会继续执行软中断服务函数。此时网卡又再次产生中断,会被 cpu1 处理,同样是关闭中断后拷贝数据再开启中断,再去触发和执行软中断进行网卡数据包处理。若此时 cpu0\cpu1 都还在软中断处理数据,网卡再次产生中断,那么 cpu2 就会继续参与,由此可见,软中断充分利用的多 cpu 进行并发处理,因此性能非常高,但也同时因为并发的存在,就需要考虑临界区的问题。

二、同一个 tasklet、timer 在同一时间,只会在一个cpu上运行,是为了易用性做出的牺牲。

由于 tasklet,timer 都是基于 softirq 的基础实现,为了易用性考虑,与 softirq 不同的是,同一种tasklet、timer 在多个cpu上也不会并行执行,因此不存在并发问题。

其实现原理可以看我的这篇文章:https://blog.csdn.net/lovemengx/article/details/125947279

三、新版本的 Linux 内核不再支持中断嵌套(不确定是从哪个版本开始,以下为内核补丁说明)
https://git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git/commit/?id=e58aa3d2d0cc

自旋锁的种类说明:

最基础的自旋锁有三个版本:

1. spin_lock()\spin_unlock()

这是最基础的自旋锁,也是对系统影响最小的自旋锁,在未获得锁时,会自旋等待进入临界区。

2. spin_lock_bh()\spin_unlock_bh()

这是在最基础的自旋锁上获取锁之前,先关闭中断底半部,明确的来说就是关闭软中断(包含基于软中断实现的 tasklet 和 timer),主要影响系统的软中断类的并发。

3. spin_lock_irq()\spin_unlock_irq()、spin_lock_irqsave()\spin_unlock_irqrestore()

这是在最基础的自旋锁上获取锁之前,先屏蔽当前 cpu 的中断,禁止内核抢占当前进程,主要用于防止软硬件中断并发,影响最大,它影响了当前 CPU 的软硬中断和进程调度。

spin_lock_irq() 是会屏蔽当前 cpu 所有的中断,spin_unlock_irq() 会开启当前 cpu 所有的中断。spin_lock_irqsave() 是现将当前 cpu 的中断使能位取出来,然后在屏蔽当前 cpu 所有中断,spin_unlock_irqrestore() 再恢复之前的中断使能位。凡是用到 spin_lock_irq()\spin_unlock_irq() 都可以用 spin_lock_irqsave()\spin_unlock_irqrestore() 替换,根据使用情况决定选择哪种方式即可,例如希望中断执行完成后,所有的中断都要开启,那就选择 spin_lock_irq()\spin_unlock_irq(),如果希望中断执行完成后,只需要恢复执行前的中断开关状态,那么就选择 spin_lock_irqsave()\spin_unlock_irqrestore(),如执行前 A中断 本来就要求关闭的,那么执行完之后,还是希望 A中断 仍处于关闭状态。

使用自旋锁的原则:

首先要先明确硬件中断的优先级最高,它可以随时打断软中断和内核线程与用户进程,他们之间的优先级如下:

1:硬中断   >>>   2:软中断(含基于软中断实现的 tasklet、timer)   >>>   3:内核线程\用户进程

然后需要确定谁可能会并发访问临界区,然后遵循如下规则,选择合适的锁即可:

  • 低优先级要防着高优先级的,用能禁止高优先级的自旋锁,而高优先级的只需最简单的锁
  • 同等级要防着同等级的, 就使用最简单自旋锁

一、低优先级要防着高优先级的,用能禁止高优先级的自旋锁,而高优先级的只需最简单的锁

例子1:用户进程上下文或内核线程 和 硬件中断 都会访问同一个临界区

用户进程:使用 spin_lock_irq()\spin_unlock_irq() 

硬件中断:使用 spin_lock()\spin_unlock() 

进程上下文访问临界区要防止被硬件中断打断侵入,就需要通过调用 spin_lock_irq()\spin_unlock_irq()  禁止当前 CPU 的中断再去获取锁,那么临界区内就不会被硬件中断访问。但它也只能关闭当前 cpu 的中断,此时其它 cpu 还能继续响应中断,所以中断内部还是需要加上 spin_lock()\spin_unlock() 来保护临界区,即使该中断未拿到锁而持续自旋,也不会影响进程上下文继续执行,顶多就自旋等待一会就能获得锁。

这里也能说明,被自旋锁保护的临界区代码不能太过复杂,不然在这种场景下,就会导致中断自旋时间过长,在该中断自旋期间就无法响应其它的中断,如 tick 心跳中断,最终可能导致系统异常死机。

例子2:软中断(softirq、tasklet、timer) 和 硬件中断 都会访问同一个临界区

软件中断:使用 spin_lock_irq()\spin_unlock_irq() 

硬件中断:使用 spin_lock()\spin_unlock() 

例子3:用户进程上下文或内核线程  软中断(softirq、tasklet、timer) 都会访问同一个临界区

用户进程:使用 spin_lock_bh()\spin_unlock_bh()

软件中断:使用 spin_lock()\spin_unlock()

进程上下文访问临界区要防止被软中断打断侵入,就需要使用 spin_lock_bh()\spin_unlock_bh() 禁用软中断,但只能关闭当前 cpu 的软中断,其它 cpu 依然能响应软中断,因此还需在软中断中使用 spin_lock()\spin_unlock() 来保护临界区。

二、同等级要防着同等级的, 就使用最简单自旋锁

例子1:用户进程上下文或内核线程 和 用户进程上下文和内核线程 即多个进程会访问同一个临界区

只需要使用:spin_lock()\spin_unlock(),因为内核支持抢占调度,所以需要上锁。

例子2:不同的 硬件中断 都会访问同一个临界区

只需要使用: spin_lock()\spin_unlock(),不同的硬件中断是可以同时被多颗 cpu 响应处理的,因此需要使用自旋锁进行保护。

如果是旧版内核支持中断嵌套的,则应该使用 spin_lock_irq()\spin_unlock_irq(),以避免被高优先级中断抢占,从而导致出现死锁情况。

例子3:不同的 tasklet、timer 会访问同一个临界区

只需要使用:spin_lock()\spin_unlock(),因为不同的 tasklet 或 timer 是可以在不同的 cpu 并发执行。

注意,如果只有相同的 tasklet 或者 timer 访问临界区,是不需要加锁的,因为相同的 tasklet 或 timer 不会并发,即使是有多个 cpu 也不会。

例子4:在一个或者多个软中断(softirq) 中会访问同一个临界区

只需要使用:spin_lock()\spin_unlock(),虽然同一时间一个 cpu 只能执行一个软中断,但其它的 cpu 还是可以并发执行相同的软中断的。

三、workqueue、waitqueue、completion 用锁规则

workqueue(工作队列)是基于内核线程实现、waitqueue(等待队列)工作在用户进程上下文、completion(完成量)是基于等待队列实现也是工作在用户进程上下文,因此它们的用锁规则等同于用户进程。

自旋锁的代码分析:

自旋锁在不同的硬件环境的实现不一样,此处分析以最复杂的环境下自旋锁的实现原理,即:SMP 下支持任务抢占的硬件环境。

一、自旋锁初始化

锁的数据结构定义,这个数据结构用的是结构体内嵌共用体设计,理解这点非常重要。

#define TICKET_SHIFT	16    // 指明 owner 和 next 的位宽

typedef struct {
	union {                
		u32 slock;        // 32 位
		struct __raw_tickets {
			u16 owner;    // 16 位 解锁计数
			u16 next;     // 16 位 上锁计数
		} tickets;
	};
} arch_spinlock_t;

锁的初始化 spin_lock_init() :lock->raw_lock 成员变量被设置为 __ARCH_SPIN_LOCK_UNLOCKED    { { 0 } },即 lock->tickets.owner = 0, lock->tickets.next = 0

#define spin_lock_init(_lock)				\
do {							\
	spinlock_check(_lock);				\
	raw_spin_lock_init(&(_lock)->rlock);		\
} while (0)

# define raw_spin_lock_init(lock)				\
	do { *(lock) = __RAW_SPIN_LOCK_UNLOCKED(lock); } while (0)

#define __RAW_SPIN_LOCK_UNLOCKED(lockname)	\
	(raw_spinlock_t) __RAW_SPIN_LOCK_INITIALIZER(lockname)

#define __RAW_SPIN_LOCK_INITIALIZER(lockname)	\
	{					\
	.raw_lock = __ARCH_SPIN_LOCK_UNLOCKED,	\
	SPIN_DEBUG_INIT(lockname)		\
	SPIN_DEP_MAP_INIT(lockname) }

#define __ARCH_SPIN_LOCK_UNLOCKED	{ { 0 } }

二、自旋锁上锁的分析

static __always_inline void spin_lock(spinlock_t *lock)
{
	raw_spin_lock(&lock->rlock);   // 调用宏
}

#define raw_spin_lock(lock)	_raw_spin_lock(lock)  // 调用  _raw_spin_lock

void __lockfunc _raw_spin_lock(raw_spinlock_t *lock)
{
	__raw_spin_lock(lock); // 继续调用 __raw_spin_lock
}

static inline void __raw_spin_lock(raw_spinlock_t *lock)
{
    // 设置当前进程不可被抢占
	preempt_disable();
    spin_acquire(&lock->dep_map, 0, 0, _RET_IP_);
    // 调用 do_raw_spin_lock
	LOCK_CONTENDED(lock, do_raw_spin_trylock, do_raw_spin_lock);
}

上面的代码使用 preempt_disable() 设置当前进程不可被抢占,此举可避免在持锁期间被高优先级进程抢占当前进程去访问临界区。

void do_raw_spin_lock(raw_spinlock_t *lock)
{
	debug_spin_lock_before(lock);
	arch_spin_lock(&lock->raw_lock);  // 调用 arch_spin_lock
	debug_spin_lock_after(lock);
}

static inline void arch_spin_lock(arch_spinlock_t *lock)
{
	unsigned long tmp;
	u32 newval;
	arch_spinlock_t lockval;

	prefetchw(&lock->slock);
	__asm__ __volatile__(
    "1:	ldrex	%0, [%3]\n"          // 4 -------------------------
    "	add	%1, %0, %4\n"            // 5 -------------------------
    "	strex	%2, %1, [%3]\n"      // 6 -------------------------
    "	teq	%2, #0\n"                // 7 -------------------------
    "	bne	1b"                         
	: "=&r" (lockval), "=&r" (newval), "=&r" (tmp) // 1 -----------
	: "r" (&lock->slock), "I" (1 << TICKET_SHIFT) // 2 ------------
	: "cc");  // 3 指明上述汇编指令会改变条件寄存器

    // 8 ---------------------------------------------
	while (lockval.tickets.next != lockval.tickets.owner) {
		wfe(); // 让当前 cpu 进入低功耗模式
		lockval.tickets.owner = ACCESS_ONCE(lock->tickets.owner);
	}

    // 9 ---------------------------------------------
	smp_mb();
}

这部分的代码是实现自旋锁的核心,是通过内嵌汇编指令实现,使用比较关键的汇编指令 ldrex\strex 实现原子访问:

ldrex Rx, [Ry]:读取寄存器Ry指向的4字节内存值,将其保存到Rx寄存器中,同时标记对Ry指向内存区域的独占访问
strex Rx, Ry, [Rz]:如果发现已经被标记为独占访问了,则将寄存器Ry中的值更新到寄存器Rz指向的内存,并将寄存器Rx设置成0。指令执行成功后,会将独占访问标记位清除。

ldrex 负责拷贝数据和独占访问标记,strex 在根据标记存在与否拷贝数据和清除标记的过程是原子操作。

以下为 arch_spin_lock() 的代码进行逐行解释:

1. 将 lockval、newval、tmp 局部变量分别与 %0、%1、%2 编号关联(编号对应由编译器指定 CPU 的寄存器)

2. 将 &lock->slock 、(1 << TICKET_SHIFT) 分别与 %3、%4 编号关联(编号会由编译器指定 CPU 的寄存器),这里的 TICKET_SHIFT  含义是指明内部变量位宽,代码定义的是 16,表面是 16 位宽的变量,也就是说 %4 是与 1 << 16 关联,便于汇编指令计算。

3. 指定此内嵌的汇编指令会修改条件寄存器

4. ldrex 指令实现读取 %3(lock->slock) 里面的数据到 %0(lockval),也就是将入参 lock 的数据拷贝到局部变量 lockval 中,并标记 lock 所在内存的独占访问标记。这一步主要是记录当前锁的计数。

5. add 指令实现将 %0(lockval) + %4(1 << 16) ,结果放到 %1(newval),实现的效果等同于 newval= lockval.slock + (1 << 16)。这个步骤是根据 arch_spinlock_t  数据结构设计的,它内部是一个共同体,slock 与 tickets 使用的是相同的内存空间,slock 的低 16 位等同于 tickets.owner,高 16 位宽等同于 tickets.next。

6. strex 会先检查 %3(lock->slock) 这块内存的独占标记是否还在,如果不在则设置 %2(tmp) 为 1,说明已经被其它线程修改了,如果还在的话设置为 0,将 %1(newval) 数据覆盖到  %3(lock->slock) ,再将独占访问标记清除。这两步主要是借助中间变量 newval 对 next 计数进行自增后,更新到原 lock 里面。

7. teq 和 bne 指令实现判断如果 %2(tmp) 不等于 0,说明已经被其它线程修改了,重新再跳转到标签 1 执行,也就是重新跳转回第 4 个步骤继续执行,否则就继续往下执行。

8. 这里主要是不停的判断当前的当前锁的 owen 是否与当前的 next 相等,如果不相等则一直循环检查,这个步骤就是实现了我们所说的自旋功能。当其它持有锁的线程要对该锁进行解锁,解锁的操作会将 owen 自增,当 owen 与当前记录的 next 相等,就会让当前线程退出自旋。

9. 自旋退出了,就意味着拿到锁了,就可以访问临界区了。

三、自旋锁解锁的分析

static __always_inline void spin_unlock(spinlock_t *lock)
{
	raw_spin_unlock(&lock->rlock);
}

#define raw_spin_unlock(lock)		_raw_spin_unlock(lock)

void __lockfunc _raw_spin_unlock(raw_spinlock_t *lock)
{
	__raw_spin_unlock(lock);
}

static inline void __raw_spin_unlock(raw_spinlock_t *lock)
{
	spin_release(&lock->dep_map, 1, _RET_IP_);
	do_raw_spin_unlock(lock);
     // 2 -------------------------------------
	preempt_enable();
}

void do_raw_spin_unlock(raw_spinlock_t *lock)
{
	debug_spin_unlock(lock);
	arch_spin_unlock(&lock->raw_lock);
}

static inline void arch_spin_unlock(arch_spinlock_t *lock)
{
	smp_mb();
    // 1 -------------------------------------
	lock->tickets.owner++;
	dsb_sev();
}

1. 对 lock->tickets.owner 进行自增,这样可以让等待锁的线程退出自旋

2. 恢复内核对当前线程的抢占

推演自旋锁的工作过程:

从自旋锁刚进行初始化的状态来推演:lock->tickets.owner = 0,lock->tickets.next = 0

1. 线程A:开始申请锁:读取锁 lock 到临时变量 A.lockval 并设置该内存区域独占标记,此时的状态:

lock->tickets.owner = 0,lock->tickets.next = 0,A.lockval.tickets.owner = 0,A.lockval.tickets.next = 0

2. 线程B:开始申请锁:读取锁 lock 到临时变量 B.lockval 并设置该内存区域独占标记,此时的状态:

lock->tickets.owner =0,lock->tickets.next = 0,B.lockval.tickets.owner = 0,B.lockval.tickets.next = 0

3. 线程A:计算锁序号:对 A.newval =  A.lockval.slock + (1<<16) = 0x10000,此时的状态:

lock->tickets.owner = 0,lock->tickets.next = 0,A.lockval.tickets.owner = 0,A.lockval.tickets.next = 0,A.newval = 0x10000

4. 线程C:开始申请锁:读取锁 lock 到临时变量 lockvalC 并设置该内存区域独占标记,此时的状态:

lock->tickets.owner = 0,lock->tickets.next = 0,C.lockval.tickets.owner = 0,C.lockval.tickets.next = 0

5. 线程B:计算锁序号:对 B.newval =  B.lockval.slock + (1<<16) = 0x00 + 0x10000,此时的状态:

lock->tickets.owner = 0,lock->tickets.next = 0,B.lockval.tickets.owner = 0,B.lockval.tickets.next = 0,B.newval = 0x10000

6. 线程A:更新锁计数:检查该内存区域的独占访问标记存在,将 A.newval​​​​​​​ 覆盖到 lock->slock 并清除独占标记,此时的状态:

lock->tickets.owner = 0,lock->tickets.next = 1,A.lockval.tickets.owner = 0,A.lockval.tickets.next = 0,A.newval​​​​​​​ = 0x10000

7. 线程A:锁持有检测:由于 A.lockval.tickets.next == A.lockval.tickets.owner 相等则跳出 while 循环,正式持有锁返回了。

lock->tickets.owner = 0,lock->tickets.next = 1,A.lockval.tickets.owner = 0,A.lockval.tickets.next = 0,A.newval​​​​​​​ = 0x10000

8. 线程B:更新锁计数:检查该内存区域的独占访问标记已被清除,重新读取锁 lock 到临时变量 B.lockval 并设置该内存区域独占标记,此时的状态:

lock->tickets.owner =0,lock->tickets.next = 1,B.lockval.tickets.owner = 0,B.lockval.tickets.next = 1​​​​​​​

9. 线程B:计算锁序号:对 B.newval​​​​​​​ =  B.lockval.slock + (1<<16) = 0x10000 + 0x10000,此时的状态:

lock->tickets.owner = 0,lock->tickets.next = 1,B.lockval.tickets.owner = 0,B.lockval.tickets.next = 1,B.newval​​​​​​​ = 0x20000

10. 线程B:更新锁计数:检查该内存区域的独占访问标记存在,将 B.newval​​​​​​​ 覆盖到 lock->slock 并清除独占标记,此时的状态:

lock->tickets.owner = 0,lock->tickets.next = 2,B.lockval.tickets.owner = 0,B.lockval.tickets.next = 1,B.newval​​​​​​​ = 0x20000

11. 线程B:锁持有检测:由于 B.lockval.tickets.next != B.lockval.tickets.owner,因此进入 while 循环,不停的读取 lock->tickets.owner 并覆盖到 B.lockval.tickets.owner

lock->tickets.owner = 0,lock->tickets.next = 2,B.lockval.tickets.owner = 0,B.lockval.tickets.next = 1,B.newval​​​​​​​ = 0x20000

12. 线程C:计算锁序号:对 C.newval​​​​​​​ =  C.lockval.slock + (1<<16) = 0x00 + 0x10000,此时的状态:

lock->tickets.owner = 0,lock->tickets.next = 0,C.lockval.tickets.owner = 0,C.lockval.tickets.next = 0,C.newval = 0x10000

13. 线程C:更新锁计数:检查该内存区域的独占访问标记已被清除,重新读取锁 lock 到临时变量 C.lockval 并设置该内存区域独占标记,此时的状态:

lock->tickets.owner = 0,lock->tickets.next = 2,C.lockval.tickets.owner = 0,C.lockval.tickets.next = 2

14. 线程C:计算锁序号:对 C.newval​​​​​​​ =  C.lockval.slock + (1<<16) = 0x20000 + 0x10000,此时的状态:

lock->tickets.owner = 0,lock->tickets.next = 2,C.lockval.tickets.owner = 0,C.lockval.tickets.next = 2,C.newval​​​​​​​ = 0x30000​​​​​​​

15. 线程C:更新锁计数:检查该内存区域的独占访问标记存在,将 C.newval​​​​​​​ 覆盖到 lock->slock ​​​​​​​并清除独占标记,此时的状态:

lock->tickets.owner = 0,lock->tickets.next = 3,C.lockval.tickets.owner = 0,C.lockval.tickets.next = 2,C.newval​​​​​​​ = 0x30000

16. 线程C:锁持有检测:由于 C.lockval.tickets.next != C.lockval.tickets.owner,因此进入 while 循环,不停的读取 lock->tickets.owner 并覆盖到 C.lockval.tickets.owner

lock->tickets.owner = 0,lock->tickets.next = 3,C.lockval.tickets.owner = 0,C.lockval.tickets.next = 2

至此,线程B 一直在等待 lock->tickets.owner ​​​​​​​等于 1,而 线程C 则一直在等待  lock->tickets.owner ​​​​​​​等于 2

17. 线程A:访问完临界区后,调用 spin_unlock() 释放锁,lock->tickets.owner = lock->tickets.owner + 1

lock->tickets.owner = 1,lock->tickets.next = 3

18. 线程B:锁持有检测:读取 lock->tickets.owner 并覆盖到 B.lockval.tickets.owner,由于 B.lockval.tickets.next == B.lockval.tickets.owner 相等则跳出 while 循环,正式持有锁返回了。

lock->tickets.owner = 1,lock->tickets.next = 3,B.lockval.tickets.owner = 1,B.lockval.tickets.next = 1

19. 线程C:锁持有检测:继续读取 lock->tickets.owner 并覆盖到 C.lockval.tickets.owner,由于 C.lockval.tickets.next != C.lockval.tickets.owner,继续循环自旋。

lock->tickets.owner = 1,lock->tickets.next = 3,C.lockval.tickets.owner = 1,C.lockval.tickets.next = 2​​​​​​​

20. 线程B:访问完临界区后,调用 spin_unlock() 释放锁,lock->tickets.owner = lock->tickets.owner + 1

lock->tickets.owner = 2,lock->tickets.next = 3

21. 线程C:锁持有检测:读取 lock->tickets.owner 并覆盖到 C.lockval.tickets.owner,由于 C.lockval.tickets.next == C.lockval.tickets.owner 相等则跳出 while 循环,正式持有锁返回了。

lock->tickets.owner = 2,lock->tickets.next = 3,C.lockval.tickets.owner = 2,C.lockval.tickets.next = 2​​​​​​​

整个过程推演完毕,很巧妙的借助两个计数器和局部变量,实现等锁线程的有序排队,该思路也适用于应用程序开发。

自旋锁的原理总结:

1. 自旋锁是通过 ldrex、strex 来确保读写锁的计数器是原子操作的,这是 arm 芯片级实现的。

2. 在上锁的过程中,会有两处循环,第一处是汇编指令循环, 第二处是 C 语言的 while 循环,两个循环的意义不一样:

  • 汇编指令循环:实现对锁计数器的原子读写,确保得到的锁数据是最新的,锁的计数更新是准确无误的。
  • C语言的循环:实现锁在等待时的自旋功能,通过比较计数器,实现谁先等待锁,谁就先得到锁的有序排队。

以上两个循环的协同工作,前者实现原子操作和记录,后者实现了有序排队,完成了自旋锁的核心互斥功能。

本文参考:http://www.wowotech.net/kernel_synchronization/spinlock.html​​​​​​​