GHL's Notes/

细嗦Golang Mutex锁

Golang最大的特点就是Goroutine多协程高并发。
但只要一有多协程就可能会出现Data Race的状况。
如果不好好处理Data Race的问题,就会导致数据乱码等等未知错误。

而锁,就是避免这种情况的工具,所以在多线程环境中非常重要。golang自然考虑到这一点,搞了两种锁。

一种是Mutex,互斥锁
一种是RWMutex,读写互斥锁

而他们的实现都是基于CPU中的atomic指令操作(也叫原子指令)。

原子指令的实现,我就不细嗦了,大概就是等待一个读写完,还没有读写完出现竞争状况就把数据存入缓存,类似做了个队列。

不过那不是我们讨论的。

我现在是讨论Golang的Mutex锁。

Golang是如何实现Mutex的

相关算法

Golang Mutex锁的算法很简单,甚至不能叫算法,它简称为CAS(Compare And Swap)

什么叫CAS呢

其实你看它英文也很好理解。首先compare,也就是我比较一下这个值的状态,如果它满足我想要的,我就把它换成(Swap)新的值。

尽管很简单,但CAS却是最常见的无锁(lock-free)算法之一

golang也用代码解释了这个过程

if *addr == old {
*addr = new
return true
}
return false

相关库

runtime race
这是golang内部的一个race detect库。

什么叫race detect呢。顾名思义,就是检测有没有goroutine竞争的。

1.上锁

golang中维护了一个锁的全局状态变量(state)。
并规定如下状态参数

mutexLocked = 1 << iota // mutex is locked
mutexWoken
mutexStarving
 mutexWaiterShift = iota

解释一下,mutexLocked,就是上锁了
0,就是没锁。
而mutexWoken和mutexStarving都和Go调度器调度goroutine相关。
当一个goroutine取得锁后,它会牢牢护住这把锁,其他的goroutine抢不了,怎么办呢。go调度器就把他们暂时休眠,还能节省CPU资源。

锁的工作模式

Mutex有两种工作模式。
一种是normal,一种是starvation。

normal就是一般的上锁,然后另一个goroutine等待解锁
starvation则是另一个goroutine等待解释途中突然有新goroutine进来抢锁,我上面说过,go调度器已经将等待锁的goroutine休眠了,但是这个新goroutine刚进来,还是没被休眠的,它自然有更高的优先级去和之前到的goroutine抢锁,这样一来,就出现了不公平的情况。

为了保证公平,才有了starvation状态。
因此才会有mutexWoken和mutexStarving状态,这就是饥饿竞争状态。什么叫饥饿竞争呢,顾名思义,就是多个goroutine去抢着上锁,锁只有一把,不够用了,这就是饥饿竞争,在golang中,抢着上锁是非常常见的状况。

那么golang如何判断starvation呢。

非常简单。

当这个goroutine被Go调度器唤醒后,它开始去抢锁,结果发现怎么抢都抢不到,直到1ms后,它就认为有别的goroutine在和它竞争,于是转入starvation状态。

不多BB,直接贴代码

starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs

什么意思呢,就是首先判断有没有进入starving,然后再判断等待时间是否超过了1ms

Mutex上锁分两步

第一步

第一次上锁,也就不存在竞争状况,那很简单啊,直接CAS,把锁状态值用atomic操作改成上锁状态。

因为是atomic,所以会有一个排队状况。第二个来的肯定会发现,啊,怎么上锁了。

贴代码。

if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
    if race.Enabled {
        race.Acquire(unsafe.Pointer(m))
    }
    return
}

race.Enabled是检查是否存在竞争状态,存在就用atomic.Load类似的函数(race.Acquire)去取得锁

但如果已经上锁了的话怎么办。

等。

Normal模式

我之前说到休眠,具体是如何实现呢。

通过自旋。什么叫自选?就是等啊等啊等啊,像转圈一样,自己旋转,很形象。

英文叫Spining

    if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
        // Active spinning makes sense.
        // Try to set mutexWoken flag to inform Unlock
        // to not wake other blocked goroutines.
        if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
            atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
            awoke = true
        }
        runtime_doSpin()
        iter++
        old = m.state
        continue
    }

它会尝试去用CAS去看看有没有解锁,没解锁,继续自旋休眠,但如果检测到有新goroutine进来抢锁,就会进入starvation状态,或者已经进入。并更新其值为mutexStarving(3,二进制11),怎么判断和得到的呢。old&mutexWoken这行代码说明什么,我们来假设以下,如果正位于锁定状态,那么锁状态值为1(二进制01),mutexWoken为2(二进制10),与操作后就是00。意思就是位于上锁,还没有解锁。但是mutexWaiterShift告诉它有人来和你竞争了。于是它将值做CAS, 01|10=11=3=mutexStarving。CAS作用应该是保护当前状态不被改变,确保其正确进入starvation模式

而runtime_canSpin函数还有一个功能,也就是判断自旋是否达到现在,iter为迭代次数

它的实现是这样的

