Spaces:
Sleeping
Sleeping
| """ | |
G.U.I.D.E. API route handlers.

All routes are registered on a single APIRouter that main.py includes.
Presidio redaction is wired into POST /session/{id}/message here — the CMA
never receives un-redacted free text from that handler. (The
/validate-entities HITL step deliberately forwards the user-confirmed entity
values as given.)

Critical data-flow contracts enforced here:
1. Every /message call redacts PII before forwarding to GUIDEAgent.
2. /validate-entities injects "[USER CONFIRMED]: {entities}" and awaits
   the CMA's draft reply — it must not call send_message directly.
3. CMA calls (send_message, confirm_entities, generate_escalation) are
   blocking I/O (Anthropic API); they run in a thread pool via
   run_in_threadpool so the async event loop is not blocked.
| """ | |
| from __future__ import annotations | |
| import dataclasses | |
| import logging | |
| import os | |
| import re | |
| import shutil | |
| from datetime import datetime, timezone | |
| from pathlib import Path | |
| from fastapi import APIRouter, File, HTTPException, UploadFile | |
| from fastapi.concurrency import run_in_threadpool | |
| from src.api import sessions as session_store | |
| from src.api.models import ( | |
| AuditEntry, | |
| AuditResponse, | |
| ClassifyRequest, | |
| ClassifyResponse, | |
| CreateSessionResponse, | |
| EntityOut, | |
| ExtractRequest, | |
| ExtractResponse, | |
| HistoryMessage, | |
| HistoryResponse, | |
| MessageRequest, | |
| MessageResponse, | |
| RedactionSpanOut, | |
| UploadResponse, | |
| ValidateEntitiesRequest, | |
| ) | |
| from src.classifier.predict import classify | |
| from src.ner.predict import extract_entities | |
| from src.privacy.redactor import get_redactor | |
# Per-module logger; the handlers below call logger.exception(...) before
# translating errors into HTTP responses.
logger: logging.Logger = logging.getLogger(__name__)

# One router shared by every handler in this module; all of its paths sit
# under the "/api" prefix.
# NOTE(review): none of the handlers in the reviewed copy of this module show
# an @router.<method>(...) decorator — confirm how they are registered on this
# router (the decorators may have been lost when this copy was extracted).
router: APIRouter = APIRouter(prefix="/api")
def _llm_http_error(exc: Exception) -> HTTPException:
    """Translate an LLM/agent exception into a calm, user-friendly HTTPException.

    The raw exception is already logged by the caller (logger.exception); this
    only shapes the message the UI shows verbatim. Anthropic surfaces transient
    capacity issues as an 'Overloaded' APIStatusError (HTTP 529) and rate limits
    as RateLimitError (429) — both are temporary and worth a retry.

    Checks run in this order: overloaded (503), rate limit (429), timeout (504),
    authentication (502), anything else (502). Each check looks at the
    exception's ``status_code`` attribute (when it has one), its class name,
    and keywords in its lower-cased message. An HTTP code in the message only
    counts when it is a standalone number, so digits embedded in token counts
    or request ids (e.g. "14012 tokens", "req_15290") do not trigger a mapping.

    Args:
        exc: Exception raised by the agent or the Anthropic client.

    Returns:
        An HTTPException for the caller to ``raise ... from exc``.
    """
    name = type(exc).__name__
    text = str(exc).lower()
    status = getattr(exc, "status_code", None)

    def has_code(code: int) -> bool:
        # True when the exception carries *code* as its status, or its message
        # mentions *code* not surrounded by other digits.
        if status == code:
            return True
        return re.search(rf"(?<!\d){code}(?!\d)", text) is not None

    if "overloaded" in text or name == "OverloadedError" or has_code(529):
        return HTTPException(
            status_code=503,
            detail=("Claude (the AI service) is briefly overloaded with requests. "
                    "Please wait a few seconds and send your message again — your "
                    "conversation is saved."),
        )
    if name == "RateLimitError" or "rate limit" in text or has_code(429):
        return HTTPException(
            status_code=429,
            detail=("The AI service is handling a high volume of requests right now. "
                    "Please wait about a minute and try again."),
        )
    if name == "APITimeoutError" or "timeout" in text or "timed out" in text:
        return HTTPException(
            status_code=504,
            detail=("The AI took too long to respond. Please try sending your "
                    "message again."),
        )
    if name == "AuthenticationError" or "authentication" in text or has_code(401):
        return HTTPException(
            status_code=502,
            detail=("The assistant isn't configured correctly on the server "
                    "(authentication failed). Please contact the site operator."),
        )
    return HTTPException(
        status_code=502,
        detail=("The AI service ran into an unexpected error. Please try again in "
                "a moment; if it keeps happening, try rephrasing your message."),
    )
