// Copyright 2020 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
| package par | |
| import "fmt" | |
| // Queue manages a set of work items to be executed in parallel. The number of | |
| // active work items is limited, and excess items are queued sequentially. | |
| type Queue struct { | |
| maxActive int | |
| st chan queueState | |
| } | |
// queueState is the mutable portion of a Queue. A single value circulates
// through Queue.st; it is received, updated, and sent back by each operation.
type queueState struct {
	// active counts the goroutines currently processing work. It is always
	// nonzero while len(backlog) > 0.
	active int

	// backlog holds work items waiting for a free worker, oldest first.
	backlog []func()

	// idle, if non-nil, is closed when active becomes 0.
	idle chan struct{}
}
| // NewQueue returns a Queue that executes up to maxActive items in parallel. | |
| // | |
| // maxActive must be positive. | |
| func NewQueue(maxActive int) *Queue { | |
| if maxActive < 1 { | |
| panic(fmt.Sprintf("par.NewQueue called with nonpositive limit (%d)", maxActive)) | |
| } | |
| q := &Queue{ | |
| maxActive: maxActive, | |
| st: make(chan queueState, 1), | |
| } | |
| q.st <- queueState{} | |
| return q | |
| } | |
| // Add adds f as a work item in the queue. | |
| // | |
| // Add returns immediately, but the queue will be marked as non-idle until after | |
| // f (and any subsequently-added work) has completed. | |
| func (q *Queue) Add(f func()) { | |
| st := <-q.st | |
| if st.active == q.maxActive { | |
| st.backlog = append(st.backlog, f) | |
| q.st <- st | |
| return | |
| } | |
| if st.active == 0 { | |
| // Mark q as non-idle. | |
| st.idle = nil | |
| } | |
| st.active++ | |
| q.st <- st | |
| go func() { | |
| for { | |
| f() | |
| st := <-q.st | |
| if len(st.backlog) == 0 { | |
| if st.active--; st.active == 0 && st.idle != nil { | |
| close(st.idle) | |
| } | |
| q.st <- st | |
| return | |
| } | |
| f, st.backlog = st.backlog[0], st.backlog[1:] | |
| q.st <- st | |
| } | |
| }() | |
| } | |
| // Idle returns a channel that will be closed when q has no (active or enqueued) | |
| // work outstanding. | |
| func (q *Queue) Idle() <-chan struct{} { | |
| st := <-q.st | |
| defer func() { q.st <- st }() | |
| if st.idle == nil { | |
| st.idle = make(chan struct{}) | |
| if st.active == 0 { | |
| close(st.idle) | |
| } | |
| } | |
| return st.idle | |
| } | |