目录

Go 源码之互斥锁 Mutex

一、总结

  • 锁不可复制:拷贝互斥锁同时会拷贝锁的状态,容易造成死锁

  • 不是可重入锁,并且一个协程上锁,可以由另外一个协程解锁

  • mutex 锁结构

    • state:32位,锁状态,bitmap 设计,

      • 1 mutexLocked :低1位 锁定状态

      • 2 mutexWoken :低2位,从正常模式被唤醒

      • 3 mutexStarving 是低3位,进入饥饿模式

      • 4.mutexWaiterShift 剩下 29 位,当前互斥锁上等待者的数量

    • sema:协程等待信号量,用于控制goroutine的阻塞与唤醒

  • 上锁

    /img/go-source-code-mutex/1.png
    mutex上锁

  • 解锁

    /img/go-source-code-mutex/2.png
    mutex解锁

    • 锁的两种模式

      • 正常模式

          在正常模式下等待的 g 按照先进先出的方式获取锁

          新 g 会 自旋 ,并且和刚唤醒的 g 竞争锁,新 g 会优先获得锁,会导致刚被唤起的 g 一直获取不到锁,

          这种情况的出现会导致线程长时间被阻塞下去,所以Go语言在1.9中进行了优化,引入了 饥饿模式

      • 饥饿模式

          为了解决等待 goroutine 队列的长尾问题(饿死)

          当 g 超过 1ms 没有获取到锁,就会将当前互斥锁切换到饥饿模式

          等待的 g 按照先进先出的方式获取锁

          饥饿模式下,新进来的 G 不会参与抢锁也不会进入自旋状态,会直接进入等待队列的尾部。

          在这种情况下,这个被唤醒的 goroutine 会优先加入到等待队列的前面,防止饿死

          如果一个 goroutine 获得了互斥锁并且它在队列的末尾或者它等待的时间少于 1ms,那么当前的互斥锁就会切换回正常模式

二、源码

(一)Mutex

/src/sync/mutex.go

const (
   mutexLocked = 1 << iota                                     // 1 mutex 锁定状态
   mutexWoken                                                     // 2 mutex 从正常模式被唤醒
   mutexStarving                                                 // 4 mutex进入饥饿状态
   mutexWaiterShift = iota                                     // 3 当前互斥锁上等待者的数量
)

type Mutex struct {                                                 // Mutex 不可被复制
    state int32 // 32位,锁状态,bitmap 设计,低三位表示锁的状态,剩下 29 位表当前互斥锁上等待者的数量
    sema  uint32 // 缓冲信号量,用来控制等待goroutine的阻塞休眠和唤醒,可以理解为一个队列
}
mutex 锁 state

(二)Lock

  • 直接 CAS 进行原子操作上锁,成功则返回,失败则执行 lockSlow()

  • 上锁失败,执行 lockSlow(),内部持续 for 循环

    • 支持自旋(正常模式、cpu空闲、自旋次数<4),则进入自旋

    • 不支持自旋:两种模式

      • 正常模式:加入尾部队列,按照先进先出的方式加入队列等待获取锁

      • 饥饿模式:当 goroutine 超过 1ms 没有获取到锁,就会将当前互斥锁切换到饥饿模式,如果当前 goroutine 存在队列中,则移动到队头,然后按照先进先出的方式获取锁,防止饿死

