| import logging |
| import re |
| import httpx |
| import json |
| import asyncio |
| import os |
| import threading |
| from typing import Optional, Dict, Any, List |
| from app.core.config import get_settings |
|
|
| print(">>> LLM MODULE IMPORTED", flush=True) |
|
|
| |
| HAS_LLAMA_CPP = False |
| try: |
| import llama_cpp |
| HAS_LLAMA_CPP = True |
| except Exception: |
| |
| |
| pass |
|
|
| settings = get_settings() |
| logger = logging.getLogger(__name__) |
|
|
| class LocalLLMEngine: |
| """Handles local execution of GGUF models using llama-cpp-python.""" |
| |
| def __init__(self): |
| self._model = None |
| self._lock = threading.Lock() |
| self.repo_id = settings.LOCAL_MODEL_REPO |
| self.filename = settings.LOCAL_MODEL_FILE |
| self.models_dir = settings.LOCAL_MODEL_DIR |
| |
| def _ensure_model(self): |
| """Lazy load and potentially download the model. Thread-safe.""" |
| with self._lock: |
| if self._model: |
| return self._model |
| |
| try: |
| from llama_cpp import Llama |
| from huggingface_hub import hf_hub_download |
| except Exception as e: |
| logger.error(f"Cannot load local LLM engine (likely missing system dependencies for llama_cpp): {e}") |
| return None |
| |
| try: |
| |
| os.makedirs(self.models_dir, exist_ok=True) |
| |
| |
| model_path = os.path.abspath(os.path.join(self.models_dir, self.filename)) |
| logger.info(f"LocalLLMEngine: Checking for model at {model_path}") |
| |
| if os.path.exists(model_path): |
| file_size = os.path.getsize(model_path) |
| logger.info(f"LocalLLMEngine: Model file found. Size: {file_size / (1024*1024):.2f} MB") |
| if file_size < 100 * 1024 * 1024: |
| logger.warning(f"LocalLLMEngine: Model file seems too small ({file_size} bytes). It might be an LFS pointer. Re-downloading...") |
| os.remove(model_path) |
| |
| if not os.path.exists(model_path): |
| logger.warning(f"LocalLLMEngine: Model not found at expected path or removed. Attempting download from {self.repo_id}...") |
| downloaded_path = hf_hub_download( |
| repo_id=self.repo_id, |
| filename=self.filename, |
| local_dir=self.models_dir |
| ) |
| model_path = os.path.abspath(downloaded_path) |
| logger.info(f"LocalLLMEngine: Download complete. Size: {os.path.getsize(model_path) / (1024*1024):.2f} MB") |
| |
| |
| |
| |
| |
| self._model = Llama( |
| model_path=model_path, |
| n_ctx=settings.LOCAL_LLM_CONTEXT, |
| n_threads=1, |
| n_gpu_layers=0, |
| logits_all=False, |
| verbose=False |
| ) |
| logger.info(f"Local LLM engine initialized (ctx: {settings.LOCAL_LLM_CONTEXT}, threads: 1).") |
| return self._model |
| except Exception as e: |
| logger.error(f"Failed to initialize local LLM engine: {e}") |
| return None |
|
|
| def _strip_thoughts(self, text: str) -> str: |
| """Removes internal thinking blocks from Gemma 4 output.""" |
| if not text: |
| return text |
| |
| text = re.sub(r'<\|channel>thought.*?<channel\|>', '', text, flags=re.DOTALL) |
| return text.strip() |
|
|
| def generate(self, prompt: str, system_prompt: str, temperature: float) -> Optional[str]: |
| """Generate response using the local model.""" |
| model = self._ensure_model() |
| if not model: |
| return None |
| |
| try: |
| |
| formatted_prompt = f"<|turn>system\n{system_prompt} <turn|>\n<|turn>user\n{prompt} <turn|>\n<|turn>model\n" |
| |
| logger.debug(f"LocalLLMEngine: Starting inference with Gemma 4...") |
| output = model( |
| formatted_prompt, |
| max_tokens=512, |
| temperature=temperature, |
| stop=["<turn|>", "<|turn>", "<|im_end|>", "<|endoftext|>"], |
| echo=False |
| ) |
| raw_text = output['choices'][0]['text'].strip() |
| text = self._strip_thoughts(raw_text) |
| logger.info(f"LocalLLMEngine: Inference complete. Generated {len(text)} characters (stripped thoughts).") |
| return text |
| except Exception as e: |
| logger.error(f"Error during local LLM inference: {e}") |
| return None |
|
|
| class LLMService: |
| """Centralized service for Large Language Model interactions.""" |
| |
| def __init__(self): |
| self.groq_api_key = settings.GROQ_API_KEY |
| self.groq_model = settings.GROQ_MODEL |
| self.groq_url = "https://api.groq.com/openai/v1/chat/completions" |
| self.local_engine = LocalLLMEngine() |
| |
| |
| self._pii_patterns = [ |
| |
| (re.compile(r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b'), '<EMAIL>'), |
| (re.compile(r'(?:\+?91|0)?[6-9]\d{9}'), '<PHONE>'), |
| (re.compile(r'(?:\d[ -]*?){12,19}'), '<CARD>'), |
| (re.compile(r'[Xx]+\d{3,6}'), '<ACCOUNT>'), |
| (re.compile(r'[A-Z]{5}[0-9]{4}[A-Z]{1}'), '<PAN>'), |
| (re.compile(r'\d{4}\s\d{4}\s\d{4}'), '<AADHAAR>'), |
| ] |
|
|
| @property |
| def is_enabled(self) -> bool: |
| """Check if any LLM service is available.""" |
| return HAS_LLAMA_CPP |
|
|
| def _sanitize_for_external(self, text: str) -> str: |
| """Extra PII scrub before sending to any external/third-party LLM API.""" |
| if not text: |
| return text |
| text = re.sub(r'(?i)(Dear|Hello|Hi)\s+[A-Za-z\s]+,', r'\1 Customer,', text) |
| for pattern, label in self._pii_patterns: |
| text = pattern.sub(label, text) |
| return text |
|
|
| async def generate_response( |
| self, |
| prompt: str, |
| system_prompt: Optional[str] = "You are a helpful financial assistant.", |
| temperature: float = 0.5, |
| response_format: Optional[str] = None, |
| timeout: float = 120.0 |
| ) -> Optional[str]: |
| """Generic method to generate a response, prioritizing local execution.""" |
| global HAS_LLAMA_CPP |
| |
| |
| if HAS_LLAMA_CPP: |
| |
| try: |
| loop = asyncio.get_event_loop() |
| res = await loop.run_in_executor( |
| None, |
| self.local_engine.generate, |
| prompt, |
| system_prompt, |
| temperature |
| ) |
| if res: |
| logger.info(">>> LLM_ENGINE: Local (Gemma 4) success.") |
| return res |
| |
| except Exception as e: |
| |
| |
| if "shared object file" in str(e) or "libc" in str(e).lower(): |
| HAS_LLAMA_CPP = False |
| logger.error(f">>> LLM_ENGINE: Fatal library error. Disabling local LLM: {e}") |
| else: |
| logger.warning(f">>> LLM_ENGINE: Local engine runtime error: {e}") |
|
|
| |
| |
| |
| |
| |
| |
| |
| |
| |
| logger.warning("Local LLM engine is unavailable. Groq fallback is disabled.") |
| return None |
|
|
| async def _call_groq( |
| self, |
| prompt: str, |
| system_prompt: str, |
| temperature: float, |
| response_format: Optional[str], |
| timeout: float |
| ) -> Optional[str]: |
| """Call the Groq API (Fallback provider). Content must be pre-sanitized.""" |
| headers = { |
| "Authorization": f"Bearer {self.groq_api_key}", |
| "Content-Type": "application/json" |
| } |
|
|
| messages = [] |
| if system_prompt: |
| messages.append({"role": "system", "content": system_prompt}) |
| messages.append({"role": "user", "content": prompt}) |
|
|
| payload = { |
| "model": self.groq_model, |
| "messages": messages, |
| "temperature": temperature |
| } |
|
|
| if response_format == "json_object": |
| payload["response_format"] = {"type": "json_object"} |
|
|
| try: |
| async with httpx.AsyncClient() as client: |
| resp = await client.post(self.groq_url, headers=headers, json=payload, timeout=timeout) |
| if resp.status_code == 200: |
| return resp.json()['choices'][0]['message']['content'] |
| elif resp.status_code == 429: |
| logger.error(f"Groq API Rate Limit Reached (429). Falling back to Regex engine.") |
| return None |
| else: |
| logger.error(f"Groq API Error ({resp.status_code}): {resp.text[:200]}") |
| return None |
| except Exception as e: |
| logger.error(f"Groq Connection Error: {type(e).__name__}: {e}") |
| return None |
|
|
| async def generate_json( |
| self, |
| prompt: str, |
| system_prompt: Optional[str] = "You are a financial intelligence engine. Always output valid JSON objects.", |
| temperature: float = 0.2, |
| timeout: float = 60.0 |
| ) -> Optional[Dict[str, Any]]: |
| """Method specifically for JSON responses with robust parsing.""" |
| content = await self.generate_response( |
| prompt=prompt, |
| system_prompt=system_prompt, |
| temperature=temperature, |
| response_format="json_object", |
| timeout=timeout |
| ) |
| |
| if not content: |
| return None |
| |
| try: |
| json_match = re.search(r'(\{.*\})', content, re.DOTALL) |
| if json_match: |
| content = json_match.group(1) |
| |
| content = content.strip().replace('```json', '').replace('```', '').strip() |
| return json.loads(content) |
| except (json.JSONDecodeError, IndexError) as e: |
| logger.error(f"LLM JSON Decode Error: {e}. Content: {content[:200]}...") |
| try: |
| |
| cleaned = re.sub(r',\s*([\]\}])', r'\1', content) |
| return json.loads(cleaned) |
| except Exception: |
| return None |
|
|
| |
| _llm_service = None |
|
|
| def get_llm_service() -> LLMService: |
| global _llm_service |
| if _llm_service is None: |
| _llm_service = LLMService() |
| return _llm_service |
|
|