import hashlib import html import json import os import re import shutil import tempfile import threading import time import uuid from dataclasses import dataclass, field from pathlib import Path from typing import Dict, List, Optional, Tuple import faiss import gradio as gr from sentence_transformers import SentenceTransformer from extracted_phase2_core import AgenticSelfRAG, Chunk, K_PASSAGES APP_NAME = "SourceTruth" APP_TAGLINE = "Ask grounded questions over the preloaded Phase 2 project corpus and inspect cited evidence." APP_ROOT = Path(__file__).resolve().parent UPLOAD_ROOT = APP_ROOT / "testing_uploads" LOG_ROOT = APP_ROOT / "testing_logs" EVENT_LOG_PATH = LOG_ROOT / "events.jsonl" INTERACTION_LOG_PATH = LOG_ROOT / "interactions.jsonl" CORPUS_CANDIDATES = [ APP_ROOT / "phase2_corpus", APP_ROOT / "phase 2 corpus", APP_ROOT, ] LOCAL_CORPUS_DIR = os.getenv("LOCAL_CORPUS_DIR", "").strip() if LOCAL_CORPUS_DIR: CORPUS_CANDIDATES.append(Path(LOCAL_CORPUS_DIR).expanduser()) MAX_FILE_SIZE_MB = int(os.getenv("MAX_FILE_SIZE_MB", "20")) MAX_PAGES = int(os.getenv("MAX_PAGES", "75")) MAX_CHUNKS = int(os.getenv("MAX_CHUNKS", "250")) CHUNK_WORDS = int(os.getenv("CHUNK_WORDS", "300")) CHUNK_OVERLAP = int(os.getenv("CHUNK_OVERLAP", "50")) SESSION_TTL_SECONDS = int(os.getenv("SESSION_TTL_SECONDS", str(30 * 60))) MAX_QUESTIONS_PER_MINUTE = int(os.getenv("MAX_QUESTIONS_PER_MINUTE", "8")) QUEUE_CONCURRENCY = int(os.getenv("QUEUE_CONCURRENCY", "2")) QUEUE_MAX_SIZE = int(os.getenv("QUEUE_MAX_SIZE", "20")) LOAD_IN_4BIT = os.getenv("LOAD_IN_4BIT", "0") == "1" MAX_SUMMARY_SENTENCES = int(os.getenv("MAX_SUMMARY_SENTENCES", "3")) PRIVACY_NOTICE = ( "The preloaded project PDFs are processed only to answer your questions and produce citations. " "Documents are not used to train models. Interaction logs may store the question, answer, citation, " "and proxy evaluation metrics for testing analysis. Avoid using the application for confidential, " "personal, medical, or legal decisions without direct document verification." ) CSS = """ .gradio-container { background: radial-gradient(circle at top left, rgba(59,130,246,0.08), transparent 28%), radial-gradient(circle at top right, rgba(16,185,129,0.08), transparent 22%), linear-gradient(180deg, #f8fbff 0%, #f4f7fb 100%); } #ask_btn { background: linear-gradient(135deg, #0f172a 0%, #1d4ed8 100%) !important; color: white !important; border: none !important; } """ PERSON_RE = re.compile(r"\b(?:Dr\.?\s+)?[A-Z][a-z]+(?:\s+[A-Z][a-z]+){1,2}\b") CURRENCY_RE = re.compile(r"(?:₹\s*[\d,]+(?:\.\d+)?|\b(?:INR|Rs\.?)\s*[\d,]+(?:\.\d+)?)", re.I) AMOUNT_RE = re.compile(r"\b(?:paid amount|amount paid|total price|price|amount|budget|cost)\b[:\s-]*(₹|INR|Rs\.?)?\s*([\d,]+(?:\.\d+)?)", re.I) VERSION_RE = re.compile(r"\b\d+(?:\.\d+){1,3}\b") DATE_RE = re.compile( r"\b\d{1,2}\s+(?:January|February|March|April|May|June|July|August|" r"September|October|November|December)\s+\d{4}\b", flags=re.IGNORECASE, ) NUMBER_RE = re.compile(r"\b\d[\d,]*(?:\.\d+)?\b") ROLE_NOUNS = {"guide", "supervisor", "advisor", "mentor", "approver", "director", "lead", "manager"} DOC_NAME_HINTS = { "project charter": "01_Project_Charter.pdf", "validation master plan": "02_Validation_Master_Plan.pdf", "vmp": "02_Validation_Master_Plan.pdf", "user requirements specification": "03_User_Requirements_Specification.pdf", "urs": "03_User_Requirements_Specification.pdf", "functional requirements specification": "04_Functional_Requirements_Specification.pdf", "frs": "04_Functional_Requirements_Specification.pdf", "risk assessment": "05_Risk_Assessment.pdf", "configuration guide": "06_HP_ALM_Configuration_Guide.pdf", "hp alm configuration guide": "06_HP_ALM_Configuration_Guide.pdf", "iq protocol": "07_IQ_Protocol_and_Report.pdf", "iq report": "07_IQ_Protocol_and_Report.pdf", "oq protocol": "08_OQ_Protocol_and_Report.pdf", "oq report": "08_OQ_Protocol_and_Report.pdf", "data migration plan": "09_Data_Migration_Plan.pdf", "migration plan": "09_Data_Migration_Plan.pdf", "data migration summary": "10_Data_Migration_Summary_Report.pdf", "migration summary": "10_Data_Migration_Summary_Report.pdf", "pq": "11_PQ_UAT_Protocol_and_Report.pdf", "uat": "11_PQ_UAT_Protocol_and_Report.pdf", "validation summary report": "12_Validation_Summary_Report.pdf", "vsr": "12_Validation_Summary_Report.pdf", "traceability matrix": "13_Traceability_Matrix.pdf", "rtm": "13_Traceability_Matrix.pdf", "change control sop": "14_Change_Control_SOP.pdf", "regulatory reference guide": "15_Regulatory_Reference_Guide.pdf", } @dataclass class PageRecord: source_file: str page_num: int text: str lines: List[str] @dataclass class Citation: source_file: str page_num: int line_start: int line_end: int excerpt: str @dataclass class SessionData: session_id: str temp_dir: str pdf_path: str file_name: str file_hash: str file_size_bytes: int page_records: List[PageRecord] chunks: List[Chunk] retriever: "SessionRetriever" agent: AgenticSelfRAG page_count: int extractor: str structured: Dict[str, dict] = field(default_factory=dict) created_at: float = field(default_factory=time.time) last_activity: float = field(default_factory=time.time) question_timestamps: List[float] = field(default_factory=list) @dataclass class QuestionPlan: mode: str expected_type: str expanded_query: str allow_agentic_fallback: bool = True class EmptyRetriever: def __init__(self): self.chunks: List[Chunk] = [] def retrieve(self, query: str, k: int = K_PASSAGES) -> List[Chunk]: return [] class SessionRetriever: def __init__(self, chunks: List[Chunk], encoder: SentenceTransformer): self.chunks = chunks self._encoder = encoder self.index = None self._build_index() def _build_index(self): if not self.chunks: return texts = [f"{chunk.source_file} {chunk.text}" for chunk in self.chunks] embeddings = self._encoder.encode( texts, convert_to_numpy=True, normalize_embeddings=True, show_progress_bar=False, ).astype("float32") self.index = faiss.IndexFlatIP(embeddings.shape[1]) self.index.add(embeddings) def retrieve(self, query: str, k: int = K_PASSAGES) -> List[Chunk]: if self.index is None: return [] query_embedding = self._encoder.encode( [query], convert_to_numpy=True, normalize_embeddings=True, show_progress_bar=False, ).astype("float32") _, indices = self.index.search(query_embedding, min(k, len(self.chunks))) return [self.chunks[i] for i in indices[0] if 0 <= i < len(self.chunks)] SESSIONS: Dict[str, SessionData] = {} SESSIONS_LOCK = threading.Lock() MODEL_LOCK = threading.Lock() EMBEDDER_LOCK = threading.Lock() GLOBAL_EMBEDDER: Optional[SentenceTransformer] = None GLOBAL_AGENT_TEMPLATE: Optional[AgenticSelfRAG] = None def ensure_directories(): UPLOAD_ROOT.mkdir(parents=True, exist_ok=True) LOG_ROOT.mkdir(parents=True, exist_ok=True) def now_ts() -> float: return time.time() def normalize_text(text: str) -> str: text = text.replace("\u2581", " ").replace("\xa0", " ") text = re.sub(r"\s+", " ", text) return text.strip() def pretty_doc_name(file_name: str) -> str: base = file_name.replace(".pdf", "") base = re.sub(r"^\d+_", "", base) return base.replace("_", " ") def content_terms(text: str) -> set: stop = { "a", "an", "the", "is", "are", "was", "were", "be", "been", "being", "do", "does", "did", "have", "has", "had", "how", "what", "when", "where", "why", "which", "who", "whom", "this", "that", "these", "those", "and", "or", "but", "for", "with", "into", "from", "about", "main", "use", "uses", "using", "used", "number", "version", "date", "system", "document", "pdf", "page", "line", "file", "does", "give", } tokens = re.findall(r"[A-Za-z][A-Za-z0-9_-]+", text.lower()) return {token for token in tokens if token not in stop and len(token) > 2} def clip_text(text: str, max_chars: int = 320) -> str: text = normalize_text(text) if len(text) <= max_chars: return text clipped = text[:max_chars].rsplit(" ", 1)[0].strip() return clipped + "..." def question_plan(question: str) -> QuestionPlan: q = normalize_text(question).lower() expanded = q mode = "descriptive" expected = "text" allow_agentic_fallback = True if any(cue in q for cue in ["how to", "how do", "how should", "steps", "process", "procedure", "workflow", "manage ", "handling "]): mode = "procedural" expected = "procedure" allow_agentic_fallback = True elif q.startswith("who") or "who is" in q or "who was" in q: mode = "factoid" expected = "person" elif any(cue in q for cue in ["how many", "count", "number of"]): mode = "factoid" expected = "number" elif any(cue in q for cue in ["amount", "paid amount", "price", "cost", "total", "fee"]): mode = "factoid" expected = "currency" elif any(cue in q for cue in ["date", "when", "go-live"]): mode = "factoid" expected = "date" elif "version" in q: mode = "factoid" expected = "version" elif "name of the project" in q or ("project" in q and "name" in q): mode = "factoid" expected = "project_name" elif "name of" in q: mode = "factoid" expected = "name" elif q.startswith("what is") or q.startswith("what was") or q.startswith("what were"): mode = "factoid" expected = "text" if "deviation" in q: expanded += " deviation deviations reviewed review closed closure investigated approved documented" if "guide" in q: expanded += " guide supervisor advisor mentor person name" if expected == "currency": expanded += " amount paid total INR Rs price payment" if expected == "project_name": expanded += " project name document project" if expected == "procedure": expanded += " steps process procedure shall must review close approve" return QuestionPlan( mode=mode, expected_type=expected, expanded_query=expanded, allow_agentic_fallback=allow_agentic_fallback, ) def matched_source_files(question: str) -> List[str]: q = normalize_text(question).lower() matches = [] for hint, file_name in DOC_NAME_HINTS.items(): if hint in q and file_name not in matches: matches.append(file_name) return matches def evidence_has_expected_type(plan: QuestionPlan, sentences: List[str]) -> bool: if not sentences: return False joined = " ".join(sentences) q = plan.expanded_query if plan.expected_type == "person": if PERSON_RE.search(joined): return True if any(role in q for role in ROLE_NOUNS): return False return False if plan.expected_type == "currency": return bool(CURRENCY_RE.search(joined) or AMOUNT_RE.search(joined)) if plan.expected_type == "date": return bool(DATE_RE.search(joined)) if plan.expected_type == "version": return bool(VERSION_RE.search(joined)) if plan.expected_type == "number": return bool(NUMBER_RE.search(joined)) if plan.expected_type == "procedure": return any( token in joined.lower() for token in ["must", "shall", "should", "reviewed", "closed", "approved", "documented", "investigated", "process", "procedure", "steps"] ) return True def append_jsonl(path: Path, payload: dict): ensure_directories() with path.open("a", encoding="utf-8") as handle: handle.write(json.dumps(payload, ensure_ascii=True) + "\n") def log_event(event_type: str, **payload): append_jsonl( EVENT_LOG_PATH, { "timestamp": now_ts(), "event_type": event_type, **payload, }, ) def sha256_file(file_path: str) -> str: digest = hashlib.sha256() with open(file_path, "rb") as handle: for chunk in iter(lambda: handle.read(1024 * 1024), b""): digest.update(chunk) return digest.hexdigest() def remove_session(session_id: str): session = SESSIONS.pop(session_id, None) if not session: return try: shutil.rmtree(session.temp_dir, ignore_errors=True) except Exception: pass def cleanup_expired_sessions(): cutoff = now_ts() - SESSION_TTL_SECONDS expired: List[str] = [] with SESSIONS_LOCK: for session_id, session in list(SESSIONS.items()): if session_id == "phase2-corpus": continue if session.last_activity < cutoff: expired.append(session_id) for session_id in expired: remove_session(session_id) for session_id in expired: log_event("session_expired", session_id=session_id) def get_embedder() -> SentenceTransformer: global GLOBAL_EMBEDDER with EMBEDDER_LOCK: if GLOBAL_EMBEDDER is None: GLOBAL_EMBEDDER = SentenceTransformer("all-MiniLM-L6-v2", device="cpu") return GLOBAL_EMBEDDER def get_agent_template() -> AgenticSelfRAG: global GLOBAL_AGENT_TEMPLATE with MODEL_LOCK: if GLOBAL_AGENT_TEMPLATE is None: template = AgenticSelfRAG(EmptyRetriever(), load_in_4bit=LOAD_IN_4BIT) template.load_model() GLOBAL_AGENT_TEMPLATE = template return GLOBAL_AGENT_TEMPLATE def build_session_agent(retriever: SessionRetriever) -> AgenticSelfRAG: template = get_agent_template() agent = AgenticSelfRAG(retriever, load_in_4bit=LOAD_IN_4BIT) agent.pipeline.gen_model = template.pipeline.gen_model agent.pipeline.gen_tokenizer = template.pipeline.gen_tokenizer agent.pipeline._loaded = True agent.pipeline._repair_vocab = None agent._model_loaded = True agent.verif_agent = template.verif_agent agent.qr_agent.pipeline = agent.pipeline agent.corr_agent.pipeline = agent.pipeline return agent def extract_page_records(pdf_path: str, source_file: str) -> Tuple[List[PageRecord], str]: try: import fitz doc = fitz.open(pdf_path) page_records: List[PageRecord] = [] for index, page in enumerate(doc): raw_text = page.get_text("text") or "" raw_lines = [normalize_text(line) for line in raw_text.splitlines()] lines = [line for line in raw_lines if line] text = " ".join(lines).strip() if text: page_records.append(PageRecord(source_file=source_file, page_num=index + 1, text=text, lines=lines)) doc.close() return page_records, "pymupdf" except Exception: pass try: from pypdf import PdfReader reader = PdfReader(pdf_path) page_records = [] for index, page in enumerate(reader.pages): raw_text = page.extract_text() or "" raw_lines = [normalize_text(line) for line in raw_text.splitlines()] lines = [line for line in raw_lines if line] text = " ".join(lines).strip() if text: page_records.append(PageRecord(source_file=source_file, page_num=index + 1, text=text, lines=lines)) return page_records, "pypdf" except Exception as exc: raise RuntimeError( "Could not extract text from a corpus PDF. Please verify the stored project documents are text-based PDFs " "instead of scanned image-only files." ) from exc def chunk_page_records(page_records: List[PageRecord], file_name: str) -> List[Chunk]: chunks: List[Chunk] = [] chunk_index = 0 stride = max(1, CHUNK_WORDS - CHUNK_OVERLAP) for page_record in page_records: words = page_record.text.split() if not words: continue start = 0 while start < len(words): end = min(start + CHUNK_WORDS, len(words)) window = words[start:end] if len(window) > 20: chunks.append( Chunk( chunk_id=f"{file_name}::p{page_record.page_num}::c{chunk_index}", source_file=file_name, page_num=page_record.page_num, text=" ".join(window), char_start=start, ) ) chunk_index += 1 if end == len(words): break start += stride return chunks def validate_pdf(file_path: str) -> Tuple[bool, str]: if not file_path: return False, "Please upload a PDF file." if not file_path.lower().endswith(".pdf"): return False, "Only PDF files are accepted." try: with open(file_path, "rb") as handle: if handle.read(4) != b"%PDF": return False, "The uploaded file does not look like a valid PDF." except OSError: return False, "The uploaded file could not be read." size_bytes = os.path.getsize(file_path) if size_bytes > MAX_FILE_SIZE_MB * 1024 * 1024: return False, f"PDF exceeds the {MAX_FILE_SIZE_MB} MB file-size limit." return True, "" def make_temp_session_dir() -> str: ensure_directories() return tempfile.mkdtemp(prefix="session_", dir=str(UPLOAD_ROOT)) def make_metric_badge(label: str, value: str) -> str: return ( "
" f"
{html.escape(label)}
" f"
{html.escape(value)}
" "
" ) def result_card(title: str, body: str, tone: str = "normal") -> str: palette = { "normal": ("#ffffff", "#0f172a", "#dbe4f0"), "error": ("#fff4f4", "#9f1239", "#fecdd3"), "warn": ("#fff8eb", "#9a3412", "#fed7aa"), "success": ("#f0fdf4", "#166534", "#bbf7d0"), } bg, fg, border = palette[tone] return ( f"
" f"
{html.escape(title)}
" f"
{body}
" ) def initial_result_html() -> str: return result_card( "Ready", "Ask a question about the preloaded Phase 2 project corpus to receive a grounded answer with page and line references.", ) def line_window_score(query: str, answer: str, chunk_text: str, snippet: str) -> float: query_terms = content_terms(query) answer_terms = content_terms(answer) chunk_terms = content_terms(chunk_text) snippet_terms = content_terms(snippet) exact_bonus = 1.0 if normalize_text(answer).lower() in normalize_text(snippet).lower() and answer.strip() else 0.0 overlap = len((query_terms | answer_terms | chunk_terms) & snippet_terms) return exact_bonus + overlap / max(1, len(snippet_terms)) def locate_citation(question: str, answer: str, chunk: Optional[Chunk], session: SessionData) -> Optional[Citation]: if chunk is None: return None page_record = next( ( record for record in session.page_records if record.page_num == chunk.page_num and record.source_file == chunk.source_file ), None, ) if page_record is None or not page_record.lines: return None best_score = -1.0 best_window = (1, 1, page_record.lines[0]) for start_index in range(len(page_record.lines)): for end_index in range(start_index, min(len(page_record.lines), start_index + 4)): excerpt = " ".join(page_record.lines[start_index:end_index + 1]).strip() if not excerpt: continue score = line_window_score(question, answer, chunk.text, excerpt) if score > best_score: best_score = score best_window = (start_index + 1, end_index + 1, excerpt) return Citation( source_file=session.file_name, page_num=chunk.page_num, line_start=best_window[0], line_end=best_window[1], excerpt=best_window[2], ) def locate_citation_by_file_page(question: str, answer: str, session: SessionData, source_file: str, page_num: int) -> Optional[Citation]: page_record = next( ( record for record in session.page_records if record.source_file == source_file and record.page_num == page_num ), None, ) if page_record is None or not page_record.lines: return None best_score = -1.0 best_window = (1, 1, page_record.lines[0]) for start_index in range(len(page_record.lines)): for end_index in range(start_index, min(len(page_record.lines), start_index + 4)): excerpt = " ".join(page_record.lines[start_index:end_index + 1]).strip() if not excerpt: continue score = line_window_score(question, answer, excerpt, excerpt) if score > best_score: best_score = score best_window = (start_index + 1, end_index + 1, excerpt) return Citation( source_file=source_file, page_num=page_num, line_start=best_window[0], line_end=best_window[1], excerpt=best_window[2], ) def best_evidence_sentences( session: SessionData, question: str, plan: QuestionPlan, source_filters: Optional[List[str]] = None, ) -> Tuple[List[Chunk], List[Tuple[Chunk, str]]]: chunks = session.retriever.retrieve(plan.expanded_query, k=min(max(K_PASSAGES * 3, 12), len(session.chunks))) if source_filters: filtered = [chunk for chunk in chunks if chunk.source_file in source_filters] if filtered: chunks = filtered evidence = session.agent.pipeline.select_evidence( plan.expanded_query, chunks, max_sentences=4 if plan.mode in {"procedural", "descriptive"} else 3, ) return chunks, [(item.chunk, item.sentence) for item in evidence] def extract_project_name(sentence: str) -> Optional[str]: match = re.search(r"\bProject\s*[:\-]?\s*([A-Z][A-Za-z0-9 .-]{2,80})", sentence) if not match: return None candidate = match.group(1) candidate = re.split(r"\b(?:Status|System|Version|Document|Approved|Author)\b", candidate)[0].strip(" :-,") return candidate or None def concise_factoid_answer(question: str, plan: QuestionPlan, evidence_pairs: List[Tuple[Chunk, str]]) -> Optional[str]: if not evidence_pairs: return None for _, sentence in evidence_pairs: if any(token in question.lower() for token in ["interval", "duration", "period"]) and re.search(r"\b\d+\s+(?:months?|days?|years?)\b", sentence, re.I): match = re.search(r"\b(\d+\s+(?:months?|days?|years?))\b", sentence, re.I) if match: return normalize_text(match.group(1)) if plan.expected_type == "currency": match = AMOUNT_RE.search(sentence) if match: prefix = (match.group(1) or "INR").replace(".", "") return normalize_text(f"{prefix} {match.group(2)}") match = CURRENCY_RE.search(sentence) if match: return normalize_text(match.group(0)) if plan.expected_type == "date": match = DATE_RE.search(sentence) if match: return normalize_text(match.group(0)) if plan.expected_type == "version": match = VERSION_RE.search(sentence) if match: return normalize_text(match.group(0)) if plan.expected_type == "number": matches = NUMBER_RE.findall(sentence) if matches: return normalize_text(matches[0]) if plan.expected_type == "person": match = PERSON_RE.search(sentence) if match: return normalize_text(match.group(0)) if plan.expected_type in {"project_name", "name"}: candidate = extract_project_name(sentence) if candidate: return normalize_text(candidate) top_sentence = evidence_pairs[0][1] if plan.expected_type == "text": short = clip_text(top_sentence, max_chars=180) if len(short.split()) <= 18: return short return None def metadata_lookup_answer(session: SessionData, question: str, source_filters: List[str]) -> Optional[Tuple[str, str, int]]: q = question.lower() headers = session.structured.get("headers", {}) vmp_table = session.structured.get("vmp_table", {}) if ("author" in q or "approver" in q or "approve" in q or "document id" in q or "version" in q or "approved date" in q) and not source_filters: if "author" in q or "approver" in q or "approve" in q: return ( "Please specify which document you mean, for example Validation Master Plan, Configuration Guide, or Traceability Matrix.", "", 0, ) for file_name in source_filters: pretty = pretty_doc_name(file_name).lower() row = vmp_table.get(pretty) header = headers.get(file_name, {}) if "qa approver" in q and header.get("qa_approver"): return header["qa_approver"], file_name, 1 if "author" in q and header.get("author"): return header["author"], file_name, 1 if ("approve" in q or "approver" in q) and row and row.get("approver"): return row["approver"], "02_Validation_Master_Plan.pdf", int(row.get("page_num", "2")) if "document id" in q and row and row.get("document_id"): return row["document_id"], "02_Validation_Master_Plan.pdf", int(row.get("page_num", "2")) if "phase" in q and row and row.get("phase"): return row["phase"], "02_Validation_Master_Plan.pdf", int(row.get("page_num", "2")) if ("qa approver" in q or "approved date" in q or "version" in q or "system" in q or "status" in q or "project" in q) and header: if "approved date" in q and header.get("approved_date"): return header["approved_date"], file_name, 1 if "version" in q and header.get("version"): return header["version"], file_name, 1 if "system" in q and header.get("system"): return header["system"], file_name, 1 if "status" in q and header.get("status"): return header["status"], file_name, 1 if "project" in q and header.get("project"): return header["project"], file_name, 1 return None def summarize_procedural_answer(evidence_pairs: List[Tuple[Chunk, str]]) -> Optional[str]: if not evidence_pairs: return None sentences: List[str] = [] seen = set() for _, sentence in evidence_pairs: cleaned = clip_text(sentence, max_chars=220) key = cleaned.lower() if key in seen: continue sentences.append(cleaned) seen.add(key) if len(sentences) >= MAX_SUMMARY_SENTENCES: break if not sentences: return None return "Based on the document: " + " ".join(sentences) def answer_relevance_proxy(question: str, answer: str, citation: Optional[Citation]) -> Optional[float]: if not answer or answer.lower().startswith("i don't have enough evidence"): return None query_terms = content_terms(question) support_terms = content_terms(answer) if citation: support_terms |= content_terms(citation.excerpt) if not query_terms: return 1.0 return round(len(query_terms & support_terms) / len(query_terms), 4) def context_precision_proxy(question: str, answer: str, retrieved_chunks: List[Chunk]) -> Optional[float]: if not retrieved_chunks: return None query_terms = content_terms(question) answer_terms = content_terms(answer) relevant = 0 for chunk in retrieved_chunks: chunk_terms = content_terms(chunk.text) if query_terms & chunk_terms or answer_terms & chunk_terms: relevant += 1 return round(relevant / len(retrieved_chunks), 4) def faithfulness_proxy(answer: str, citation: Optional[Citation]) -> Optional[float]: if not answer or not citation: return None answer_norm = normalize_text(answer).lower() excerpt_norm = normalize_text(citation.excerpt).lower() if answer_norm and answer_norm in excerpt_norm: return 1.0 answer_terms = content_terms(answer) excerpt_terms = content_terms(citation.excerpt) if not answer_terms: return 1.0 if answer_norm and answer_norm in excerpt_norm else 0.0 return round(len(answer_terms & excerpt_terms) / len(answer_terms), 4) def citation_html(citation: Optional[Citation]) -> str: if citation is None: return "No supporting citation was selected." line_label = ( f"line {citation.line_start}" if citation.line_start == citation.line_end else f"lines {citation.line_start}-{citation.line_end}" ) return ( f"
{html.escape(citation.source_file)}
" f"
" f"Page {citation.page_num}, {line_label} (extracted text)
" f"
{html.escape(citation.excerpt)}
" ) def render_result( question: str, answer: str, citation: Optional[Citation], metrics: Dict[str, Optional[float]], abstained: bool, ) -> str: answer_card = result_card( "Final Answer", html.escape(answer), tone="warn" if abstained else "normal", ) source_card = result_card( "Source Reference", citation_html(citation), tone="warn" if abstained else "success", ) metric_boxes = "".join( [ make_metric_badge("Latency", f"{metrics['latency_seconds']:.2f}s"), make_metric_badge( "Answer Relevance", "N/A" if metrics.get("answer_relevance") is None else f"{metrics['answer_relevance']:.2f}", ), make_metric_badge( "Context Precision", "N/A" if metrics.get("context_precision") is None else f"{metrics['context_precision']:.2f}", ), make_metric_badge( "Faithfulness", "N/A" if metrics.get("faithfulness_proxy") is None else f"{metrics['faithfulness_proxy']:.2f}", ), make_metric_badge("Hallucination", f"{metrics.get('hallucination_rate', 0.0):.2f}"), ] ) question_html = ( "
" "
Question
" f"
{html.escape(question)}
" "
" ) return ( "
" f"{question_html}" "
" f"{answer_card}{source_card}" f"
{metric_boxes}
" "
" ) def render_document_status(session: SessionData) -> str: doc_count = len({record.source_file for record in session.page_records}) return result_card( "Corpus Loaded", ( f"{html.escape(session.file_name)}
" f"Documents indexed: {doc_count}
" f"Pages indexed: {session.page_count}
" f"Chunks indexed: {len(session.chunks)}
" f"Extractor used: {html.escape(session.extractor)}
" f"Knowledge base mode: preloaded project corpus" ), tone="success", ) def error_html(message: str) -> str: return result_card("Action Required", html.escape(message), tone="error") def info_html(message: str) -> str: return result_card("Notice", html.escape(message), tone="warn") def build_session(file_path: str) -> SessionData: is_valid, validation_message = validate_pdf(file_path) if not is_valid: raise ValueError(validation_message) temp_dir = make_temp_session_dir() try: file_name = os.path.basename(file_path) dest_path = os.path.join(temp_dir, file_name) shutil.copy2(file_path, dest_path) page_records, extractor_name = extract_page_records(dest_path, file_name) if not page_records: raise ValueError( "No extractable text was found in the PDF. Please upload a text-based PDF." ) if len(page_records) > MAX_PAGES: raise ValueError( f"PDF has {len(page_records)} pages, which exceeds the {MAX_PAGES}-page limit." ) chunks = chunk_page_records(page_records, file_name) if not chunks: raise ValueError( "The extracted PDF text did not produce enough content to index." ) if len(chunks) > MAX_CHUNKS: raise ValueError( f"PDF produced {len(chunks)} chunks, which exceeds the {MAX_CHUNKS}-chunk limit." ) file_hash = sha256_file(dest_path) file_size_bytes = os.path.getsize(dest_path) retriever = SessionRetriever(chunks, get_embedder()) agent = build_session_agent(retriever) return SessionData( session_id=str(uuid.uuid4()), temp_dir=temp_dir, pdf_path=dest_path, file_name=file_name, file_hash=file_hash, file_size_bytes=file_size_bytes, page_records=page_records, chunks=chunks, retriever=retriever, agent=agent, page_count=len(page_records), extractor=extractor_name, structured={}, ) except Exception: shutil.rmtree(temp_dir, ignore_errors=True) raise def parse_header_metadata(page_records: List[PageRecord]) -> Dict[str, str]: if not page_records: return {} lines = page_records[0].lines metadata: Dict[str, str] = {} title_lines: List[str] = [] i = 0 while i < len(lines): line = lines[i] if re.match(r"^\d+\.", line): break if line.endswith(":") and i + 1 < len(lines): key = line[:-1].strip().lower().replace(" ", "_") metadata[key] = lines[i + 1].strip() i += 2 continue title_lines.append(line) i += 1 metadata["header_text"] = " ".join(lines[: min(len(lines), 20)]) metadata["title"] = " | ".join(title_lines[:3]) return metadata def parse_vmp_table(page_records: List[PageRecord]) -> Dict[str, Dict[str, str]]: known_docs = { "Project Charter", "Validation Master Plan", "User Requirements Specification", "Functional Requirements Specification", "Risk Assessment", "HP ALM Configuration Guide", "IQ Protocol", "IQ Execution Report", "OQ Protocol", "OQ Execution Report", "PQ/UAT Protocol and Report", "Data Migration Plan", "Data Migration Summary Report", "Validation Summary Report", "Traceability Matrix", "Change Control SOP", } rows: Dict[str, Dict[str, str]] = {} all_lines: List[Tuple[int, str]] = [] for record in page_records: for line in record.lines: all_lines.append((record.page_num, line)) collecting = False idx = 0 while idx < len(all_lines): page_num, line = all_lines[idx] if line == "Document": collecting = True idx += 5 continue if not collecting: idx += 1 continue if line.startswith("4. Roles and Responsibilities"): break if line in known_docs and idx + 4 < len(all_lines): _, doc_id = all_lines[idx + 1] _, phase = all_lines[idx + 2] _, author = all_lines[idx + 3] _, approver = all_lines[idx + 4] rows[line.lower()] = { "document": line, "document_id": doc_id, "phase": phase, "author": author, "approver": approver, "page_num": str(page_num), } idx += 5 continue idx += 1 if not rows: log_event( "vmp_table_parse_empty", source_file="02_Validation_Master_Plan.pdf", page_count=len(page_records), ) return rows def corpus_pdf_files(candidate: Path) -> List[Path]: if not candidate.exists() or not candidate.is_dir(): return [] pdfs = sorted(p for p in candidate.glob("*.pdf") if p.is_file()) numbered = [p for p in pdfs if re.match(r"^\d{2}_.+\.pdf$", p.name)] required = { "01_Project_Charter.pdf", "02_Validation_Master_Plan.pdf", "15_Regulatory_Reference_Guide.pdf", } names = {p.name for p in numbered} if required.issubset(names): return numbered if len(numbered) >= 10: return numbered return [] def resolve_corpus_dir() -> Path: for candidate in CORPUS_CANDIDATES: if corpus_pdf_files(candidate): return candidate raise FileNotFoundError( "Phase 2 corpus not found. Upload the 15 PDF files either into a phase2_corpus folder in the app repo or at the repo root." ) def build_corpus_session() -> SessionData: corpus_dir = resolve_corpus_dir() pdf_paths = corpus_pdf_files(corpus_dir) page_records: List[PageRecord] = [] chunks: List[Chunk] = [] structured: Dict[str, dict] = {"headers": {}, "vmp_table": {}, "corpus_dir": str(corpus_dir)} extractors = set() file_hash_parts: List[str] = [] for pdf_path in pdf_paths: file_name = pdf_path.name doc_pages, extractor_name = extract_page_records(str(pdf_path), file_name) extractors.add(extractor_name) page_records.extend(doc_pages) chunks.extend(chunk_page_records(doc_pages, file_name)) structured["headers"][file_name] = parse_header_metadata(doc_pages) file_hash_parts.append(f"{file_name}:{pdf_path.stat().st_size}:{int(pdf_path.stat().st_mtime)}") if file_name == "02_Validation_Master_Plan.pdf": structured["vmp_table"] = parse_vmp_table(doc_pages) retriever = SessionRetriever(chunks, get_embedder()) agent = build_session_agent(retriever) corpus_hash = hashlib.sha256("|".join(file_hash_parts).encode("utf-8")).hexdigest() return SessionData( session_id="phase2-corpus", temp_dir="", pdf_path=str(corpus_dir), file_name="Phase 2 corpus", file_hash=corpus_hash, file_size_bytes=0, page_records=page_records, chunks=chunks, retriever=retriever, agent=agent, page_count=len({(record.source_file, record.page_num) for record in page_records}), extractor=" / ".join(sorted(extractors)), structured=structured, ) def handle_upload(file_obj, current_session_id: Optional[str]): cleanup_expired_sessions() if current_session_id: with SESSIONS_LOCK: remove_session(current_session_id) file_path = file_obj if isinstance(file_obj, str) else getattr(file_obj, "name", None) if not file_path: return None, info_html("Upload one PDF file to start a testing session."), initial_result_html(), None, "" try: session = build_session(file_path) with SESSIONS_LOCK: SESSIONS[session.session_id] = session log_event( "upload_success", session_id=session.session_id, file_hash=session.file_hash, file_name=session.file_name, page_count=session.page_count, chunk_count=len(session.chunks), file_size_bytes=session.file_size_bytes, extractor=session.extractor, ) return session.session_id, render_document_status(session), initial_result_html(), None, "" except Exception as exc: log_event("upload_rejected", reason=str(exc), file_name=os.path.basename(file_path)) return None, error_html(str(exc)), initial_result_html(), None, "" def get_session(session_id: Optional[str]) -> Optional[SessionData]: if not session_id: return None with SESSIONS_LOCK: session = SESSIONS.get(session_id) return session def check_rate_limit(session: SessionData) -> Optional[str]: now = now_ts() window_start = now - 60 session.question_timestamps = [ts for ts in session.question_timestamps if ts >= window_start] if len(session.question_timestamps) >= MAX_QUESTIONS_PER_MINUTE: return ( f"Rate limit reached. Please wait before asking more than " f"{MAX_QUESTIONS_PER_MINUTE} questions per minute in one session." ) session.question_timestamps.append(now) return None def build_question_metrics(question: str, answer: str, citation: Optional[Citation], retrieved_chunks: List[Chunk], hallucination_rate: float, latency_seconds: float) -> Dict[str, Optional[float]]: return { "latency_seconds": latency_seconds, "answer_relevance": answer_relevance_proxy(question, answer, citation), "context_precision": context_precision_proxy(question, answer, retrieved_chunks), "faithfulness_proxy": faithfulness_proxy(answer, citation), "hallucination_rate": hallucination_rate, } def ask_question(question: str, session_id: Optional[str]): cleanup_expired_sessions() question = (question or "").strip() if not question: return info_html("Enter a question to query the preloaded project corpus."), None, "" session = get_session(session_id) if session is None: return error_html("The preloaded corpus is not available right now. Please reload the app."), None, "" session.last_activity = now_ts() rate_limit_message = check_rate_limit(session) if rate_limit_message: log_event("rate_limited", session_id=session.session_id, question=question) return error_html(rate_limit_message), None, "" plan = question_plan(question) source_filters = matched_source_files(question) metadata_hit = metadata_lookup_answer(session, question, source_filters) if metadata_hit: answer_text, source_file, source_page = metadata_hit abstained = False hallucination_rate = 0.0 latency_seconds = 0.0 citation = ( locate_citation_by_file_page(question, answer_text, session, source_file, source_page) if source_file else None ) metrics = build_question_metrics(question, answer_text, citation, [], hallucination_rate, latency_seconds) result_html = render_result(question, answer_text, citation, metrics, abstained=False) response_state = { "session_id": session.session_id, "question": question, "answer": answer_text, "abstained": False, "source_file": citation.source_file if citation else None, "page_num": citation.page_num if citation else None, "line_start": citation.line_start if citation else None, "line_end": citation.line_end if citation else None, "excerpt": citation.excerpt if citation else None, "metrics": metrics, "file_hash": session.file_hash, "route_mode": "structured", "expected_type": plan.expected_type, } append_jsonl( INTERACTION_LOG_PATH, { "timestamp": now_ts(), "session_id": session.session_id, "file_hash": session.file_hash, "file_name": session.file_name, "page_count": session.page_count, "question": question, "answer": answer_text, "abstained": False, "source_file": citation.source_file if citation else None, "page_num": citation.page_num if citation else None, "line_start": citation.line_start if citation else None, "line_end": citation.line_end if citation else None, "excerpt": citation.excerpt if citation else None, "latency_seconds": latency_seconds, "route_mode": "structured", "expected_type": plan.expected_type, "answer_relevance": metrics["answer_relevance"], "context_precision": metrics["context_precision"], "faithfulness_proxy": metrics["faithfulness_proxy"], "hallucination_rate": metrics["hallucination_rate"], }, ) return result_html, response_state, "" retrieved_chunks, evidence_pairs = best_evidence_sentences(session, question, plan, source_filters=source_filters) evidence_sentences = [sentence for _, sentence in evidence_pairs] start = time.perf_counter() output = None best_chunk = evidence_pairs[0][0] if evidence_pairs else None answer_text: str abstained = False hallucination_rate = 0.0 def run_agentic_fallback() -> Tuple[str, bool, float, Optional[Chunk], Optional[str]]: nonlocal output, best_chunk try: output = session.agent.run(question) best_chunk = output.best_chunk or best_chunk answer = ( "I don't have enough evidence in the project corpus to answer that reliably." if output.abstained else (output.answer or "No answer produced.") ) return answer, output.abstained, output.hallucination_rate or 0.0, best_chunk, None except Exception as exc: log_event("inference_failed", session_id=session.session_id, question=question, error=str(exc)) return "", False, 0.0, best_chunk, str(exc) if not evidence_pairs: abstained = True answer_text = "I don't have enough evidence in the project corpus to answer that reliably." elif plan.mode in {"procedural", "descriptive"}: summary_answer = summarize_procedural_answer(evidence_pairs) if evidence_has_expected_type(plan, evidence_sentences) else None if summary_answer: answer_text = summary_answer abstained = False else: if plan.allow_agentic_fallback: answer_text, abstained, hallucination_rate, best_chunk, inference_error = run_agentic_fallback() if inference_error: return error_html(f"Inference failed: {inference_error}"), None, "" else: abstained = True answer_text = "I don't have enough evidence in the project corpus to answer that reliably." else: concise_answer = concise_factoid_answer(question, plan, evidence_pairs) if concise_answer and evidence_has_expected_type(plan, evidence_sentences): answer_text = concise_answer elif not evidence_has_expected_type(plan, evidence_sentences): abstained = True answer_text = "I don't have enough evidence in the project corpus to answer that reliably." elif plan.allow_agentic_fallback: answer_text, abstained, hallucination_rate, best_chunk, inference_error = run_agentic_fallback() if inference_error: return error_html(f"Inference failed: {inference_error}"), None, "" else: abstained = True answer_text = "I don't have enough evidence in the project corpus to answer that reliably." latency_seconds = time.perf_counter() - start citation = locate_citation(question, answer_text, best_chunk, session) metrics = build_question_metrics( question, answer_text, citation, retrieved_chunks, hallucination_rate, latency_seconds, ) result_html = render_result( question=question, answer=answer_text, citation=citation, metrics=metrics, abstained=abstained, ) response_state = { "session_id": session.session_id, "question": question, "answer": answer_text, "abstained": abstained, "source_file": citation.source_file if citation else None, "page_num": citation.page_num if citation else None, "line_start": citation.line_start if citation else None, "line_end": citation.line_end if citation else None, "excerpt": citation.excerpt if citation else None, "metrics": metrics, "file_hash": session.file_hash, "route_mode": plan.mode, "expected_type": plan.expected_type, } append_jsonl( INTERACTION_LOG_PATH, { "timestamp": now_ts(), "session_id": session.session_id, "file_hash": session.file_hash, "file_name": session.file_name, "page_count": session.page_count, "question": question, "answer": answer_text, "abstained": abstained, "source_file": citation.source_file if citation else None, "page_num": citation.page_num if citation else None, "line_start": citation.line_start if citation else None, "line_end": citation.line_end if citation else None, "excerpt": citation.excerpt if citation else None, "latency_seconds": latency_seconds, "route_mode": plan.mode, "expected_type": plan.expected_type, "answer_relevance": metrics["answer_relevance"], "context_precision": metrics["context_precision"], "faithfulness_proxy": metrics["faithfulness_proxy"], "hallucination_rate": metrics["hallucination_rate"], }, ) return result_html, response_state, "" def submit_feedback(response_state: Optional[dict], vote: str): if not response_state: return "Ask a question first, then rate the answer." log_event( "feedback", session_id=response_state.get("session_id"), file_hash=response_state.get("file_hash"), question=response_state.get("question"), vote=vote, source_file=response_state.get("source_file"), page_num=response_state.get("page_num"), line_start=response_state.get("line_start"), line_end=response_state.get("line_end"), ) if vote == "helpful": return "Thanks. Your feedback was recorded as helpful." return "Thanks. Your feedback was recorded for review." ensure_directories() try: PRELOADED_SESSION = build_corpus_session() with SESSIONS_LOCK: SESSIONS[PRELOADED_SESSION.session_id] = PRELOADED_SESSION PRELOADED_STATUS_HTML = render_document_status(PRELOADED_SESSION) STARTUP_NOTICE = "" except Exception as exc: PRELOADED_SESSION = None PRELOADED_STATUS_HTML = error_html(str(exc)) STARTUP_NOTICE = str(exc) with gr.Blocks(css=CSS) as demo: session_state = gr.State(PRELOADED_SESSION.session_id if PRELOADED_SESSION else None) response_state = gr.State(None) gr.Markdown( f""" # {APP_NAME} {APP_TAGLINE} **Project corpus mode** - Preloaded Phase 2 project documents - Optimized for this fixed validation corpus - Best for onboarding, project lookup, and validation Q&A - Rate limit: **{MAX_QUESTIONS_PER_MINUTE} questions per minute** """ ) gr.Markdown( f"""
Privacy notice
{html.escape(PRIVACY_NOTICE)}
""" ) if STARTUP_NOTICE: gr.HTML(error_html(STARTUP_NOTICE)) document_status = gr.HTML(PRELOADED_STATUS_HTML) with gr.Row(): question_box = gr.Textbox( label="Your Question", lines=2, placeholder="Ask a question about the preloaded project documents...", scale=5, ) ask_btn = gr.Button("Ask", elem_id="ask_btn", scale=1) result_html_component = gr.HTML(initial_result_html()) with gr.Row(): helpful_btn = gr.Button("Helpful") needs_work_btn = gr.Button("Needs Improvement") feedback_status = gr.Textbox( label="Feedback Status", value="", interactive=False, ) with gr.Accordion("Testing guardrails and privacy details", open=False): gr.Markdown( f""" - **Knowledge base**: 15 preloaded Phase 2 project PDFs - **Citation format**: page and extracted-text line range - **Queueing**: concurrency limit {QUEUE_CONCURRENCY}, queue size {QUEUE_MAX_SIZE} - **Logged for evaluation**: corpus question, answer, citation, latency, and proxy metrics """ ) ask_btn.click( ask_question, inputs=[question_box, session_state], outputs=[result_html_component, response_state, feedback_status], ) question_box.submit( ask_question, inputs=[question_box, session_state], outputs=[result_html_component, response_state, feedback_status], ) helpful_btn.click( lambda state: submit_feedback(state, "helpful"), inputs=[response_state], outputs=[feedback_status], ) needs_work_btn.click( lambda state: submit_feedback(state, "needs_improvement"), inputs=[response_state], outputs=[feedback_status], ) if __name__ == "__main__": ensure_directories() cleanup_expired_sessions() demo.queue(default_concurrency_limit=QUEUE_CONCURRENCY, max_size=QUEUE_MAX_SIZE).launch()