import logging
from pathlib import Path
from typing import Dict, Generator, List, Optional

# Try to import llama-cpp-python
try:
    from llama_cpp import Llama
    LLAMA_AVAILABLE = True
except ImportError:
    LLAMA_AVAILABLE = False
    Llama = None
    logging.warning(
        "llama-cpp-python not installed. Install with: pip install llama-cpp-python"
    )


class ModelManager:
    """Manages loading and inference of GGUF models."""

    def __init__(self):
        self.model: Optional[Llama] = None
        self.model_path: Optional[str] = None
        self.context_size: int = 2048
        self.gpu_layers: int = 0

    def is_loaded(self) -> bool:
        """Check whether a model is currently loaded."""
        return self.model is not None

    def load_model(
        self,
        model_path: str,
        context_size: int = 2048,
        gpu_layers: int = 0,
        n_ctx: Optional[int] = None,
        n_gpu_layers: Optional[int] = None,
        verbose: bool = True,
    ) -> bool:
        """Load a GGUF model. Returns True on success, False on failure."""
        if not LLAMA_AVAILABLE:
            logging.error("llama-cpp-python is not installed")
            return False

        try:
            # Unload any existing model first
            if self.model:
                self.unload_model()

            # n_ctx / n_gpu_layers take precedence over the legacy parameter names
            self.context_size = n_ctx or context_size
            self.gpu_layers = n_gpu_layers or gpu_layers
            self.model_path = model_path

            # Load the model
            self.model = Llama(
                model_path=model_path,
                n_ctx=self.context_size,
                n_gpu_layers=self.gpu_layers,
                verbose=verbose,
                embedding=False,
                f16_kv=True,
                use_mmap=True,
                use_mlock=False,
                logits_all=False,
                vocab_only=False,
            )

            logging.info(f"Model loaded successfully: {model_path}")
            return True
        except Exception as e:
            logging.error(f"Failed to load model: {e}")
            self.model = None
            self.model_path = None
            return False

    def unload_model(self):
        """Unload the current model and release its resources."""
        if self.model:
            del self.model
            self.model = None
            self.model_path = None
            logging.info("Model unloaded")

    def generate(
        self,
        prompt: str,
        temperature: float = 0.7,
        max_tokens: int = 512,
        top_p: float = 0.9,
        repeat_penalty: float = 1.1,
        stop: Optional[List[str]] = None,
        stream: bool = True,
    ) -> Generator[str, None, None]:
        """Generate text from the model, yielding chunks (or the full completion if stream=False)."""
        if not self.model:
            raise ValueError("No model loaded")

        try:
            if stream:
                # Stream text chunks as they are produced
                for chunk in self.model(
                    prompt,
                    max_tokens=max_tokens,
                    temperature=temperature,
                    top_p=top_p,
                    repeat_penalty=repeat_penalty,
                    stop=stop or [],
                    stream=True,
                ):
                    if chunk["choices"]:
                        yield chunk["choices"][0]["text"]
            else:
                # Single-shot completion: yield the whole text at once
                output = self.model(
                    prompt,
                    max_tokens=max_tokens,
                    temperature=temperature,
                    top_p=top_p,
                    repeat_penalty=repeat_penalty,
                    stop=stop or [],
                    stream=False,
                )
                yield output["choices"][0]["text"]
        except Exception as e:
            logging.error(f"Generation error: {e}")
            raise

    def get_model_info(self) -> Optional[Dict]:
        """Return information about the loaded model, or None if no model is loaded."""
        if not self.model:
            return None

        try:
            # GGUF metadata exposed by llama-cpp-python (may be empty on older versions)
            metadata = getattr(self.model, "metadata", {})

            # Vocabulary size via the public API
            try:
                vocab_size = self.model.n_vocab()
            except Exception:
                vocab_size = None

            # Basic model info
            info = {
                "model_path": self.model_path,
                "context_size": self.context_size,
                "gpu_layers": self.gpu_layers,
                "vocab_size": vocab_size,
            }

            if metadata:
                # Copy over the most commonly useful metadata fields
                common_fields = [
                    "general.architecture",
                    "llama.vocab_size",
                    "llama.context_length",
                    "llama.embedding_length",
                    "llama.block_count",
                    "llama.feed_forward_length",
                    "llama.attention.head_count",
                    "llama.attention.head_count_kv",
                    "llama.rope.dimension_count",
                    "llama.attention.layer_norm_rms_epsilon",
                    "tokenizer.ggml.model",
                    "tokenizer.ggml.tokens",
                ]
                for field in common_fields:
                    if field in metadata:
                        info[field] = metadata[field]

                # Keep the remaining metadata (minus raw byte blobs) for debugging
                info["raw_metadata"] = {
                    k: v for k, v in metadata.items()
                    if not isinstance(v, (bytes, bytearray))
                }

            return info
        except Exception as e:
            logging.error(f"Error getting model info: {e}")
            return {"error": str(e)}

    def tokenize(self, text: str) -> List[int]:
        """Tokenize text into model token ids."""
        if not self.model:
            raise ValueError("No model loaded")

        try:
            return self.model.tokenize(text.encode("utf-8"))
        except Exception as e:
            logging.error(f"Tokenization error: {e}")
            return []

    def detokenize(self, tokens: List[int]) -> str:
        """Convert token ids back into text."""
        if not self.model:
            raise ValueError("No model loaded")

        try:
            return self.model.detokenize(tokens).decode("utf-8")
        except Exception as e:
            logging.error(f"Detokenization error: {e}")
            return ""


def check_model_compatibility(model_path: str) -> Dict:
    """Check whether a model file exists, is readable, and looks like a GGUF file."""
    result = {
        "exists": False,
        "readable": False,
        "gguf": False,
        "size_mb": 0,
        "error": None,
    }

    try:
        path = Path(model_path)
        result["exists"] = path.exists()

        if result["exists"]:
            result["size_mb"] = path.stat().st_size / (1024 * 1024)
            result["gguf"] = path.suffix.lower() == ".gguf"

            # Try to read the 4-byte file header to confirm the file is readable
            try:
                with open(path, "rb") as f:
                    header = f.read(4)
                result["readable"] = len(header) == 4
            except OSError:
                result["readable"] = False
    except Exception as e:
        result["error"] = str(e)

    return result
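

# --- Usage sketch (illustrative only, not part of the module's API) ---
# A minimal example of driving ModelManager end to end: check the file, load it,
# stream a short completion, then unload. The GGUF path below is a hypothetical
# placeholder; point it at a real local model file before running.
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)

    example_path = "models/example-model.gguf"  # hypothetical path, replace with a real GGUF file

    # Sanity-check the file before attempting to load it
    compat = check_model_compatibility(example_path)
    print(f"Compatibility check: {compat}")

    manager = ModelManager()
    if compat["gguf"] and manager.load_model(example_path, context_size=2048, gpu_layers=0):
        print(manager.get_model_info())
        # Stream a short completion chunk by chunk
        for piece in manager.generate("Q: What is a GGUF file?\nA:", max_tokens=64):
            print(piece, end="", flush=True)
        print()
        manager.unload_model()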