Spaces:
Sleeping
Sleeping
| import hashlib | |
| import html | |
| import json | |
| import os | |
| import re | |
| import shutil | |
| import tempfile | |
| import threading | |
| import time | |
| import uuid | |
| from dataclasses import dataclass, field | |
| from pathlib import Path | |
| from typing import Dict, List, Optional, Tuple | |
| import faiss | |
| import gradio as gr | |
| from sentence_transformers import SentenceTransformer | |
| from extracted_phase2_core import AgenticSelfRAG, Chunk, K_PASSAGES | |
# Application identity strings.
APP_NAME = "SourceTruth"
APP_TAGLINE = "Ask grounded questions over the preloaded Phase 2 project corpus and inspect cited evidence."
# All working directories are anchored at the directory containing this file.
APP_ROOT = Path(__file__).resolve().parent
# Parent directory for per-session temp dirs (see make_temp_session_dir).
UPLOAD_ROOT = APP_ROOT / "testing_uploads"
# JSON-lines logs live under LOG_ROOT; log_event appends to EVENT_LOG_PATH.
LOG_ROOT = APP_ROOT / "testing_logs"
EVENT_LOG_PATH = LOG_ROOT / "events.jsonl"
INTERACTION_LOG_PATH = LOG_ROOT / "interactions.jsonl"
# Candidate corpus directories, in list order. The code that consumes this
# list is not visible in this section.
CORPUS_CANDIDATES = [
    APP_ROOT / "phase2_corpus",
    APP_ROOT / "phase 2 corpus",
    APP_ROOT,
]
# Optional extra candidate from the environment; appended last, with "~" expanded.
LOCAL_CORPUS_DIR = os.getenv("LOCAL_CORPUS_DIR", "").strip()
if LOCAL_CORPUS_DIR:
    CORPUS_CANDIDATES.append(Path(LOCAL_CORPUS_DIR).expanduser())
# Limits and tuning knobs. Each is read once at import time from an
# environment variable of the same name; int() raises ValueError if the
# variable is set to a non-integer string.
MAX_FILE_SIZE_MB = int(os.getenv("MAX_FILE_SIZE_MB", "20"))
MAX_PAGES = int(os.getenv("MAX_PAGES", "75"))
MAX_CHUNKS = int(os.getenv("MAX_CHUNKS", "250"))
# Chunk window size and overlap, both counted in words (see chunk_page_records).
CHUNK_WORDS = int(os.getenv("CHUNK_WORDS", "300"))
CHUNK_OVERLAP = int(os.getenv("CHUNK_OVERLAP", "50"))
# Idle time after which cleanup_expired_sessions drops a session (default 30 minutes).
SESSION_TTL_SECONDS = int(os.getenv("SESSION_TTL_SECONDS", str(30 * 60)))
MAX_QUESTIONS_PER_MINUTE = int(os.getenv("MAX_QUESTIONS_PER_MINUTE", "8"))
QUEUE_CONCURRENCY = int(os.getenv("QUEUE_CONCURRENCY", "2"))
QUEUE_MAX_SIZE = int(os.getenv("QUEUE_MAX_SIZE", "20"))
# True only when the variable is exactly "1"; forwarded to AgenticSelfRAG.
LOAD_IN_4BIT = os.getenv("LOAD_IN_4BIT", "0") == "1"
# Sentence cap used by summarize_procedural_answer.
MAX_SUMMARY_SENTENCES = int(os.getenv("MAX_SUMMARY_SENTENCES", "3"))
# User-facing privacy statement; where it is rendered is not visible in this section.
PRIVACY_NOTICE = (
    "The preloaded project PDFs are processed only to answer your questions and produce citations. "
    "Documents are not used to train models. Interaction logs may store the question, answer, citation, "
    "and proxy evaluation metrics for testing analysis. Avoid using the application for confidential, "
    "personal, medical, or legal decisions without direct document verification."
)
# Stylesheet text: a layered gradient background for `.gradio-container` and
# a dark gradient for the element with id `ask_btn`. Presumably passed to the
# Gradio UI builder, which is not visible in this section.
CSS = """
.gradio-container {
background:
radial-gradient(circle at top left, rgba(59,130,246,0.08), transparent 28%),
radial-gradient(circle at top right, rgba(16,185,129,0.08), transparent 22%),
linear-gradient(180deg, #f8fbff 0%, #f4f7fb 100%);
}
#ask_btn {
background: linear-gradient(135deg, #0f172a 0%, #1d4ed8 100%) !important;
color: white !important;
border: none !important;
}
"""
# Two or three consecutive Capitalised words, optionally prefixed by "Dr"/"Dr.".
# NOTE(review): this also matches any capitalised phrase such as a document
# title, so it is a loose proxy for "a person's name".
PERSON_RE = re.compile(r"\b(?:Dr\.?\s+)?[A-Z][a-z]+(?:\s+[A-Z][a-z]+){1,2}\b")
# A rupee sign, "INR" or "Rs"/"Rs." followed by a number with optional commas/decimals.
CURRENCY_RE = re.compile(r"(?:₹\s*[\d,]+(?:\.\d+)?|\b(?:INR|Rs\.?)\s*[\d,]+(?:\.\d+)?)", re.I)
# A money-related label ("amount paid", "budget", ...) followed by an optional
# currency marker (group 1) and the numeric value (group 2).
AMOUNT_RE = re.compile(r"\b(?:paid amount|amount paid|total price|price|amount|budget|cost)\b[:\s-]*(₹|INR|Rs\.?)?\s*([\d,]+(?:\.\d+)?)", re.I)
# Dotted version numbers with two to four numeric components, e.g. "1.0" or "12.5.3.1".
VERSION_RE = re.compile(r"\b\d+(?:\.\d+){1,3}\b")
# Dates written as "<day> <full English month name> <4-digit year>" only.
DATE_RE = re.compile(
    r"\b\d{1,2}\s+(?:January|February|March|April|May|June|July|August|"
    r"September|October|November|December)\s+\d{4}\b",
    flags=re.IGNORECASE,
)
# Any integer or decimal, with optional thousands separators.
NUMBER_RE = re.compile(r"\b\d[\d,]*(?:\.\d+)?\b")
# Role words looked up in the expanded query by evidence_has_expected_type.
ROLE_NOUNS = {"guide", "supervisor", "advisor", "mentor", "approver", "director", "lead", "manager"}
# Maps lower-case phrases or abbreviations that may appear in a question to
# the corpus PDF they refer to. matched_source_files scans these keys against
# the normalised question; several keys may map to the same file.
DOC_NAME_HINTS = {
    "project charter": "01_Project_Charter.pdf",
    "validation master plan": "02_Validation_Master_Plan.pdf",
    "vmp": "02_Validation_Master_Plan.pdf",
    "user requirements specification": "03_User_Requirements_Specification.pdf",
    "urs": "03_User_Requirements_Specification.pdf",
    "functional requirements specification": "04_Functional_Requirements_Specification.pdf",
    "frs": "04_Functional_Requirements_Specification.pdf",
    "risk assessment": "05_Risk_Assessment.pdf",
    "configuration guide": "06_HP_ALM_Configuration_Guide.pdf",
    "hp alm configuration guide": "06_HP_ALM_Configuration_Guide.pdf",
    "iq protocol": "07_IQ_Protocol_and_Report.pdf",
    "iq report": "07_IQ_Protocol_and_Report.pdf",
    "oq protocol": "08_OQ_Protocol_and_Report.pdf",
    "oq report": "08_OQ_Protocol_and_Report.pdf",
    "data migration plan": "09_Data_Migration_Plan.pdf",
    "migration plan": "09_Data_Migration_Plan.pdf",
    "data migration summary": "10_Data_Migration_Summary_Report.pdf",
    "migration summary": "10_Data_Migration_Summary_Report.pdf",
    "pq": "11_PQ_UAT_Protocol_and_Report.pdf",
    "uat": "11_PQ_UAT_Protocol_and_Report.pdf",
    "validation summary report": "12_Validation_Summary_Report.pdf",
    "vsr": "12_Validation_Summary_Report.pdf",
    "traceability matrix": "13_Traceability_Matrix.pdf",
    "rtm": "13_Traceability_Matrix.pdf",
    "change control sop": "14_Change_Control_SOP.pdf",
    "regulatory reference guide": "15_Regulatory_Reference_Guide.pdf",
}
@dataclass
class PageRecord:
    """Extracted text of a single PDF page.

    Declared as a dataclass because the extraction code constructs it with
    keyword arguments; a plain class with bare annotations has no such
    constructor.
    """

    # Name of the PDF the page came from.
    source_file: str
    # 1-based page number within that PDF.
    page_num: int
    # The non-empty lines of the page joined by single spaces.
    text: str
    # The non-empty, whitespace-normalised lines of the page, in order.
    lines: List[str]
@dataclass
class Citation:
    """A line-level pointer into extracted page text that supports an answer.

    Declared as a dataclass so it can be constructed with keyword arguments,
    as the citation-locating functions do.
    """

    # PDF file the excerpt was taken from.
    source_file: str
    # 1-based page number.
    page_num: int
    # 1-based, inclusive line range within the page's extracted lines.
    line_start: int
    line_end: int
    # The cited lines joined by single spaces.
    excerpt: str
@dataclass
class SessionData:
    """Everything held in memory for one indexed document session.

    Must be a dataclass: the ``field(default_factory=...)`` defaults below are
    only turned into per-instance values by the ``@dataclass`` decorator.
    """

    session_id: str
    # Working directory for the session; deleted by remove_session.
    temp_dir: str
    pdf_path: str
    file_name: str
    file_hash: str
    file_size_bytes: int
    page_records: List[PageRecord]
    chunks: List[Chunk]
    retriever: "SessionRetriever"
    agent: AgenticSelfRAG
    page_count: int
    # Name of the text extractor used ("pymupdf" or "pypdf").
    extractor: str
    # Pre-parsed metadata; metadata_lookup_answer reads the "headers" and
    # "vmp_table" keys.
    structured: Dict[str, dict] = field(default_factory=dict)
    created_at: float = field(default_factory=time.time)
    # Compared against the TTL cutoff by cleanup_expired_sessions.
    last_activity: float = field(default_factory=time.time)
    question_timestamps: List[float] = field(default_factory=list)
