FairRelay / brain /app /main.py
MouleeswaranM's picture
Upload folder using huggingface_hub
fcf8749 verified
raw
history blame
6.61 kB
"""
Fair Dispatch System - FastAPI Application
Main entry point for the API server.
"""
import logging
from contextlib import asynccontextmanager
from pathlib import Path
from fastapi import FastAPI, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import FileResponse, JSONResponse
from starlette.responses import Response
from fastapi.staticfiles import StaticFiles
from app.config import get_settings
from app.api import (
allocation_router,
drivers_router,
routes_router,
feedback_router,
driver_api_router,
admin_router,
admin_learning_router,
allocation_langgraph_router,
consolidation_router,
)
from app.api.agent_events import router as agent_events_router
from app.api.runs import router as runs_router
# Configure structured logging
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
)
logger = logging.getLogger("fairrelay.brain")
settings = get_settings()
# Path to frontend directory
FRONTEND_DIR = Path(__file__).parent.parent / "frontend"
@asynccontextmanager
async def lifespan(app: FastAPI):
"""Application lifespan handler for startup and shutdown events."""
# Startup
logger.info(f"Starting {settings.app_title} v{settings.app_version} (env={settings.app_env})")
if not settings.is_production:
try:
from app.database import init_db
await init_db()
logger.info("Database tables initialized (dev mode)")
except Exception as e:
logger.warning(f"Database unavailable - running without persistence: {e}")
else:
# In production, just verify connectivity
try:
from app.database import check_db_health
healthy = await check_db_health()
if healthy:
logger.info("Database connected successfully")
else:
logger.warning("Database health check failed - some endpoints may not work")
except Exception as e:
logger.warning(f"Database check failed: {e}")
yield
# Shutdown
logger.info("Shutting down...")
# Create FastAPI application
app = FastAPI(
title=settings.app_title,
version=settings.app_version,
description="""
## Fair Dispatch System API
A fairness-focused route allocation system for delivery operations.
### Features
- **Route Clustering**: Groups packages using K-Means for efficient routes
- **Workload Scoring**: Calculates balanced workload metrics
- **Fairness Metrics**: Computes Gini index and fairness scores
- **Explainability**: Provides human-readable explanations for allocations
- **LangGraph Workflow**: Multi-agent orchestration with LangSmith tracing
### Main Endpoints
- `POST /api/v1/allocate` - Allocate packages to drivers (original)
- `POST /api/v1/allocate/langgraph` - Allocate with LangGraph workflow
- `POST /api/v1/consolidate` - AI Load Consolidation (5-agent LangGraph pipeline)
- `POST /api/v1/consolidate/simulate` - Compare consolidation scenarios
- `GET /api/v1/drivers/{id}` - Get driver details and stats
- `GET /api/v1/routes/{id}` - Get route details
- `POST /api/v1/feedback` - Submit driver feedback
- `GET /api/v1/agent-events/stream` - SSE stream for agent events
""",
lifespan=lifespan,
docs_url="/docs" if not settings.is_production else None,
redoc_url="/redoc" if not settings.is_production else None,
)
# Global exception handler - prevent stack trace leaks in production
@app.exception_handler(Exception)
async def global_exception_handler(request: Request, exc: Exception):
logger.error(f"Unhandled error on {request.method} {request.url.path}: {exc}", exc_info=True)
return JSONResponse(
status_code=500,
content={"detail": "Internal server error"},
)
# Add CORS middleware with configurable origins
app.add_middleware(
CORSMiddleware,
allow_origins=settings.cors_origin_list,
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# Include API routers
app.include_router(allocation_router, prefix=settings.api_prefix)
app.include_router(allocation_langgraph_router, prefix=settings.api_prefix)
app.include_router(drivers_router, prefix=settings.api_prefix)
app.include_router(routes_router, prefix=settings.api_prefix)
app.include_router(feedback_router, prefix=settings.api_prefix)
app.include_router(driver_api_router, prefix=settings.api_prefix)
app.include_router(admin_router, prefix=settings.api_prefix)
app.include_router(admin_learning_router, prefix=settings.api_prefix)
app.include_router(consolidation_router, prefix=settings.api_prefix)
# Include SSE agent events router (no prefix - it defines its own)
app.include_router(agent_events_router)
# Include run-scoped endpoints
app.include_router(runs_router, prefix=settings.api_prefix)
@app.get("/", tags=["Health"])
async def root():
"""Root endpoint - health check."""
return {
"status": "healthy",
"service": settings.app_title,
"version": settings.app_version,
}
@app.get("/health", tags=["Health"])
async def health_check():
"""Health check endpoint with actual DB verification."""
from app.database import check_db_health
db_ok = await check_db_health()
status = "healthy" if db_ok else "degraded"
return {
"status": status,
"database": "connected" if db_ok else "disconnected",
}
# Mount static files and demo endpoints
if FRONTEND_DIR.exists():
app.mount("/static", StaticFiles(directory=str(FRONTEND_DIR)), name="static")
NO_CACHE = {"Cache-Control": "no-cache, no-store, must-revalidate", "Pragma": "no-cache", "Expires": "0"}
@app.get("/demo/allocate", tags=["Demo"])
async def demo_allocate():
"""Serve the API demo page for testing allocation endpoint."""
demo_path = FRONTEND_DIR / "demo.html"
return FileResponse(demo_path, media_type="text/html", headers=NO_CACHE)
@app.get("/demo/visualization", tags=["Demo"])
async def demo_visualization():
"""Serve the agent visualization page."""
viz_path = FRONTEND_DIR / "visualization.html"
return FileResponse(viz_path, media_type="text/html", headers=NO_CACHE)
@app.get("/demo/consolidation", tags=["Demo"])
async def demo_consolidation():
"""Serve the 5-agent load consolidation pipeline visualization."""
path = FRONTEND_DIR / "consolidation.html"
return FileResponse(path, media_type="text/html", headers=NO_CACHE)