# Stray "Spaces: Sleeping" status text (Hugging Face page scrape) — not part of the module.
| from __future__ import annotations | |
| import asyncio | |
| import logging | |
| import os | |
| import threading | |
| from typing import Any, Dict, List | |
| from dotenv import load_dotenv | |
| load_dotenv() | |
| logger = logging.getLogger(__name__) | |
# Path to the GGUF model file; override via the LLM_MODEL_PATH env var
# (loaded from .env above). Default is a Llama-3.2-3B Q4 quantization.
MODEL_PATH = os.getenv(
    "LLM_MODEL_PATH",
    "models/Llama-3.2-3B-Instruct-Q4_K_M.gguf",
)
# Lazily-initialized process-wide Llama instance; guarded by _llm_lock
# so concurrent first calls to get_llm() load the model only once.
_llm = None
_llm_lock = threading.Lock()
def get_llm():
    """Return the process-wide Llama instance, loading it on first use.

    Uses double-checked locking so that concurrent callers trigger at
    most one model load. Errors are logged rather than raised.

    Returns:
        The loaded ``llama_cpp.Llama`` instance, or ``None`` when the
        model file is missing, llama-cpp-python is not installed, or
        loading fails.
    """
    global _llm
    if _llm is not None:  # fast path: already loaded, skip the lock
        return _llm
    with _llm_lock:
        if _llm is not None:  # another thread finished loading first
            return _llm
        if not os.path.exists(MODEL_PATH):
            logger.error("[LLM] Model not found: %s", MODEL_PATH)
            return None
        try:
            from llama_cpp import Llama

            logger.info("[LLM] Loading %s ...", MODEL_PATH)
            _llm = Llama(
                model_path=MODEL_PATH,
                n_ctx=4096,
                n_threads=os.cpu_count() or 4,
                n_gpu_layers=0,  # CPU-only inference
                verbose=False,
            )
            # Fix: the old message hard-coded "Llama 3 8B Q4" even though
            # the default model is a 3.2-3B file — report the real path.
            logger.info("[LLM] Model ready: %s", os.path.basename(MODEL_PATH))
        except ImportError:
            logger.error("[LLM] llama-cpp-python not installed.")
        except Exception as e:
            logger.error("[LLM] Load error: %s", e)
    return _llm
# NOTE: This prompt wording performed well in testing — keep the string
# content exactly as is; edit with care, every rule below shapes output style.
SYSTEM_PROMPT = (
    "You are a construction contract expert specializing in building codes, "
    "engineering standards, and contract clauses.\n\n"
    "STRICT RULES:\n"
    "- Answer ONLY using the provided context.\n"
    "- Do NOT say phrases like 'Based on the provided context' or "
    "'According to the provided documents'.\n"
    "- Do NOT restate the question.\n"
    "- Do NOT add introductions or conclusions.\n"
    "- Be concise and professional.\n"
    "- If listing steps, conditions, triggers, calculations, or clauses, use bullet points.\n"
    "- If explanation is required, answer in one short paragraph.\n"
    "- Keep the answer under 180 words.\n"
    "- If information is missing, say: 'Not specified in the provided clauses.'\n"
    "- Always mention clause numbers and page numbers when clearly available.\n"
)
def build_context(hits: List[Dict[str, Any]], max_chars: int = 3000) -> str:
    """Concatenate retrieved chunks into a single context string.

    Each chunk is rendered with a numbered source/page header and chunks
    are joined with a ``---`` separator. Accumulation stops before the
    chunk that would push the total length past *max_chars*.

    Args:
        hits: Retrieval results; each dict needs a ``"text"`` key and may
            carry ``"source"`` and ``"page"`` metadata.
        max_chars: Cap on the length of the returned string.

    Returns:
        The joined context, or ``""`` when no chunk fits.
    """
    separator = "\n\n---\n\n"
    parts: List[str] = []
    total = 0
    for i, hit in enumerate(hits, 1):
        snippet = (
            f"[{i}] Source: {hit.get('source', 'unknown')} | "
            f"Page: {hit.get('page', '?')}\n"
            f"{hit['text']}"
        )
        # Fix: the old code budgeted only 2 chars per join, but the real
        # separator is 7 chars and applies only *between* parts — count it
        # exactly so the returned string honors max_chars.
        cost = len(snippet) if not parts else len(separator) + len(snippet)
        if total + cost > max_chars:
            break
        parts.append(snippet)
        total += cost
    return separator.join(parts)