@dataclass
class QuestionPlan:
    """Result of classifying a question (see question_plan).

    Declared as a dataclass so it can be built with keyword arguments and so
    the ``allow_agentic_fallback`` default applies per instance.
    """

    # "descriptive", "procedural" or "factoid".
    mode: str
    # Expected answer kind, e.g. "text", "person", "date", "currency".
    expected_type: str
    # Lower-cased question plus any appended retrieval keywords.
    expanded_query: str
    allow_agentic_fallback: bool = True
class EmptyRetriever:
    """Placeholder retriever with no chunks; every lookup returns nothing.

    Used when constructing the shared agent template, which needs a retriever
    object but has no documents of its own.
    """

    def __init__(self):
        self.chunks: List[Chunk] = []

    def retrieve(self, query: str, k: int = K_PASSAGES) -> List[Chunk]:
        """Ignore ``query`` and ``k`` and return a new empty list."""
        nothing: List[Chunk] = []
        return nothing
class SessionRetriever:
    """Dense retriever over one session's chunks.

    Chunks are embedded (source file name prepended to the chunk text) with
    L2-normalised vectors and stored in a FAISS inner-product index, so the
    search score is the cosine similarity between query and chunk.
    """

    def __init__(self, chunks: List[Chunk], encoder: SentenceTransformer):
        self.chunks = chunks
        self._encoder = encoder
        # Stays None when there are no chunks to index.
        self.index = None
        self._build_index()

    def _embed(self, texts):
        """Encode ``texts`` into normalised float32 vectors."""
        vectors = self._encoder.encode(
            texts,
            convert_to_numpy=True,
            normalize_embeddings=True,
            show_progress_bar=False,
        )
        return vectors.astype("float32")

    def _build_index(self):
        """Embed all chunks and load them into a fresh inner-product index."""
        if not self.chunks:
            return
        embeddings = self._embed(
            [f"{chunk.source_file} {chunk.text}" for chunk in self.chunks]
        )
        dimension = embeddings.shape[1]
        self.index = faiss.IndexFlatIP(dimension)
        self.index.add(embeddings)

    def retrieve(self, query: str, k: int = K_PASSAGES) -> List[Chunk]:
        """Return up to ``k`` chunks ordered by similarity to ``query``."""
        if self.index is None:
            return []
        total = len(self.chunks)
        _, indices = self.index.search(self._embed([query]), min(k, total))
        # Guard against out-of-range ids (FAISS pads missing hits with -1).
        return [self.chunks[i] for i in indices[0] if 0 <= i < total]
# Live sessions keyed by session id. cleanup_expired_sessions holds
# SESSIONS_LOCK while scanning this mapping.
SESSIONS: Dict[str, SessionData] = {}
SESSIONS_LOCK = threading.Lock()
# MODEL_LOCK serialises creation of the shared agent template
# (get_agent_template); EMBEDDER_LOCK does the same for the sentence encoder
# (get_embedder).
MODEL_LOCK = threading.Lock()
EMBEDDER_LOCK = threading.Lock()
# Process-wide singletons, created lazily on first use.
GLOBAL_EMBEDDER: Optional[SentenceTransformer] = None
GLOBAL_AGENT_TEMPLATE: Optional[AgenticSelfRAG] = None
def ensure_directories():
    """Create the upload and log root directories (and parents) if missing."""
    for directory in (UPLOAD_ROOT, LOG_ROOT):
        directory.mkdir(parents=True, exist_ok=True)
def now_ts() -> float:
    """Return the current time as seconds since the epoch."""
    current = time.time()
    return current
def normalize_text(text: str) -> str:
    """Collapse all whitespace runs to single spaces and trim the ends.

    U+2581 and the non-breaking space are first turned into ordinary spaces
    so they take part in the collapsing.
    """
    for odd_space in ("\u2581", "\xa0"):
        text = text.replace(odd_space, " ")
    collapsed = re.sub(r"\s+", " ", text)
    return collapsed.strip()
def pretty_doc_name(file_name: str) -> str:
    """Turn a corpus file name into a human-readable title.

    Removes a trailing ``.pdf`` extension (any letter case), drops a leading
    ``<digits>_`` ordering prefix and replaces underscores with spaces, e.g.
    ``"02_Validation_Master_Plan.pdf"`` -> ``"Validation Master Plan"``.
    """
    # Only the final extension is removed; a ".pdf" occurring mid-name is part
    # of the title and must be kept.
    base = re.sub(r"\.pdf\Z", "", file_name, flags=re.IGNORECASE)
    base = re.sub(r"^\d+_", "", base)
    return base.replace("_", " ")
def content_terms(text: str) -> set:
    """Return the set of lower-cased content words in ``text``.

    A word starts with a letter and continues with letters, digits, ``_`` or
    ``-``. Words of one or two characters and the stop words below are
    discarded.
    """
    ignored = frozenset(
        (
            "a", "an", "the", "is", "are", "was", "were", "be", "been", "being",
            "do", "does", "did", "have", "has", "had", "how", "what", "when",
            "where", "why", "which", "who", "whom", "this", "that", "these",
            "those", "and", "or", "but", "for", "with", "into", "from", "about",
            "main", "use", "uses", "using", "used", "number", "version", "date",
            "system", "document", "pdf", "page", "line", "file", "does", "give",
        )
    )
    terms = set()
    for word in re.findall(r"[A-Za-z][A-Za-z0-9_-]+", text.lower()):
        if len(word) > 2 and word not in ignored:
            terms.add(word)
    return terms
def clip_text(text: str, max_chars: int = 320) -> str:
    """Normalise ``text`` and shorten it to about ``max_chars`` characters.

    Text that already fits is returned unchanged after normalisation.
    Otherwise the first ``max_chars`` characters are taken, everything after
    the last space in that prefix is dropped, and ``"..."`` is appended.
    """
    cleaned = normalize_text(text)
    if len(cleaned) > max_chars:
        head = cleaned[:max_chars]
        # Cut at the last space so the result does not end mid-word.
        return head.rsplit(" ", 1)[0].strip() + "..."
    return cleaned
def question_plan(question: str) -> QuestionPlan:
    """Classify a question with keyword heuristics and build its retrieval query.

    The question is normalised and lower-cased, then tested against an ordered
    chain of cues; the first matching branch sets ``mode`` and
    ``expected_type``. Afterwards extra keywords are appended to the query
    depending on the question text and the chosen type.

    Returns:
        A QuestionPlan. ``allow_agentic_fallback`` is always True here.
    """
    q = normalize_text(question).lower()
    expanded = q
    mode = "descriptive"
    expected = "text"
    allow_agentic_fallback = True
    # Branch order matters: procedural cues are checked before any factoid cue.
    # NOTE(review): cues are plain substring tests, so e.g. "count" also fires
    # on "account" and "date" on "update"/"validated" — confirm this is wanted.
    if any(cue in q for cue in ["how to", "how do", "how should", "steps", "process", "procedure", "workflow", "manage ", "handling "]):
        mode = "procedural"
        expected = "procedure"
        allow_agentic_fallback = True
    elif q.startswith("who") or "who is" in q or "who was" in q:
        mode = "factoid"
        expected = "person"
    elif any(cue in q for cue in ["how many", "count", "number of"]):
        mode = "factoid"
        expected = "number"
    elif any(cue in q for cue in ["amount", "paid amount", "price", "cost", "total", "fee"]):
        mode = "factoid"
        expected = "currency"
    elif any(cue in q for cue in ["date", "when", "go-live"]):
        mode = "factoid"
        expected = "date"
    elif "version" in q:
        mode = "factoid"
        expected = "version"
    elif "name of the project" in q or ("project" in q and "name" in q):
        mode = "factoid"
        expected = "project_name"
    elif "name of" in q:
        mode = "factoid"
        expected = "name"
    elif q.startswith("what is") or q.startswith("what was") or q.startswith("what were"):
        mode = "factoid"
        expected = "text"
    # Query expansion: append related vocabulary to the retrieval query.
    # These checks are independent of each other and may all apply.
    if "deviation" in q:
        expanded += " deviation deviations reviewed review closed closure investigated approved documented"
    if "guide" in q:
        expanded += " guide supervisor advisor mentor person name"
    if expected == "currency":
        expanded += " amount paid total INR Rs price payment"
    if expected == "project_name":
        expanded += " project name document project"
    if expected == "procedure":
        expanded += " steps process procedure shall must review close approve"
    return QuestionPlan(
        mode=mode,
        expected_type=expected,
        expanded_query=expanded,
        allow_agentic_fallback=allow_agentic_fallback,
    )
def matched_source_files(question: str) -> List[str]:
    """Return the corpus files whose name hints are mentioned in ``question``.

    Each key of DOC_NAME_HINTS is searched for in the normalised, lower-cased
    question. Files are returned once each, in the order their first matching
    hint appears in DOC_NAME_HINTS.
    """
    q = normalize_text(question).lower()
    matches: List[str] = []
    for hint, file_name in DOC_NAME_HINTS.items():
        if file_name in matches:
            continue
        # Require the hint to stand alone as a word or phrase (an optional
        # plural "s" is tolerated). A bare substring test would let short
        # abbreviations fire inside unrelated words, e.g. "urs" in "hours" or
        # "uat" in "evaluation", and wrongly narrow retrieval to one document.
        pattern = rf"(?<![a-z0-9]){re.escape(hint)}s?(?![a-z0-9])"
        if re.search(pattern, q):
            matches.append(file_name)
    return matches
