File size: 9,561 Bytes
0f07ba7 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 |
package model
import (
"context"
"errors"
"fmt"
"os"
"strings"
"time"
grpc "github.com/mudler/LocalAI/pkg/grpc"
"github.com/mudler/xlog"
"github.com/phayes/freeport"
)
// LLamaCPP is the canonical name of the llama.cpp backend.
const (
	LLamaCPP = "llama-cpp"
)

// Aliases maps legacy or alternative backend names to the canonical
// backend name used to resolve the actual gRPC backend.
var Aliases map[string]string = map[string]string{
	"go-llama":               LLamaCPP,
	"llama":                  LLamaCPP,
	"embedded-store":         LocalStoreBackend,
	"huggingface-embeddings": TransformersBackend,
	"langchain-huggingface":  LCHuggingFaceBackend,
	"transformers-musicgen":  TransformersBackend,
	"sentencetransformers":   TransformersBackend,
	"mamba":                  TransformersBackend,
	"stablediffusion":        StableDiffusionGGMLBackend,
}

// TypeAlias maps alias names that additionally imply a concrete model type;
// when such an alias is resolved, the type is set on the gRPC options
// (see backendLoader).
var TypeAlias map[string]string = map[string]string{
	"sentencetransformers":   "SentenceTransformer",
	"huggingface-embeddings": "SentenceTransformer",
	"mamba":                  "Mamba",
	"transformers-musicgen":  "MusicgenForConditionalGeneration",
}

