errgroup使用

errgroup使用

May 8, 2024
Go

在go中errgroup包的使用,和源码分析

#

golang.org/x/sync/errgroup

errgroup.WithContext() #

//创建一个errGroup对象和ctx对象
g, ctx := errgroup.WithContext(context.TODO())

//Group结构体
type Group struct {
 cancel func() // 这个存的是context的cancel方法

 wg sync.WaitGroup // 封装sync.WaitGroup

 errOnce sync.Once // 保证获取的是第一次出错的信息,避免被后面的goroutine的错误覆盖
 err     error // 保存第一个返回的错误
}

func WithContext(ctx context.Context) (*Group, context.Context) {
 ctx, cancel := context.WithCancel(ctx)
 return &Group{cancel: cancel}, ctx
}

Go() #

//创建并发线程
g.Go(func() error {
		return nil
	})
//内部实现
func (g *Group) Go(f func() error) {
	g.wg.Add(1)
	go func() {
		defer g.wg.Done()
		if err := f(); err != nil {
			g.errOnce.Do(func() {
				g.err = err             //记录子协程中的错误
				if g.cancel != nil {
					g.cancel()
				}
			})
		}
	}()
}

wait() #

func (g *Group) Wait() error {
 g.wg.Wait()
 if g.cancel != nil {
  g.cancel()
 }
 return g.err
}

实现 #

g, ctx := errgroup.WithContext(context.TODO())
dataMapping := []string{
	{
	"test",
	"sadhkl",
	"ejbfrkw",
	},
}
doces := make(chan interface{}, len(dataMapping))
g.Go(func() error {
	defer close(doces)
	for _, val := range dataMapping {
		select {
		case doces <- val:
		case <-ctx.Done(): //若可以读取,则其他进程已经发起了取消。
			return ctx.Err()
		}
	}
	return nil
})
if err := g.Wait(); err != nil {
	fmt.Println(err)
}
fmt.Println(doces)