精品欧美一区二区三区在线观看 _久久久久国色av免费观看性色_国产精品久久在线观看_亚洲第一综合网站_91精品又粗又猛又爽_小泽玛利亚一区二区免费_91亚洲精品国偷拍自产在线观看 _久久精品视频在线播放_美女精品久久久_欧美日韩国产成人在线

Go netpoll 的實現和使用,你學會了嗎?

開發 前端
Go 目前使用的 IO 模型是 IO 多路復用,由 netpoll 模塊實現,這部分沒有特別的地方,重點是 Go 把 IO 模型和協程結合一起,大概的流程如下。

IO 模型是軟件中非常重要的部分,軟件架構也隨著 IO 模型的變化而變化,比如服務器架構經歷了一個請求一個進程+阻塞式 IO,一個請求一個線程+阻塞式 IO,IO 多路復用+非阻塞 IO,異步 IO。現代軟件中,大多數軟件使用的 IO 模型是 IO 多路復用,因為它的平臺兼容性比較好一點,但是慢慢也有不少軟件支持了異步 IO,比如 Node.js 已經支持了 io_uring。Go 目前使用的 IO 模型是 IO 多路復用,由 netpoll 模塊實現,這部分沒有特別的地方,重點是 Go 把 IO 模型和協程結合一起,大概的流程如下。

  1. 當一個協程讀且不滿足條件時,Go 會把協程記錄到 pollDesc 中,接著把它改成等待狀態并觸發重新調度。
  2. Go 會定時或按需調用 netpoll 獲取就緒的事件。
  3. 通過 netpoll 返回的事件信息找到對應的 pollDesc,并根據 pollDesc 找到對應的協程,把協程改成就緒狀態等待調度執行。

本文介紹 netpoll 的實現以及它是如何和協程結合起來的。

netpool

核心數據結構

了解 netpoll 之前,首先需要先了解 netpoll 模塊的核心數據結構 pollDesc。

type pollDesc struct {
 fd    uintptr        // constant for pollDesc usage lifetime
 fdseq atomic.Uintptr // protects against stale pollDesc
  // 記錄一些信息,比如是否有錯誤,是否超時等
 atomicInfo atomic.Uint32 // atomic pollInfo

  /*
  核心字段
    Nil:初始化狀態
    pdReady:事件就緒,
    pdWait:準備進入阻塞狀態
    其他:協程結構體的地址
  */
 rg atomic.Uintptr // pdReady, pdWait, G waiting for read or pdNil
 wg atomic.Uintptr // pdReady, pdWait, G waiting for write or pdNil

 lock    mutex // protects the following fields
  // 只在某些系統使用,如 aix,記錄 pollDesc 在數組中的索引
 user    uint32    // user settable cookie
 closing bool
  // 超時管理
 rrun    bool      // whether rt is running
 wrun    bool      // whether wt is running
 rseq    uintptr   // protects from stale read timers
 rt      timer     // read deadline timer
 rd      int64     // read deadline (a nanotime in the future, -1 when expired)
 wseq    uintptr   // protects from stale write timers
 wt      timer     // write deadline timer
 wd      int64     // write deadline (a nanotime in the future, -1 when expired)
 self    *pollDesc // storage for indirect interface. See (*pollDesc).makeArg.
}

一個 fd 對應一個 pollDesc,pollDesc 負責管理 fd 的讀寫事件、等待事件的超時時間以及記錄阻塞等待 fd 事件的協程。netpoll 根據 pollDesc 的信息,通過操作系統的 IO 多路復用模塊實現對事件的感知,比如 Linux 的 epoll 和 MacOS 的 kqueue 的 IO 多路復用模塊。

IO 多路復用

IO 多路復用模塊需要實現下面幾個接口。

// 初始化,比如創建一個 epoll 實例,后面通過該實例監聽 fd 的事件
func netpollinit() {
}

// 判斷 fd 是不是 netpoll 內部的 fd,IO 多路復用實例的 fd 或內部創建的 pipe fd
func netpollIsPollDescriptor(fd uintptr) bool {
 return false
}


// 注冊 fd 事件
func netpollopen(fd uintptr, pd *pollDesc) int32 {
 return 0
}

// 取消 fd 的事件
func netpollclose(fd uintptr) int32 {
 return 0
}

// 特定平臺才實現,更新 fd 事件,喚醒 IO 多路復用模塊注冊 fd 事件
func netpollarm(pd *pollDesc, mode int) {
}

// 喚醒 IO 多路復用模塊
func netpollBreak() {
}

/*
  獲取就緒的 fd
  delay = 0: 非阻塞
  delay < 0: 阻塞直到有就緒 fd,阻塞期間可以通過 netpollBreak 喚醒
  delay > 0: 帶超時的阻塞
*/
func netpoll(delay int64) (gList, int32) {
 return gList{}, 0
}

接下來看一下 MacOS 系統下 kqueue 的實現。

package runtime

var (
 kq             int32         = -1
 netpollWakeSig atomic.Uint32 // 是否已經發送了喚醒 kqueue
)

func netpollinit() {
  // 創建 IO 多路復用模塊的實例
 kq = kqueue()
  // 設置 _FD_CLOEXEC 標記,fork + execve 時自動關閉該 fd,避免子進程繼承
 closeonexec(kq)
  // 注冊 EVFILT_USER,后續可以手動喚醒 IO 多路復用模塊
 ev := keventt{
  ident:  kqIdent,
  filter: _EVFILT_USER,
  flags:  _EV_ADD,
 }
 kevent(kq, &ev, 1, nil, 0, nil)
}

