Spaces:
Sleeping
Sleeping
| package utils | |
| import ( | |
| "context" | |
| "runtime/debug" | |
| "sync/atomic" | |
| "time" | |
| "github.com/mattermost/mattermost/server/public/shared/mlog" | |
| ) | |
// CallbackFunc is the unit of work handled by a CallbackQueue: a function that
// can be enqueued and is invoked once it has been dequeued. A non-nil error
// returned by the function is logged by the queue.
type CallbackFunc func() error
| // CallbackQueue provides a simple thread pool for processing callbacks. Callbacks will | |
| // be executed in the order in which they are enqueued, but no guarantees are provided | |
| // regarding the order in which they finish (unless poolSize == 1). | |
| type CallbackQueue struct { | |
| name string | |
| poolSize int | |
| queue chan CallbackFunc | |
| done chan struct{} | |
| alive chan int | |
| idone uint32 | |
| logger mlog.LoggerIFace | |
| } | |
| // NewCallbackQueue creates a new CallbackQueue and starts a thread pool to service it. | |
| func NewCallbackQueue(name string, queueSize int, poolSize int, logger mlog.LoggerIFace) *CallbackQueue { | |
| cn := &CallbackQueue{ | |
| name: name, | |
| poolSize: poolSize, | |
| queue: make(chan CallbackFunc, queueSize), | |
| done: make(chan struct{}), | |
| alive: make(chan int, poolSize), | |
| logger: logger, | |
| } | |
| for i := 0; i < poolSize; i++ { | |
| go cn.loop(i) | |
| } | |
| return cn | |
| } | |
| // Shutdown stops accepting enqueues and exits all pool threads. This method waits | |
| // as long as the context allows for the threads to exit. | |
| // Returns true if the pool exited, false on timeout. | |
| func (cn *CallbackQueue) Shutdown(context context.Context) bool { | |
| if !atomic.CompareAndSwapUint32(&cn.idone, 0, 1) { | |
| // already shutdown | |
| return true | |
| } | |
| // signal threads to exit | |
| close(cn.done) | |
| // wait for the threads to exit or timeout | |
| count := 0 | |
| for count < cn.poolSize { | |
| select { | |
| case <-cn.alive: | |
| count++ | |
| case <-context.Done(): | |
| return false | |
| } | |
| } | |
| // try to drain any remaining callbacks | |
| for { | |
| select { | |
| case f := <-cn.queue: | |
| cn.exec(f) | |
| case <-context.Done(): | |
| return false | |
| default: | |
| return true | |
| } | |
| } | |
| } | |
| // Enqueue adds a callback to the queue. | |
| func (cn *CallbackQueue) Enqueue(f CallbackFunc) { | |
| if atomic.LoadUint32(&cn.idone) != 0 { | |
| cn.logger.Debug("CallbackQueue skipping enqueue, notifier is shutdown", mlog.String("name", cn.name)) | |
| return | |
| } | |
| select { | |
| case cn.queue <- f: | |
| default: | |
| start := time.Now() | |
| cn.queue <- f | |
| dur := time.Since(start) | |
| cn.logger.Warn("CallbackQueue queue backlog", mlog.String("name", cn.name), mlog.Duration("wait_time", dur)) | |
| } | |
| } | |
| func (cn *CallbackQueue) loop(id int) { | |
| defer func() { | |
| cn.logger.Trace("CallbackQueue thread exited", mlog.String("name", cn.name), mlog.Int("id", id)) | |
| cn.alive <- id | |
| }() | |
| for { | |
| select { | |
| case f := <-cn.queue: | |
| cn.exec(f) | |
| case <-cn.done: | |
| return | |
| } | |
| } | |
| } | |
| func (cn *CallbackQueue) exec(f CallbackFunc) { | |
| // don't let a panic in the callback exit the thread. | |
| defer func() { | |
| if r := recover(); r != nil { | |
| stack := debug.Stack() | |
| cn.logger.Error("CallbackQueue callback panic", | |
| mlog.String("name", cn.name), | |
| mlog.Any("panic", r), | |
| mlog.String("stack", string(stack)), | |
| ) | |
| } | |
| }() | |
| if err := f(); err != nil { | |
| cn.logger.Error("CallbackQueue callback error", mlog.String("name", cn.name), mlog.Err(err)) | |
| } | |
| } | |