# LLM answer generation for the RAG pipeline.
#
# Loads a local GGUF model through llama-cpp-python (lazily, once per process),
# turns retrieved chunks into a bounded context string, and produces a short
# answer for a user question. Sync and async entry points are provided.
from __future__ import annotations

import asyncio
import logging
import os
import re
import threading
from typing import Any, Dict, List

try:
    from dotenv import load_dotenv
except ImportError:
    # python-dotenv only populates os.environ from a .env file; without it the
    # module still works with variables set in the real process environment.
    # llama_cpp is treated as optional in the same way in get_llm().
    load_dotenv = None

if load_dotenv is not None:
    load_dotenv()

logger = logging.getLogger(__name__)

MODEL_PATH = os.getenv(
    "LLM_MODEL_PATH",
    "models/Llama-3.2-3B-Instruct-Q4_K_M.gguf",
)

# Separator placed between context snippets; its length is part of the budget.
_CONTEXT_SEPARATOR = "\n\n---\n\n"

# Hard ceiling applied to the generated answer (the prompt asks for < 180 words).
_MAX_ANSWER_WORDS = 200

_WORD_RE = re.compile(r"\S+")

_llm = None
_llm_lock = threading.Lock()


def get_llm():
    """Return the shared model instance, loading it on first use.

    The model is loaded at most once per process: the cached instance is
    checked first without the lock, then again while holding ``_llm_lock`` so
    that concurrent first callers do not each load the model.

    Returns:
        The loaded ``llama_cpp.Llama`` instance, or ``None`` when the model
        file does not exist, llama-cpp-python is not installed, or loading
        fails. Failures are logged and not raised; the next call retries.
    """
    global _llm
    if _llm is not None:
        return _llm

    with _llm_lock:
        if _llm is not None:
            return _llm

        if not os.path.exists(MODEL_PATH):
            logger.error("[LLM] Model not found: %s", MODEL_PATH)
            return None

        try:
            # Imported lazily so the module can be imported without
            # llama-cpp-python installed.
            from llama_cpp import Llama

            logger.info("[LLM] Loading %s ...", MODEL_PATH)
            _llm = Llama(
                model_path=MODEL_PATH,
                n_ctx=4096,
                n_threads=os.cpu_count() or 4,
                n_gpu_layers=0,
                verbose=False,
            )
            # Report the file that was actually loaded rather than a
            # hard-coded model name that can drift from MODEL_PATH.
            logger.info("[LLM] Model ready: %s", os.path.basename(MODEL_PATH))
        except ImportError:
            logger.error("[LLM] llama-cpp-python not installed.")
        except Exception as e:
            logger.exception("[LLM] Load error: %s", e)

    return _llm


# ✅ This is the prompt that worked well — keeping exactly as is
SYSTEM_PROMPT = (
    "You are a construction contract expert specializing in building codes, "
    "engineering standards, and contract clauses.\n\n"
    "STRICT RULES:\n"
    "- Answer ONLY using the provided context.\n"
    "- Do NOT say phrases like 'Based on the provided context' or "
    "'According to the provided documents'.\n"
    "- Do NOT restate the question.\n"
    "- Do NOT add introductions or conclusions.\n"
    "- Be concise and professional.\n"
    "- If listing steps, conditions, triggers, calculations, or clauses, use bullet points.\n"
    "- If explanation is required, answer in one short paragraph.\n"
    "- Keep the answer under 180 words.\n"
    "- If information is missing, say: 'Not specified in the provided clauses.'\n"
    "- Always mention clause numbers and page numbers when clearly available.\n"
)


def build_context(hits: List[Dict[str, Any]], max_chars: int = 3000) -> str:
    """Join retrieved chunks into one context string of at most ``max_chars``.

    Each hit becomes ``"[i] Source: <source> | Page: <page>\\n<text>"`` and the
    snippets are joined with a ``---`` separator line. Hits are consumed in the
    order given and iteration stops at the first one that does not fit.

    Args:
        hits: Retrieved chunks. ``text`` is required; ``source`` and ``page``
            fall back to ``'unknown'`` and ``'?'``.
        max_chars: Upper bound on the length of the returned string,
            separators included.

    Returns:
        The assembled context, or ``""`` when ``hits`` is empty.

    Raises:
        KeyError: If a hit has no ``text`` key.
    """
    parts: List[str] = []
    used = 0
    for i, hit in enumerate(hits, 1):
        snippet = (
            f"[{i}] Source: {hit.get('source', 'unknown')} | "
            f"Page: {hit.get('page', '?')}\n"
            f"{hit['text']}"
        )
        # Charge the real separator length so the joined result cannot
        # overshoot the budget.
        cost = len(snippet) + (len(_CONTEXT_SEPARATOR) if parts else 0)
        if used + cost > max_chars:
            if not parts:
                # The first hit alone is over budget: keep a truncated
                # version instead of sending the model an empty context.
                parts.append(snippet[: max(max_chars, 0)])
            break
        parts.append(snippet)
        used += cost
    return _CONTEXT_SEPARATOR.join(parts)