func sync_runtime_canSpin(i int) bool {
    // sync.Mutex is cooperative, so we are conservative with spinning.
    // Spin only few times and only if running on a multicore machine and
    // GOMAXPROCS>1 and there is at least one other running P and local runq is empty.
    // As opposed to runtime mutex we don't do passive spinning here,
    // because there can be work on global runq or on other Ps.
    if i >= active_spin || ncpu <= 1 || gomaxprocs <= int32(sched.npidle+sched.nmspinning)+1 {
        return false
    }
    if p := getg().m.p.ptr(); !runqempty(p) {
        return false
    }
    return true
}

什么意思呢。大概就是如果单核心cpu不自旋,无论如何。多核心的话开始判断是否大于活跃自旋个数。
runq和P大概就是和调度器有关的东东了,暂时没去了解。

所以以下代码有两种情况。

第一,解锁了。

第二,进入了Starvation模式,开始抢锁竞争。

进入starvation或不需要自旋后进入以下代码。

            new := old
    if old&mutexStarving == 0 {
        new |= mutexLocked
    }
    if old&(mutexLocked|mutexStarving) != 0 {
        new += 1 << mutexWaiterShift
    }
    // The current goroutine switches mutex to starvation mode.
    // But if the mutex is currently unlocked, don't do the switch.
    // Unlock expects that starving mutex has waiters, which will not
    // be true in this case.
    if starving && old&mutexLocked != 0 {
        new |= mutexStarving
    }
    if atomic.CompareAndSwapInt32(&m.state, old, new) {
        if old&(mutexLocked|mutexStarving) == 0 {
            break // locked the mutex with CAS
        }
        // If we were already waiting before, queue at the front of the queue.
        queueLifo := waitStartTime != 0
        if waitStartTime == 0 {
            waitStartTime = runtime_nanotime()
        }
        runtime_SemacquireMutex(&m.sema, queueLifo, 1)
        starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
        old = m.state
        if old&mutexStarving != 0 {
            // If this goroutine was woken and mutex is in starvation mode,
            // ownership was handed off to us but mutex is in somewhat
            // inconsistent state: mutexLocked is not set and we are still
            // accounted as waiter. Fix that.
            if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
                throw("sync: inconsistent mutex state")
            }
            delta := int32(mutexLocked - 1<<mutexWaiterShift)
            if !starving || old>>mutexWaiterShift == 1 {
                // Exit starvation mode.
                // Critical to do it here and consider wait time.
                // Starvation mode is so inefficient, that two goroutines
                // can go lock-step infinitely once they switch mutex
                // to starvation mode.
                delta -= mutexStarving
            }
            atomic.AddInt32(&m.state, delta)
            break
        }
        awoke = true
        iter = 0
    } else {
        old = m.state
    }

starvation判断过程先不细嗦,先细嗦解锁了。它是如何判断解锁的呢

    if old&mutexStarving == 0 {
        new |= mutexLocked
    }

这一行代码,mutexStarving二进制为11,也就是能和它与操作为0的只能是解锁状态(00),那么new的值会尝试被修改成01(mutexLocked),也就是准备抢锁

    if old&(mutexLocked|mutexStarving) != 0 {
        new += 1 << mutexWaiterShift
    }

这一行代码说明了什么。与(mutexLocked|mutexStarving)=11不等于0,意思是锁目前状态不为00,意思就是没有解锁,或者那么new会加上 1 << mutexWaiterShift=3,意思就是新增等待的,告诉正在准备下一轮自旋的goroutine马上进入starvation模式。

    if starving && old&mutexLocked != 0 {
        new |= mutexStarving
    }

这一行就开始判断是否已经为starvation模式了。

代码开始执行到另一个CAS。确保目前状态能被new顺利替换。替换成功,如果成功抢锁就直接break,跳出循环。但如果替换失败呢,说明目前原子操作被别的goroutine抢了,那么就记录下新状态,下一轮循环继续。
抢锁失败,则开始记录waitTime,剩下的就是Starvation模式了。

Starvation模式

前面剖析代码途中,有如下参数我没有嗦。
awoke。

这个awoke是干啥用的捏。

其实是告诉自旋的goroutine,你锁要被抢了。

靠,有人抢我锁?我不干他丫的。

    if awoke {
        // The goroutine has been woken from sleep,
        // so we need to reset the flag in either case.
        if new&mutexWoken == 0 {
            throw("sync: inconsistent mutex state")
        }
        new &^= mutexWoken
    }

看到这行代码。它要开始狂暴了。&^是先与后异或操作。

什么意思呢。大概就是重置吧。

然后就是starvation饥饿竞争了。

首先记录下waitTime。第一个进入先-8。虽然goroutine是无需的,但是此时awoke,也就是刚睡醒的goroutine,发现有人要抢他锁后,他已经去CAS换掉了此时的状态值,如果没换到就继续尝试换,不行就继续自旋。

换掉状态值后,通过runtime_SemacquireMutex把该睡醒的goroutine优先级调到最高。
这样新来的goroutine便抢不过它了。它便有机会去把delta加上8。但万一有goroutine抢得过它呢。它就继续滚回去自旋了。再重设,再来抢。

留下一条评论

暂无评论