func netpollopen(fd uintptr, pd *pollDesc) int32 {
 var ev [2]keventt
  // 記錄 fd
 *(*uintptr)(unsafe.Pointer(&ev[0].ident)) = fd
  // 注冊讀寫事件,工作方式是邊緣觸發
 ev[0].filter = _EVFILT_READ
 ev[0].flags = _EV_ADD | _EV_CLEAR
 ev[0].fflags = 0
 ev[0].data = 0
  // 記錄對應的 pollDesc,就緒處理時使用
  ev[0].udata = (*byte)(unsafe.Pointer(pd))
  // 復制結構體
 ev[1] = ev[0]
 ev[1].filter = _EVFILT_WRITE
 kevent(kq, &ev[0], 2, nil, 0, nil)
}

func netpollclose(fd uintptr) int32 {
 // Don't need to unregister because calling close()
 // on fd will remove any kevents that reference the descriptor.
 return 0
}

func netpollarm(pd *pollDesc, mode int) {
 throw("runtime: unused")
}

func netpollBreak() {
 // Failing to cas indicates there is an in-flight wakeup, so we're done here.
 if !netpollWakeSig.CompareAndSwap(0, 1) {
  return
 }
  // 喚醒 IO 多路復用模塊
 ev := keventt{
  ident:  kqIdent,
  filter: _EVFILT_USER,
  flags:  _EV_ENABLE,
  fflags: _NOTE_TRIGGER,
 }
 kevent(kq, &ev, 1, nil, 0, nil)
}

// delay < 0: 一直阻塞,直到有就緒事件或被主動喚醒
// delay == 0: 非阻塞
// delay > 0: 帶超時的阻塞
// 返回就緒的協程和數量
func netpoll(delay int64) (gList, int32) {
 var tp *timespec
 var ts timespec
  
 if delay < 0 {
  tp = nil
 } else if delay == 0 {
  tp = &ts
 } else {
  ts.setNsec(delay)
  if ts.tv_sec > 1e6 {
   // Darwin returns EINVAL if the sleep time is too long.
   ts.tv_sec = 1e6
  }
  tp = &ts
 }
 var events [64]keventt
retry:
  // 獲取就緒事件
 n := kevent(kq, nil, 0, &events[0], int32(len(events)), tp)
 var toRun gList
 delta := int32(0)
 for i := 0; i < int(n); i++ {
  ev := &events[i]
    // 是否是手動喚醒事件
  if isWakeup(ev) {
      // 是阻塞式調用時才處理
   if delay != 0 {
    // netpollBreak could be picked up by a nonblocking poll.
    // Only call drainWakeupEvent and reset the netpollWakeSig if blocking.
    ev := keventt{
          ident:  kqIdent,
          filter: _EVFILT_USER,
          flags:  _EV_DISABLE,
        }
        // 禁用,下次手動觸發時再開啟
        kevent(kq, &ev, 1, nil, 0, nil)
        // 恢復標記
    netpollWakeSig.Store(0)
   }
   continue
  }

  var mode int32
  switch ev.filter {
  case _EVFILT_READ:
   mode += 'r'
  case _EVFILT_WRITE:
   mode += 'w'
  }
  if mode != 0 {
   var pd *pollDesc
      // 獲取 pollDesc
   pd = (*pollDesc)(unsafe.Pointer(ev.udata))
      // 記錄錯誤
   pd.setEventErr(ev.flags == _EV_ERROR, tag)
      // 修改協程為就緒狀態,返回喚醒的協程數
   delta += netpollready(&toRun, pd, mode)
  }
 }
  // 返回就緒的協程和數量
 return toRun, delta
}

IO 多路復用的工作方式有兩種,Go 使用的是邊緣觸發。

  1. 邊緣觸發:邊緣觸發是從無到有數據時才會通知,需要通過非阻塞方式(避免讀到沒數據時阻塞)不斷讀取數據,否則不會收到新通知。
  2. 水平觸發:水平觸發就是有數據時會一直通知用戶,用戶可以選擇什么時候讀,讀取時依然需要使用非阻塞的方式。比如驚群場景下,一個連接的到來導致多個進程被喚醒收到可讀事件,但是先被調度的進程會消費這個連接,導致其他進程讀取時已經沒有連接了,如果以阻塞方式調用會引起進程阻塞。

netpoll 模塊除了提供常用的 fd 事件訂閱發布外,還有一個重要的能力就是可喚醒,比如當前定時器最早超時時間是 5s,然后 IO 多路復用模塊阻塞等待 5s,過去 1s 時突然新增了一個超時時間為 1s 的定時器,這時候需要提前喚醒 IO 多路復用模塊,從而及時處理定時器。在某些系統的中可以通過特殊的事件直接喚醒,比如上面的 EVFILT_USER 類型的事件,而比較常用實現方式是通過 pipe 創建兩個 fd,分別為讀 fd 和 寫 fd,把讀 fd 注冊到 IO 多路復用模塊中,然后通過往寫 fd 寫入數據來喚醒 IO 多路復用模塊。

// 創建兩個 fd,一個注冊到 kqueue
func addWakeupEvent(kq int32) {
 r, w, errno := nonblockingPipe()
 ev := keventt{
  filter: _EVFILT_READ,
  flags:  _EV_ADD,
 }
 *(*uintptr)(unsafe.Pointer(&ev.ident)) = uintptr(r)
 n := kevent(kq, &ev, 1, nil, 0, nil)
 netpollBreakRd = uintptr(r)
 netpollBreakWr = uintptr(w)
}

// 通過往 fd 寫喚醒 kqueue
func wakeNetpoll(_ int32) {
 for {
  var b byte
  n := write(netpollBreakWr, unsafe.Pointer(&b), 1)
  if n == 1 || n == -_EAGAIN {
   break
  }
  if n == -_EINTR {
   continue
  }
 }
}

