Go 運(yùn)行起來是怎樣的,你知道嗎?
當(dāng)我們使用一門語言或一個(gè)軟件時(shí),我們都是面向 API 或文檔來使用它們的,很多時(shí)候我們更關(guān)注的是如何使用它們來解決業(yè)務(wù)的問題,往往不需要了解它具體是如何運(yùn)行的,比如它說可以通過 read 來讀取一個(gè)文件,通過 accept 來獲取一個(gè) TCP 連接,當(dāng)我們需要時(shí)按需調(diào)用就行。但是了解運(yùn)行時(shí)的細(xì)節(jié)不僅有助于我們更了解相關(guān)的技術(shù),而且有助于我們解決碰到的問題,比如之前在 Libuv 中存在慢 IO(DNS 解析)太多導(dǎo)致快 IO(文件 IO)無法執(zhí)行的問題,從而影響了軟件的運(yùn)行性能。本文主要介紹 Go 運(yùn)行時(shí)的一些細(xì)節(jié),但是細(xì)節(jié)太多太復(fù)雜,無法一一描述。
了解 Go 的運(yùn)行時(shí)細(xì)節(jié)前先看一下一些著名軟件的情況。
Redis
Redis 是一個(gè)基于事件驅(qū)動(dòng)+非阻塞 IO 的單線程應(yīng)用。
- 在啟動(dòng)后會(huì)啟動(dòng)一個(gè)服務(wù)器并把服務(wù)器對應(yīng)的 fd 注冊到事件驅(qū)動(dòng)模塊中,開始事件循環(huán)。
- 當(dāng)連接到來時(shí)就會(huì)收到讀事件,然后通過 accept 獲取一個(gè)新的 socket 并把該 socket 和讀寫事件注冊到事件驅(qū)動(dòng)模塊中。
- 當(dāng)數(shù)據(jù)到來時(shí)調(diào) read 讀取。
- 解析并處理請求。
- 調(diào)用 write 返回?cái)?shù)據(jù)。 這是 Redis 的常見的執(zhí)行流程。但是除此之外,還有一些額外的邏輯。
- 通過子線程處理數(shù)據(jù)在內(nèi)存和硬盤間的交換。
- 通過子進(jìn)程進(jìn)行 AOF 重寫和 RDB。
- 通過子線程刷 AOF 數(shù)據(jù)到硬盤。
- 維護(hù)一個(gè)定時(shí)器數(shù)據(jù)結(jié)構(gòu),在每輪中判斷過期的定時(shí)器,通過事件驅(qū)動(dòng)模塊的阻塞時(shí)間保證定時(shí)器的按時(shí)執(zhí)行。 6.0 后 Redis 甚至把網(wǎng)絡(luò) IO 的讀寫也放到了子線程,但是整體來看執(zhí)行的流程還是比較好理解的。
Nginx
Nginx 是一個(gè)基于事件驅(qū)動(dòng)+非阻塞 IO 的單線程應(yīng)用。但是 Nginx 可以啟動(dòng)多個(gè)子進(jìn)程,因?yàn)?Ngnix 和 Redis 的場景不一樣,Redis 是在進(jìn)程的內(nèi)存中維護(hù)數(shù)據(jù)的,多進(jìn)程很難維護(hù)進(jìn)程間的數(shù)據(jù)同步和一致性,除非是每個(gè)進(jìn)程維護(hù)不同的數(shù)據(jù)集,按 key 進(jìn)行哈希讀寫,類似集群模式。而 Nginx 是無狀態(tài)的,可以橫行擴(kuò)容最大化利用資源,在每個(gè)子進(jìn)程內(nèi),Nginx 和 Redis 的架構(gòu)差不多,主體流程也是啟動(dòng)一個(gè)服務(wù)器,然后啟動(dòng)事件循環(huán),處理網(wǎng)絡(luò) IO 事件和定時(shí)器,再通過線程池處理一些耗時(shí)和阻塞式的操作,如文件 IO 的。多進(jìn)程帶來的一個(gè)問題是多個(gè)進(jìn)程需要監(jiān)聽一個(gè)端口,所以需要解決多進(jìn)程監(jiān)聽同一個(gè)端口和處理驚群問題,早期的 Nginx 是通過共享一個(gè) socket + 自己解決驚群問題,現(xiàn)在已經(jīng)支持通過操作系統(tǒng)的 REUSEPORT 特性。
Node.js
Node.js 是一個(gè)基于事件驅(qū)動(dòng)+非阻塞 IO 的單線程應(yīng)用,架構(gòu)上是由單線程執(zhí)行事件循環(huán)+線程池組成。Node.js 支持創(chuàng)建多進(jìn)程,每個(gè)進(jìn)程內(nèi)支持創(chuàng)建多個(gè)子線程,每個(gè)子線程都是一個(gè)獨(dú)立的事件循環(huán)并共享線程池。進(jìn)程間監(jiān)聽端口支持共享 socket、文件描述符傳遞和 REUSEPORT 三種模式。另外 Node.js 已經(jīng)支持異步 IO io_uring。
Go
Go 是一個(gè)基于事件驅(qū)動(dòng)+非阻塞 IO 的多線程應(yīng)用。相對前面幾個(gè)軟件來說,Go 的底層并不是簡單的注冊事件,執(zhí)行回調(diào)那么簡單,Go 運(yùn)行時(shí)的流程和前面介紹的幾個(gè)軟件有很大的不同。
- 實(shí)現(xiàn)了協(xié)程,并通過 n:m 模式原生利用了多核能力。
- 通過 hadnoff 機(jī)制實(shí)現(xiàn)系統(tǒng)調(diào)用等阻塞線程的操作,而不是通過線程池。
- 支持協(xié)作式和搶占式調(diào)度,避免單個(gè)協(xié)程影響整體系統(tǒng)的性能。
- 支持棧自動(dòng)擴(kuò)所容。
- 支持以同步的方式寫異步代碼,而不是回調(diào)(Node.js 也支持,但是不徹底)。
下面看一下 Go 是如何實(shí)現(xiàn)這些能力的。
啟動(dòng)過程
TEXT _rt0_386(SB),NOSPLIT,$8
JMP runtime·rt0_go(SB)
TEXT runtime·rt0_go(SB),NOSPLIT|NOFRAME|TOPFRAME,$0
CALL runtime·args(SB)
CALL runtime·osinit(SB)
CALL runtime·schedinit(SB)
// 創(chuàng)建主協(xié)程
PUSHL $runtime·mainPC(SB) // entry
CALL runtime·newproc(SB)
POPL AX
// 開始調(diào)度
CALL runtime·mstart(SB)
CALL runtime·abort(SB)
RETGo 啟動(dòng)時(shí),初始化完數(shù)據(jù)結(jié)構(gòu)后,就以 runtime·mainPC(runtime·main)為參數(shù),調(diào)用 runtime·newproc 創(chuàng)建了第一個(gè)協(xié)程,可以簡單理解為 Go 內(nèi)部維護(hù)了一個(gè)協(xié)程隊(duì)列,接著調(diào) runtime·mstart 開始調(diào)度協(xié)程的執(zhí)行,可以簡單理解為從協(xié)程隊(duì)列中選擇一個(gè)就緒的協(xié)程執(zhí)行。
TEXT runtime·mstart(SB),NOSPLIT|TOPFRAME,$0
CALL runtime·mstart0(SB)
RET // not reachedruntime·mstart 繼續(xù)調(diào) runtime·mstart0。
func mstart0() {
mstart1()
}
func mstart1() {
// 注冊信號處理函數(shù),實(shí)現(xiàn)搶占式調(diào)度
if gp.m == &m0 {
mstartm0()
}
// 開始調(diào)度
schedule()
}因?yàn)楝F(xiàn)在只有剛才創(chuàng)建的主協(xié)程,所以自然就會(huì)調(diào)度主協(xié)程執(zhí)行,主協(xié)程代碼如下。
func main() {
mp := getg().m
// 啟動(dòng) sysmon 線程
systemstack(func() {
newm(sysmon, nil, -1)
})
// 開始 gc 協(xié)程
gcenable()
// 執(zhí)行用戶的 main 函數(shù)
fn := main_main
fn()
}主協(xié)程啟動(dòng)了一個(gè) sysmon 線程(后面介紹)和一個(gè) gc 相關(guān)的協(xié)程,最后執(zhí)行用戶的 main 函數(shù),這樣 Go 程序就執(zhí)行起來了,比如下面的例子。
package main
import "net"
func main() {
listener, _ := net.Listen("tcp", ":8080")
for {
conn, _ := listener.Accept()
go func() {
conn.Read(...)
conn.Write(...)
conn.Close()
}()
}
}當(dāng)調(diào)用 Accept 時(shí),主協(xié)程就阻塞了,但是主線程并沒有阻塞,這時(shí)候主線程會(huì)執(zhí)行其他任務(wù),因?yàn)檫@時(shí)候沒有其他任務(wù)需要執(zhí)行,所以主線程會(huì)阻塞在事件驅(qū)動(dòng)模塊等待連接的到來,我們?nèi)绻陂_頭加上以下代碼,可以看到輸出,說明主線程沒有阻塞。
time.AfterFunc(1*time.Second, func() {
println("1 seconds later")
})以同步方式寫異步代碼
我們知道操作系統(tǒng)的 accept/read/write 等系統(tǒng)調(diào)用在不滿足條件的情況默認(rèn)是會(huì)引起線程阻塞的,那么為什么 Go 里并不會(huì)引起線程阻塞,而僅僅是引起協(xié)程阻塞呢?這就是 Go 的一個(gè)特點(diǎn):以同步方式寫異步代碼。這種方式利于編寫代碼和理解代碼,比如在 Node.js 中,我們需要接收一個(gè) TCP 連接上的數(shù)據(jù)需要通過事件回調(diào)的方式來寫。
const socket = net.connect(...);
socket.on('data', function(data) {});這種方式讓我們很難理解代碼的執(zhí)行路徑,尤其是回調(diào)里又嵌套回調(diào)時(shí)就更復(fù)雜了,雖然 Promise 可以一定程度上緩解這個(gè)問題,但是 Node.js 從架構(gòu)上就是基于事件回調(diào)的,很多地方還是避免不了異步回調(diào)的寫法。在 Go 中,寫法就非常簡單,其底層使用的是非阻塞 IO,再結(jié)合協(xié)程切換機(jī)制實(shí)現(xiàn)的。接下來以 Read 為例,看看具體的實(shí)現(xiàn)。
func (c *conn) Read(b []byte) (int, error) {
n, err := c.fd.Read(b)
return n, err
}
func (fd *netFD) Read(p []byte) (n int, err error) {
n, err = fd.pfd.Read(p)
return n, wrapSyscallError(readSyscallName, err)
}
func (fd *FD) Read(p []byte) (int, error) {
// 獲取鎖
if err := fd.readLock(); err != nil {
return 0, err
}
defer fd.readUnlock()
// 判斷是否超時(shí)或錯(cuò)誤
if err := fd.pd.prepareRead(fd.isFile); err != nil {
return 0, err
}
for {
// 以非阻塞方式執(zhí)行系統(tǒng)調(diào)用 read
n, err := ignoringEINTRIO(syscall.Read, fd.Sysfd, p)
if err != nil {
n = 0
// 沒有數(shù)據(jù)并且是 IO 多路復(fù)用模塊支持監(jiān)聽的 fd 類型
if err == syscall.EAGAIN && fd.pd.pollable() {
// 阻塞協(xié)程
if err = fd.pd.waitRead(fd.isFile); err == nil {
continue
}
}
}
err = fd.eofError(n, err)
return n, err
}
}
func (pd *pollDesc) waitRead(isFile bool) error {
return pd.wait('r', isFile)
}
func (pd *pollDesc) wait(mode int, isFile bool) error {
res := runtime_pollWait(pd.runtimeCtx, mode)
return convertErr(res, isFile)
}
func poll_runtime_pollWait(pd *pollDesc, mode int) int {
for !netpollblock(pd, int32(mode), false) {
errcode = netpollcheckerr(pd, int32(mode))
if errcode != pollNoError {
return errcode
}
}
return pollNoError
}
// pollDesc 是對一個(gè) fd、事件和關(guān)聯(lián)的協(xié)程的封裝
func netpollblock(pd *pollDesc, mode int32, waitio bool) bool {
gpp := &pd.rg
if mode == 'w' {
gpp = &pd.wg
}
// set the gpp semaphore to pdWait
for {
// 把 pollDesc 切換成 pdWait 狀態(tài)
if gpp.CompareAndSwap(pdNil, pdWait) {
break
}
}
gopark(netpollblockcommit, unsafe.Pointer(gpp), ...)
// 事件就緒后改成 pdNil 狀態(tài)
old := gpp.Swap(pdNil)
return old == pdReady
}
func gopark(unlockf func(*g, unsafe.Pointer) bool, lock unsafe.Pointer, ...) {
mp.waitlock = lock
mp.waitunlockf = unlockf
releasem(mp)
mcall(park_m)
}
func park_m(gp *g) {
mp := getg().m
// 把當(dāng)前協(xié)程改成 _Gwaiting 狀態(tài)
casgstatus(gp, _Grunning, _Gwaiting)
if fn := mp.waitunlockf; fn != nil {
// 把 pollDesc 的 rg 字段改成協(xié)程結(jié)構(gòu)體的地址
// atomic.Casuintptr((*uintptr)(gpp), pdWait, uintptr(unsafe.Pointer(gp)))
ok := fn(gp, mp.waitlock)
}
// 重新調(diào)度其他協(xié)程執(zhí)行
schedule()
}可以看到 Read 在沒有數(shù)據(jù)可讀時(shí),調(diào)用協(xié)程會(huì)被修改成等待狀態(tài),等待事件的發(fā)生,同時(shí)發(fā)生調(diào)度選擇其他協(xié)程繼續(xù)運(yùn)行,所以一個(gè)協(xié)程的阻塞影響的只是自己,而不是影響到整個(gè)線程,這大大地提供了資源的利用率和執(zhí)行效率。
那么阻塞的協(xié)程什么時(shí)候又是怎么被喚醒的呢?Go 會(huì)在 sysmon 線程、調(diào)度等時(shí)機(jī)執(zhí)行 netpool 獲取就緒的事件,從而處理相關(guān)的協(xié)程。
func sysmon() {
if netpollinited() && lastpoll != 0 && lastpoll+10*1000*1000 < now {
// 更新上次 poll 的時(shí)間
sched.lastpoll.CompareAndSwap(lastpoll, now)
// 通過 IO 多路復(fù)用模塊獲取就緒的事件(所以關(guān)聯(lián)的 g)列表
list, delta := netpoll(0) // non-blocking - returns list of goroutines
if !list.empty() {
incidlelocked(-1)
// 把就緒的 g 放入隊(duì)列等待調(diào)度
injectglist(&list)
incidlelocked(1)
netpollAdjustWaiters(delta)
}
}
}
func netpoll(delay int64) (gList, int32) {
var tp *timespec
var ts timespec
var events [64]keventt
retry:
// 獲取就緒事件
n := kevent(kq, nil, 0, &events[0], int32(len(events)), tp)
var toRun gList
delta := int32(0)
// 逐個(gè)處理
for i := 0; i < int(n); i++ {
ev := &events[i]
var mode int32
switch ev.filter {
case _EVFILT_READ:
mode += 'r'
case _EVFILT_WRITE:
mode += 'w'
}
if mode != 0 {
var pd *pollDesc
// 找到 pollDesc 中記錄的等待協(xié)程
pd = (*pollDesc)(unsafe.Pointer(ev.udata))
pd.setEventErr(ev.flags == _EV_ERROR, tag)
// 修改狀態(tài)
delta += netpollready(&toRun, pd, mode)
}
}
return toRun, delta
}
func netpollready(toRun *gList, pd *pollDesc, mode int32) int32 {
delta := int32(0)
var rg, wg *g
// 修改狀態(tài)
if mode == 'r' || mode == 'r'+'w' {
rg = netpollunblock(pd, 'r', true, &delta)
}
if mode == 'w' || mode == 'r'+'w' {
wg = netpollunblock(pd, 'w', true, &delta)
}
if rg != nil {
toRun.push(rg)
}
if wg != nil {
toRun.push(wg)
}
return delta
}
func netpollunblock(pd *pollDesc, mode int32, ioready bool, delta *int32) *g {
gpp := &pd.rg
if mode == 'w' {
gpp = &pd.wg
}
for {
old := gpp.Load()
new := pdReady
// 設(shè)置 pollDesc 的 rg 或 wg 為 pdReady,返回等待的協(xié)程
if gpp.CompareAndSwap(old, new) {
*delta -= 1
return (*g)(unsafe.Pointer(old))
}
}
}Go 最終把就緒的協(xié)程放入就緒隊(duì)列等待調(diào)度執(zhí)行。
系統(tǒng)調(diào)用
有了 IO 多路復(fù)用模塊,IO 操作只注冊事件,阻塞協(xié)程,然后數(shù)據(jù)就緒時(shí)喚醒協(xié)程,并以非阻塞的方式調(diào)用 read 讀取數(shù)據(jù)就行。但是很可惜,IO 多路復(fù)用模塊并不支持所有類型的 IO,比如 epoll 就不支持普通文件的 IO,所以文件 IO 就只能直接以阻塞的方式調(diào)系統(tǒng)調(diào)用來實(shí)現(xiàn)了,但是調(diào)系統(tǒng)調(diào)用不僅耗時(shí)而且可能會(huì)引起線程阻塞,又因?yàn)?Go gmp 機(jī)制中,m 需要獲取 p 才能執(zhí)行 g,一旦線程阻塞就會(huì)凍結(jié)一個(gè) m、g、p,而 p 被凍結(jié)后,p 里面的協(xié)程就沒法執(zhí)行了,所以這時(shí)候需要一種方式讓 p 能脫離出來被其他線程處理,這就是 Go 的 handoff 機(jī)制。handoff 機(jī)制不僅在文件 IO 中使用,在調(diào)用其他系統(tǒng)調(diào)用時(shí)也會(huì)使用。接著看一下打開一個(gè)文件的過程。
func Open(name string) (*File, error) {
return OpenFile(name, O_RDONLY, 0)
}
func OpenFile(name string, flag int, perm FileMode) (*File, error) {
f, err := openFileNolog(name, flag, perm)
if err != nil {
return nil, err
}
f.appendMode = flag&O_APPEND != 0
return f, nil
}
func openFileNolog(name string, flag int, perm FileMode) (*File, error) {
ignoringEINTR(func() error {
r, s, e = open(name, flag|syscall.O_CLOEXEC, syscallMode(perm))
return e
})
// ...
return f, nil
}
func open(path string, flag int, perm uint32) (int, poll.SysFile, error) {
fd, err := syscall.Open(path, flag, perm)
return fd, poll.SysFile{}, err
}
func Open(path string, mode int, perm uint32) (fd int, err error) {
r0, _, e1 := syscall(abi.FuncPCABI0(libc_open_trampoline), uintptr(unsafe.Pointer(_p0)), uintptr(mode), uintptr(perm))
return
}
func syscall_syscall(fn, a1, a2, a3 uintptr) (r1, r2, err uintptr) {
args := struct{ fn, a1, a2, a3, r1, r2, err uintptr }{fn, a1, a2, a3, r1, r2, err}
// 執(zhí)行系統(tǒng)調(diào)用前的處理
entersyscall()
libcCall(unsafe.Pointer(abi.FuncPCABI0(syscall)), unsafe.Pointer(&args))
// 執(zhí)行完系統(tǒng)調(diào)用前的處理
exitsyscall()
return args.r1, args.r2, args.err
}可以看到最終在執(zhí)行系統(tǒng)調(diào)用時(shí)會(huì)先進(jìn)行一些特殊的處理,看一下 entersyscall。
func entersyscall() {
fp := getcallerfp()
reentersyscall(getcallerpc(), getcallersp(), fp)
}
func reentersyscall(pc, sp, bp uintptr) {
trace := traceAcquire()
gp := getg()
// 把當(dāng)前協(xié)程改成 _Gsyscall 狀態(tài)
casgstatus(gp, _Grunning, _Gsyscall)
gp.m.syscalltick = gp.m.p.ptr().syscalltick
// 接觸 m 和 p 的關(guān)系
pp := gp.m.p.ptr()
pp.m = 0
// m 中保存當(dāng)前的 p,執(zhí)行完系統(tǒng)調(diào)用后優(yōu)先獲取該 p
gp.m.oldp.set(pp)
gp.m.p = 0
// 把 p 的狀態(tài)改成 _Psyscall
atomic.Store(&pp.status, _Psyscall)
}這里只是需改了下數(shù)據(jù),并不會(huì)直接執(zhí)行 handoff 機(jī)制,執(zhí)行完 reentersyscall 后,協(xié)程和所在的線程就陷入系統(tǒng)調(diào)用了,然后 sysmon 線程會(huì)定時(shí)處理相關(guān)的邏輯,sysmon 中有一段搶占的邏輯。
func retake(now int64) uint32 {
n := 0
// 遍歷所有 p
for i := 0; i < len(allp); i++ {
pp := allp[i]
pd := &pp.sysmontick
s := pp.status
// 處理處于系統(tǒng)調(diào)用的 p
if s == _Psyscall {
// 把 p 改成空閑狀態(tài)
if atomic.Cas(&pp.status, s, _Pidle) {
// 處理 p 上的協(xié)程
handoffp(pp)
}
}
}
}sysmon 把處于系統(tǒng)調(diào)度的 p 交給其他空閑線程或新建線程進(jìn)行處理。
func handoffp(pp *p) {
// 還有 g 需要處理,創(chuàng)建新的線程(m)
if !runqempty(pp) || sched.runqsize != 0 {
startm(pp, false, false)
return
}
}這樣就保證了 p 上的協(xié)程可以被及時(shí)處理。
睡眠
Go 中可以通過 time.Sleep 讓協(xié)程定時(shí)睡眠一段時(shí)間,time.Sleep 實(shí)現(xiàn)如下。
func timeSleep(ns int64) {
gp := getg()
t := gp.timer
if t == nil {
t = new(timer)
// 設(shè)置超時(shí)時(shí)間函數(shù)和參數(shù)
t.init(goroutineReady, gp)
gp.timer = t
}
when := nanotime() + ns
gp.sleepWhen = when
// 阻塞協(xié)程
gopark(resetForSleep, nil, waitReasonSleep, traceBlockSleep, 1)
}time.Sleep 首先設(shè)置了超時(shí)時(shí)間函數(shù)和參數(shù),然后把協(xié)程改成阻塞狀態(tài)并觸發(fā)重新調(diào)度,最后執(zhí)行 resetForSleep 注冊定時(shí)器,Go 在調(diào)度時(shí),會(huì)判斷是否有定時(shí)器超時(shí)。
func findRunnable() (gp *g, inheritTime, tryWakeP bool) {
mp := getg().m
pp := mp.p.ptr()
now, pollUntil, _ := pp.timers.check(0)
}
func (ts *timers) check(now int64) (rnow, pollUntil int64, ran bool) {
// 最快超時(shí)的定時(shí)器時(shí)間
next := ts.wakeTime()
if next == 0 {
// No timers to run or adjust.
return now, 0, false
}
now = nanotime()
if len(ts.heap) > 0 {
ts.adjust(now, false)
for len(ts.heap) > 0 {
// 處理超時(shí)的定時(shí)器,如果超時(shí)的話
if tw := ts.run(now); tw != 0 {
if tw > 0 {
pollUntil = tw
}
break
}
ran = true
}
}
return now, pollUntil, ran
}
func (ts *timers) run(now int64) int64 {
tw := ts.heap[0]
t := tw.timer
t.lock()
if t.when > now {
// Not ready to run.
t.unlock()
return t.when
}
t.unlockAndRun(now)
return 0
}
func (t *timer) unlockAndRun(now int64) {
f := t.f
arg := t.arg
seq := t.seq
var next int64
delay := now - t.when
f(arg, seq, delay)
}對于 time.Sleep 來時(shí),f 對應(yīng)的函數(shù)是 goroutineReady。
func goroutineReady(arg any, _ uintptr, _ int64) {
goready(arg.(*g), 0)
}
func goready(gp *g, traceskip int) {
systemstack(func() {
ready(gp, traceskip, true)
})
}
func ready(gp *g, traceskip int, next bool) {
// 獲取當(dāng)前線程的 m
mp := acquirem()
// 修改 g 的狀態(tài)為就緒,等待調(diào)度
casgstatus(gp, _Gwaiting, _Grunnable)
// 把 g 放到 m 關(guān)聯(lián)到 p 的 g 隊(duì)列
runqput(mp.p.ptr(), gp, next)
// 喚醒/創(chuàng)建線程處理
wakep()
releasem(mp)
}goroutineReady 最終把協(xié)程加入就緒隊(duì)列,等待調(diào)度。
搶占式調(diào)度
和之前在函數(shù)里插入監(jiān)測點(diǎn)的方式不一樣,現(xiàn)在 Go 已經(jīng)通過信號機(jī)制支持搶占式調(diào)度,防止某個(gè)協(xié)程執(zhí)行的 CPU 時(shí)間過長,因?yàn)樾盘枡C(jī)制具有非常高的優(yōu)先級,通過信號可以徹底解決協(xié)程長期占據(jù) CPU 的問題。Go 在初始化時(shí)會(huì)注冊信號的處理函數(shù)。
func initsig(preinit bool) {
for i := uint32(0); i < _NSIG; i++ {
setsig(i, abi.FuncPCABIInternal(sighandler))
}
}
func setsig(i uint32, fn uintptr) {
var sa usigactiont
sa.sa_flags = _SA_SIGINFO | _SA_ONSTACK | _SA_RESTART
sa.sa_mask = ^uint32(0)
// 設(shè)置信號處理函數(shù)
fn = abi.FuncPCABI0(sigtramp)
*(*uintptr)(unsafe.Pointer(&sa.__sigaction_u)) = fn
sigaction(i, &sa, nil)
}然后在 sysmon 線程中定時(shí)判斷是否有協(xié)程執(zhí)行的時(shí)間過長。
func retake(now int64) uint32 {
// 遍歷所有 p
for i := 0; i < len(allp); i++ {
pp := allp[i]
s := pp.status
if s == _Prunning {
if pd.schedwhen+forcePreemptNS <= now {
preemptone(pp)
}
}
}
}
func preemptone(pp *p) bool {
mp := pp.m.ptr()
// 設(shè)置搶占標(biāo)記
gp.preempt = true
gp.stackguard0 = stackPreempt
// 給協(xié)程所在的線程 m 發(fā)信號進(jìn)行搶占處理
if preemptMSupported && debug.asyncpreemptoff == 0 {
pp.preempt = true
preemptM(mp)
}
return true
}
func preemptM(mp *m) {
// 還沒發(fā)送則發(fā)送信號
if mp.signalPending.CompareAndSwap(0, 1) {
signalM(mp, sigPreempt)
}
}
// 給指定線程發(fā)送信號
func signalM(mp *m, sig int) {
pthread_kill(pthread(mp.procid), uint32(sig))
}給指定線程發(fā)送信號后,信號處理函數(shù)就會(huì)在對應(yīng)線程的上下文執(zhí)行,從而獲取到該線程上一直占用 CPU 的協(xié)程,信號處理函數(shù)是 sigtramp。
TEXT runtime·sigtramp(SB),NOSPLIT|TOPFRAME,$28
// Save callee-saved C registers, since the caller may be a C signal handler.
MOVL BX, bx-4(SP)
MOVL BP, bp-8(SP)
MOVL SI, si-12(SP)
MOVL DI, di-16(SP)
// We don't save mxcsr or the x87 control word because sigtrampgo doesn't
// modify them.
MOVL (28+4)(SP), BX
MOVL BX, 0(SP)
MOVL (28+8)(SP), BX
MOVL BX, 4(SP)
MOVL (28+12)(SP), BX
MOVL BX, 8(SP)
CALL runtime·sigtrampgo(SB)
MOVL di-16(SP), DI
MOVL si-12(SP), SI
MOVL bp-8(SP), BP
MOVL bx-4(SP), BX
RET最終執(zhí)行 sigtrampgo。
func sigtrampgo(sig uint32, info *siginfo, ctx unsafe.Pointer) {
c := &sigctxt{info, ctx}
gp := sigFetchG(c)
setg(gp.m.gsignal)
sighandler(sig, info, ctx, gp)
}
func sighandler(sig uint32, info *siginfo, ctxt unsafe.Pointer, gp *g) {
gsignal := getg()
mp := gsignal.m
c := &sigctxt{info, ctxt}
if sig == sigPreempt && debug.asyncpreemptoff == 0 && !delayedSignal {
doSigPreempt(gp, c)
}
}
func doSigPreempt(gp *g, ctxt *sigctxt) {
if wantAsyncPreempt(gp) {
if ok, newpc := isAsyncSafePoint(gp, ctxt.sigpc(), ctxt.sigsp(), ctxt.siglr()); ok {
// 修改內(nèi)存,注入 asyncPreempt 地址
ctxt.pushCall(abi.FuncPCABI0(asyncPreempt), newpc)
}
}
}
func (c *sigctxt) pushCall(targetPC, resumePC uintptr) {
sp := c.sp() - 16 // SP needs 16-byte alignment
c.set_sp(sp)
*(*uint64)(unsafe.Pointer(uintptr(sp))) = c.lr()
*(*uint64)(unsafe.Pointer(uintptr(sp - goarch.PtrSize))) = c.r29()
c.set_lr(uint64(resumePC))
c.set_pc(uint64(targetPC))
}sigtrampgo 最終修改了內(nèi)存地址注入 asyncPreempt 函數(shù)地址,信號處理結(jié)束后執(zhí)行 asyncPreempt,asyncPreempt 繼續(xù)執(zhí)行 asyncPreempt2。
func asyncPreempt2() {
gp := getg()
if gp.preemptStop {
mcall(preemptPark)
} else {
mcall(gopreempt_m)
}
}
func gopreempt_m(gp *g) {
goschedImpl(gp, true)
}
func goschedImpl(gp *g, preempted bool) {
// 把協(xié)程改成就緒狀態(tài)
casgstatus(gp, _Grunning, _Grunnable)
// 解除 m 和 g 的關(guān)系
dropg()
// 消耗太多 CPU 了,把 g 放入全局隊(duì)列
globrunqput(gp)
// 調(diào)度其他協(xié)程執(zhí)行
schedule()
}總結(jié)
Node.js / Redis / Nginx 等軟件的架構(gòu)都是單線程的,所有的任務(wù)都是在單個(gè)線程中被串行執(zhí)行,盡管底層有線程池(處理耗時(shí)或阻塞式操作),但是線程池對用戶是不感知的,我們的可以理解為我們的任務(wù)或代碼是在單個(gè)線程中執(zhí)行的,比如 Redis 命令就是串行執(zhí)行的,不需要擔(dān)心多線程的問題,Node.js 的代碼也是單線程中執(zhí)行的,不需要擔(dān)心數(shù)據(jù)競爭問題,另外這些軟件都是基于異步回調(diào)的,代碼邏輯會(huì)比較割裂,對編寫和理解代碼來說有一定的負(fù)擔(dān)。
但是在 Go 中情況有所不同。Go 可以通過 go 關(guān)鍵字創(chuàng)建多個(gè)協(xié)程,這些協(xié)程是跑在多個(gè)線程中的,天然利用了多核能力,但是如果使用了公共的數(shù)據(jù)結(jié)構(gòu),需要通過互斥機(jī)制保證數(shù)據(jù)的正確性,而又因?yàn)閾屨际秸{(diào)度的存在,盡管我們只跑在一個(gè)線程中,對共享數(shù)據(jù)的修改也會(huì)存在競態(tài)條件。總的來說,Go 的架構(gòu)是在多個(gè)線程上通過 gmp 機(jī)制運(yùn)行多個(gè)協(xié)程,并在必要的時(shí)候進(jìn)行搶占式調(diào)度,單個(gè)協(xié)程內(nèi)執(zhí)行時(shí),不同的阻塞式 API 其底層實(shí)現(xiàn)是不一樣的,一般來說,大多數(shù) API(網(wǎng)絡(luò) IO、睡眠) 都是阻塞協(xié)程不阻塞線程,其原理是把協(xié)程改成阻塞狀態(tài)并放到等待隊(duì)列中,在合適的時(shí)機(jī)并且滿足條件時(shí)把它放到就緒隊(duì)列等待調(diào)度,而部分 API(文件讀寫或其他系統(tǒng)調(diào)用)是會(huì)引起線程阻塞,這時(shí)候 Go 通過 handoff 機(jī)制保證其他協(xié)程的執(zhí)行,但是這些對于用戶都是無感的,單協(xié)程內(nèi)代碼是串行執(zhí)行的。Go 在原生利用多核、同步寫異步代碼和搶占式調(diào)度上對用戶來說是比較有意義的,寫過 Node.js 的同學(xué)應(yīng)該更加深有體會(huì)。



































