package model

import (
	"context"
	"errors"
	"fmt"
	"os"
	"strings"
	"time"

	grpc "github.com/mudler/LocalAI/pkg/grpc"
	"github.com/mudler/xlog"
	"github.com/phayes/freeport"
)
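
// LLamaCPP is the canonical name of the llama.cpp backend.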
const (
	LLamaCPP = "llama-cpp"
)
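
// Aliases maps alternate and legacy backend names to their canonical backend names.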
var Aliases = map[string]string{
	"go-llama":               LLamaCPP,
	"llama":                  LLamaCPP,
	"embedded-store":         LocalStoreBackend,
	"huggingface-embeddings": TransformersBackend,
	"langchain-huggingface":  LCHuggingFaceBackend,
	"transformers-musicgen":  TransformersBackend,
	"sentencetransformers":   TransformersBackend,
	"mamba":                  TransformersBackend,
	"stablediffusion":        StableDiffusionGGMLBackend,
}
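
// TypeAlias maps backend aliases to the model type name forwarded to the backend via the gRPC options.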
var TypeAlias = map[string]string{
	"sentencetransformers":   "SentenceTransformer",
	"huggingface-embeddings": "SentenceTransformer",
	"mamba":                  "Mamba",
	"transformers-musicgen":  "MusicgenForConditionalGeneration",
}
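
// Names of the other known backends.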
const (
	WhisperBackend             = "whisper"
	StableDiffusionGGMLBackend = "stablediffusion-ggml"
	LCHuggingFaceBackend       = "huggingface"
	TransformersBackend        = "transformers"
	LocalStoreBackend          = "local-store"
)

// grpcModel returns a loader that starts the gRPC process for the backend
// (when it resolves to a local executable), waits for the service to become
// healthy, loads the model, and returns the client.
func (ml *ModelLoader) grpcModel(backend string, o *Options) func(string, string, string) (*Model, error) {
	return func(modelID, modelName, modelFile string) (*Model, error) {
		xlog.Debug("Loading Model with gRPC", "modelID", modelID, "file", modelFile, "backend", backend, "options", *o)

		var client *Model

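		// getFreeAddress picks a random free TCP port on localhost for the backend's gRPC server.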
		getFreeAddress := func() (string, error) {
			port, err := freeport.GetFreePort()
			if err != nil {
				return "", fmt.Errorf("failed allocating free ports: %s", err.Error())
			}
			return fmt.Sprintf("127.0.0.1:%d", port), nil
		}

		// If no HF/transformers cache directory is set, default it to the model path
		for _, env := range []string{"HF_HOME", "TRANSFORMERS_CACHE", "HUGGINGFACE_HUB_CACHE"} {
			if os.Getenv(env) == "" {
				err := os.Setenv(env, ml.ModelPath)
				if err != nil {
					xlog.Error("unable to set environment variable to modelPath", "error", err, "name", env, "modelPath", ml.ModelPath)
				}
			}
		}

		// Check if the backend is provided as external
		if uri, ok := ml.GetAllExternalBackends(o)[backend]; ok {
			xlog.Debug("Loading external backend", "uri", uri)
			// check if uri is a file or an address
			if fi, err := os.Stat(uri); err == nil {
				xlog.Debug("external backend is file", "file", fi)
				serverAddress, err := getFreeAddress()
				if err != nil {
					return nil, fmt.Errorf("failed allocating free ports: %s", err.Error())
				}

				// Start the backend binary as a local gRPC server process
				process, err := ml.startProcess(uri, modelID, serverAddress)
				if err != nil {
					xlog.Error("failed to launch", "error", err, "path", uri)
					return nil, err
				}

				xlog.Debug("GRPC Service Started")

				client = NewModel(modelID, serverAddress, process)
			} else {
				xlog.Debug("external backend is a uri")
				// address
				client = NewModel(modelID, uri, nil)
			}
		} else {
			xlog.Error("Backend not found", "backend", backend)
			return nil, fmt.Errorf("backend not found: %s", backend)
		}

| xlog.Debug("Wait for the service to start up") | |
| xlog.Debug("Options", "options", o.gRPCOptions) | |
| // Wait for the service to start up | |
| ready := false | |
| for i := 0; i < o.grpcAttempts; i++ { | |
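			// A successful health check means the gRPC server is up and accepting requests.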
			alive, err := client.GRPC(o.parallelRequests, ml.wd).HealthCheck(context.Background())
			if alive {
				xlog.Debug("GRPC Service Ready")
				ready = true
				break
			}
			if err != nil && i == o.grpcAttempts-1 {
				xlog.Error("failed starting/connecting to the gRPC service", "error", err)
			}
			time.Sleep(time.Duration(o.grpcAttemptsDelay) * time.Second)
		}

		if !ready {
			xlog.Debug("GRPC Service NOT ready")
			if process := client.Process(); process != nil {
				process.Stop()
			}
			return nil, fmt.Errorf("grpc service not ready")
		}

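		// Copy the shared gRPC options so per-model fields can be set without mutating o.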
		options := *o.gRPCOptions
		options.Model = modelName
		options.ModelFile = modelFile
		options.ModelPath = ml.ModelPath

		xlog.Debug("GRPC: Loading model with options", "options", options)

		res, err := client.GRPC(o.parallelRequests, ml.wd).LoadModel(o.context, &options)
		if err != nil {
			if process := client.Process(); process != nil {
				process.Stop()
			}
			return nil, fmt.Errorf("could not load model: %w", err)
		}
		if !res.Success {
			if process := client.Process(); process != nil {
				process.Stop()
			}
			return nil, fmt.Errorf("could not load model (no success): %s", res.Message)
		}

		return client, nil
	}
}
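
// backendLoader resolves backend aliases, loads the model through grpcModel,
// and returns the resulting gRPC client. On failure it stops any gRPC
// process that was started for the model.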
func (ml *ModelLoader) backendLoader(opts ...Option) (client grpc.Backend, err error) {
	o := NewOptions(opts...)

	xlog.Info("BackendLoader starting", "modelID", o.modelID, "backend", o.backendString, "model", o.model)

	backend := strings.ToLower(o.backendString)
	if realBackend, exists := Aliases[backend]; exists {
		typeAlias, exists := TypeAlias[backend]
		if exists {
			xlog.Debug("alias is a type alias", "alias", backend, "realBackend", realBackend, "type", typeAlias)
			o.gRPCOptions.Type = typeAlias
		} else {
			xlog.Debug("alias", "alias", backend, "realBackend", realBackend)
		}
		backend = realBackend
	}

	model, err := ml.LoadModel(o.modelID, o.model, ml.grpcModel(backend, o))
	if err != nil {
		if stopErr := ml.StopGRPC(only(o.modelID)); stopErr != nil {
			xlog.Error("error stopping model", "error", stopErr, "model", o.modelID)
		}
		xlog.Error("Failed to load model", "modelID", o.modelID, "error", err, "backend", o.backendString)
		return nil, err
	}
	return model.GRPC(o.parallelRequests, ml.wd), nil
}

// enforceLRULimit enforces the LRU limit before loading a new model.
// This is called before loading a model to ensure we don't exceed the limit.
// It accounts for models that are currently being loaded by other goroutines.
// If models are busy and can't be evicted, it will wait and retry until space is available.
func (ml *ModelLoader) enforceLRULimit() {
	if ml.wd == nil {
		return
	}

	// Get the count of models currently being loaded to account for concurrent requests
	pendingLoads := ml.GetLoadingCount()

	// Get retry settings from ModelLoader
	ml.mu.Lock()
	maxRetries := ml.lruEvictionMaxRetries
	retryInterval := ml.lruEvictionRetryInterval
	ml.mu.Unlock()

	for attempt := 0; attempt < maxRetries; attempt++ {
		result := ml.wd.EnforceLRULimit(pendingLoads)
		if !result.NeedMore {
			// Successfully evicted enough models (or no eviction needed)
			if result.EvictedCount > 0 {
				xlog.Info("[ModelLoader] LRU enforcement complete", "evicted", result.EvictedCount)
			}
			return
		}

		// Need more evictions but models are busy - wait and retry
		if attempt < maxRetries-1 {
			xlog.Info("[ModelLoader] Waiting for busy models to become idle before eviction",
				"evicted", result.EvictedCount,
				"attempt", attempt+1,
				"maxRetries", maxRetries,
				"retryIn", retryInterval)
			time.Sleep(retryInterval)
		} else {
			// Last attempt - log warning but proceed (might fail to load, but at least we tried)
			xlog.Warn("[ModelLoader] LRU enforcement incomplete after max retries",
				"evicted", result.EvictedCount,
				"reason", "models are still busy with active API calls")
		}
	}
}

// updateModelLastUsed updates the last used time for a model (for LRU tracking)
func (ml *ModelLoader) updateModelLastUsed(m *Model) {
	if ml.wd == nil || m == nil {
		return
	}
	ml.wd.UpdateLastUsed(m.address)
}
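
// Load returns a ready gRPC backend client for the model described by opts.
// An already-loaded model is reused (refreshing its LRU timestamp); otherwise
// the LRU limit is enforced first and the model is loaded with the requested
// backend, or with every known external backend in turn when none is set.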
func (ml *ModelLoader) Load(opts ...Option) (grpc.Backend, error) {
	o := NewOptions(opts...)

	// Return early if we have a model already loaded
	// (avoid looping through all the backends)
	if m := ml.CheckIsLoaded(o.modelID); m != nil {
		xlog.Debug("Model already loaded", "model", o.modelID)
		// Update last used time for LRU tracking
		ml.updateModelLastUsed(m)
		return m.GRPC(o.parallelRequests, ml.wd), nil
	}

	// Enforce LRU limit before loading a new model
	ml.enforceLRULimit()

	// if a backend is defined, return the loader directly
	if o.backendString != "" {
		client, err := ml.backendLoader(opts...)
		if err != nil {
			return nil, err
		}
		return client, nil
	}

	// Otherwise try every known external backend in order
	var err error

	// collect the candidate backends: the external backends supplied by the user via the CLI
	autoLoadBackends := []string{}
	for b := range ml.GetAllExternalBackends(o) {
		autoLoadBackends = append(autoLoadBackends, b)
	}

	if len(autoLoadBackends) == 0 {
		xlog.Error("No backends found")
		return nil, fmt.Errorf("no backends found")
	}

| xlog.Debug("Loading from the following backends (in order)", "backends", autoLoadBackends) | |
| xlog.Info("Trying to load the model", "modelID", o.modelID, "backends", autoLoadBackends) | |
| for _, key := range autoLoadBackends { | |
| xlog.Info("Attempting to load", "backend", key) | |
| options := append(opts, []Option{ | |
| WithBackendString(key), | |
| }...) | |
| model, modelerr := ml.backendLoader(options...) | |
| if modelerr == nil && model != nil { | |
| xlog.Info("Loads OK", "backend", key) | |
| return model, nil | |
| } else if modelerr != nil { | |
| err = errors.Join(err, fmt.Errorf("[%s]: %w", key, modelerr)) | |
| xlog.Info("Fails", "backend", key, "error", modelerr.Error()) | |
| } else if model == nil { | |
| err = errors.Join(err, fmt.Errorf("backend %s returned no usable model", key)) | |
| xlog.Info("Fails", "backend", key, "error", "backend returned no usable model") | |
| } | |
| } | |
| return nil, fmt.Errorf("could not load model - all backends returned error: %s", err.Error()) | |
| } | |