// 判斷是不是內部 fd,即上面創建的 fd
func isWakeup(ev *keventt) bool {
 if uintptr(ev.ident) == netpollBreakRd {
  if ev.filter == _EVFILT_READ {
   return true
  }
  println("runtime: netpoll: break fd ready for", ev.filter)
  throw("runtime: netpoll: break fd ready for something unexpected")
 }
 return false
}

// 消費喚醒時寫入的數據,一個喚醒結束
func drainWakeupEvent(_ int32) {
 var buf [16]byte
 read(int32(netpollBreakRd), noescape(unsafe.Pointer(&buf[0])), int32(len(buf)))
}

超時管理

netpoll 支持等待事件就緒的超時時間,其原理是設置一個定時器,然后超時時把協程改成就緒狀態等待調度執行。

func poll_runtime_pollSetDeadline(pd *pollDesc, d int64, mode int) {
 lock(&pd.lock)
  // 舊值
 rd0, wd0 := pd.rd, pd.wd
  // 讀寫超時
 combo0 := rd0 > 0 && rd0 == wd0
 if d > 0 {
    // 絕對超時時間
  d += nanotime()
 }
  // 記錄設置了讀或寫超時
 if mode == 'r' || mode == 'r'+'w' {
  pd.rd = d
 }
 if mode == 'w' || mode == 'r'+'w' {
  pd.wd = d
 }
 pd.publishInfo()
  // 設置了超時并且讀寫一樣
 combo := pd.rd > 0 && pd.rd == pd.wd
  // 設置定時器回調,處理讀寫超時
 rtf := netpollReadDeadline
 if combo {
  rtf = netpollDeadline
 }
  // 第一次設置
 if !pd.rrun {
    // 大于 0,修改定時器時間為 rd,處理函數為 rtf
  if pd.rd > 0 {
   pd.rt.modify(pd.rd, 0, rtf, pd.makeArg(), pd.rseq)
   pd.rrun = true
  }
 } else if pd.rd != rd0 || combo != combo0 { // 新舊的超時時間不一樣
  pd.rseq++ // invalidate current timers
    // 修改
  if pd.rd > 0 {
   pd.rt.modify(pd.rd, 0, rtf, pd.makeArg(), pd.rseq)
  } else {
      // 停止定時器
   pd.rt.stop()
   pd.rrun = false
  }
 }
  // 寫也差不多,忽略
  
  // 超時時間小于 0 才會執行下面的邏輯
 delta := int32(0)
 var rg, wg *g
  // 新的超時時間小于 0,則修改 pollDesc 狀態為 Nil 并喚醒阻塞的協程
 if pd.rd < 0 {
  rg = netpollunblock(pd, 'r', false, &delta)
 }
 if pd.wd < 0 {
  wg = netpollunblock(pd, 'w', false, &delta)
 }
 unlock(&pd.lock)
  // 把協程改成就緒狀態,等待調度
 if rg != nil {
  netpollgoready(rg, 3)
 }
 if wg != nil {
  netpollgoready(wg, 3)
 }
  // 等待者數量減去 delta
 netpollAdjustWaiters(delta)
}

func netpollgoready(gp *g, traceskip int) {
 goready(gp, traceskip+1)
}

func goready(gp *g, traceskip int) {
 systemstack(func() {
  ready(gp, traceskip, true)
 })
}

func ready(gp *g, traceskip int, next bool) {
 status := readgstatus(gp)
 mp := acquirem() 
  // 改成就緒狀態
 casgstatus(gp, _Gwaiting, _Grunnable)
  // 加入 p 的隊列等待調度執行
 runqput(mp.p.ptr(), gp, next)
 wakep()
 releasem(mp)
}

poll_runtime_pollSetDeadline 用于設置一個協程超時等待某個 fd 事件,支持取消超時設置。當超時時執行 netpolldeadlineimpl 把協程改成就緒狀態等待調度執行。

func netpolldeadlineimpl(pd *pollDesc, seq uintptr, read, write bool) {
 lock(&pd.lock)
 delta := int32(0)
 var rg *g
  // 修改 pollDesc 的狀態
 if read {
  pd.rd = -1
  pd.publishInfo()
  rg = netpollunblock(pd, 'r', false, &delta)
 }
 // write 寫同上
 unlock(&pd.lock)
  // 把協程改成就緒狀態
 if rg != nil {
  netpollgoready(rg, 0)
 }
 netpollAdjustWaiters(delta)
}

netpool 在 Go 中的使用

IO 多路復用模塊只是提供了一些基礎的能力,那么這些能力是如何和 Go 結合起來的呢?下面從啟動一個 TCP 服務器為例,看看 netpoll 模塊是如何和 Go 結合起來的。

func Listen(network, address string) (Listener, error) {
 var lc ListenConfig
 return lc.Listen(context.Background(), network, address)
}

func (lc *ListenConfig) Listen(ctx context.Context, network, address string) (Listener, error) {
 l, err = sl.listenTCP(ctx, la)
 return l, nil
}

func (sl *sysListener) listenTCP(ctx context.Context, laddr *TCPAddr) (*TCPListener, error) {
 return sl.listenTCPProto(ctx, laddr, 0)
}

func (sl *sysListener) listenTCPProto(ctx context.Context, laddr *TCPAddr, proto int) (*TCPListener, error) {
 fd, err := internetSocket(ctx, sl.network, laddr, nil, syscall.SOCK_STREAM, proto, "listen", ctrlCtxFn)
 return &TCPListener{fd: fd, lc: sl.ListenConfig}, nil
}

func internetSocket(ctx context.Context, net string, laddr, raddr sockaddr, sotype, proto int, mode string, ctrlCtxFn func(context.Context, string, string, syscall.RawConn) error) (fd *netFD, err error) {
 family, ipv6only := favoriteAddrFamily(net, laddr, raddr, mode)
 return socket(ctx, net, family, sotype, proto, ipv6only, laddr, raddr, ctrlCtxFn)
}

