File size: 9,561 Bytes
0f07ba7
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
package model

import (
	"context"
	"errors"
	"fmt"
	"os"
	"sort"
	"strings"
	"time"

	grpc "github.com/mudler/LocalAI/pkg/grpc"
	"github.com/mudler/xlog"
	"github.com/phayes/freeport"
)

const (
	// LLamaCPP is the canonical backend name for the llama.cpp gRPC backend;
	// several aliases below resolve to it.
	LLamaCPP = "llama-cpp"
)

// Aliases maps legacy or alternative backend names (as supplied by users or
// old configs) to the canonical backend name that is actually launched.
var Aliases map[string]string = map[string]string{
	"go-llama":               LLamaCPP,
	"llama":                  LLamaCPP,
	"embedded-store":         LocalStoreBackend,
	"huggingface-embeddings": TransformersBackend,
	"langchain-huggingface":  LCHuggingFaceBackend,
	"transformers-musicgen":  TransformersBackend,
	"sentencetransformers":   TransformersBackend,
	"mamba":                  TransformersBackend,
	"stablediffusion":        StableDiffusionGGMLBackend,
}

// TypeAlias maps alias backend names to the model Type that must be set on the
// gRPC options when the alias is used (consumed by backendLoader when an alias
// from Aliases also appears here).
var TypeAlias map[string]string = map[string]string{
	"sentencetransformers":   "SentenceTransformer",
	"huggingface-embeddings": "SentenceTransformer",
	"mamba":                  "Mamba",
	"transformers-musicgen":  "MusicgenForConditionalGeneration",
}

// Canonical names of the remaining built-in/known backends referenced by the
// alias tables above.
const (
	WhisperBackend             = "whisper"
	StableDiffusionGGMLBackend = "stablediffusion-ggml"
	LCHuggingFaceBackend       = "huggingface"

	TransformersBackend = "transformers"
	LocalStoreBackend   = "local-store"
)

// grpcModel returns a loader function (used by ml.LoadModel) that starts the
// gRPC process for the given backend — or connects to it if the backend is
// configured as a remote address — waits for it to become healthy, and then
// issues a LoadModel call. On any failure after a process was spawned, the
// process is stopped before returning the error.
func (ml *ModelLoader) grpcModel(backend string, o *Options) func(string, string, string) (*Model, error) {
	return func(modelID, modelName, modelFile string) (*Model, error) {

		xlog.Debug("Loading Model with gRPC", "modelID", modelID, "file", modelFile, "backend", backend, "options", *o)

		var client *Model

		// Allocates a free local TCP port for the backend process to listen on.
		getFreeAddress := func() (string, error) {
			port, err := freeport.GetFreePort()
			if err != nil {
				return "", fmt.Errorf("failed allocating free ports: %s", err.Error())
			}
			return fmt.Sprintf("127.0.0.1:%d", port), nil
		}

		// If no specific model path is set for transformers/HF, set it to the model path
		// (only fills in env vars that are currently unset; never overrides the user).
		for _, env := range []string{"HF_HOME", "TRANSFORMERS_CACHE", "HUGGINGFACE_HUB_CACHE"} {
			if os.Getenv(env) == "" {
				err := os.Setenv(env, ml.ModelPath)
				if err != nil {
					xlog.Error("unable to set environment variable to modelPath", "error", err, "name", env, "modelPath", ml.ModelPath)
				}
			}
		}

		// Check if the backend is provided as external
		if uri, ok := ml.GetAllExternalBackends(o)[backend]; ok {
			xlog.Debug("Loading external backend", "uri", uri)
			// check if uri is a file or a address
			if fi, err := os.Stat(uri); err == nil {
				xlog.Debug("external backend is file", "file", fi)
				serverAddress, err := getFreeAddress()
				if err != nil {
					return nil, fmt.Errorf("failed allocating free ports: %s", err.Error())
				}
				// Make sure the process is executable
				process, err := ml.startProcess(uri, modelID, serverAddress)
				if err != nil {
					xlog.Error("failed to launch", "error", err, "path", uri)
					return nil, err
				}

				xlog.Debug("GRPC Service Started")

				client = NewModel(modelID, serverAddress, process)
			} else {
				xlog.Debug("external backend is a uri")
				// address
				// No local process to manage: the Model wraps the remote address only.
				client = NewModel(modelID, uri, nil)
			}
		} else {
			xlog.Error("Backend not found", "backend", backend)
			return nil, fmt.Errorf("backend not found: %s", backend)
		}

		xlog.Debug("Wait for the service to start up")
		xlog.Debug("Options", "options", o.gRPCOptions)

		// Wait for the service to start up
		// Polls the health endpoint up to o.grpcAttempts times, sleeping
		// o.grpcAttemptsDelay seconds between attempts; only the final
		// failed attempt's error is logged.
		ready := false
		for i := 0; i < o.grpcAttempts; i++ {
			alive, err := client.GRPC(o.parallelRequests, ml.wd).HealthCheck(context.Background())
			if alive {
				xlog.Debug("GRPC Service Ready")
				ready = true
				break
			}
			if err != nil && i == o.grpcAttempts-1 {
				xlog.Error("failed starting/connecting to the gRPC service", "error", err)
			}
			time.Sleep(time.Duration(o.grpcAttemptsDelay) * time.Second)
		}

		if !ready {
			xlog.Debug("GRPC Service NOT ready")
			// Reap the spawned process (nil for remote-address backends).
			if process := client.Process(); process != nil {
				process.Stop()
			}
			return nil, fmt.Errorf("grpc service not ready")
		}

		// Copy the shared gRPC options and fill in the per-model fields
		// before issuing the LoadModel RPC.
		options := *o.gRPCOptions
		options.Model = modelName
		options.ModelFile = modelFile
		options.ModelPath = ml.ModelPath

		xlog.Debug("GRPC: Loading model with options", "options", options)

		res, err := client.GRPC(o.parallelRequests, ml.wd).LoadModel(o.context, &options)
		if err != nil {
			if process := client.Process(); process != nil {
				process.Stop()
			}
			return nil, fmt.Errorf("could not load model: %w", err)
		}
		if !res.Success {
			if process := client.Process(); process != nil {
				process.Stop()
			}
			return nil, fmt.Errorf("could not load model (no success): %s", res.Message)
		}

		return client, nil
	}
}

