Spaces:
Sleeping
Sleeping
File size: 3,113 Bytes
13555f3 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 | package utils
import (
"context"
"runtime/debug"
"sync/atomic"
"time"
"github.com/mattermost/mattermost/server/public/shared/mlog"
)
// CallbackFunc is a func that can be enqueued in the callback queue and will be
// called when dequeued. A non-nil error returned by the func is logged by the
// queue.
type CallbackFunc func() error
// CallbackQueue provides a simple thread pool for processing callbacks. Callbacks will
// be executed in the order in which they are enqueued, but no guarantees are provided
// regarding the order in which they finish (unless poolSize == 1).
type CallbackQueue struct {
	// name identifies this queue in every log message it emits.
	name string
	// poolSize is the number of worker goroutines started by NewCallbackQueue
	// and the number of exit notifications Shutdown waits for.
	poolSize int
	// queue holds callbacks waiting to be executed; it is buffered with the
	// queueSize passed to NewCallbackQueue.
	queue chan CallbackFunc
	// done is closed by Shutdown to tell the workers to exit.
	done chan struct{}
	// alive receives a worker's id when that worker exits; it is buffered with
	// poolSize so workers never block while reporting their exit.
	alive chan int
	// idone is switched from 0 to 1 atomically by the first Shutdown call and
	// is checked by Enqueue to reject callbacks after shutdown.
	idone uint32
	// logger receives all diagnostics produced by the queue.
	logger mlog.LoggerIFace
}
// NewCallbackQueue builds a CallbackQueue whose pending-callback buffer holds
// up to queueSize entries and launches poolSize worker goroutines (with ids
// 0..poolSize-1) to service it. name is attached to the log messages the queue
// writes to logger.
func NewCallbackQueue(name string, queueSize int, poolSize int, logger mlog.LoggerIFace) *CallbackQueue {
	pending := make(chan CallbackFunc, queueSize)
	stop := make(chan struct{})
	// Sized to the pool so every worker can report its exit without blocking.
	exited := make(chan int, poolSize)

	cq := &CallbackQueue{
		logger:   logger,
		name:     name,
		poolSize: poolSize,
		queue:    pending,
		done:     stop,
		alive:    exited,
	}

	// Start the workers; each one is handed its own id.
	for id := 0; id < poolSize; id++ {
		go cq.loop(id)
	}

	return cq
}
// Shutdown stops accepting enqueues and exits all pool threads. This method waits
// as long as the context allows for the threads to exit, then runs any callbacks
// still sitting in the queue on the calling goroutine.
//
// Returns true if the pool exited and the queue was drained, false if ctx was
// done first. Only the first call performs the shutdown; any later call returns
// true immediately without waiting for the first call to finish.
func (cn *CallbackQueue) Shutdown(ctx context.Context) bool {
	if !atomic.CompareAndSwapUint32(&cn.idone, 0, 1) {
		// already shutdown
		return true
	}

	// signal threads to exit
	close(cn.done)

	// wait for the threads to exit or timeout; each worker sends exactly one
	// value on alive when it returns.
	for exited := 0; exited < cn.poolSize; exited++ {
		select {
		case <-cn.alive:
		case <-ctx.Done():
			return false
		}
	}

	// try to drain any remaining callbacks; the default case ends the loop as
	// soon as the queue is observed empty.
	for {
		select {
		case f := <-cn.queue:
			cn.exec(f)
		case <-ctx.Done():
			return false
		default:
			return true
		}
	}
}
// Enqueue adds a callback to the queue.
//
// If the queue has been shut down the callback is dropped and a debug message
// is logged. If the queue is full, Enqueue blocks until space becomes available
// and then logs a warning with the time spent waiting. A shutdown that begins
// while Enqueue is blocked releases the caller and drops the callback, so
// callers cannot be left blocked forever once the workers have exited.
func (cn *CallbackQueue) Enqueue(f CallbackFunc) {
	if atomic.LoadUint32(&cn.idone) != 0 {
		cn.logger.Debug("CallbackQueue skipping enqueue, notifier is shutdown", mlog.String("name", cn.name))
		return
	}

	// Fast path: there is room in the queue.
	select {
	case cn.queue <- f:
		return
	default:
	}

	// Slow path: the queue is full. Wait for room, but give up if the queue is
	// shut down in the meantime, because nothing may ever receive from it again.
	start := time.Now()
	select {
	case cn.queue <- f:
		dur := time.Since(start)
		cn.logger.Warn("CallbackQueue queue backlog", mlog.String("name", cn.name), mlog.Duration("wait_time", dur))
	case <-cn.done:
		cn.logger.Debug("CallbackQueue skipping enqueue, notifier is shutdown", mlog.String("name", cn.name))
	}
}
// loop is the body of a single pool worker. It runs callbacks received from
// the queue until the done channel is closed. On the way out it logs a trace
// message and reports its id on the alive channel.
func (cn *CallbackQueue) loop(id int) {
	announceExit := func() {
		cn.logger.Trace("CallbackQueue thread exited", mlog.String("name", cn.name), mlog.Int("id", id))
		cn.alive <- id
	}
	defer announceExit()

	for {
		select {
		case <-cn.done:
			return
		case callback := <-cn.queue:
			cn.exec(callback)
		}
	}
}
// exec runs one callback. An error returned by the callback is logged, and a
// panic raised by it is recovered and logged together with a stack trace so
// that the calling goroutine keeps running.
func (cn *CallbackQueue) exec(f CallbackFunc) {
	// Recover here so a misbehaving callback cannot take down the worker.
	defer func() {
		cause := recover()
		if cause == nil {
			return
		}
		trace := string(debug.Stack())
		cn.logger.Error("CallbackQueue callback panic",
			mlog.String("name", cn.name),
			mlog.Any("panic", cause),
			mlog.String("stack", trace),
		)
	}()

	err := f()
	if err == nil {
		return
	}
	cn.logger.Error("CallbackQueue callback error", mlog.String("name", cn.name), mlog.Err(err))
}
|