def _make_prompt(question: str, context: str) -> str:
    """Assemble a Llama-3 chat-format prompt string.

    Lays out SYSTEM_PROMPT, the retrieved context plus the user question,
    and an open assistant header for the model to complete.
    """
    segments = [
        "<|start_header_id|>system<|end_header_id|>\n",
        SYSTEM_PROMPT,
        "<|eot_id|>",
        "<|start_header_id|>user<|end_header_id|>\n",
        f"Context:\n{context}\n\n",
        f"Question: {question}",
        "<|eot_id|>",
        "<|start_header_id|>assistant<|end_header_id|>\n",
    ]
    return "".join(segments)
def generate_answer(
    question: str,
    hits: List[Dict[str, Any]],
    max_tokens: int = 400,
    temperature: float = 0.1,
    top_p: float = 0.95,
    presence_penalty: float = 0.0,
    frequency_penalty: float = 0.0,
    repeat_penalty: float = 1.1,
) -> str:
    """Generate an answer to *question* grounded in the retrieved *hits*.

    Builds a bounded context from the hits, renders a Llama-3 chat prompt,
    and runs local inference. Falls back to returning the raw context when
    the model is unavailable or generation fails.

    Args:
        question: The user's question.
        hits: Retrieval results consumed by ``build_context``.
        max_tokens: Generation token budget.
        temperature, top_p, presence_penalty, frequency_penalty,
        repeat_penalty: Sampling controls forwarded to the model.

    Returns:
        The model answer (hard-trimmed to 200 words), or a fallback
        message containing the context on failure.
    """
    context = build_context(hits)
    llm = get_llm()
    if llm is None:
        return "LLM not available.\n\n" + context
    prompt = _make_prompt(question, context)
    try:
        output = llm(
            prompt,
            max_tokens=max_tokens,
            temperature=temperature,
            top_p=top_p,
            # Fix: these two parameters were accepted but silently dropped
            # before; llama-cpp-python supports both (0.0 defaults, so
            # existing callers see identical behavior).
            presence_penalty=presence_penalty,
            frequency_penalty=frequency_penalty,
            repeat_penalty=repeat_penalty,
            stop=["<|eot_id|>", "<|end_of_text|>", "Question:", "Context:"],
            echo=False,
        )
        answer = output["choices"][0]["text"].strip()
        # Hard trim — cap at 200 words regardless of the token budget.
        words = answer.split()
        if len(words) > 200:
            answer = " ".join(words[:200])
        return answer
    except Exception as e:
        logger.exception("[LLM] Generation failed")
        return f"Generation error: {e}\n\nContext:\n{context}"
def answer_query(
    searcher,
    query: str,
    top_k: int = 4,
    rerank: bool = True,
    max_tokens: int = 400,
    temperature: float = 0.1,
    top_p: float = 0.95,
    presence_penalty: float = 0.0,
    frequency_penalty: float = 0.0,
) -> Dict[str, Any]:
    """Retrieve top-k chunks for *query* and generate a grounded answer.

    Prints the retrieved chunks and the model answer to stdout for
    debugging, then returns a dict with the query, answer, and raw hits.
    """
    retrieved = searcher._top_k_sync(query, k=top_k, rerank=rerank)
    print("\n RETRIEVED CHUNKS \n")
    for idx, chunk in enumerate(retrieved, 1):
        header = (
            f"[{idx}] Source: {chunk.get('source')} | "
            f"Page: {chunk.get('page')} | Score: {chunk.get('score'):.4f}"
        )
        print(header)
        print(chunk["text"])
        print("-" * 80)
    model_answer = generate_answer(
        question=query,
        hits=retrieved,
        max_tokens=max_tokens,
        temperature=temperature,
        top_p=top_p,
        presence_penalty=presence_penalty,
        frequency_penalty=frequency_penalty,
        repeat_penalty=1.1,
    )
    print("\n MODEL ANSWER \n")
    print(model_answer)
    return {"query": query, "answer": model_answer, "top_k": retrieved}