func socket(ctx context.Context, net string, family, sotype, proto int, ipv6only bool, laddr, raddr sockaddr, ctrlCtxFn func(context.Context, string, string, syscall.RawConn) error) (fd *netFD, err error) {
 s, err := sysSocket(family, sotype, proto)
 fd, err = newFD(s, family, sotype, net)
  fd.listenStream(ctx, laddr, listenerBacklog(), ctrlCtxFn)
 return fd, nil
}

func (fd *netFD) listenStream(ctx context.Context, laddr sockaddr, backlog int, ctrlCtxFn func(context.Context, string, string, syscall.RawConn) error) error {
 if err = syscall.Bind(fd.pfd.Sysfd, lsa); err != nil {
  return os.NewSyscallError("bind", err)
 }
 if err = listenFunc(fd.pfd.Sysfd, backlog); err != nil {
  return os.NewSyscallError("listen", err)
 }
 if err = fd.init(); err != nil {
  return err
 }
 return nil
}

創建一個 TCP 服務器首先創建了一個 socket,接著綁定到監聽到地址和把 socket 改成 listen 狀態,這樣就完成了服務器的啟動,啟動成功后執行 fd.init() 把 socket 對應的 fd 注冊到 IO 多路復用模塊。

func (fd *netFD) init() error {
 return fd.pfd.Init(fd.net, true)
}

func (fd *FD) Init(net string, pollable bool) error {
 err := fd.pd.init(fd)
 return err
}

func (pd *pollDesc) init(fd *FD) error {
  // 懶初始化 IO 多路復用模塊
 serverInit.Do(runtime_pollServerInit)
  // 注冊 fd 讀寫事件
 ctx, errno := runtime_pollOpen(uintptr(fd.Sysfd))
  // 記錄這個上下文,對應一個 pollDesc 結構體,后續用到
 pd.runtimeCtx = ctx
 return nil
}

接著就調 netpoll 模塊的 poll_runtime_pollOpen。

func poll_runtime_pollOpen(fd uintptr) (*pollDesc, int) {
  // 從 cache 里分配一個 pollDesc
 pd := pollcache.alloc()
 lock(&pd.lock)
 wg := pd.wg.Load()
 rg := pd.rg.Load()
 pd.fd = fd
 pd.closing = false
 pd.setEventErr(false, 0)
 pd.rseq++
 pd.rg.Store(pdNil)
 pd.rd = 0
 pd.wseq++
 pd.wg.Store(pdNil)
 pd.wd = 0
 pd.self = pd
 pd.publishInfo()
 unlock(&pd.lock)
  // 注冊 fd 
 errno := netpollopen(fd, pd)
 return pd, 0
}

這樣就完成了服務器的啟動和 socket fd 事件的注冊,接著就調 accept 等待連接的到來。

func (l *TCPListener) Accept() (Conn, error) {
 c, err := l.accept()
 return c, nil
}

func (ln *TCPListener) accept() (*TCPConn, error) {
 fd, err := ln.fd.accept()
 return newTCPConn(fd, ...), nil
}

func (fd *netFD) accept() (netfd *netFD, err error) {
 d, rsa, errcall, err := fd.pfd.Accept()
  // 拿到 fd,創建一個 netFD
 netfd, err = newFD(d, fd.family, fd.sotype, fd.net)
  // 再次注冊到 IO 多路復用模塊,進行數據通信
 netfd.init()
 return netfd, nil
}

