construction-rag / app /rag /utils.py
Ashanasri's picture
Upload app/rag/utils.py with huggingface_hub
35da6df verified
from __future__ import annotations
import asyncio
import logging
import os
import threading
from typing import Any, Dict, List
from dotenv import load_dotenv
# Let python-dotenv populate the environment first, so the lookup below can
# see values that were supplied through a .env file.
load_dotenv()

logger = logging.getLogger(__name__)

# Path of the GGUF model file. The LLM_MODEL_PATH environment variable wins
# over the bundled default.
MODEL_PATH = os.environ.get(
    "LLM_MODEL_PATH", "models/Llama-3.2-3B-Instruct-Q4_K_M.gguf"
)

# Lock guarding the one-time model load, and the process-wide handle that
# get_llm() fills in on first use.
_llm_lock = threading.Lock()
_llm = None
def get_llm():
    """Return the shared llama.cpp model, loading it on first use.

    The instance is cached in the module-level ``_llm``. Loading happens
    under ``_llm_lock`` with a second check inside the lock, so concurrent
    first callers construct the model only once.

    Returns:
        The cached ``llama_cpp.Llama`` instance, or ``None`` when the model
        file does not exist, ``llama-cpp-python`` cannot be imported, or
        construction fails. Failures are logged instead of raised; nothing
        is cached on failure, so the next call tries again.
    """
    global _llm

    # Fast path: already loaded, no need to take the lock.
    if _llm is not None:
        return _llm

    with _llm_lock:
        # Another thread may have finished loading while we waited.
        if _llm is not None:
            return _llm

        if not os.path.exists(MODEL_PATH):
            logger.error("[LLM] Model not found: %s", MODEL_PATH)
            return None

        try:
            # Imported lazily so the module can be imported without
            # llama-cpp-python being installed.
            from llama_cpp import Llama

            logger.info("[LLM] Loading %s ...", MODEL_PATH)
            _llm = Llama(
                model_path=MODEL_PATH,
                n_ctx=4096,
                n_threads=os.cpu_count() or 4,
                n_gpu_layers=0,
                verbose=False,
            )
            # Report the file that was actually loaded; the previous message
            # hard-coded a model name that did not match MODEL_PATH.
            logger.info("[LLM] Model ready: %s", MODEL_PATH)
        except ImportError:
            logger.error("[LLM] llama-cpp-python not installed.")
        except Exception as e:
            # logger.exception keeps the traceback, which the plain error
            # message alone did not.
            logger.exception("[LLM] Load error: %s", e)

    return _llm
# The wording of this prompt is deliberately frozen: this exact text produced
# good answers. Only the way the literal is assembled differs here; the
# resulting string is unchanged.
SYSTEM_PROMPT = "\n".join(
    [
        "You are a construction contract expert specializing in building codes, "
        "engineering standards, and contract clauses.",
        "",
        "STRICT RULES:",
        "- Answer ONLY using the provided context.",
        "- Do NOT say phrases like 'Based on the provided context' or "
        "'According to the provided documents'.",
        "- Do NOT restate the question.",
        "- Do NOT add introductions or conclusions.",
        "- Be concise and professional.",
        "- If listing steps, conditions, triggers, calculations, or clauses, use bullet points.",
        "- If explanation is required, answer in one short paragraph.",
        "- Keep the answer under 180 words.",
        "- If information is missing, say: 'Not specified in the provided clauses.'",
        "- Always mention clause numbers and page numbers when clearly available.",
        "",  # trailing empty item produces the final newline
    ]
)
def build_context(hits: List[Dict[str, Any]], max_chars: int = 3000) -> str:
    """Render retrieved hits as one context string within a character budget.

    Each hit becomes a numbered snippet with a ``Source``/``Page`` header
    line followed by the hit text. Snippets are added in the given order
    until the next one would push the result past ``max_chars``; that
    snippet and all later ones are dropped.

    Args:
        hits: Retrieved chunks. Each must have a ``"text"`` key; ``"source"``
            and ``"page"`` are optional and fall back to ``"unknown"`` and
            ``"?"``.
        max_chars: Upper bound on the length of the returned string.

    Returns:
        The snippets joined by a ``---`` divider, or ``""`` when ``hits`` is
        empty or even the first snippet does not fit.

    Raises:
        KeyError: If a hit has no ``"text"`` key.
    """
    separator = "\n\n---\n\n"
    parts = []
    used = 0
    for rank, hit in enumerate(hits, 1):
        snippet = (
            f"[{rank}] Source: {hit.get('source', 'unknown')} | "
            f"Page: {hit.get('page', '?')}\n"
            f"{hit['text']}"
        )
        # Charge the real separator length for every snippet after the first,
        # so the joined string can never exceed max_chars.
        cost = len(snippet) + (len(separator) if parts else 0)
        if used + cost > max_chars:
            break
        parts.append(snippet)
        used += cost
    return separator.join(parts)
