# Module configuration for the RAG engine: dependencies, the query-expansion
# table, retrieval thresholds and the ordered Groq model fallback list.
import json
import os
import re
import time
from typing import Dict, List

import faiss
import numpy as np
from openai import OpenAI

# ── Swahili / acronym expansion ──────────────────────────────────────────────
# Lowercase term -> extra English keywords. RAGEngine.expand_query appends the
# expansion of every whole-word match to the query, in this dict's order.
QUERY_EXPANSION: Dict[str, str] = {
    # Swahili terms
    "bora afya": "good health primary health care",
    "afya": "health",
    "dawa": "medicine drugs pharmacy",
    "hospitali": "hospital health facility",
    "maji": "water sanitation",
    "chakula": "food safety nutrition",
    "mazingira": "environment environmental",
    "uchafuzi": "pollution environmental contamination",
    # Regulators, statutes and sector acronyms
    "eia": "environmental impact assessment",
    "eis": "environmental impact study",
    "nema": "national environment management authority",
    "osha": "occupational safety health act",
    "kebs": "kenya bureau of standards",
    "kra": "kenya revenue authority",
    "epa": "environment protection authority",
    "wrc": "water resources commission authority",
    "neema": "national environment management authority",  # common misspelling of NEMA
    "esia": "environmental social impact assessment",
    "ehs": "environmental health safety",
    "hse": "health safety environment",
    "ppe": "personal protective equipment",
    "shif": "social health insurance fund",
    "nhif": "national hospital insurance fund",
    "nssf": "national social security fund",
}

# Best (first) FAISS score at/above CONFIDENCE_STRONG -> "strong" tier,
# at/above CONFIDENCE_PARTIAL -> "partial", otherwise "none".
CONFIDENCE_STRONG: float = 0.45
CONFIDENCE_PARTIAL: float = 0.25
K_RETRIEVE: int = 12        # neighbours requested from the FAISS index per query
MAX_HISTORY_TURNS: int = 6  # each turn = one user + one assistant message sent to the LLM

# Groq's own OpenAI-compatible API (separate from HuggingFace Inference
# Providers billing). Free tier; check current per-model limits in the
# Groq console. Get a key at https://console.groq.com/keys
# Tried in order until one answers.
MODEL_CANDIDATES: List[str] = [
    "llama-3.3-70b-versatile",        # primary general-purpose model
    "qwen/qwen3-32b",                 # multilingual fallback
    "deepseek-r1-distill-llama-70b",  # reasoning fallback
    "gemma2-9b-it",                   # small, fast final fallback
]
GROQ_BASE_URL: str = "https://api.groq.com/openai/v1"

# Marker the LLM uses to separate its main answer from suggested follow-ups.
# Frontend never sees this — it's stripped out and the questions are returned
# as a separate "suggestions" array for rendering as clickable pills.
SUGGESTION_MARKER = "[[SUGGESTIONS]]"

# ── Vague follow-up detection ─────────────────────────────────────────────────
VAGUE_PATTERNS = [
    r"^explain\s*(further|more|again|that|it|this)?\.?$",
    r"^tell\s+me\s+more\.?$",
    r"^(go\s+on|continue|what\s+else|more\s+details?)\.?$",
    r"^(what\s+about\s+(that|this|it)|can\s+you\s+elaborate)\.?$",
    r"^(give\s+me\s+more|expand\s+on\s+(that|this|it))\.?$",
    r"^(ok|okay|i\s+see|yes|sure|got\s+it|noted)[,.]?\s*$",
    r"^(what\s+(are\s+)?(the\s+)?(penalties|fines|consequences)\s*(for\s+(that|this))?)\.?$",
    r"^(how\s+(does|do)\s+(that|this|it)\s+work)\.?$",
    r"^(what\s+does\s+(that|this)\s+mean)\.?$",
    r"^(any\s+examples?)\.?$",
    r"^(summarize|summarise)\s*(that|this|it)?\.?$",
]

DOMAIN_WORDS = {
    "act", "section", "cap", "osha", "nema", "eia", "emca", "law", "regulation",
    "penalty", "fine", "license", "permit", "waste", "health", "safety",
    "environment", "chemical", "noise", "fire", "pollution", "factory",
    "worker", "employer", "committee",
}