def evidence_has_expected_type(plan: QuestionPlan, sentences: List[str]) -> bool:
    """Check whether the evidence contains something shaped like the expected answer.

    The sentences are joined and tested with the pattern that belongs to
    ``plan.expected_type``. Types without a dedicated check ("text", "name",
    "project_name", ...) are accepted as soon as any evidence exists.

    Returns:
        False when ``sentences`` is empty or the expected pattern is absent,
        True otherwise.
    """
    if not sentences:
        return False
    joined = " ".join(sentences)
    expected = plan.expected_type
    if expected == "person":
        # The earlier role-noun test returned False on both of its paths, so
        # the outcome depends only on whether a name-like phrase is present.
        return bool(PERSON_RE.search(joined))
    if expected == "currency":
        return bool(CURRENCY_RE.search(joined) or AMOUNT_RE.search(joined))
    if expected == "date":
        return bool(DATE_RE.search(joined))
    if expected == "version":
        return bool(VERSION_RE.search(joined))
    if expected == "number":
        return bool(NUMBER_RE.search(joined))
    if expected == "procedure":
        lowered = joined.lower()
        return any(
            token in lowered
            for token in ["must", "shall", "should", "reviewed", "closed", "approved", "documented", "investigated", "process", "procedure", "steps"]
        )
    return True
def append_jsonl(path: Path, payload: dict):
    """Append ``payload`` to ``path`` as one ASCII-escaped JSON line.

    The log and upload directories are created first if they are missing.
    """
    ensure_directories()
    with path.open("a", encoding="utf-8") as sink:
        record = json.dumps(payload, ensure_ascii=True)
        sink.write(record + "\n")
def log_event(event_type: str, **payload):
    """Append a timestamped event record to the event log.

    The record starts with ``timestamp`` and ``event_type``; keyword arguments
    are merged in afterwards, so a ``timestamp`` keyword overrides the
    generated value.
    """
    record = {"timestamp": now_ts(), "event_type": event_type}
    record.update(payload)
    append_jsonl(EVENT_LOG_PATH, record)
def sha256_file(file_path: str) -> str:
    """Return the hex SHA-256 digest of the file at ``file_path``.

    The file is read in 1 MiB blocks so large files are never held in memory
    as a whole.
    """
    block_size = 1024 * 1024
    hasher = hashlib.sha256()
    with open(file_path, "rb") as source:
        while True:
            block = source.read(block_size)
            if block == b"":
                break
            hasher.update(block)
    return hasher.hexdigest()
def remove_session(session_id: str):
    """Forget a session and delete its temp directory; unknown ids are ignored.

    This function does not take SESSIONS_LOCK itself.
    """
    session = SESSIONS.pop(session_id, None)
    if session:
        # Best-effort cleanup: a failure to delete must not surface to callers.
        try:
            shutil.rmtree(session.temp_dir, ignore_errors=True)
        except Exception:
            pass
def cleanup_expired_sessions():
    """Remove every session idle for longer than SESSION_TTL_SECONDS.

    The session with id "phase2-corpus" is never expired. One
    "session_expired" event is logged per removed session.
    """
    cutoff = now_ts() - SESSION_TTL_SECONDS
    expired: List[str] = []
    # NOTE(review): the original indentation is not recoverable from this
    # view; removal is assumed to happen under the lock and logging after it
    # is released — confirm against the real file.
    with SESSIONS_LOCK:
        # Iterate over a snapshot so entries can be removed afterwards.
        for session_id, session in list(SESSIONS.items()):
            if session_id == "phase2-corpus":
                continue
            if session.last_activity < cutoff:
                expired.append(session_id)
        for session_id in expired:
            remove_session(session_id)
    for session_id in expired:
        log_event("session_expired", session_id=session_id)
def get_embedder() -> SentenceTransformer:
    """Return the shared sentence encoder, creating it on first use.

    Creation is guarded by EMBEDDER_LOCK. The model is "all-MiniLM-L6-v2" on
    the CPU.
    """
    global GLOBAL_EMBEDDER
    with EMBEDDER_LOCK:
        embedder = GLOBAL_EMBEDDER
        if embedder is None:
            embedder = SentenceTransformer("all-MiniLM-L6-v2", device="cpu")
            GLOBAL_EMBEDDER = embedder
    return embedder
def get_agent_template() -> AgenticSelfRAG:
    """Return the shared, model-loaded agent, creating it on first use.

    Creation is guarded by MODEL_LOCK. The global is assigned only after
    ``load_model()`` has returned, so a failed load leaves it unset.
    """
    global GLOBAL_AGENT_TEMPLATE
    with MODEL_LOCK:
        if GLOBAL_AGENT_TEMPLATE is not None:
            return GLOBAL_AGENT_TEMPLATE
        fresh = AgenticSelfRAG(EmptyRetriever(), load_in_4bit=LOAD_IN_4BIT)
        fresh.load_model()
        GLOBAL_AGENT_TEMPLATE = fresh
        return fresh
def build_session_agent(retriever: SessionRetriever) -> AgenticSelfRAG:
    """Create an agent bound to ``retriever`` that reuses the shared model.

    A new AgenticSelfRAG is constructed around the session's retriever, then
    the generator model, tokenizer and verification agent of the shared
    template are assigned onto it instead of calling ``load_model()`` again.
    """
    template = get_agent_template()
    agent = AgenticSelfRAG(retriever, load_in_4bit=LOAD_IN_4BIT)
    # Share the template's model objects with the new agent.
    agent.pipeline.gen_model = template.pipeline.gen_model
    agent.pipeline.gen_tokenizer = template.pipeline.gen_tokenizer
    # Presumably these flags tell the agent/pipeline that loading is already
    # done — TODO confirm against AgenticSelfRAG.
    agent.pipeline._loaded = True
    agent.pipeline._repair_vocab = None
    agent._model_loaded = True
    agent.verif_agent = template.verif_agent
    # Point both helper agents at this agent's own pipeline.
    agent.qr_agent.pipeline = agent.pipeline
    agent.corr_agent.pipeline = agent.pipeline
    return agent
def _page_records_from_texts(page_texts, source_file: str) -> List[PageRecord]:
    """Build PageRecords from an iterable of raw page texts (1-based numbering)."""
    records: List[PageRecord] = []
    for page_num, raw_text in enumerate(page_texts, start=1):
        normalized = (normalize_text(line) for line in (raw_text or "").splitlines())
        lines = [line for line in normalized if line]
        text = " ".join(lines).strip()
        # Pages without extractable text are skipped; numbering is unaffected.
        if text:
            records.append(
                PageRecord(source_file=source_file, page_num=page_num, text=text, lines=lines)
            )
    return records


def extract_page_records(pdf_path: str, source_file: str) -> Tuple[List[PageRecord], str]:
    """Extract per-page text from a PDF, preferring PyMuPDF over pypdf.

    Args:
        pdf_path: Path of the PDF to read.
        source_file: File name recorded on every returned PageRecord.

    Returns:
        ``(page_records, extractor_name)`` where ``extractor_name`` is
        ``"pymupdf"`` or ``"pypdf"``. Pages with no text are omitted.

    Raises:
        RuntimeError: if PyMuPDF fails and pypdf fails as well.
    """
    try:
        import fitz

        doc = fitz.open(pdf_path)
        try:
            records = _page_records_from_texts(
                (page.get_text("text") for page in doc), source_file
            )
        finally:
            # Release the document even when a page fails to extract.
            doc.close()
        return records, "pymupdf"
    except Exception:
        # Deliberate best-effort: any PyMuPDF problem (including the package
        # not being installed) falls through to the pypdf path below.
        pass
    try:
        from pypdf import PdfReader

        reader = PdfReader(pdf_path)
        records = _page_records_from_texts(
            (page.extract_text() for page in reader.pages), source_file
        )
        return records, "pypdf"
    except Exception as exc:
        raise RuntimeError(
            "Could not extract text from a corpus PDF. Please verify the stored project documents are text-based PDFs "
            "instead of scanned image-only files."
        ) from exc
def chunk_page_records(page_records: List[PageRecord], file_name: str) -> List[Chunk]:
    """Split each page into overlapping word windows and wrap them as Chunks.

    Windows hold up to CHUNK_WORDS words and advance by
    ``CHUNK_WORDS - CHUNK_OVERLAP`` words (at least 1). Windows of 20 words or
    fewer are discarded. Chunks never span pages.
    """
    chunks: List[Chunk] = []
    chunk_index = 0
    # Guard against a zero or negative stride when overlap >= window size.
    stride = max(1, CHUNK_WORDS - CHUNK_OVERLAP)
    for page_record in page_records:
        words = page_record.text.split()
        if not words:
            continue
        start = 0
        while start < len(words):
            end = min(start + CHUNK_WORDS, len(words))
            window = words[start:end]
            if len(window) > 20:
                chunks.append(
                    Chunk(
                        chunk_id=f"{file_name}::p{page_record.page_num}::c{chunk_index}",
                        source_file=file_name,
                        page_num=page_record.page_num,
                        text=" ".join(window),
                        # NOTE(review): `start` is a word offset into the page,
                        # although the field is called char_start — confirm
                        # what Chunk expects.
                        char_start=start,
                    )
                )
                # NOTE(review): nesting inferred — the counter is assumed to
                # advance only when a chunk is actually emitted.
                chunk_index += 1
            # Stop once the window has reached the end of the page.
            if end == len(words):
                break
            start += stride
    return chunks
