File size: 3,986 Bytes
f606b10 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 | package usage
import (
"context"
"sync"
"time"
log "github.com/sirupsen/logrus"
)
// Record contains the usage statistics captured for a single provider request.
type Record struct {
Provider string
Model string
APIKey string
AuthID string
AuthIndex string
Source string
RequestedAt time.Time
Failed bool
Detail Detail
}
// Detail holds the token usage breakdown.
type Detail struct {
InputTokens int64
OutputTokens int64
ReasoningTokens int64
CachedTokens int64
TotalTokens int64
}
// Plugin consumes usage records emitted by the proxy runtime.
type Plugin interface {
HandleUsage(ctx context.Context, record Record)
}
type queueItem struct {
ctx context.Context
record Record
}
// Manager maintains a queue of usage records and delivers them to registered plugins.
type Manager struct {
once sync.Once
stopOnce sync.Once
cancel context.CancelFunc
mu sync.Mutex
cond *sync.Cond
queue []queueItem
closed bool
pluginsMu sync.RWMutex
plugins []Plugin
}
// NewManager constructs a manager with a buffered queue.
func NewManager(buffer int) *Manager {
m := &Manager{}
m.cond = sync.NewCond(&m.mu)
return m
}
// Start launches the background dispatcher. Calling Start multiple times is safe.
func (m *Manager) Start(ctx context.Context) {
if m == nil {
return
}
m.once.Do(func() {
if ctx == nil {
ctx = context.Background()
}
var workerCtx context.Context
workerCtx, m.cancel = context.WithCancel(ctx)
go m.run(workerCtx)
})
}
// Stop stops the dispatcher and drains the queue.
func (m *Manager) Stop() {
if m == nil {
return
}
m.stopOnce.Do(func() {
if m.cancel != nil {
m.cancel()
}
m.mu.Lock()
m.closed = true
m.mu.Unlock()
m.cond.Broadcast()
})
}
// Register appends a plugin to the delivery list.
func (m *Manager) Register(plugin Plugin) {
if m == nil || plugin == nil {
return
}
m.pluginsMu.Lock()
m.plugins = append(m.plugins, plugin)
m.pluginsMu.Unlock()
}
// Publish enqueues a usage record for processing. If no plugin is registered
// the record will be discarded downstream.
func (m *Manager) Publish(ctx context.Context, record Record) {
if m == nil {
return
}
// ensure worker is running even if Start was not called explicitly
m.Start(context.Background())
m.mu.Lock()
if m.closed {
m.mu.Unlock()
return
}
m.queue = append(m.queue, queueItem{ctx: ctx, record: record})
m.mu.Unlock()
m.cond.Signal()
}
func (m *Manager) run(ctx context.Context) {
for {
m.mu.Lock()
for !m.closed && len(m.queue) == 0 {
m.cond.Wait()
}
if len(m.queue) == 0 && m.closed {
m.mu.Unlock()
return
}
item := m.queue[0]
m.queue = m.queue[1:]
m.mu.Unlock()
m.dispatch(item)
}
}
func (m *Manager) dispatch(item queueItem) {
m.pluginsMu.RLock()
plugins := make([]Plugin, len(m.plugins))
copy(plugins, m.plugins)
m.pluginsMu.RUnlock()
if len(plugins) == 0 {
return
}
for _, plugin := range plugins {
if plugin == nil {
continue
}
safeInvoke(plugin, item.ctx, item.record)
}
}
func safeInvoke(plugin Plugin, ctx context.Context, record Record) {
defer func() {
if r := recover(); r != nil {
log.Errorf("usage: plugin panic recovered: %v", r)
}
}()
plugin.HandleUsage(ctx, record)
}
var defaultManager = NewManager(512)
// DefaultManager returns the global usage manager instance.
func DefaultManager() *Manager { return defaultManager }
// RegisterPlugin registers a plugin on the default manager.
func RegisterPlugin(plugin Plugin) { DefaultManager().Register(plugin) }
// PublishRecord publishes a record using the default manager.
func PublishRecord(ctx context.Context, record Record) { DefaultManager().Publish(ctx, record) }
// StartDefault starts the default manager's dispatcher.
func StartDefault(ctx context.Context) { DefaultManager().Start(ctx) }
// StopDefault stops the default manager's dispatcher.
func StopDefault() { DefaultManager().Stop() }
|