Spaces:
Running
Running
| """ | |
| CustomerCore REST API — FastAPI Application Factory (Phase 10) | |
| This is the entry point for the entire API layer. It: | |
| 1. Creates the FastAPI application with OpenAPI metadata | |
| 2. Configures the application lifespan (startup/shutdown events) | |
| 3. Registers all middleware (CORS, security headers, request ID, rate limiting) | |
| 4. Mounts all routers (triage, health, metrics, stream) | |
| 5. Configures global exception handlers | |
| WHY A LIFESPAN CONTEXT MANAGER INSTEAD OF @app.on_event? | |
| ---------------------------------------------------------- | |
| FastAPI deprecated @app.on_event("startup") in favor of the lifespan context manager | |
| (introduced in Starlette 0.20+). The lifespan pattern is cleaner: | |
| - Startup code runs before yield | |
| - Shutdown code runs after yield | |
| - Resources created in startup are accessible in shutdown via the same scope | |
| - Matches Python's standard async context manager protocol | |
| RATE LIMITING STRATEGY: | |
| We use slowapi (a FastAPI wrapper around limits/redis-py) for per-tenant rate limiting. | |
| Rate limits are applied per tenant_id extracted from the JWT: | |
| - POST /triage: 100 requests/minute per tenant (generous for B2B) | |
| - GET /triage/{id}: 600 requests/minute (polling is frequent) | |
| - Global fallback: 1000 requests/minute per IP | |
| Why per-tenant and not per-IP? | |
| In B2B, one tenant may use multiple IPs (multiple offices, CDN, load balancers). | |
| IP-based limiting would incorrectly throttle legitimate traffic. | |
| Tenant-based limiting enforces fair usage across the platform. | |
| SECURITY HEADERS (added by middleware): | |
| X-Content-Type-Options: nosniff — prevents MIME type sniffing attacks | |
| X-Frame-Options: DENY — prevents clickjacking | |
| X-XSS-Protection: 1; mode=block — legacy XSS filter (modern browsers ignore this) | |
| Strict-Transport-Security — enforces HTTPS (only active in production) | |
| Content-Security-Policy — restricts resource loading origins | |
| """ | |
| from __future__ import annotations | |
| import os | |
| import time | |
| import uuid | |
| from contextlib import asynccontextmanager | |
| import structlog | |
| from fastapi import FastAPI, Request, Response | |
| from fastapi.middleware.cors import CORSMiddleware | |
| from fastapi.responses import JSONResponse | |
| from slowapi import Limiter, _rate_limit_exceeded_handler | |
| from slowapi.errors import RateLimitExceeded | |
| from slowapi.util import get_remote_address | |
| from src.api.routers import health, metrics, stream, triage | |
| # ───────────────────────────────────────────────────────────────────────────── | |
| # Structured logging setup (structlog) | |
| # ───────────────────────────────────────────────────────────────────────────── | |
| # structlog produces machine-readable JSON logs in production and human-readable | |
| # colored logs in development. This is critical for log aggregation systems | |
| # (Datadog, Grafana Loki, AWS CloudWatch) that parse JSON log lines. | |
# Processor chain shared by both environments, followed by an
# environment-specific tail:
#   * development (the default when APP_ENV is unset): ConsoleRenderer, which
#     formats exceptions itself.
#   * anything else: format_exc_info turns the ``exc_info`` set by
#     ``log.exception(...)`` into a traceback string *before* JSONRenderer
#     serializes the event. Without it the JSON line carries no traceback.
structlog.configure(
    processors=[
        structlog.contextvars.merge_contextvars,
        structlog.processors.TimeStamper(fmt="iso"),
        structlog.processors.add_log_level,
        structlog.processors.StackInfoRenderer(),
        *(
            [structlog.dev.ConsoleRenderer()]
            if os.getenv("APP_ENV", "development") == "development"
            else [
                structlog.processors.format_exc_info,
                structlog.processors.JSONRenderer(),
            ]
        ),
    ],
    wrapper_class=structlog.make_filtering_bound_logger(20),  # 20 == logging.INFO
    context_class=dict,
    logger_factory=structlog.PrintLoggerFactory(),
)

