# Provenance (from repository history): commit 5dd4236 (verified), "finish (#8)", author: carraraig
"""
Model Manager Module
This module provides centralized management of AI models for the HiveGPT Agent
system. It handles loading, caching, and lifecycle management of both LLM and
reranking models with thread-safe operations.
The ModelManager class offers:
- Lazy loading and caching of language models
- Thread-safe model access with async locks
- Integration with ModelRouter for model discovery
- Memory-efficient model reuse across requests
Key Features:
- Singleton pattern for consistent model access
- Async/await support for non-blocking operations
- Automatic model caching to improve performance
- Error handling for model loading failures
Author: HiveNetCode
License: Private
"""
import asyncio
from typing import Dict, Any, Optional
from langchain_openai import ChatOpenAI
from ComputeAgent.models.model_router import ModelRouter, LLMModel
from constant import Constants
class ModelManager:
    """
    Centralized manager for AI language-model loading and caching.

    Implements lazy, cache-backed loading of chat language models. Loaded
    ``ChatOpenAI`` instances are cached per manager instance, keyed by model
    name, and cache misses are guarded by an asyncio lock so concurrent
    coroutines never initialize the same model twice. Model endpoints are
    discovered through :class:`ModelRouter`.

    Attributes:
        _llm_models: Cache mapping model name -> loaded ChatOpenAI instance.
        _llm_lock: Async lock serializing model initialization on cache miss.
        model_router: Interface to the model discovery service.
    """

    def __init__(self):
        """
        Initialize the ModelManager with an empty model cache, an asyncio
        lock for safe concurrent loading, and a ModelRouter for model
        discovery.
        """
        # Cache of loaded language models for efficient reuse across requests.
        self._llm_models: Dict[str, ChatOpenAI] = {}
        # Serializes cache misses so concurrent coroutines don't construct
        # duplicate ChatOpenAI instances for the same model name.
        self._llm_lock = asyncio.Lock()
        # Endpoint discovery for available models (see load_llm_model).
        self.model_router = ModelRouter()

    async def load_llm_model(self, model_name: str) -> ChatOpenAI:
        """
        Asynchronously load (or fetch from cache) the language model with
        the given name.

        Uses double-checked locking: the cache is consulted before acquiring
        the lock (fast path), then re-checked after acquisition so that only
        the first coroutine to miss actually builds the ``ChatOpenAI``
        instance. The endpoint for the model comes from the ModelRouter;
        authentication uses ``Constants.MODEL_ROUTER_TOKEN``.

        Args:
            model_name (str): The name of the language model to load.

        Returns:
            ChatOpenAI: The cached or newly initialized language model.
        """
        # Fast path: model already loaded — no lock needed for a pure read
        # in a single-threaded asyncio context.
        if model_name in self._llm_models:
            return self._llm_models[model_name]
        async with self._llm_lock:
            # Re-check inside the lock: another coroutine may have loaded
            # this model while we were waiting to acquire it.
            if model_name not in self._llm_models:
                loaded_model: LLMModel = self.model_router.get_llm_model(model_name)
                llm = ChatOpenAI(
                    model_name=model_name,
                    api_key=Constants.MODEL_ROUTER_TOKEN,
                    base_url=loaded_model.openai_endpoint,
                    # Low temperature for deterministic-leaning agent output.
                    temperature=0.1,
                )
                self._llm_models[model_name] = llm
            return self._llm_models[model_name]