def _make_prompt(question: str, context: str) -> str:
    """Assemble the raw chat prompt sent to the model.

    The prompt has three parts, each introduced by a
    ``<|start_header_id|>...<|end_header_id|>`` header: a system turn holding
    ``SYSTEM_PROMPT``, a user turn holding the context and the question, and
    an open assistant header for the model to continue from.

    Args:
        question: The user's question.
        context: Retrieved context text placed before the question.

    Returns:
        The complete prompt string.
    """
    system_turn = f"<|start_header_id|>system<|end_header_id|>\n{SYSTEM_PROMPT}<|eot_id|>"
    user_turn = (
        "<|start_header_id|>user<|end_header_id|>\n"
        f"Context:\n{context}\n\nQuestion: {question}<|eot_id|>"
    )
    # Left open on purpose: generation starts right after this header.
    assistant_header = "<|start_header_id|>assistant<|end_header_id|>\n"
    return "".join((system_turn, user_turn, assistant_header))
def generate_answer(
    question: str,
    hits: List[Dict[str, Any]],
    max_tokens: int = 400,
    temperature: float = 0.1,
    top_p: float = 0.95,
    presence_penalty: float = 0.0,
    frequency_penalty: float = 0.0,
    repeat_penalty: float = 1.1,
) -> str:
    """Answer *question* with the local LLM, using *hits* as context.

    Args:
        question: The user's question.
        hits: Retrieved chunks, rendered into the prompt by ``build_context``.
        max_tokens: Maximum number of tokens to generate.
        temperature: Sampling temperature.
        top_p: Nucleus-sampling threshold.
        presence_penalty: Presence penalty forwarded to the model call.
        frequency_penalty: Frequency penalty forwarded to the model call.
        repeat_penalty: Repetition penalty forwarded to the model call.

    Returns:
        The generated answer, stripped and cut to at most 200 words. If the
        model is unavailable, or generation raises, a fallback message that
        includes the retrieved context is returned instead; no exception
        propagates from the model call.
    """
    context = build_context(hits)
    llm = get_llm()
    if llm is None:
        return "LLM not available.\n\n" + context

    prompt = _make_prompt(question, context)
    try:
        output = llm(
            prompt,
            max_tokens=max_tokens,
            temperature=temperature,
            top_p=top_p,
            # These two were accepted by this function but never reached the
            # model, so callers' values were silently ignored.
            presence_penalty=presence_penalty,
            frequency_penalty=frequency_penalty,
            repeat_penalty=repeat_penalty,
            # Stop at end-of-turn tokens, and also if the model starts to
            # produce a new "Question:" / "Context:" section.
            stop=["<|eot_id|>", "<|end_of_text|>", "Question:", "Context:"],
            echo=False,
        )
        answer = output["choices"][0]["text"].strip()
    except Exception as e:
        # Best effort: log with traceback and hand the context back to the
        # caller rather than failing the request.
        logger.exception("[LLM] Generation failed: %s", e)
        return f"Generation error: {e}\n\nContext:\n{context}"

    # Hard trim to at most 200 words.
    return _truncate_words(answer, 200)