# Word tokenizer for DOMAIN_WORDS lookups: ignores surrounding punctuation so
# "osha?" or "(cap" still count as domain words.
_WORD_RE = re.compile(r"\w+")

# API-error classifiers. Word boundaries stop status codes matching inside ids,
# and "rate limit" replaces a bare "rate" that also matched "generate"/"separate".
_RATE_LIMIT_RE = re.compile(r"\b429\b|rate[\s_-]?limit|too many requests")
_AUTH_STATUS_RE = re.compile(r"\b401\b")


def _is_vague(text: str) -> bool:
    """Return True if *text* reads as a context-free follow-up rather than a new question.

    A message is vague when it has at most four whitespace-separated words and
    none of its word tokens is in DOMAIN_WORDS, or when it matches one of
    VAGUE_PATTERNS (e.g. "tell me more", "what does that mean").
    ``None`` is treated as an empty string (which counts as vague).
    """
    t = (text or "").strip().lower()
    if len(t.split()) <= 4 and not any(w in DOMAIN_WORDS for w in _WORD_RE.findall(t)):
        return True
    return any(re.match(p, t) for p in VAGUE_PATTERNS)


class RAGEngine:
    """Answer Kenyan EHS-law questions from a local FAISS index via Groq-hosted LLMs.

    Loads ``indexfinal.faiss`` and ``chunksfinal.npy`` from this file's directory.
    The caller supplies the query embedding; this class retrieves matching chunks,
    builds a grounded prompt and tries each model in MODEL_CANDIDATES in order.
    """

    def __init__(self):
        base_dir = os.path.dirname(os.path.abspath(__file__))
        index_path = os.path.join(base_dir, "indexfinal.faiss")
        chunks_path = os.path.join(base_dir, "chunksfinal.npy")
        if not os.path.exists(index_path) or not os.path.exists(chunks_path):
            raise FileNotFoundError(f"FAISS index or chunks missing at {base_dir}")
        self.index = faiss.read_index(index_path)
        # allow_pickle=True lets np.load unpickle object arrays. Acceptable only
        # because the file ships with the app; never point this at user data.
        self.chunks = np.load(chunks_path, allow_pickle=True)

        # ── Groq's direct API — separate free quota from HF's shared billing ──
        groq_key = os.environ.get("GROQ_API_KEY", "").strip()
        if groq_key:
            print(f"✅ GROQ_API_KEY found (len={len(groq_key)})")
        else:
            print("⚠️ GROQ_API_KEY not set — get a free key at https://console.groq.com/keys")
        # With no key, a placeholder is used so construction succeeds; calls
        # then fail authentication and search_and_ask reports a permission error.
        self.client = OpenAI(
            base_url=GROQ_BASE_URL,
            api_key=groq_key or "placeholder",
        )
        self.model_name = MODEL_CANDIDATES[0]  # display default; updated to whichever model answers
        print(f"✅ RAGEngine ready — provider=groq (direct), candidates={MODEL_CANDIDATES}")

    # ── Query expansion ───────────────────────────────────────────────────────
    def expand_query(self, text: str) -> str:
        """Append QUERY_EXPANSION keywords for every whole-word term found in *text*."""
        lower = text.lower().strip()
        expansions = [exp for term, exp in QUERY_EXPANSION.items()
                      if re.search(r'\b' + re.escape(term) + r'\b', lower)]
        return (text + " " + " ".join(expansions)).strip() if expansions else text

    # ── Enrich vague follow-ups with topic context before FAISS search ────────
    def build_retrieval_query(self, query_text: str, history: List[Dict]) -> str:
        """Return the text to embed for retrieval.

        Non-vague queries (or queries with no history) are returned unchanged.
        A vague follow-up is prefixed with the latest non-vague user question
        (first 300 chars) and the latest AI reply (markdown stripped, first 200 chars).
        """
        if not history or not _is_vague(query_text):
            return query_text
        last_user_q = ""
        last_ai_text = ""
        for entry in reversed(history):
            role = entry.get("role", "")
            text = (entry.get("text") or "").strip()  # tolerate missing/None text
            if not text:
                continue
            if role == "user" and not last_user_q and not _is_vague(text):
                last_user_q = text[:300]
            if role == "ai" and not last_ai_text:
                clean = re.sub(r'[#*_`>|]', '', text)
                last_ai_text = re.sub(r'\s+', ' ', clean).strip()[:200]
            if last_user_q and last_ai_text:
                break
        parts = [p for p in [last_user_q, last_ai_text, query_text] if p]
        return " ".join(parts)

    # ── FAISS search ──────────────────────────────────────────────────────────
    def _search(self, query_vector: np.ndarray):
        """Return (indices, scores) for up to K_RETRIEVE neighbours, dropping FAISS's -1 padding."""
        qv = np.array(query_vector).astype("float32").reshape(1, -1)
        distances, indices = self.index.search(qv, k=K_RETRIEVE)
        mask = indices[0] != -1
        return indices[0][mask], distances[0][mask]

    def _confidence_tier(self, distances: np.ndarray) -> str:
        """Classify retrieval quality from the first (best) score: "strong", "partial" or "none".

        NOTE(review): "higher is better" assumes an inner-product/cosine index whose
        results come back best-first; an L2 index would invert this. TODO confirm.
        """
        if len(distances) == 0:
            return "none"
        best = float(distances[0])
        if best >= CONFIDENCE_STRONG:
            return "strong"
        if best >= CONFIDENCE_PARTIAL:
            return "partial"
        return "none"

    # ── Build OpenAI-style history (role: user/assistant) ──────────────────────
    def _build_history_messages(self, history: List[Dict]) -> list:
        """Convert the last MAX_HISTORY_TURNS turns into chat messages.

        Role "ai" becomes "assistant"; every other role becomes "user". Entries
        with empty or missing text are skipped.
        """
        max_msgs = MAX_HISTORY_TURNS * 2
        # history[-0:] would return everything, so a limit of 0 must mean "no history".
        trimmed = history[-max_msgs:] if max_msgs > 0 else []
        messages = []
        for entry in trimmed:
            role = entry.get("role", "user")
            text = (entry.get("text") or "").strip()
            if not text:
                continue
            messages.append({
                "role": "assistant" if role == "ai" else "user",
                "content": text,
            })
        return messages

    # ── Strip model reasoning blocks ──────────────────────────────────────────
    @staticmethod
    def _strip_reasoning(text: str) -> str:
        """Remove model "thinking" output so only the final answer remains.

        Removes ``<think>…</think>`` and ``<|think|>…<|/think|>`` pairs. A leftover
        ``</think>`` with no opening tag drops everything before it. A leftover
        ``<think>`` with no closing tag (e.g. output cut off by max_tokens) drops
        everything from it onward.
        """
        text = re.sub(r'<think>.*?</think>', '', text, flags=re.DOTALL)
        text = re.sub(r'<\|think\|>.*?<\|/think\|>', '', text, flags=re.DOTALL)
        if '</think>' in text:
            text = text.rsplit('</think>', 1)[1]
        text = re.sub(r'<think>.*', '', text, flags=re.DOTALL)
        return text

    # ── Extract suggested follow-up questions from raw LLM output ─────────────
    def _extract_suggestions(self, raw_text: str):
        """Split raw model output into (main_text, suggestions).

        The LLM is instructed to end its response with a marker followed by a
        JSON array of 3 short follow-up questions, e.g.:

            ...main answer...
            [[SUGGESTIONS]]["Question 1?", "Question 2?", "Question 3?"]

        Reasoning blocks are stripped first, so a marker mentioned inside
        ``<think>`` cannot hijack the split. At most 4 non-blank suggestions are
        returned. If the block is missing or malformed, the suggestion list is
        empty; a malformed block never breaks the main answer.
        """
        text = self._strip_reasoning(raw_text or "")
        if SUGGESTION_MARKER not in text:
            return text, []
        main, _, tail = text.partition(SUGGESTION_MARKER)
        suggestions = []
        try:
            start = tail.index('[')
            end = tail.rindex(']') + 1
            parsed = json.loads(tail[start:end])
            if isinstance(parsed, list):
                suggestions = [str(s).strip() for s in parsed if str(s).strip()][:4]
        except (ValueError, RecursionError):  # missing brackets / invalid or absurdly nested JSON
            suggestions = []
        return main.strip(), suggestions

    # ── Output cleaning ───────────────────────────────────────────────────────
    def clean_output(self, text: str) -> str:
        """Normalise model text for display.

        Strips reasoning blocks and rewrites '§' as 'Section '. Maps 'Cap (1)' /
        'Public Health Act (1)' references to Cap 242, per system-prompt rule 5.
        Collapses 3+ newlines and runs of spaces/tabs. Returns a warning string
        when nothing is left.
        """
        empty_msg = "⚠️ The AI returned an empty response. Please try again."
        if not text:
            return empty_msg
        text = self._strip_reasoning(str(text))
        if not text.strip():
            return empty_msg
        text = text.replace("§", "Section ")
        text = re.sub(r'Section\s+Section', 'Section', text)
        # \b keeps words such as "Escape (1)" from becoming "EsCap 242".
        text = re.sub(r'(?i)\bCap\s*\(\s*1\s*\)', 'Cap 242', text)
        text = re.sub(r'(?i)public health act\s*\(\s*1\s*\)', 'Public Health Act, Cap 242', text)
        text = re.sub(r'(?i)public health act\s*\(cap\s*1\)', 'Public Health Act, Cap 242', text)
        text = re.sub(r'(?i)\(cap\s*1\)', '(Cap 242)', text)
        text = re.sub(r'\n{3,}', '\n\n', text)
        text = re.sub(r'[ \t]{2,}', ' ', text)
        return text.strip()

    # ── System prompt ─────────────────────────────────────────────────────────
    def _system_prompt(self, tier: str, has_history: bool) -> str:
        """Build the system prompt: a "strong" tier gets the full-answer template, others the gaps template."""
        continuity = (
            "CONVERSATION CONTINUITY: You are mid-conversation. "
            "The chat history already provided shows what was discussed. "
            "Use it for follow-up questions, pronouns and back-references. "
            "NEVER treat the current message as a new topic if history exists. "
            "Do NOT ask the user to repeat anything already in the history.\n\n"
        ) if has_history else ""

        base = (
            "You are ENVH.AI Senior Legal Architect — a world-class consultant in "
            "Kenyan Environmental Health and Safety (EHS) law. "
            "Be professional, authoritative, warm, and genuinely helpful.\n\n"
            + continuity +
            "FACTORIES ACT NOTE: The Factories Act (Cap 514) is repealed by OSHA 2007. "
            "L.N. 31/2004 Safety & Health Committees Rules remain active under OSHA.\n\n"
            "CITATION RULES — NEVER BREAK:\n"
            "1. Use ONLY information from the CONTEXT provided below.\n"
            "2. Cite every legal requirement with its Act/Cap/Section.\n"
            "3. Never invent fines or section numbers.\n"
            "4. Write 'Section' not '§'.\n"
            "5. 'public health act (1)' or 'Cap(1)' = 'Public Health Act, Cap 242'.\n\n"
        )

        if tier == "strong":
            return base + (
                "TASK: Give a complete authoritative answer using ONLY the context.\n\n"
                "FORMAT:\n"
                "### [Issue Title]\n"
                "**Bottom Line:** [one clear sentence]\n\n"
                "**Statutory Requirements:**\n"
                "- [Requirement] ([Act / Cap / Section])\n\n"
                "**Risk & Penalty Matrix:**\n"
                "| Requirement | Provision | Penalty/Risk |\n"
                "| :--- | :--- | :--- |\n\n"
                "**Action Plan:**\n"
                "1. [Step 1]\n"
                "2. [Step 2]\n"
                "3. [Step 3]\n\n"
                + self._suggestion_instruction()
            )
        return base + (
            "TASK: Answer what you CAN from context. Clearly flag any gaps.\n\n"
            "FORMAT:\n"
            "### [Issue Title]\n"
            "**What We Know:** [answer with citations]\n\n"
            "**Statutory Requirements:**\n"
            "- [Requirement] ([citation])\n\n"
            "**Knowledge Gaps:**\n"
            "- [What's missing from the database]\n\n"
            "> 📩 For complete guidance contact **support@envhai.co.ke**\n\n"
            + self._suggestion_instruction()
        )

    @staticmethod
    def _suggestion_instruction() -> str:
        """Prompt fragment asking the model to append SUGGESTION_MARKER + a JSON array of 3 questions."""
        return (
            "AFTER your main answer, on a new line, output exactly this marker "
            "followed immediately by a JSON array of 3 short, specific follow-up "
            "questions the user would naturally ask next (based on what you just "
            "answered) — no other text after it:\n\n"
            f"{SUGGESTION_MARKER}[\"...\", \"...\", \"...\"]\n\n"
            "Each question must be under 12 words, directly related to the topic "
            "just discussed, and phrased the way a user would type it (not a "
            "command). Valid JSON array of strings only — no markdown, no trailing text."
        )

    # ── API error classification ──────────────────────────────────────────────
    @staticmethod
    def _is_auth_error(message: str) -> bool:
        """True when an API error message points at a missing/invalid key (same for every model)."""
        err = message.lower()
        return bool(
            _AUTH_STATUS_RE.search(err)
            or "credential" in err
            or "api key" in err
            or ("permission" in err and "gated" not in err)
        )

    @staticmethod
    def _is_rate_limited(message: str) -> bool:
        """True when an API error message indicates HTTP 429 / rate limiting."""
        return bool(_RATE_LIMIT_RE.search(message.lower()))

    # ── Main RAG call ─────────────────────────────────────────────────────────
    def search_and_ask(
        self,
        query_text: str,
        query_vector: np.ndarray,
        history: List[Dict] = None,
    ) -> dict:
        """Retrieve context for *query_text* and ask the LLM.

        Args:
            query_text: the user's message as typed.
            query_vector: embedding to search the FAISS index with; it must match
                the index dimension.
            history: prior turns as ``{"role": ..., "text": ...}`` dicts. Role
                "ai" marks assistant turns; any other role is treated as the user.

        Returns:
            Always ``{"answer": str, "suggestions": list[str]}``. API failures are
            reported in the answer text rather than raised.
        """
        if history is None:
            history = []

        valid_idx, valid_dst = self._search(query_vector)
        tier = self._confidence_tier(valid_dst)

        if tier == "none" or len(valid_idx) == 0:
            if history and _is_vague(query_text):
                return {
                    "answer": (
                        "I'd like to continue our discussion, but I need a little more "
                        "detail to find the right legal provisions.\n\n"
                        "Try rephrasing — for example:\n"
                        "- *\"What are the penalties under the Public Health Act Cap 242?\"*\n"
                        "- *\"Explain the Cabinet Secretary's powers under Section 17\"*\n\n"
                        "That helps me pull the exact statutory language. 📋"
                    ),
                    "suggestions": [],
                }
            return {"answer": self._helpful_redirect(query_text), "suggestions": []}

        # Keep chunks scoring within 80% of the partial threshold; fall back to the top 5.
        min_score = CONFIDENCE_PARTIAL * 0.8
        good_idx = [idx for idx, dst in zip(valid_idx, valid_dst) if dst >= min_score] \
            or list(valid_idx[:5])
        context = "\n---\n".join(str(self.chunks[i]) for i in good_idx)
        context = re.sub(r'(?i)public health act\s*\(\s*1\s*\)', 'Public Health Act, Cap 242', context)

        user_msg = self._build_user_message(query_text, context, history)
        messages = (
            [{"role": "system", "content": self._system_prompt(tier, bool(history))}]
            + self._build_history_messages(history)
            + [{"role": "user", "content": user_msg}]
        )
        return self._ask_models(messages)

    def _build_user_message(self, query_text: str, context: str, history: List[Dict]) -> str:
        """Wrap retrieved context and the query into the final user turn.

        For a vague follow-up, the latest non-vague user question is restated as
        ORIGINAL TOPIC so the model stays on it.
        """
        if history and _is_vague(query_text):
            last_q = next(
                (
                    e.get("text") or ""
                    for e in reversed(history)
                    if e.get("role") == "user" and not _is_vague(e.get("text"))
                ),
                "",
            )
            return (
                f"CONTEXT:\n{context}\n\n"
                f"ORIGINAL TOPIC: {last_q}\n"
                f"FOLLOW-UP: {query_text}\n\n"
                f"Continue the discussion on the original topic using the context above."
            )
        return f"CONTEXT:\n{context}\n\nQUERY: {query_text}"

    def _ask_models(self, messages: list) -> dict:
        """Send *messages* to each MODEL_CANDIDATES entry in turn until one answers.

        A rate-limited call is retried once on the same model after a 2 s back-off.
        Any other failure moves on to the next model. An authentication failure
        returns immediately, because it would fail identically for every model.
        """
        last_error = None
        for model_name in MODEL_CANDIDATES:
            for attempt in range(2):
                try:
                    response = self.client.chat.completions.create(
                        model=model_name,
                        messages=messages,
                        temperature=0,
                        max_tokens=2000,
                    )
                    # The SDK types message.content as optional; treat None as "".
                    raw = response.choices[0].message.content or ""
                except Exception as e:  # SDK/provider errors come in many types
                    last_error = e
                    print(f"❌ {model_name} attempt {attempt+1} error: {e}")
                    if self._is_auth_error(str(e)):
                        return {
                            "answer": (
                                "⚠️ **Permission Error:** Your GROQ_API_KEY may be missing or invalid.\n\n"
                                "**Fix:**\n"
                                "1. Go to https://console.groq.com/keys\n"
                                "2. Create a free API key (no credit card needed)\n"
                                "3. In your HF Space → Settings → Secrets, add/update **GROQ_API_KEY**\n"
                                "4. Factory reboot the Space"
                            ),
                            "suggestions": [],
                        }
                    if self._is_rate_limited(str(e)) and attempt < 1:
                        time.sleep(2 ** (attempt + 1))
                        continue  # retry the same model once
                    break  # unavailable / gated / retries exhausted -> next model

                # Post-processing is outside the try, so a bug here is not
                # misreported as a model failure.
                self.model_name = model_name  # remember which one worked
                print(f"✅ Answered using {model_name}")
                main_text, suggestions = self._extract_suggestions(raw)
                return {
                    "answer": self.clean_output(main_text),
                    "suggestions": suggestions,
                }

        # Every candidate failed
        return {
            "answer": (
                f"⚠️ **All models temporarily unavailable.**\n\n"
                f"Tried: {', '.join(MODEL_CANDIDATES)}\n\n"
                f"Last error: `{str(last_error)}`\n\n"
                f"Please try again shortly, or contact support@envhai.co.ke."
            ),
            "suggestions": [],
        }

    # ── Helpful redirect when no chunks match ─────────────────────────────────
    def _helpful_redirect(self, query_text: str) -> str:
        """Markdown "no match" reply with example questions chosen by keywords in the query."""
        q = query_text.lower()
        if any(w in q for w in ["health", "hospital", "clinic", "afya"]):
            hint = (
                "- *'Licensing requirements for a clinic under the Public Health Act?'*\n"
                "- *'OSHA requirements for healthcare workers?'*"
            )
        elif any(w in q for w in ["eia", "nema", "environmental impact", "esia"]):
            hint = (
                "- *'What projects require an EIA under EMCA?'*\n"
                "- *'How to register as an EIA Lead Expert with NEMA?'*"
            )
        elif any(w in q for w in ["law", "act", "regulation", "statute"]):
            hint = (
                "- *'OSHA 2007 requirements for workplace safety committees?'*\n"
                "- *'Penalties under EMCA for pollution?'*"
            )
        else:
            hint = (
                "- *'Occupational health requirements for a factory?'*\n"
                "- *'Food business permits under Cap 254?'*"
            )
        return (
            f"### We're Working on This For You\n\n"
            f"No direct match found for **\"{query_text}\"**.\n\n"
            f"Try rephrasing with a specific Act or sector:\n{hint}\n\n"
            f"> 📩 **support@envhai.co.ke** — responds within 1 business day.\n\n"
            f"*Tip: The more specific your question, the better the answer.*"
        )