func (m *Mutex) Lock() {
    if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {     // 直接 CAS 修改锁的状态,将 state=0 改为 1 
        return
    }
    m.lockSlow()                                                            // CAS 无法直接上锁,则执行慢路径
}
func (m *Mutex) lockSlow() {
    var waitStartTime int64                                             // 用来计算waiter的等待时间
    starving := false                                                     // 是否是饥饿模式
    awoke := false                                                         // 是否唤醒
    iter := 0                                                             // 自旋次数
    old := m.state                                                         // 旧的锁状态
    for {
    // 支持自旋:锁不是饥饿模式 && cpu 支持继续自旋(<=4次)
        if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
      // g不是唤醒状态 && 
      // 没有其他正在唤醒的goroutine && 
      // 等待队列中有正在等待的goroutine
      // && 尝试将当前锁的低2位的Woken状态位设置为1,表示已被唤醒, 这是为了通知在解锁Unlock()中不再唤醒其他waiter
            if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
                atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
                awoke = true                                     // 设置当前goroutine唤醒成功
            }
            runtime_doSpin()                                    // 进行自旋
            iter++                                              // 自旋次数++
            old = m.state                                       //更新锁状态
            continue
        }
        new := old

        if old&mutexStarving == 0 { 
            new |= mutexLocked                          // 非饥饿模式下进行加锁
        }

        if old&(mutexLocked|mutexStarving) != 0 {
            new += 1 << mutexWaiterShift                // 等待着数量+1
        }


        if starving && old&mutexLocked != 0 {
            new |= mutexStarving                        // 加锁的情况下切换为饥饿模式
        }

        if awoke {                                     // goroutine 唤醒的时候进行重置标志
            if new&mutexWoken == 0 {
                throw("sync: inconsistent mutex state")
            }
            new &^= mutexWoken
        }
         if atomic.CompareAndSwapInt32(&m.state, old, new) {     //设置新的状态
            if old&(mutexLocked|mutexStarving) == 0 {
                break 
            }
            queueLifo := waitStartTime != 0
            if waitStartTime == 0 {                       // 判断是不是第一次加入队列
                waitStartTime = runtime_nanotime()         // 如果之前就在队列里面等待了,加入到对头
            }        
            runtime_SemacquireMutex(&m.sema, queueLifo, 1) // 阻塞等待
          // 检查锁是否处于饥饿状态
            starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
            old = m.state
          // 如果锁处于饥饿状态,直接抢到锁
            if old&mutexStarving != 0 {
                if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
                    throw("sync: inconsistent mutex state")
                }
              //设置标志,进行加锁并且waiter-1
                delta := int32(mutexLocked - 1<<mutexWaiterShift)
                //如果是最后一个的话清除饥饿标志
              if !starving || old>>mutexWaiterShift == 1 {
                 //退出饥饿模式                
                    delta -= mutexStarving
                }
                atomic.AddInt32(&m.state, delta)
                break
            }
            awoke = true
            iter = 0
        } else {
            old = m.state
        }
    }
}

(三)Unlock

  • 直接 CAS 进行原子操作解锁,成功则返回,失败则执行 unlockSlow()

  • 解锁失败,执行 unlockSlow()

    • 正常模式:

      • 如果当前队列中没有waiter,只有自己本身,直接解锁返回

        • 如果当前队列中有waiter,解锁后唤醒下个等待者 runtime_Semrelease(&m.sema, false, 1)
    • 饥饿模式

      • 饥饿模式直接将锁的控制权交给队列中队头等待的waiter
func (m *Mutex) Unlock() {
    new := atomic.AddInt32(&m.state, -mutexLocked)  // 直接 CAS 修改锁的状态 
    if new != 0 {
        // 不等于0说明解锁失败,
        m.unlockSlow(new)
    }
}
func (m *Mutex) unlockSlow(new int32) {
        //解锁一个未加锁的Mutex会报错
    if (new+mutexLocked)&mutexLocked == 0 {
        throw("sync: unlock of unlocked mutex")
    }
    if new&mutexStarving == 0 {
        old := new
        for {
            // 正常模式下,没有waiter或者在处理事情的情况下直接返回
            if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
                return
            }
            //如果有等待者,设置mutexWoken标志,waiter-1,更新state
            new = (old - 1<<mutexWaiterShift) | mutexWoken
            if atomic.CompareAndSwapInt32(&m.state, old, new) {
                runtime_Semrelease(&m.sema, false, 1)
                return
            }
            old = m.state
        }
    } else {
        // 饥饿模式下会直接将mutex交给下一个等待的waiter,让出时间片,以便waiter执行
        runtime_Semrelease(&m.sema, true, 1)
    }
}

三、常见问题

1. sema 字段的含义作用

在正常模式下,一个goroutine先通过自旋方式获得锁,如果还不能获取锁,则通过信号量进行排队等待

