""" G.U.I.D.E. API route handlers. All routes are registered on a single APIRouter that main.py includes. Presidio redaction is wired into POST /session/{id}/message here — the CMA never receives un-redacted text from this layer. Critical data-flow contracts enforced here: 1. Every /message call redacts PII before forwarding to GUIDEAgent. 2. /validate-entities injects "[USER CONFIRMED]: {entities}" and awaits the CMA's draft reply — it must not call send_message directly. 3. CMA calls (send_message, confirm_entities) are blocking I/O (Anthropic API); they run in a thread-pool via run_in_threadpool so the async event loop is not blocked. """ from __future__ import annotations import dataclasses import logging import os import re import shutil from datetime import datetime, timezone from pathlib import Path from fastapi import APIRouter, File, HTTPException, UploadFile from fastapi.concurrency import run_in_threadpool from src.api import sessions as session_store from src.api.models import ( AuditEntry, AuditResponse, ClassifyRequest, ClassifyResponse, CreateSessionResponse, EntityOut, ExtractRequest, ExtractResponse, HistoryMessage, HistoryResponse, MessageRequest, MessageResponse, RedactionSpanOut, UploadResponse, ValidateEntitiesRequest, ) from src.classifier.predict import classify from src.ner.predict import extract_entities from src.privacy.redactor import get_redactor logger = logging.getLogger(__name__) router = APIRouter(prefix="/api") def _llm_http_error(exc: Exception) -> HTTPException: """Translate an LLM/agent exception into a calm, user-friendly HTTPException. The raw exception is already logged by the caller (logger.exception); this only shapes the message the UI shows verbatim. Anthropic surfaces transient capacity issues as an 'Overloaded' APIStatusError (HTTP 529) and rate limits as RateLimitError (429) — both are temporary and worth a retry. """ name = type(exc).__name__ text = str(exc).lower() if "overloaded" in text or name == "OverloadedError" or "529" in text: return HTTPException( status_code=503, detail=("Claude (the AI service) is briefly overloaded with requests. " "Please wait a few seconds and send your message again — your " "conversation is saved."), ) if name == "RateLimitError" or "rate limit" in text or "429" in text: return HTTPException( status_code=429, detail=("The AI service is handling a high volume of requests right now. " "Please wait about a minute and try again."), ) if name == "APITimeoutError" or "timeout" in text or "timed out" in text: return HTTPException( status_code=504, detail=("The AI took too long to respond. Please try sending your " "message again."), ) if name == "AuthenticationError" or "authentication" in text or "401" in text: return HTTPException( status_code=502, detail=("The assistant isn't configured correctly on the server " "(authentication failed). Please contact the site operator."), ) return HTTPException( status_code=502, detail=("The AI service ran into an unexpected error. Please try again in " "a moment; if it keeps happening, try rephrasing your message."), ) # Upload directory inside the project root — survives server restarts unlike # tempfile.gettempdir() which macOS can purge between sessions. _UPLOAD_DIR = Path(__file__).resolve().parent.parent.parent / "uploads" _UPLOAD_DIR.mkdir(parents=True, exist_ok=True) # --------------------------------------------------------------------------- # Health # --------------------------------------------------------------------------- @router.get("/health") async def health(): """ Return the operational status of every G.U.I.D.E. component. status = "ok" all critical components initialised status = "degraded" one or more components using a fallback or not loaded """ from src.api.main import _component_status # set during lifespan startup from src.api.models import HealthResponse overall = ( "ok" if all( v in ("ok", "keyword_fallback", "rule_fallback") for v in _component_status.values() ) else "degraded" ) return HealthResponse(status=overall, components=dict(_component_status)) # --------------------------------------------------------------------------- # Session lifecycle # --------------------------------------------------------------------------- @router.post("/session/create", response_model=CreateSessionResponse) async def create_session(): """Create a new stateful CMA session and return its session_id.""" session_id = await run_in_threadpool(session_store.create_session) logger.info("Session created: %s", session_id) return CreateSessionResponse(session_id=session_id) # --------------------------------------------------------------------------- # Message (Presidio redaction enforced here) # --------------------------------------------------------------------------- @router.post( "/session/{session_id}/message", response_model=MessageResponse, ) async def send_message(session_id: str, req: MessageRequest): """ Redact PII from *req.text*, forward the redacted text to the CMA session, and return the agent's reply together with redaction metadata. Redaction happens in this handler — GUIDEAgent.send_message only ever receives text that has already passed through Presidio. """ session = session_store.get_session(session_id) # 1. Presidio redaction (local, before any external call) redaction = get_redactor().redact(req.text) redacted_text = redaction.redacted_text # 1a. Privacy audit: record exactly what is about to leave the process, and # verify that none of the original PII values survive into the redacted text # that will be transmitted to Anthropic. This makes the privacy guarantee # provable rather than merely asserted. leaked = [ s.original for s in redaction.spans if s.original and s.original in redacted_text ] session_store.append_audit( session_id, { "timestamp": datetime.now(timezone.utc).isoformat(), "event": "outbound_to_anthropic", "description": ( f"Message sent to Claude API — {len(redaction.spans)} personal " f"identifier(s) stripped locally before transmission." if redaction.spans else "Message sent to Claude API — no personal identifiers detected." ), "transmitted_text": redacted_text, "pii_types_found": redaction.pii_types_found, "pii_count": len(redaction.spans), "leak_check": "failed" if leaked else "passed", }, ) # 2. CMA call (blocking — run in thread pool) _key = os.environ.get("ANTHROPIC_API_KEY", "") logger.info("ANTHROPIC_API_KEY present=%s len=%d prefix=%s", bool(_key), len(_key), _key[:8] if _key else "") try: reply: str = await run_in_threadpool( session.agent.send_message, redacted_text ) except NotImplementedError: raise HTTPException( status_code=501, detail="GUIDEAgent.send_message is not yet implemented.", ) except Exception as exc: logger.exception("CMA error in session %s", session_id) raise _llm_http_error(exc) from exc # 3. Persist conversation turn session_store.append_history(session_id, "user", redacted_text) session_store.append_history(session_id, "assistant", reply) return MessageResponse( reply=reply, pii_redacted=redaction.pii_redacted, pii_types_found=redaction.pii_types_found, original_text=req.text, redacted_text=redacted_text, redactions=[ RedactionSpanOut( entity_type=s.entity_type, original=s.original, placeholder=s.placeholder, start=s.start, end=s.end, ) for s in redaction.spans ], ) # --------------------------------------------------------------------------- # Document upload # --------------------------------------------------------------------------- @router.post( "/session/{session_id}/upload", response_model=UploadResponse, ) async def upload_document( session_id: str, file: UploadFile = File(...), ): """ Accept a document file, run DocumentProcessor (OCR → NER → ViT), register the file path with the CMA session so the next agent turn can call process_document, and return the extracted text and entities. Supported: .pdf .png .jpg .jpeg .webp Files are saved to a per-process temp directory; they persist for the duration of the server process so the agent can reference them by path. """ session = session_store.get_session(session_id) # Validate extension original_name = file.filename or "upload" suffix = Path(original_name).suffix.lower() allowed = {".pdf", ".png", ".jpg", ".jpeg", ".webp"} if suffix not in allowed: raise HTTPException( status_code=415, detail=f"Unsupported file type '{suffix}'. Allowed: {', '.join(sorted(allowed))}", ) # Sanitize filename: replace non-ASCII chars (e.g. macOS narrow no-break space # U+202F in screenshot filenames) and whitespace with underscores so the path # is safe to embed in agent messages without being truncated by Claude. safe_stem = re.sub(r"[^\x20-\x7E]", "_", Path(original_name).stem) safe_stem = re.sub(r"\s+", "_", safe_stem) safe_name = safe_stem + suffix tmp_path = _UPLOAD_DIR / f"{session_id}_{safe_name}" try: with tmp_path.open("wb") as fh: shutil.copyfileobj(file.file, fh) finally: await file.close() # DocumentProcessor (blocking) try: from src.document_processor.processor import get_processor result = await run_in_threadpool(get_processor().process, str(tmp_path)) except Exception as exc: logger.exception("DocumentProcessor failed for %s", original_name) raise HTTPException( status_code=500, detail=f"Document processing failed: {exc}", ) from exc # Register path with CMA session try: await run_in_threadpool(session.agent.add_document, str(tmp_path)) except NotImplementedError: pass # agent stub — path stored in session, will be wired when agent is implemented # Serialise Entity dataclasses to EntityOut Pydantic models entity_out = [ EntityOut(**dataclasses.asdict(e)) for e in result["entities"] ] # Privacy audit: document OCR + NER run entirely on this server (no image or # text is sent to a third party at upload time). Record it so the audit trail # is a complete account of where the user's data went. session_store.append_audit( session_id, { "timestamp": datetime.now(timezone.utc).isoformat(), "event": "document_local", "description": ( f"Document '{original_name}' processed locally on this server " f"(OCR + entity extraction). {len(entity_out)} detail(s) extracted. " "The image was not sent to any third-party service." ), "transmitted_text": "", "pii_types_found": [], "pii_count": 0, "leak_check": "n/a", }, ) logger.info( "Uploaded '%s' to session %s — %d chars, %d entities.", original_name, session_id, len(result["raw_text"]), len(entity_out), ) return UploadResponse( filename=original_name, raw_text=result["raw_text"], entities=entity_out, ) # --------------------------------------------------------------------------- # HITL — validate-entities # --------------------------------------------------------------------------- @router.post( "/session/{session_id}/validate-entities", response_model=MessageResponse, ) async def validate_entities(session_id: str, req: ValidateEntitiesRequest): """ HITL gate: receive user-confirmed (and optionally corrected) entity values, inject a [USER CONFIRMED] signal into the CMA, and return the agent's draft complaint reply. This endpoint must be called after the user reviews the entity summary presented by the agent in the Chat tab and clicks "Confirm & Generate Draft" in the Verify Entities tab. The agent uses the confirmed entities to fill the complaint draft accurately (important because PII redaction may have replaced, e.g., "HDFC Bank" with ""; the HITL step restores the correct readable label). """ session = session_store.get_session(session_id) try: reply: str = await run_in_threadpool( session.agent.confirm_entities, req.entities ) except NotImplementedError: raise HTTPException( status_code=501, detail="GUIDEAgent.confirm_entities is not yet implemented.", ) except Exception as exc: logger.exception("confirm_entities error in session %s", session_id) raise _llm_http_error(exc) from exc # The [USER CONFIRMED] message and the draft reply are both part of history confirmed_marker = f"[USER CONFIRMED]: {req.entities}" session_store.append_history(session_id, "user", confirmed_marker) session_store.append_history(session_id, "assistant", reply) return MessageResponse(reply=reply, pii_redacted=False, pii_types_found=[]) # --------------------------------------------------------------------------- # Escalation guide — second, separate request after the draft # --------------------------------------------------------------------------- @router.post( "/session/{session_id}/escalation-guide", response_model=MessageResponse, ) async def escalation_guide(session_id: str): """ Generate the escalation guide as a SEPARATE request from the draft letter. The draft (validate-entities) and the escalation guide are split into two distinct agent turns so each stays within the per-minute token budget and the token bucket can refill between them. The UI calls this immediately after the draft reply returns. """ session = session_store.get_session(session_id) try: reply: str = await run_in_threadpool(session.agent.generate_escalation) except Exception as exc: logger.exception("generate_escalation error in session %s", session_id) raise _llm_http_error(exc) from exc session_store.append_history(session_id, "user", "[Generate escalation guide]") session_store.append_history(session_id, "assistant", reply) return MessageResponse(reply=reply, pii_redacted=False, pii_types_found=[]) # --------------------------------------------------------------------------- # History # --------------------------------------------------------------------------- @router.get( "/session/{session_id}/history", response_model=HistoryResponse, ) async def get_history(session_id: str): """Return the full conversation history for *session_id*.""" session = session_store.get_session(session_id) return HistoryResponse( session_id=session_id, history=[HistoryMessage(**turn) for turn in session.history], ) # --------------------------------------------------------------------------- # Privacy audit trail # --------------------------------------------------------------------------- @router.get( "/session/{session_id}/audit", response_model=AuditResponse, ) async def get_audit(session_id: str): """Return the privacy audit trail for *session_id* — a chronological account of every event where data left the process or was processed locally.""" session = session_store.get_session(session_id) return AuditResponse( session_id=session_id, entries=[AuditEntry(**entry) for entry in session.audit], ) # --------------------------------------------------------------------------- # Debug — direct DL access (no session required) # --------------------------------------------------------------------------- @router.post("/classify", response_model=ClassifyResponse) async def classify_debug(req: ClassifyRequest): """ Classify *req.text* using DomainClassifier (or keyword fallback) directly. Does not redact PII — callers are responsible for not sending sensitive data to this debug endpoint in production. """ try: result = await run_in_threadpool(classify, req.text) except Exception as exc: raise _llm_http_error(exc) from exc return ClassifyResponse( domain=result.domain, confidence=result.confidence, all_probs=result.all_probs, ) @router.post("/extract", response_model=ExtractResponse) async def extract_debug(req: ExtractRequest): """ Run EvidenceNER on *req.text* and return entity spans directly. Does not redact PII — debug use only. """ try: entities = await run_in_threadpool(extract_entities, req.text) except Exception as exc: raise _llm_http_error(exc) from exc return ExtractResponse( entities=[EntityOut(**dataclasses.asdict(e)) for e in entities] )