Golang Core Principles: When Goroutines Are Scheduled

The Go scheduling model

Model overview

[Figure: overview of the Go scheduling model]

Core entities

Goroutines (G)

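The unit of scheduling in Go. A program can start tens of thousands of g's, and each g can be thought of as a task waiting to be scheduled. A g stores the goroutine's stack, its status, the function it runs, and so on. A g is only aware of its P; the M described below is transparent to it.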

OSThread (M)

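An operating-system thread, the one that actually executes g's. The M does not track g state itself; what it runs is decided by the P it is attached to, which pulls the strings behind the scenes.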

Processor (P)

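Holds the context an M needs in order to execute Go code. The number of Ps is the GOMAXPROCS setting, which by default matches the number of CPU cores and can be changed; it determines how many goroutines can execute Go code in parallel. Each P maintains its own queue of runnable g's.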

Relationships between the entities

A picture is worth a thousand words; here is the classic diagram:

[Figure: relationships between G, M and P]

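The essence of scheduling

Scheduling means the schedule function: the currently running g is given up, and another g is picked to run. The selection algorithm is not the focus of this article and is not covered here.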

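When switches happen

  • system calls that may block, such as file I/O (network I/O also causes a switch, although Go parks those goroutines on its netpoller instead of blocking in the system call);
  • timer operations from the time package;
  • starting a goroutine with go func, and the goroutine's function returning;
  • blocking channel reads and writes;
  • after garbage collection;
  • explicit calls to runtime.Gosched().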
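Each of these switch points shows up as a change in the g's status. The toy table below summarizes the transitions discussed in the rest of this article. It is a simplification, not runtime code: the status strings mirror runtime constants such as _Grunning and _Gwaiting, the event keys are runtime function names used only as labels, and gStatus, transition and apply are hypothetical.

```go
package main

import "fmt"

// gStatus is a toy copy of the runtime's g status names.
type gStatus string

const (
	Grunnable gStatus = "_Grunnable"
	Grunning  gStatus = "_Grunning"
	Gsyscall  gStatus = "_Gsyscall"
	Gwaiting  gStatus = "_Gwaiting"
	Gdead     gStatus = "_Gdead"
)

// transition is one allowed status change.
type transition struct{ from, to gStatus }

// transitions maps an event (labelled with the runtime function that
// performs it) to the status change it causes.
var transitions = map[string]transition{
	"newproc":      {Gdead, Grunnable},    // go func(): a new or reused g becomes runnable
	"execute":      {Grunnable, Grunning}, // schedule() picked this g
	"entersyscall": {Grunning, Gsyscall},  // a blocking system call begins
	"exitsyscall":  {Gsyscall, Grunning},  // fast path: got a P back, keep running
	"exitsyscall0": {Gsyscall, Grunnable}, // slow path: no P, wait in the global run queue
	"gopark":       {Grunning, Gwaiting},  // time.Sleep, blocked channel operations
	"goready":      {Gwaiting, Grunnable}, // timer fired or channel peer arrived
	"gosched":      {Grunning, Grunnable}, // runtime.Gosched()
	"goexit":       {Grunning, Gdead},     // the goroutine's function returned
}

// apply returns the status after event, or an error if the event is not
// allowed from the current status.
func apply(s gStatus, event string) (gStatus, error) {
	t, ok := transitions[event]
	if !ok {
		return s, fmt.Errorf("unknown event %q", event)
	}
	if t.from != s {
		return s, fmt.Errorf("%s: g is %s, expected %s", event, s, t.from)
	}
	return t.to, nil
}

func main() {
	s := Gdead
	for _, ev := range []string{"newproc", "execute", "gopark", "goready",
		"execute", "entersyscall", "exitsyscall", "gosched", "execute", "goexit"} {
		next, err := apply(s, ev)
		if err != nil {
			fmt.Println(err)
			return
		}
		fmt.Printf("%-13s %s -> %s\n", ev, s, next)
		s = next
	}
}
```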

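Analysis of each scheduling point

Blocking system calls

Take read as an example. Go wraps system calls such as read in its own code, and those wrappers add scheduling logic around the call. The path starts at os.File.Read: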

/usr/local/go/src/os/file.go:97

// Read reads up to len(b) bytes from the File.
// It returns the number of bytes read and an error, if any.
// EOF is signaled by a zero count with err set to io.EOF.
func (f *File) Read(b []byte) (n int, err error) {
    if f == nil {
        return 0, ErrInvalid
    }
    n, e := f.read(b)
    if n == 0 && len(b) > 0 && e == nil {
        return 0, io.EOF
    }
    if e != nil {
        err = &PathError{"read", f.name, e}
    }
    return n, err
}

Several layers of calls follow, which are not all shown here. Following them to the bottom leads to this function:

func read(fd int, p []byte) (n int, err error) {
    var _p0 unsafe.Pointer
    if len(p) > 0 {
        _p0 = unsafe.Pointer(&p[0])
    } else {
        _p0 = unsafe.Pointer(&_zero)
    }
    r0, _, e1 := Syscall(SYS_READ, uintptr(fd), uintptr(_p0), uintptr(len(p)))
    n = int(r0)
    if e1 != 0 {
        err = errnoErr(e1)
    }
    return
}

func Syscall(trap, a1, a2, a3 uintptr) (r1, r2 uintptr, err Errno)

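Syscall is implemented in assembly. The listing below appears to be the linux/ppc64x version; other architectures use different registers but the same entersyscall/exitsyscall structure: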

TEXT    ·Syscall(SB),NOSPLIT,$0-56
    BL  runtime·entersyscall(SB)
    MOVD    a1+8(FP), R3
    MOVD    a2+16(FP), R4
    MOVD    a3+24(FP), R5
    MOVD    R0, R6
    MOVD    R0, R7
    MOVD    R0, R8
    MOVD    trap+0(FP), R9  // syscall entry
    SYSCALL R9
    BVC ok
    MOVD    $-1, R4
    MOVD    R4, r1+32(FP)   // r1
    MOVD    R0, r2+40(FP)   // r2
    MOVD    R3, err+48(FP)  // errno
    BL  runtime·exitsyscall(SB)
    RET
ok:
    MOVD    R3, r1+32(FP)   // r1
    MOVD    R4, r2+40(FP)   // r2
    MOVD    R0, err+48(FP)  // errno
    BL  runtime·exitsyscall(SB)
    RET

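So entersyscall runs when the system call is entered, and exitsyscall runs when it returns (on both the success and the error path). entersyscall is a thin wrapper around reentersyscall: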

// Standard syscall entry used by the go syscall library and normal cgo calls.
//go:nosplit
func entersyscall(dummy int32) {
    reentersyscall(getcallerpc(unsafe.Pointer(&dummy)), getcallersp(unsafe.Pointer(&dummy)))
}

func reentersyscall(pc, sp uintptr) {
    _g_ := getg()

    // Disable preemption because during this function g is in Gsyscall status,
    // but can have inconsistent g->sched, do not let GC observe it.
    _g_.m.locks++

    // Entersyscall must not call any function that might split/grow the stack.
    // (See details in comment above.)
    // Catch calls that might, by replacing the stack guard with something that
    // will trip any stack check and leaving a flag to tell newstack to die.
    _g_.stackguard0 = stackPreempt
    _g_.throwsplit = true

    // Leave SP around for GC and traceback.
    save(pc, sp)
    _g_.syscallsp = sp
    _g_.syscallpc = pc
    casgstatus(_g_, _Grunning, _Gsyscall)
    if _g_.syscallsp < _g_.stack.lo || _g_.stack.hi < _g_.syscallsp {
        systemstack(func() {
            print("entersyscall inconsistent ", hex(_g_.syscallsp), " [", hex(_g_.stack.lo), ",", hex(_g_.stack.hi), "]\n")
            throw("entersyscall")
        })
    }

    if trace.enabled {
        systemstack(traceGoSysCall)
        // systemstack itself clobbers g.sched.{pc,sp} and we might
        // need them later when the G is genuinely blocked in a
        // syscall
        save(pc, sp)
    }

    if atomic.Load(&sched.sysmonwait) != 0 { // TODO: fast atomic
        systemstack(entersyscall_sysmon)
        save(pc, sp)
    }

    if _g_.m.p.ptr().runSafePointFn != 0 {
        // runSafePointFn may stack split if run on this stack
        systemstack(runSafePointFn)
        save(pc, sp)
    }

    _g_.m.syscalltick = _g_.m.p.ptr().syscalltick
    _g_.sysblocktraced = true
    _g_.m.mcache = nil
    _g_.m.p.ptr().m = 0
    atomic.Store(&_g_.m.p.ptr().status, _Psyscall)
    if sched.gcwaiting != 0 {
        systemstack(entersyscall_gcwait)
        save(pc, sp)
    }

    // Goroutines must not split stacks in Gsyscall status (it would corrupt g->sched).
    // We set _StackGuard to StackPreempt so that first split stack check calls morestack.
    // Morestack detects this case and throws.
    _g_.stackguard0 = stackPreempt
    _g_.m.locks--
}

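On entering the system call, the P's m field is cleared and the P's status becomes _Psyscall, while the running g switches to _Gsyscall. The M still remembers its old P, so it can try to take it back when the call returns.

What happens next:

1. When the system call returns, the M calls exitsyscall and tries to reacquire its previous P (or any idle P). If it succeeds, the g goes back to _Grunning and simply continues. If no P is available, exitsyscall0 puts the g on the global run queue, the M stops (stopm), and scheduling continues through schedule once the M is woken up with a P.
2. If the call takes a long time, the sysmon thread notices that the P has been stuck in _Psyscall and retakes it, handing it off to another M (handoffp) so that the other g's on that P can be scheduled. The g inside the system call still resumes through exitsyscall as in point 1.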
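The effect of point 2 can be observed from user code. The sketch below limits the program to one P, parks a goroutine in a raw read(2) on a pipe, and lets the main goroutine write to the pipe afterwards. With a single P, the write can only run because the runtime detaches the P from the M that is stuck in the system call. Assumptions: a Unix-like OS such as Linux or macOS, where syscall.Pipe, syscall.Read and syscall.Write take plain int file descriptors; raw syscalls are used because the os package would typically hand a pipe to the netpoller instead of blocking in read; readDuringBlockingSyscall is a made-up name.

```go
package main

import (
	"fmt"
	"runtime"
	"syscall"
	"time"
)

// readDuringBlockingSyscall runs with a single P. One goroutine blocks in a
// raw read(2) on a pipe; the main goroutine must still get a P to write the
// data, which relies on the runtime handing the P off during the syscall.
func readDuringBlockingSyscall(msg string) (string, error) {
	prev := runtime.GOMAXPROCS(1)
	defer runtime.GOMAXPROCS(prev)

	fds := make([]int, 2)
	if err := syscall.Pipe(fds); err != nil { // blocking pipe fds
		return "", err
	}
	defer syscall.Close(fds[0])
	defer syscall.Close(fds[1])

	type result struct {
		s   string
		err error
	}
	done := make(chan result, 1)
	go func() {
		buf := make([]byte, 64)
		// syscall.Read goes through Syscall -> entersyscall ... exitsyscall.
		n, err := syscall.Read(fds[0], buf)
		if err != nil {
			done <- result{"", err}
			return
		}
		done <- result{string(buf[:n]), nil}
	}()

	// Park main briefly so the reader can enter read(2) and block there.
	time.Sleep(20 * time.Millisecond)

	// If the only P stayed tied to the blocked M, this would never run.
	if _, err := syscall.Write(fds[1], []byte(msg)); err != nil {
		return "", err
	}
	r := <-done
	return r.s, r.err
}

func main() {
	s, err := readDuringBlockingSyscall("hello from the main goroutine")
	fmt.Println(s, err)
}
```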

Timer operations (time package)

Take time.Sleep as an example.

// Sleep pauses the current goroutine for at least the duration d.
// A negative or zero duration causes Sleep to return immediately.
func Sleep(d Duration)

The function is actually implemented in the runtime and bound to time.Sleep with go:linkname:

// timeSleep puts the current goroutine to sleep for at least ns nanoseconds.
//go:linkname timeSleep time.Sleep
func timeSleep(ns int64) {
    if ns <= 0 {
        return
    }

    t := getg().timer
    if t == nil {
        t = new(timer)
        getg().timer = t
    }
    *t = timer{}
    t.when = nanotime() + ns
    t.f = goroutineReady
    t.arg = getg()
    lock(&timers.lock)
    addtimerLocked(t)
    goparkunlock(&timers.lock, "sleep", traceEvGoSleep, 2)
}

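goparkunlock ends up calling gopark, passing an unlock function that releases timers.lock once the g has been switched out: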

func gopark(unlockf func(*g, unsafe.Pointer) bool, lock unsafe.Pointer, reason string, traceEv byte, traceskip int) {
    mp := acquirem()
    gp := mp.curg
    status := readgstatus(gp)
    if status != _Grunning && status != _Gscanrunning {
        throw("gopark: bad g status")
    }
    mp.waitlock = lock
    mp.waitunlockf = *(*unsafe.Pointer)(unsafe.Pointer(&unlockf))
    gp.waitreason = reason
    mp.waittraceev = traceEv
    mp.waittraceskip = traceskip
    releasem(mp)
    // can't do anything that might move the G between Ms here.
    mcall(park_m)
}

mcall(fn) switches to the M's g0 stack and has g0 call fn with the current g as its argument. Here is the definition of park_m:

func park_m(gp *g) {
    _g_ := getg()

    if trace.enabled {
        traceGoPark(_g_.m.waittraceev, _g_.m.waittraceskip)
    }

    casgstatus(gp, _Grunning, _Gwaiting)
    dropg()

    if _g_.m.waitunlockf != nil {
        fn := *(*func(*g, unsafe.Pointer) bool)(unsafe.Pointer(&_g_.m.waitunlockf))
        ok := fn(gp, _g_.m.waitlock)
        _g_.m.waitunlockf = nil
        _g_.m.waitlock = nil
        if !ok {
            if trace.enabled {
                traceGoUnpark(gp, 2)
            }
            casgstatus(gp, _Gwaiting, _Grunnable)
            execute(gp, true) // Schedule it back, never returns.
        }
    }
    schedule()
}

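park_m first changes the g's status to _Gwaiting, detaches it from the M with dropg, and runs the unlock function (if that returns false, the g is made runnable again and resumes immediately); then it calls schedule. A g in _Gwaiting only becomes runnable again when goready is called on it. For time.Sleep this happens when the timer fires and its callback, goroutineReady, calls goready.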
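The Sleep path boils down to: register a timer with when = now + ns and f = goroutineReady, park the g, and let the timer callback ready it later. Below is a toy, single-threaded simulation of that idea. Every type and function in it (toyG, toyTimer, toyRuntime, sleep, ready, advance) is hypothetical, the clock is fake, and container/heap stands in for the runtime's own timer heap.

```go
package main

import (
	"container/heap"
	"fmt"
)

// toyG is a hypothetical stand-in for a runtime g: just a name and a status.
type toyG struct {
	name   string
	status string
}

// toyTimer mirrors the three fields timeSleep fills in: when, f and arg.
type toyTimer struct {
	when int64
	f    func(*toyG)
	arg  *toyG
}

// timerHeap is a min-heap of timers ordered by when.
type timerHeap []*toyTimer

func (h timerHeap) Len() int           { return len(h) }
func (h timerHeap) Less(i, j int) bool { return h[i].when < h[j].when }
func (h timerHeap) Swap(i, j int)      { h[i], h[j] = h[j], h[i] }

func (h *timerHeap) Push(x interface{}) {
	*h = append(*h, x.(*toyTimer))
}

func (h *timerHeap) Pop() interface{} {
	old := *h
	t := old[len(old)-1]
	*h = old[:len(old)-1]
	return t
}

// toyRuntime has a fake clock, a timer heap and a FIFO run queue.
type toyRuntime struct {
	now    int64
	timers timerHeap
	runq   []*toyG
}

// sleep plays the role of timeSleep + gopark: register a timer whose
// callback will ready g, then mark g as waiting.
func (rt *toyRuntime) sleep(g *toyG, ns int64) {
	if ns <= 0 {
		return // like timeSleep, non-positive durations return immediately
	}
	heap.Push(&rt.timers, &toyTimer{when: rt.now + ns, f: rt.ready, arg: g})
	g.status = "waiting" // park_m: _Grunning -> _Gwaiting
}

// ready plays the role of goroutineReady/goready: _Gwaiting -> _Grunnable.
func (rt *toyRuntime) ready(g *toyG) {
	g.status = "runnable"
	rt.runq = append(rt.runq, g)
}

// advance moves the clock forward and fires every expired timer in order.
func (rt *toyRuntime) advance(to int64) {
	rt.now = to
	for rt.timers.Len() > 0 && rt.timers[0].when <= rt.now {
		t := heap.Pop(&rt.timers).(*toyTimer)
		t.f(t.arg)
	}
}

func main() {
	rt := &toyRuntime{}
	a := &toyG{name: "a", status: "running"}
	b := &toyG{name: "b", status: "running"}
	rt.sleep(a, 30)
	rt.sleep(b, 10)
	rt.advance(5)
	fmt.Println(len(rt.runq)) // 0: no timer has expired yet
	rt.advance(40)
	for _, g := range rt.runq {
		fmt.Println(g.name, g.status) // "b runnable", then "a runnable"
	}
}
```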

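Starting and exiting a goroutine

When a goroutine is started with go func, newproc takes a g (reused from the free list, or freshly allocated), sets its status to _Grunnable, and puts it on the current P's run queue. The calling goroutine is not switched out at this point; instead, an idle P may be woken (wakep) to run the new g. When the goroutine's function returns, goexit1 is called and switches to g0: the status becomes _Gdead, the g is put on a free list for reuse (the g structs themselves are never freed, although GC may release the stacks of dead g's), and schedule() picks the next g to run.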
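The lifecycle can be watched with runtime.NumGoroutine, which counts goroutines that have not exited yet. In this sketch (spawnAndWait is a made-up helper), the count rises while the new goroutines are parked and falls back once their functions return and goexit retires them. The polling loop is there because nothing signals the exact moment a goroutine has fully exited.

```go
package main

import (
	"fmt"
	"runtime"
	"sync"
	"time"
)

// spawnAndWait starts n goroutines that park on a channel, counts the live
// goroutines while they are parked, then releases them and polls until the
// count drops back to where it started (or a timeout expires).
func spawnAndWait(n int) (base, during, after int) {
	base = runtime.NumGoroutine()
	release := make(chan struct{})
	var started sync.WaitGroup
	started.Add(n)
	for i := 0; i < n; i++ {
		go func() { // newproc: the new g becomes _Grunnable
			started.Done()
			<-release // gopark: _Grunning -> _Gwaiting until release is closed
		}() // returning from the func runs goexit: the g becomes _Gdead
	}
	started.Wait()
	during = runtime.NumGoroutine()

	close(release) // readies every parked receiver
	deadline := time.Now().Add(5 * time.Second)
	for runtime.NumGoroutine() > base && time.Now().Before(deadline) {
		time.Sleep(time.Millisecond)
	}
	after = runtime.NumGoroutine()
	return base, during, after
}

func main() {
	base, during, after := spawnAndWait(100)
	// during should be base+100, and after should be back at base.
	fmt.Println(base, during, after)
}
```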

Channel blocking

chansend implements the channel send statement c <- v:

func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
    if c == nil {
        if !block {
            return false
        }
        gopark(nil, nil, "chan send (nil chan)", traceEvGoStop, 2)
        throw("unreachable")
    }

    if debugChan {
        print("chansend: chan=", c, "\n")
    }

    if raceenabled {
        racereadpc(unsafe.Pointer(c), callerpc, funcPC(chansend))
    }

    // ... unrelated code omitted ...

    // Block on the channel. Some receiver will complete our operation for us.
    gp := getg()
    mysg := acquireSudog()
    mysg.releasetime = 0
    if t0 != 0 {
        mysg.releasetime = -1
    }
    // No stack splits between assigning elem and enqueuing mysg
    // on gp.waiting where copystack can find it.
    mysg.elem = ep
    mysg.waitlink = nil
    mysg.g = gp
    mysg.selectdone = nil
    mysg.c = c
    gp.waiting = mysg
    gp.param = nil
    c.sendq.enqueue(mysg)
    goparkunlock(&c.lock, "chan send", traceEvGoBlockSend, 3)

    // someone woke us up.
    if mysg != gp.waiting {
        throw("G waiting list is corrupted")
    }
    gp.waiting = nil
    if gp.param == nil {
        if c.closed == 0 {
            throw("chansend: spurious wakeup")
        }
        panic(plainError("send on closed channel"))
    }
    gp.param = nil
    if mysg.releasetime > 0 {
        blockevent(mysg.releasetime-t0, 2)
    }
    mysg.c = nil
    releaseSudog(mysg)
    return true
}

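As you can see, a blocked send also ends up in goparkunlock -> gopark to hand control to the scheduler. The sender stays in _Gwaiting until a receiver takes its value (or the channel is closed) and calls goready on it.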

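After garbage collection

When the stop-the-world (STW) phase ends, the Ps resume and the scheduler again picks g's to run. Garbage collection itself is not covered in depth here.

Calling runtime.Gosched() explicitly

I have not found a scenario that truly requires runtime.Gosched; it seems mostly useful for debugging and for learning how the runtime works.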

// Gosched yields the processor, allowing other goroutines to run. It does not
// suspend the current goroutine, so execution resumes automatically.
//go:nosplit
func Gosched() {
    mcall(gosched_m)
}

The first step switches to g0 and runs a function called gosched_m:

// Gosched continuation on g0.
func gosched_m(gp *g) {
    if trace.enabled {
        traceGoSched()
    }
    goschedImpl(gp)
}

func goschedImpl(gp *g) {
    status := readgstatus(gp)
    if status&^_Gscan != _Grunning {
        dumpgstatus(gp)
        throw("bad g status")
    }
    casgstatus(gp, _Grunning, _Grunnable)
    dropg()
    lock(&sched.lock)
    globrunqput(gp)
    unlock(&sched.lock)

    schedule()
}

The current g is set to _Grunnable and put on the global run queue (globrunqput); then schedule is called to pick a suitable g to run.
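The effect of goschedImpl on execution order can be modelled with a single FIFO queue. This is a toy simulation, not runtime code: toyTask and runWithGosched are hypothetical, there is only one P, and local run queues and work stealing are ignored, so every yielding task goes to the tail of the global queue, as globrunqput does.

```go
package main

import "fmt"

// toyTask is a hypothetical goroutine that needs `steps` slices of work
// and calls Gosched after each one.
type toyTask struct {
	name  string
	steps int
	done  int
}

// runWithGosched simulates one P with only a global FIFO run queue: after
// each step the running task is marked runnable and appended to the tail
// (like globrunqput), then schedule() takes the task at the head.
func runWithGosched(tasks []*toyTask) []string {
	runq := append([]*toyTask(nil), tasks...)
	var trace []string
	for len(runq) > 0 {
		g := runq[0] // schedule(): pick the next runnable task
		runq = runq[1:]
		g.done++ // run one slice of work
		trace = append(trace, fmt.Sprintf("%s%d", g.name, g.done))
		if g.done < g.steps {
			// Gosched: _Grunning -> _Grunnable, back to the tail of the queue.
			runq = append(runq, g)
		}
		// Otherwise the task has finished (goexit) and is not re-queued.
	}
	return trace
}

func main() {
	trace := runWithGosched([]*toyTask{{name: "A", steps: 3}, {name: "B", steps: 2}})
	fmt.Println(trace) // [A1 B1 A2 B2 A3]
}
```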

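Summary

Goroutine scheduling points are mainly where blocking operations begin and end; studying the code for each scenario leads to a deeper understanding of Go. A tip for reading source code: go in with one specific question at a time, such as the scheduling points covered here, and later the scheduling algorithm, garbage collection, and so on. Each pass lets you ignore unrelated details, and over several topics the overall picture becomes more and more complete.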


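Permanent link: https://tech.souyunku.com/38289

Reproduction without permission is prohibited: Souyunku Tech Team (搜云库技术团队) » Golang Core Principles: When Goroutines Are Scheduled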
