customercore / src /api /main.py
saibalajiomg's picture
Upload folder using huggingface_hub
41f4b01 verified
Raw
History Blame Contribute Delete
18 kB
"""
CustomerCore REST API — FastAPI Application Factory (Phase 10)
This is the entry point for the entire API layer. It:
1. Creates the FastAPI application with OpenAPI metadata
2. Configures the application lifespan (startup/shutdown events)
3. Registers all middleware (CORS, security headers, request ID, rate limiting)
4. Mounts all routers (triage, health, metrics, stream)
5. Configures global exception handlers
WHY A LIFESPAN CONTEXT MANAGER INSTEAD OF @app.on_event?
----------------------------------------------------------
FastAPI deprecated @app.on_event("startup") in favor of the lifespan context manager
(introduced in Starlette 0.20+). The lifespan pattern is cleaner:
- Startup code runs before yield
- Shutdown code runs after yield
- Resources created in startup are accessible in shutdown via the same scope
- Matches Python's standard async context manager protocol
RATE LIMITING STRATEGY:
We use slowapi (a FastAPI wrapper around limits/redis-py) for per-tenant rate limiting.
Rate limits are applied per tenant_id extracted from the JWT:
- POST /triage: 100 requests/minute per tenant (generous for B2B)
- GET /triage/{id}: 600 requests/minute (polling is frequent)
- Global fallback: 1000 requests/minute per IP
Why per-tenant and not per-IP?
In B2B, one tenant may use multiple IPs (multiple offices, CDN, load balancers).
IP-based limiting would incorrectly throttle legitimate traffic.
Tenant-based limiting enforces fair usage across the platform.
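SECURITY HEADERS (added by middleware):
X-Content-Type-Options: nosniff — prevents MIME type sniffing attacks
X-Frame-Options: DENY — prevents clickjacking (omitted when SPACE_ID is set, so the app can be embedded in the Hugging Face Spaces iframe)
X-XSS-Protection: 1; mode=block — legacy XSS filter (modern browsers ignore this); not currently set by the middleware in this file
Strict-Transport-Security — enforces HTTPS (only sent when APP_ENV is "production")
Content-Security-Policy — restricts resource loading origins; not currently set by the middleware in this file
Referrer-Policy: strict-origin-when-cross-origin — limits the referrer information sent on cross-origin requests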
"""
from __future__ import annotations
import os
import time
import uuid
from contextlib import asynccontextmanager
import structlog
from fastapi import FastAPI, Request, Response
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.errors import RateLimitExceeded
from slowapi.util import get_remote_address
from src.api.routers import health, metrics, stream, triage
# ─────────────────────────────────────────────────────────────────────────────
# Structured logging setup (structlog)
# ─────────────────────────────────────────────────────────────────────────────
# structlog produces machine-readable JSON logs in production and human-readable
# colored logs in development. This is critical for log aggregation systems
# (Datadog, Grafana Loki, AWS CloudWatch) that parse JSON log lines.
# Renderer selection:
#   * APP_ENV == "development" (the default): human-readable ConsoleRenderer,
#     which formats exceptions itself.
#   * anything else: JSONRenderer. JSONRenderer does not format tracebacks, so
#     ``format_exc_info`` runs immediately before it. Without that processor,
#     ``log.exception(...)`` would serialize only ``"exc_info": true`` and the
#     traceback would never reach the log aggregator.
structlog.configure(
    processors=[
        structlog.contextvars.merge_contextvars,
        structlog.processors.TimeStamper(fmt="iso"),
        structlog.processors.add_log_level,
        structlog.processors.StackInfoRenderer(),
        *(
            [structlog.dev.ConsoleRenderer()]
            if os.getenv("APP_ENV", "development") == "development"
            else [
                structlog.processors.format_exc_info,
                structlog.processors.JSONRenderer(),
            ]
        ),
    ],
    wrapper_class=structlog.make_filtering_bound_logger(20),  # 20 == logging.INFO
    context_class=dict,
    logger_factory=structlog.PrintLoggerFactory(),
)

# Module-wide logger used by the middleware, lifespan hooks and error handlers.
log = structlog.get_logger("customercore.api")
# ─────────────────────────────────────────────────────────────────────────────
# Rate limiter
# ─────────────────────────────────────────────────────────────────────────────
def _get_tenant_or_ip(request: Request) -> str:
    """
    Rate-limit key function: ``tenant:<tenant_id>`` from the JWT if available,
    otherwise the client's remote address.

    The token signature is checked with HS256, but expiry is deliberately NOT
    enforced here (``verify_exp=False``): this function only derives a bucket
    key. Any failure (missing PyJWT, malformed header, bad signature, missing
    claim) falls back to the remote address instead of raising, so rate limiting
    never turns a request into a 500.

    Args:
        request: The incoming request whose ``Authorization`` header is inspected.

    Returns:
        ``"tenant:<tenant_id>"`` when a decodable Bearer token carries a truthy
        ``tenant_id`` claim, else the value of ``get_remote_address(request)``.
    """
    auth_header = request.headers.get("Authorization", "")
    # Whitespace-tolerant parsing: "Bearer   <token>" still yields the token
    # (``split(" ")[1]`` would produce an empty string for repeated spaces).
    parts = auth_header.split()
    if len(parts) >= 2 and parts[0] == "Bearer":
        token = parts[1]
        try:
            import jwt as pyjwt

            # NOTE(review): the key is read from LITELLM_MASTER_KEY here, while the
            # /api/v1/test-token endpoint signs with src.api.auth._get_secret_key().
            # Confirm both resolve to the same secret; if they differ, decoding
            # always fails and every caller is silently limited per IP.
            secret = os.getenv("LITELLM_MASTER_KEY", "dev-key")
            payload = pyjwt.decode(
                token,
                secret,
                algorithms=["HS256"],
                options={"verify_exp": False},
            )
            tenant_id = payload.get("tenant_id")
            if tenant_id:
                return f"tenant:{tenant_id}"
        except Exception as exc:
            # Best-effort by design: keep the IP fallback, but leave a trace of why
            # the tenant key could not be derived (visible when DEBUG is enabled).
            log.debug(
                "Rate-limit key: could not derive tenant from JWT, using client IP",
                error=str(exc),
            )
    return get_remote_address(request)
# Shared slowapi limiter; buckets are keyed per tenant, falling back to client IP
# (see _get_tenant_or_ip). No ``default_limits`` are passed here, so this object
# does not by itself enforce a global limit.
# NOTE(review): per-route limits are presumably declared with
# ``@limiter.limit(...)`` in the routers — confirm.
limiter = Limiter(key_func=_get_tenant_or_ip)
# ─────────────────────────────────────────────────────────────────────────────
# Application lifespan (startup / shutdown)
# ─────────────────────────────────────────────────────────────────────────────
@asynccontextmanager
async def lifespan(app: FastAPI):
    """
    Startup/shutdown hook for the FastAPI application.

    Everything before ``yield`` runs once before the app starts serving;
    everything after it runs once on shutdown.

    Startup performs three independent, best-effort steps. A failure in any of
    them is logged as a warning and does not stop the application:
      1. database migration checks (``run_db_migrations``)
      2. Langfuse tracing registration for LiteLLM (``setup_litellm_tracing``)
      3. creation of the LLM router, stored on ``app.state.llm_router``
         (set to ``None`` if it cannot be created)

    Shutdown currently only emits a log line.
    """
    # ── Startup ────────────────────────────────────────────────────────────
    app_env = os.getenv("APP_ENV", "development")
    log.info("CustomerCore API starting up", env=app_env, version="1.0.0")

    # Step 1: database migration checks.
    try:
        from src.db.migrations import run_db_migrations

        run_db_migrations()
    except Exception as migration_err:
        log.warning(
            "Database migration checks failed (non-fatal)",
            error=str(migration_err),
        )

    # Step 2: register Langfuse as a LiteLLM callback (Phase 11 — LLM Observability).
    try:
        from src.monitoring.langfuse_tracer import setup_litellm_tracing

        langfuse_active = setup_litellm_tracing()
        langfuse_configured = bool(os.getenv("LANGFUSE_PUBLIC_KEY"))
        log.info(
            "Langfuse LLM tracing",
            enabled=langfuse_active,
            configured=langfuse_configured,
        )
    except Exception as tracing_err:
        log.warning("Langfuse init failed (non-fatal)", error=str(tracing_err))

    # Step 3: build the LLM router and expose it to request handlers via app.state.
    try:
        from src.rag.router import LLMRouter, ROUTING_TABLE

        app.state.llm_router = LLMRouter()
        log.info("LLM router initialized", routing_entries=len(ROUTING_TABLE))
    except Exception as router_err:
        log.warning("LLM router init failed (non-fatal)", error=str(router_err))
        app.state.llm_router = None

    log.info("CustomerCore API ready to serve requests")

    yield  # The application serves requests while suspended here.

    # ── Shutdown ───────────────────────────────────────────────────────────
    log.info("CustomerCore API shutting down gracefully")
# ─────────────────────────────────────────────────────────────────────────────
# FastAPI Application Factory
# ─────────────────────────────────────────────────────────────────────────────
def _parse_cors_origins(raw: str) -> list[str]:
    """Split a comma-separated origin list, trimming whitespace and dropping empty entries."""
    return [origin.strip() for origin in raw.split(",") if origin.strip()]


def _resolve_request_id(request: Request) -> str:
    """Return the ID assigned by the request middleware, else the client header, else "unknown"."""
    assigned = getattr(request.state, "request_id", None)
    return assigned or request.headers.get("X-Request-ID") or "unknown"


def create_app() -> FastAPI:
    """
    Create and configure the FastAPI application.

    Using a factory function (instead of a module-level `app = FastAPI()`) allows:
    - Easy creation of test instances with different configurations
    - Clean separation between app creation and configuration
    - Future support for multiple app variants (e.g., lightweight health-only app)

    Configuration applied, in order: rate limiter + its 429 handler, CORS,
    the request-ID / security-headers / request-logging middleware, exception
    handlers, the health/triage/stream/metrics routers, the test-token endpoint
    and the root endpoint.

    Environment variables read:
        CORS_ORIGINS: comma-separated list of allowed origins.
        SPACE_ID: when set, ``X-Frame-Options`` is omitted and ``/`` always
            serves the HTML console.
        APP_ENV: ``"production"`` enables ``Strict-Transport-Security``.

    Returns:
        The fully configured ``FastAPI`` instance.
    """
    app = FastAPI(
        title="CustomerCore API",
        description=(
            "Enterprise B2B AI Customer Support Platform\n\n"
            "Powered by: LangGraph 6-agent supervisor, Multi-tenant Hybrid RAG "
            "(ChromaDB + BM25 + RRF), Cryptographic Privacy Vault (AES-256-GCM), "
            "and a fine-tuned local LLM for on-premise deployment.\n\n"
            "**Authentication:** All endpoints require a JWT Bearer token. "
            "The `tenant_id` claim in the token determines data isolation boundaries.\n\n"
            "**Rate limits:** 100 triage submissions/minute per tenant."
        ),
        version="1.0.0",
        lifespan=lifespan,
        docs_url="/docs",  # Swagger UI
        redoc_url="/redoc",  # ReDoc UI (cleaner for sharing with stakeholders)
        openapi_url="/openapi.json",
        contact={
            "name": "Saibalaji Namburi",
            "url": "https://github.com/saibalajinamburi/CustomerCore",
        },
        license_info={
            "name": "MIT",
            "url": "https://opensource.org/licenses/MIT",
        },
    )

    # Attach rate limiter
    app.state.limiter = limiter
    app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)

    # ── CORS Middleware ─────────────────────────────────────────────────────
    # Allow the Operations Console frontend (Phase 13) to call the API.
    # Origins come from CORS_ORIGINS (comma-separated); entries are trimmed and
    # empty entries dropped so "a, b," does not yield " b" or "" as an origin.
    allowed_origins = _parse_cors_origins(
        os.getenv(
            "CORS_ORIGINS",
            "http://localhost:3000,http://localhost:8080,https://customercore.hf.space",
        )
    )
    app.add_middleware(
        CORSMiddleware,
        allow_origins=allowed_origins,
        allow_credentials=True,
        allow_methods=["GET", "POST", "OPTIONS"],
        allow_headers=["Authorization", "Content-Type", "X-Request-ID"],
    )

    # ── Request ID + Security Headers Middleware ────────────────────────────
    @app.middleware("http")
    async def security_headers_middleware(request: Request, call_next) -> Response:
        """
        Assign a request ID, inject security headers and log every request.

        The request ID is taken from the incoming ``X-Request-ID`` header or
        generated, stored on ``request.state.request_id`` (so exception handlers
        report the same ID), bound to the structlog context, and echoed back in
        the response header.
        """
        request_id = request.headers.get("X-Request-ID") or str(uuid.uuid4())
        request.state.request_id = request_id
        structlog.contextvars.bind_contextvars(request_id=request_id)
        # perf_counter is monotonic, so durations are immune to wall-clock jumps.
        started = time.perf_counter()
        try:
            response: Response = await call_next(request)
            duration_ms = int((time.perf_counter() - started) * 1000)

            # Security headers
            response.headers["X-Request-ID"] = request_id
            response.headers["X-Content-Type-Options"] = "nosniff"
            # Omit X-Frame-Options in Hugging Face Spaces to allow embedding in its iframe
            if not os.getenv("SPACE_ID"):
                response.headers["X-Frame-Options"] = "DENY"
            response.headers["Referrer-Policy"] = "strict-origin-when-cross-origin"
            if os.getenv("APP_ENV") == "production":
                response.headers["Strict-Transport-Security"] = (
                    "max-age=31536000; includeSubDomains"
                )

            # Structured request log
            log.info(
                "HTTP request",
                method=request.method,
                path=request.url.path,
                status_code=response.status_code,
                duration_ms=duration_ms,
                request_id=request_id,
            )
            return response
        finally:
            # Always reset the logging context, including when call_next raises,
            # so a request ID never outlives the request that owns it.
            structlog.contextvars.clear_contextvars()

    # ── Global Exception Handlers ───────────────────────────────────────────
    from src.db.repository import RepositoryError

    @app.exception_handler(RepositoryError)
    async def repository_error_handler(request: Request, exc: RepositoryError) -> JSONResponse:
        """
        Catch repository exceptions and return a JSON 500 response.

        The response includes the exception text so cloud issues can be debugged
        in real time.
        """
        request_id = _resolve_request_id(request)
        log.exception(
            "Repository error",
            path=request.url.path,
            request_id=request_id,
            error=str(exc),
        )
        return JSONResponse(
            status_code=500,
            content={
                "error": "Database repository error",
                "request_id": request_id,
                # SECURITY NOTE(review): str(exc) is returned to the client and may
                # reveal internal details; kept because it is a deliberate debugging
                # aid — consider hiding it when APP_ENV is "production".
                "detail": str(exc),
            },
        )

    @app.exception_handler(Exception)
    async def unhandled_exception_handler(request: Request, exc: Exception) -> JSONResponse:
        """
        Catch-all handler for unexpected errors.

        Returns a clean JSON error (not a raw Python traceback) and logs the
        exception for debugging.
        """
        request_id = _resolve_request_id(request)
        log.exception(
            "Unhandled exception",
            path=request.url.path,
            request_id=request_id,
            error=str(exc),
        )
        return JSONResponse(
            status_code=500,
            content={
                "error": "Internal server error",
                "request_id": request_id,
                # SECURITY NOTE(review): exposes error detail for cloud debugging;
                # consider hiding it when APP_ENV is "production".
                "detail": str(exc),
            },
        )

    # ── Routers ─────────────────────────────────────────────────────────────
    app.include_router(health.router)
    app.include_router(triage.router)
    app.include_router(stream.router)
    app.include_router(metrics.router)

    # ── Test Token endpoint (for UI workspace) ──────────────────────────────
    # SECURITY NOTE(review): this endpoint is unauthenticated and signs a 24-hour
    # token for ANY caller-supplied tenant_id and role. It is left unchanged
    # because the UI workspace relies on it, but it should be disabled or
    # protected outside demo deployments.
    @app.get("/api/v1/test-token", include_in_schema=False)
    async def get_test_token(
        tenant_id: str = "acme-corp", role: str = "support_agent"
    ) -> JSONResponse:
        """Issue an HS256-signed JWT valid for 24 hours for the given tenant and role."""
        import jwt as pyjwt
        from src.api.auth import _get_secret_key

        issued_at = int(time.time())
        payload = {
            "tenant_id": tenant_id,
            "role": role,
            "iat": issued_at,
            "exp": issued_at + 86_400,
        }
        secret = _get_secret_key()
        token = pyjwt.encode(payload, secret, algorithm="HS256")
        return JSONResponse({"token": token})

    # ── Root endpoint: HTML console for browsers, JSON summary otherwise ────
    @app.get("/", include_in_schema=False)
    async def root(request: Request) -> Response:
        """Serve the HTML console to browsers (or always in HF Spaces), else a JSON index."""
        accept = request.headers.get("accept", "")
        # Serve the premium UI console to browsers or when running in HF Spaces
        if "text/html" in accept or os.getenv("SPACE_ID"):
            from fastapi.responses import HTMLResponse
            from src.api.ui import HTML_CONTENT

            return HTMLResponse(content=HTML_CONTENT)
        return JSONResponse({
            "name": "CustomerCore API",
            "version": "1.0.0",
            "docs": "/docs",
            "health": "/api/v1/health",
            "github": "https://github.com/saibalajinamburi/CustomerCore",
        })

    return app
# ─────────────────────────────────────────────────────────────────────────────
# Application instance (used by uvicorn)
# ─────────────────────────────────────────────────────────────────────────────
# Trigger rebuild to load new secrets: 2026-05-29T12:57:00
# Module-level ASGI application, built at import time so that the
# "src.api.main:app" import string passed to uvicorn resolves to it.
app = create_app()
# ─────────────────────────────────────────────────────────────────────────────
# Development runner
# ─────────────────────────────────────────────────────────────────────────────
if __name__ == "__main__":
    import uvicorn

    # Settings for the local development server.
    dev_server_options = {
        "host": "0.0.0.0",
        "port": 8080,
        "reload": True,  # Auto-reload on file changes during development
        "log_level": "info",
        "access_log": False,  # Disabled — we have our own structured middleware logging
    }
    uvicorn.run("src.api.main:app", **dev_server_options)