def validate_pdf(file_path: str) -> Tuple[bool, str]:
    """Run cheap sanity checks on an uploaded file.

    Checks, in order: a path was given, it ends in ``.pdf`` (any case), the
    file is readable, it starts with the ``%PDF`` magic bytes, and it is no
    larger than MAX_FILE_SIZE_MB.

    Returns:
        ``(True, "")`` when all checks pass, otherwise ``(False, message)``
        with a user-facing explanation. File-system errors are reported
        through the tuple rather than raised.
    """
    if not file_path:
        return False, "Please upload a PDF file."
    if not file_path.lower().endswith(".pdf"):
        return False, "Only PDF files are accepted."
    try:
        with open(file_path, "rb") as handle:
            magic = handle.read(4)
        # Kept inside the try so that a file vanishing between the read and
        # the size lookup is reported as unreadable instead of raising.
        size_bytes = os.path.getsize(file_path)
    except OSError:
        return False, "The uploaded file could not be read."
    if magic != b"%PDF":
        return False, "The uploaded file does not look like a valid PDF."
    if size_bytes > MAX_FILE_SIZE_MB * 1024 * 1024:
        return False, f"PDF exceeds the {MAX_FILE_SIZE_MB} MB file-size limit."
    return True, ""
def make_temp_session_dir() -> str:
    """Create and return a fresh ``session_*`` directory under UPLOAD_ROOT."""
    ensure_directories()
    parent = str(UPLOAD_ROOT)
    return tempfile.mkdtemp(prefix="session_", dir=parent)
def make_metric_badge(label: str, value: str) -> str:
    """Render one metric as a small HTML card; both strings are HTML-escaped."""
    safe_label = html.escape(label)
    safe_value = html.escape(value)
    pieces = [
        "<div style='padding:10px 12px;border-radius:14px;background:#f8fafc;",
        "border:1px solid #dbe4f0'>",
        "<div style='font-size:11px;letter-spacing:0.08em;text-transform:uppercase;",
        "opacity:0.7;margin-bottom:5px'>",
        safe_label,
        "</div>",
        "<div style='font-size:16px;font-weight:700;color:#0f172a'>",
        safe_value,
        "</div>",
        "</div>",
    ]
    return "".join(pieces)
def result_card(title: str, body: str, tone: str = "normal") -> str:
    """Render a titled HTML card.

    Args:
        title: Card heading; HTML-escaped here.
        body: Card content, inserted verbatim. Callers must pass
            already-escaped or trusted HTML.
        tone: One of "normal", "error", "warn" or "success". Any other value
            is rendered with the "normal" colours instead of raising.

    Returns:
        The card as an HTML string.
    """
    # (background, foreground, border) colours per tone.
    palette = {
        "normal": ("#ffffff", "#0f172a", "#dbe4f0"),
        "error": ("#fff4f4", "#9f1239", "#fecdd3"),
        "warn": ("#fff8eb", "#9a3412", "#fed7aa"),
        "success": ("#f0fdf4", "#166534", "#bbf7d0"),
    }
    # A presentation helper should not crash the page on an unknown tone.
    bg, fg, border = palette.get(tone, palette["normal"])
    return (
        f"<div style='background:{bg};color:{fg};border:1px solid {border};"
        "border-radius:18px;padding:18px 20px;box-shadow:0 10px 30px rgba(15,23,42,0.06)'>"
        f"<div style='font-size:12px;letter-spacing:0.08em;text-transform:uppercase;"
        f"opacity:0.65;margin-bottom:8px'>{html.escape(title)}</div>"
        f"<div style='font-size:16px;line-height:1.65'>{body}</div></div>"
    )
def initial_result_html() -> str:
    """Return the placeholder card shown before any question has been asked."""
    prompt = "Ask a question about the preloaded Phase 2 project corpus to receive a grounded answer with page and line references."
    return result_card("Ready", prompt)
def line_window_score(query: str, answer: str, chunk_text: str, snippet: str) -> float:
    """Score how well ``snippet`` serves as the citation for ``answer``.

    The score is the fraction of the snippet's content terms that also occur
    in the query, the answer or the chunk text, plus 1.0 when the non-blank,
    normalised answer appears verbatim (case-insensitively) in the snippet.
    """
    snippet_terms = content_terms(snippet)
    reference_terms = content_terms(query) | content_terms(answer) | content_terms(chunk_text)
    # max(1, ...) avoids dividing by zero for snippets without content terms.
    score = len(reference_terms & snippet_terms) / max(1, len(snippet_terms))
    answer_norm = normalize_text(answer).lower()
    snippet_norm = normalize_text(snippet).lower()
    if answer.strip() and answer_norm in snippet_norm:
        score += 1.0
    return score
def locate_citation(question: str, answer: str, chunk: Optional[Chunk], session: SessionData) -> Optional[Citation]:
    """Find the best-supporting line window on the page a chunk came from.

    Every window of one to four consecutive lines on the chunk's page is
    scored with line_window_score; the earliest highest-scoring window wins.

    Returns:
        A Citation with 1-based inclusive line numbers, or None when no chunk
        is given or the chunk's page has no extracted lines.
    """
    if chunk is None:
        return None
    page_record = next(
        (
            record
            for record in session.page_records
            if record.page_num == chunk.page_num and record.source_file == chunk.source_file
        ),
        None,
    )
    if page_record is None or not page_record.lines:
        return None
    lines = page_record.lines
    best_score = -1.0
    best_window = (1, 1, lines[0])
    for start_index in range(len(lines)):
        for end_index in range(start_index, min(len(lines), start_index + 4)):
            excerpt = " ".join(lines[start_index:end_index + 1]).strip()
            if not excerpt:
                continue
            score = line_window_score(question, answer, chunk.text, excerpt)
            # Strict comparison keeps the earliest window on ties.
            if score > best_score:
                best_score = score
                best_window = (start_index + 1, end_index + 1, excerpt)
    return Citation(
        # Report the file the page was matched on. With several documents in
        # one session, session.file_name is not necessarily the chunk's file.
        source_file=chunk.source_file,
        page_num=chunk.page_num,
        line_start=best_window[0],
        line_end=best_window[1],
        excerpt=best_window[2],
    )
def locate_citation_by_file_page(question: str, answer: str, session: SessionData, source_file: str, page_num: int) -> Optional[Citation]:
    """Find the best-supporting line window on a known file and page.

    Every window of one to four consecutive lines on the page is scored with
    line_window_score against the question and answer; the earliest
    highest-scoring window wins.

    Returns:
        A Citation with 1-based inclusive line numbers, or None when the page
        is unknown or has no extracted lines.
    """
    page_record = next(
        (
            record
            for record in session.page_records
            if record.source_file == source_file and record.page_num == page_num
        ),
        None,
    )
    if page_record is None or not page_record.lines:
        return None
    lines = page_record.lines
    best_score = -1.0
    best_window = (1, 1, lines[0])
    for start_index in range(len(lines)):
        for end_index in range(start_index, min(len(lines), start_index + 4)):
            excerpt = " ".join(lines[start_index:end_index + 1]).strip()
            if not excerpt:
                continue
            # There is no chunk here, so no chunk text is supplied. Passing
            # the excerpt itself as chunk text would make every window's
            # term overlap equal to 1.0 and leave question and answer terms
            # without any influence on the choice.
            score = line_window_score(question, answer, "", excerpt)
            # Strict comparison keeps the earliest window on ties.
            if score > best_score:
                best_score = score
                best_window = (start_index + 1, end_index + 1, excerpt)
    return Citation(
        source_file=source_file,
        page_num=page_num,
        line_start=best_window[0],
        line_end=best_window[1],
        excerpt=best_window[2],
    )
def best_evidence_sentences(
    session: SessionData,
    question: str,
    plan: QuestionPlan,
    source_filters: Optional[List[str]] = None,
) -> Tuple[List[Chunk], List[Tuple[Chunk, str]]]:
    """Retrieve candidate chunks and pick the most relevant sentences from them.

    Retrieval uses the plan's expanded query and asks for three times
    K_PASSAGES chunks (at least 12), capped at the number of chunks in the
    session. When ``source_filters`` is given and at least one retrieved
    chunk comes from those files, only those chunks are kept.

    Returns:
        ``(chunks, evidence_pairs)`` where each pair is ``(chunk, sentence)``.
    """
    fetch_count = min(max(K_PASSAGES * 3, 12), len(session.chunks))
    candidates = session.retriever.retrieve(plan.expanded_query, k=fetch_count)
    if source_filters:
        narrowed = [chunk for chunk in candidates if chunk.source_file in source_filters]
        # Fall back to the unfiltered hits rather than returning nothing.
        candidates = narrowed or candidates
    # Procedural and descriptive answers get one extra sentence of evidence.
    sentence_budget = 4 if plan.mode in {"procedural", "descriptive"} else 3
    evidence = session.agent.pipeline.select_evidence(
        plan.expanded_query,
        candidates,
        max_sentences=sentence_budget,
    )
    pairs = [(item.chunk, item.sentence) for item in evidence]
    return candidates, pairs
def extract_project_name(sentence: str) -> Optional[str]:
    """Pull the text following the word "Project" out of ``sentence``.

    The captured text must start with a capital letter. It is cut at the
    first following header label (Status, System, Version, Document, Approved
    or Author) and stripped of surrounding spaces and ``:-,`` characters.

    Returns:
        The candidate name, or None when nothing usable is found.
    """
    found = re.search(r"\bProject\s*[:\-]?\s*([A-Z][A-Za-z0-9 .-]{2,80})", sentence)
    if found is None:
        return None
    head = re.split(r"\b(?:Status|System|Version|Document|Approved|Author)\b", found.group(1))[0]
    name = head.strip(" :-,")
    return name if name else None
