# fastragbackend / engine.py
# (Captured from the Hugging Face file view: last commit 7b5ba51 "Update engine.py"
#  by Rofati, 21.1 kB.)
import os
import re
import json
import time
import faiss
import numpy as np
from openai import OpenAI
from typing import List, Dict
# ── Swahili / acronym expansion ──────────────────────────────────────────────
# Maps a lower-case term to English keywords appended to the query by
# RAGEngine.expand_query. Insertion order matters: expansions are appended in
# the order the terms appear here.
QUERY_EXPANSION = {
    # Swahili terms
    "bora afya": "good health primary health care",
    "afya": "health",
    "dawa": "medicine drugs pharmacy",
    "hospitali": "hospital health facility",
    "maji": "water sanitation",
    "chakula": "food safety nutrition",
    "mazingira": "environment environmental",
    "uchafuzi": "pollution environmental contamination",
    # Acronyms (and a common misspelling, "neema")
    "eia": "environmental impact assessment",
    "eis": "environmental impact study",
    "nema": "national environment management authority",
    "osha": "occupational safety health act",
    "kebs": "kenya bureau of standards",
    "kra": "kenya revenue authority",
    "epa": "environment protection authority",
    "wrc": "water resources commission authority",
    "neema": "national environment management authority",
    "esia": "environmental social impact assessment",
    "ehs": "environmental health safety",
    "hse": "health safety environment",
    "ppe": "personal protective equipment",
    "shif": "social health insurance fund",
    "nhif": "national hospital insurance fund",
    "nssf": "national social security fund",
}
# Retrieval-score thresholds used by RAGEngine._confidence_tier: a best score
# at or above STRONG gives the "strong" tier, at or above PARTIAL "partial",
# anything lower "none".
CONFIDENCE_STRONG = 0.45
CONFIDENCE_PARTIAL = 0.25
K_RETRIEVE = 12         # neighbours requested from FAISS per query
MAX_HISTORY_TURNS = 6   # user+assistant pairs forwarded to the LLM

# Chat models served by Groq's OpenAI-compatible API, tried in this order by
# RAGEngine.search_and_ask; the first that answers wins. (Original note: this
# is Groq's own free quota, separate from HuggingFace's shared Inference
# Providers billing, ~14,400 req/day — verify current limits.) Keys:
# https://console.groq.com/keys
MODEL_CANDIDATES = [
    "llama-3.3-70b-versatile",        # primary: general purpose
    "qwen/qwen3-32b",                 # fallback: multilingual
    "deepseek-r1-distill-llama-70b",  # fallback: reasoning
    "gemma2-9b-it",                   # last resort: small and fast
]
GROQ_BASE_URL = "https://api.groq.com/openai/v1"