// backendLoader resolves the configured backend string (applying alias and
// type-alias tables), loads the model through the gRPC loader for the
// resolved backend, and returns a ready gRPC client. On load failure the
// partially-started model process is stopped before the error is returned.
func (ml *ModelLoader) backendLoader(opts ...Option) (client grpc.Backend, err error) {
	o := NewOptions(opts...)

	xlog.Info("BackendLoader starting", "modelID", o.modelID, "backend", o.backendString, "model", o.model)

	name := strings.ToLower(o.backendString)
	if canonical, isAlias := Aliases[name]; isAlias {
		// An alias may also carry a model Type that must be forced on the
		// gRPC options (e.g. sentencetransformers -> SentenceTransformer).
		if mappedType, hasType := TypeAlias[name]; hasType {
			xlog.Debug("alias is a type alias", "alias", name, "realBackend", canonical, "type", mappedType)
			o.gRPCOptions.Type = mappedType
		} else {
			xlog.Debug("alias", "alias", name, "realBackend", canonical)
		}
		name = canonical
	}

	model, loadErr := ml.LoadModel(o.modelID, o.model, ml.grpcModel(name, o))
	if loadErr != nil {
		// Best-effort cleanup of whatever was started for this model ID.
		if stopErr := ml.StopGRPC(only(o.modelID)); stopErr != nil {
			xlog.Error("error stopping model", "error", stopErr, "model", o.modelID)
		}
		xlog.Error("Failed to load model", "modelID", o.modelID, "error", loadErr, "backend", o.backendString)
		return nil, loadErr
	}

	return model.GRPC(o.parallelRequests, ml.wd), nil
}