def concise_factoid_answer(question: str, plan: QuestionPlan, evidence_pairs: List[Tuple[Chunk, str]]) -> Optional[str]:
    """Extract a short literal answer from the evidence sentences, if possible.

    Sentences are scanned in order; the first one containing a value of the
    plan's expected type supplies the answer. For "text" questions the top
    sentence is returned only when it is short.

    Returns:
        The normalised answer string, or None when nothing suitable is found.
    """
    if not evidence_pairs:
        return None
    for _, sentence in evidence_pairs:
        # Duration questions are answered first, whatever the expected type.
        if any(token in question.lower() for token in ["interval", "duration", "period"]) and re.search(r"\b\d+\s+(?:months?|days?|years?)\b", sentence, re.I):
            match = re.search(r"\b(\d+\s+(?:months?|days?|years?))\b", sentence, re.I)
            if match:
                return normalize_text(match.group(1))
        if plan.expected_type == "currency":
            # Prefer a labelled amount; "INR" is assumed when no marker is captured.
            match = AMOUNT_RE.search(sentence)
            if match:
                prefix = (match.group(1) or "INR").replace(".", "")
                return normalize_text(f"{prefix} {match.group(2)}")
            match = CURRENCY_RE.search(sentence)
            if match:
                return normalize_text(match.group(0))
        if plan.expected_type == "date":
            match = DATE_RE.search(sentence)
            if match:
                return normalize_text(match.group(0))
        if plan.expected_type == "version":
            match = VERSION_RE.search(sentence)
            if match:
                return normalize_text(match.group(0))
        if plan.expected_type == "number":
            # The first number in the sentence is taken, whatever it refers to.
            matches = NUMBER_RE.findall(sentence)
            if matches:
                return normalize_text(matches[0])
        if plan.expected_type == "person":
            match = PERSON_RE.search(sentence)
            if match:
                return normalize_text(match.group(0))
        if plan.expected_type in {"project_name", "name"}:
            candidate = extract_project_name(sentence)
            if candidate:
                return normalize_text(candidate)
    # No typed value found: for plain-text questions fall back to the top
    # sentence, but only when its clipped form is at most 18 words long.
    top_sentence = evidence_pairs[0][1]
    if plan.expected_type == "text":
        short = clip_text(top_sentence, max_chars=180)
        if len(short.split()) <= 18:
            return short
    return None
def metadata_lookup_answer(session: SessionData, question: str, source_filters: List[str]) -> Optional[Tuple[str, str, int]]:
    """Answer document-metadata questions from the session's structured data.

    Reads ``session.structured["headers"]`` (keyed by file name) and
    ``session.structured["vmp_table"]`` (keyed by lower-cased pretty document
    name).

    Returns:
        ``(answer, source_file, page_num)`` for the first matching field, a
        clarification request with an empty source and page 0 when an
        author/approver question names no document, or None when nothing
        matches.
    """
    q = question.lower()
    headers = session.structured.get("headers", {})
    vmp_table = session.structured.get("vmp_table", {})
    # A metadata question without a named document cannot be resolved. Only
    # author/approver questions get an explicit prompt; the other cases fall
    # through to the (empty) loop below and return None.
    if ("author" in q or "approver" in q or "approve" in q or "document id" in q or "version" in q or "approved date" in q) and not source_filters:
        if "author" in q or "approver" in q or "approve" in q:
            return (
                "Please specify which document you mean, for example Validation Master Plan, Configuration Guide, or Traceability Matrix.",
                "",
                0,
            )
    for file_name in source_filters:
        pretty = pretty_doc_name(file_name).lower()
        row = vmp_table.get(pretty)
        header = headers.get(file_name, {})
        # Header values are cited to page 1 of the document itself.
        if "qa approver" in q and header.get("qa_approver"):
            return header["qa_approver"], file_name, 1
        if "author" in q and header.get("author"):
            return header["author"], file_name, 1
        # Values from the VMP table are cited to the Validation Master Plan,
        # on the row's page (page 2 when the row has no page_num).
        if ("approve" in q or "approver" in q) and row and row.get("approver"):
            return row["approver"], "02_Validation_Master_Plan.pdf", int(row.get("page_num", "2"))
        if "document id" in q and row and row.get("document_id"):
            return row["document_id"], "02_Validation_Master_Plan.pdf", int(row.get("page_num", "2"))
        if "phase" in q and row and row.get("phase"):
            return row["phase"], "02_Validation_Master_Plan.pdf", int(row.get("page_num", "2"))
        if ("qa approver" in q or "approved date" in q or "version" in q or "system" in q or "status" in q or "project" in q) and header:
            if "approved date" in q and header.get("approved_date"):
                return header["approved_date"], file_name, 1
            if "version" in q and header.get("version"):
                return header["version"], file_name, 1
            if "system" in q and header.get("system"):
                return header["system"], file_name, 1
            if "status" in q and header.get("status"):
                return header["status"], file_name, 1
            if "project" in q and header.get("project"):
                return header["project"], file_name, 1
    return None
def summarize_procedural_answer(evidence_pairs: List[Tuple[Chunk, str]]) -> Optional[str]:
    """Join the leading distinct evidence sentences into a short summary.

    Each sentence is clipped to 220 characters; duplicates (compared
    case-insensitively after clipping) are skipped, and collection stops once
    MAX_SUMMARY_SENTENCES sentences have been gathered.

    Returns:
        The summary prefixed with "Based on the document: ", or None when
        there is no evidence.
    """
    if not evidence_pairs:
        return None
    picked: List[str] = []
    seen_keys = set()
    for _, sentence in evidence_pairs:
        candidate = clip_text(sentence, max_chars=220)
        lowered = candidate.lower()
        if lowered not in seen_keys:
            picked.append(candidate)
            seen_keys.add(lowered)
            if len(picked) >= MAX_SUMMARY_SENTENCES:
                break
    if not picked:
        return None
    return "Based on the document: " + " ".join(picked)
def answer_relevance_proxy(question: str, answer: str, citation: Optional[Citation]) -> Optional[float]:
    """Estimate answer relevance as question-term coverage.

    Returns the share of the question's content terms that occur in the
    answer or in the citation excerpt, rounded to four decimals. Returns None
    for an empty answer or an abstention, and 1.0 when the question has no
    content terms.
    """
    if not answer:
        return None
    if answer.lower().startswith("i don't have enough evidence"):
        return None
    wanted = content_terms(question)
    supporting = content_terms(answer)
    if citation:
        supporting = supporting | content_terms(citation.excerpt)
    if not wanted:
        return 1.0
    covered = sum(1 for term in wanted if term in supporting)
    return round(covered / len(wanted), 4)
def context_precision_proxy(question: str, answer: str, retrieved_chunks: List[Chunk]) -> Optional[float]:
    """Estimate context precision as the share of on-topic retrieved chunks.

    A chunk counts as relevant when it shares at least one content term with
    the question or the answer. The result is rounded to four decimals;
    None is returned when nothing was retrieved.
    """
    if not retrieved_chunks:
        return None
    topic_terms = content_terms(question) | content_terms(answer)
    relevant = sum(
        1
        for chunk in retrieved_chunks
        if not topic_terms.isdisjoint(content_terms(chunk.text))
    )
    return round(relevant / len(retrieved_chunks), 4)
def faithfulness_proxy(answer: str, citation: Optional[Citation]) -> Optional[float]:
    """Estimate how much of the answer is supported by the cited excerpt.

    Returns:
        None when there is no answer or no citation; 1.0 when the normalised
        answer appears verbatim (case-insensitively) in the excerpt; 0.0 when
        the answer has no content terms and is not a verbatim match;
        otherwise the share of the answer's content terms found in the
        excerpt, rounded to four decimals.
    """
    if not answer or not citation:
        return None
    answer_norm = normalize_text(answer).lower()
    excerpt_norm = normalize_text(citation.excerpt).lower()
    if answer_norm and answer_norm in excerpt_norm:
        return 1.0
    answer_terms = content_terms(answer)
    if not answer_terms:
        # The verbatim match was ruled out above, so nothing here supports
        # the answer.
        return 0.0
    excerpt_terms = content_terms(citation.excerpt)
    return round(len(answer_terms & excerpt_terms) / len(answer_terms), 4)
def citation_html(citation: Optional[Citation]) -> str:
    """Render a citation as HTML: file name, page/line label and excerpt.

    File name and excerpt are HTML-escaped. A fixed plain-text message is
    returned when there is no citation.
    """
    if citation is None:
        return "No supporting citation was selected."
    first, last = citation.line_start, citation.line_end
    if first == last:
        line_label = f"line {first}"
    else:
        line_label = f"lines {first}-{last}"
    pieces = [
        f"<div style='font-weight:700;font-size:18px;margin-bottom:8px'>{html.escape(citation.source_file)}</div>",
        "<div style='font-size:13px;opacity:0.75;margin-bottom:10px'>",
        f"Page {citation.page_num}, {line_label} (extracted text)</div>",
        f"<div>{html.escape(citation.excerpt)}</div>",
    ]
    return "".join(pieces)
def render_result(
    question: str,
    answer: str,
    citation: Optional[Citation],
    metrics: Dict[str, Optional[float]],
    abstained: bool,
) -> str:
    """Render the full result panel: question, answer, source and metrics.

    Args:
        question: The user's question; HTML-escaped here.
        answer: The final answer text; HTML-escaped here.
        citation: Supporting citation, or None.
        metrics: Must contain ``latency_seconds``. ``answer_relevance``,
            ``context_precision`` and ``faithfulness_proxy`` are shown as
            "N/A" when missing or None. ``hallucination_rate`` defaults to
            0.0 when missing.
        abstained: When True, the answer and source cards use the warning tone.

    Returns:
        The panel as one HTML string.
    """
    answer_card = result_card(
        "Final Answer",
        html.escape(answer),
        tone="warn" if abstained else "normal",
    )
    source_card = result_card(
        "Source Reference",
        citation_html(citation),
        tone="warn" if abstained else "success",
    )
    # One badge per metric, in display order.
    metric_boxes = "".join(
        [
            make_metric_badge("Latency", f"{metrics['latency_seconds']:.2f}s"),
            make_metric_badge(
                "Answer Relevance",
                "N/A" if metrics.get("answer_relevance") is None else f"{metrics['answer_relevance']:.2f}",
            ),
            make_metric_badge(
                "Context Precision",
                "N/A" if metrics.get("context_precision") is None else f"{metrics['context_precision']:.2f}",
            ),
            make_metric_badge(
                "Faithfulness",
                "N/A" if metrics.get("faithfulness_proxy") is None else f"{metrics['faithfulness_proxy']:.2f}",
            ),
            # NOTE(review): an explicit None for hallucination_rate would fail
            # the float format here — confirm callers never pass None.
            make_metric_badge("Hallucination", f"{metrics.get('hallucination_rate', 0.0):.2f}"),
        ]
    )
    question_html = (
        "<div style='margin-bottom:14px;padding:14px 16px;border-radius:16px;"
        "background:linear-gradient(135deg,#eef4ff 0%,#f8fafc 100%);"
        "border:1px solid #d9e6ff;color:#0f172a'>"
        "<div style='font-size:12px;letter-spacing:0.08em;text-transform:uppercase;"
        "opacity:0.65;margin-bottom:6px'>Question</div>"
        f"<div style='font-size:18px;font-weight:600;line-height:1.5'>{html.escape(question)}</div>"
        "</div>"
    )
    # Layout: question banner, then answer and source cards stacked in one
    # column, then the responsive grid of metric badges.
    return (
        "<div style='font-family:ui-sans-serif,system-ui,sans-serif'>"
        f"{question_html}"
        "<div style='display:grid;grid-template-columns:1fr;gap:14px'>"
        f"{answer_card}{source_card}"
        f"<div style='display:grid;grid-template-columns:repeat(auto-fit,minmax(140px,1fr));gap:10px'>{metric_boxes}</div>"
        "</div></div>"
    )
