Yash030's picture
Add admin dashboard for session monitoring
f3220aa
raw
history blame
6.19 kB
"""FastAPI application factory and configuration."""
import traceback
from contextlib import asynccontextmanager
from typing import Any
from fastapi import FastAPI, Request
from fastapi.exception_handlers import request_validation_exception_handler
from fastapi.exceptions import RequestValidationError
from fastapi.responses import JSONResponse
from loguru import logger
from starlette.types import Receive, Scope, Send
from config.logging_config import configure_logging
from config.settings import get_settings
from providers.exceptions import ProviderError
from .admin import router as admin_router
from .routes import router
from .runtime import AppRuntime, startup_failure_message
from .validation_log import summarize_request_validation_body
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Application lifespan manager.

    Builds the ``AppRuntime`` for ``app`` from the current settings, awaits
    its startup before yielding, and awaits its shutdown when the context
    exits.

    Args:
        app: The FastAPI application whose runtime is being managed.

    Yields:
        None. Control is handed back to the caller between startup and
        shutdown.
    """
    runtime = AppRuntime.for_app(app, settings=get_settings())
    # Startup is deliberately outside the ``try``: a failed startup propagates
    # unchanged and is not followed by a shutdown call.
    await runtime.startup()
    try:
        yield
    finally:
        # Run shutdown even when the body of the context ends with an
        # exception or cancellation, so a started runtime is always released.
        await runtime.shutdown()
class GracefulLifespanApp:
    """ASGI wrapper that reports startup failures without Starlette tracebacks.

    Non-lifespan scopes are forwarded untouched to the wrapped application.
    Lifespan scopes are handled by this wrapper itself: it drives the
    ``AppRuntime`` startup/shutdown and answers the server with the matching
    ``lifespan.*`` messages. Attribute access that the wrapper does not
    define is delegated to the wrapped application.
    """

    def __init__(self, app: FastAPI):
        """Store the application to wrap.

        Args:
            app: The application that receives every non-lifespan scope.
        """
        self.app = app

    def __getattr__(self, name: str) -> Any:
        """Delegate unknown attribute lookups to the wrapped application.

        Args:
            name: Attribute that normal lookup on the wrapper did not find.

        Returns:
            The attribute of the same name on the wrapped application.

        Raises:
            AttributeError: If the wrapper has no ``app`` yet, or the wrapped
                application lacks the attribute.
        """
        # ``__getattr__`` only runs after normal lookup failed. Reading
        # ``self.app`` here would re-enter this method when ``app`` itself is
        # missing (instance built via ``__new__``, e.g. by ``copy``) and
        # recurse until RecursionError, so fetch it without the fallback.
        try:
            wrapped = object.__getattribute__(self, "app")
        except AttributeError:
            raise AttributeError(
                f"{type(self).__name__!r} object has no attribute {name!r}"
            ) from None
        return getattr(wrapped, name)

    async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:
        """ASGI entry point.

        Args:
            scope: ASGI connection scope; only its ``"type"`` key is read here.
            receive: ASGI receive callable.
            send: ASGI send callable.
        """
        if scope["type"] != "lifespan":
            await self.app(scope, receive, send)
            return
        # Lifespan scopes are never forwarded to the wrapped application.
        await self._lifespan(receive, send)

    async def _lifespan(self, receive: Receive, send: Send) -> None:
        """Run the lifespan message loop until startup fails or shutdown ends.

        Args:
            receive: ASGI receive callable yielding lifespan messages.
            send: ASGI send callable used to acknowledge each phase.
        """
        settings = get_settings()
        runtime = AppRuntime.for_app(self.app, settings=settings)
        # Shutdown of the runtime is only attempted after a successful startup.
        startup_complete = False
        while True:
            message = await receive()
            if message["type"] == "lifespan.startup":
                try:
                    await runtime.startup()
                except Exception as exc:
                    # Report the failure as a lifespan message instead of
                    # letting the exception propagate out of the ASGI call.
                    await send(
                        {
                            "type": "lifespan.startup.failed",
                            "message": startup_failure_message(settings, exc),
                        }
                    )
                    return
                startup_complete = True
                await send({"type": "lifespan.startup.complete"})
                continue
            if message["type"] == "lifespan.shutdown":
                if startup_complete:
                    try:
                        await runtime.shutdown()
                    except Exception as exc:
                        # Only the exception type is logged; the message sent
                        # back to the server is left empty.
                        logger.error("Shutdown failed: exc_type={}", type(exc).__name__)
                        await send({"type": "lifespan.shutdown.failed", "message": ""})
                        return
                await send({"type": "lifespan.shutdown.complete"})
                return
            # Any other message type is ignored and the loop waits for the next.
def create_app(*, lifespan_enabled: bool = True) -> FastAPI:
    """Create and configure the FastAPI application.

    Configures logging from the current settings, builds the app, registers
    the main and admin routers, and installs exception handlers for request
    validation errors, ``ProviderError`` and any other ``Exception``.

    Args:
        lifespan_enabled: When true, the module-level ``lifespan`` context
            manager is attached to the app; when false, no lifespan is
            passed to ``FastAPI``.

    Returns:
        The configured ``FastAPI`` instance.
    """
    settings = get_settings()
    configure_logging(
        settings.log_file, verbose_third_party=settings.log_raw_api_payloads
    )

    app_kwargs: dict[str, Any] = {
        "title": "Claude Code Proxy",
        "version": "2.0.0",
    }
    if lifespan_enabled:
        app_kwargs["lifespan"] = lifespan

    app = FastAPI(**app_kwargs)

    # Register routes
    app.include_router(router)
    app.include_router(admin_router)

    # Exception handlers
    @app.exception_handler(RequestValidationError)
    async def validation_error_handler(request: Request, exc: RequestValidationError):
        """Log request shape for 422 debugging without content values."""
        body: Any
        try:
            body = await request.json()
        except Exception as e:
            # An unreadable body is summarized by the parse error's type only.
            body = {"_json_error": type(e).__name__}
        message_summary, tool_names = summarize_request_validation_body(body)
        # Fetch the error list once and derive both logged views from it.
        errors = exc.errors()
        logger.debug(
            "Request validation failed: path={} query={} error_locs={} error_types={} message_summary={} tool_names={}",
            request.url.path,
            str(request.url.query),
            [list(error.get("loc", ())) for error in errors],
            [str(error.get("type", "")) for error in errors],
            message_summary,
            tool_names,
        )
        # The response itself is FastAPI's stock validation-error response.
        return await request_validation_exception_handler(request, exc)

    @app.exception_handler(ProviderError)
    async def provider_error_handler(request: Request, exc: ProviderError):
        """Handle provider-specific errors and return Anthropic format."""
        # Settings are read per error rather than reusing the outer value.
        err_settings = get_settings()
        if err_settings.log_api_error_tracebacks:
            logger.error(
                "Provider Error: error_type={} status_code={} message={}",
                exc.error_type,
                exc.status_code,
                exc.message,
            )
        else:
            # The provider message is omitted unless detailed logging is on.
            logger.error(
                "Provider Error: error_type={} status_code={}",
                exc.error_type,
                exc.status_code,
            )
        return JSONResponse(
            status_code=exc.status_code,
            content=exc.to_anthropic_format(),
        )

    @app.exception_handler(Exception)
    async def general_error_handler(request: Request, exc: Exception):
        """Handle general errors and return Anthropic format."""
        # Named like the provider handler's local so the outer ``settings``
        # is not shadowed.
        err_settings = get_settings()
        if err_settings.log_api_error_tracebacks:
            logger.error("General Error: {}", exc)
            # Format from the exception object that was handed in instead of
            # the interpreter's "currently handled" exception, so the
            # traceback is right regardless of where the handler is invoked.
            logger.error(
                "".join(
                    traceback.format_exception(type(exc), exc, exc.__traceback__)
                )
            )
        else:
            logger.error(
                "General Error: path={} method={} exc_type={}",
                request.url.path,
                request.method,
                type(exc).__name__,
            )
        # The client always receives the same generic message.
        return JSONResponse(
            status_code=500,
            content={
                "type": "error",
                "error": {
                    "type": "api_error",
                    "message": "An unexpected error occurred.",
                },
            },
        )

    return app
def create_asgi_app() -> GracefulLifespanApp:
    """Build the ASGI entry point used by the server.

    The inner FastAPI app is created with ``lifespan_enabled=False`` and then
    wrapped in ``GracefulLifespanApp``, which handles lifespan scopes itself.
    """
    inner_app = create_app(lifespan_enabled=False)
    return GracefulLifespanApp(inner_app)