def _make_prompt(question: str, context: str) -> str:
    """Wrap the system prompt, context and question in the chat header tokens."""
    return (
        "<|start_header_id|>system<|end_header_id|>\n"
        f"{SYSTEM_PROMPT}"
        "<|eot_id|>"
        "<|start_header_id|>user<|end_header_id|>\n"
        f"Context:\n{context}\n\n"
        f"Question: {question}"
        "<|eot_id|>"
        "<|start_header_id|>assistant<|end_header_id|>\n"
    )


def _trim_words(text: str, max_words: int) -> str:
    """Cut ``text`` after its ``max_words``-th word, keeping original whitespace.

    Splitting and re-joining on spaces would collapse newlines and flatten
    bullet lists, so the text is sliced at the end of the last allowed word.
    Text with ``max_words`` words or fewer is returned unchanged.
    """
    cut = None
    for count, match in enumerate(_WORD_RE.finditer(text), 1):
        if count == max_words:
            cut = match.end()
        elif count > max_words:
            return text[:cut]
    return text


def generate_answer(
    question: str,
    hits: List[Dict[str, Any]],
    max_tokens: int = 400,
    temperature: float = 0.1,
    top_p: float = 0.95,
    presence_penalty: float = 0.0,
    frequency_penalty: float = 0.0,
    repeat_penalty: float = 1.1,
) -> str:
    """Generate an answer to ``question`` grounded in the retrieved ``hits``.

    Args:
        question: The user's question.
        hits: Retrieved chunks, passed to :func:`build_context`.
        max_tokens: Maximum number of tokens to generate.
        temperature: Sampling temperature.
        top_p: Nucleus sampling threshold.
        presence_penalty: Presence penalty forwarded to the model.
        frequency_penalty: Frequency penalty forwarded to the model.
        repeat_penalty: Repetition penalty forwarded to the model.

    Returns:
        The stripped model output, cut to at most 200 words. If the model is
        unavailable, ``"LLM not available."`` followed by the context. If
        generation raises, a ``"Generation error: ..."`` message followed by
        the context.
    """
    context = build_context(hits)
    llm = get_llm()
    if llm is None:
        return "LLM not available.\n\n" + context

    prompt = _make_prompt(question, context)
    try:
        # NOTE(review): answer_query_async runs this function on the default
        # thread pool, so concurrent requests can reach the same model object
        # at once — confirm that is safe or serialize the calls.
        output = llm(
            prompt,
            max_tokens=max_tokens,
            temperature=temperature,
            top_p=top_p,
            # Previously accepted by this function but never passed on.
            presence_penalty=presence_penalty,
            frequency_penalty=frequency_penalty,
            repeat_penalty=repeat_penalty,
            stop=["<|eot_id|>", "<|end_of_text|>", "Question:", "Context:"],
            echo=False,
        )
        answer = output["choices"][0]["text"].strip()
        # Hard trim — max 200 words
        return _trim_words(answer, _MAX_ANSWER_WORDS)
    except Exception as e:
        logger.exception("[LLM] Generation failed: %s", e)
        return f"Generation error: {e}\n\nContext:\n{context}"


def _print_hits(hits: List[Dict[str, Any]]) -> None:
    """Print the retrieved chunks to stdout for debugging."""
    print("\n RETRIEVED CHUNKS \n")
    for i, hit in enumerate(hits, 1):
        score = hit.get("score")
        # A hit without a score must not crash the request on formatting.
        score_text = "n/a" if score is None else f"{score:.4f}"
        print(f"[{i}] Source: {hit.get('source')} | Page: {hit.get('page')} | Score: {score_text}")
        print(hit["text"])
        print("-" * 80)


def _report_answer(query: str, answer: str, hits: List[Dict[str, Any]]) -> Dict[str, Any]:
    """Print the model answer and build the response dict shared by both entry points."""
    print("\n MODEL ANSWER \n")
    print(answer)
    return {
        "query": query,
        "answer": answer,
        "top_k": hits,
    }


