import hashlib
import html
import json
import os
import re
import shutil
import tempfile
import threading
import time
import uuid
from dataclasses import dataclass, field
from pathlib import Path
from typing import Dict, List, Optional, Tuple
import faiss
import gradio as gr
from sentence_transformers import SentenceTransformer
from extracted_phase2_core import AgenticSelfRAG, Chunk, K_PASSAGES
APP_NAME = "SourceTruth"
APP_TAGLINE = "Ask grounded questions over the preloaded Phase 2 project corpus and inspect cited evidence."
APP_ROOT = Path(__file__).resolve().parent
UPLOAD_ROOT = APP_ROOT / "testing_uploads"
LOG_ROOT = APP_ROOT / "testing_logs"
EVENT_LOG_PATH = LOG_ROOT / "events.jsonl"
INTERACTION_LOG_PATH = LOG_ROOT / "interactions.jsonl"
CORPUS_CANDIDATES = [
APP_ROOT / "phase2_corpus",
APP_ROOT / "phase 2 corpus",
APP_ROOT,
]
LOCAL_CORPUS_DIR = os.getenv("LOCAL_CORPUS_DIR", "").strip()
if LOCAL_CORPUS_DIR:
CORPUS_CANDIDATES.append(Path(LOCAL_CORPUS_DIR).expanduser())
MAX_FILE_SIZE_MB = int(os.getenv("MAX_FILE_SIZE_MB", "20"))
MAX_PAGES = int(os.getenv("MAX_PAGES", "75"))
MAX_CHUNKS = int(os.getenv("MAX_CHUNKS", "250"))
CHUNK_WORDS = int(os.getenv("CHUNK_WORDS", "300"))
CHUNK_OVERLAP = int(os.getenv("CHUNK_OVERLAP", "50"))
SESSION_TTL_SECONDS = int(os.getenv("SESSION_TTL_SECONDS", str(30 * 60)))
MAX_QUESTIONS_PER_MINUTE = int(os.getenv("MAX_QUESTIONS_PER_MINUTE", "8"))
QUEUE_CONCURRENCY = int(os.getenv("QUEUE_CONCURRENCY", "2"))
QUEUE_MAX_SIZE = int(os.getenv("QUEUE_MAX_SIZE", "20"))
LOAD_IN_4BIT = os.getenv("LOAD_IN_4BIT", "0") == "1"
MAX_SUMMARY_SENTENCES = int(os.getenv("MAX_SUMMARY_SENTENCES", "3"))
PRIVACY_NOTICE = (
"The preloaded project PDFs are processed only to answer your questions and produce citations. "
"Documents are not used to train models. Interaction logs may store the question, answer, citation, "
"and proxy evaluation metrics for testing analysis. Avoid using the application for confidential, "
"personal, medical, or legal decisions without direct document verification."
)
CSS = """
.gradio-container {
background:
radial-gradient(circle at top left, rgba(59,130,246,0.08), transparent 28%),
radial-gradient(circle at top right, rgba(16,185,129,0.08), transparent 22%),
linear-gradient(180deg, #f8fbff 0%, #f4f7fb 100%);
}
#ask_btn {
background: linear-gradient(135deg, #0f172a 0%, #1d4ed8 100%) !important;
color: white !important;
border: none !important;
}
"""
PERSON_RE = re.compile(r"\b(?:Dr\.?\s+)?[A-Z][a-z]+(?:\s+[A-Z][a-z]+){1,2}\b")
CURRENCY_RE = re.compile(r"(?:₹\s*[\d,]+(?:\.\d+)?|\b(?:INR|Rs\.?)\s*[\d,]+(?:\.\d+)?)", re.I)
AMOUNT_RE = re.compile(r"\b(?:paid amount|amount paid|total price|price|amount|budget|cost)\b[:\s-]*(₹|INR|Rs\.?)?\s*([\d,]+(?:\.\d+)?)", re.I)
VERSION_RE = re.compile(r"\b\d+(?:\.\d+){1,3}\b")
DATE_RE = re.compile(
r"\b\d{1,2}\s+(?:January|February|March|April|May|June|July|August|"
r"September|October|November|December)\s+\d{4}\b",
flags=re.IGNORECASE,
)
NUMBER_RE = re.compile(r"\b\d[\d,]*(?:\.\d+)?\b")
ROLE_NOUNS = {"guide", "supervisor", "advisor", "mentor", "approver", "director", "lead", "manager"}
DOC_NAME_HINTS = {
"project charter": "01_Project_Charter.pdf",
"validation master plan": "02_Validation_Master_Plan.pdf",
"vmp": "02_Validation_Master_Plan.pdf",
"user requirements specification": "03_User_Requirements_Specification.pdf",
"urs": "03_User_Requirements_Specification.pdf",
"functional requirements specification": "04_Functional_Requirements_Specification.pdf",
"frs": "04_Functional_Requirements_Specification.pdf",
"risk assessment": "05_Risk_Assessment.pdf",
"configuration guide": "06_HP_ALM_Configuration_Guide.pdf",
"hp alm configuration guide": "06_HP_ALM_Configuration_Guide.pdf",
"iq protocol": "07_IQ_Protocol_and_Report.pdf",
"iq report": "07_IQ_Protocol_and_Report.pdf",
"oq protocol": "08_OQ_Protocol_and_Report.pdf",
"oq report": "08_OQ_Protocol_and_Report.pdf",
"data migration plan": "09_Data_Migration_Plan.pdf",
"migration plan": "09_Data_Migration_Plan.pdf",
"data migration summary": "10_Data_Migration_Summary_Report.pdf",
"migration summary": "10_Data_Migration_Summary_Report.pdf",
"pq": "11_PQ_UAT_Protocol_and_Report.pdf",
"uat": "11_PQ_UAT_Protocol_and_Report.pdf",
"validation summary report": "12_Validation_Summary_Report.pdf",
"vsr": "12_Validation_Summary_Report.pdf",
"traceability matrix": "13_Traceability_Matrix.pdf",
"rtm": "13_Traceability_Matrix.pdf",
"change control sop": "14_Change_Control_SOP.pdf",
"regulatory reference guide": "15_Regulatory_Reference_Guide.pdf",
}
@dataclass
class PageRecord:
source_file: str
page_num: int
text: str
lines: List[str]
@dataclass
class Citation:
source_file: str
page_num: int
line_start: int
line_end: int
excerpt: str
@dataclass
class SessionData:
session_id: str
temp_dir: str
pdf_path: str
file_name: str
file_hash: str
file_size_bytes: int
page_records: List[PageRecord]
chunks: List[Chunk]
retriever: "SessionRetriever"
agent: AgenticSelfRAG
page_count: int
extractor: str
structured: Dict[str, dict] = field(default_factory=dict)
created_at: float = field(default_factory=time.time)
last_activity: float = field(default_factory=time.time)
question_timestamps: List[float] = field(default_factory=list)
@dataclass
class QuestionPlan:
mode: str
expected_type: str
expanded_query: str
allow_agentic_fallback: bool = True
class EmptyRetriever:
def __init__(self):
self.chunks: List[Chunk] = []
def retrieve(self, query: str, k: int = K_PASSAGES) -> List[Chunk]:
return []
class SessionRetriever:
def __init__(self, chunks: List[Chunk], encoder: SentenceTransformer):
self.chunks = chunks
self._encoder = encoder
self.index = None
self._build_index()
def _build_index(self):
if not self.chunks:
return
texts = [f"{chunk.source_file} {chunk.text}" for chunk in self.chunks]
embeddings = self._encoder.encode(
texts,
convert_to_numpy=True,
normalize_embeddings=True,
show_progress_bar=False,
).astype("float32")
self.index = faiss.IndexFlatIP(embeddings.shape[1])
self.index.add(embeddings)
def retrieve(self, query: str, k: int = K_PASSAGES) -> List[Chunk]:
if self.index is None:
return []
query_embedding = self._encoder.encode(
[query],
convert_to_numpy=True,
normalize_embeddings=True,
show_progress_bar=False,
).astype("float32")
_, indices = self.index.search(query_embedding, min(k, len(self.chunks)))
return [self.chunks[i] for i in indices[0] if 0 <= i < len(self.chunks)]
SESSIONS: Dict[str, SessionData] = {}
SESSIONS_LOCK = threading.Lock()
MODEL_LOCK = threading.Lock()
EMBEDDER_LOCK = threading.Lock()
GLOBAL_EMBEDDER: Optional[SentenceTransformer] = None
GLOBAL_AGENT_TEMPLATE: Optional[AgenticSelfRAG] = None
def ensure_directories():
UPLOAD_ROOT.mkdir(parents=True, exist_ok=True)
LOG_ROOT.mkdir(parents=True, exist_ok=True)
def now_ts() -> float:
return time.time()
def normalize_text(text: str) -> str:
text = text.replace("\u2581", " ").replace("\xa0", " ")
text = re.sub(r"\s+", " ", text)
return text.strip()
def pretty_doc_name(file_name: str) -> str:
base = file_name.replace(".pdf", "")
base = re.sub(r"^\d+_", "", base)
return base.replace("_", " ")
def content_terms(text: str) -> set:
stop = {
"a", "an", "the", "is", "are", "was", "were", "be", "been", "being",
"do", "does", "did", "have", "has", "had", "how", "what", "when",
"where", "why", "which", "who", "whom", "this", "that", "these",
"those", "and", "or", "but", "for", "with", "into", "from", "about",
"main", "use", "uses", "using", "used", "number", "version", "date",
"system", "document", "pdf", "page", "line", "file", "does", "give",
}
tokens = re.findall(r"[A-Za-z][A-Za-z0-9_-]+", text.lower())
return {token for token in tokens if token not in stop and len(token) > 2}
def clip_text(text: str, max_chars: int = 320) -> str:
text = normalize_text(text)
if len(text) <= max_chars:
return text
clipped = text[:max_chars].rsplit(" ", 1)[0].strip()
return clipped + "..."
def question_plan(question: str) -> QuestionPlan:
q = normalize_text(question).lower()
expanded = q
mode = "descriptive"
expected = "text"
allow_agentic_fallback = True
if any(cue in q for cue in ["how to", "how do", "how should", "steps", "process", "procedure", "workflow", "manage ", "handling "]):
mode = "procedural"
expected = "procedure"
allow_agentic_fallback = True
elif q.startswith("who") or "who is" in q or "who was" in q:
mode = "factoid"
expected = "person"
elif any(cue in q for cue in ["how many", "count", "number of"]):
mode = "factoid"
expected = "number"
elif any(cue in q for cue in ["amount", "paid amount", "price", "cost", "total", "fee"]):
mode = "factoid"
expected = "currency"
elif any(cue in q for cue in ["date", "when", "go-live"]):
mode = "factoid"
expected = "date"
elif "version" in q:
mode = "factoid"
expected = "version"
elif "name of the project" in q or ("project" in q and "name" in q):
mode = "factoid"
expected = "project_name"
elif "name of" in q:
mode = "factoid"
expected = "name"
elif q.startswith("what is") or q.startswith("what was") or q.startswith("what were"):
mode = "factoid"
expected = "text"
if "deviation" in q:
expanded += " deviation deviations reviewed review closed closure investigated approved documented"
if "guide" in q:
expanded += " guide supervisor advisor mentor person name"
if expected == "currency":
expanded += " amount paid total INR Rs price payment"
if expected == "project_name":
expanded += " project name document project"
if expected == "procedure":
expanded += " steps process procedure shall must review close approve"
return QuestionPlan(
mode=mode,
expected_type=expected,
expanded_query=expanded,
allow_agentic_fallback=allow_agentic_fallback,
)
def matched_source_files(question: str) -> List[str]:
q = normalize_text(question).lower()
matches = []
for hint, file_name in DOC_NAME_HINTS.items():
if hint in q and file_name not in matches:
matches.append(file_name)
return matches
def evidence_has_expected_type(plan: QuestionPlan, sentences: List[str]) -> bool:
if not sentences:
return False
joined = " ".join(sentences)
q = plan.expanded_query
if plan.expected_type == "person":
if PERSON_RE.search(joined):
return True
if any(role in q for role in ROLE_NOUNS):
return False
return False
if plan.expected_type == "currency":
return bool(CURRENCY_RE.search(joined) or AMOUNT_RE.search(joined))
if plan.expected_type == "date":
return bool(DATE_RE.search(joined))
if plan.expected_type == "version":
return bool(VERSION_RE.search(joined))
if plan.expected_type == "number":
return bool(NUMBER_RE.search(joined))
if plan.expected_type == "procedure":
return any(
token in joined.lower()
for token in ["must", "shall", "should", "reviewed", "closed", "approved", "documented", "investigated", "process", "procedure", "steps"]
)
return True
def append_jsonl(path: Path, payload: dict):
ensure_directories()
with path.open("a", encoding="utf-8") as handle:
handle.write(json.dumps(payload, ensure_ascii=True) + "\n")
def log_event(event_type: str, **payload):
append_jsonl(
EVENT_LOG_PATH,
{
"timestamp": now_ts(),
"event_type": event_type,
**payload,
},
)
def sha256_file(file_path: str) -> str:
digest = hashlib.sha256()
with open(file_path, "rb") as handle:
for chunk in iter(lambda: handle.read(1024 * 1024), b""):
digest.update(chunk)
return digest.hexdigest()
def remove_session(session_id: str):
session = SESSIONS.pop(session_id, None)
if not session:
return
try:
shutil.rmtree(session.temp_dir, ignore_errors=True)
except Exception:
pass
def cleanup_expired_sessions():
cutoff = now_ts() - SESSION_TTL_SECONDS
expired: List[str] = []
with SESSIONS_LOCK:
for session_id, session in list(SESSIONS.items()):
if session_id == "phase2-corpus":
continue
if session.last_activity < cutoff:
expired.append(session_id)
for session_id in expired:
remove_session(session_id)
for session_id in expired:
log_event("session_expired", session_id=session_id)
def get_embedder() -> SentenceTransformer:
global GLOBAL_EMBEDDER
with EMBEDDER_LOCK:
if GLOBAL_EMBEDDER is None:
GLOBAL_EMBEDDER = SentenceTransformer("all-MiniLM-L6-v2", device="cpu")
return GLOBAL_EMBEDDER
def get_agent_template() -> AgenticSelfRAG:
global GLOBAL_AGENT_TEMPLATE
with MODEL_LOCK:
if GLOBAL_AGENT_TEMPLATE is None:
template = AgenticSelfRAG(EmptyRetriever(), load_in_4bit=LOAD_IN_4BIT)
template.load_model()
GLOBAL_AGENT_TEMPLATE = template
return GLOBAL_AGENT_TEMPLATE
def build_session_agent(retriever: SessionRetriever) -> AgenticSelfRAG:
template = get_agent_template()
agent = AgenticSelfRAG(retriever, load_in_4bit=LOAD_IN_4BIT)
agent.pipeline.gen_model = template.pipeline.gen_model
agent.pipeline.gen_tokenizer = template.pipeline.gen_tokenizer
agent.pipeline._loaded = True
agent.pipeline._repair_vocab = None
agent._model_loaded = True
agent.verif_agent = template.verif_agent
agent.qr_agent.pipeline = agent.pipeline
agent.corr_agent.pipeline = agent.pipeline
return agent
def extract_page_records(pdf_path: str, source_file: str) -> Tuple[List[PageRecord], str]:
try:
import fitz
doc = fitz.open(pdf_path)
page_records: List[PageRecord] = []
for index, page in enumerate(doc):
raw_text = page.get_text("text") or ""
raw_lines = [normalize_text(line) for line in raw_text.splitlines()]
lines = [line for line in raw_lines if line]
text = " ".join(lines).strip()
if text:
page_records.append(PageRecord(source_file=source_file, page_num=index + 1, text=text, lines=lines))
doc.close()
return page_records, "pymupdf"
except Exception:
pass
try:
from pypdf import PdfReader
reader = PdfReader(pdf_path)
page_records = []
for index, page in enumerate(reader.pages):
raw_text = page.extract_text() or ""
raw_lines = [normalize_text(line) for line in raw_text.splitlines()]
lines = [line for line in raw_lines if line]
text = " ".join(lines).strip()
if text:
page_records.append(PageRecord(source_file=source_file, page_num=index + 1, text=text, lines=lines))
return page_records, "pypdf"
except Exception as exc:
raise RuntimeError(
"Could not extract text from a corpus PDF. Please verify the stored project documents are text-based PDFs "
"instead of scanned image-only files."
) from exc
def chunk_page_records(page_records: List[PageRecord], file_name: str) -> List[Chunk]:
chunks: List[Chunk] = []
chunk_index = 0
stride = max(1, CHUNK_WORDS - CHUNK_OVERLAP)
for page_record in page_records:
words = page_record.text.split()
if not words:
continue
start = 0
while start < len(words):
end = min(start + CHUNK_WORDS, len(words))
window = words[start:end]
if len(window) > 20:
chunks.append(
Chunk(
chunk_id=f"{file_name}::p{page_record.page_num}::c{chunk_index}",
source_file=file_name,
page_num=page_record.page_num,
text=" ".join(window),
char_start=start,
)
)
chunk_index += 1
if end == len(words):
break
start += stride
return chunks
def validate_pdf(file_path: str) -> Tuple[bool, str]:
if not file_path:
return False, "Please upload a PDF file."
if not file_path.lower().endswith(".pdf"):
return False, "Only PDF files are accepted."
try:
with open(file_path, "rb") as handle:
if handle.read(4) != b"%PDF":
return False, "The uploaded file does not look like a valid PDF."
except OSError:
return False, "The uploaded file could not be read."
size_bytes = os.path.getsize(file_path)
if size_bytes > MAX_FILE_SIZE_MB * 1024 * 1024:
return False, f"PDF exceeds the {MAX_FILE_SIZE_MB} MB file-size limit."
return True, ""
def make_temp_session_dir() -> str:
ensure_directories()
return tempfile.mkdtemp(prefix="session_", dir=str(UPLOAD_ROOT))
def make_metric_badge(label: str, value: str) -> str:
return (
"
"
f"
{html.escape(label)}
"
f"
{html.escape(value)}
"
"
"
)
def result_card(title: str, body: str, tone: str = "normal") -> str:
palette = {
"normal": ("#ffffff", "#0f172a", "#dbe4f0"),
"error": ("#fff4f4", "#9f1239", "#fecdd3"),
"warn": ("#fff8eb", "#9a3412", "#fed7aa"),
"success": ("#f0fdf4", "#166534", "#bbf7d0"),
}
bg, fg, border = palette[tone]
return (
f""
f"
{html.escape(title)}
"
f"
{body}
"
)
def initial_result_html() -> str:
return result_card(
"Ready",
"Ask a question about the preloaded Phase 2 project corpus to receive a grounded answer with page and line references.",
)
def line_window_score(query: str, answer: str, chunk_text: str, snippet: str) -> float:
query_terms = content_terms(query)
answer_terms = content_terms(answer)
chunk_terms = content_terms(chunk_text)
snippet_terms = content_terms(snippet)
exact_bonus = 1.0 if normalize_text(answer).lower() in normalize_text(snippet).lower() and answer.strip() else 0.0
overlap = len((query_terms | answer_terms | chunk_terms) & snippet_terms)
return exact_bonus + overlap / max(1, len(snippet_terms))
def locate_citation(question: str, answer: str, chunk: Optional[Chunk], session: SessionData) -> Optional[Citation]:
if chunk is None:
return None
page_record = next(
(
record
for record in session.page_records
if record.page_num == chunk.page_num and record.source_file == chunk.source_file
),
None,
)
if page_record is None or not page_record.lines:
return None
best_score = -1.0
best_window = (1, 1, page_record.lines[0])
for start_index in range(len(page_record.lines)):
for end_index in range(start_index, min(len(page_record.lines), start_index + 4)):
excerpt = " ".join(page_record.lines[start_index:end_index + 1]).strip()
if not excerpt:
continue
score = line_window_score(question, answer, chunk.text, excerpt)
if score > best_score:
best_score = score
best_window = (start_index + 1, end_index + 1, excerpt)
return Citation(
source_file=session.file_name,
page_num=chunk.page_num,
line_start=best_window[0],
line_end=best_window[1],
excerpt=best_window[2],
)
def locate_citation_by_file_page(question: str, answer: str, session: SessionData, source_file: str, page_num: int) -> Optional[Citation]:
page_record = next(
(
record
for record in session.page_records
if record.source_file == source_file and record.page_num == page_num
),
None,
)
if page_record is None or not page_record.lines:
return None
best_score = -1.0
best_window = (1, 1, page_record.lines[0])
for start_index in range(len(page_record.lines)):
for end_index in range(start_index, min(len(page_record.lines), start_index + 4)):
excerpt = " ".join(page_record.lines[start_index:end_index + 1]).strip()
if not excerpt:
continue
score = line_window_score(question, answer, excerpt, excerpt)
if score > best_score:
best_score = score
best_window = (start_index + 1, end_index + 1, excerpt)
return Citation(
source_file=source_file,
page_num=page_num,
line_start=best_window[0],
line_end=best_window[1],
excerpt=best_window[2],
)
def best_evidence_sentences(
session: SessionData,
question: str,
plan: QuestionPlan,
source_filters: Optional[List[str]] = None,
) -> Tuple[List[Chunk], List[Tuple[Chunk, str]]]:
chunks = session.retriever.retrieve(plan.expanded_query, k=min(max(K_PASSAGES * 3, 12), len(session.chunks)))
if source_filters:
filtered = [chunk for chunk in chunks if chunk.source_file in source_filters]
if filtered:
chunks = filtered
evidence = session.agent.pipeline.select_evidence(
plan.expanded_query,
chunks,
max_sentences=4 if plan.mode in {"procedural", "descriptive"} else 3,
)
return chunks, [(item.chunk, item.sentence) for item in evidence]
def extract_project_name(sentence: str) -> Optional[str]:
match = re.search(r"\bProject\s*[:\-]?\s*([A-Z][A-Za-z0-9 .-]{2,80})", sentence)
if not match:
return None
candidate = match.group(1)
candidate = re.split(r"\b(?:Status|System|Version|Document|Approved|Author)\b", candidate)[0].strip(" :-,")
return candidate or None
def concise_factoid_answer(question: str, plan: QuestionPlan, evidence_pairs: List[Tuple[Chunk, str]]) -> Optional[str]:
if not evidence_pairs:
return None
for _, sentence in evidence_pairs:
if any(token in question.lower() for token in ["interval", "duration", "period"]) and re.search(r"\b\d+\s+(?:months?|days?|years?)\b", sentence, re.I):
match = re.search(r"\b(\d+\s+(?:months?|days?|years?))\b", sentence, re.I)
if match:
return normalize_text(match.group(1))
if plan.expected_type == "currency":
match = AMOUNT_RE.search(sentence)
if match:
prefix = (match.group(1) or "INR").replace(".", "")
return normalize_text(f"{prefix} {match.group(2)}")
match = CURRENCY_RE.search(sentence)
if match:
return normalize_text(match.group(0))
if plan.expected_type == "date":
match = DATE_RE.search(sentence)
if match:
return normalize_text(match.group(0))
if plan.expected_type == "version":
match = VERSION_RE.search(sentence)
if match:
return normalize_text(match.group(0))
if plan.expected_type == "number":
matches = NUMBER_RE.findall(sentence)
if matches:
return normalize_text(matches[0])
if plan.expected_type == "person":
match = PERSON_RE.search(sentence)
if match:
return normalize_text(match.group(0))
if plan.expected_type in {"project_name", "name"}:
candidate = extract_project_name(sentence)
if candidate:
return normalize_text(candidate)
top_sentence = evidence_pairs[0][1]
if plan.expected_type == "text":
short = clip_text(top_sentence, max_chars=180)
if len(short.split()) <= 18:
return short
return None
def metadata_lookup_answer(session: SessionData, question: str, source_filters: List[str]) -> Optional[Tuple[str, str, int]]:
q = question.lower()
headers = session.structured.get("headers", {})
vmp_table = session.structured.get("vmp_table", {})
if ("author" in q or "approver" in q or "approve" in q or "document id" in q or "version" in q or "approved date" in q) and not source_filters:
if "author" in q or "approver" in q or "approve" in q:
return (
"Please specify which document you mean, for example Validation Master Plan, Configuration Guide, or Traceability Matrix.",
"",
0,
)
for file_name in source_filters:
pretty = pretty_doc_name(file_name).lower()
row = vmp_table.get(pretty)
header = headers.get(file_name, {})
if "qa approver" in q and header.get("qa_approver"):
return header["qa_approver"], file_name, 1
if "author" in q and header.get("author"):
return header["author"], file_name, 1
if ("approve" in q or "approver" in q) and row and row.get("approver"):
return row["approver"], "02_Validation_Master_Plan.pdf", int(row.get("page_num", "2"))
if "document id" in q and row and row.get("document_id"):
return row["document_id"], "02_Validation_Master_Plan.pdf", int(row.get("page_num", "2"))
if "phase" in q and row and row.get("phase"):
return row["phase"], "02_Validation_Master_Plan.pdf", int(row.get("page_num", "2"))
if ("qa approver" in q or "approved date" in q or "version" in q or "system" in q or "status" in q or "project" in q) and header:
if "approved date" in q and header.get("approved_date"):
return header["approved_date"], file_name, 1
if "version" in q and header.get("version"):
return header["version"], file_name, 1
if "system" in q and header.get("system"):
return header["system"], file_name, 1
if "status" in q and header.get("status"):
return header["status"], file_name, 1
if "project" in q and header.get("project"):
return header["project"], file_name, 1
return None
def summarize_procedural_answer(evidence_pairs: List[Tuple[Chunk, str]]) -> Optional[str]:
if not evidence_pairs:
return None
sentences: List[str] = []
seen = set()
for _, sentence in evidence_pairs:
cleaned = clip_text(sentence, max_chars=220)
key = cleaned.lower()
if key in seen:
continue
sentences.append(cleaned)
seen.add(key)
if len(sentences) >= MAX_SUMMARY_SENTENCES:
break
if not sentences:
return None
return "Based on the document: " + " ".join(sentences)
def answer_relevance_proxy(question: str, answer: str, citation: Optional[Citation]) -> Optional[float]:
if not answer or answer.lower().startswith("i don't have enough evidence"):
return None
query_terms = content_terms(question)
support_terms = content_terms(answer)
if citation:
support_terms |= content_terms(citation.excerpt)
if not query_terms:
return 1.0
return round(len(query_terms & support_terms) / len(query_terms), 4)
def context_precision_proxy(question: str, answer: str, retrieved_chunks: List[Chunk]) -> Optional[float]:
if not retrieved_chunks:
return None
query_terms = content_terms(question)
answer_terms = content_terms(answer)
relevant = 0
for chunk in retrieved_chunks:
chunk_terms = content_terms(chunk.text)
if query_terms & chunk_terms or answer_terms & chunk_terms:
relevant += 1
return round(relevant / len(retrieved_chunks), 4)
def faithfulness_proxy(answer: str, citation: Optional[Citation]) -> Optional[float]:
if not answer or not citation:
return None
answer_norm = normalize_text(answer).lower()
excerpt_norm = normalize_text(citation.excerpt).lower()
if answer_norm and answer_norm in excerpt_norm:
return 1.0
answer_terms = content_terms(answer)
excerpt_terms = content_terms(citation.excerpt)
if not answer_terms:
return 1.0 if answer_norm and answer_norm in excerpt_norm else 0.0
return round(len(answer_terms & excerpt_terms) / len(answer_terms), 4)
def citation_html(citation: Optional[Citation]) -> str:
if citation is None:
return "No supporting citation was selected."
line_label = (
f"line {citation.line_start}"
if citation.line_start == citation.line_end
else f"lines {citation.line_start}-{citation.line_end}"
)
return (
f"{html.escape(citation.source_file)}
"
f""
f"Page {citation.page_num}, {line_label} (extracted text)
"
f"{html.escape(citation.excerpt)}
"
)
def render_result(
question: str,
answer: str,
citation: Optional[Citation],
metrics: Dict[str, Optional[float]],
abstained: bool,
) -> str:
answer_card = result_card(
"Final Answer",
html.escape(answer),
tone="warn" if abstained else "normal",
)
source_card = result_card(
"Source Reference",
citation_html(citation),
tone="warn" if abstained else "success",
)
metric_boxes = "".join(
[
make_metric_badge("Latency", f"{metrics['latency_seconds']:.2f}s"),
make_metric_badge(
"Answer Relevance",
"N/A" if metrics.get("answer_relevance") is None else f"{metrics['answer_relevance']:.2f}",
),
make_metric_badge(
"Context Precision",
"N/A" if metrics.get("context_precision") is None else f"{metrics['context_precision']:.2f}",
),
make_metric_badge(
"Faithfulness",
"N/A" if metrics.get("faithfulness_proxy") is None else f"{metrics['faithfulness_proxy']:.2f}",
),
make_metric_badge("Hallucination", f"{metrics.get('hallucination_rate', 0.0):.2f}"),
]
)
question_html = (
""
"
Question
"
f"
{html.escape(question)}
"
"
"
)
return (
""
f"{question_html}"
"
"
f"{answer_card}{source_card}"
f"
{metric_boxes}
"
"
"
)
def render_document_status(session: SessionData) -> str:
doc_count = len({record.source_file for record in session.page_records})
return result_card(
"Corpus Loaded",
(
f"{html.escape(session.file_name)}
"
f"Documents indexed: {doc_count}
"
f"Pages indexed: {session.page_count}
"
f"Chunks indexed: {len(session.chunks)}
"
f"Extractor used: {html.escape(session.extractor)}
"
f"Knowledge base mode: preloaded project corpus"
),
tone="success",
)
def error_html(message: str) -> str:
return result_card("Action Required", html.escape(message), tone="error")
def info_html(message: str) -> str:
return result_card("Notice", html.escape(message), tone="warn")
def build_session(file_path: str) -> SessionData:
is_valid, validation_message = validate_pdf(file_path)
if not is_valid:
raise ValueError(validation_message)
temp_dir = make_temp_session_dir()
try:
file_name = os.path.basename(file_path)
dest_path = os.path.join(temp_dir, file_name)
shutil.copy2(file_path, dest_path)
page_records, extractor_name = extract_page_records(dest_path, file_name)
if not page_records:
raise ValueError(
"No extractable text was found in the PDF. Please upload a text-based PDF."
)
if len(page_records) > MAX_PAGES:
raise ValueError(
f"PDF has {len(page_records)} pages, which exceeds the {MAX_PAGES}-page limit."
)
chunks = chunk_page_records(page_records, file_name)
if not chunks:
raise ValueError(
"The extracted PDF text did not produce enough content to index."
)
if len(chunks) > MAX_CHUNKS:
raise ValueError(
f"PDF produced {len(chunks)} chunks, which exceeds the {MAX_CHUNKS}-chunk limit."
)
file_hash = sha256_file(dest_path)
file_size_bytes = os.path.getsize(dest_path)
retriever = SessionRetriever(chunks, get_embedder())
agent = build_session_agent(retriever)
return SessionData(
session_id=str(uuid.uuid4()),
temp_dir=temp_dir,
pdf_path=dest_path,
file_name=file_name,
file_hash=file_hash,
file_size_bytes=file_size_bytes,
page_records=page_records,
chunks=chunks,
retriever=retriever,
agent=agent,
page_count=len(page_records),
extractor=extractor_name,
structured={},
)
except Exception:
shutil.rmtree(temp_dir, ignore_errors=True)
raise
def parse_header_metadata(page_records: List[PageRecord]) -> Dict[str, str]:
if not page_records:
return {}
lines = page_records[0].lines
metadata: Dict[str, str] = {}
title_lines: List[str] = []
i = 0
while i < len(lines):
line = lines[i]
if re.match(r"^\d+\.", line):
break
if line.endswith(":") and i + 1 < len(lines):
key = line[:-1].strip().lower().replace(" ", "_")
metadata[key] = lines[i + 1].strip()
i += 2
continue
title_lines.append(line)
i += 1
metadata["header_text"] = " ".join(lines[: min(len(lines), 20)])
metadata["title"] = " | ".join(title_lines[:3])
return metadata
def parse_vmp_table(page_records: List[PageRecord]) -> Dict[str, Dict[str, str]]:
known_docs = {
"Project Charter",
"Validation Master Plan",
"User Requirements Specification",
"Functional Requirements Specification",
"Risk Assessment",
"HP ALM Configuration Guide",
"IQ Protocol",
"IQ Execution Report",
"OQ Protocol",
"OQ Execution Report",
"PQ/UAT Protocol and Report",
"Data Migration Plan",
"Data Migration Summary Report",
"Validation Summary Report",
"Traceability Matrix",
"Change Control SOP",
}
rows: Dict[str, Dict[str, str]] = {}
all_lines: List[Tuple[int, str]] = []
for record in page_records:
for line in record.lines:
all_lines.append((record.page_num, line))
collecting = False
idx = 0
while idx < len(all_lines):
page_num, line = all_lines[idx]
if line == "Document":
collecting = True
idx += 5
continue
if not collecting:
idx += 1
continue
if line.startswith("4. Roles and Responsibilities"):
break
if line in known_docs and idx + 4 < len(all_lines):
_, doc_id = all_lines[idx + 1]
_, phase = all_lines[idx + 2]
_, author = all_lines[idx + 3]
_, approver = all_lines[idx + 4]
rows[line.lower()] = {
"document": line,
"document_id": doc_id,
"phase": phase,
"author": author,
"approver": approver,
"page_num": str(page_num),
}
idx += 5
continue
idx += 1
if not rows:
log_event(
"vmp_table_parse_empty",
source_file="02_Validation_Master_Plan.pdf",
page_count=len(page_records),
)
return rows
def corpus_pdf_files(candidate: Path) -> List[Path]:
if not candidate.exists() or not candidate.is_dir():
return []
pdfs = sorted(p for p in candidate.glob("*.pdf") if p.is_file())
numbered = [p for p in pdfs if re.match(r"^\d{2}_.+\.pdf$", p.name)]
required = {
"01_Project_Charter.pdf",
"02_Validation_Master_Plan.pdf",
"15_Regulatory_Reference_Guide.pdf",
}
names = {p.name for p in numbered}
if required.issubset(names):
return numbered
if len(numbered) >= 10:
return numbered
return []
def resolve_corpus_dir() -> Path:
for candidate in CORPUS_CANDIDATES:
if corpus_pdf_files(candidate):
return candidate
raise FileNotFoundError(
"Phase 2 corpus not found. Upload the 15 PDF files either into a phase2_corpus folder in the app repo or at the repo root."
)
def build_corpus_session() -> SessionData:
corpus_dir = resolve_corpus_dir()
pdf_paths = corpus_pdf_files(corpus_dir)
page_records: List[PageRecord] = []
chunks: List[Chunk] = []
structured: Dict[str, dict] = {"headers": {}, "vmp_table": {}, "corpus_dir": str(corpus_dir)}
extractors = set()
file_hash_parts: List[str] = []
for pdf_path in pdf_paths:
file_name = pdf_path.name
doc_pages, extractor_name = extract_page_records(str(pdf_path), file_name)
extractors.add(extractor_name)
page_records.extend(doc_pages)
chunks.extend(chunk_page_records(doc_pages, file_name))
structured["headers"][file_name] = parse_header_metadata(doc_pages)
file_hash_parts.append(f"{file_name}:{pdf_path.stat().st_size}:{int(pdf_path.stat().st_mtime)}")
if file_name == "02_Validation_Master_Plan.pdf":
structured["vmp_table"] = parse_vmp_table(doc_pages)
retriever = SessionRetriever(chunks, get_embedder())
agent = build_session_agent(retriever)
corpus_hash = hashlib.sha256("|".join(file_hash_parts).encode("utf-8")).hexdigest()
return SessionData(
session_id="phase2-corpus",
temp_dir="",
pdf_path=str(corpus_dir),
file_name="Phase 2 corpus",
file_hash=corpus_hash,
file_size_bytes=0,
page_records=page_records,
chunks=chunks,
retriever=retriever,
agent=agent,
page_count=len({(record.source_file, record.page_num) for record in page_records}),
extractor=" / ".join(sorted(extractors)),
structured=structured,
)
def handle_upload(file_obj, current_session_id: Optional[str]):
cleanup_expired_sessions()
if current_session_id:
with SESSIONS_LOCK:
remove_session(current_session_id)
file_path = file_obj if isinstance(file_obj, str) else getattr(file_obj, "name", None)
if not file_path:
return None, info_html("Upload one PDF file to start a testing session."), initial_result_html(), None, ""
try:
session = build_session(file_path)
with SESSIONS_LOCK:
SESSIONS[session.session_id] = session
log_event(
"upload_success",
session_id=session.session_id,
file_hash=session.file_hash,
file_name=session.file_name,
page_count=session.page_count,
chunk_count=len(session.chunks),
file_size_bytes=session.file_size_bytes,
extractor=session.extractor,
)
return session.session_id, render_document_status(session), initial_result_html(), None, ""
except Exception as exc:
log_event("upload_rejected", reason=str(exc), file_name=os.path.basename(file_path))
return None, error_html(str(exc)), initial_result_html(), None, ""
def get_session(session_id: Optional[str]) -> Optional[SessionData]:
if not session_id:
return None
with SESSIONS_LOCK:
session = SESSIONS.get(session_id)
return session
def check_rate_limit(session: SessionData) -> Optional[str]:
now = now_ts()
window_start = now - 60
session.question_timestamps = [ts for ts in session.question_timestamps if ts >= window_start]
if len(session.question_timestamps) >= MAX_QUESTIONS_PER_MINUTE:
return (
f"Rate limit reached. Please wait before asking more than "
f"{MAX_QUESTIONS_PER_MINUTE} questions per minute in one session."
)
session.question_timestamps.append(now)
return None
def build_question_metrics(question: str, answer: str, citation: Optional[Citation], retrieved_chunks: List[Chunk], hallucination_rate: float, latency_seconds: float) -> Dict[str, Optional[float]]:
return {
"latency_seconds": latency_seconds,
"answer_relevance": answer_relevance_proxy(question, answer, citation),
"context_precision": context_precision_proxy(question, answer, retrieved_chunks),
"faithfulness_proxy": faithfulness_proxy(answer, citation),
"hallucination_rate": hallucination_rate,
}
def ask_question(question: str, session_id: Optional[str]):
cleanup_expired_sessions()
question = (question or "").strip()
if not question:
return info_html("Enter a question to query the preloaded project corpus."), None, ""
session = get_session(session_id)
if session is None:
return error_html("The preloaded corpus is not available right now. Please reload the app."), None, ""
session.last_activity = now_ts()
rate_limit_message = check_rate_limit(session)
if rate_limit_message:
log_event("rate_limited", session_id=session.session_id, question=question)
return error_html(rate_limit_message), None, ""
plan = question_plan(question)
source_filters = matched_source_files(question)
metadata_hit = metadata_lookup_answer(session, question, source_filters)
if metadata_hit:
answer_text, source_file, source_page = metadata_hit
abstained = False
hallucination_rate = 0.0
latency_seconds = 0.0
citation = (
locate_citation_by_file_page(question, answer_text, session, source_file, source_page)
if source_file
else None
)
metrics = build_question_metrics(question, answer_text, citation, [], hallucination_rate, latency_seconds)
result_html = render_result(question, answer_text, citation, metrics, abstained=False)
response_state = {
"session_id": session.session_id,
"question": question,
"answer": answer_text,
"abstained": False,
"source_file": citation.source_file if citation else None,
"page_num": citation.page_num if citation else None,
"line_start": citation.line_start if citation else None,
"line_end": citation.line_end if citation else None,
"excerpt": citation.excerpt if citation else None,
"metrics": metrics,
"file_hash": session.file_hash,
"route_mode": "structured",
"expected_type": plan.expected_type,
}
append_jsonl(
INTERACTION_LOG_PATH,
{
"timestamp": now_ts(),
"session_id": session.session_id,
"file_hash": session.file_hash,
"file_name": session.file_name,
"page_count": session.page_count,
"question": question,
"answer": answer_text,
"abstained": False,
"source_file": citation.source_file if citation else None,
"page_num": citation.page_num if citation else None,
"line_start": citation.line_start if citation else None,
"line_end": citation.line_end if citation else None,
"excerpt": citation.excerpt if citation else None,
"latency_seconds": latency_seconds,
"route_mode": "structured",
"expected_type": plan.expected_type,
"answer_relevance": metrics["answer_relevance"],
"context_precision": metrics["context_precision"],
"faithfulness_proxy": metrics["faithfulness_proxy"],
"hallucination_rate": metrics["hallucination_rate"],
},
)
return result_html, response_state, ""
retrieved_chunks, evidence_pairs = best_evidence_sentences(session, question, plan, source_filters=source_filters)
evidence_sentences = [sentence for _, sentence in evidence_pairs]
start = time.perf_counter()
output = None
best_chunk = evidence_pairs[0][0] if evidence_pairs else None
answer_text: str
abstained = False
hallucination_rate = 0.0
def run_agentic_fallback() -> Tuple[str, bool, float, Optional[Chunk], Optional[str]]:
nonlocal output, best_chunk
try:
output = session.agent.run(question)
best_chunk = output.best_chunk or best_chunk
answer = (
"I don't have enough evidence in the project corpus to answer that reliably."
if output.abstained
else (output.answer or "No answer produced.")
)
return answer, output.abstained, output.hallucination_rate or 0.0, best_chunk, None
except Exception as exc:
log_event("inference_failed", session_id=session.session_id, question=question, error=str(exc))
return "", False, 0.0, best_chunk, str(exc)
if not evidence_pairs:
abstained = True
answer_text = "I don't have enough evidence in the project corpus to answer that reliably."
elif plan.mode in {"procedural", "descriptive"}:
summary_answer = summarize_procedural_answer(evidence_pairs) if evidence_has_expected_type(plan, evidence_sentences) else None
if summary_answer:
answer_text = summary_answer
abstained = False
else:
if plan.allow_agentic_fallback:
answer_text, abstained, hallucination_rate, best_chunk, inference_error = run_agentic_fallback()
if inference_error:
return error_html(f"Inference failed: {inference_error}"), None, ""
else:
abstained = True
answer_text = "I don't have enough evidence in the project corpus to answer that reliably."
else:
concise_answer = concise_factoid_answer(question, plan, evidence_pairs)
if concise_answer and evidence_has_expected_type(plan, evidence_sentences):
answer_text = concise_answer
elif not evidence_has_expected_type(plan, evidence_sentences):
abstained = True
answer_text = "I don't have enough evidence in the project corpus to answer that reliably."
elif plan.allow_agentic_fallback:
answer_text, abstained, hallucination_rate, best_chunk, inference_error = run_agentic_fallback()
if inference_error:
return error_html(f"Inference failed: {inference_error}"), None, ""
else:
abstained = True
answer_text = "I don't have enough evidence in the project corpus to answer that reliably."
latency_seconds = time.perf_counter() - start
citation = locate_citation(question, answer_text, best_chunk, session)
metrics = build_question_metrics(
question,
answer_text,
citation,
retrieved_chunks,
hallucination_rate,
latency_seconds,
)
result_html = render_result(
question=question,
answer=answer_text,
citation=citation,
metrics=metrics,
abstained=abstained,
)
response_state = {
"session_id": session.session_id,
"question": question,
"answer": answer_text,
"abstained": abstained,
"source_file": citation.source_file if citation else None,
"page_num": citation.page_num if citation else None,
"line_start": citation.line_start if citation else None,
"line_end": citation.line_end if citation else None,
"excerpt": citation.excerpt if citation else None,
"metrics": metrics,
"file_hash": session.file_hash,
"route_mode": plan.mode,
"expected_type": plan.expected_type,
}
append_jsonl(
INTERACTION_LOG_PATH,
{
"timestamp": now_ts(),
"session_id": session.session_id,
"file_hash": session.file_hash,
"file_name": session.file_name,
"page_count": session.page_count,
"question": question,
"answer": answer_text,
"abstained": abstained,
"source_file": citation.source_file if citation else None,
"page_num": citation.page_num if citation else None,
"line_start": citation.line_start if citation else None,
"line_end": citation.line_end if citation else None,
"excerpt": citation.excerpt if citation else None,
"latency_seconds": latency_seconds,
"route_mode": plan.mode,
"expected_type": plan.expected_type,
"answer_relevance": metrics["answer_relevance"],
"context_precision": metrics["context_precision"],
"faithfulness_proxy": metrics["faithfulness_proxy"],
"hallucination_rate": metrics["hallucination_rate"],
},
)
return result_html, response_state, ""
def submit_feedback(response_state: Optional[dict], vote: str):
if not response_state:
return "Ask a question first, then rate the answer."
log_event(
"feedback",
session_id=response_state.get("session_id"),
file_hash=response_state.get("file_hash"),
question=response_state.get("question"),
vote=vote,
source_file=response_state.get("source_file"),
page_num=response_state.get("page_num"),
line_start=response_state.get("line_start"),
line_end=response_state.get("line_end"),
)
if vote == "helpful":
return "Thanks. Your feedback was recorded as helpful."
return "Thanks. Your feedback was recorded for review."
ensure_directories()
try:
PRELOADED_SESSION = build_corpus_session()
with SESSIONS_LOCK:
SESSIONS[PRELOADED_SESSION.session_id] = PRELOADED_SESSION
PRELOADED_STATUS_HTML = render_document_status(PRELOADED_SESSION)
STARTUP_NOTICE = ""
except Exception as exc:
PRELOADED_SESSION = None
PRELOADED_STATUS_HTML = error_html(str(exc))
STARTUP_NOTICE = str(exc)
with gr.Blocks(css=CSS) as demo:
session_state = gr.State(PRELOADED_SESSION.session_id if PRELOADED_SESSION else None)
response_state = gr.State(None)
gr.Markdown(
f"""
# {APP_NAME}
{APP_TAGLINE}
**Project corpus mode**
- Preloaded Phase 2 project documents
- Optimized for this fixed validation corpus
- Best for onboarding, project lookup, and validation Q&A
- Rate limit: **{MAX_QUESTIONS_PER_MINUTE} questions per minute**
"""
)
gr.Markdown(
f"""
Privacy notice
{html.escape(PRIVACY_NOTICE)}
"""
)
if STARTUP_NOTICE:
gr.HTML(error_html(STARTUP_NOTICE))
document_status = gr.HTML(PRELOADED_STATUS_HTML)
with gr.Row():
question_box = gr.Textbox(
label="Your Question",
lines=2,
placeholder="Ask a question about the preloaded project documents...",
scale=5,
)
ask_btn = gr.Button("Ask", elem_id="ask_btn", scale=1)
result_html_component = gr.HTML(initial_result_html())
with gr.Row():
helpful_btn = gr.Button("Helpful")
needs_work_btn = gr.Button("Needs Improvement")
feedback_status = gr.Textbox(
label="Feedback Status",
value="",
interactive=False,
)
with gr.Accordion("Testing guardrails and privacy details", open=False):
gr.Markdown(
f"""
- **Knowledge base**: 15 preloaded Phase 2 project PDFs
- **Citation format**: page and extracted-text line range
- **Queueing**: concurrency limit {QUEUE_CONCURRENCY}, queue size {QUEUE_MAX_SIZE}
- **Logged for evaluation**: corpus question, answer, citation, latency, and proxy metrics
"""
)
ask_btn.click(
ask_question,
inputs=[question_box, session_state],
outputs=[result_html_component, response_state, feedback_status],
)
question_box.submit(
ask_question,
inputs=[question_box, session_state],
outputs=[result_html_component, response_state, feedback_status],
)
helpful_btn.click(
lambda state: submit_feedback(state, "helpful"),
inputs=[response_state],
outputs=[feedback_status],
)
needs_work_btn.click(
lambda state: submit_feedback(state, "needs_improvement"),
inputs=[response_state],
outputs=[feedback_status],
)
if __name__ == "__main__":
ensure_directories()
cleanup_expired_sessions()
demo.queue(default_concurrency_limit=QUEUE_CONCURRENCY, max_size=QUEUE_MAX_SIZE).launch()