async def answer_query_async(
    searcher,
    query: str,
    top_k: int = 4,
    rerank: bool = True,
    max_tokens: int = 400,
    temperature: float = 0.1,
    top_p: float = 0.95,
    presence_penalty: float = 0.0,
    frequency_penalty: float = 0.0,
) -> Dict[str, Any]:
    """Async variant of ``answer_query``.

    Offloads the blocking retrieval and LLM generation calls to the
    default thread-pool executor so the event loop stays responsive.
    Returns the same dict shape as ``answer_query``.
    """
    # Fix: asyncio.get_event_loop() is deprecated inside coroutines
    # (Python 3.10+); get_running_loop() is the correct call here.
    loop = asyncio.get_running_loop()
    hits = await loop.run_in_executor(
        None, searcher._top_k_sync, query, top_k, rerank
    )
    print("\n RETRIEVED CHUNKS \n")
    for i, hit in enumerate(hits, 1):
        print(f"[{i}] Source: {hit.get('source')} | Page: {hit.get('page')} | Score: {hit.get('score'):.4f}")
        print(hit["text"])
        print("-" * 80)
    # NOTE: top_p and repeat_penalty were previously not forwarded here,
    # which made async answers differ from the sync path — keep them in sync.
    answer = await loop.run_in_executor(
        None,
        lambda: generate_answer(
            question=query,
            hits=hits,
            max_tokens=max_tokens,
            temperature=temperature,
            top_p=top_p,
            presence_penalty=presence_penalty,
            frequency_penalty=frequency_penalty,
            repeat_penalty=1.1,
        ),
    )
    print("\n MODEL ANSWER \n")
    print(answer)
    return {
        "query": query,
        "answer": answer,
        "top_k": hits,
    }
| # # app/rag/utils.py | |
| # from __future__ import annotations | |
| # from typing import Dict, Any | |
| # import re | |
| # class RAGSearcher: | |
| # """ | |
| # Simple retriever wrapper. You can implement your actual retriever here. | |
| # """ | |
| # def __init__(self, index): | |
| # self.index = index # your FAISS/Chroma/etc index | |
| # def top_k(self, query: str, k: int = 5, rerank: bool = True) -> list[Dict[str, Any]]: | |
| # """ | |
| # Retrieve top_k chunks from the index. | |
| # This should return a list of dicts like: | |
| # [{"id": 1, "source": "file.pdf", "page": 2, "text": "...", "score": 0.8}, ...] | |
| # """ | |
| # # Replace this with your actual retrieval logic | |
| # hits = self.index.search(query, k=k, rerank=rerank) | |
| # return hits | |
| # def answer_query(searcher: RAGSearcher, query: str, top_k: int = 5, rerank: bool = True) -> Dict[str, Any]: | |
| # """ | |
| # Retrieve top_k chunks and produce a short, human-readable answer. | |
| # No LLM required — uses simple keyword extraction. | |
| # """ | |
| # # Step 1: retrieve | |
| # hits = searcher.top_k(query, k=top_k, rerank=rerank) | |
| # # Step 2: combine text for context | |
| # all_text = " ".join([h["text"] for h in hits]) | |
| # # Step 3: extract relevant sentence(s) | |
| # query_words = re.findall(r"\w+", query.lower()) | |
| # sentences = re.split(r"(?<=[.!?])\s+", all_text) | |
| # scored_sentences = [] | |
| # for s in sentences: | |
| # score = sum(1 for w in query_words if w in s.lower()) | |
| # if score > 0: | |
| # scored_sentences.append((score, s.strip())) | |
| # if scored_sentences: | |
| # scored_sentences.sort(key=lambda x: x[0], reverse=True) | |
| # answer = scored_sentences[0][1] | |
| # else: | |
| # # fallback | |
| # answer = hits[0]["text"] if hits else "" | |
| # return { | |
| # "query": query, | |
| # "answer": answer.strip(), | |
| # "top_k": hits | |
| # } | |
| # # Optional: helper to simulate FAISS-like search for testing | |
| # class DummyIndex: | |
| # def __init__(self, docs): | |
| # self.docs = docs | |
| # def search(self, query, k=5, rerank=True): | |
| # # very simple: return first k docs with fake scores | |
| # return [{"id": i, "source": doc.get("source", ""), "page": doc.get("page", 1), | |
| # "text": doc["text"], "score": 1.0 - 0.1*i} for i, doc in enumerate(self.docs[:k])] | |