// enforceLRULimit evicts least-recently-used models before a new one is
// loaded so the configured limit is not exceeded. It counts models that other
// goroutines are concurrently loading, and when eviction candidates are busy
// it waits and retries up to the configured number of attempts. After the
// final attempt it gives up with a warning rather than blocking forever.
func (ml *ModelLoader) enforceLRULimit() {
	if ml.wd == nil {
		return
	}

	// Account for loads currently in flight on other goroutines.
	inFlight := ml.GetLoadingCount()

	// Snapshot the retry settings under the lock.
	ml.mu.Lock()
	retries := ml.lruEvictionMaxRetries
	delay := ml.lruEvictionRetryInterval
	ml.mu.Unlock()

	for try := 0; try < retries; try++ {
		res := ml.wd.EnforceLRULimit(inFlight)

		if !res.NeedMore {
			// Enough room now (either nothing to evict, or eviction succeeded).
			if res.EvictedCount > 0 {
				xlog.Info("[ModelLoader] LRU enforcement complete", "evicted", res.EvictedCount)
			}
			return
		}

		if try == retries-1 {
			// Out of attempts — proceed anyway; the load may still fail upstream.
			xlog.Warn("[ModelLoader] LRU enforcement incomplete after max retries",
				"evicted", res.EvictedCount,
				"reason", "models are still busy with active API calls")
			return
		}

		// Busy models blocked some evictions — give them time to go idle.
		xlog.Info("[ModelLoader] Waiting for busy models to become idle before eviction",
			"evicted", res.EvictedCount,
			"attempt", try+1,
			"maxRetries", retries,
			"retryIn", delay)
		time.Sleep(delay)
	}
}

// updateModelLastUsed refreshes the LRU timestamp for an already-loaded
// model. A nil model or absent watchdog makes this a no-op.
func (ml *ModelLoader) updateModelLastUsed(m *Model) {
	if m == nil || ml.wd == nil {
		return
	}
	ml.wd.UpdateLastUsed(m.address)
}

// Load returns a gRPC client for the model described by opts.
//
// If the model is already loaded it is returned immediately (refreshing its
// LRU timestamp). Otherwise the LRU limit is enforced, and either the
// explicitly requested backend is used, or every known external backend is
// tried in deterministic (sorted) order until one succeeds. When all
// backends fail, the joined per-backend errors are returned wrapped with %w
// so callers can inspect them with errors.Is/errors.As.
func (ml *ModelLoader) Load(opts ...Option) (grpc.Backend, error) {
	o := NewOptions(opts...)

	// Return earlier if we have a model already loaded
	// (avoid looping through all the backends)
	if m := ml.CheckIsLoaded(o.modelID); m != nil {
		xlog.Debug("Model already loaded", "model", o.modelID)
		// Update last used time for LRU tracking
		ml.updateModelLastUsed(m)
		return m.GRPC(o.parallelRequests, ml.wd), nil
	}

	// Enforce LRU limit before loading a new model
	ml.enforceLRULimit()

	// if a backend is defined, return the loader directly
	if o.backendString != "" {
		return ml.backendLoader(opts...)
	}

	// Otherwise auto-detect: collect the external backends supplied by the
	// user (via CLI/config) and try each of them in turn.
	var err error

	autoLoadBackends := []string{}
	for b := range ml.GetAllExternalBackends(o) {
		autoLoadBackends = append(autoLoadBackends, b)
	}

	if len(autoLoadBackends) == 0 {
		xlog.Error("No backends found")
		return nil, fmt.Errorf("no backends found")
	}

	// Map iteration order is random; sort so the attempt order (and the
	// logged order) is deterministic across runs.
	sort.Strings(autoLoadBackends)

	xlog.Debug("Loading from the following backends (in order)", "backends", autoLoadBackends)

	xlog.Info("Trying to load the model", "modelID", o.modelID, "backends", autoLoadBackends)

	for _, key := range autoLoadBackends {
		xlog.Info("Attempting to load", "backend", key)
		// Clone opts before appending so iterations cannot alias/overwrite
		// each other (or the caller's slice) through a shared backing array.
		options := append(append([]Option{}, opts...), WithBackendString(key))

		model, modelerr := ml.backendLoader(options...)
		if modelerr == nil && model != nil {
			xlog.Info("Loads OK", "backend", key)
			return model, nil
		} else if modelerr != nil {
			err = errors.Join(err, fmt.Errorf("[%s]: %w", key, modelerr))
			xlog.Info("Fails", "backend", key, "error", modelerr.Error())
		} else if model == nil {
			err = errors.Join(err, fmt.Errorf("backend %s returned no usable model", key))
			xlog.Info("Fails", "backend", key, "error", "backend returned no usable model")
		}
	}

	// err is non-nil here: every iteration either returned or joined an error.
	return nil, fmt.Errorf("could not load model - all backends returned error: %w", err)
}