Spaces:
Sleeping
Sleeping
"""
GGUF Model implementation using llama-cpp-python.
Highly optimized for CPU inference.
"""
import os
import asyncio
import traceback
from typing import List, Dict, Any, Optional

from app.models.base_llm import BaseLLM

# llama-cpp-python is an optional dependency: record availability instead of
# failing at import time so the rest of the app can still load without it.
try:
    from llama_cpp import Llama, LlamaGrammar
    HAS_LLAMA_CPP = True
except ImportError:
    HAS_LLAMA_CPP = False
    # Bind BOTH names so any module-level reference degrades to None instead
    # of raising NameError (original only bound LlamaGrammar).
    Llama = None
    LlamaGrammar = None
class LlamaCppModel(BaseLLM):
    """
    Wrapper for GGUF models using llama.cpp.

    Provides significant speedups on CPU compared to Transformers.
    Supports optional GBNF grammar constraints and keeps a small
    FIFO response cache keyed on the full request parameters.
    """

    def __init__(
        self,
        name: str,
        model_id: str,
        model_path: Optional[str] = None,
        n_ctx: int = 4096,
        grammar_path: Optional[str] = None,
        n_gpu_layers: int = -1,
    ):
        """
        Args:
            name: Human-readable model name, used as a log prefix.
            model_id: Registry identifier forwarded to BaseLLM.
            model_path: Filesystem path to the .gguf weights file.
            n_ctx: Context window size in tokens.
            grammar_path: GBNF grammar file path, resolved relative to the
                sibling "logic" directory during initialize().
            n_gpu_layers: Number of layers to offload to GPU (-1 = all).

        Raises:
            ImportError: If llama-cpp-python is not installed.
        """
        super().__init__(name, model_id)
        self.model_path = model_path
        self.n_ctx = n_ctx
        self.grammar_path = grammar_path
        self.n_gpu_layers = n_gpu_layers
        self.default_grammar: Optional[str] = None  # Will be loaded from file if provided
        self.llm = None
        # Small FIFO cache of generated responses (see generate()).
        self._response_cache: Dict[str, str] = {}
        self._max_cache_size = 100
        if not HAS_LLAMA_CPP:
            raise ImportError("llama-cpp-python is not installed. Cannot use GGUF models.")

    async def initialize(self) -> None:
        """Load GGUF model.

        Loads the weights off the event loop via asyncio.to_thread, then
        reads the optional GBNF grammar file into self.default_grammar.

        Raises:
            FileNotFoundError: If model_path is unset or does not exist.
            RuntimeError: If llama.cpp fails to load the model (original
                exception chained as __cause__).
        """
        if self._initialized:
            return
        if not self.model_path or not os.path.exists(self.model_path):
            # If exact path isn't provided, try to find it in the model directory
            # logic handled in registry usually, but safety check here
            raise FileNotFoundError(f"GGUF model file not found at: {self.model_path}")
        try:
            print(f"[{self.name}] Loading GGUF model from: {self.model_path}")
            print(f"[{self.name}] File size: {os.path.getsize(self.model_path) / (1024*1024):.2f} MB")
            print(f"[{self.name}] n_ctx={self.n_ctx}, n_threads={os.cpu_count()}, n_gpu_layers={self.n_gpu_layers}")
            # Load model in a thread to avoid blocking event loop
            # Enable verbose to see llama.cpp errors
            self.llm = await asyncio.to_thread(
                Llama,
                model_path=self.model_path,
                n_ctx=self.n_ctx,
                n_threads=os.cpu_count(),  # Use all available cores
                n_gpu_layers=self.n_gpu_layers,  # GPU layer offloading
                verbose=True,  # Enable verbose to see loading errors
            )
            self._initialized = True
            print(f"[{self.name}] GGUF Model loaded successfully (n_ctx={self.n_ctx}, n_gpu_layers={self.n_gpu_layers})")
            # Load grammar file if provided
            if self.grammar_path:
                grammar_full_path = os.path.join(os.path.dirname(__file__), "..", "logic", self.grammar_path)
                if os.path.exists(grammar_full_path):
                    with open(grammar_full_path, 'r', encoding='utf-8') as f:
                        self.default_grammar = f.read()
                    print(f"[{self.name}] Loaded grammar from: {grammar_full_path}")
                else:
                    # Missing grammar is non-fatal: generation simply runs
                    # unconstrained.
                    print(f"[{self.name}] Grammar file not found: {grammar_full_path}")
        except Exception as e:
            error_msg = str(e) if str(e) else repr(e)
            print(f"[{self.name}] Failed to load GGUF model: {error_msg}")
            print(f"[{self.name}] Full traceback:")
            traceback.print_exc()
            raise RuntimeError(f"Failed to load GGUF model: {error_msg}") from e

    async def generate(
        self,
        prompt: Optional[str] = None,
        chat_messages: Optional[List[Dict[str, str]]] = None,
        max_new_tokens: int = 150,
        temperature: float = 0.7,
        top_p: float = 0.9,
        grammar: Optional[str] = None,
        **kwargs,
    ) -> str:
        """Generate text using llama.cpp

        Args:
            prompt: Simple text prompt (converted to user message)
            chat_messages: List of chat messages with role/content
            max_new_tokens: Maximum tokens to generate
            temperature: Sampling temperature (lower = more deterministic)
            top_p: Nucleus sampling threshold
            grammar: Optional GBNF grammar string to constrain output

        Returns:
            The stripped text of the first completion choice.

        Raises:
            RuntimeError: If the model has not been initialized.
            ValueError: If neither prompt nor chat_messages is given.
        """
        # NOTE(review): self.default_grammar is loaded in initialize() but is
        # never used as a fallback here; callers must pass `grammar`
        # explicitly — confirm this is intentional.
        if not self._initialized or self.llm is None:
            raise RuntimeError(f"[{self.name}] Model not initialized")
        # Ensure we have a list of messages
        messages = chat_messages
        if not messages and prompt:
            messages = [{"role": "user", "content": prompt}]
        if not messages:
            raise ValueError("Either prompt or chat_messages required")
        # Cache Check - using stringified messages for the key.
        # BUGFIX: the key previously encoded only `grammar is not None`, so
        # identical messages with DIFFERENT grammars shared one cache slot
        # and could return each other's output. Fold the grammar content in.
        import json
        grammar_tag = hash(grammar) if grammar is not None else None
        cache_key = f"{json.dumps(messages)}_{max_new_tokens}_{temperature}_{top_p}_{grammar_tag}"
        if cache_key in self._response_cache:
            return self._response_cache[cache_key]
        print(f"DEBUG: Generating with messages: {messages}", flush=True)
        if grammar:
            print(f"DEBUG: Using GBNF grammar constraint", flush=True)
        # Prepare grammar object if provided; a malformed grammar degrades to
        # unconstrained generation rather than failing the request.
        llama_grammar = None
        if grammar and LlamaGrammar:
            try:
                llama_grammar = LlamaGrammar.from_string(grammar)
            except Exception as e:
                print(f"DEBUG: Failed to parse grammar: {e}", flush=True)
                llama_grammar = None
        # Generate using chat completion to leverage internal templates;
        # run in a worker thread so the event loop is not blocked.
        output = await asyncio.to_thread(
            self.llm.create_chat_completion,
            messages=messages,
            max_tokens=max_new_tokens,
            temperature=temperature,
            top_p=top_p,
            grammar=llama_grammar,
        )
        print(f"DEBUG: Raw output object: {output}", flush=True)
        response_text = output['choices'][0]['message']['content'].strip()
        print(f"DEBUG: Extracted text: {response_text}", flush=True)
        # Cache Store — evict the oldest entry (dict preserves insertion
        # order) once the cache is full.
        if len(self._response_cache) >= self._max_cache_size:
            first_key = next(iter(self._response_cache))
            del self._response_cache[first_key]
        self._response_cache[cache_key] = response_text
        return response_text

    def get_info(self) -> Dict[str, Any]:
        """Return model information for /models endpoint."""
        return {
            "name": self.name,
            "model_id": self.model_id,
            "type": "gguf",
            "backend": "llama.cpp",
            "context_length": self.n_ctx,
            "loaded": self._initialized,
            "model_path": self.model_path,
            "has_grammar": self.default_grammar is not None,
            "gpu_layers": self.n_gpu_layers,
        }

    async def cleanup(self) -> None:
        """Free memory."""
        if self.llm:
            # Dropping the last reference lets llama.cpp release the weights.
            del self.llm
            self.llm = None
        self._initialized = False
        print(f"[{self.name}] GGUF Model unloaded")