| | |
| | |
| | |
| |
|
| | |
| | package par |
| |
|
| | import ( |
| | "errors" |
| | "math/rand" |
| | "sync" |
| | "sync/atomic" |
| | ) |
| |
|
| | |
| | |
// Work is a deduplicating set of items of type T that is drained by a fixed
// number of runners, each of which calls one shared function per item.
// Items must be comparable because they are used as map keys.
// The zero value is an empty set that is ready for Add.
type Work[T comparable] struct {
	f       func(T) // callback applied to each item; installed by Do
	running int     // number of runners started by Do (0 until Do is called)

	mu      sync.Mutex // held by Add and runner while touching the fields below
	added   map[T]bool // every item ever accepted by Add
	todo    []T        // accepted items not yet handed to a runner
	wait    sync.Cond  // runners block here while todo is empty
	waiting int        // number of runners currently idle
}
| |
|
| | func (w *Work[T]) init() { |
| | if w.added == nil { |
| | w.added = make(map[T]bool) |
| | } |
| | } |
| |
|
| | |
| | func (w *Work[T]) Add(item T) { |
| | w.mu.Lock() |
| | w.init() |
| | if !w.added[item] { |
| | w.added[item] = true |
| | w.todo = append(w.todo, item) |
| | if w.waiting > 0 { |
| | w.wait.Signal() |
| | } |
| | } |
| | w.mu.Unlock() |
| | } |
| |
|
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | func (w *Work[T]) Do(n int, f func(item T)) { |
| | if n < 1 { |
| | panic("par.Work.Do: n < 1") |
| | } |
| | if w.running >= 1 { |
| | panic("par.Work.Do: already called Do") |
| | } |
| |
|
| | w.running = n |
| | w.f = f |
| | w.wait.L = &w.mu |
| |
|
| | for i := 0; i < n-1; i++ { |
| | go w.runner() |
| | } |
| | w.runner() |
| | } |
| |
|
| | |
| | |
| | |
| | func (w *Work[T]) runner() { |
| | for { |
| | |
| | w.mu.Lock() |
| | for len(w.todo) == 0 { |
| | w.waiting++ |
| | if w.waiting == w.running { |
| | |
| | w.wait.Broadcast() |
| | w.mu.Unlock() |
| | return |
| | } |
| | w.wait.Wait() |
| | w.waiting-- |
| | } |
| |
|
| | |
| | |
| | |
| | |
| | i := rand.Intn(len(w.todo)) |
| | item := w.todo[i] |
| | w.todo[i] = w.todo[len(w.todo)-1] |
| | w.todo = w.todo[:len(w.todo)-1] |
| | w.mu.Unlock() |
| |
|
| | w.f(item) |
| | } |
| | } |
| |
|
| | |
| | |
| | type ErrCache[K comparable, V any] struct { |
| | Cache[K, errValue[V]] |
| | } |
| |
|
// errValue bundles the two results of an ErrCache computation so that they
// can be stored as one cached value.
type errValue[V any] struct {
	v   V     // value returned by the computation
	err error // error returned by the computation
}
| |
|
| | func (c *ErrCache[K, V]) Do(key K, f func() (V, error)) (V, error) { |
| | v := c.Cache.Do(key, func() errValue[V] { |
| | v, err := f() |
| | return errValue[V]{v, err} |
| | }) |
| | return v.v, v.err |
| | } |
| |
|
// ErrCacheEntryNotFound is the error ErrCache.Get returns when no finished
// result is stored for the requested key.
var ErrCacheEntryNotFound = errors.New("cache entry not found")
| |
|
| | |
| | |
| | func (c *ErrCache[K, V]) Get(key K) (V, error) { |
| | v, ok := c.Cache.Get(key) |
| | if !ok { |
| | v.err = ErrCacheEntryNotFound |
| | } |
| | return v.v, v.err |
| | } |
| |
|
| | |
// Cache computes a value at most once per key and remembers the result.
// The zero value is an empty cache ready for use.
type Cache[K comparable, V any] struct {
	m sync.Map // maps K to *cacheEntry[V]
}

// cacheEntry holds the state of the computation for a single key.
type cacheEntry[V any] struct {
	done   atomic.Bool // true once result holds the value returned by f
	mu     sync.Mutex  // held while f runs, so only one caller computes
	result V           // valid only after done is true
}

// Do returns the value cached for key, computing it with f if no result
// has been stored yet.
//
// Callers that arrive for the same key while f is running block until it
// finishes and then return the same result without calling f themselves.
//
// If f panics, the panic propagates to the caller, the entry's lock is
// released and the entry stays uncomputed, so a later Do for the same key
// calls its f again instead of blocking forever.
func (c *Cache[K, V]) Do(key K, f func() V) V {
	// Try a plain Load first so the common hit path does not allocate an
	// entry just to have LoadOrStore throw it away.
	entryIface, ok := c.m.Load(key)
	if !ok {
		entryIface, _ = c.m.LoadOrStore(key, new(cacheEntry[V]))
	}
	e := entryIface.(*cacheEntry[V])
	if !e.done.Load() {
		e.mu.Lock()
		// Deferred so that a panic in f cannot leave the entry locked.
		defer e.mu.Unlock()
		// Re-check under the lock: another caller may have finished the
		// computation while this one was waiting for the mutex.
		if !e.done.Load() {
			e.result = f()
			e.done.Store(true)
		}
	}
	return e.result
}

// Get returns the result cached for key and reports whether there is one.
//
// Get never blocks: if the result for key is still being computed (or was
// never requested), it returns the zero V and false.
func (c *Cache[K, V]) Get(key K) (V, bool) {
	var zero V
	entryIface, ok := c.m.Load(key)
	if !ok {
		return zero, false
	}
	e := entryIface.(*cacheEntry[V])
	if !e.done.Load() {
		return zero, false
	}
	return e.result, true
}
| |
|