def _truncate_words(text: str, limit: int) -> str:
    """Cut *text* after its first *limit* words, keeping the original spacing.

    Slicing the original string (instead of re-joining the words with single
    spaces) preserves the line breaks and bullet layout of the kept part.
    """
    import re  # local import: `re` is not among the module's imports

    words = list(re.finditer(r"\S+", text))
    if len(words) <= limit:
        return text
    return text[: words[limit - 1].end()]
def answer_query(
    searcher,
    query: str,
    top_k: int = 4,
    rerank: bool = True,
    max_tokens: int = 400,
    temperature: float = 0.1,
    top_p: float = 0.95,
    presence_penalty: float = 0.0,
    frequency_penalty: float = 0.0,
) -> Dict[str, Any]:
    """Retrieve chunks for *query* and generate an answer from them.

    The retrieved chunks and the final answer are also printed to stdout for
    debugging.

    Args:
        searcher: Object providing ``_top_k_sync(query, k=..., rerank=...)``,
            which returns a list of hit dicts. Each hit needs a ``"text"``
            key; ``"source"``, ``"page"`` and ``"score"`` are optional.
        query: The user's question.
        top_k: Number of chunks to retrieve.
        rerank: Passed through to the searcher.
        max_tokens: Passed through to ``generate_answer``.
        temperature: Passed through to ``generate_answer``.
        top_p: Passed through to ``generate_answer``.
        presence_penalty: Passed through to ``generate_answer``.
        frequency_penalty: Passed through to ``generate_answer``.

    Returns:
        A dict with ``"query"``, ``"answer"`` and ``"top_k"`` (the hits).
    """
    hits = searcher._top_k_sync(query, k=top_k, rerank=rerank)

    print("\n RETRIEVED CHUNKS \n")
    for i, hit in enumerate(hits, 1):
        score = hit.get("score")
        # A hit without a score must not abort the query: formatting None
        # with ".4f" raises TypeError.
        score_text = "n/a" if score is None else f"{score:.4f}"
        print(f"[{i}] Source: {hit.get('source')} | Page: {hit.get('page')} | Score: {score_text}")
        print(hit["text"])
        print("-" * 80)

    answer = generate_answer(
        question=query,
        hits=hits,
        max_tokens=max_tokens,
        temperature=temperature,
        top_p=top_p,
        presence_penalty=presence_penalty,
        frequency_penalty=frequency_penalty,
        repeat_penalty=1.1,
    )

    print("\n MODEL ANSWER \n")
    print(answer)

    return {
        "query": query,
        "answer": answer,
        "top_k": hits,
    }