def answer_query(
    searcher,
    query: str,
    top_k: int = 4,
    rerank: bool = True,
    max_tokens: int = 400,
    temperature: float = 0.1,
    top_p: float = 0.95,
    presence_penalty: float = 0.0,
    frequency_penalty: float = 0.0,
) -> Dict[str, Any]:
    """Retrieve chunks for ``query`` and generate an answer synchronously.

    Args:
        searcher: Object exposing ``_top_k_sync(query, k=..., rerank=...)``
            that returns a list of hit dicts.
        query: The user's question.
        top_k: Number of chunks to retrieve.
        rerank: Forwarded to the searcher.
        max_tokens: Forwarded to :func:`generate_answer`.
        temperature: Forwarded to :func:`generate_answer`.
        top_p: Forwarded to :func:`generate_answer`.
        presence_penalty: Forwarded to :func:`generate_answer`.
        frequency_penalty: Forwarded to :func:`generate_answer`.

    Returns:
        A dict with ``query``, ``answer`` and ``top_k`` (the retrieved hits).
    """
    hits = searcher._top_k_sync(query, k=top_k, rerank=rerank)
    _print_hits(hits)

    answer = generate_answer(
        question=query,
        hits=hits,
        max_tokens=max_tokens,
        temperature=temperature,
        top_p=top_p,
        presence_penalty=presence_penalty,
        frequency_penalty=frequency_penalty,
        repeat_penalty=1.1,
    )
    return _report_answer(query, answer, hits)


async def answer_query_async(
    searcher,
    query: str,
    top_k: int = 4,
    rerank: bool = True,
    max_tokens: int = 400,
    temperature: float = 0.1,
    top_p: float = 0.95,
    presence_penalty: float = 0.0,
    frequency_penalty: float = 0.0,
) -> Dict[str, Any]:
    """Async counterpart of :func:`answer_query`.

    Retrieval and generation are blocking calls, so both are run on the event
    loop's default executor. Parameters and return value match
    :func:`answer_query`.
    """
    # Inside a coroutine the running loop is the one to use.
    loop = asyncio.get_running_loop()

    # Keyword arguments, as in answer_query, so both entry points call the
    # searcher the same way regardless of its positional parameter order.
    hits = await loop.run_in_executor(
        None,
        lambda: searcher._top_k_sync(query, k=top_k, rerank=rerank),
    )
    _print_hits(hits)

    # Previously top_p and repeat_penalty were not passed through here, which
    # is why the async answer differed from the sync one.
    answer = await loop.run_in_executor(
        None,
        lambda: generate_answer(
            question=query,
            hits=hits,
            max_tokens=max_tokens,
            temperature=temperature,
            top_p=top_p,
            presence_penalty=presence_penalty,
            frequency_penalty=frequency_penalty,
            repeat_penalty=1.1,
        ),
    )
    return _report_answer(query, answer, hits)


# Legacy keyword-based implementation, kept commented out for reference.
#
# # app/rag/utils.py
# from __future__ import annotations
# from typing import Dict, Any
# import re

# class RAGSearcher:
#     """
#     Simple retriever wrapper. You can implement your actual retriever here.
#     """
#     def __init__(self, index):
#         self.index = index  # your FAISS/Chroma/etc index

#     def top_k(self, query: str, k: int = 5, rerank: bool = True) -> list[Dict[str, Any]]:
#         """
#         Retrieve top_k chunks from the index.
#         This should return a list of dicts like:
#         [{"id": 1, "source": "file.pdf", "page": 2, "text": "...", "score": 0.8}, ...]
#         """
#         # Replace this with your actual retrieval logic
#         hits = self.index.search(query, k=k, rerank=rerank)
#         return hits

# def answer_query(searcher: RAGSearcher, query: str, top_k: int = 5, rerank: bool = True) -> Dict[str, Any]:
#     """
#     Retrieve top_k chunks and produce a short, human-readable answer.
#     No LLM required — uses simple keyword extraction.
#     """
#     # Step 1: retrieve
#     hits = searcher.top_k(query, k=top_k, rerank=rerank)

#     # Step 2: combine text for context
#     all_text = " ".join([h["text"] for h in hits])

#     # Step 3: extract relevant sentence(s)
#     query_words = re.findall(r"\w+", query.lower())
#     sentences = re.split(r"(?<=[.!?])\s+", all_text)
#     scored_sentences = []
#     for s in sentences:
#         score = sum(1 for w in query_words if w in s.lower())
#         if score > 0:
#             scored_sentences.append((score, s.strip()))

#     if scored_sentences:
#         scored_sentences.sort(key=lambda x: x[0], reverse=True)
#         answer = scored_sentences[0][1]
#     else:
#         # fallback
#         answer = hits[0]["text"] if hits else ""

#     return {
#         "query": query,
#         "answer": answer.strip(),
#         "top_k": hits
#     }

# # Optional: helper to simulate FAISS-like search for testing
# class DummyIndex:
#     def __init__(self, docs):
#         self.docs = docs

#     def search(self, query, k=5, rerank=True):
#         # very simple: return first k docs with fake scores
#         return [{"id": i, "source": doc.get("source", ""), "page": doc.get("page", 1),
#                  "text": doc["text"], "score": 1.0 - 0.1*i} for i, doc in enumerate(self.docs[:k])]