Go 并發(fā)中 panic 的處理
Go 中的并發(fā)
在 go 中,可以通過(guò)原聲關(guān)鍵字 go 創(chuàng)建協(xié)程。
go func() {
// your code
}()
// go onyour code 和 go on 誰(shuí)先執(zhí)行是不確定的,這取決于調(diào)度。如果想等協(xié)程執(zhí)行完再繼續(xù)執(zhí)行的話怎么辦呢?比如下面代碼。
go func() {
// your code1
}()
go func() {
// your code2
}()
// go on其中一種方式是使用 sync.WaitGroup。
var wg sync.WaitGroup
wg.Add(2)
go func() {
wg.Done()
}()
go func() {
wg.Done()
}()
wg.Wait()
// go onwg.Wait() 會(huì)阻塞直到兩個(gè)協(xié)程執(zhí)行完后,這個(gè)有點(diǎn)類似多線程中的線程屏障。但是這種方式過(guò)于靈活,我們需要控制好 Add 和 Done 的邏輯,否則會(huì)導(dǎo)致一直阻塞或者 panic。幸好 go 還提供了 errgroup.Group,代碼如下。
var g errgroup.Group
g.Go(func() error {})
g.Go(func() error {})
g.Wait()在用法上,errgroup.Group 和 sync.WaitGroup 很像,前者也是基于后者實(shí)現(xiàn)的,但是前者使用起來(lái)相對(duì)簡(jiǎn)單、并且還實(shí)現(xiàn)了自動(dòng)管理 Add/Done、限制并發(fā)數(shù)等能力。在日常開(kāi)發(fā)中也是經(jīng)常使用 errgroup.Group 來(lái)實(shí)現(xiàn)并發(fā)。
并發(fā)中的 panic 問(wèn)題
接下來(lái)看一下使用協(xié)程實(shí)現(xiàn)并發(fā)時(shí)的 panic 處理問(wèn)題,前面兩種都是通過(guò) go 關(guān)鍵字創(chuàng)建的協(xié)程,所以只能在函數(shù)里手動(dòng)處理,所以主要看一下 errgroup.Group 的 panic 處理問(wèn)題。我們從 errgroup.Group 的 Go 函數(shù)開(kāi)始看。
func (g *Group) Go(f func() error) {
if g.sem != nil {
g.sem <- token{}
}
g.add(f)
}Go 前面的邏輯用于限制并發(fā),主要是 add 函數(shù)。
func (g *Group) add(f func() error) {
g.wg.Add(1)
go func() {
defer g.done()
// panic 處理
defer func() {
v := recover()
g.mu.Lock()
defer g.mu.Unlock()
// 記錄 panic 信息,但是只會(huì)記錄第一次 panic 的信息
if v != nil && g.panicValue == nil {
g.panicValue = ...
}
}()
// 用戶函數(shù)
err := f()
// 記錄錯(cuò)誤信息,只記錄第一個(gè)錯(cuò)誤
if err != nil {
g.errOnce.Do(func() {
g.err = err
})
}
}()
}可以看到 errgroup.Group 處理了 panic 問(wèn)題并記錄了 panic 信息,看起來(lái)我們的函數(shù)里可以不處理 panic,那 errgroup.Group 是怎么處理 panic 信息的呢?接著看 Wait 函數(shù)中處理。
func (g *Group) Wait() error {
g.wg.Wait()
if g.cancel != nil {
g.cancel(g.err)
}
if g.panicValue != nil {
panic(g.panicValue)
}
return g.err
}可以看到最終會(huì)在 Wait 函數(shù)執(zhí)行 panic,所以我們只需要處理 Wait 函數(shù)的 panic 就行。
defer func() {
v := recover()
// ...
}()
g.Wait()但是還有有一個(gè)問(wèn)題是 errgroup.Group 只會(huì)記錄第一個(gè) panic,如果我們多個(gè)協(xié)程發(fā)生了 panic 則會(huì)丟失信息,所以我們最好還是自己處理,代碼如下。
defer func() {
v := recover()
// ...
}()
var g errgroup.Group
g.Go(func() error {
defer func() {
v := recover()
// ...
}()
})
g.Wait()這樣就可以記錄每一個(gè)協(xié)程的 panic 信息,那么如果協(xié)程里的 defer 中再次發(fā)生 panic 怎么辦呢?通過(guò)之前的分析可以知道,這個(gè) panic 會(huì)被 errgroup.Group 捕獲,并最終在 Wait 中執(zhí)行 panic,所以即使協(xié)程里處理了 panic,我們也需要處理 Wait 的 panic。如果我們運(yùn)行在一些框架中,框架往往會(huì)幫我們處理,比如 kitex 的處理如下。
defer func() {
// panic 處理
if handlerErr := recover(); handlerErr != nil {
err = kerrors.ErrPanic.WithCauseAndStack(
fmt.Errorf(
"[happened in biz handler, method=%s.%s, please check the panic at the server side] %s",
svcInfo.ServiceName, methodName, handlerErr),
string(debug.Stack()))
}
}()
// 執(zhí)行業(yè)務(wù)代碼
minfo := svcInfo.MethodInfo(methodName)
implHandlerFunc := minfo.Handler()
err = implHandlerFunc(ctx, svc.handler, args, resp)無(wú)論是協(xié)程里處理還是對(duì) Wait 函數(shù)的處理,每次都要寫(xiě)類似的代碼非常麻煩,一旦忘記寫(xiě)就容易出現(xiàn) panic,嚴(yán)重還會(huì)導(dǎo)致進(jìn)程 crash(如果上層也沒(méi)有處理 panic)。我們可以基于 errgroup.Group 提供一個(gè)安全版本的 errgroup.Group。
type ErrGroup struct {
errgroup.Group
cancel func(error)
// 可以自定義處理函數(shù)
handler func(context.Context, *error)
}
// 默認(rèn)處理
func handler(_ context.Context, err *error) {
if e := recover(); e != nil {
if err != nil {
*err = fmt.Errorf("panic happen: %v", e)
}
}
}
func WithContext(ctx context.Context) (*ErrGroup, context.Context) {
ctx, cancel := context.WithCancelCause(ctx)
return &ErrGroup{cancel: cancel}, ctx
}
func (e *ErrGroup) SafeGo(ctx context.Context, f func() error) {
e.Go(func() (err error) {
if e.handler != nil {
defer e.handler(ctx, &err)
} else {
defer handler(ctx, &err)
}
return f()
})
}
func (e *ErrGroup) SafeWait() (err error) {
defer func() {
if e := recover(); e != nil {
err = fmt.Errorf("panic happen: %v", e)
}
}()
err = e.Wait()
if e.cancel != nil {
e.cancel(err)
}
return err
}用法如下。
ctx := context.Background()
var eg safe_errgroup.ErrGroup
eg.SafeGo(ctx, func() error {
panic("oops")
})
err := eg.Wait()這樣就可以安全地實(shí)現(xiàn)并發(fā)了,具體可以參考:

