# The LLM is told to end its reply with this marker followed by a JSON array
# of follow-up questions. The engine splits it off and returns the questions
# separately as "suggestions", so the frontend never shows the marker.
SUGGESTION_MARKER = "[[SUGGESTIONS]]"
# ── Vague follow-up detection ─────────────────────────────────────────────────
VAGUE_PATTERNS = [
r"^explain\s*(further|more|again|that|it|this)?\.?$",
r"^tell\s+me\s+more\.?$",
r"^(go\s+on|continue|what\s+else|more\s+details?)\.?$",
r"^(what\s+about\s+(that|this|it)|can\s+you\s+elaborate)\.?$",
r"^(give\s+me\s+more|expand\s+on\s+(that|this|it))\.?$",
r"^(ok|okay|i\s+see|yes|sure|got\s+it|noted)[,.]?\s*$",
r"^(what\s+(are\s+)?(the\s+)?(penalties|fines|consequences)\s*(for\s+(that|this))?)\.?$",
r"^(how\s+(does|do)\s+(that|this|it)\s+work)\.?$",
r"^(what\s+does\s+(that|this)\s+mean)\.?$",
r"^(any\s+examples?)\.?$",
r"^(summarize|summarise)\s*(that|this|it)?\.?$",
]
DOMAIN_WORDS = {
"act", "section", "cap", "osha", "nema", "eia", "emca", "law",
"regulation", "penalty", "fine", "license", "permit", "waste",
"health", "safety", "environment", "chemical", "noise", "fire",
"pollution", "factory", "worker", "employer", "committee",
}
def _is_vague(text: str) -> bool:
t = text.strip().lower()
if len(t.split()) <= 4 and not any(w in DOMAIN_WORDS for w in t.split()):
return True
return any(re.match(p, t) for p in VAGUE_PATTERNS)
class RAGEngine:
    """Answer Kenyan EHS-law questions from a local FAISS index via Groq.

    On construction, loads ``indexfinal.faiss`` and ``chunksfinal.npy`` from
    this module's directory and creates an OpenAI-compatible client for
    ``GROQ_BASE_URL``. The caller supplies the query embedding. This class
    handles retrieval, prompt assembly, model fallback and output cleanup.
    """

    def __init__(self):
        """Load the FAISS index and text chunks, and create the Groq client.

        Raises:
            FileNotFoundError: if the index or chunks file is missing.
        """
        base_dir = os.path.dirname(os.path.abspath(__file__))
        index_path = os.path.join(base_dir, "indexfinal.faiss")
        chunks_path = os.path.join(base_dir, "chunksfinal.npy")
        if not os.path.exists(index_path) or not os.path.exists(chunks_path):
            raise FileNotFoundError(f"FAISS index or chunks missing at {base_dir}")
        self.index = faiss.read_index(index_path)
        # allow_pickle=True lets np.load unpickle object arrays. This is only
        # acceptable because the file ships next to this module; never point
        # it at user-supplied data.
        self.chunks = np.load(chunks_path, allow_pickle=True)

        # ── Groq's direct API — separate free quota from HF's shared billing ──
        groq_key = os.environ.get("GROQ_API_KEY", "").strip()
        if groq_key:
            print(f"✅ GROQ_API_KEY found (len={len(groq_key)})")
        else:
            print("⚠️ GROQ_API_KEY not set — get a free key at https://console.groq.com/keys")
        # With no key, a placeholder still lets the client be constructed.
        # Requests are then expected to fail authentication, which
        # search_and_ask reports to the user as a setup hint.
        self.client = OpenAI(
            base_url=GROQ_BASE_URL,
            api_key=groq_key or "placeholder",
        )
        # Display default only; search_and_ask records the model that answered.
        self.model_name = MODEL_CANDIDATES[0]
        print(f"✅ RAGEngine ready — provider=groq (direct), candidates={MODEL_CANDIDATES}")

    # ── Query expansion ───────────────────────────────────────────────────────
    def expand_query(self, text: str) -> str:
        """Append English expansions for Swahili terms/acronyms found in *text*.

        Terms are matched case-insensitively on word boundaries. Expansions
        are appended in QUERY_EXPANSION order, and *text* is returned
        unchanged when nothing matches.
        """
        lower = text.lower().strip()
        expansions = [exp for term, exp in QUERY_EXPANSION.items()
                      if re.search(r'\b' + re.escape(term) + r'\b', lower)]
        return (text + " " + " ".join(expansions)).strip() if expansions else text

    # ── Enrich vague follow-ups with topic context before FAISS search ────────
    def build_retrieval_query(self, query_text: str, history: List[Dict]) -> str:
        """Return the text to embed for retrieval.

        If there is no history, or the query is not vague, the query is
        returned unchanged. A vague follow-up ("tell me more") is prefixed
        with two pieces of context so the search stays on topic:
        - the latest non-vague user question (first 300 chars);
        - the latest AI reply, with markdown removed (first 200 chars).

        *history* entries are dicts with "role" ("user"/"ai") and "text".
        """
        if not history or not _is_vague(query_text):
            return query_text
        last_user_q = ""
        last_ai_text = ""
        for entry in reversed(history):
            role = entry.get("role", "")
            # `or ""` also tolerates an explicit None text.
            text = (entry.get("text") or "").strip()
            if not text:
                continue
            if role == "user" and not last_user_q and not _is_vague(text):
                last_user_q = text[:300]
            if role == "ai" and not last_ai_text:
                clean = re.sub(r'[#*_`>|]', '', text)
                last_ai_text = re.sub(r'\s+', ' ', clean).strip()[:200]
            if last_user_q and last_ai_text:
                break
        parts = [p for p in [last_user_q, last_ai_text, query_text] if p]
        return " ".join(parts)

    # ── FAISS search ──────────────────────────────────────────────────────────
    def _search(self, query_vector: np.ndarray):
        """Return (indices, scores) of up to K_RETRIEVE nearest chunks.

        The vector is cast to float32 and reshaped to one row. FAISS pads
        missing results with index -1, and those slots are dropped.
        """
        qv = np.array(query_vector).astype("float32").reshape(1, -1)
        distances, indices = self.index.search(qv, k=K_RETRIEVE)
        mask = indices[0] != -1
        return indices[0][mask], distances[0][mask]

    def _confidence_tier(self, distances: np.ndarray) -> str:
        """Classify retrieval quality as "strong", "partial" or "none".

        Only the first score is used, so this assumes results are sorted
        best-first and that a higher score means a closer match (e.g. an
        inner-product index). NOTE(review): confirm the index metric.
        """
        if len(distances) == 0:
            return "none"
        best = float(distances[0])
        if best >= CONFIDENCE_STRONG:
            return "strong"
        if best >= CONFIDENCE_PARTIAL:
            return "partial"
        return "none"

    # ── Build OpenAI-style history (role: user/assistant) ──────────────────────
    def _build_history_messages(self, history: List[Dict]) -> list:
        """Convert the last MAX_HISTORY_TURNS turns into chat-API messages.

        "ai" entries become role "assistant" and everything else becomes
        "user". Entries with empty or missing text are skipped.
        """
        max_msgs = MAX_HISTORY_TURNS * 2
        trimmed = history[-max_msgs:] if len(history) > max_msgs else history
        messages = []
        for entry in trimmed:
            text = (entry.get("text") or "").strip()
            if not text:
                continue
            messages.append({
                "role": "assistant" if entry.get("role", "user") == "ai" else "user",
                "content": text,
            })
        return messages

    # ── Extract suggested follow-up questions from raw LLM output ─────────────
    def _extract_suggestions(self, raw_text: str):
        """Split the trailing suggestion block off an LLM reply.

        The model is told to finish with the marker and a JSON array, e.g.::

            ...main answer...
            [[SUGGESTIONS]]["Question 1?", "Question 2?", "Question 3?"]

        Returns (main_text, suggestions), with at most 4 non-empty
        suggestions. If the block is missing or malformed, suggestions is
        []. A bad suggestion block never breaks the main answer.
        """
        if not raw_text or SUGGESTION_MARKER not in raw_text:
            return raw_text or "", []
        # The real block is at the end, so split on the LAST marker, then
        # remove any earlier stray copies from the visible text.
        main, _, tail = raw_text.rpartition(SUGGESTION_MARKER)
        main = main.replace(SUGGESTION_MARKER, "")
        suggestions = []
        try:
            start = tail.index('[')
            end = tail.rindex(']') + 1
            parsed = json.loads(tail[start:end])
        except ValueError:  # no brackets, or invalid JSON (JSONDecodeError is a ValueError)
            parsed = None
        if isinstance(parsed, list):
            suggestions = [str(s).strip() for s in parsed if str(s).strip()][:4]
        return main.strip(), suggestions

    # ── Output cleaning ───────────────────────────────────────────────────────
    @staticmethod
    def _strip_think(text: str) -> str:
        """Remove reasoning blocks (<think>…</think> and <|think|>…<|/think|>)."""
        text = re.sub(r'<think>.*?</think>', '', text, flags=re.DOTALL)
        return re.sub(r'<\|think\|>.*?<\|/think\|>', '', text, flags=re.DOTALL)

    def clean_output(self, text: str) -> str:
        """Normalise an LLM answer for display.

        The cleanup steps are:
        - strip reasoning blocks;
        - replace "§" with "Section";
        - map the "(1)" placeholders for the Public Health Act to "Cap 242";
        - squeeze blank lines and repeated inner spaces.

        Leading indentation is kept so nested markdown lists survive. Returns
        a warning message if nothing is left to show.
        """
        empty_msg = "⚠️ The AI returned an empty response. Please try again."
        if not text:
            return empty_msg
        text = self._strip_think(str(text))
        text = text.replace("§", "Section ")
        text = re.sub(r'Section\s+Section', 'Section', text)
        # "Cap(1)" / "Public Health Act (1)" denote the Public Health Act,
        # Cap 242 (the same rule the system prompt gives the model).
        text = re.sub(r'(?i)Cap\s*\(\s*1\s*\)', 'Cap 242', text)
        text = re.sub(r'(?i)public health act\s*\(\s*1\s*\)', 'Public Health Act, Cap 242', text)
        text = re.sub(r'(?i)public health act\s*\(cap\s*1\)', 'Public Health Act, Cap 242', text)
        text = re.sub(r'(?i)\(cap\s*1\)', '(Cap 242)', text)
        text = re.sub(r'\n{3,}', '\n\n', text)
        # Collapse space/tab runs only after visible text, so indentation
        # at the start of a line is left alone.
        text = re.sub(r'(?<=\S)[ \t]{2,}', ' ', text)
        return text.strip() or empty_msg

    # ── System prompt ─────────────────────────────────────────────────────────
    def _system_prompt(self, tier: str, has_history: bool) -> str:
        """Build the system prompt for a retrieval confidence tier.

        For tier "strong", the prompt asks for a full structured answer. Any
        other tier asks the model to answer what it can and list knowledge
        gaps. When *has_history* is true, a continuity instruction is added.
        """
        continuity = (
            "CONVERSATION CONTINUITY: You are mid-conversation. "
            "The chat history already provided shows what was discussed. "
            "Use it for follow-up questions, pronouns and back-references. "
            "NEVER treat the current message as a new topic if history exists. "
            "Do NOT ask the user to repeat anything already in the history.\n\n"
        ) if has_history else ""
        base = (
            "You are ENVH.AI Senior Legal Architect — a world-class consultant in "
            "Kenyan Environmental Health and Safety (EHS) law. "
            "Be professional, authoritative, warm, and genuinely helpful.\n\n"
            + continuity +
            "FACTORIES ACT NOTE: The Factories Act (Cap 514) is repealed by OSHA 2007. "
            "L.N. 31/2004 Safety & Health Committees Rules remain active under OSHA.\n\n"
            "CITATION RULES — NEVER BREAK:\n"
            "1. Use ONLY information from the CONTEXT provided below.\n"
            "2. Cite every legal requirement with its Act/Cap/Section.\n"
            "3. Never invent fines or section numbers.\n"
            "4. Write 'Section' not '§'.\n"
            "5. 'public health act (1)' or 'Cap(1)' = 'Public Health Act, Cap 242'.\n\n"
        )
        if tier == "strong":
            return base + (
                "TASK: Give a complete authoritative answer using ONLY the context.\n\n"
                "FORMAT:\n"
                "### [Issue Title]\n"
                "**Bottom Line:** [one clear sentence]\n\n"
                "**Statutory Requirements:**\n"
                "- [Requirement] ([Act / Cap / Section])\n\n"
                "**Risk & Penalty Matrix:**\n"
                "| Requirement | Provision | Penalty/Risk |\n"
                "| :--- | :--- | :--- |\n\n"
                "**Action Plan:**\n"
                "1. [Step 1]\n"
                "2. [Step 2]\n"
                "3. [Step 3]\n\n"
                + self._suggestion_instruction()
            )
        return base + (
            "TASK: Answer what you CAN from context. Clearly flag any gaps.\n\n"
            "FORMAT:\n"
            "### [Issue Title]\n"
            "**What We Know:** [answer with citations]\n\n"
            "**Statutory Requirements:**\n"
            "- [Requirement] ([citation])\n\n"
            "**Knowledge Gaps:**\n"
            "- [What's missing from the database]\n\n"
            "> 📩 For complete guidance contact **support@envhai.co.ke**\n\n"
            + self._suggestion_instruction()
        )

    @staticmethod
    def _suggestion_instruction() -> str:
        """Prompt fragment asking for SUGGESTION_MARKER plus a JSON array of 3 questions."""
        return (
            "AFTER your main answer, on a new line, output exactly this marker "
            "followed immediately by a JSON array of 3 short, specific follow-up "
            "questions the user would naturally ask next (based on what you just "
            "answered) — no other text after it:\n\n"
            f"{SUGGESTION_MARKER}[\"...\", \"...\", \"...\"]\n\n"
            "Each question must be under 12 words, directly related to the topic "
            "just discussed, and phrased the way a user would type it (not a "
            "command). Valid JSON array of strings only — no markdown, no trailing text."
        )

    @staticmethod
    def _is_auth_error(err: str) -> bool:
        """Heuristically detect key/credential problems from a lower-cased error message."""
        return ("401" in err or "credential" in err or "api key" in err
                or ("permission" in err and "gated" not in err))

    def _format_response(self, response) -> dict:
        """Turn a chat-completion response into ``{"answer", "suggestions"}``."""
        # content may be None (or choices empty); treat that as an empty
        # reply so clean_output can report it instead of crashing.
        raw = response.choices[0].message.content if response.choices else None
        # Drop reasoning BEFORE looking for the marker: a reasoning model may
        # mention the marker inside <think>, which must not split the answer.
        raw = self._strip_think(raw or "")
        main_text, suggestions = self._extract_suggestions(raw)
        return {
            "answer": self.clean_output(main_text),
            "suggestions": suggestions,
        }

    # ── Main RAG call ─────────────────────────────────────────────────────────
    def search_and_ask(
        self,
        query_text: str,
        query_vector: np.ndarray,
        history: List[Dict] = None,
    ) -> dict:
        """Retrieve context for *query_vector* and answer *query_text* with an LLM.

        Args:
            query_text: The user's message as typed.
            query_vector: Embedding used for the FAISS search.
            history: Prior turns as dicts with "role" ("user"/"ai") and
                "text". Defaults to no history.

        Returns:
            Always ``{"answer": str, "suggestions": list[str]}``. When
            retrieval finds nothing usable, "answer" holds a rephrasing hint.
            LLM API errors (bad key, every candidate failing) are also
            reported in "answer" rather than raised.
        """
        if history is None:
            history = []
        valid_idx, valid_dst = self._search(query_vector)
        tier = self._confidence_tier(valid_dst)
        is_follow_up = bool(history) and _is_vague(query_text)

        if tier == "none" or len(valid_idx) == 0:
            if is_follow_up:
                return {
                    "answer": (
                        "I'd like to continue our discussion, but I need a little more "
                        "detail to find the right legal provisions.\n\n"
                        "Try rephrasing — for example:\n"
                        "- *\"What are the penalties under the Public Health Act Cap 242?\"*\n"
                        "- *\"Explain the Cabinet Secretary's powers under Section 17\"*\n\n"
                        "That helps me pull the exact statutory language. 📋"
                    ),
                    "suggestions": [],
                }
            return {"answer": self._helpful_redirect(query_text), "suggestions": []}

        # Keep chunks scoring at least 80% of the "partial" threshold, or the
        # top 5 if that filter would leave nothing.
        min_score = CONFIDENCE_PARTIAL * 0.8
        good_idx = [idx for idx, dst in zip(valid_idx, valid_dst)
                    if dst >= min_score] or list(valid_idx[:5])
        context = "\n---\n".join(str(self.chunks[i]) for i in good_idx)
        context = re.sub(r'(?i)public health act\s*\(\s*1\s*\)',
                         'Public Health Act, Cap 242', context)

        if is_follow_up:
            # Anchor the follow-up to the most recent concrete user question.
            last_q = ""
            for entry in reversed(history):
                text = entry.get("text") or ""
                if entry.get("role") == "user" and not _is_vague(text):
                    last_q = text
                    break
            user_msg = (
                f"CONTEXT:\n{context}\n\n"
                f"ORIGINAL TOPIC: {last_q}\n"
                f"FOLLOW-UP: {query_text}\n\n"
                "Continue the discussion on the original topic using the context above."
            )
        else:
            user_msg = f"CONTEXT:\n{context}\n\nQUERY: {query_text}"

        messages = (
            [{"role": "system", "content": self._system_prompt(tier, bool(history))}]
            + self._build_history_messages(history)
            + [{"role": "user", "content": user_msg}]
        )

        # ── Try each candidate model in order. A rate-limited model gets one
        # retry after a short back-off; any other failure moves straight on to
        # the next candidate. Auth errors abort immediately because they would
        # fail the same way for every model.
        last_error = None
        for model_name in MODEL_CANDIDATES:
            for attempt in range(2):
                try:
                    response = self.client.chat.completions.create(
                        model=model_name,
                        messages=messages,
                        temperature=0,
                        max_tokens=2000,
                    )
                except Exception as e:  # SDK error types vary; classify by message text
                    last_error = e
                    err = str(e).lower()
                    print(f"❌ {model_name} attempt {attempt+1} error: {e}")
                    if self._is_auth_error(err):
                        return {
                            "answer": (
                                "⚠️ **Permission Error:** Your GROQ_API_KEY may be missing or invalid.\n\n"
                                "**Fix:**\n"
                                "1. Go to https://console.groq.com/keys\n"
                                "2. Create a free API key (no credit card needed)\n"
                                "3. In your HF Space → Settings → Secrets, add/update **GROQ_API_KEY**\n"
                                "4. Factory reboot the Space"
                            ),
                            "suggestions": [],
                        }
                    if ("rate" in err or "429" in err) and attempt < 1:
                        time.sleep(2 ** (attempt + 1))
                        continue  # retry the same model once
                    break  # unavailable / gated / retries exhausted — next model
                else:
                    self.model_name = model_name  # remember which one worked
                    print(f"✅ Answered using {model_name}")
                    return self._format_response(response)

        # Every candidate failed
        return {
            "answer": (
                "⚠️ **All models temporarily unavailable.**\n\n"
                f"Tried: {', '.join(MODEL_CANDIDATES)}\n\n"
                f"Last error: `{str(last_error)}`\n\n"
                "Please try again shortly, or contact support@envhai.co.ke."
            ),
            "suggestions": [],
        }

    # ── Helpful redirect when no chunks match ─────────────────────────────────
    def _helpful_redirect(self, query_text: str) -> str:
        """Build a markdown reply for queries where retrieval found nothing usable.

        Two example questions are chosen by case-insensitive substring checks
        on *query_text*. The reply also points to the support address.
        """
        q = query_text.lower()
        if any(w in q for w in ["health", "hospital", "clinic", "afya"]):
            hint = (
                "- *'Licensing requirements for a clinic under the Public Health Act?'*\n"
                "- *'OSHA requirements for healthcare workers?'*"
            )
        elif any(w in q for w in ["eia", "nema", "environmental impact", "esia"]):
            hint = (
                "- *'What projects require an EIA under EMCA?'*\n"
                "- *'How to register as an EIA Lead Expert with NEMA?'*"
            )
        elif any(w in q for w in ["law", "act", "regulation", "statute"]):
            hint = (
                "- *'OSHA 2007 requirements for workplace safety committees?'*\n"
                "- *'Penalties under EMCA for pollution?'*"
            )
        else:
            hint = (
                "- *'Occupational health requirements for a factory?'*\n"
                "- *'Food business permits under Cap 254?'*"
            )
        return (
            "### We're Working on This For You\n\n"
            f"No direct match found for **\"{query_text}\"**.\n\n"
            f"Try rephrasing with a specific Act or sector:\n{hint}\n\n"
            "> 📩 **support@envhai.co.ke** — responds within 1 business day.\n\n"
            "*Tip: The more specific your question, the better the answer.*"
        )