互斥锁的定义

  type Mutex struct {
  	state int32
  	sema  uint32
  }

一个 sema,背后实际上 是一个 休眠队列,可以看下上篇。
一个state,这个状态 分为4个部分。
后三位 各自代表一个状态。 前29位代表最大可等待协程的个数。

state的结构

  locked  是否加锁 1加锁,0 正常   占1位
  woken  是否醒来                       占1位
  starving 是否饥饿模式                占1位
  waiterShift 等待的数量               占29位

底层的定义,下面看代码时候,会说明。

正常模式

加锁

假设现在来了2个g,都想加锁,但是只有一个能成功,2个都通过 atomic.CompareAndSwapInt32(lock, 0 ,1) 伪代码去更改 locked 位置。

改成功的g获取了锁,没成功的g先自旋几次,然后如果还是未获取到锁,则进入sema休眠队列。

未成功的g进入休眠队列,把waiterShift加1。

通过这个结论,看代码验证下:

mutexLocked = 1 << iota // mutex is locked
mutexWoken
mutexStarving
mutexWaiterShift = iota

func (m *Mutex) Lock() {
     // 先给state的最后一位 写 1
	if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) { 
		if race.Enabled {
			race.Acquire(unsafe.Pointer(m))
		} 
        写上了, 加锁成功,直接返回。
		return
	}
    // 写不上进入这个方法
	m.lockSlow() 
}

// 不是完整代码,只截取和这里相关的部分
func (m *Mutex) lockSlow() {
	starving := false
	iter := 0
	old := m.state
	for {
		//  是否是饥饿模式 是否还能自旋 iter会记录自旋次数
		if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
			runtime_doSpin()
			iter++ // 自旋次数加1
			old = m.state
			continue
		}
		// 自旋一定次数后
		new := old
		// 判断是否是饥饿模式
		if old&mutexStarving == 0 {
			new |= mutexLocked
		}
		if atomic.CompareAndSwapInt32(&m.state, old, new) {
			runtime_SemacquireMutex(&m.sema, queueLifo, 1)
            // 进入了休眠 ,不会执行下面的语句了。直到被唤醒
			starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs		
	}
}

小结:

尝试CAS直接加锁
若无法直接获取,进行多次自旋尝试
多次尝试失败,进入sema队列休眠

如果这个时候,再来一个:

也是同样,进入sema的休眠队列。

解锁

解锁的这个g,除了修改locked的值,还需要去判断waiterShift,有没有协程在等,如果有,要去唤醒一个协程。

看代码:

func (m *Mutex) Unlock() {
     // 减去1,发现state的值,不是0,说明有协程在等
	new := atomic.AddInt32(&m.state, -mutexLocked)
	if new != 0 {
		m.unlockSlow(new)
	}
}

func (m *Mutex) unlockSlow(new int32) {

	if new&mutexStarving == 0 { // 这里是讲了 非饥饿模式
		old := new
		for {
			new = (old - 1<<mutexWaiterShift) | mutexWoken
			if atomic.CompareAndSwapInt32(&m.state, old, new) {
				runtime_Semrelease(&m.sema, false, 1) // 从 sema中释放一个 g
				return
			}
			old = m.state
		}
}

正常模式比较好理解:

如果一个g先加锁成功,则别的g进来后,先自旋等待一下,然后进入sema休眠队列。
等到g解锁时候,回去释放sema休眠队列中的一个g,这个队列是平衡树。

mutex正常模式:自旋加锁+sema休眠等待

饥饿模式

假设g解锁后,释放了一个g出来。现在 mutexlocked位置为0。 这个时候,又来了2个g,那刚刚释放的g不一定能竞争得过来的这两个g。

为了解决这个问题,go设置了锁饥饿模式:

当前协程等待锁的时间超过了 1ms,切换到饥饿模式

饥饿模式中,不自旋,新来的协程直接sema休眠

饥饿模式中,被唤醒的协程直接获取锁

没有协程在队列中继续等待时,回到正常模式

把starving置为1

新过来的协程直接休眠,唤醒的协程直接获得锁

代码: 有点长 要结合里面的for循环,看两遍
func (m *Mutex) lockSlow() {
	var waitStartTime int64
	starving := false
	awoke := false
	iter := 0
	old := m.state
	for {
		//  饥饿模式不自旋了
		if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
			runtime_doSpin()
			iter++ // 自旋次数加1
			old = m.state
			continue
		}
	new := old
	// 判断是否是饥饿模式
	if old&mutexStarving == 0 {
		new |= mutexLocked
	}
	// 如果是饥饿模式,给waiterShift 加1
		if old&(mutexLocked|mutexStarving) != 0 {
			new += 1 << mutexWaiterShift
		}
		// starving 现在为 true了
		if starving && old&mutexLocked != 0 {
			new |= mutexStarving
		}

		// 写入饥饿模式的状态 new 现在为饥饿模式了
		if atomic.CompareAndSwapInt32(&m.state, old, new) {
			// 新进来的g,直接休眠
			runtime_SemacquireMutex(&m.sema, queueLifo, 1)

			//starvationThresholdNs = 1e6  1毫秒
			// 唤醒的g,从这里开始执行, 判断g等待的时间 ,超过了1毫秒 starving 置为 true
			starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
			old = m.state
			if old&mutexStarving != 0 {

				delta := int32(mutexLocked - 1<<mutexWaiterShift)
				if !starving || old>>mutexWaiterShift == 1 {

					delta -= mutexStarving
				}
				// 直接改为 g已经获取锁的值,直接写入
				atomic.AddInt32(&m.state, delta)
				break
			}
			awoke = true
			iter = 0
		} else {
			old = m.state
		}
	}
}

总结:

锁竞争严重时,互斥锁进入饥饿模式

饥饿模式没有自旋等待,有利于公平,见过有人叫 公平锁 。

使用经验

1.  减少锁的使用时间,lock和unlock 之间,业务要精简,只放必须的代码。 

2.  善用defer确保锁的释放。避免忘记释放 例如走到if这样分支,最后没有释放锁。

思考一个问题:

加锁、开锁其实就是用的 atomic 操作一个 值,开发者也能实现,为什么还要用锁 ?

结合上几篇讲的 sema 和 协程抢占的内容,这样做是能够做到锁住一段代码,但是,未获取锁的g,无法做到休眠、唤醒的功能。 所以,系统才的 mutex 采用 atomic和 sema的结合。