# Module-wide logger used by the lifespan hooks, middleware and handlers below.
log = structlog.get_logger("customercore.api")
| # ───────────────────────────────────────────────────────────────────────────── | |
| # Rate limiter | |
| # ───────────────────────────────────────────────────────────────────────────── | |
def _get_tenant_or_ip(request: Request) -> str:
    """
    Rate-limit key function: tenant from the JWT if available, else client IP.

    The ``Authorization: Bearer <token>`` header is decoded with HS256 and the
    signature is verified, so a caller cannot pick another tenant's bucket by
    forging a ``tenant_id`` claim. Expiry is deliberately NOT enforced here:
    this function only needs a stable bucket key, and full token validation is
    left to the route dependencies.

    This is best-effort. Any failure (missing header, malformed token, bad
    signature, PyJWT not importable) falls back to the remote address instead
    of failing the request.

    Args:
        request: The incoming request.

    Returns:
        ``"tenant:<tenant_id>"`` when a verifiable token carries a truthy
        ``tenant_id`` claim, otherwise the value of ``get_remote_address``.
    """
    scheme, _, credentials = request.headers.get("Authorization", "").partition(" ")
    # Tolerate repeated spaces between the scheme and the token; the token is
    # the first whitespace-delimited field after "Bearer".
    fields = credentials.split()

    if scheme == "Bearer" and fields:
        try:
            # Imported lazily so a missing PyJWT degrades to IP-based limiting.
            import jwt as pyjwt

            # NOTE(review): the signing secret is read from LITELLM_MASTER_KEY
            # here; presumably this matches the key used by src.api.auth when
            # tokens are issued — confirm, otherwise every request silently
            # falls back to per-IP limiting.
            secret = os.getenv("LITELLM_MASTER_KEY", "dev-key")
            payload = pyjwt.decode(
                fields[0],
                secret,
                algorithms=["HS256"],
                options={"verify_exp": False},
            )
        except Exception as exc:
            # Still best-effort, but no longer invisible. Debug level keeps it
            # out of the INFO-filtered logs unless someone turns it on.
            log.debug(
                "Rate-limit key: bearer token not usable, falling back to client IP",
                error=str(exc),
            )
        else:
            tenant_id = payload.get("tenant_id")
            if tenant_id:
                return f"tenant:{tenant_id}"

    return get_remote_address(request)


# Shared slowapi limiter keyed by tenant (or IP). No default limits are set
# here; the per-route limits described in the module docstring are presumably
# declared on the routers — not visible from this file.
limiter = Limiter(key_func=_get_tenant_or_ip)
| # ───────────────────────────────────────────────────────────────────────────── | |
| # Application lifespan (startup / shutdown) | |
| # ───────────────────────────────────────────────────────────────────────────── | |
@asynccontextmanager
async def lifespan(app: FastAPI):
    """
    Application lifespan context manager (passed to ``FastAPI(lifespan=...)``).

    STARTUP (before ``yield``): initialize stateful components.
    SHUTDOWN (after ``yield``): log the graceful shutdown.

    The pattern mirrors how production services handle rolling deployments:
    1. New pod starts → lifespan startup runs
    2. Kubernetes readiness probe passes → traffic starts flowing to new pod
    3. Old pod receives SIGTERM → lifespan shutdown runs, connections drain
    4. Old pod exits cleanly

    Every startup step is best-effort: a failure is logged as a warning and
    startup continues, so one broken dependency does not stop the API from
    booting.

    Args:
        app: The application being started. ``app.state.llm_router`` is set to
            an ``LLMRouter`` instance, or to ``None`` if it could not be built.
    """
    # ── Startup ────────────────────────────────────────────────────────────
    log.info("CustomerCore API starting up",
             env=os.getenv("APP_ENV", "development"),
             version="1.0.0")

    # Run database migration checks
    try:
        from src.db.migrations import run_db_migrations
        run_db_migrations()
    except Exception as exc:
        log.warning("Database migration checks failed (non-fatal)", error=str(exc))

    # Register Langfuse as LiteLLM callback (Phase 11 — LLM Observability)
    try:
        from src.monitoring.langfuse_tracer import setup_litellm_tracing
        tracing_enabled = setup_litellm_tracing()
        log.info("Langfuse LLM tracing",
                 enabled=tracing_enabled,
                 configured=bool(os.getenv("LANGFUSE_PUBLIC_KEY")))
    except Exception as exc:
        log.warning("Langfuse init failed (non-fatal)", error=str(exc))

    # Warm up LLM router (loads routing table into memory)
    try:
        from src.rag.router import LLMRouter, ROUTING_TABLE
        app.state.llm_router = LLMRouter()
        log.info("LLM router initialized", routing_entries=len(ROUTING_TABLE))
    except Exception as exc:
        log.warning("LLM router init failed (non-fatal)", error=str(exc))
        app.state.llm_router = None

    log.info("CustomerCore API ready to serve requests")

    try:
        yield  # ← Application runs here
    finally:
        # ── Shutdown ───────────────────────────────────────────────────────
        # In ``finally`` so the shutdown line is emitted even when the
        # serving phase ends with an exception or cancellation.
        log.info("CustomerCore API shutting down gracefully")
