Spaces:
Sleeping
Sleeping
| import os | |
| import re | |
| import json | |
| import time | |
| import faiss | |
| import numpy as np | |
| from openai import OpenAI | |
| from typing import List, Dict | |
# ── Swahili / acronym expansion ──────────────────────────────────────────────
# Lower-case search term -> extra English keywords appended to a query that
# contains the term as a whole word (see RAGEngine.expand_query).

# Swahili words and phrases.
_SWAHILI_TERMS = {
    "bora afya": "good health primary health care",
    "afya": "health",
    "dawa": "medicine drugs pharmacy",
    "hospitali": "hospital health facility",
    "maji": "water sanitation",
    "chakula": "food safety nutrition",
    "mazingira": "environment environmental",
    "uchafuzi": "pollution environmental contamination",
}

# Agency / statute / industry acronyms.
_ACRONYMS = {
    "eia": "environmental impact assessment",
    "eis": "environmental impact study",
    "nema": "national environment management authority",
    "osha": "occupational safety health act",
    "kebs": "kenya bureau of standards",
    "kra": "kenya revenue authority",
    "epa": "environment protection authority",
    "wrc": "water resources commission authority",
    # Same expansion as "nema" -- presumably a common misspelling; confirm.
    "neema": "national environment management authority",
    "esia": "environmental social impact assessment",
    "ehs": "environmental health safety",
    "hse": "health safety environment",
    "ppe": "personal protective equipment",
    "shif": "social health insurance fund",
    "nhif": "national hospital insurance fund",
    "nssf": "national social security fund",
}

