|
|
package model |
|
|
|
|
|
import ( |
|
|
"sort" |
|
|
"sync" |
|
|
"time" |
|
|
|
|
|
"github.com/mudler/LocalAI/pkg/xsysinfo" |
|
|
process "github.com/mudler/go-processmanager" |
|
|
"github.com/mudler/xlog" |
|
|
) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
type WatchDog struct { |
|
|
sync.Mutex |
|
|
busyTime map[string]time.Time |
|
|
idleTime map[string]time.Time |
|
|
lastUsed map[string]time.Time |
|
|
timeout, idletimeout time.Duration |
|
|
addressMap map[string]*process.Process |
|
|
addressModelMap map[string]string |
|
|
pm ProcessManager |
|
|
stop chan bool |
|
|
|
|
|
busyCheck, idleCheck bool |
|
|
lruLimit int |
|
|
|
|
|
|
|
|
memoryReclaimerEnabled bool |
|
|
memoryReclaimerThreshold float64 |
|
|
watchdogInterval time.Duration |
|
|
|
|
|
|
|
|
forceEvictionWhenBusy bool |
|
|
} |
|
|
|
|
|
// ProcessManager is the dependency the WatchDog uses to stop a model.
type ProcessManager interface {
	// ShutdownModel stops the model identified by modelName and reports any
	// failure as an error.
	ShutdownModel(modelName string) error
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
func NewWatchDog(opts ...WatchDogOption) *WatchDog { |
|
|
o := NewWatchDogOptions(opts...) |
|
|
|
|
|
return &WatchDog{ |
|
|
timeout: o.busyTimeout, |
|
|
idletimeout: o.idleTimeout, |
|
|
pm: o.processManager, |
|
|
busyTime: make(map[string]time.Time), |
|
|
idleTime: make(map[string]time.Time), |
|
|
lastUsed: make(map[string]time.Time), |
|
|
addressMap: make(map[string]*process.Process), |
|
|
busyCheck: o.busyCheck, |
|
|
idleCheck: o.idleCheck, |
|
|
lruLimit: o.lruLimit, |
|
|
addressModelMap: make(map[string]string), |
|
|
stop: make(chan bool, 1), |
|
|
memoryReclaimerEnabled: o.memoryReclaimerEnabled, |
|
|
memoryReclaimerThreshold: o.memoryReclaimerThreshold, |
|
|
watchdogInterval: o.watchdogInterval, |
|
|
forceEvictionWhenBusy: o.forceEvictionWhenBusy, |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
func (wd *WatchDog) SetLRULimit(limit int) { |
|
|
wd.Lock() |
|
|
defer wd.Unlock() |
|
|
wd.lruLimit = limit |
|
|
} |
|
|
|
|
|
|
|
|
func (wd *WatchDog) GetLRULimit() int { |
|
|
wd.Lock() |
|
|
defer wd.Unlock() |
|
|
return wd.lruLimit |
|
|
} |
|
|
|
|
|
|
|
|
func (wd *WatchDog) SetMemoryReclaimer(enabled bool, threshold float64) { |
|
|
wd.Lock() |
|
|
defer wd.Unlock() |
|
|
wd.memoryReclaimerEnabled = enabled |
|
|
wd.memoryReclaimerThreshold = threshold |
|
|
} |
|
|
|
|
|
|
|
|
func (wd *WatchDog) GetMemoryReclaimerSettings() (enabled bool, threshold float64) { |
|
|
wd.Lock() |
|
|
defer wd.Unlock() |
|
|
return wd.memoryReclaimerEnabled, wd.memoryReclaimerThreshold |
|
|
} |
|
|
|
|
|
|
|
|
func (wd *WatchDog) SetForceEvictionWhenBusy(force bool) { |
|
|
wd.Lock() |
|
|
defer wd.Unlock() |
|
|
wd.forceEvictionWhenBusy = force |
|
|
} |
|
|
|
|
|
func (wd *WatchDog) Shutdown() { |
|
|
wd.Lock() |
|
|
defer wd.Unlock() |
|
|
xlog.Info("[WatchDog] Shutting down watchdog") |
|
|
wd.stop <- true |
|
|
} |
|
|
|
|
|
func (wd *WatchDog) AddAddressModelMap(address string, model string) { |
|
|
wd.Lock() |
|
|
defer wd.Unlock() |
|
|
wd.addressModelMap[address] = model |
|
|
|
|
|
} |
|
|
func (wd *WatchDog) Add(address string, p *process.Process) { |
|
|
wd.Lock() |
|
|
defer wd.Unlock() |
|
|
wd.addressMap[address] = p |
|
|
} |
|
|
|
|
|
func (wd *WatchDog) Mark(address string) { |
|
|
wd.Lock() |
|
|
defer wd.Unlock() |
|
|
now := time.Now() |
|
|
wd.busyTime[address] = now |
|
|
wd.lastUsed[address] = now |
|
|
delete(wd.idleTime, address) |
|
|
} |
|
|
|
|
|
func (wd *WatchDog) UnMark(ModelAddress string) { |
|
|
wd.Lock() |
|
|
defer wd.Unlock() |
|
|
now := time.Now() |
|
|
delete(wd.busyTime, ModelAddress) |
|
|
wd.idleTime[ModelAddress] = now |
|
|
wd.lastUsed[ModelAddress] = now |
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
func (wd *WatchDog) UpdateLastUsed(address string) { |
|
|
wd.Lock() |
|
|
defer wd.Unlock() |
|
|
wd.lastUsed[address] = time.Now() |
|
|
} |
|
|
|
|
|
|
|
|
func (wd *WatchDog) GetLoadedModelCount() int { |
|
|
wd.Lock() |
|
|
defer wd.Unlock() |
|
|
return len(wd.addressModelMap) |
|
|
} |
|
|
|
|
|
|
|
|
// modelUsageInfo is one eviction candidate: a tracked backend address, the
// model served there and the time it was last used.
type modelUsageInfo struct {
	// address is the key of the entry in addressModelMap.
	address string
	// model is the name passed to ProcessManager.ShutdownModel.
	model string
	// lastUsed is the sort key; the zero time sorts before every real timestamp.
	lastUsed time.Time
}
|
|
|
|
|
|
|
|
// EnforceLRULimitResult is the outcome of one EnforceLRULimit call.
type EnforceLRULimitResult struct {
	// EvictedCount is the number of models that were selected for shutdown.
	EvictedCount int
	// NeedMore is true when fewer models than required were evicted because
	// at least one busy model was skipped.
	NeedMore bool
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
func (wd *WatchDog) EnforceLRULimit(pendingLoads int) EnforceLRULimitResult { |
|
|
if wd.lruLimit <= 0 { |
|
|
return EnforceLRULimitResult{EvictedCount: 0, NeedMore: false} |
|
|
} |
|
|
|
|
|
wd.Lock() |
|
|
|
|
|
currentCount := len(wd.addressModelMap) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
modelsToEvict := currentCount - wd.lruLimit + pendingLoads + 1 |
|
|
forceEvictionWhenBusy := wd.forceEvictionWhenBusy |
|
|
if modelsToEvict <= 0 { |
|
|
wd.Unlock() |
|
|
return EnforceLRULimitResult{EvictedCount: 0, NeedMore: false} |
|
|
} |
|
|
|
|
|
xlog.Debug("[WatchDog] LRU enforcement triggered", "current", currentCount, "pendingLoads", pendingLoads, "limit", wd.lruLimit, "toEvict", modelsToEvict) |
|
|
|
|
|
|
|
|
var models []modelUsageInfo |
|
|
for address, model := range wd.addressModelMap { |
|
|
lastUsed := wd.lastUsed[address] |
|
|
if lastUsed.IsZero() { |
|
|
|
|
|
lastUsed = time.Time{} |
|
|
} |
|
|
models = append(models, modelUsageInfo{ |
|
|
address: address, |
|
|
model: model, |
|
|
lastUsed: lastUsed, |
|
|
}) |
|
|
} |
|
|
|
|
|
|
|
|
sort.Slice(models, func(i, j int) bool { |
|
|
return models[i].lastUsed.Before(models[j].lastUsed) |
|
|
}) |
|
|
|
|
|
|
|
|
var modelsToShutdown []string |
|
|
evictedCount := 0 |
|
|
skippedBusyCount := 0 |
|
|
for i := 0; evictedCount < modelsToEvict && i < len(models); i++ { |
|
|
m := models[i] |
|
|
|
|
|
_, isBusy := wd.busyTime[m.address] |
|
|
if isBusy && !forceEvictionWhenBusy { |
|
|
|
|
|
xlog.Warn("[WatchDog] Skipping LRU eviction for busy model", "model", m.model, "reason", "model has active API calls") |
|
|
skippedBusyCount++ |
|
|
continue |
|
|
} |
|
|
xlog.Info("[WatchDog] LRU evicting model", "model", m.model, "lastUsed", m.lastUsed, "busy", isBusy) |
|
|
modelsToShutdown = append(modelsToShutdown, m.model) |
|
|
|
|
|
wd.untrack(m.address) |
|
|
evictedCount++ |
|
|
} |
|
|
needMore := evictedCount < modelsToEvict && skippedBusyCount > 0 |
|
|
wd.Unlock() |
|
|
|
|
|
|
|
|
for _, model := range modelsToShutdown { |
|
|
if err := wd.pm.ShutdownModel(model); err != nil { |
|
|
xlog.Error("[WatchDog] error shutting down model during LRU eviction", "error", err, "model", model) |
|
|
} |
|
|
xlog.Debug("[WatchDog] LRU eviction complete", "model", model) |
|
|
} |
|
|
|
|
|
if needMore { |
|
|
xlog.Warn("[WatchDog] LRU eviction incomplete", "evicted", evictedCount, "needed", modelsToEvict, "skippedBusy", skippedBusyCount, "reason", "some models are busy with active API calls") |
|
|
} |
|
|
|
|
|
return EnforceLRULimitResult{ |
|
|
EvictedCount: len(modelsToShutdown), |
|
|
NeedMore: needMore, |
|
|
} |
|
|
} |
|
|
|
|
|
func (wd *WatchDog) Run() { |
|
|
xlog.Info("[WatchDog] starting watchdog") |
|
|
|
|
|
for { |
|
|
select { |
|
|
case <-wd.stop: |
|
|
xlog.Info("[WatchDog] Stopping watchdog") |
|
|
return |
|
|
case <-time.After(wd.watchdogInterval): |
|
|
|
|
|
wd.Lock() |
|
|
busyCheck := wd.busyCheck |
|
|
idleCheck := wd.idleCheck |
|
|
memoryCheck := wd.memoryReclaimerEnabled |
|
|
wd.Unlock() |
|
|
|
|
|
if !busyCheck && !idleCheck && !memoryCheck { |
|
|
xlog.Info("[WatchDog] No checks enabled, stopping watchdog") |
|
|
return |
|
|
} |
|
|
if busyCheck { |
|
|
wd.checkBusy() |
|
|
} |
|
|
if idleCheck { |
|
|
wd.checkIdle() |
|
|
} |
|
|
if memoryCheck { |
|
|
wd.checkMemory() |
|
|
} |
|
|
} |
|
|
} |
|
|
} |
|
|
|
|
|
func (wd *WatchDog) checkIdle() { |
|
|
wd.Lock() |
|
|
xlog.Debug("[WatchDog] Watchdog checks for idle connections") |
|
|
|
|
|
|
|
|
var modelsToShutdown []string |
|
|
for address, t := range wd.idleTime { |
|
|
xlog.Debug("[WatchDog] idle connection", "address", address) |
|
|
if time.Since(t) > wd.idletimeout { |
|
|
xlog.Warn("[WatchDog] Address is idle for too long, killing it", "address", address) |
|
|
model, ok := wd.addressModelMap[address] |
|
|
if ok { |
|
|
modelsToShutdown = append(modelsToShutdown, model) |
|
|
} else { |
|
|
xlog.Warn("[WatchDog] Address unresolvable", "address", address) |
|
|
} |
|
|
wd.untrack(address) |
|
|
} |
|
|
} |
|
|
wd.Unlock() |
|
|
|
|
|
|
|
|
for _, model := range modelsToShutdown { |
|
|
if err := wd.pm.ShutdownModel(model); err != nil { |
|
|
xlog.Error("[watchdog] error shutting down model", "error", err, "model", model) |
|
|
} |
|
|
xlog.Debug("[WatchDog] model shut down", "model", model) |
|
|
} |
|
|
} |
|
|
|
|
|
func (wd *WatchDog) checkBusy() { |
|
|
wd.Lock() |
|
|
xlog.Debug("[WatchDog] Watchdog checks for busy connections") |
|
|
|
|
|
|
|
|
var modelsToShutdown []string |
|
|
for address, t := range wd.busyTime { |
|
|
xlog.Debug("[WatchDog] active connection", "address", address) |
|
|
|
|
|
if time.Since(t) > wd.timeout { |
|
|
model, ok := wd.addressModelMap[address] |
|
|
if ok { |
|
|
xlog.Warn("[WatchDog] Model is busy for too long, killing it", "model", model) |
|
|
modelsToShutdown = append(modelsToShutdown, model) |
|
|
} else { |
|
|
xlog.Warn("[WatchDog] Address unresolvable", "address", address) |
|
|
} |
|
|
wd.untrack(address) |
|
|
} |
|
|
} |
|
|
wd.Unlock() |
|
|
|
|
|
|
|
|
for _, model := range modelsToShutdown { |
|
|
if err := wd.pm.ShutdownModel(model); err != nil { |
|
|
xlog.Error("[watchdog] error shutting down model", "error", err, "model", model) |
|
|
} |
|
|
xlog.Debug("[WatchDog] model shut down", "model", model) |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
func (wd *WatchDog) checkMemory() { |
|
|
wd.Lock() |
|
|
threshold := wd.memoryReclaimerThreshold |
|
|
enabled := wd.memoryReclaimerEnabled |
|
|
modelCount := len(wd.addressModelMap) |
|
|
wd.Unlock() |
|
|
|
|
|
if !enabled || threshold <= 0 || modelCount == 0 { |
|
|
return |
|
|
} |
|
|
|
|
|
|
|
|
aggregate := xsysinfo.GetResourceAggregateInfo() |
|
|
if aggregate.TotalMemory == 0 { |
|
|
xlog.Debug("[WatchDog] No memory information available for memory reclaimer") |
|
|
return |
|
|
} |
|
|
|
|
|
|
|
|
thresholdPercent := threshold * 100 |
|
|
|
|
|
memoryType := "GPU" |
|
|
if aggregate.GPUCount == 0 { |
|
|
memoryType = "RAM" |
|
|
} |
|
|
|
|
|
xlog.Debug("[WatchDog] Memory check", "type", memoryType, "usage_percent", aggregate.UsagePercent, "threshold_percent", thresholdPercent, "loaded_models", modelCount) |
|
|
|
|
|
|
|
|
if aggregate.UsagePercent > thresholdPercent { |
|
|
xlog.Warn("[WatchDog] Memory usage exceeds threshold, evicting LRU backend", "type", memoryType, "usage_percent", aggregate.UsagePercent, "threshold_percent", thresholdPercent) |
|
|
|
|
|
|
|
|
wd.evictLRUModel() |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
func (wd *WatchDog) evictLRUModel() { |
|
|
wd.Lock() |
|
|
|
|
|
if len(wd.addressModelMap) == 0 { |
|
|
wd.Unlock() |
|
|
return |
|
|
} |
|
|
|
|
|
forceEvictionWhenBusy := wd.forceEvictionWhenBusy |
|
|
|
|
|
|
|
|
var models []modelUsageInfo |
|
|
for address, model := range wd.addressModelMap { |
|
|
lastUsed := wd.lastUsed[address] |
|
|
if lastUsed.IsZero() { |
|
|
lastUsed = time.Time{} |
|
|
} |
|
|
models = append(models, modelUsageInfo{ |
|
|
address: address, |
|
|
model: model, |
|
|
lastUsed: lastUsed, |
|
|
}) |
|
|
} |
|
|
|
|
|
if len(models) == 0 { |
|
|
wd.Unlock() |
|
|
return |
|
|
} |
|
|
|
|
|
|
|
|
sort.Slice(models, func(i, j int) bool { |
|
|
return models[i].lastUsed.Before(models[j].lastUsed) |
|
|
}) |
|
|
|
|
|
|
|
|
var lruModel *modelUsageInfo |
|
|
for i := 0; i < len(models); i++ { |
|
|
m := models[i] |
|
|
_, isBusy := wd.busyTime[m.address] |
|
|
if isBusy && !forceEvictionWhenBusy { |
|
|
|
|
|
xlog.Warn("[WatchDog] Skipping memory reclaimer eviction for busy model", "model", m.model, "reason", "model has active API calls") |
|
|
continue |
|
|
} |
|
|
lruModel = &m |
|
|
break |
|
|
} |
|
|
|
|
|
if lruModel == nil { |
|
|
|
|
|
wd.Unlock() |
|
|
xlog.Warn("[WatchDog] Memory reclaimer cannot evict: all models are busy with active API calls") |
|
|
return |
|
|
} |
|
|
|
|
|
xlog.Info("[WatchDog] Memory reclaimer evicting LRU model", "model", lruModel.model, "lastUsed", lruModel.lastUsed) |
|
|
|
|
|
|
|
|
wd.untrack(lruModel.address) |
|
|
wd.Unlock() |
|
|
|
|
|
|
|
|
if err := wd.pm.ShutdownModel(lruModel.model); err != nil { |
|
|
xlog.Error("[WatchDog] error shutting down model during memory reclamation", "error", err, "model", lruModel.model) |
|
|
} else { |
|
|
xlog.Info("[WatchDog] Memory reclaimer eviction complete", "model", lruModel.model) |
|
|
} |
|
|
} |
|
|
|
|
|
func (wd *WatchDog) untrack(address string) { |
|
|
delete(wd.busyTime, address) |
|
|
delete(wd.idleTime, address) |
|
|
delete(wd.lastUsed, address) |
|
|
delete(wd.addressModelMap, address) |
|
|
delete(wd.addressMap, address) |
|
|
} |
|
|
|