// Canonical backend names.
const (
	WhisperBackend             = "whisper"
	StableDiffusionGGMLBackend = "stablediffusion-ggml"
	LCHuggingFaceBackend       = "huggingface"
	TransformersBackend        = "transformers"
	LocalStoreBackend          = "local-store"
)
// starts the grpcModelProcess for the backend, and returns a grpc client
// It also loads the model
//
// The returned closure is the loader passed to ml.LoadModel: given a model
// ID, name and file it resolves the backend (external file or address),
// waits for the gRPC service to become healthy, then issues LoadModel.
// On any failure after a process was spawned, the process is stopped so no
// orphan backend is left running.
func (ml *ModelLoader) grpcModel(backend string, o *Options) func(string, string, string) (*Model, error) {
	return func(modelID, modelName, modelFile string) (*Model, error) {
		xlog.Debug("Loading Model with gRPC", "modelID", modelID, "file", modelFile, "backend", backend, "options", *o)

		var client *Model

		// Allocates a fresh loopback address with a free port for a
		// backend process we spawn ourselves.
		getFreeAddress := func() (string, error) {
			port, err := freeport.GetFreePort()
			if err != nil {
				return "", fmt.Errorf("failed allocating free ports: %s", err.Error())
			}
			return fmt.Sprintf("127.0.0.1:%d", port), nil
		}

		// If no specific model path is set for transformers/HF, set it to the model path
		// (only fills in env vars that are currently unset — user overrides win).
		for _, env := range []string{"HF_HOME", "TRANSFORMERS_CACHE", "HUGGINGFACE_HUB_CACHE"} {
			if os.Getenv(env) == "" {
				err := os.Setenv(env, ml.ModelPath)
				if err != nil {
					xlog.Error("unable to set environment variable to modelPath", "error", err, "name", env, "modelPath", ml.ModelPath)
				}
			}
		}

		// Check if the backend is provided as external
		if uri, ok := ml.GetAllExternalBackends(o)[backend]; ok {
			xlog.Debug("Loading external backend", "uri", uri)
			// check if uri is a file or a address
			if fi, err := os.Stat(uri); err == nil {
				// The URI is an executable on disk: spawn it on a free port.
				xlog.Debug("external backend is file", "file", fi)
				serverAddress, err := getFreeAddress()
				if err != nil {
					return nil, fmt.Errorf("failed allocating free ports: %s", err.Error())
				}
				// Make sure the process is executable
				process, err := ml.startProcess(uri, modelID, serverAddress)
				if err != nil {
					xlog.Error("failed to launch", "error", err, "path", uri)
					return nil, err
				}

				xlog.Debug("GRPC Service Started")

				client = NewModel(modelID, serverAddress, process)
			} else {
				xlog.Debug("external backend is a uri")
				// address of an already-running backend; no process to manage (nil).
				client = NewModel(modelID, uri, nil)
			}
		} else {
			xlog.Error("Backend not found", "backend", backend)
			return nil, fmt.Errorf("backend not found: %s", backend)
		}

		xlog.Debug("Wait for the service to start up")
		xlog.Debug("Options", "options", o.gRPCOptions)

		// Wait for the service to start up: poll HealthCheck up to
		// o.grpcAttempts times, sleeping o.grpcAttemptsDelay seconds between
		// attempts. The error is only logged on the final attempt.
		ready := false
		for i := 0; i < o.grpcAttempts; i++ {
			alive, err := client.GRPC(o.parallelRequests, ml.wd).HealthCheck(context.Background())
			if alive {
				xlog.Debug("GRPC Service Ready")
				ready = true
				break
			}
			if err != nil && i == o.grpcAttempts-1 {
				xlog.Error("failed starting/connecting to the gRPC service", "error", err)
			}
			time.Sleep(time.Duration(o.grpcAttemptsDelay) * time.Second)
		}

		if !ready {
			xlog.Debug("GRPC Service NOT ready")
			// Stop the spawned process (if any) before bailing out.
			if process := client.Process(); process != nil {
				process.Stop()
			}
			return nil, fmt.Errorf("grpc service not ready")
		}

		// Copy the shared gRPC options and fill in the per-model fields
		// before issuing LoadModel.
		options := *o.gRPCOptions
		options.Model = modelName
		options.ModelFile = modelFile
		options.ModelPath = ml.ModelPath

		xlog.Debug("GRPC: Loading model with options", "options", options)

		res, err := client.GRPC(o.parallelRequests, ml.wd).LoadModel(o.context, &options)
		if err != nil {
			if process := client.Process(); process != nil {
				process.Stop()
			}
			return nil, fmt.Errorf("could not load model: %w", err)
		}
		if !res.Success {
			if process := client.Process(); process != nil {
				process.Stop()
			}
			return nil, fmt.Errorf("could not load model (no success): %s", res.Message)
		}

		return client, nil
	}
}
// backendLoader resolves the configured backend name (applying Aliases and
// TypeAlias), loads the model through grpcModel, and returns its gRPC client.
// On failure the partially-started model is stopped before returning.
func (ml *ModelLoader) backendLoader(opts ...Option) (client grpc.Backend, err error) {
	o := NewOptions(opts...)

	xlog.Info("BackendLoader starting", "modelID", o.modelID, "backend", o.backendString, "model", o.model)

	name := strings.ToLower(o.backendString)
	if canonical, isAlias := Aliases[name]; isAlias {
		// Some aliases also carry a model type that must be forwarded
		// to the backend via the gRPC options.
		if modelType, hasType := TypeAlias[name]; hasType {
			xlog.Debug("alias is a type alias", "alias", name, "realBackend", canonical, "type", modelType)
			o.gRPCOptions.Type = modelType
		} else {
			xlog.Debug("alias", "alias", name, "realBackend", canonical)
		}
		name = canonical
	}

	loaded, err := ml.LoadModel(o.modelID, o.model, ml.grpcModel(name, o))
	if err == nil {
		return loaded.GRPC(o.parallelRequests, ml.wd), nil
	}

	// Loading failed: make sure any spawned gRPC process is torn down.
	if stopErr := ml.StopGRPC(only(o.modelID)); stopErr != nil {
		xlog.Error("error stopping model", "error", stopErr, "model", o.modelID)
	}
	xlog.Error("Failed to load model", "modelID", o.modelID, "error", err, "backend", o.backendString)
	return nil, err
}
// enforceLRULimit enforces the LRU limit before loading a new model.
// It accounts for models currently being loaded by other goroutines and,
// when busy models cannot be evicted, sleeps and retries up to the
// configured maximum before giving up with a warning.
func (ml *ModelLoader) enforceLRULimit() {
	if ml.wd == nil {
		return
	}

	// Account for concurrent loads so we leave room for them too.
	pending := ml.GetLoadingCount()

	// Snapshot the retry configuration under the lock.
	ml.mu.Lock()
	retries := ml.lruEvictionMaxRetries
	interval := ml.lruEvictionRetryInterval
	ml.mu.Unlock()

	for attempt := 0; attempt < retries; attempt++ {
		outcome := ml.wd.EnforceLRULimit(pending)
		if !outcome.NeedMore {
			// Done: either nothing needed evicting or we evicted enough.
			if outcome.EvictedCount > 0 {
				xlog.Info("[ModelLoader] LRU enforcement complete", "evicted", outcome.EvictedCount)
			}
			return
		}

		if attempt == retries-1 {
			// Last attempt - log warning but proceed (might fail to load, but at least we tried)
			xlog.Warn("[ModelLoader] LRU enforcement incomplete after max retries",
				"evicted", outcome.EvictedCount,
				"reason", "models are still busy with active API calls")
			continue
		}

		// Busy models block eviction: wait for them to go idle, then retry.
		xlog.Info("[ModelLoader] Waiting for busy models to become idle before eviction",
			"evicted", outcome.EvictedCount,
			"attempt", attempt+1,
			"maxRetries", retries,
			"retryIn", interval)
		time.Sleep(interval)
	}
}
// updateModelLastUsed updates the last used time for a model (for LRU tracking)
func (ml *ModelLoader) updateModelLastUsed(m *Model) {
	if m == nil || ml.wd == nil {
		// No watchdog configured or nothing to track.
		return
	}
	ml.wd.UpdateLastUsed(m.address)
}
// Load returns a gRPC client for the requested model, loading it first if
// needed. If no backend is specified in opts, every known external backend
// is tried in turn until one succeeds; otherwise a joined error describing
// each per-backend failure is returned (wrapped, so callers can inspect it
// with errors.Is/errors.As).
func (ml *ModelLoader) Load(opts ...Option) (grpc.Backend, error) {
	o := NewOptions(opts...)

	// Return earlier if we have a model already loaded
	// (avoid looping through all the backends)
	if m := ml.CheckIsLoaded(o.modelID); m != nil {
		xlog.Debug("Model already loaded", "model", o.modelID)
		// Update last used time for LRU tracking
		ml.updateModelLastUsed(m)
		return m.GRPC(o.parallelRequests, ml.wd), nil
	}

	// Enforce LRU limit before loading a new model
	ml.enforceLRULimit()

	// if a backend is defined, return the loader directly
	if o.backendString != "" {
		client, err := ml.backendLoader(opts...)
		if err != nil {
			return nil, err
		}
		return client, nil
	}

	// Otherwise try every external backend supplied by the user.
	// NOTE: map iteration order is random, so the attempt order is not
	// deterministic across calls.
	autoLoadBackends := []string{}
	for b := range ml.GetAllExternalBackends(o) {
		autoLoadBackends = append(autoLoadBackends, b)
	}

	if len(autoLoadBackends) == 0 {
		xlog.Error("No backends found")
		return nil, fmt.Errorf("no backends found")
	}

	xlog.Debug("Loading from the following backends (in order)", "backends", autoLoadBackends)
	xlog.Info("Trying to load the model", "modelID", o.modelID, "backends", autoLoadBackends)

	var err error
	for _, key := range autoLoadBackends {
		xlog.Info("Attempting to load", "backend", key)

		// Build a fresh option slice per attempt: appending directly to
		// opts could write into the caller's backing array when it has
		// spare capacity (slice aliasing).
		options := make([]Option, 0, len(opts)+1)
		options = append(options, opts...)
		options = append(options, WithBackendString(key))

		model, modelerr := ml.backendLoader(options...)
		switch {
		case modelerr == nil && model != nil:
			xlog.Info("Loads OK", "backend", key)
			return model, nil
		case modelerr != nil:
			err = errors.Join(err, fmt.Errorf("[%s]: %w", key, modelerr))
			xlog.Info("Fails", "backend", key, "error", modelerr.Error())
		default:
			err = errors.Join(err, fmt.Errorf("backend %s returned no usable model", key))
			xlog.Info("Fails", "backend", key, "error", "backend returned no usable model")
		}
	}

	// Wrap (%w instead of %s) so the joined per-backend errors remain
	// inspectable by callers via errors.Is/errors.As.
	return nil, fmt.Errorf("could not load model - all backends returned error: %w", err)
}
|