# Uploads live in an "uploads" directory three levels above this file
# (presumably the project root, given the src/api package layout) instead of
# tempfile.gettempdir(), which macOS can purge between sessions — so uploaded
# files survive server restarts.
_PROJECT_ROOT = Path(__file__).resolve().parent.parent.parent
_UPLOAD_DIR = _PROJECT_ROOT / "uploads"
# Created eagerly at import time so the upload handler can assume it exists.
_UPLOAD_DIR.mkdir(exist_ok=True, parents=True)
| # --------------------------------------------------------------------------- | |
| # Health | |
| # --------------------------------------------------------------------------- | |
async def health():
    """
    Report the operational status of every G.U.I.D.E. component.

    The overall status is "ok" when every component reports "ok" or one of the
    accepted fallbacks ("keyword_fallback", "rule_fallback"), and "degraded"
    when any component reports any other value. An empty status table also
    yields "ok". The per-component table is returned as a copy.
    """
    # Imported lazily: _component_status is populated during main.py's
    # lifespan startup (and a module-level import of main would presumably be
    # circular, since main includes this router).
    from src.api.main import _component_status
    from src.api.models import HealthResponse

    acceptable_states = ("ok", "keyword_fallback", "rule_fallback")
    all_healthy = all(
        state in acceptable_states for state in _component_status.values()
    )
    return HealthResponse(
        status="ok" if all_healthy else "degraded",
        components=dict(_component_status),
    )
| # --------------------------------------------------------------------------- | |
| # Session lifecycle | |
| # --------------------------------------------------------------------------- | |
async def create_session():
    """Start a fresh stateful CMA session and return its id.

    session_store.create_session is run in the thread pool, keeping the event
    loop free while the session (and its agent) is set up.
    """
    new_session_id = await run_in_threadpool(session_store.create_session)
    logger.info("Session created: %s", new_session_id)
    return CreateSessionResponse(session_id=new_session_id)