def render_document_status(session: SessionData) -> str:
    """Render a success card summarising what the session has indexed.

    Shows the session's file name, the number of distinct source files among
    its page records, the page and chunk counts and the extractor used.
    """
    distinct_files = {record.source_file for record in session.page_records}
    doc_count = len(distinct_files)
    rows = [
        f"<strong>{html.escape(session.file_name)}</strong>",
        f"Documents indexed: {doc_count}",
        f"Pages indexed: {session.page_count}",
        f"Chunks indexed: {len(session.chunks)}",
        f"Extractor used: {html.escape(session.extractor)}",
        "Knowledge base mode: preloaded project corpus",
    ]
    return result_card("Corpus Loaded", "<br>".join(rows), tone="success")
def error_html(message: str) -> str:
    """Return an "Action Required" card (error tone) with *message* HTML-escaped."""
    safe_message = html.escape(message)
    return result_card("Action Required", safe_message, tone="error")
def info_html(message: str) -> str:
    """Return a "Notice" card (warn tone) with *message* HTML-escaped."""
    safe_message = html.escape(message)
    return result_card("Notice", safe_message, tone="warn")
def build_session(file_path: str) -> SessionData:
    """Validate, stage, and index one PDF into a brand-new session.

    The file is copied into a fresh temporary session directory, its pages
    are extracted and chunked, and a retriever plus agent are built over the
    resulting chunks.

    Args:
        file_path: Location of the PDF to index.

    Returns:
        A ``SessionData`` with a random UUID session id and an empty
        ``structured`` mapping.

    Raises:
        ValueError: If ``validate_pdf`` rejects the file, no page records or
            chunks are produced, or the ``MAX_PAGES`` / ``MAX_CHUNKS`` limits
            are exceeded.
    """
    ok, problem = validate_pdf(file_path)
    if not ok:
        raise ValueError(problem)

    workdir = make_temp_session_dir()
    try:
        base_name = os.path.basename(file_path)
        staged_pdf = os.path.join(workdir, base_name)
        shutil.copy2(file_path, staged_pdf)

        pages, extractor_used = extract_page_records(staged_pdf, base_name)
        if not pages:
            raise ValueError(
                "No extractable text was found in the PDF. Please upload a text-based PDF."
            )
        page_total = len(pages)
        if page_total > MAX_PAGES:
            raise ValueError(
                f"PDF has {page_total} pages, which exceeds the {MAX_PAGES}-page limit."
            )

        pieces = chunk_page_records(pages, base_name)
        if not pieces:
            raise ValueError(
                "The extracted PDF text did not produce enough content to index."
            )
        piece_total = len(pieces)
        if piece_total > MAX_CHUNKS:
            raise ValueError(
                f"PDF produced {piece_total} chunks, which exceeds the {MAX_CHUNKS}-chunk limit."
            )

        digest = sha256_file(staged_pdf)
        size_on_disk = os.path.getsize(staged_pdf)
        session_retriever = SessionRetriever(pieces, get_embedder())
        session_agent = build_session_agent(session_retriever)
        return SessionData(
            session_id=str(uuid.uuid4()),
            temp_dir=workdir,
            pdf_path=staged_pdf,
            file_name=base_name,
            file_hash=digest,
            file_size_bytes=size_on_disk,
            page_records=pages,
            chunks=pieces,
            retriever=session_retriever,
            agent=session_agent,
            page_count=page_total,
            extractor=extractor_used,
            structured={},
        )
    except Exception:
        # Any failure after the temp directory exists must not leave it
        # behind; clean up and let the caller see the original error.
        shutil.rmtree(workdir, ignore_errors=True)
        raise
def parse_header_metadata(page_records: List[PageRecord]) -> Dict[str, str]:
    """Extract key/value header fields and a title from the first page.

    Lines of the first page record are scanned until the first line that
    starts with digits followed by a dot. A line ending in ``:`` that has a
    following line is treated as a key (lower-cased, spaces replaced by
    underscores) whose value is that following line; every other scanned
    line is collected as a title line.

    Args:
        page_records: Page records of one document; only the first is read.

    Returns:
        An empty dict when there are no records. Otherwise the parsed
        key/value pairs plus ``"header_text"`` (the first 20 lines joined by
        spaces) and ``"title"`` (the first three title lines joined by
        ``" | "``).
    """
    if not page_records:
        return {}

    first_page_lines = page_records[0].lines
    total = len(first_page_lines)
    fields: Dict[str, str] = {}
    title_parts: List[str] = []

    pos = 0
    while pos < total:
        current = first_page_lines[pos]
        if re.match(r"^\d+\.", current):
            # A numbered heading marks the end of the header area.
            break
        has_following_line = pos + 1 < total
        if has_following_line and current.endswith(":"):
            field_name = current[:-1].strip().lower().replace(" ", "_")
            fields[field_name] = first_page_lines[pos + 1].strip()
            pos += 2
        else:
            title_parts.append(current)
            pos += 1

    fields["header_text"] = " ".join(first_page_lines[:20])
    fields["title"] = " | ".join(title_parts[:3])
    return fields
def parse_vmp_table(page_records: List[PageRecord]) -> Dict[str, Dict[str, str]]:
    """Parse the document table from flattened page lines.

    The table is expected as one cell per line: a ``"Document"`` header line
    followed by four more header cells, then rows of five lines each
    (document name, id, phase, author, approver). Parsing stops at the line
    starting with ``"4. Roles and Responsibilities"``.

    Args:
        page_records: Page records of the document, in reading order.

    Returns:
        Mapping from lower-cased document name to a row dict with the keys
        ``document``, ``document_id``, ``phase``, ``author``, ``approver``
        and ``page_num`` (as a string). When nothing is parsed, a
        ``"vmp_table_parse_empty"`` event is logged and the mapping is empty.
    """
    recognised_titles = {
        "Project Charter",
        "Validation Master Plan",
        "User Requirements Specification",
        "Functional Requirements Specification",
        "Risk Assessment",
        "HP ALM Configuration Guide",
        "IQ Protocol",
        "IQ Execution Report",
        "OQ Protocol",
        "OQ Execution Report",
        "PQ/UAT Protocol and Report",
        "Data Migration Plan",
        "Data Migration Summary Report",
        "Validation Summary Report",
        "Traceability Matrix",
        "Change Control SOP",
    }

    # Flatten every page into (page number, line text) pairs so a table row
    # can span a page boundary.
    flat: List[Tuple[int, str]] = [
        (record.page_num, text) for record in page_records for text in record.lines
    ]
    total = len(flat)

    table: Dict[str, Dict[str, str]] = {}
    in_table = False
    cursor = 0
    while cursor < total:
        page_num, text = flat[cursor]
        if text == "Document":
            # Header row: skip "Document" and the four header cells after it.
            in_table = True
            cursor += 5
        elif not in_table:
            cursor += 1
        elif text.startswith("4. Roles and Responsibilities"):
            break
        elif text in recognised_titles and cursor + 4 < total:
            doc_id, phase, author, approver = (
                cell for _, cell in flat[cursor + 1 : cursor + 5]
            )
            table[text.lower()] = {
                "document": text,
                "document_id": doc_id,
                "phase": phase,
                "author": author,
                "approver": approver,
                "page_num": str(page_num),
            }
            cursor += 5
        else:
            cursor += 1

    if not table:
        log_event(
            "vmp_table_parse_empty",
            source_file="02_Validation_Master_Plan.pdf",
            page_count=len(page_records),
        )
    return table
def corpus_pdf_files(candidate: Path) -> List[Path]:
    """Return the numbered corpus PDFs in *candidate*, or ``[]`` if it is not a corpus.

    Only files directly inside the directory whose names look like
    ``NN_<anything>.pdf`` are considered. The directory counts as a corpus
    when it contains the three required anchor documents or at least ten
    numbered PDFs.

    Args:
        candidate: Directory to inspect.

    Returns:
        The numbered PDF paths in sorted order, or an empty list when the
        directory is missing, is not a directory, or does not qualify.
    """
    # is_dir() is already False for a path that does not exist.
    if not candidate.is_dir():
        return []

    numbered_name = re.compile(r"^\d{2}_.+\.pdf$")
    numbered = [
        path
        for path in sorted(candidate.glob("*.pdf"))
        if path.is_file() and numbered_name.match(path.name)
    ]

    anchor_names = {
        "01_Project_Charter.pdf",
        "02_Validation_Master_Plan.pdf",
        "15_Regulatory_Reference_Guide.pdf",
    }
    present = {path.name for path in numbered}
    if anchor_names <= present or len(numbered) >= 10:
        return numbered
    return []
