| """ |
| FastAPI application: main entry point for the CX Bot. |
| |
| Fixes: |
| - Removed unused HTTPException import |
| - Removed redundant local cache_hit = False variable (now set per-branch cleanly) |
| - POST /index now also invalidates in-memory FAISS + BM25 caches |
| """ |
| import os |
| import time |
| import uuid |
| import logging |
| from typing import Optional |
|
|
| from fastapi import FastAPI, BackgroundTasks, UploadFile, File, Form, HTTPException |
| from fastapi.middleware.cors import CORSMiddleware |
| from pydantic import BaseModel, Field |
| from fastapi.staticfiles import StaticFiles |
|
|
| from agents.billing_agent import BillingAgent |
| from agents.returns_agent import ReturnsAgent |
| from agents.escalation_agent import EscalationAgent |
| from agents.router import route |
| from pipeline.pii_redactor import redact |
| from memory.session_store import get_history, append_turn, clear_session |
| from cache.redis_cache import lookup as cache_lookup, store as cache_store |
| from audit.logger import write_audit, read_audit_logs |
| from rag.indexer import build_all_indexes, build_index |
| from rag.retriever import invalidate_faiss_cache |
| from rag.bm25_retriever import invalidate_cache as invalidate_bm25 |
| from config import DOCS_DIR, DOMAINS |
|
|
| logging.basicConfig( |
| level=logging.INFO, |
| format="%(asctime)s [%(levelname)s] %(name)s: %(message)s", |
| ) |
| logger = logging.getLogger(__name__) |
|
|
| app = FastAPI( |
| title="Domain-Aware Multi-Agent CX Bot", |
| description="Production-grade customer experience platform with RAG-powered sub-agents.", |
| version="1.0.0", |
| ) |
|
|
| app.add_middleware( |
| CORSMiddleware, |
| allow_origins=["*"], |
| allow_methods=["*"], |
| allow_headers=["*"], |
| ) |
|
|
| _AGENTS = { |
| "billing": BillingAgent(), |
| "returns": ReturnsAgent(), |
| "escalation": EscalationAgent(), |
| } |
|
|
|
|
| |
|
|
| class QueryRequest(BaseModel): |
| session_id: str = Field(..., min_length=1, max_length=128) |
| query: str = Field(..., min_length=1, max_length=2000) |
|
|
|
|
| class QueryResponse(BaseModel): |
| answer: str |
| agent: str |
| sources: list |
| confidence: float |
| low_confidence: bool |
| disclaimer: str |
| audit_id: str |
| latency_ms: int |
| cache_hit: bool |
|
|
|
|
| class AuditResponse(BaseModel): |
| records: list |
| count: int |
|
|
|
|
| class HealthResponse(BaseModel): |
| status: str |
| agents: list |
|
|
|
|
| class UploadResponse(BaseModel): |
| status: str |
| domain: str |
| filename: str |
| message: str |
|
|
|
|
| |
|
|
| @app.get("/health", response_model=HealthResponse) |
| def health(): |
| return HealthResponse(status="ok", agents=list(_AGENTS.keys())) |
|
|
|
|
| @app.post("/query", response_model=QueryResponse) |
| def query_endpoint(req: QueryRequest): |
| start = time.time() |
|
|
| |
| redacted_query = redact(req.query) |
|
|
| |
| category, router_confidence = route(redacted_query) |
|
|
| |
| cached = cache_lookup(redacted_query, category) |
| if cached: |
| elapsed_ms = int((time.time() - start) * 1000) |
| audit_id = write_audit( |
| session_id=req.session_id, |
| raw_query=req.query, |
| redacted_query=redacted_query, |
| routed_to=category, |
| router_confidence=router_confidence, |
| retrieved_sources=cached.get("sources", []), |
| final_confidence=cached.get("confidence", 1.0), |
| low_confidence_flag=cached.get("low_confidence", False), |
| response_latency_ms=elapsed_ms, |
| cache_hit=True, |
| ) |
| return QueryResponse( |
| answer=cached.get("answer", ""), |
| agent=cached.get("agent", category), |
| sources=cached.get("sources", []), |
| confidence=cached.get("confidence", 1.0), |
| low_confidence=cached.get("low_confidence", False), |
| disclaimer=cached.get("disclaimer", ""), |
| audit_id=audit_id, |
| latency_ms=elapsed_ms, |
| cache_hit=True, |
| ) |
|
|
| |
| session_history = get_history(req.session_id) |
|
|
| |
| agent = _AGENTS.get(category, _AGENTS["escalation"]) |
| result = agent.run(redacted_query, router_confidence, session_history) |
|
|
| |
| append_turn(req.session_id, "user", redacted_query) |
| append_turn(req.session_id, "assistant", result["answer"]) |
|
|
| |
| cache_store(redacted_query, category, result) |
|
|
| |
| elapsed_ms = int((time.time() - start) * 1000) |
| audit_id = write_audit( |
| session_id=req.session_id, |
| raw_query=req.query, |
| redacted_query=redacted_query, |
| routed_to=category, |
| router_confidence=router_confidence, |
| retrieved_sources=result["sources"], |
| final_confidence=result["confidence"], |
| low_confidence_flag=result["low_confidence"], |
| response_latency_ms=elapsed_ms, |
| cache_hit=False, |
| ) |
|
|
| return QueryResponse( |
| **result, |
| audit_id=audit_id, |
| latency_ms=elapsed_ms, |
| cache_hit=False, |
| ) |
|
|
|
|
| @app.get("/audit", response_model=AuditResponse) |
| def audit_endpoint(date: Optional[str] = None, limit: int = 100): |
| records = read_audit_logs(date_str=date, limit=limit) |
| return AuditResponse(records=records, count=len(records)) |
|
|
|
|
| @app.delete("/session/{session_id}") |
| def clear_session_endpoint(session_id: str): |
| clear_session(session_id) |
| return {"status": "cleared", "session_id": session_id} |
|
|
|
|
| @app.post("/index") |
| def index_endpoint(background_tasks: BackgroundTasks): |
| """ |
| Trigger async re-indexing of all domain documents. |
| Also clears in-memory FAISS and BM25 caches so agents |
| immediately serve fresh indexes after rebuild. |
| """ |
| def _reindex(): |
| build_all_indexes() |
| invalidate_faiss_cache() |
| invalidate_bm25() |
|
|
| background_tasks.add_task(_reindex) |
| return {"status": "indexing started", "message": "Re-indexing running in background."} |
|
|
|
|
| ALLOWED_DOC_EXTENSIONS = {".txt", ".md"} |
| MAX_UPLOAD_BYTES = 5 * 1024 * 1024 |
|
|
|
|
| @app.post("/documents/upload", response_model=UploadResponse) |
| async def upload_document( |
| background_tasks: BackgroundTasks, |
| domain: str = Form(...), |
| file: UploadFile = File(...), |
| ): |
| """ |
| Upload a policy document into a domain's document folder and |
| trigger a background re-index of that domain only. |
| """ |
| if domain not in DOMAINS: |
| raise HTTPException( |
| status_code=400, |
| detail=f"Invalid domain '{domain}'. Must be one of: {DOMAINS}", |
| ) |
|
|
| safe_name = os.path.basename(file.filename or "") |
| ext = os.path.splitext(safe_name)[1].lower() |
| if not safe_name or ext not in ALLOWED_DOC_EXTENSIONS: |
| raise HTTPException( |
| status_code=400, |
| detail=f"Unsupported file type '{ext}'. Allowed: {sorted(ALLOWED_DOC_EXTENSIONS)}", |
| ) |
|
|
| contents = await file.read() |
| if len(contents) > MAX_UPLOAD_BYTES: |
| raise HTTPException(status_code=400, detail="File too large (max 5MB).") |
| if not contents.strip(): |
| raise HTTPException(status_code=400, detail="File is empty.") |
|
|
| domain_dir = os.path.join(DOCS_DIR, domain) |
| os.makedirs(domain_dir, exist_ok=True) |
|
|
| dest_path = os.path.join(domain_dir, safe_name) |
| if os.path.exists(dest_path): |
| stem, ext2 = os.path.splitext(safe_name) |
| dest_path = os.path.join(domain_dir, f"{stem}_{uuid.uuid4().hex[:8]}{ext2}") |
|
|
| with open(dest_path, "wb") as f: |
| f.write(contents) |
|
|
| final_name = os.path.basename(dest_path) |
| logger.info(f"Uploaded '{final_name}' to domain '{domain}'. Triggering re-index.") |
|
|
| def _reindex_domain(): |
| build_index(domain) |
| invalidate_faiss_cache(domain) |
| invalidate_bm25(domain) |
|
|
| background_tasks.add_task(_reindex_domain) |
|
|
| return UploadResponse( |
| status="uploaded", |
| domain=domain, |
| filename=final_name, |
| message="File saved. Re-indexing started in background.", |
| ) |
|
|
| app.mount("/", StaticFiles(directory="static", html=True), name="static") |