| # --------------------------------------------------------------------------- | |
| # Message (Presidio redaction enforced here) | |
| # --------------------------------------------------------------------------- | |
async def send_message(session_id: str, req: MessageRequest):
    """
    Redact PII from *req.text*, forward the redacted text to the CMA session,
    and return the agent's reply together with redaction metadata.

    Redaction happens in this handler — GUIDEAgent.send_message only ever
    receives text that has already passed through Presidio. A leak check then
    verifies that no original PII value (``span.original``) still appears in
    the redacted text. If any does, the message is NOT sent to the agent and
    the request fails.

    Side effects:
        * appends exactly one privacy-audit entry per call (sent or blocked);
        * on success, appends the redacted user turn and the assistant reply
          to the session history (the un-redacted text is never stored there).

    Raises:
        HTTPException: 500 when the leak check fails; 501 when
            GUIDEAgent.send_message is a stub; otherwise the status that
            ``_llm_http_error`` maps the agent's exception to.
    """
    session = session_store.get_session(session_id)

    # 1. Presidio redaction (local, before any external call).
    redaction = get_redactor().redact(req.text)
    redacted_text = redaction.redacted_text
    pii_count = len(redaction.spans)

    # 1a. Leak check: every original PII value must be gone from the text that
    # would be transmitted to Anthropic. This makes the privacy guarantee
    # provable rather than merely asserted — and it is enforced below, not
    # just recorded.
    leaked = [
        s.original for s in redaction.spans
        if s.original and s.original in redacted_text
    ]

    if leaked:
        description = (
            f"Message NOT sent to Claude API — {len(leaked)} personal "
            f"identifier(s) survived redaction, so transmission was blocked."
        )
        transmitted_text = ""  # nothing left the process
    elif redaction.spans:
        description = (
            f"Message sent to Claude API — {pii_count} personal "
            f"identifier(s) stripped locally before transmission."
        )
        transmitted_text = redacted_text
    else:
        description = "Message sent to Claude API — no personal identifiers detected."
        transmitted_text = redacted_text

    # The event key is kept for blocked attempts too; leak_check distinguishes
    # them. NOTE(review): presumably AuditEntry/the UI expect known event
    # names — confirm before introducing a dedicated "blocked" event.
    session_store.append_audit(
        session_id,
        {
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "event": "outbound_to_anthropic",
            "description": description,
            "transmitted_text": transmitted_text,
            "pii_types_found": redaction.pii_types_found,
            "pii_count": pii_count,
            "leak_check": "failed" if leaked else "passed",
        },
    )

    if leaked:
        # Log only the count — never the leaked values themselves.
        logger.error(
            "Leak check failed in session %s: %d PII value(s) survived "
            "redaction; message not sent.",
            session_id, len(leaked),
        )
        raise HTTPException(
            status_code=500,
            detail=("Your message was not sent because a personal detail could "
                    "not be removed safely before leaving this server. Please "
                    "rephrase it and try again."),
        )

    # 2. CMA call (blocking — run in thread pool). The API key is deliberately
    # not logged in any form.
    try:
        reply: str = await run_in_threadpool(
            session.agent.send_message, redacted_text
        )
    except NotImplementedError:
        raise HTTPException(
            status_code=501,
            detail="GUIDEAgent.send_message is not yet implemented.",
        )
    except Exception as exc:
        logger.exception("CMA error in session %s", session_id)
        raise _llm_http_error(exc) from exc

    # 3. Persist the conversation turn (redacted text only).
    session_store.append_history(session_id, "user", redacted_text)
    session_store.append_history(session_id, "assistant", reply)

    return MessageResponse(
        reply=reply,
        pii_redacted=redaction.pii_redacted,
        pii_types_found=redaction.pii_types_found,
        original_text=req.text,
        redacted_text=redacted_text,
        redactions=[
            RedactionSpanOut(
                entity_type=s.entity_type,
                original=s.original,
                placeholder=s.placeholder,
                start=s.start,
                end=s.end,
            )
            for s in redaction.spans
        ],
    )
| # --------------------------------------------------------------------------- | |
| # Document upload | |
| # --------------------------------------------------------------------------- | |
def _write_upload(src, dest: Path) -> None:
    """Copy the file-like *src* into a new file at *dest* (blocking disk I/O)."""
    with dest.open("wb") as fh:
        shutil.copyfileobj(src, fh)


def _discard_upload(path: Path) -> None:
    """Best-effort removal of *path*; cleanup must never mask the original error."""
    try:
        path.unlink(missing_ok=True)
    except OSError:
        logger.warning("Could not remove upload %s", path)