def resolve_corpus_dir() -> Path:
    """Return the first entry of ``CORPUS_CANDIDATES`` that holds a usable corpus.

    Returns:
        The first candidate directory for which ``corpus_pdf_files`` yields
        a non-empty list.

    Raises:
        FileNotFoundError: If no candidate directory qualifies.
    """
    chosen = next(
        (candidate for candidate in CORPUS_CANDIDATES if corpus_pdf_files(candidate)),
        None,
    )
    if chosen is None:
        raise FileNotFoundError(
            "Phase 2 corpus not found. Upload the 15 PDF files either into a phase2_corpus folder in the app repo or at the repo root."
        )
    return chosen
def build_corpus_session() -> SessionData:
    """Index every corpus PDF into one shared session.

    Each PDF is extracted and chunked; its header metadata is stored under
    ``structured["headers"]`` and the table parsed from
    ``02_Validation_Master_Plan.pdf`` under ``structured["vmp_table"]``.
    The session hash is a SHA-256 over each file's name, size, and integer
    modification time, not over the file contents.

    Returns:
        A ``SessionData`` with the fixed id ``"phase2-corpus"``, an empty
        ``temp_dir``, and ``file_size_bytes`` of 0.

    Raises:
        FileNotFoundError: Propagated from ``resolve_corpus_dir`` when no
            corpus directory qualifies.
    """
    corpus_dir = resolve_corpus_dir()

    all_pages: List[PageRecord] = []
    all_chunks: List[Chunk] = []
    headers: Dict[str, dict] = {}
    structured: Dict[str, dict] = {
        "headers": headers,
        "vmp_table": {},
        "corpus_dir": str(corpus_dir),
    }
    extractor_names = set()
    fingerprints: List[str] = []

    for pdf_path in corpus_pdf_files(corpus_dir):
        name = pdf_path.name
        pages, extractor_used = extract_page_records(str(pdf_path), name)
        extractor_names.add(extractor_used)
        all_pages += pages
        all_chunks += chunk_page_records(pages, name)
        headers[name] = parse_header_metadata(pages)

        file_stat = pdf_path.stat()
        fingerprints.append(f"{name}:{file_stat.st_size}:{int(file_stat.st_mtime)}")

        if name == "02_Validation_Master_Plan.pdf":
            structured["vmp_table"] = parse_vmp_table(pages)

    corpus_retriever = SessionRetriever(all_chunks, get_embedder())
    corpus_agent = build_session_agent(corpus_retriever)
    fingerprint_blob = "|".join(fingerprints).encode("utf-8")
    # A page is identified by (file, page number) because page numbers
    # restart in every document.
    distinct_pages = {(record.source_file, record.page_num) for record in all_pages}

    return SessionData(
        session_id="phase2-corpus",
        temp_dir="",
        pdf_path=str(corpus_dir),
        file_name="Phase 2 corpus",
        file_hash=hashlib.sha256(fingerprint_blob).hexdigest(),
        file_size_bytes=0,
        page_records=all_pages,
        chunks=all_chunks,
        retriever=corpus_retriever,
        agent=corpus_agent,
        page_count=len(distinct_pages),
        extractor=" / ".join(sorted(extractor_names)),
        structured=structured,
    )
def handle_upload(file_obj, current_session_id: Optional[str]):
    """Replace the caller's session with one built from an uploaded PDF.

    Args:
        file_obj: Either a path string or an object exposing the path as
            ``.name``; anything else is treated as "no file".
        current_session_id: Session to discard before processing the upload.

    Returns:
        A 5-tuple ``(session_id, status_html, result_html, None, "")``.
        ``session_id`` is ``None`` when no file was given or the upload was
        rejected.
    """

    def reply(new_session_id, status_html):
        # The last three slots always reset the result area to its initial state.
        return new_session_id, status_html, initial_result_html(), None, ""

    cleanup_expired_sessions()
    if current_session_id:
        with SESSIONS_LOCK:
            remove_session(current_session_id)

    if isinstance(file_obj, str):
        upload_path = file_obj
    else:
        upload_path = getattr(file_obj, "name", None)
    if not upload_path:
        return reply(None, info_html("Upload one PDF file to start a testing session."))

    try:
        new_session = build_session(upload_path)
        with SESSIONS_LOCK:
            SESSIONS[new_session.session_id] = new_session
        log_event(
            "upload_success",
            session_id=new_session.session_id,
            file_hash=new_session.file_hash,
            file_name=new_session.file_name,
            page_count=new_session.page_count,
            chunk_count=len(new_session.chunks),
            file_size_bytes=new_session.file_size_bytes,
            extractor=new_session.extractor,
        )
        return reply(new_session.session_id, render_document_status(new_session))
    except Exception as exc:
        reason = str(exc)
        log_event("upload_rejected", reason=reason, file_name=os.path.basename(upload_path))
        return reply(None, error_html(reason))
def get_session(session_id: Optional[str]) -> Optional[SessionData]:
    """Look up a session by id while holding ``SESSIONS_LOCK``.

    Returns:
        The stored session, or ``None`` when the id is empty or unknown.
    """
    if session_id:
        with SESSIONS_LOCK:
            return SESSIONS.get(session_id)
    return None
def check_rate_limit(session: SessionData) -> Optional[str]:
    """Apply the per-session 60-second sliding-window question limit.

    Timestamps older than 60 seconds are dropped from
    ``session.question_timestamps``. If the remaining count is below
    ``MAX_QUESTIONS_PER_MINUTE`` the current time is recorded.

    Returns:
        ``None`` when the question is allowed, otherwise the message to show
        the user. A rejected question is not recorded.
    """
    current = now_ts()
    cutoff = current - 60
    recent = [stamp for stamp in session.question_timestamps if stamp >= cutoff]
    session.question_timestamps = recent

    if len(recent) < MAX_QUESTIONS_PER_MINUTE:
        recent.append(current)
        return None
    return (
        f"Rate limit reached. Please wait before asking more than "
        f"{MAX_QUESTIONS_PER_MINUTE} questions per minute in one session."
    )
def build_question_metrics(
    question: str,
    answer: str,
    citation: Optional[Citation],
    retrieved_chunks: List[Chunk],
    hallucination_rate: float,
    latency_seconds: float,
) -> Dict[str, Optional[float]]:
    """Assemble the metrics mapping shown in the UI and written to the log.

    ``latency_seconds`` and ``hallucination_rate`` are passed through
    unchanged; the other three values come from the proxy scoring helpers.

    Returns:
        Dict with the keys ``latency_seconds``, ``answer_relevance``,
        ``context_precision``, ``faithfulness_proxy`` and
        ``hallucination_rate``.
    """
    relevance = answer_relevance_proxy(question, answer, citation)
    precision = context_precision_proxy(question, answer, retrieved_chunks)
    faithfulness = faithfulness_proxy(answer, citation)

    metrics: Dict[str, Optional[float]] = {"latency_seconds": latency_seconds}
    metrics["answer_relevance"] = relevance
    metrics["context_precision"] = precision
    metrics["faithfulness_proxy"] = faithfulness
    metrics["hallucination_rate"] = hallucination_rate
    return metrics
_INSUFFICIENT_EVIDENCE_ANSWER = (
    "I don't have enough evidence in the project corpus to answer that reliably."
)


def _citation_fields(citation: Optional[Citation]) -> Dict[str, object]:
    """Flatten a citation into the five fields shared by the UI state and the log row."""
    if not citation:
        return {
            "source_file": None,
            "page_num": None,
            "line_start": None,
            "line_end": None,
            "excerpt": None,
        }
    return {
        "source_file": citation.source_file,
        "page_num": citation.page_num,
        "line_start": citation.line_start,
        "line_end": citation.line_end,
        "excerpt": citation.excerpt,
    }


def _run_agentic_fallback(
    session: SessionData,
    question: str,
    fallback_chunk: Optional[Chunk],
) -> Tuple[str, bool, float, Optional[Chunk], Optional[str]]:
    """Run the session agent; return (answer, abstained, hallucination_rate, chunk, error)."""
    try:
        output = session.agent.run(question)
        best_chunk = output.best_chunk or fallback_chunk
        if output.abstained:
            answer = _INSUFFICIENT_EVIDENCE_ANSWER
        else:
            answer = output.answer or "No answer produced."
        return answer, output.abstained, output.hallucination_rate or 0.0, best_chunk, None
    except Exception as exc:
        log_event("inference_failed", session_id=session.session_id, question=question, error=str(exc))
        # str(exc) is empty for exceptions raised without arguments; fall
        # back to the class name so the failure is never mistaken for success.
        return "", False, 0.0, fallback_chunk, str(exc) or type(exc).__name__


def _publish_answer(
    session: SessionData,
    question: str,
    answer_text: str,
    citation: Optional[Citation],
    metrics: Dict[str, Optional[float]],
    *,
    abstained: bool,
    route_mode: str,
    expected_type,
    latency_seconds: float,
):
    """Render the result card, build the feedback state, and append the interaction log row."""
    result_html = render_result(
        question=question,
        answer=answer_text,
        citation=citation,
        metrics=metrics,
        abstained=abstained,
    )
    cited = _citation_fields(citation)
    response_state = {
        "session_id": session.session_id,
        "question": question,
        "answer": answer_text,
        "abstained": abstained,
        **cited,
        "metrics": metrics,
        "file_hash": session.file_hash,
        "route_mode": route_mode,
        "expected_type": expected_type,
    }
    append_jsonl(
        INTERACTION_LOG_PATH,
        {
            "timestamp": now_ts(),
            "session_id": session.session_id,
            "file_hash": session.file_hash,
            "file_name": session.file_name,
            "page_count": session.page_count,
            "question": question,
            "answer": answer_text,
            "abstained": abstained,
            **cited,
            "latency_seconds": latency_seconds,
            "route_mode": route_mode,
            "expected_type": expected_type,
            "answer_relevance": metrics["answer_relevance"],
            "context_precision": metrics["context_precision"],
            "faithfulness_proxy": metrics["faithfulness_proxy"],
            "hallucination_rate": metrics["hallucination_rate"],
        },
    )
    return result_html, response_state, ""