func (fd *FD) Accept() (int, syscall.Sockaddr, string, error) {
  // 加鎖
 if err := fd.readLock(); err != nil {
  return -1, nil, "", err
 }
 defer fd.readUnlock()
  // 讀判斷是否有錯誤,有則返回
 if err := fd.pd.prepareRead(fd.isFile); err != nil {
  return -1, nil, "", err
 }
  
 for {
  s, rsa, errcall, err := accept(fd.Sysfd)
  if err == nil {
   return s, rsa, "", err
  }
  switch err {
  case syscall.EAGAIN:
   if fd.pd.pollable() {
        // 阻塞等待,重新 accept
    if err = fd.pd.waitRead(fd.isFile); err == nil {
     continue
    }
   }
  return -1, nil, errcall, err
 }
}

這里有兩個地方涉及到 netpoll,分別是 prepareRead 和 accept。

func (pd *pollDesc) prepareRead(isFile bool) error {
 return pd.prepare('r', isFile)
}

func convertErr(res int, isFile bool) error {
 switch res {
 case pollNoError:
  return nil
 case pollErrClosing:
  return errClosing(isFile)
 case pollErrTimeout:
  return ErrDeadlineExceeded
 case pollErrNotPollable:
  return ErrNotPollable
 }
}

func (pd *pollDesc) prepare(mode int, isFile bool) error {
 if pd.runtimeCtx == 0 {
  return nil
 }
 res := runtime_pollReset(pd.runtimeCtx, mode)
 return convertErr(res, isFile)
}

func poll_runtime_pollReset(pd *pollDesc, mode int) int {
  // 判斷是否有錯誤
 errcode := netpollcheckerr(pd, int32(mode))
  // 有則返回
 if errcode != pollNoError {
  return errcode
 }
 if mode == 'r' {
  pd.rg.Store(pdNil)
 } else if mode == 'w' {
  pd.wg.Store(pdNil)
 }
 return pollNoError
}

func netpollcheckerr(pd *pollDesc, mode int32) int {
 info := pd.info()
 if info.closing() {
  return pollErrClosing
 }
  // 超時
 if (mode == 'r' && info.expiredReadDeadline()) || (mode == 'w' && info.expiredWriteDeadline()) {
  return pollErrTimeout
 }
 if mode == 'r' && info.eventErr() {
  return pollErrNotPollable
 }
 return pollNoError
}

如果沒有發生錯誤則繼續調 accept,當沒有連接時,accept 會返回 EAGAIN(非阻塞調用條件不滿足時的錯誤碼),接著執行 waitRead。

if fd.pd.pollable() {
  if err = fd.pd.waitRead(fd.isFile); err == nil {
    continue
  }
}

func (pd *pollDesc) waitRead(isFile bool) error {
 return pd.wait('r', isFile)
}

func (pd *pollDesc) wait(mode int, isFile bool) error {
 if pd.runtimeCtx == 0 {
  return errors.New("waiting for unsupported file type")
 }
 res := runtime_pollWait(pd.runtimeCtx, mode)
 return convertErr(res, isFile)
}

func poll_runtime_pollWait(pd *pollDesc, mode int) int {
 errcode := netpollcheckerr(pd, int32(mode))
 if errcode != pollNoError {
  return errcode
 }
  // 阻塞當前協程
 for !netpollblock(pd, int32(mode), false) {
  errcode = netpollcheckerr(pd, int32(mode))
  if errcode != pollNoError {
   return errcode
  }
 }
 return pollNoError
}

func netpollblock(pd *pollDesc, mode int32, waitio bool) bool {
 gpp := &pd.rg
 if mode == 'w' {
  gpp = &pd.wg
 }

 for {
  // 執行到這時,可能已經就緒了,則返回,因為 Go 是多線程的,一個線程讀的時候,另一個線程可能寫
  if gpp.CompareAndSwap(pdReady, pdNil) {
   return true
  }
    // 一般情況,設置為 pdWait 狀態,表示準備進入阻塞狀態
  if gpp.CompareAndSwap(pdNil, pdWait) {
   break
  }
 }
  // waitio 是 false,但一般沒有 error,執行 gopark 阻塞協程
 if waitio || netpollcheckerr(pd, mode) == pollNoError {
  gopark(netpollblockcommit, unsafe.Pointer(gpp), waitReasonIOWait, traceBlockNet, 5)
 }
  // 被喚醒,重置為 pdNil
 old := gpp.Swap(pdNil)
 return old == pdReady
}

netpollblock 最終調 gopark 阻塞協程。

func gopark(unlockf func(*g, unsafe.Pointer) bool, lock unsafe.Pointer, ...) {
 mp.waitlock = lock
 mp.waitunlockf = unlockf
 releasem(mp)
 mcall(park_m)
}

func park_m(gp *g) {
 mp := getg().m
  // 修改協程為 _Gwaiting 狀態
 casgstatus(gp, _Grunning, _Gwaiting)
  // 執行 waitunlockf,即 netpollblockcommit
 if fn := mp.waitunlockf; fn != nil {
  ok := fn(gp, mp.waitlock)
  mp.waitunlockf = nil
  mp.waitlock = nil
    // 如果返回 false,則說明就緒了,修改協程為 _Grunnable,繼續執行它
  if !ok {
   casgstatus(gp, _Gwaiting, _Grunnable)
   execute(gp, true) // Schedule it back, never returns.
  }
 }
  // 否則重新調度其他協程
 schedule()
}

正常情況下,gopark 設置協程為 Gwaiting 狀態,然后重新調度,相當于該協程就暫停執行了。但是從開始執行 gopark 到現在可能情況已經發生了變化,所以 Go 還會執行一下鉤子函數 waitunlockf,這里是 netpollblockcommit。

func netpollblockcommit(gp *g, gpp unsafe.Pointer) bool {
  // 如果當前還是 pdWait 狀態則修改 pollDesc 的 rg 或 wg 字段為當前協程結構體
 r := atomic.Casuintptr((*uintptr)(gpp), pdWait, uintptr(unsafe.Pointer(gp)))
  // 修改成功則等待者加 1
 if r {
  netpollAdjustWaiters(1)
 }
  // 返回是否修改成功
 return r
}

正常情況下,這個過程是 pollDesc 的 rg 或 wg 字段從 Nil 到 pdWait 再到協程結構體地址,除非執行過程中事件已經就緒。從前面的分析可以看到啟動一個服務器并調用 Accept 后為什么協程會阻塞了,從中也可以看到 Go 中以同步方式寫異步代碼的底層實現。

那么協程阻塞后,什么時候才會喚醒呢?又是怎么喚醒的?Go 會在某些時機調用 IO 多路復用模塊的 netpoll 獲取就緒的事件,從而喚醒關聯的協程。這種時機有幾個,比如在 sysmon 線程中定時獲取,或者調度時獲取,我們搜索 netpoll 函數的調用就可以看到,下面以 sysmon 線程的處理為例。

if netpollinited() && lastpoll != 0 && lastpoll+10*1000*1000 < now {
  // 更新上次 poll 的時間
  sched.lastpoll.CompareAndSwap(lastpoll, now)
  // 通過 IO 多路復用模塊獲取就緒的事件(所以關聯的 g)列表
  list, delta := netpoll(0) // non-blocking - returns list of goroutines
  if !list.empty() {
    // 把g 改成就緒并放入隊列等待調度
    injectglist(&list)
    incidlelocked(1)
    // 等待者減 delta(負數)
    netpollAdjustWaiters(delta)
  }
}

前面介紹過 netpoll 最終會調 netpollready。

func netpollready(toRun *gList, pd *pollDesc, mode int32) int32 {
  // 喚醒的協程個數
 delta := int32(0)
 var rg, wg *g
  // 喚醒對應的等待者,讀或寫
 if mode == 'r' || mode == 'r'+'w' {
  rg = netpollunblock(pd, 'r', true, &delta)
 }
 if mode == 'w' || mode == 'r'+'w' {
  wg = netpollunblock(pd, 'w', true, &delta)
 }
  // 喚醒成功插入隊列
 if rg != nil {
  toRun.push(rg)
 }
 if wg != nil {
  toRun.push(wg)
 }
 return delta
}

func netpollunblock(pd *pollDesc, mode int32, ioready bool, delta *int32) *g {
 gpp := &pd.rg
 if mode == 'w' {
  gpp = &pd.wg
 }

 for {
    // 當前狀態
  old := gpp.Load()
    // 之前就是 pdReady 則不需要再處理,等待上層消費后改成 Nil
  if old == pdReady {
   return nil
  }
    // 新狀態
  new := pdNil
    // 這里是 true,設置超時并且超時時 ioready 是 false
  if ioready {
   new = pdReady
  }
    // 改成 pdReady 狀態
  if gpp.CompareAndSwap(old, new) {
      // 當前狀態如果是 pdWait 則改成 old,即不需要處理該協程,
      // 前面介紹阻塞協程時講到,pdWait 是準備進入阻塞狀態,然后在 netpollblockcommit 會再進一步判斷,如果這里改成 pdReady 狀態,則協程不會阻塞。
   if old == pdWait {
    old = pdNil
   } else if old != pdNil { // old 是等待讀寫事件的協程
    *delta -= 1 // 等待者減一
   }
      // 返回待喚醒的協程
   return (*g)(unsafe.Pointer(old))
  }
 }
}

netpollunblock 第一種處理場景是事件就緒時把狀態改成 pdReady,第二種處理場景是事件沒有就緒,但設置了超時時間并且已經超時,則把狀態改成 Nil,把該協程改成就緒狀態,然后返回阻塞在該事件的協程并把它加入到隊列中等待調度執行。

func injectglist(glist *gList) {
 head := glist.head.ptr()
 var tail *g
 qsize := 0
  // 改成就緒狀態
 for gp := head; gp != nil; gp = gp.schedlink.ptr() {
  tail = gp
  qsize++
  casgstatus(gp, _Gwaiting, _Grunnable)
 }

 // Turn the gList into a gQueue.
 var q gQueue
 q.head.set(head)
 q.tail.set(tail)
 *glist = gList{}
  // 啟動新的線程處理
 startIdle := func(n int) {
  for i := 0; i < n; i++ {
   mp := acquirem() // See comment in startm.
   lock(&sched.lock)

   pp, _ := pidlegetSpinning(0)
   if pp == nil {
    unlock(&sched.lock)
    releasem(mp)
    break
   }

   startm(pp, false, true)
   unlock(&sched.lock)
   releasem(mp)
  }
 }

 pp := getg().m.p.ptr()
 if pp == nil {
  lock(&sched.lock)
    // 放入全局隊列
  globrunqputbatch(&q, int32(qsize))
  unlock(&sched.lock)
  startIdle(qsize)
  return
 }
  // 放到 p 本地隊列
 if !q.empty() {
  runqputbatch(pp, &q, qsize)
 }
}

協程和 IO 多路復用的結合思路也可以參考 Russ Cox 寫的 libtask 和這個分析文章 https://zhuanlan.zhihu.com/p/360477474。

責任編輯:武曉燕 來源: 編程雜技
相關推薦

2024-02-02 11:03:11

React數據Ref

2024-02-21 19:02:05

Go模板化方式

2022-01-17 07:50:37

Go代碼規范

2022-03-05 23:29:18

LibuvwatchdogNode.js

2022-06-16 07:50:35

數據結構鏈表

2022-11-21 16:57:20

2022-08-29 08:05:44

Go類型JSON

2022-11-08 08:45:30

Prettier代碼格式化工具

2024-07-29 10:35:44

KubernetesCSI存儲

2024-08-19 10:24:14

2024-01-29 08:21:59

AndroidOpenCV車牌

2023-10-30 07:05:31

2023-12-27 07:31:45

json產品場景

2024-04-28 08:24:27

分布式架構Istio

2024-01-18 09:38:00

Java注解JDK5

2024-03-12 08:37:32

asyncawaitJavaScript

2023-12-07 12:29:49

Nginx負載均衡策略

2022-07-08 09:27:48

CSSIFC模型

2024-01-19 08:25:38

死鎖Java通信

2023-07-26 13:11:21

ChatGPT平臺工具
點贊
收藏

51CTO技術棧公眾號

春色校园综合激情亚洲| 国产成人毛毛毛片| 精品一区二区三| 欧美日韩高清一区二区| 国产高潮呻吟久久久| 免费看黄色一级视频| 久久中文字幕一区二区三区| www.亚洲成人| 久久久久久久无码| 日本免费在线一区| 精品久久久久久国产91| 日韩在线电影一区| 蜜桃av鲁一鲁一鲁一鲁俄罗斯的| 日韩av网站免费在线| 久久这里只有精品视频首页| av网页在线观看| 亚洲伊人精品酒店| 日韩欧美福利视频| www.69av| 免费在线观看黄色网| 成人福利视频在线| 91精品中国老女人| 亚洲国产av一区二区三区| 黄色亚洲大片免费在线观看| 日日狠狠久久偷偷四色综合免费 | 视频一区免费在线观看| 欧美日韩成人在线播放| 黄色av免费播放| 久久99国产精品久久99大师 | 亚洲视频1区2区| 欧美在线视频二区| 老司机午夜福利视频| 久久av中文字幕片| 国产成人精品最新| 青青草免费观看视频| 国产精品hd| 欧美成人午夜激情| 任你操精品视频| 精品国精品国产自在久国产应用| 亚洲第一网站男人都懂| 一区二区在线免费观看视频| 色综合.com| 欧美日韩视频在线第一区| 丁香啪啪综合成人亚洲| 国产污视频在线播放| 一区二区三区欧美日韩| 亚洲国产精品影视| 欧美18一19xxx性| 国产精品不卡一区| 亚洲国产午夜伦理片大全在线观看网站| 亚洲欧洲国产综合| 不卡的av在线播放| 国模精品一区二区三区| 蜜桃在线一区二区| av亚洲精华国产精华精| 国产精品免费视频一区二区| 性一交一乱一透一a级| 国产一区二区三区免费播放| 91九色国产视频| 国产精品一区二区黑人巨大| 国产在线不卡视频| 91精品综合久久| www.午夜激情| 成人av电影免费观看| 国产在线观看一区| 手机福利小视频在线播放| 91首页免费视频| 欧洲成人一区二区| av网站无病毒在线| 亚洲欧洲日韩女同| 8x8ⅹ国产精品一区二区二区| 最爽无遮挡行房视频在线| 亚洲自拍偷拍网站| 3d动漫一区二区三区| 丝袜老师在线| 欧美性猛交一区二区三区精品 | 欧美日韩免费观看视频| 欧美日韩一区二区三区四区| 日韩av自拍偷拍| 91久久偷偷做嫩草影院电| 精品国产一二三| 噜噜噜在线视频| 成人在线免费小视频| 久久精品在线播放| 日韩特黄一级片| 日韩电影在线一区二区三区| 91久久国产精品91久久性色| 韩国中文字幕hd久久精品| 久久天堂av综合合色蜜桃网| 亚洲高清乱码| sis001亚洲原创区| 欧美综合一区二区三区| av在线免费看片| 无码少妇一区二区三区| 久久精品国产成人| 国产又黄又爽又色| 久久国产精品第一页| 国产精品99久久久久久久| 青青免费在线视频| 亚洲欧美视频在线观看| 哪个网站能看毛片| 欧美区一区二区| 国产香蕉精品视频一区二区三区| 欧美偷拍第一页| 久久青草久久| 国产精品欧美久久| 免费**毛片在线| 欧美三级免费观看| 精品无码av一区二区三区不卡| 精品国产乱码久久久| 欧美精品精品精品精品免费| 性高潮视频在线观看| 粉嫩嫩av羞羞动漫久久久| 亚洲国产欧洲综合997久久 | 天天操夜夜操很很操| 日韩av不卡一区| 欧美xxxx综合视频| 国产美女www爽爽爽| 99这里都是精品| 天堂а√在线中文在线| 成人国产精品入口免费视频| 日韩av在线导航| 久草视频手机在线观看| 毛片一区二区三区| 欧美日韩电影一区二区| 波多一区二区| 欧美一级生活片| 欧美日韩精品免费看| 网站在线你懂的| 五月国产精品| 91精品国产色综合| 成人av免费播放| 国产精品久久久久aaaa| 欧洲av无码放荡人妇网站| 影音先锋欧美激情| 久热国产精品视频| 中文字幕资源网| 国产欧美日韩三区| 成人一级片网站| 偷拍亚洲精品| 欧美整片在线观看| 天天干免费视频| 亚洲国产精品一区二区久久| 日本黄色三级网站| 一级欧洲+日本+国产| 91精品视频网站| 国产激情在线| 91精品国产综合久久久蜜臀粉嫩| 国产一区第一页| 韩国三级在线一区| 成年人免费观看的视频| 欧美美女被草| 久久视频精品在线| 国产情侣在线播放| 亚洲制服欧美中文字幕中文字幕| 国产伦精品一区二区三区妓女下载 | 亚洲h精品动漫在线观看| 亚洲图片 自拍偷拍| 一本一道久久a久久精品蜜桃 | 日韩中文字幕二区| 蜜臀久久99精品久久一区二区| 欧美一级高清免费播放| 欧美精品a∨在线观看不卡 | 久久久之久亚州精品露出| 成人爽a毛片一区二区| 亚洲一卡二卡三卡四卡五卡| youjizz.com国产| 在线亚洲一区| 日本不卡一区二区三区视频| 粉嫩91精品久久久久久久99蜜桃| 日韩亚洲一区二区| 国产免费的av| 激情久久av一区av二区av三区| 人妻熟女aⅴ一区二区三区汇编| 老司机免费视频久久 | 欧美日一区二区在线观看| 亚洲永久在线观看| 九色porny自拍视频在线播放| 亚洲男人天堂2024| 91亚洲精品国偷拍自产在线观看 | 亚洲欧美日韩网| 中文字幕在线视频免费| 亚洲自拍偷拍图区| 男人操女人动态图| 国产美女精品人人做人人爽| 老司机午夜免费福利视频| 思热99re视热频这里只精品 | 老熟妇一区二区三区啪啪| 成人免费一区二区三区在线观看| 色哟哟视频在线| 秋霞影院一区二区| 欧美一区二区三区综合| 免费看日本一区二区| 美女视频一区二区三区| 久久久亚洲精品视频| 国产黄在线看| 精品久久久久久久久久久院品网 | 黄色在线免费| 亚洲丁香婷深爱综合| 国产精品51麻豆cm传媒| 亚洲午夜电影网| 亚洲精品国产精品国自| 成人精品鲁一区一区二区| 免费看国产黄色片| 一区久久精品| 自拍视频一区二区三区| 先锋影音国产精品| 亚洲va久久久噜噜噜久久天堂| 在线观看特色大片免费视频| 蜜臀久久99精品久久久无需会员| 色在线免费视频| 日韩欧美国产一区二区三区 | 女子免费在线观看视频www| 亚洲欧美国内爽妇网| 亚洲第一天堂网| 欧美人与禽zozo性伦| 精品不卡一区二区| 亚洲国产精品嫩草影院| 国产黄色录像片| 国产欧美在线观看一区| 欧美大片免费播放器| 国产高清亚洲一区| 日本中文字幕影院| 青青草一区二区三区| 欧美日韩国产精品激情在线播放| 欧美伊人久久| 国产又粗又爽又黄的视频| 精品久久久久久久久久久aⅴ| 久热这里只精品99re8久 | 强制捆绑调教一区二区| 男人和女人啪啪网站| 欧美jjzz| 蜜臀av.com| 永久亚洲成a人片777777| 五月天亚洲综合情| 欧美一级精品片在线看| 欧美日韩亚洲免费| 自拍亚洲一区| 鲁鲁视频www一区二区| 欧美三级电影在线| 国产在线观看一区| 欧美深夜视频| 久久久久久久久久久久久9999| 成人h动漫精品一区二区器材| 99在线看视频| 999在线精品| 国产精品视频福利| 精品国产乱子伦一区二区| 国产精品久久九九| 高清精品视频| 久久av二区| 日韩最新在线| 欧美亚洲精品日韩| 精品免费在线| 一区二区三区四区欧美| 99精品视频在线观看播放| 亚洲最新在线| 欧美一区二区| 日本男女交配视频| 日韩一级网站| 青青草原av在线播放| 久久这里只有| 午夜激情av在线| 国产真实乱子伦精品视频| 香蕉视频xxx| 成人高清av在线| 白丝女仆被免费网站| 国产精品久久久久久久久久免费看| 99久久久无码国产精品不卡| 亚洲天堂2014| 日本熟妇毛耸耸xxxxxx| 日韩欧美在线视频免费观看| 波多野结衣视频免费观看| 欧美日韩美少妇| www.久久伊人| 日韩精品一二三四区| 成人18在线| 久久手机精品视频| 国产v日韩v欧美v| 国产精品久久久久国产a级| 日韩有码欧美| 国产一区二区在线网站| 精品色999| 成年人深夜视频| 久久亚洲风情| 中文字幕一二三| 久久婷婷一区二区三区| 懂色av粉嫩av蜜臀av一区二区三区| 一区二区在线观看视频在线观看| 免费在线观看黄网站| 欧美三级欧美一级| 亚洲第一页视频| 国产亚洲激情在线| 日本三级在线观看网站| 国产z一区二区三区| 国产日韩一区二区三免费高清| 精品卡一卡二| 91精品一区二区三区综合在线爱| 久久久一本二本三本| 狠狠狠色丁香婷婷综合激情 | 一区二区三区视频免费观看| 综合色婷婷一区二区亚洲欧美国产| 国产欧美日韩一区二区三区在线| 色戒在线免费观看| 99精品欧美一区二区三区小说 | 99久久99久久精品国产片果冰| 91免费黄视频| 国内成人自拍视频| 在线不卡av电影| 亚洲福利视频三区| 中文字幕精品一区二| 日韩av在线网站| 日日夜夜天天综合入口| 国产精品自拍网| 亚洲人成网77777色在线播放| 国产精品久久久影院| 麻豆一区二区三区| 91精品人妻一区二区| 亚洲综合免费观看高清完整版在线| 中国黄色一级视频| 日韩精品久久久久| 国精产品一区一区三区mba下载| 国产精品中文字幕在线观看| 在线日本制服中文欧美| 男人添女荫道口图片| 国产精品正在播放| fc2ppv在线播放| 欧美视频一区二区三区四区| 日本ー区在线视频| 91精品国产91久久久久福利| 日韩三级久久| 国产香蕉一区二区三区| 久久99日本精品| 欧美肥妇bbwbbw| 欧美精品在线一区二区| 性开放的欧美大片| 97在线视频一区| 欧美日韩一区二区三区四区不卡 | av在线加勒比| 国产精品亚洲不卡a| 欧美午夜久久| 少妇精品无码一区二区| 一区二区三区色| 亚洲高清视频在线播放| 欧美人成在线视频| 亚洲高清在线一区| 日韩不卡一二区| 国产高清无密码一区二区三区| 欧美日韩免费做爰视频| 日韩写真欧美这视频| 日本一级理论片在线大全| αv一区二区三区| 国语自产精品视频在线看8查询8| 国产人妖在线观看| 亚洲动漫第一页| 天天综合在线视频| 欧美一性一乱一交一视频| 在线成人动漫av| 国产精品区在线| 亚洲另类中文字| 国精品人妻无码一区二区三区喝尿| 欧美激情精品久久久久久久变态| 91综合精品国产丝袜长腿久久| 69精品丰满人妻无码视频a片| 岛国一区二区在线观看| 久草视频在线观| 亚洲人成电影网| 四虎国产精品免费久久5151| av久久久久久| 26uuu精品一区二区| 国产无遮挡又黄又爽又色视频| 日韩在线观看视频免费| 欧一区二区三区| 国内性生活视频| 国产日韩欧美高清| 97人妻精品一区二区三区软件| 欧美高清视频一区二区| 偷窥自拍亚洲色图精选| 欧美男女交配视频| 一区二区三区四区在线播放 | 99精品久久久久久| 小泽玛利亚一区二区三区视频| 久久精品亚洲一区| 日韩高清一级| 一本一道久久a久久综合蜜桃| 亚洲国产一区二区三区青草影视| 久久久久久久久亚洲精品| 91久久国产婷婷一区二区| 亚洲激情综合| 香蕉久久久久久久| 亚洲爱爱爱爱爱| 91欧美精品| 国产妇女馒头高清泬20p多| 欧美国产精品专区| 免费a视频在线观看| 国产精品对白刺激| 亚洲二区免费| 日本免费网站视频| 日韩精品在线视频观看| 日韩电影精品|