# Merge order matters: expansions are appended in dict iteration order.
QUERY_EXPANSION = {**_SWAHILI_TERMS, **_ACRONYMS}
# Best-hit score at or above this value puts a query in the "strong" tier.
CONFIDENCE_STRONG = 0.45
# Best-hit score at or above this value (but below strong) is "partial";
# anything lower is treated as "no match". 80% of this value is also the
# per-chunk cut-off used when assembling the LLM context.
CONFIDENCE_PARTIAL = 0.25
# Number of nearest neighbours requested from the FAISS index per query.
K_RETRIEVE = 12
# Conversation turns replayed to the LLM; history is trimmed to twice this
# many messages.
MAX_HISTORY_TURNS = 6
# Groq's OWN direct API — a completely separate free quota from HuggingFace's
# shared Inference Providers billing. Free, no credit card, ~14,400 req/day.
# Get a key at https://console.groq.com/keys
#
# Models are tried in list order; the first one that answers wins.
MODEL_CANDIDATES = [
    "llama-3.3-70b-versatile",        # strongest general-purpose model on Groq
    "qwen/qwen3-32b",                 # strong multilingual fallback
    "deepseek-r1-distill-llama-70b",  # strong reasoning fallback
    "gemma2-9b-it",                   # fast, reliable final fallback
]
# OpenAI-compatible endpoint passed to the OpenAI client as base_url.
GROQ_BASE_URL = "https://api.groq.com/openai/v1"
# Marker the LLM uses to separate its main answer from suggested follow-ups.
# Frontend never sees this — it's stripped out and the questions are returned
# as a separate "suggestions" array for rendering as clickable pills.
SUGGESTION_MARKER = "[[SUGGESTIONS]]"
| # ββ Vague follow-up detection βββββββββββββββββββββββββββββββββββββββββββββββββ | |
| VAGUE_PATTERNS = [ | |
| r"^explain\s*(further|more|again|that|it|this)?\.?$", | |
| r"^tell\s+me\s+more\.?$", | |
| r"^(go\s+on|continue|what\s+else|more\s+details?)\.?$", | |
| r"^(what\s+about\s+(that|this|it)|can\s+you\s+elaborate)\.?$", | |
| r"^(give\s+me\s+more|expand\s+on\s+(that|this|it))\.?$", | |
| r"^(ok|okay|i\s+see|yes|sure|got\s+it|noted)[,.]?\s*$", | |
| r"^(what\s+(are\s+)?(the\s+)?(penalties|fines|consequences)\s*(for\s+(that|this))?)\.?$", | |
| r"^(how\s+(does|do)\s+(that|this|it)\s+work)\.?$", | |
| r"^(what\s+does\s+(that|this)\s+mean)\.?$", | |
| r"^(any\s+examples?)\.?$", | |
| r"^(summarize|summarise)\s*(that|this|it)?\.?$", | |
| ] | |
| DOMAIN_WORDS = { | |
| "act", "section", "cap", "osha", "nema", "eia", "emca", "law", | |
| "regulation", "penalty", "fine", "license", "permit", "waste", | |
| "health", "safety", "environment", "chemical", "noise", "fire", | |
| "pollution", "factory", "worker", "employer", "committee", | |
| } | |
| def _is_vague(text: str) -> bool: | |
| t = text.strip().lower() | |
| if len(t.split()) <= 4 and not any(w in DOMAIN_WORDS for w in t.split()): | |
| return True | |
| return any(re.match(p, t) for p in VAGUE_PATTERNS) | |
class RAGEngine:
    """Retrieval-augmented question answering over a local FAISS index.

    The engine loads a FAISS index and its parallel array of text chunks from
    the directory containing this file, retrieves the best-matching chunks
    for a pre-computed query vector, and asks a Groq-hosted chat model
    (through the OpenAI-compatible client) to answer from that context only.

    Attributes:
        index: FAISS index read from ``indexfinal.faiss``.
        chunks: array of chunk texts loaded from ``chunksfinal.npy``; row
            ``i`` is looked up with the ids FAISS returns.
        client: OpenAI client pointed at ``GROQ_BASE_URL``.
        model_name: the model that answered most recently (initially the
            first entry of ``MODEL_CANDIDATES``).
    """

    def __init__(self):
        """Load the index and chunk store and create the Groq client.

        Raises:
            FileNotFoundError: if ``indexfinal.faiss`` or ``chunksfinal.npy``
                is not next to this file.
        """
        base_dir = os.path.dirname(os.path.abspath(__file__))
        index_path = os.path.join(base_dir, "indexfinal.faiss")
        chunks_path = os.path.join(base_dir, "chunksfinal.npy")
        if not os.path.exists(index_path) or not os.path.exists(chunks_path):
            raise FileNotFoundError(f"FAISS index or chunks missing at {base_dir}")
        self.index = faiss.read_index(index_path)
        # NOTE(review): allow_pickle=True unpickles the file, which can run
        # arbitrary code. Acceptable only because the file ships with the
        # app; never point this at an untrusted or user-supplied path.
        self.chunks = np.load(chunks_path, allow_pickle=True)

        # ── Groq's direct API — separate free quota from HF's shared billing ──
        groq_key = os.environ.get("GROQ_API_KEY", "").strip()
        if groq_key:
            print(f"✅ GROQ_API_KEY found (len={len(groq_key)})")
        else:
            print("⚠️ GROQ_API_KEY not set — get a free key at https://console.groq.com/keys")
        # A placeholder key lets the client be constructed without a key; the
        # resulting API error is turned into a setup hint in search_and_ask.
        self.client = OpenAI(
            base_url=GROQ_BASE_URL,
            api_key=groq_key or "placeholder",
        )
        self.model_name = MODEL_CANDIDATES[0]  # display default; actual call tries all candidates
        print(f"✅ RAGEngine ready — provider=groq (direct), candidates={MODEL_CANDIDATES}")

    # ── Query expansion ───────────────────────────────────────────────────────
    def expand_query(self, text: str) -> str:
        """Append English keywords for every ``QUERY_EXPANSION`` term in *text*.

        Terms are matched case-insensitively as whole words. The original
        text is returned unchanged when nothing matches.
        """
        lower = text.lower().strip()
        expansions = [
            expansion
            for term, expansion in QUERY_EXPANSION.items()
            if re.search(r'\b' + re.escape(term) + r'\b', lower)
        ]
        if not expansions:
            return text
        return (text + " " + " ".join(expansions)).strip()

    # ── Enrich vague follow-ups with topic context before FAISS search ────────
    def build_retrieval_query(self, query_text: str, history: List[Dict]) -> str:
        """Return the text to embed for retrieval.

        A specific query (or one with no history) is returned as-is. A vague
        follow-up is prefixed with the most recent non-vague user question
        (first 300 characters) and the most recent AI reply (markdown
        stripped, first 200 characters) so the search keeps the topic.

        Args:
            query_text: the user's current message.
            history: earlier messages as dicts with ``"role"``
                (``"user"`` / ``"ai"``) and ``"text"`` keys, oldest first.
        """
        if not history or not _is_vague(query_text):
            return query_text
        last_user_q = ""
        last_ai_text = ""
        for entry in reversed(history):
            role = entry.get("role", "")
            text = entry.get("text", "").strip()
            if not text:
                continue
            if role == "user" and not last_user_q and not _is_vague(text):
                last_user_q = text[:300]
            if role == "ai" and not last_ai_text:
                clean = re.sub(r'[#*_`>|]', '', text)
                last_ai_text = re.sub(r'\s+', ' ', clean).strip()[:200]
            if last_user_q and last_ai_text:
                break
        parts = [p for p in [last_user_q, last_ai_text, query_text] if p]
        return " ".join(parts)

    # ── FAISS search ──────────────────────────────────────────────────────────
    def _search(self, query_vector: np.ndarray):
        """Return ``(ids, scores)`` of up to ``K_RETRIEVE`` nearest chunks.

        The vector is coerced to a ``(1, d)`` float32 row. Slots FAISS fills
        with id ``-1`` (no result) are dropped from both arrays.
        """
        qv = np.array(query_vector).astype("float32").reshape(1, -1)
        distances, indices = self.index.search(qv, k=K_RETRIEVE)
        found = indices[0] != -1
        return indices[0][found], distances[0][found]

    def _confidence_tier(self, distances: np.ndarray) -> str:
        """Classify retrieval quality as ``"strong"``, ``"partial"`` or ``"none"``.

        Only the first (best) score is used, and HIGHER is treated as better.
        NOTE(review): that is correct for a similarity (inner-product /
        cosine) index but inverted for an L2 index — confirm the index type.
        """
        if len(distances) == 0:
            return "none"
        best = float(distances[0])
        if best >= CONFIDENCE_STRONG:
            return "strong"
        if best >= CONFIDENCE_PARTIAL:
            return "partial"
        return "none"

    # ── Build OpenAI-style history (role: user/assistant) ─────────────────────
    def _build_history_messages(self, history: List[Dict]) -> list:
        """Convert the last ``MAX_HISTORY_TURNS * 2`` history entries to chat messages.

        Entries with empty text are skipped; role ``"ai"`` maps to
        ``"assistant"`` and every other role to ``"user"``.
        """
        max_msgs = MAX_HISTORY_TURNS * 2
        trimmed = history[-max_msgs:] if len(history) > max_msgs else history
        messages = []
        for entry in trimmed:
            text = entry.get("text", "").strip()
            if not text:
                continue
            role = "assistant" if entry.get("role", "user") == "ai" else "user"
            messages.append({"role": role, "content": text})
        return messages

    # ── Extract suggested follow-up questions from raw LLM output ─────────────
    def _extract_suggestions(self, raw_text: str):
        """Split raw LLM output into the main answer and follow-up questions.

        The LLM is instructed to end its response with a marker followed by
        a JSON array of 3 short follow-up questions, e.g.::

            ...main answer...
            [[SUGGESTIONS]]["Question 1?", "Question 2?", "Question 3?"]

        Returns:
            ``(main_text_without_marker, list_of_suggestions)``. At most four
            non-empty suggestions are kept. Without a marker the text is
            returned untouched with an empty list.

        Falls back to an empty list if parsing fails for any reason —
        never lets a malformed suggestion block break the main answer.
        """
        if SUGGESTION_MARKER not in raw_text:
            return raw_text, []
        main, _, tail = raw_text.partition(SUGGESTION_MARKER)
        suggestions = []
        try:
            start = tail.index('[')
            end = tail.rindex(']') + 1
            parsed = json.loads(tail[start:end])
            if isinstance(parsed, list):
                suggestions = [str(s).strip() for s in parsed if str(s).strip()][:4]
        except Exception:
            # Deliberate best-effort: suggestions are optional decoration.
            suggestions = []
        return main.strip(), suggestions

    # ── Output cleaning ───────────────────────────────────────────────────────
    def clean_output(self, text: str) -> str:
        """Normalise an LLM answer for display.

        Removes ``<think>`` / ``<|think|>`` reasoning blocks, rewrites the
        section sign as the word "Section", corrects the "Cap (1)" /
        "Public Health Act (1)" citations to Cap 242, and collapses runs of
        blank lines and spaces. Empty input yields a user-facing warning.
        """
        if not text:
            return "⚠️ The AI returned an empty response. Please try again."
        text = str(text)
        text = re.sub(r'<think>.*?</think>', '', text, flags=re.DOTALL)
        text = re.sub(r'<\|think\|>.*?<\|/think\|>', '', text, flags=re.DOTALL)
        # The literal here must be the real section sign; a mis-encoded
        # literal silently never matches.
        text = text.replace("§", "Section ")
        text = re.sub(r'Section\s+Section', 'Section', text)
        text = re.sub(r'(?i)Cap\s*\(\s*1\s*\)', 'Cap 242', text)
        text = re.sub(r'(?i)public health act\s*\(\s*1\s*\)', 'Public Health Act, Cap 242', text)
        text = re.sub(r'(?i)public health act\s*\(cap\s*1\)', 'Public Health Act, Cap 242', text)
        text = re.sub(r'(?i)\(cap\s*1\)', '(Cap 242)', text)
        text = re.sub(r'\n{3,}', '\n\n', text)
        text = re.sub(r'[ \t]{2,}', ' ', text)
        return text.strip()

    # ── System prompt ─────────────────────────────────────────────────────────
    def _system_prompt(self, tier: str, has_history: bool) -> str:
        """Build the system prompt for the given confidence tier.

        Args:
            tier: ``"strong"`` selects the full-answer format; any other
                value selects the partial-answer format with a gaps section.
            has_history: when True, a conversation-continuity paragraph is
                included.
        """
        continuity = (
            "CONVERSATION CONTINUITY: You are mid-conversation. "
            "The chat history already provided shows what was discussed. "
            "Use it for follow-up questions, pronouns and back-references. "
            "NEVER treat the current message as a new topic if history exists. "
            "Do NOT ask the user to repeat anything already in the history.\n\n"
        ) if has_history else ""
        base = (
            "You are ENVH.AI Senior Legal Architect — a world-class consultant in "
            "Kenyan Environmental Health and Safety (EHS) law. "
            "Be professional, authoritative, warm, and genuinely helpful.\n\n"
            + continuity +
            "FACTORIES ACT NOTE: The Factories Act (Cap 514) is repealed by OSHA 2007. "
            "L.N. 31/2004 Safety & Health Committees Rules remain active under OSHA.\n\n"
            "CITATION RULES — NEVER BREAK:\n"
            "1. Use ONLY information from the CONTEXT provided below.\n"
            "2. Cite every legal requirement with its Act/Cap/Section.\n"
            "3. Never invent fines or section numbers.\n"
            "4. Write 'Section' not '§'.\n"
            "5. 'public health act (1)' or 'Cap(1)' = 'Public Health Act, Cap 242'.\n\n"
        )
        if tier == "strong":
            return base + (
                "TASK: Give a complete authoritative answer using ONLY the context.\n\n"
                "FORMAT:\n"
                "### [Issue Title]\n"
                "**Bottom Line:** [one clear sentence]\n\n"
                "**Statutory Requirements:**\n"
                "- [Requirement] ([Act / Cap / Section])\n\n"
                "**Risk & Penalty Matrix:**\n"
                "| Requirement | Provision | Penalty/Risk |\n"
                "| :--- | :--- | :--- |\n\n"
                "**Action Plan:**\n"
                "1. [Step 1]\n"
                "2. [Step 2]\n"
                "3. [Step 3]\n\n"
                + self._suggestion_instruction()
            )
        return base + (
            "TASK: Answer what you CAN from context. Clearly flag any gaps.\n\n"
            "FORMAT:\n"
            "### [Issue Title]\n"
            "**What We Know:** [answer with citations]\n\n"
            "**Statutory Requirements:**\n"
            "- [Requirement] ([citation])\n\n"
            "**Knowledge Gaps:**\n"
            "- [What's missing from the database]\n\n"
            "> 📩 For complete guidance contact **support@envhai.co.ke**\n\n"
            + self._suggestion_instruction()
        )

    # Takes no ``self``, so it must be a staticmethod: as a plain function in
    # the class body, ``self._suggestion_instruction()`` raises TypeError.
    @staticmethod
    def _suggestion_instruction() -> str:
        """Return the prompt fragment asking the LLM to append follow-up questions."""
        return (
            "AFTER your main answer, on a new line, output exactly this marker "
            "followed immediately by a JSON array of 3 short, specific follow-up "
            "questions the user would naturally ask next (based on what you just "
            "answered) — no other text after it:\n\n"
            f"{SUGGESTION_MARKER}[\"...\", \"...\", \"...\"]\n\n"
            "Each question must be under 12 words, directly related to the topic "
            "just discussed, and phrased the way a user would type it (not a "
            "command). Valid JSON array of strings only — no markdown, no trailing text."
        )

    # ── Main RAG call ─────────────────────────────────────────────────────────
    def search_and_ask(
        self,
        query_text: str,
        query_vector: np.ndarray,
        history: List[Dict] = None,
    ) -> dict:
        """Retrieve context for the query and answer it with the LLM.

        Args:
            query_text: the user's current message.
            query_vector: embedding of the retrieval query, computed by the
                caller.
            history: earlier messages as ``{"role": ..., "text": ...}``
                dicts, oldest first; ``None`` means no history.

        Returns:
            Always ``{"answer": str, "suggestions": list[str]}``. Retrieval
            misses and provider failures are reported inside ``"answer"``
            rather than raised.
        """
        if history is None:
            history = []
        valid_idx, valid_dst = self._search(query_vector)
        tier = self._confidence_tier(valid_dst)
        is_follow_up = bool(history) and _is_vague(query_text)

        if tier == "none" or len(valid_idx) == 0:
            if is_follow_up:
                return {
                    "answer": (
                        "I'd like to continue our discussion, but I need a little more "
                        "detail to find the right legal provisions.\n\n"
                        "Try rephrasing — for example:\n"
                        "- *\"What are the penalties under the Public Health Act Cap 242?\"*\n"
                        "- *\"Explain the Cabinet Secretary's powers under Section 17\"*\n\n"
                        "That helps me pull the exact statutory language."
                    ),
                    "suggestions": [],
                }
            return {"answer": self._helpful_redirect(query_text), "suggestions": []}

        context = self._build_context(valid_idx, valid_dst)
        user_msg = self._build_user_message(query_text, context, history, is_follow_up)
        messages = (
            [{"role": "system", "content": self._system_prompt(tier, bool(history))}]
            + self._build_history_messages(history)
            + [{"role": "user", "content": user_msg}]
        )
        return self._complete_with_fallback(messages)

    def _build_context(self, valid_idx, valid_dst) -> str:
        """Join the chunks scoring at least 80% of ``CONFIDENCE_PARTIAL`` into one context string."""
        min_score = CONFIDENCE_PARTIAL * 0.8
        good_idx = [idx for idx, score in zip(valid_idx, valid_dst) if score >= min_score]
        if not good_idx:
            # Defensive fallback: use the top five hits rather than no context.
            good_idx = list(valid_idx[:5])
        context = "\n---\n".join(str(self.chunks[i]) for i in good_idx)
        return re.sub(r'(?i)public health act\s*\(\s*1\s*\)',
                      'Public Health Act, Cap 242', context)

    def _build_user_message(self, query_text: str, context: str,
                            history: List[Dict], is_follow_up: bool) -> str:
        """Format the final user message; vague follow-ups are anchored to the last real question."""
        if not is_follow_up:
            return f"CONTEXT:\n{context}\n\nQUERY: {query_text}"
        last_q = next(
            (e["text"] for e in reversed(history)
             if e.get("role") == "user" and not _is_vague(e.get("text", ""))),
            ""
        )
        return (
            f"CONTEXT:\n{context}\n\n"
            f"ORIGINAL TOPIC: {last_q}\n"
            f"FOLLOW-UP: {query_text}\n\n"
            f"Continue the discussion on the original topic using the context above."
        )

    @staticmethod
    def _classify_error(exc: Exception) -> str:
        """Classify a provider error as ``"auth"``, ``"rate_limit"`` or ``"other"``.

        Prefers the exception's ``status_code`` attribute when it has one and
        falls back to the message text. The rate-limit text check requires
        "rate limit"-style wording so words such as "migrate" or "generate"
        do not trigger a pointless retry.
        """
        err = str(exc).lower()
        status = getattr(exc, "status_code", None)
        if (status == 401 or "401" in err or "credential" in err or "api key" in err
                or ("permission" in err and "gated" not in err)):
            return "auth"
        if (status == 429 or "429" in err or "too many requests" in err
                or re.search(r"rate[ _-]?limit", err)):
            return "rate_limit"
        return "other"

    def _complete_with_fallback(self, messages: list) -> dict:
        """Call each model in ``MODEL_CANDIDATES`` until one answers.

        A rate-limited model is retried once after a 2-second pause; any
        other failure moves on to the next candidate. Credential errors stop
        immediately because they would be identical for every model.
        """
        last_error = None
        for model_name in MODEL_CANDIDATES:
            for attempt in range(2):
                try:
                    response = self.client.chat.completions.create(
                        model=model_name,
                        messages=messages,
                        temperature=0,
                        max_tokens=2000,
                    )
                    # content may be None; treat it as an empty answer so
                    # clean_output reports it instead of a TypeError.
                    raw = response.choices[0].message.content or ""
                except Exception as e:  # provider boundary: classify, never raise
                    last_error = e
                    print(f"❌ {model_name} attempt {attempt+1} error: {e}")
                    kind = self._classify_error(e)
                    if kind == "auth":
                        return {
                            "answer": (
                                "⚠️ **Permission Error:** Your GROQ_API_KEY may be missing or invalid.\n\n"
                                "**Fix:**\n"
                                "1. Go to https://console.groq.com/keys\n"
                                "2. Create a free API key (no credit card needed)\n"
                                "3. In your HF Space → Settings → Secrets, add/update **GROQ_API_KEY**\n"
                                "4. Factory reboot the Space"
                            ),
                            "suggestions": [],
                        }
                    if kind == "rate_limit" and attempt < 1:
                        # Back off and retry the SAME model once.
                        time.sleep(2 ** (attempt + 1))
                        continue
                    # Retries exhausted, or model unavailable / gated:
                    # move on to the next candidate.
                    break

                self.model_name = model_name  # remember which one worked
                print(f"✅ Answered using {model_name}")
                # Reasoning models may mention the suggestion marker inside
                # their <think> block; remove reasoning BEFORE splitting so
                # the answer is not cut at that earlier occurrence.
                raw = re.sub(r'<think>.*?</think>', '', raw, flags=re.DOTALL)
                raw = re.sub(r'<\|think\|>.*?<\|/think\|>', '', raw, flags=re.DOTALL)
                main_text, suggestions = self._extract_suggestions(raw)
                return {
                    "answer": self.clean_output(main_text),
                    "suggestions": suggestions,
                }

        # Every candidate failed
        return {
            "answer": (
                f"⚠️ **All models temporarily unavailable.**\n\n"
                f"Tried: {', '.join(MODEL_CANDIDATES)}\n\n"
                f"Last error: `{str(last_error)}`\n\n"
                f"Please try again shortly, or contact support@envhai.co.ke."
            ),
            "suggestions": [],
        }

    # ── Helpful redirect when no chunks match ─────────────────────────────────
    def _helpful_redirect(self, query_text: str) -> str:
        """Return a markdown "no match" message with example questions.

        The examples are chosen by simple substring checks on the lower-cased
        query (health, EIA/NEMA, general law, otherwise a default pair).
        """
        q = query_text.lower()
        if any(w in q for w in ["health", "hospital", "clinic", "afya"]):
            hint = (
                "- *'Licensing requirements for a clinic under the Public Health Act?'*\n"
                "- *'OSHA requirements for healthcare workers?'*"
            )
        elif any(w in q for w in ["eia", "nema", "environmental impact", "esia"]):
            hint = (
                "- *'What projects require an EIA under EMCA?'*\n"
                "- *'How to register as an EIA Lead Expert with NEMA?'*"
            )
        elif any(w in q for w in ["law", "act", "regulation", "statute"]):
            hint = (
                "- *'OSHA 2007 requirements for workplace safety committees?'*\n"
                "- *'Penalties under EMCA for pollution?'*"
            )
        else:
            hint = (
                "- *'Occupational health requirements for a factory?'*\n"
                "- *'Food business permits under Cap 254?'*"
            )
        return (
            f"### We're Working on This For You\n\n"
            f"No direct match found for **\"{query_text}\"**.\n\n"
            f"Try rephrasing with a specific Act or sector:\n{hint}\n\n"
            f"> 📩 **support@envhai.co.ke** — responds within 1 business day.\n\n"
            f"*Tip: The more specific your question, the better the answer.*"
        )