def ask_question(question: str, session_id: Optional[str]):
    """Answer one question against the session's corpus and log the interaction.

    Routing, in order:

    1. A structured metadata lookup; if it hits, its answer is returned with
       route mode ``"structured"``.
    2. Evidence-sentence retrieval. With no evidence the answer abstains.
       For ``procedural``/``descriptive`` plans a summary of the evidence is
       used; otherwise a concise factoid answer is used, and factoid
       evidence of the wrong type abstains without trying the agent.
    3. The agentic fallback, only when the plan allows it and step 2 found
       evidence but produced no answer.

    Args:
        question: Raw question text from the UI; ``None`` and blank input
            are handled.
        session_id: Id of the session to query.

    Returns:
        A 3-tuple ``(result_html, response_state, "")``. ``response_state``
        is ``None`` for blank input, a missing session, a rate-limit hit, or
        an inference failure; the empty string clears the feedback box.
    """
    cleanup_expired_sessions()

    question = (question or "").strip()
    if not question:
        return info_html("Enter a question to query the preloaded project corpus."), None, ""

    session = get_session(session_id)
    if session is None:
        return error_html("The preloaded corpus is not available right now. Please reload the app."), None, ""

    session.last_activity = now_ts()
    rate_limit_message = check_rate_limit(session)
    if rate_limit_message:
        log_event("rate_limited", session_id=session.session_id, question=question)
        return error_html(rate_limit_message), None, ""

    plan = question_plan(question)
    source_filters = matched_source_files(question)

    # --- Route 1: structured metadata lookup -------------------------------
    metadata_hit = metadata_lookup_answer(session, question, source_filters)
    if metadata_hit:
        answer_text, source_file, source_page = metadata_hit
        citation = (
            locate_citation_by_file_page(question, answer_text, session, source_file, source_page)
            if source_file
            else None
        )
        # NOTE(review): structured answers report a fixed 0.0 latency and
        # hallucination rate rather than measured values.
        metrics = build_question_metrics(question, answer_text, citation, [], 0.0, 0.0)
        return _publish_answer(
            session,
            question,
            answer_text,
            citation,
            metrics,
            abstained=False,
            route_mode="structured",
            expected_type=plan.expected_type,
            latency_seconds=0.0,
        )

    # --- Route 2: evidence retrieval, optionally followed by the agent -----
    retrieved_chunks, evidence_pairs = best_evidence_sentences(
        session, question, plan, source_filters=source_filters
    )
    evidence_sentences = [sentence for _, sentence in evidence_pairs]
    # The timer starts after retrieval, so the reported latency covers only
    # answer synthesis.
    start = time.perf_counter()
    best_chunk = evidence_pairs[0][0] if evidence_pairs else None
    abstained = False
    hallucination_rate = 0.0

    direct_answer = None
    may_use_agent = False
    if evidence_pairs:
        if plan.mode in {"procedural", "descriptive"}:
            if evidence_has_expected_type(plan, evidence_sentences):
                direct_answer = summarize_procedural_answer(evidence_pairs)
            may_use_agent = not direct_answer
        else:
            concise_answer = concise_factoid_answer(question, plan, evidence_pairs)
            # Factoid evidence of the wrong type abstains outright; the
            # agent is consulted only when the type matched but no concise
            # answer could be formed.
            if evidence_has_expected_type(plan, evidence_sentences):
                direct_answer = concise_answer
                may_use_agent = not concise_answer

    if direct_answer:
        answer_text = direct_answer
    elif may_use_agent and plan.allow_agentic_fallback:
        answer_text, abstained, hallucination_rate, best_chunk, inference_error = (
            _run_agentic_fallback(session, question, best_chunk)
        )
        if inference_error is not None:
            return error_html(f"Inference failed: {inference_error}"), None, ""
    else:
        abstained = True
        answer_text = _INSUFFICIENT_EVIDENCE_ANSWER

    latency_seconds = time.perf_counter() - start
    citation = locate_citation(question, answer_text, best_chunk, session)
    metrics = build_question_metrics(
        question,
        answer_text,
        citation,
        retrieved_chunks,
        hallucination_rate,
        latency_seconds,
    )
    return _publish_answer(
        session,
        question,
        answer_text,
        citation,
        metrics,
        abstained=abstained,
        route_mode=plan.mode,
        expected_type=plan.expected_type,
        latency_seconds=latency_seconds,
    )
def submit_feedback(response_state: Optional[dict], vote: str):
    """Log a feedback vote for the most recent answer.

    Args:
        response_state: State dict produced for the last answered question,
            or ``None``/empty when nothing has been asked yet.
        vote: Vote label; ``"helpful"`` selects the positive confirmation.

    Returns:
        The status message to show in the feedback box.
    """
    if not response_state:
        return "Ask a question first, then rate the answer."

    lookup = response_state.get
    payload = {
        "session_id": lookup("session_id"),
        "file_hash": lookup("file_hash"),
        "question": lookup("question"),
        "vote": vote,
    }
    for key in ("source_file", "page_num", "line_start", "line_end"):
        payload[key] = lookup(key)
    log_event("feedback", **payload)

    return (
        "Thanks. Your feedback was recorded as helpful."
        if vote == "helpful"
        else "Thanks. Your feedback was recorded for review."
    )
# Import-time startup: index the corpus once and register it as a session.
# ensure_directories() is defined elsewhere; presumably it creates the
# upload/log directories - TODO confirm.
ensure_directories()
try:
    PRELOADED_SESSION = build_corpus_session()
    # Register the corpus session so it can be fetched from SESSIONS by id.
    with SESSIONS_LOCK:
        SESSIONS[PRELOADED_SESSION.session_id] = PRELOADED_SESSION
    PRELOADED_STATUS_HTML = render_document_status(PRELOADED_SESSION)
    STARTUP_NOTICE = ""
except Exception as exc:
    # Keep the module importable when the corpus cannot be loaded: the UI
    # still renders and shows the failure text instead of the status card.
    PRELOADED_SESSION = None
    PRELOADED_STATUS_HTML = error_html(str(exc))
    STARTUP_NOTICE = str(exc)
# UI definition. Components appear on the page in the order they are created
# inside the Blocks context, so statement order here is the layout.
with gr.Blocks(css=CSS) as demo:
    # Per-browser-session state: the id of the preloaded corpus session (or
    # None when startup failed) and the last answer's state for feedback.
    session_state = gr.State(PRELOADED_SESSION.session_id if PRELOADED_SESSION else None)
    response_state = gr.State(None)
    gr.Markdown(
        f"""
# {APP_NAME}
{APP_TAGLINE}
**Project corpus mode**
- Preloaded Phase 2 project documents
- Optimized for this fixed validation corpus
- Best for onboarding, project lookup, and validation Q&A
- Rate limit: **{MAX_QUESTIONS_PER_MINUTE} questions per minute**
"""
    )
    gr.Markdown(
        f"""
<div style="padding:14px 16px;border-radius:14px;background:#fff8eb;border:1px solid #fed7aa;color:#9a3412">
<strong>Privacy notice</strong><br>
{html.escape(PRIVACY_NOTICE)}
</div>
"""
    )
    # NOTE(review): when startup failed, PRELOADED_STATUS_HTML below is built
    # from the same exception text, so the error card is shown twice.
    if STARTUP_NOTICE:
        gr.HTML(error_html(STARTUP_NOTICE))
    document_status = gr.HTML(PRELOADED_STATUS_HTML)
    with gr.Row():
        question_box = gr.Textbox(
            label="Your Question",
            lines=2,
            placeholder="Ask a question about the preloaded project documents...",
            scale=5,
        )
        ask_btn = gr.Button("Ask", elem_id="ask_btn", scale=1)
    result_html_component = gr.HTML(initial_result_html())
    with gr.Row():
        helpful_btn = gr.Button("Helpful")
        needs_work_btn = gr.Button("Needs Improvement")
    feedback_status = gr.Textbox(
        label="Feedback Status",
        value="",
        interactive=False,
    )
    with gr.Accordion("Testing guardrails and privacy details", open=False):
        gr.Markdown(
            f"""
- **Knowledge base**: 15 preloaded Phase 2 project PDFs
- **Citation format**: page and extracted-text line range
- **Queueing**: concurrency limit {QUEUE_CONCURRENCY}, queue size {QUEUE_MAX_SIZE}
- **Logged for evaluation**: corpus question, answer, citation, latency, and proxy metrics
"""
        )
    # Both the button click and pressing Enter in the textbox run the same
    # handler; its third output resets the feedback status text.
    ask_btn.click(
        ask_question,
        inputs=[question_box, session_state],
        outputs=[result_html_component, response_state, feedback_status],
    )
    question_box.submit(
        ask_question,
        inputs=[question_box, session_state],
        outputs=[result_html_component, response_state, feedback_status],
    )
    # The lambdas bind the vote label; the event supplies only the stored
    # response state.
    helpful_btn.click(
        lambda state: submit_feedback(state, "helpful"),
        inputs=[response_state],
        outputs=[feedback_status],
    )
    needs_work_btn.click(
        lambda state: submit_feedback(state, "needs_improvement"),
        inputs=[response_state],
        outputs=[feedback_status],
    )
# Script entry point: launch the app with a bounded request queue.
if __name__ == "__main__":
    # ensure_directories() was already called at import time above; it is
    # called again here before launching.
    ensure_directories()
    cleanup_expired_sessions()
    # Queue limits come from the QUEUE_CONCURRENCY / QUEUE_MAX_SIZE settings.
    demo.queue(default_concurrency_limit=QUEUE_CONCURRENCY, max_size=QUEUE_MAX_SIZE).launch()