errgroup使用
May 8, 2024
在go中errgroup包的使用,和源码分析
包 #
golang.org/x/sync/errgroup
errgroup.WithContext() #
//创建一个errGroup对象和ctx对象
g, ctx := errgroup.WithContext(context.TODO())
//Group结构体
type Group struct {
cancel func() // 这个存的是context的cancel方法
wg sync.WaitGroup // 封装sync.WaitGroup
errOnce sync.Once // 保证获取的是第一次出错的信息,避免被后面的goroutine的错误覆盖
err error // 保存第一个返回的错误
}
func WithContext(ctx context.Context) (*Group, context.Context) {
ctx, cancel := context.WithCancel(ctx)
return &Group{cancel: cancel}, ctx
}
Go() #
//创建并发线程
g.Go(func() error {
return nil
})
//内部实现
func (g *Group) Go(f func() error) {
g.wg.Add(1)
go func() {
defer g.wg.Done()
if err := f(); err != nil {
g.errOnce.Do(func() {
g.err = err //记录子协程中的错误
if g.cancel != nil {
g.cancel()
}
})
}
}()
}
wait() #
func (g *Group) Wait() error {
g.wg.Wait()
if g.cancel != nil {
g.cancel()
}
return g.err
}
实现 #
g, ctx := errgroup.WithContext(context.TODO())
dataMapping := []string{
{
"test",
"sadhkl",
"ejbfrkw",
},
}
doces := make(chan interface{}, len(dataMapping))
g.Go(func() error {
defer close(doces)
for _, val := range dataMapping {
select {
case doces <- val:
case <-ctx.Done(): //若可以读取,则其他进程已经发起了取消。
return ctx.Err()
}
}
return nil
})
if err := g.Wait(); err != nil {
fmt.Println(err)
}
fmt.Println(doces)