async def upload_document(
    session_id: str,
    file: UploadFile = File(...),
):
    """
    Accept a document file, run DocumentProcessor (OCR -> NER -> ViT), register
    the file path with the CMA session so the next agent turn can call
    process_document, and return the extracted text and entities.

    Supported: .pdf .png .jpg .jpeg .webp

    Files are saved under ``_UPLOAD_DIR`` as ``<session_id>_<sanitised name>``;
    uploading the same name again in the same session overwrites the earlier
    file. A partially written file, or one whose processing fails, is removed
    again, because it is never registered with the agent.

    Raises:
        HTTPException: 415 for an unsupported extension; 500 when
            DocumentProcessor fails. Disk-write errors propagate unchanged.
    """
    session = session_store.get_session(session_id)

    # Validate extension.
    original_name = file.filename or "upload"
    suffix = Path(original_name).suffix.lower()
    allowed = {".pdf", ".png", ".jpg", ".jpeg", ".webp"}
    if suffix not in allowed:
        raise HTTPException(
            status_code=415,
            detail=f"Unsupported file type '{suffix}'. Allowed: {', '.join(sorted(allowed))}",
        )

    # Sanitize filename: replace non-ASCII chars (e.g. macOS narrow no-break space
    # U+202F in screenshot filenames) and whitespace with underscores so the path
    # is safe to embed in agent messages without being truncated by Claude.
    # Path(...).stem already drops any directory components from the client name.
    safe_stem = re.sub(r"[^\x20-\x7E]", "_", Path(original_name).stem)
    safe_stem = re.sub(r"\s+", "_", safe_stem)
    safe_name = safe_stem + suffix
    tmp_path = _UPLOAD_DIR / f"{session_id}_{safe_name}"

    # Blocking disk write — run it off the event loop like the other blocking calls.
    try:
        await run_in_threadpool(_write_upload, file.file, tmp_path)
    except Exception:
        _discard_upload(tmp_path)  # don't leave a truncated file behind
        raise
    finally:
        await file.close()

    # DocumentProcessor (blocking).
    try:
        from src.document_processor.processor import get_processor
        result = await run_in_threadpool(get_processor().process, str(tmp_path))
    except Exception as exc:
        logger.exception("DocumentProcessor failed for %s", original_name)
        # The file is never registered with the agent, so don't keep it on disk.
        _discard_upload(tmp_path)
        raise HTTPException(
            status_code=500,
            detail=f"Document processing failed: {exc}",
        ) from exc

    # Register path with CMA session.
    try:
        await run_in_threadpool(session.agent.add_document, str(tmp_path))
    except NotImplementedError:
        pass  # agent stub — path stored in session, will be wired when agent is implemented

    # Serialise Entity dataclasses to EntityOut Pydantic models.
    entity_out = [
        EntityOut(**dataclasses.asdict(e)) for e in result["entities"]
    ]

    # Privacy audit: document OCR + NER run entirely on this server (no image or
    # text is sent to a third party at upload time). Record it so the audit trail
    # is a complete account of where the user's data went.
    session_store.append_audit(
        session_id,
        {
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "event": "document_local",
            "description": (
                f"Document '{original_name}' processed locally on this server "
                f"(OCR + entity extraction). {len(entity_out)} detail(s) extracted. "
                "The image was not sent to any third-party service."
            ),
            "transmitted_text": "",
            "pii_types_found": [],
            "pii_count": 0,
            "leak_check": "n/a",
        },
    )

    logger.info(
        "Uploaded '%s' to session %s — %d chars, %d entities.",
        original_name, session_id, len(result["raw_text"]), len(entity_out),
    )
    return UploadResponse(
        filename=original_name,
        raw_text=result["raw_text"],
        entities=entity_out,
    )
| # --------------------------------------------------------------------------- | |
# HITL — validate-entities
| # --------------------------------------------------------------------------- | |
async def validate_entities(session_id: str, req: ValidateEntitiesRequest):
    """
    Human-in-the-loop (HITL) gate that turns confirmed entities into a draft.

    Call this once the user has reviewed the entity summary the agent showed in
    the Chat tab and clicked "Confirm & Generate Draft" in the Verify Entities
    tab. The user-confirmed (and possibly corrected) values in *req.entities*
    are handed as-is to GUIDEAgent.confirm_entities, and the agent's draft
    complaint reply is returned. Confirmation matters because PII redaction may
    have turned e.g. "HDFC Bank" into "<ORG>"; this step restores the readable
    label so the draft is filled accurately.

    On success, a "[USER CONFIRMED]: ..." user turn followed by the draft reply
    is appended to the session history.

    NOTE(review): req.entities is not passed through the redactor and no
    privacy-audit entry is recorded for this call, unlike /message.
    """
    session = session_store.get_session(session_id)
    try:
        draft: str = await run_in_threadpool(
            session.agent.confirm_entities, req.entities
        )
    except NotImplementedError:
        raise HTTPException(
            status_code=501,
            detail="GUIDEAgent.confirm_entities is not yet implemented.",
        )
    except Exception as err:
        logger.exception("confirm_entities error in session %s", session_id)
        raise _llm_http_error(err) from err

    # Persist both halves of the exchange: the confirmation marker, then the draft.
    new_turns = (
        ("user", f"[USER CONFIRMED]: {req.entities}"),
        ("assistant", draft),
    )
    for role, content in new_turns:
        session_store.append_history(session_id, role, content)

    return MessageResponse(reply=draft, pii_redacted=False, pii_types_found=[])
