File size: 7,037 Bytes
8cde7d1
 
 
 
 
 
 
cdff838
8cde7d1
 
 
 
329abd1
8cde7d1
 
 
329abd1
8cde7d1
 
 
 
 
 
 
 
7c2f84b
8cde7d1
 
 
c14ac43
7c2f84b
c14ac43
8cde7d1
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
fb1531e
7c2f84b
8cde7d1
 
fb1531e
8cde7d1
 
 
 
 
7c2f84b
fb1531e
8cde7d1
 
 
7c2f84b
8cde7d1
c14ac43
 
 
 
 
 
 
 
 
 
8cde7d1
cdff838
 
 
 
 
8cde7d1
 
 
 
 
 
 
 
329abd1
8cde7d1
 
329abd1
 
 
 
 
 
 
 
 
 
8cde7d1
 
 
 
db4996d
 
 
 
 
 
8cde7d1
 
db4996d
 
329abd1
8cde7d1
 
 
9d2cc15
329abd1
 
 
 
 
 
 
 
 
 
 
9d2cc15
db4996d
8cde7d1
db4996d
 
8cde7d1
 
 
329abd1
8cde7d1
 
9d2cc15
 
db4996d
9d2cc15
8cde7d1
 
 
 
 
 
 
 
 
baa08b7
 
 
 
 
 
 
 
 
c14ac43
7c2f84b
 
baa08b7
 
8cde7d1
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
"""
GGUF Model implementation using llama-cpp-python.
Highly optimized for CPU inference.
"""

import os
import asyncio
import traceback
from typing import List, Dict, Any, Optional
from app.models.base_llm import BaseLLM

try:
    from llama_cpp import Llama, LlamaGrammar
    HAS_LLAMA_CPP = True
except ImportError:
    HAS_LLAMA_CPP = False
    LlamaGrammar = None