| # ───────────────────────────────────────────────────────────────────────────── | |
| # FastAPI Application Factory | |
| # ───────────────────────────────────────────────────────────────────────────── | |
def create_app() -> FastAPI:
    """
    Create and configure the FastAPI application.

    Using a factory function (instead of a bare module-level ``FastAPI()``)
    allows:
    - Easy creation of test instances with different configurations
    - Clean separation between app creation and configuration
    - Future support for multiple app variants (e.g., lightweight health-only app)

    Reads these environment variables: ``CORS_ORIGINS`` (comma-separated
    origin list), ``SPACE_ID`` (truthy → running in Hugging Face Spaces) and
    ``APP_ENV`` (``"production"`` enables HSTS).

    Returns:
        The configured ``FastAPI`` instance with the rate limiter, CORS, the
        security-header / request-logging middleware, the exception handlers,
        the routers and the root endpoint attached.
    """
    app = FastAPI(
        title="CustomerCore API",
        description=(
            "Enterprise B2B AI Customer Support Platform\n\n"
            "Powered by: LangGraph 6-agent supervisor, Multi-tenant Hybrid RAG "
            "(ChromaDB + BM25 + RRF), Cryptographic Privacy Vault (AES-256-GCM), "
            "and a fine-tuned local LLM for on-premise deployment.\n\n"
            "**Authentication:** All endpoints require a JWT Bearer token. "
            "The `tenant_id` claim in the token determines data isolation boundaries.\n\n"
            "**Rate limits:** 100 triage submissions/minute per tenant."
        ),
        version="1.0.0",
        lifespan=lifespan,
        docs_url="/docs",             # Swagger UI
        redoc_url="/redoc",           # ReDoc UI (cleaner for sharing with stakeholders)
        openapi_url="/openapi.json",
        contact={
            "name": "Saibalaji Namburi",
            "url": "https://github.com/saibalajinamburi/CustomerCore",
        },
        license_info={
            "name": "MIT",
            "url": "https://opensource.org/licenses/MIT",
        },
    )

    # Attach rate limiter
    app.state.limiter = limiter
    app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)

    # ── CORS Middleware ────────────────────────────────────────────────────
    # Allow the Operations Console frontend (Phase 13) to call the API.
    # Origins come from CORS_ORIGINS (comma-separated); the default covers
    # local development and the hosted Space. Entries are stripped and empty
    # ones dropped so "a, b," does not produce origins that never match.
    raw_origins = os.getenv(
        "CORS_ORIGINS",
        "http://localhost:3000,http://localhost:8080,https://customercore.hf.space"
    )
    allowed_origins = [
        origin.strip() for origin in raw_origins.split(",") if origin.strip()
    ]
    app.add_middleware(
        CORSMiddleware,
        allow_origins=allowed_origins,
        allow_credentials=True,
        allow_methods=["GET", "POST", "OPTIONS"],
        allow_headers=["Authorization", "Content-Type", "X-Request-ID"],
    )

    # ── Security Headers Middleware ────────────────────────────────────────
    async def security_headers_middleware(request: Request, call_next) -> Response:
        """
        Inject security headers on every response and log the request.

        Also assigns the request ID: the client's ``X-Request-ID`` if present,
        otherwise a fresh UUID4. The ID is bound to the structlog context,
        stored on ``request.state`` for the exception handlers, and echoed
        back in the response.
        """
        request_id = request.headers.get("X-Request-ID") or str(uuid.uuid4())
        request.state.request_id = request_id
        structlog.contextvars.bind_contextvars(request_id=request_id)
        # perf_counter is monotonic, so the duration cannot go negative or
        # jump when the wall clock is adjusted.
        started = time.perf_counter()
        try:
            response: Response = await call_next(request)
            duration_ms = int((time.perf_counter() - started) * 1000)

            # Security headers
            response.headers["X-Request-ID"] = request_id
            response.headers["X-Content-Type-Options"] = "nosniff"
            # Omit X-Frame-Options in Hugging Face Spaces to allow embedding in its iframe
            if not os.getenv("SPACE_ID"):
                response.headers["X-Frame-Options"] = "DENY"
            response.headers["Referrer-Policy"] = "strict-origin-when-cross-origin"
            if os.getenv("APP_ENV") == "production":
                response.headers["Strict-Transport-Security"] = "max-age=31536000; includeSubDomains"

            # Structured request log
            log.info(
                "HTTP request",
                method=request.method,
                path=request.url.path,
                status_code=response.status_code,
                duration_ms=duration_ms,
                request_id=request_id,
            )
            return response
        finally:
            # Always unbind, even when the downstream app raised, so the
            # request ID never lingers in the logging context.
            structlog.contextvars.clear_contextvars()

    # Registered after CORS, so it wraps the CORS middleware.
    app.middleware("http")(security_headers_middleware)

    # ── Global Exception Handlers ──────────────────────────────────────────
    from src.db.repository import RepositoryError

    def _request_id_for(request: Request) -> str:
        """Return the middleware-assigned request ID, else the header, else "unknown"."""
        return (
            getattr(request.state, "request_id", None)
            or request.headers.get("X-Request-ID", "unknown")
        )

    async def repository_error_handler(request: Request, exc: RepositoryError) -> JSONResponse:
        """
        Catch repository exceptions and answer with a JSON 500.

        NOTE(review): ``detail`` returns ``str(exc)`` to the client. The
        original intent is real-time cloud debugging, but this can leak
        internals (hosts, SQL, schema names) — consider gating it by
        environment.
        """
        request_id = _request_id_for(request)
        log.exception(
            "Repository error",
            path=request.url.path,
            request_id=request_id,
            error=str(exc),
        )
        return JSONResponse(
            status_code=500,
            content={
                "error": "Database repository error",
                "request_id": request_id,
                "detail": str(exc),
            },
        )

    app.add_exception_handler(RepositoryError, repository_error_handler)

    async def unhandled_exception_handler(request: Request, exc: Exception) -> JSONResponse:
        """
        Catch-all handler for unexpected errors.

        Returns a clean JSON error (not a raw Python traceback) and logs the
        exception for debugging.

        NOTE(review): ``detail`` exposes ``str(exc)`` to the client, same
        concern as in ``repository_error_handler``.
        """
        request_id = _request_id_for(request)
        log.exception(
            "Unhandled exception",
            path=request.url.path,
            request_id=request_id,
            error=str(exc),
        )
        return JSONResponse(
            status_code=500,
            content={
                "error": "Internal server error",
                "request_id": request_id,
                "detail": str(exc),  # Expose error detail for cloud debugging
            },
        )

    app.add_exception_handler(Exception, unhandled_exception_handler)

    # ── Routers ────────────────────────────────────────────────────────────
    app.include_router(health.router)
    app.include_router(triage.router)
    app.include_router(stream.router)
    app.include_router(metrics.router)

    # ── Test Token endpoint (for UI workspace) ─────────────────────────────
    # NOTE(review): no route registration for this handler is visible in the
    # source and its URL path cannot be inferred, so it is intentionally left
    # unregistered here — attach it with the path the UI expects.
    # NOTE(review): it signs a 24-hour token for ANY tenant_id/role supplied by
    # the caller, without authentication. Exposing it outside a demo
    # environment lets anyone impersonate any tenant; gate it before enabling.
    async def get_test_token(tenant_id: str = "acme-corp", role: str = "support_agent") -> JSONResponse:
        """Mint an HS256 JWT for the given tenant and role, valid for 24 hours."""
        import jwt as pyjwt
        from src.api.auth import _get_secret_key

        issued_at = int(time.time())
        payload = {
            "tenant_id": tenant_id,
            "role": role,
            "iat": issued_at,
            "exp": issued_at + 86_400,
        }
        secret = _get_secret_key()
        token = pyjwt.encode(payload, secret, algorithm="HS256")
        return JSONResponse({"token": token})

    # ── Root endpoint ──────────────────────────────────────────────────────
    async def root(request: Request) -> Response:
        """Serve the HTML console to browsers / HF Spaces, else a JSON index."""
        accept = request.headers.get("accept", "")
        # Serve the premium UI console to browsers or when running in HF Spaces
        if "text/html" in accept or os.getenv("SPACE_ID"):
            from fastapi.responses import HTMLResponse
            from src.api.ui import HTML_CONTENT
            return HTMLResponse(content=HTML_CONTENT)
        return JSONResponse({
            "name": "CustomerCore API",
            "version": "1.0.0",
            "docs": "/docs",
            "health": "/api/v1/health",
            "github": "https://github.com/saibalajinamburi/CustomerCore",
        })

    app.get("/")(root)

    return app
| # ───────────────────────────────────────────────────────────────────────────── | |
| # Application instance (used by uvicorn) | |
| # ───────────────────────────────────────────────────────────────────────────── | |
| # Trigger rebuild to load new secrets: 2026-05-29T12:57:00 | |
# Module-level ASGI application, referenced by the runner below as
# "src.api.main:app".
app = create_app()

# ─────────────────────────────────────────────────────────────────────────────
# Development runner
# ─────────────────────────────────────────────────────────────────────────────
if __name__ == "__main__":
    import uvicorn

    dev_server_options = {
        "host": "0.0.0.0",
        "port": 8080,
        "reload": True,        # restart automatically when source files change
        "log_level": "info",
        "access_log": False,   # request logging is done by the app's own structured middleware
    }
    uvicorn.run("src.api.main:app", **dev_server_options)