async def answer_query_async(
    searcher,
    query: str,
    top_k: int = 4,
    rerank: bool = True,
    max_tokens: int = 400,
    temperature: float = 0.1,
    top_p: float = 0.95,
    presence_penalty: float = 0.0,
    frequency_penalty: float = 0.0,
) -> Dict[str, Any]:
    """Async variant of ``answer_query``.

    Retrieval and generation are both blocking calls, so each is run in the
    event loop's default executor. The retrieved chunks and the final answer
    are also printed to stdout for debugging.

    Args:
        searcher: Object providing ``_top_k_sync(query, k=..., rerank=...)``,
            which returns a list of hit dicts. Each hit needs a ``"text"``
            key; ``"source"``, ``"page"`` and ``"score"`` are optional.
        query: The user's question.
        top_k: Number of chunks to retrieve.
        rerank: Passed through to the searcher.
        max_tokens: Passed through to ``generate_answer``.
        temperature: Passed through to ``generate_answer``.
        top_p: Passed through to ``generate_answer``.
        presence_penalty: Passed through to ``generate_answer``.
        frequency_penalty: Passed through to ``generate_answer``.

    Returns:
        A dict with ``"query"``, ``"answer"`` and ``"top_k"`` (the hits).
    """
    # Inside a coroutine a loop is always running; get_running_loop() is the
    # preferred way to obtain it.
    loop = asyncio.get_running_loop()

    # Keyword arguments mirror the synchronous answer_query() call, so both
    # entry points invoke the searcher the same way.
    hits = await loop.run_in_executor(
        None,
        lambda: searcher._top_k_sync(query, k=top_k, rerank=rerank),
    )

    print("\n RETRIEVED CHUNKS \n")
    for i, hit in enumerate(hits, 1):
        score = hit.get("score")
        # A hit without a score must not abort the query: formatting None
        # with ".4f" raises TypeError.
        score_text = "n/a" if score is None else f"{score:.4f}"
        print(f"[{i}] Source: {hit.get('source')} | Page: {hit.get('page')} | Score: {score_text}")
        print(hit["text"])
        print("-" * 80)

    # Previously top_p and repeat_penalty were not forwarded here, which is
    # why answers differed from the synchronous path.
    # NOTE(review): concurrent requests would call the shared model from
    # several executor threads at once; confirm that is safe for the backend.
    answer = await loop.run_in_executor(
        None,
        lambda: generate_answer(
            question=query,
            hits=hits,
            max_tokens=max_tokens,
            temperature=temperature,
            top_p=top_p,
            presence_penalty=presence_penalty,
            frequency_penalty=frequency_penalty,
            repeat_penalty=1.1,
        ),
    )

    print("\n MODEL ANSWER \n")
    print(answer)

    return {
        "query": query,
        "answer": answer,
        "top_k": hits,
    }
# # app/rag/utils.py
# from __future__ import annotations
# from typing import Dict, Any
# import re
# class RAGSearcher:
# """
# Simple retriever wrapper. You can implement your actual retriever here.
# """
# def __init__(self, index):
# self.index = index # your FAISS/Chroma/etc index
# def top_k(self, query: str, k: int = 5, rerank: bool = True) -> list[Dict[str, Any]]:
# """
# Retrieve top_k chunks from the index.
# This should return a list of dicts like:
# [{"id": 1, "source": "file.pdf", "page": 2, "text": "...", "score": 0.8}, ...]
# """
# # Replace this with your actual retrieval logic
# hits = self.index.search(query, k=k, rerank=rerank)
# return hits
# def answer_query(searcher: RAGSearcher, query: str, top_k: int = 5, rerank: bool = True) -> Dict[str, Any]:
# """
# Retrieve top_k chunks and produce a short, human-readable answer.
# No LLM required — uses simple keyword extraction.
# """
# # Step 1: retrieve
# hits = searcher.top_k(query, k=top_k, rerank=rerank)
# # Step 2: combine text for context
# all_text = " ".join([h["text"] for h in hits])
# # Step 3: extract relevant sentence(s)
# query_words = re.findall(r"\w+", query.lower())
# sentences = re.split(r"(?<=[.!?])\s+", all_text)
# scored_sentences = []
# for s in sentences:
# score = sum(1 for w in query_words if w in s.lower())
# if score > 0:
# scored_sentences.append((score, s.strip()))
# if scored_sentences:
# scored_sentences.sort(key=lambda x: x[0], reverse=True)
# answer = scored_sentences[0][1]
# else:
# # fallback
# answer = hits[0]["text"] if hits else ""
# return {
# "query": query,
# "answer": answer.strip(),
# "top_k": hits
# }
# # Optional: helper to simulate FAISS-like search for testing
# class DummyIndex:
# def __init__(self, docs):
# self.docs = docs
# def search(self, query, k=5, rerank=True):
# # very simple: return first k docs with fake scores
# return [{"id": i, "source": doc.get("source", ""), "page": doc.get("page", 1),
# "text": doc["text"], "score": 1.0 - 0.1*i} for i, doc in enumerate(self.docs[:k])]