File size: 3,113 Bytes
13555f3
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
package utils

import (
	"context"
	"runtime/debug"
	"sync/atomic"
	"time"

	"github.com/mattermost/mattermost/server/public/shared/mlog"
)

// CallbackFunc is a func that can be enqueued in the callback queue and will be
// called when dequeued. A non-nil error returned by the callback is logged by
// the queue; it is not propagated back to the code that enqueued it.
type CallbackFunc func() error

// CallbackQueue provides a simple thread pool for processing callbacks. Callbacks will
// be executed in the order in which they are enqueued, but no guarantees are provided
// regarding the order in which they finish (unless poolSize == 1).
type CallbackQueue struct {
	// name identifies this queue in log messages.
	name string
	// poolSize is the number of worker goroutines started for this queue; Shutdown
	// waits for this many exit notifications.
	poolSize int

	// queue is the buffered channel holding callbacks awaiting execution.
	queue chan CallbackFunc
	// done is closed by Shutdown to tell the worker goroutines to exit.
	done chan struct{}
	// alive receives a worker's id when that worker exits. It is buffered to
	// poolSize so an exiting worker never blocks on the send.
	alive chan int

	// idone is 0 while the queue accepts enqueues and is atomically set to 1 by
	// the first call to Shutdown.
	idone uint32

	// logger receives trace, debug, warning, and error messages from the queue.
	logger mlog.LoggerIFace
}

// NewCallbackQueue builds a CallbackQueue and launches the goroutines that service it.
//
// name is attached to every log message the queue emits, queueSize is the capacity
// of the pending-callback buffer, poolSize is the number of worker goroutines to
// start, and logger receives the queue's log output.
func NewCallbackQueue(name string, queueSize int, poolSize int, logger mlog.LoggerIFace) *CallbackQueue {
	cq := new(CallbackQueue)
	cq.name = name
	cq.poolSize = poolSize
	cq.queue = make(chan CallbackFunc, queueSize)
	cq.done = make(chan struct{})
	// Buffered to poolSize so that every worker can report its exit without blocking.
	cq.alive = make(chan int, poolSize)
	cq.logger = logger

	// Each worker is handed its index as an id, used when it reports its exit.
	for workerID := 0; workerID < poolSize; workerID++ {
		go cq.loop(workerID)
	}

	return cq
}

// Shutdown closes the queue to new enqueues and tells every pool thread to exit,
// then waits, for as long as the supplied context permits, until all of them have
// reported that they stopped. Once they have, callbacks still sitting in the queue
// are executed on the calling goroutine.
// Returns true if the pool exited (or Shutdown had already been called), false if
// the context finished first.
func (cn *CallbackQueue) Shutdown(context context.Context) bool {
	// Only the first caller flips the flag and performs the shutdown; any later
	// caller returns immediately.
	isFirstCall := atomic.CompareAndSwapUint32(&cn.idone, 0, 1)
	if !isFirstCall {
		return true
	}

	// Closing done wakes every worker blocked in its select.
	close(cn.done)

	// Collect one exit notification per worker, giving up if the context ends.
	for exited := 0; exited < cn.poolSize; exited++ {
		select {
		case <-context.Done():
			return false
		case <-cn.alive:
		}
	}

	// All workers are gone; run whatever is left in the queue until it is empty
	// or the context ends.
	for {
		select {
		case <-context.Done():
			return false
		case pending := <-cn.queue:
			cn.exec(pending)
		default:
			return true
		}
	}
}

// Enqueue adds a callback to the queue.
//
// If the queue has already been shut down, the callback is dropped and a debug
// message is logged. If the queue is full, Enqueue blocks until space becomes
// available and then logs a warning with the time spent waiting. If Shutdown is
// called while Enqueue is waiting for space, the wait is abandoned and the
// callback is dropped, so the caller is not left blocked on a queue whose
// workers have been told to exit.
func (cn *CallbackQueue) Enqueue(f CallbackFunc) {
	if atomic.LoadUint32(&cn.idone) != 0 {
		cn.logger.Debug("CallbackQueue skipping enqueue, notifier is shutdown", mlog.String("name", cn.name))
		return
	}

	// Fast path: there is room in the queue, so no waiting and no warning.
	select {
	case cn.queue <- f:
		return
	default:
	}

	// Slow path: the queue is full. Wait for room, but also watch the done
	// channel; an unconditional send here could block forever once the workers
	// have exited.
	start := time.Now()
	select {
	case cn.queue <- f:
		dur := time.Since(start)
		cn.logger.Warn("CallbackQueue queue backlog", mlog.String("name", cn.name), mlog.Duration("wait_time", dur))
	case <-cn.done:
		cn.logger.Debug("CallbackQueue skipping enqueue, notifier is shutdown", mlog.String("name", cn.name))
	}
}

// loop is the body of a single pool thread. It executes callbacks taken from the
// queue until the done channel is closed, then logs its exit and reports its id
// on the alive channel.
func (cn *CallbackQueue) loop(id int) {
	// Announce the exit on every return path so Shutdown can count this worker.
	defer func() {
		cn.logger.Trace("CallbackQueue thread exited", mlog.String("name", cn.name), mlog.Int("id", id))
		cn.alive <- id
	}()

	for {
		select {
		case <-cn.done:
			return
		case callback := <-cn.queue:
			cn.exec(callback)
		}
	}
}

// exec runs a single callback. An error returned by the callback is logged, and a
// panic raised by it is recovered and logged together with a stack trace, so the
// calling goroutine keeps running in either case.
func (cn *CallbackQueue) exec(f CallbackFunc) {
	// Recover here so that a panicking callback cannot take down the pool thread.
	defer func() {
		r := recover()
		if r == nil {
			return
		}
		trace := debug.Stack()
		cn.logger.Error("CallbackQueue callback panic",
			mlog.String("name", cn.name),
			mlog.Any("panic", r),
			mlog.String("stack", string(trace)),
		)
	}()

	err := f()
	if err == nil {
		return
	}
	cn.logger.Error("CallbackQueue callback error", mlog.String("name", cn.name), mlog.Err(err))
}