| package errgroup | |
| import ( | |
| "context" | |
| "fmt" | |
| "sync" | |
| "sync/atomic" | |
| "github.com/avast/retry-go" | |
| ) | |
| type token struct{} | |
| type Group struct { | |
| cancel func(error) | |
| ctx context.Context | |
| opts []retry.Option | |
| success uint64 | |
| wg sync.WaitGroup | |
| sem chan token | |
| } | |
| func NewGroupWithContext(ctx context.Context, limit int, retryOpts ...retry.Option) (*Group, context.Context) { | |
| ctx, cancel := context.WithCancelCause(ctx) | |
| return (&Group{cancel: cancel, ctx: ctx, opts: append(retryOpts, retry.Context(ctx))}).SetLimit(limit), ctx | |
| } | |
| func (g *Group) done() { | |
| if g.sem != nil { | |
| <-g.sem | |
| } | |
| g.wg.Done() | |
| atomic.AddUint64(&g.success, 1) | |
| } | |
| func (g *Group) Wait() error { | |
| g.wg.Wait() | |
| return context.Cause(g.ctx) | |
| } | |
| func (g *Group) Go(f func(ctx context.Context) error) { | |
| if g.sem != nil { | |
| g.sem <- token{} | |
| } | |
| g.wg.Add(1) | |
| go func() { | |
| defer g.done() | |
| if err := retry.Do(func() error { return f(g.ctx) }, g.opts...); err != nil { | |
| g.cancel(err) | |
| } | |
| }() | |
| } | |
| func (g *Group) TryGo(f func(ctx context.Context) error) bool { | |
| if g.sem != nil { | |
| select { | |
| case g.sem <- token{}: | |
| default: | |
| return false | |
| } | |
| } | |
| g.wg.Add(1) | |
| go func() { | |
| defer g.done() | |
| if err := retry.Do(func() error { return f(g.ctx) }, g.opts...); err != nil { | |
| g.cancel(err) | |
| } | |
| }() | |
| return true | |
| } | |
| func (g *Group) SetLimit(n int) *Group { | |
| if len(g.sem) != 0 { | |
| panic(fmt.Errorf("errgroup: modify limit while %v goroutines in the group are still active", len(g.sem))) | |
| } | |
| if n > 0 { | |
| g.sem = make(chan token, n) | |
| } else { | |
| g.sem = nil | |
| } | |
| return g | |
| } | |
| func (g *Group) Success() uint64 { | |
| return atomic.LoadUint64(&g.success) | |
| } | |
| func (g *Group) Err() error { | |
| return context.Cause(g.ctx) | |
| } | |