| # --------------------------------------------------------------------------- | |
# Escalation guide — a second, separate request after the draft
| # --------------------------------------------------------------------------- | |
async def escalation_guide(session_id: str):
    """
    Generate the escalation guide as a SEPARATE request from the draft letter.

    The draft (validate-entities) and the escalation guide are split into two
    distinct agent turns so each stays within the per-minute token budget and
    the token bucket can refill between them. The UI calls this immediately
    after the draft reply returns.

    On success, a "[Generate escalation guide]" user turn and the agent's reply
    are appended to the session history.

    Raises:
        HTTPException: 501 if GUIDEAgent.generate_escalation is a stub
            (matching /message and /validate-entities); otherwise the status
            chosen by ``_llm_http_error``.
    """
    session = session_store.get_session(session_id)
    try:
        reply: str = await run_in_threadpool(session.agent.generate_escalation)
    except NotImplementedError:
        # Without this branch a stub surfaced as a misleading 502 "unexpected error".
        raise HTTPException(
            status_code=501,
            detail="GUIDEAgent.generate_escalation is not yet implemented.",
        )
    except Exception as exc:
        logger.exception("generate_escalation error in session %s", session_id)
        raise _llm_http_error(exc) from exc

    session_store.append_history(session_id, "user", "[Generate escalation guide]")
    session_store.append_history(session_id, "assistant", reply)
    return MessageResponse(reply=reply, pii_redacted=False, pii_types_found=[])
| # --------------------------------------------------------------------------- | |
| # History | |
| # --------------------------------------------------------------------------- | |
async def get_history(session_id: str):
    """Return every conversation turn stored for *session_id*.

    Each stored turn dict is validated into a HistoryMessage. For /message
    turns, the user side holds the redacted text, not the original.
    """
    stored_turns = session_store.get_session(session_id).history
    messages = [HistoryMessage(**turn_data) for turn_data in stored_turns]
    return HistoryResponse(session_id=session_id, history=messages)
| # --------------------------------------------------------------------------- | |
| # Privacy audit trail | |
| # --------------------------------------------------------------------------- | |
async def get_audit(session_id: str):
    """Return the privacy audit trail recorded for *session_id*.

    The trail is the list of entries appended via session_store.append_audit;
    in this module, send_message (outbound text) and upload_document (local
    document processing) record entries. Each entry is validated into an
    AuditEntry.
    """
    recorded = session_store.get_session(session_id).audit
    audit_entries = [AuditEntry(**record) for record in recorded]
    return AuditResponse(session_id=session_id, entries=audit_entries)
| # --------------------------------------------------------------------------- | |
# Debug — direct DL access (no session required)
| # --------------------------------------------------------------------------- | |
async def classify_debug(req: ClassifyRequest):
    """
    Classify *req.text* using DomainClassifier (or keyword fallback) directly.

    Does not redact PII — callers are responsible for not sending sensitive data
    to this debug endpoint in production.

    Raises:
        HTTPException: the status chosen by ``_llm_http_error`` when
            classification fails; the raw exception is logged first.
    """
    try:
        result = await run_in_threadpool(classify, req.text)
    except Exception as exc:
        # _llm_http_error relies on the caller to log the raw exception;
        # without this the failure would never reach the logs.
        logger.exception("classify_debug failed")
        raise _llm_http_error(exc) from exc
    return ClassifyResponse(
        domain=result.domain,
        confidence=result.confidence,
        all_probs=result.all_probs,
    )
async def extract_debug(req: ExtractRequest):
    """
    Run EvidenceNER on *req.text* and return entity spans directly.

    Does not redact PII — debug use only.

    Raises:
        HTTPException: the status chosen by ``_llm_http_error`` when extraction
            fails; the raw exception is logged first.
    """
    try:
        entities = await run_in_threadpool(extract_entities, req.text)
    except Exception as exc:
        # _llm_http_error relies on the caller to log the raw exception;
        # without this the failure would never reach the logs.
        logger.exception("extract_debug failed")
        raise _llm_http_error(exc) from exc
    return ExtractResponse(
        entities=[EntityOut(**dataclasses.asdict(e)) for e in entities]
    )