|
|
package model |
|
|
|
|
|
import (
	"context"
	"errors"
	"fmt"
	"os"
	"sort"
	"strings"
	"time"

	grpc "github.com/mudler/LocalAI/pkg/grpc"
	"github.com/mudler/xlog"
	"github.com/phayes/freeport"
)
|
|
|
|
|
// LLamaCPP is the canonical backend name that the llama-style aliases resolve to.
const LLamaCPP = "llama-cpp"
|
|
|
|
|
var Aliases map[string]string = map[string]string{ |
|
|
"go-llama": LLamaCPP, |
|
|
"llama": LLamaCPP, |
|
|
"embedded-store": LocalStoreBackend, |
|
|
"huggingface-embeddings": TransformersBackend, |
|
|
"langchain-huggingface": LCHuggingFaceBackend, |
|
|
"transformers-musicgen": TransformersBackend, |
|
|
"sentencetransformers": TransformersBackend, |
|
|
"mamba": TransformersBackend, |
|
|
"stablediffusion": StableDiffusionGGMLBackend, |
|
|
} |
|
|
|
|
|
// TypeAlias maps a backend alias to the model type string that backendLoader
// stores in the gRPC options (Type field) when that alias is requested.
var TypeAlias = map[string]string{
	// Both embedding aliases select the same model type.
	"sentencetransformers":   "SentenceTransformer",
	"huggingface-embeddings": "SentenceTransformer",

	"mamba":                 "Mamba",
	"transformers-musicgen": "MusicgenForConditionalGeneration",
}
|
|
|
|
|
// Canonical backend names referenced by the alias tables and by callers.
const (
	// WhisperBackend is the name of the whisper backend.
	WhisperBackend = "whisper"
	// StableDiffusionGGMLBackend is the name of the ggml stable diffusion backend.
	StableDiffusionGGMLBackend = "stablediffusion-ggml"
	// LCHuggingFaceBackend is the name of the huggingface backend.
	LCHuggingFaceBackend = "huggingface"

	// TransformersBackend is the name of the transformers backend.
	TransformersBackend = "transformers"
	// LocalStoreBackend is the name of the local store backend.
	LocalStoreBackend = "local-store"
)
|
|
|
|
|
|
|
|
|
|
|
func (ml *ModelLoader) grpcModel(backend string, o *Options) func(string, string, string) (*Model, error) { |
|
|
return func(modelID, modelName, modelFile string) (*Model, error) { |
|
|
|
|
|
xlog.Debug("Loading Model with gRPC", "modelID", modelID, "file", modelFile, "backend", backend, "options", *o) |
|
|
|
|
|
var client *Model |
|
|
|
|
|
getFreeAddress := func() (string, error) { |
|
|
port, err := freeport.GetFreePort() |
|
|
if err != nil { |
|
|
return "", fmt.Errorf("failed allocating free ports: %s", err.Error()) |
|
|
} |
|
|
return fmt.Sprintf("127.0.0.1:%d", port), nil |
|
|
} |
|
|
|
|
|
|
|
|
for _, env := range []string{"HF_HOME", "TRANSFORMERS_CACHE", "HUGGINGFACE_HUB_CACHE"} { |
|
|
if os.Getenv(env) == "" { |
|
|
err := os.Setenv(env, ml.ModelPath) |
|
|
if err != nil { |
|
|
xlog.Error("unable to set environment variable to modelPath", "error", err, "name", env, "modelPath", ml.ModelPath) |
|
|
} |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
if uri, ok := ml.GetAllExternalBackends(o)[backend]; ok { |
|
|
xlog.Debug("Loading external backend", "uri", uri) |
|
|
|
|
|
if fi, err := os.Stat(uri); err == nil { |
|
|
xlog.Debug("external backend is file", "file", fi) |
|
|
serverAddress, err := getFreeAddress() |
|
|
if err != nil { |
|
|
return nil, fmt.Errorf("failed allocating free ports: %s", err.Error()) |
|
|
} |
|
|
|
|
|
process, err := ml.startProcess(uri, modelID, serverAddress) |
|
|
if err != nil { |
|
|
xlog.Error("failed to launch", "error", err, "path", uri) |
|
|
return nil, err |
|
|
} |
|
|
|
|
|
xlog.Debug("GRPC Service Started") |
|
|
|
|
|
client = NewModel(modelID, serverAddress, process) |
|
|
} else { |
|
|
xlog.Debug("external backend is a uri") |
|
|
|
|
|
client = NewModel(modelID, uri, nil) |
|
|
} |
|
|
} else { |
|
|
xlog.Error("Backend not found", "backend", backend) |
|
|
return nil, fmt.Errorf("backend not found: %s", backend) |
|
|
} |
|
|
|
|
|
xlog.Debug("Wait for the service to start up") |
|
|
xlog.Debug("Options", "options", o.gRPCOptions) |
|
|
|
|
|
|
|
|
ready := false |
|
|
for i := 0; i < o.grpcAttempts; i++ { |
|
|
alive, err := client.GRPC(o.parallelRequests, ml.wd).HealthCheck(context.Background()) |
|
|
if alive { |
|
|
xlog.Debug("GRPC Service Ready") |
|
|
ready = true |
|
|
break |
|
|
} |
|
|
if err != nil && i == o.grpcAttempts-1 { |
|
|
xlog.Error("failed starting/connecting to the gRPC service", "error", err) |
|
|
} |
|
|
time.Sleep(time.Duration(o.grpcAttemptsDelay) * time.Second) |
|
|
} |
|
|
|
|
|
if !ready { |
|
|
xlog.Debug("GRPC Service NOT ready") |
|
|
if process := client.Process(); process != nil { |
|
|
process.Stop() |
|
|
} |
|
|
return nil, fmt.Errorf("grpc service not ready") |
|
|
} |
|
|
|
|
|
options := *o.gRPCOptions |
|
|
options.Model = modelName |
|
|
options.ModelFile = modelFile |
|
|
options.ModelPath = ml.ModelPath |
|
|
|
|
|
xlog.Debug("GRPC: Loading model with options", "options", options) |
|
|
|
|
|
res, err := client.GRPC(o.parallelRequests, ml.wd).LoadModel(o.context, &options) |
|
|
if err != nil { |
|
|
if process := client.Process(); process != nil { |
|
|
process.Stop() |
|
|
} |
|
|
return nil, fmt.Errorf("could not load model: %w", err) |
|
|
} |
|
|
if !res.Success { |
|
|
if process := client.Process(); process != nil { |
|
|
process.Stop() |
|
|
} |
|
|
return nil, fmt.Errorf("could not load model (no success): %s", res.Message) |
|
|
} |
|
|
|
|
|
return client, nil |
|
|
} |
|
|
} |
|
|
|
|
|
func (ml *ModelLoader) backendLoader(opts ...Option) (client grpc.Backend, err error) { |
|
|
o := NewOptions(opts...) |
|
|
|
|
|
xlog.Info("BackendLoader starting", "modelID", o.modelID, "backend", o.backendString, "model", o.model) |
|
|
|
|
|
backend := strings.ToLower(o.backendString) |
|
|
if realBackend, exists := Aliases[backend]; exists { |
|
|
typeAlias, exists := TypeAlias[backend] |
|
|
if exists { |
|
|
xlog.Debug("alias is a type alias", "alias", backend, "realBackend", realBackend, "type", typeAlias) |
|
|
o.gRPCOptions.Type = typeAlias |
|
|
} else { |
|
|
xlog.Debug("alias", "alias", backend, "realBackend", realBackend) |
|
|
} |
|
|
|
|
|
backend = realBackend |
|
|
} |
|
|
|
|
|
model, err := ml.LoadModel(o.modelID, o.model, ml.grpcModel(backend, o)) |
|
|
if err != nil { |
|
|
if stopErr := ml.StopGRPC(only(o.modelID)); stopErr != nil { |
|
|
xlog.Error("error stopping model", "error", stopErr, "model", o.modelID) |
|
|
} |
|
|
xlog.Error("Failed to load model", "modelID", o.modelID, "error", err, "backend", o.backendString) |
|
|
return nil, err |
|
|
} |
|
|
|
|
|
return model.GRPC(o.parallelRequests, ml.wd), nil |
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
func (ml *ModelLoader) enforceLRULimit() { |
|
|
if ml.wd == nil { |
|
|
return |
|
|
} |
|
|
|
|
|
|
|
|
pendingLoads := ml.GetLoadingCount() |
|
|
|
|
|
|
|
|
ml.mu.Lock() |
|
|
maxRetries := ml.lruEvictionMaxRetries |
|
|
retryInterval := ml.lruEvictionRetryInterval |
|
|
ml.mu.Unlock() |
|
|
|
|
|
for attempt := 0; attempt < maxRetries; attempt++ { |
|
|
result := ml.wd.EnforceLRULimit(pendingLoads) |
|
|
|
|
|
if !result.NeedMore { |
|
|
|
|
|
if result.EvictedCount > 0 { |
|
|
xlog.Info("[ModelLoader] LRU enforcement complete", "evicted", result.EvictedCount) |
|
|
} |
|
|
return |
|
|
} |
|
|
|
|
|
|
|
|
if attempt < maxRetries-1 { |
|
|
xlog.Info("[ModelLoader] Waiting for busy models to become idle before eviction", |
|
|
"evicted", result.EvictedCount, |
|
|
"attempt", attempt+1, |
|
|
"maxRetries", maxRetries, |
|
|
"retryIn", retryInterval) |
|
|
time.Sleep(retryInterval) |
|
|
} else { |
|
|
|
|
|
xlog.Warn("[ModelLoader] LRU enforcement incomplete after max retries", |
|
|
"evicted", result.EvictedCount, |
|
|
"reason", "models are still busy with active API calls") |
|
|
} |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
func (ml *ModelLoader) updateModelLastUsed(m *Model) { |
|
|
if ml.wd == nil || m == nil { |
|
|
return |
|
|
} |
|
|
ml.wd.UpdateLastUsed(m.address) |
|
|
} |
|
|
|
|
|
func (ml *ModelLoader) Load(opts ...Option) (grpc.Backend, error) { |
|
|
o := NewOptions(opts...) |
|
|
|
|
|
|
|
|
|
|
|
if m := ml.CheckIsLoaded(o.modelID); m != nil { |
|
|
xlog.Debug("Model already loaded", "model", o.modelID) |
|
|
|
|
|
ml.updateModelLastUsed(m) |
|
|
return m.GRPC(o.parallelRequests, ml.wd), nil |
|
|
} |
|
|
|
|
|
|
|
|
ml.enforceLRULimit() |
|
|
|
|
|
|
|
|
if o.backendString != "" { |
|
|
client, err := ml.backendLoader(opts...) |
|
|
if err != nil { |
|
|
return nil, err |
|
|
} |
|
|
return client, nil |
|
|
} |
|
|
|
|
|
|
|
|
var err error |
|
|
|
|
|
|
|
|
autoLoadBackends := []string{} |
|
|
|
|
|
|
|
|
for b := range ml.GetAllExternalBackends(o) { |
|
|
autoLoadBackends = append(autoLoadBackends, b) |
|
|
} |
|
|
|
|
|
if len(autoLoadBackends) == 0 { |
|
|
xlog.Error("No backends found") |
|
|
return nil, fmt.Errorf("no backends found") |
|
|
} |
|
|
|
|
|
xlog.Debug("Loading from the following backends (in order)", "backends", autoLoadBackends) |
|
|
|
|
|
xlog.Info("Trying to load the model", "modelID", o.modelID, "backends", autoLoadBackends) |
|
|
|
|
|
for _, key := range autoLoadBackends { |
|
|
xlog.Info("Attempting to load", "backend", key) |
|
|
options := append(opts, []Option{ |
|
|
WithBackendString(key), |
|
|
}...) |
|
|
|
|
|
model, modelerr := ml.backendLoader(options...) |
|
|
if modelerr == nil && model != nil { |
|
|
xlog.Info("Loads OK", "backend", key) |
|
|
return model, nil |
|
|
} else if modelerr != nil { |
|
|
err = errors.Join(err, fmt.Errorf("[%s]: %w", key, modelerr)) |
|
|
xlog.Info("Fails", "backend", key, "error", modelerr.Error()) |
|
|
} else if model == nil { |
|
|
err = errors.Join(err, fmt.Errorf("backend %s returned no usable model", key)) |
|
|
xlog.Info("Fails", "backend", key, "error", "backend returned no usable model") |
|
|
} |
|
|
} |
|
|
|
|
|
return nil, fmt.Errorf("could not load model - all backends returned error: %s", err.Error()) |
|
|
} |
|
|
|