|
|
package usage |
|
|
|
|
|
import ( |
|
|
"context" |
|
|
"sync" |
|
|
"time" |
|
|
|
|
|
log "github.com/sirupsen/logrus" |
|
|
) |
|
|
|
|
|
|
|
|
type Record struct { |
|
|
Provider string |
|
|
Model string |
|
|
APIKey string |
|
|
AuthID string |
|
|
AuthIndex string |
|
|
Source string |
|
|
RequestedAt time.Time |
|
|
Failed bool |
|
|
Detail Detail |
|
|
} |
|
|
|
|
|
|
|
|
// Detail holds the int64 counters attached to a Record.
//
// NOTE(review): the names suggest per-category token counts. This file never
// computes or validates them (for example, it does not check that TotalTokens
// equals the sum of the others) — confirm the exact semantics with the
// producers of Record values.
type Detail struct {
	InputTokens     int64
	OutputTokens    int64
	ReasoningTokens int64
	CachedTokens    int64
	TotalTokens     int64
}
|
|
|
|
|
|
|
|
type Plugin interface { |
|
|
HandleUsage(ctx context.Context, record Record) |
|
|
} |
|
|
|
|
|
type queueItem struct { |
|
|
ctx context.Context |
|
|
record Record |
|
|
} |
|
|
|
|
|
|
|
|
type Manager struct { |
|
|
once sync.Once |
|
|
stopOnce sync.Once |
|
|
cancel context.CancelFunc |
|
|
|
|
|
mu sync.Mutex |
|
|
cond *sync.Cond |
|
|
queue []queueItem |
|
|
closed bool |
|
|
|
|
|
pluginsMu sync.RWMutex |
|
|
plugins []Plugin |
|
|
} |
|
|
|
|
|
|
|
|
func NewManager(buffer int) *Manager { |
|
|
m := &Manager{} |
|
|
m.cond = sync.NewCond(&m.mu) |
|
|
return m |
|
|
} |
|
|
|
|
|
|
|
|
func (m *Manager) Start(ctx context.Context) { |
|
|
if m == nil { |
|
|
return |
|
|
} |
|
|
m.once.Do(func() { |
|
|
if ctx == nil { |
|
|
ctx = context.Background() |
|
|
} |
|
|
var workerCtx context.Context |
|
|
workerCtx, m.cancel = context.WithCancel(ctx) |
|
|
go m.run(workerCtx) |
|
|
}) |
|
|
} |
|
|
|
|
|
|
|
|
func (m *Manager) Stop() { |
|
|
if m == nil { |
|
|
return |
|
|
} |
|
|
m.stopOnce.Do(func() { |
|
|
if m.cancel != nil { |
|
|
m.cancel() |
|
|
} |
|
|
m.mu.Lock() |
|
|
m.closed = true |
|
|
m.mu.Unlock() |
|
|
m.cond.Broadcast() |
|
|
}) |
|
|
} |
|
|
|
|
|
|
|
|
func (m *Manager) Register(plugin Plugin) { |
|
|
if m == nil || plugin == nil { |
|
|
return |
|
|
} |
|
|
m.pluginsMu.Lock() |
|
|
m.plugins = append(m.plugins, plugin) |
|
|
m.pluginsMu.Unlock() |
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
func (m *Manager) Publish(ctx context.Context, record Record) { |
|
|
if m == nil { |
|
|
return |
|
|
} |
|
|
|
|
|
m.Start(context.Background()) |
|
|
m.mu.Lock() |
|
|
if m.closed { |
|
|
m.mu.Unlock() |
|
|
return |
|
|
} |
|
|
m.queue = append(m.queue, queueItem{ctx: ctx, record: record}) |
|
|
m.mu.Unlock() |
|
|
m.cond.Signal() |
|
|
} |
|
|
|
|
|
func (m *Manager) run(ctx context.Context) { |
|
|
for { |
|
|
m.mu.Lock() |
|
|
for !m.closed && len(m.queue) == 0 { |
|
|
m.cond.Wait() |
|
|
} |
|
|
if len(m.queue) == 0 && m.closed { |
|
|
m.mu.Unlock() |
|
|
return |
|
|
} |
|
|
item := m.queue[0] |
|
|
m.queue = m.queue[1:] |
|
|
m.mu.Unlock() |
|
|
m.dispatch(item) |
|
|
} |
|
|
} |
|
|
|
|
|
func (m *Manager) dispatch(item queueItem) { |
|
|
m.pluginsMu.RLock() |
|
|
plugins := make([]Plugin, len(m.plugins)) |
|
|
copy(plugins, m.plugins) |
|
|
m.pluginsMu.RUnlock() |
|
|
if len(plugins) == 0 { |
|
|
return |
|
|
} |
|
|
for _, plugin := range plugins { |
|
|
if plugin == nil { |
|
|
continue |
|
|
} |
|
|
safeInvoke(plugin, item.ctx, item.record) |
|
|
} |
|
|
} |
|
|
|
|
|
func safeInvoke(plugin Plugin, ctx context.Context, record Record) { |
|
|
defer func() { |
|
|
if r := recover(); r != nil { |
|
|
log.Errorf("usage: plugin panic recovered: %v", r) |
|
|
} |
|
|
}() |
|
|
plugin.HandleUsage(ctx, record) |
|
|
} |
|
|
|
|
|
var defaultManager = NewManager(512) |
|
|
|
|
|
|
|
|
func DefaultManager() *Manager { return defaultManager } |
|
|
|
|
|
|
|
|
func RegisterPlugin(plugin Plugin) { DefaultManager().Register(plugin) } |
|
|
|
|
|
|
|
|
func PublishRecord(ctx context.Context, record Record) { DefaultManager().Publish(ctx, record) } |
|
|
|
|
|
|
|
|
func StartDefault(ctx context.Context) { DefaultManager().Start(ctx) } |
|
|
|
|
|
|
|
|
func StopDefault() { DefaultManager().Stop() } |
|
|
|