class LlamaCppModel(BaseLLM):
    """
    Wrapper for GGUF models using llama.cpp (via llama-cpp-python).

    The model is loaded lazily by ``initialize()`` and queried through
    ``generate()``, which uses llama.cpp's chat-completion API. Responses are
    memoized in a small per-instance FIFO cache keyed on the request
    (messages, sampling parameters and grammar text).
    """

    def __init__(
        self,
        name: str,
        model_id: str,
        model_path: Optional[str] = None,
        n_ctx: int = 4096,
        grammar_path: Optional[str] = None,
        n_gpu_layers: int = -1,
    ):
        """Store the configuration; the model itself is loaded in ``initialize()``.

        Args:
            name: Display name, used as a prefix in log output.
            model_id: Identifier forwarded to the base class.
            model_path: Path to the ``.gguf`` file on disk.
            n_ctx: Context window size passed to llama.cpp.
            grammar_path: Optional GBNF grammar file name, resolved relative
                to the sibling ``logic`` directory of this module.
            n_gpu_layers: Value passed to llama.cpp as ``n_gpu_layers``.

        Raises:
            ImportError: If llama-cpp-python is not installed.
        """
        super().__init__(name, model_id)
        self.model_path = model_path
        self.n_ctx = n_ctx
        self.grammar_path = grammar_path
        self.n_gpu_layers = n_gpu_layers
        # Text of the grammar file, filled in by initialize() when available.
        # NOTE(review): generate() does not apply this automatically; callers
        # have to pass it via ``grammar=`` themselves -- confirm this is intended.
        self.default_grammar = None
        self.llm = None
        # Insertion-ordered dict used as a FIFO response cache.
        self._response_cache = {}
        self._max_cache_size = 100

        if not HAS_LLAMA_CPP:
            raise ImportError("llama-cpp-python is not installed. Cannot use GGUF models.")

    async def initialize(self) -> None:
        """Load the GGUF model (and the optional grammar file).

        Does nothing if the model is already initialized.

        Raises:
            FileNotFoundError: If ``model_path`` is unset or does not exist.
            RuntimeError: If llama.cpp fails to load the model.
        """
        if self._initialized:
            return

        if not self.model_path or not os.path.exists(self.model_path):
            # Path resolution is expected to happen before this point; this is
            # only a safety check.
            raise FileNotFoundError(f"GGUF model file not found at: {self.model_path}")

        try:
            print(f"[{self.name}] Loading GGUF model from: {self.model_path}")
            print(f"[{self.name}] File size: {os.path.getsize(self.model_path) / (1024*1024):.2f} MB")
            print(f"[{self.name}] n_ctx={self.n_ctx}, n_threads={os.cpu_count()}, n_gpu_layers={self.n_gpu_layers}")

            # Constructing Llama is slow and blocking, so run it in a worker
            # thread to keep the event loop responsive.
            self.llm = await asyncio.to_thread(
                Llama,
                model_path=self.model_path,
                n_ctx=self.n_ctx,
                n_threads=os.cpu_count(),  # Use all available cores
                n_gpu_layers=self.n_gpu_layers,  # GPU layer offloading
                verbose=True,  # Enable verbose to see loading errors
            )
        except Exception as e:
            error_msg = str(e) or repr(e)
            print(f"[{self.name}] Failed to load GGUF model: {error_msg}")
            print(f"[{self.name}] Full traceback:")
            traceback.print_exc()
            raise RuntimeError(f"Failed to load GGUF model: {error_msg}") from e

        self._initialized = True
        print(f"[{self.name}] GGUF Model loaded successfully (n_ctx={self.n_ctx}, n_gpu_layers={self.n_gpu_layers})")

        # The grammar is optional: it is loaded outside the model-loading
        # try-block so that a grammar problem is never reported as a model
        # load failure after the model is already usable.
        if self.grammar_path:
            self._load_default_grammar()

    def _load_default_grammar(self) -> None:
        """Best-effort read of the grammar file into ``self.default_grammar``."""
        grammar_full_path = os.path.join(os.path.dirname(__file__), "..", "logic", self.grammar_path)
        try:
            with open(grammar_full_path, 'r', encoding='utf-8') as f:
                self.default_grammar = f.read()
        except FileNotFoundError:
            print(f"[{self.name}] Grammar file not found: {grammar_full_path}")
        except (OSError, UnicodeDecodeError) as e:
            print(f"[{self.name}] Failed to read grammar file {grammar_full_path}: {e}")
        else:
            print(f"[{self.name}] Loaded grammar from: {grammar_full_path}")

    @staticmethod
    def _cache_key(messages, max_new_tokens, temperature, top_p, grammar) -> tuple:
        """Build a hashable cache key that identifies one generation request."""
        import json

        # The grammar text itself is part of the key: two requests that differ
        # only in their grammar must not share a cached response.
        return (
            json.dumps(messages, sort_keys=True),
            max_new_tokens,
            temperature,
            top_p,
            grammar or None,
        )

    def _store_response(self, cache_key: tuple, response_text: str) -> None:
        """Insert a response into the FIFO cache, evicting the oldest entry if full."""
        if self._max_cache_size <= 0:
            return
        cache = self._response_cache
        if cache_key not in cache and len(cache) >= self._max_cache_size:
            # Dicts preserve insertion order, so the first key is the oldest.
            del cache[next(iter(cache))]
        cache[cache_key] = response_text

    async def generate(
        self,
        prompt: str = None,
        chat_messages: List[Dict[str, str]] = None,
        max_new_tokens: int = 150,
        temperature: float = 0.7,
        top_p: float = 0.9,
        grammar: str = None,
        **kwargs
    ) -> str:
        """Generate text using llama.cpp

        Args:
            prompt: Simple text prompt (converted to user message)
            chat_messages: List of chat messages with role/content
            max_new_tokens: Maximum tokens to generate
            temperature: Sampling temperature (lower = more deterministic)
            top_p: Nucleus sampling threshold
            grammar: Optional GBNF grammar string to constrain output
            **kwargs: Accepted for interface compatibility; ignored here.

        Returns:
            The stripped text of the first completion choice (empty string if
            the completion carries no text content).

        Raises:
            RuntimeError: If the model has not been initialized.
            ValueError: If neither ``prompt`` nor ``chat_messages`` is given.
        """
        if not self._initialized or self.llm is None:
            raise RuntimeError(f"[{self.name}] Model not initialized")

        # Ensure we have a list of messages
        messages = chat_messages
        if not messages and prompt:
            messages = [{"role": "user", "content": prompt}]

        if not messages:
            raise ValueError("Either prompt or chat_messages required")

        # Cache check
        cache_key = self._cache_key(messages, max_new_tokens, temperature, top_p, grammar)
        if cache_key in self._response_cache:
            return self._response_cache[cache_key]

        print(f"DEBUG: Generating with messages: {messages}", flush=True)
        if grammar:
            print("DEBUG: Using GBNF grammar constraint", flush=True)

        # Prepare grammar object if provided; an unparsable grammar falls back
        # to unconstrained generation rather than failing the request.
        llama_grammar = None
        if grammar and LlamaGrammar:
            try:
                llama_grammar = LlamaGrammar.from_string(grammar)
            except Exception as e:
                print(f"DEBUG: Failed to parse grammar: {e}", flush=True)
                llama_grammar = None

        # Generate using chat completion to leverage internal templates.
        # The call is blocking, so it runs in a worker thread.
        output = await asyncio.to_thread(
            self.llm.create_chat_completion,
            messages=messages,
            max_tokens=max_new_tokens,
            temperature=temperature,
            top_p=top_p,
            grammar=llama_grammar,
        )

        print(f"DEBUG: Raw output object: {output}", flush=True)

        # The content field may be absent or None; treat that as empty text
        # instead of crashing on ``None.strip()``.
        content = output['choices'][0]['message'].get('content')
        response_text = (content or "").strip()
        print(f"DEBUG: Extracted text: {response_text}", flush=True)

        # Cache store
        self._store_response(cache_key, response_text)

        return response_text

    def get_info(self) -> Dict[str, Any]:
        """Return model information for /models endpoint."""
        return {
            "name": self.name,
            "model_id": self.model_id,
            "type": "gguf",
            "backend": "llama.cpp",
            "context_length": self.n_ctx,
            "loaded": self._initialized,
            "model_path": self.model_path,
            "has_grammar": self.default_grammar is not None,
            "gpu_layers": self.n_gpu_layers
        }

    async def cleanup(self) -> None:
        """Free memory: drop the model reference and the cached responses."""
        # Dropping the only reference lets the Llama object be collected.
        self.llm = None
        self._response_cache.clear()
        self._initialized = False
        print(f"[{self.name}] GGUF Model unloaded")