(所有等待者都会按照先入先出的顺序排队)但是当被唤醒后,第一个等待者并不会立即获得锁,而是需要和那些正在处于自旋阶段,尚未加入到队列中的routine竞争,如果抢不到锁的话,重新插入到队列的头部,而当这个goroutine加锁等待的时间超过了1ms之后,它会把mutex由正常模式切换到饥饿模式,这种模式下锁的所有权直接传递给头部的routine。后来者不会自旋,也不会尝试获取锁,直接加到队列尾部

2. 什么是CAS,什么是原子操作

CAS(Compare and Swap)比较并交换,比较两个值,如果他们两者相等就把他们交换。这是一个由CPU硬件提供并实现的原子操作。

原子操作:操作系统提高的锁机制来保证操作的原子性和线程安全性。这种锁机制可以使执行原子操作的 CPU 独占内存总线或者缓存,并防止其他 CPU 对同一内存地址进行读写操作,从而避免了数据竞争的问题

具体来说,在执行原子操作时,CPU 会向内存总线或者缓存发送锁请求信号,然后等待锁授权。一旦锁授权成功,CPU 就可以将操作的结果写入内存,然后释放锁。其他 CPU 在锁被释放之前不能对同一内存地址进行读写操作,从而保证了操作的原子性和线程安全性。

需要注意的是,原子操作增加 CPU 的开销和内存带宽的消耗

3. 锁的正常模式和饥饿模式?

正常模式(非公平锁) 正常模式下,所有等待锁的 G 按照 FIFO(先进先出)顺序等待 唤醒的 G 不会直接拥有锁,
而是会和新请求 goroutine 竞争锁 新请求 G 更易抢占:因为它正在 CPU上执行,所以刚唤醒的 G 有很大可能在锁竞争中失败
饥饿模式(公平锁) 为了解决等待 goroutine 队列的长尾问题(饿死)饥饿模式下,新进来的 G 不会参与抢锁也不会进入自旋状态,
会直接进入等待队列的尾部。在这种情况下,这个被唤醒的 goroutine 会优先加入到等待队列的前面,防止饿死
正常模式 » 饥饿模式 ● 当一个goroutine 等待锁时间超过 1 毫秒 ● 当前 队列只剩下一个 goroutine
饥饿模式 » 正常模式 ● G 的执行时间小于 1 ms ● 等待队列已经全部清空了
饥饿模式性能下降 ● 增加锁的竞争:优先考虑最近的 G,其他争夺锁 G 增多 ● 增加了锁的持有时间:优先考虑最近的 G,其他 G 等待更长
总结 两种模式,正常模式下的性能是最好的,goroutine 可以连续多次获取锁,饥饿模式解决了取锁公平的问题,
但是性能会下降,这其实是性能和公平的一个平衡模式;无论哪种模式,
所有等待锁的 G 按照 FIFO(先进先出)顺序等待 unlock 释放锁然后队头 g 获取锁只是正常模式下:
新 g 会 自旋 ,并且和刚唤醒的 g 竞争锁,新 g 会优先获得饥饿模式下:新 g 不会自旋,不参与抢锁,直接添加到队尾

4. 为什么锁不可复制

因为互斥锁没有绑定 gid,复制锁会复制锁的状态,容易出现死锁

5. 什么情况下mutex会从饥饿模式变成正常模式呢?

如果当前 goroutine 等待锁的时间超过了 1ms,互斥锁就会切换到饥饿模式。

如果当前 goroutine 是互斥锁最后一个waiter,或者等待的时间小于 1ms,互斥锁切换回正常模式。

6. goroutine能进入自旋的条件

  • 当前互斥锁处于正常模式,不处于饥饿模式

  • 积累的自旋次数小于最大自旋次数(active_spin=4

  • cpu 核数大于 1

  • 有空闲的 P

  • 当前 goroutine 所挂载的 P 下,本地待运行队列为空

//go:linkname sync_runtime_canSpin sync.runtime_canSpin
func sync_runtime_canSpin(i int) bool {
  // active_spin = 4
    if i >= active_spin || ncpu <= 1 || gomaxprocs <= int32(sched.npidle+sched.nmspinning)+1 {
        return false
    }
    if p := getg().m.p.ptr(); !runqempty(p) {
        return false
    }
    return true
}