3d_model / ylff /app.py
Azan
Clean deployment build (Squashed)
7a87926
"""
Unified application entry point for YLFF.
Supports both CLI and API modes:
- CLI: python -m ylff [command] or ylff [command]
- API: uvicorn ylff.app:api_app or python -m ylff --api
- Profiling: python -m ylff profile [command]
"""
import logging
import sys
import uuid
from typing import Any, Callable
try:
from .config import get_settings
settings = get_settings()
except ImportError:
# Fallback if pydantic-settings not available
class SimpleSettings:
log_level = "INFO"
log_format = "text"
profiling_enabled = True
api_port = 8000
api_host = "0.0.0.0"
api_reload = False
settings = SimpleSettings()
# Configure logging based on settings
log_format = settings.log_format
if log_format == "json":
import json
from datetime import datetime
class JSONFormatter(logging.Formatter):
def format(self, record):
log_entry = {
"timestamp": datetime.utcnow().isoformat(),
"level": record.levelname,
"logger": record.name,
"message": record.getMessage(),
}
request_id = getattr(record, "request_id", None)
if request_id is not None:
log_entry["request_id"] = request_id
job_id = getattr(record, "job_id", None)
if job_id is not None:
log_entry["job_id"] = job_id
if record.exc_info:
log_entry["exception"] = self.formatException(record.exc_info)
return json.dumps(log_entry)
formatter = JSONFormatter()
else:
formatter = logging.Formatter(
"%(asctime)s - %(name)s - %(levelname)s - %(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
)
handler = logging.StreamHandler()
handler.setFormatter(formatter)
logging.basicConfig(
level=getattr(logging, settings.log_level.upper(), logging.INFO),
handlers=[handler],
)
logger = logging.getLogger(__name__)
# ============================================================================
# CLI Application (Typer)
# ============================================================================
# Import CLI app from cli.py (lazy import to avoid circular dependencies)
def get_cli_app() -> Any:
"""Get CLI app, importing only when needed."""
from .cli import app as cli_app
return cli_app
# For direct access: cli_app = get_cli_app()
# But we'll use get_cli_app() in main() to avoid import issues
# ============================================================================
# API Application (FastAPI)
# ============================================================================
from fastapi import FastAPI, Request # noqa: E402
from fastapi.responses import JSONResponse # noqa: E402
from pydantic import ValidationError as PydanticValidationError # noqa: E402
from starlette.middleware.base import BaseHTTPMiddleware # noqa: E402
from .routers import ( # noqa: E402
audit_router,
health_router,
inference_router,
ingest_router,
jobs_router,
models_router,
profiling_router,
smoke_router,
teacher_router,
training_router,
validation_router,
visualization_router,
)
api_app = FastAPI(
title="YLFF API",
description="You Learn From Failure: BA-Supervised Fine-Tuning API",
version="1.0.0",
docs_url="/docs", # Swagger UI
redoc_url="/redoc", # ReDoc
openapi_url="/openapi.json", # OpenAPI schema
)
@api_app.on_event("startup")
async def startup_event():
"""Configure logging and profiling on startup."""
logger.info("YLFF API server starting up", extra={"version": api_app.version})
logger.info(f"Log level: {settings.log_level}, Format: {settings.log_format}")
# Artifact store (local by default; optional S3 for production)
try:
from .utils.artifact_store import build_artifact_store
api_app.state.artifact_store = build_artifact_store(
backend=getattr(settings, "artifact_store_backend", "local"),
root_dir=getattr(settings, "artifact_store_root_dir", None),
s3_bucket=getattr(settings, "artifact_store_s3_bucket", None),
s3_prefix=getattr(settings, "artifact_store_s3_prefix", "ylff/artifacts"),
s3_region=getattr(settings, "artifact_store_s3_region", None),
s3_endpoint_url=getattr(settings, "artifact_store_s3_endpoint_url", None),
)
logger.info(
"Artifact store initialized",
extra={"backend": getattr(settings, "artifact_store_backend", "local")},
)
except Exception as e:
logger.warning(
"Failed to initialize configured artifact store; continuing without it",
extra={
"error": str(e),
"backend": getattr(settings, "artifact_store_backend", "local"),
},
)
# Durable job storage (in-memory by default; optional Redis for production)
try:
from .utils.job_store import build_job_store
api_app.state.job_store = build_job_store(
backend=getattr(settings, "job_store_backend", "memory"),
redis_url=getattr(settings, "redis_url", None),
redis_key_prefix=getattr(settings, "redis_key_prefix", "ylff:jobs"),
)
logger.info(
"Job store initialized",
extra={"backend": getattr(settings, "job_store_backend", "memory")},
)
except Exception as e:
# Keep API usable even if optional Redis isn't installed/misconfigured.
logger.warning(
"Failed to initialize configured job store; falling back to in-memory",
extra={"error": str(e), "backend": getattr(settings, "job_store_backend", "memory")},
)
try:
from .utils.profiler import Profiler
profiler = Profiler.get_instance()
profiler.enabled = settings.profiling_enabled
if settings.profiling_enabled:
logger.info("Profiling enabled")
else:
logger.info("Profiling disabled (set YLFF_PROFILING_ENABLED=true to enable)")
except ImportError:
logger.info("Profiling not available")
class RequestLoggingMiddleware(BaseHTTPMiddleware):
"""Middleware for logging all requests and responses."""
async def dispatch(self, request: Request, call_next: Callable[[Request], Any]) -> Any:
import time
# Generate request ID
request_id = request.headers.get("X-Request-ID") or str(uuid.uuid4())
# Log request
start_time = time.time()
client_ip = request.client.host if request.client else "unknown"
logger.info(
f"Request started: {request.method} {request.url.path}",
extra={
"request_id": request_id,
"method": request.method,
"path": request.url.path,
"query_params": dict(request.query_params),
"client_ip": client_ip,
},
)
# Process request
try:
response = await call_next(request)
duration = time.time() - start_time
# Log response
logger.info(
f"Request completed: {request.method} {request.url.path}",
extra={
"request_id": request_id,
"method": request.method,
"path": request.url.path,
"status_code": response.status_code,
"duration_ms": duration * 1000,
},
)
# Add request ID to response headers
response.headers["X-Request-ID"] = request_id
return response
except Exception as e:
duration = time.time() - start_time
logger.error(
f"Request failed: {request.method} {request.url.path}",
extra={
"request_id": request_id,
"method": request.method,
"path": request.url.path,
"duration_ms": duration * 1000,
"error": str(e),
"error_type": type(e).__name__,
},
exc_info=True,
)
raise
# Add middleware
api_app.add_middleware(RequestLoggingMiddleware)
@api_app.exception_handler(PydanticValidationError)
async def validation_exception_handler(
request: Request, exc: PydanticValidationError
) -> JSONResponse:
"""Handle Pydantic validation errors."""
request_id = request.headers.get("X-Request-ID", "unknown")
logger.warning(
"Validation error in request",
extra={
"request_id": request_id,
"path": request.url.path,
"errors": exc.errors(),
},
)
return JSONResponse(
status_code=422,
content={
"error": "ValidationError",
"message": "Invalid request data",
"details": exc.errors(),
"request_id": request_id,
},
)
@api_app.exception_handler(Exception)
async def general_exception_handler(request: Request, exc: Exception) -> JSONResponse:
"""Handle unexpected exceptions."""
request_id = request.headers.get("X-Request-ID", "unknown")
logger.error(
"Unhandled exception in request",
extra={
"request_id": request_id,
"path": request.url.path,
"error_type": type(exc).__name__,
"error": str(exc),
},
exc_info=True,
)
return JSONResponse(
status_code=500,
content={
"error": "InternalServerError",
"message": "An unexpected error occurred",
"request_id": request_id,
},
)
# Include routers
api_app.include_router(health_router)
api_app.include_router(models_router)
api_app.include_router(validation_router, prefix="/api/v1")
api_app.include_router(training_router, prefix="/api/v1")
api_app.include_router(jobs_router, prefix="/api/v1")
api_app.include_router(profiling_router) # Already has /api/v1/profiling prefix
api_app.include_router(visualization_router, prefix="/api/v1")
api_app.include_router(ingest_router, prefix="/api/v1")
api_app.include_router(teacher_router, prefix="/api/v1")
api_app.include_router(inference_router, prefix="/api/v1")
api_app.include_router(audit_router, prefix="/api/v1")
api_app.include_router(smoke_router, prefix="/api/v1")
# For backward compatibility with uvicorn ylff.app:app
app = api_app
# ============================================================================
# Entry Point
# ============================================================================
def main() -> None:
"""
Main entry point that detects context and runs CLI or API.
Usage:
# CLI mode (default)
python -m ylff validate sequence /path/to/sequence
ylff validate sequence /path/to/sequence
# API mode
python -m ylff --api [--host 0.0.0.0] [--port 8000]
# or
uvicorn ylff.app:api_app --host 0.0.0.0 --port 8000
"""
# Check if we should run API mode
# Look for --api flag or if running via uvicorn/gunicorn
is_api_mode = (
"--api" in sys.argv
or any("uvicorn" in arg or "gunicorn" in arg for arg in sys.argv)
or "uvicorn" in sys.argv[0]
or "gunicorn" in sys.argv[0]
)
if is_api_mode:
# API mode - run uvicorn
import uvicorn
# Remove --api from args if present
if "--api" in sys.argv:
sys.argv.remove("--api")
# Check for dev mode
dev_mode = "--dev" in sys.argv or settings.api_reload
if "--dev" in sys.argv:
sys.argv.remove("--dev")
settings.api_reload = True
# Get host/port from args or settings
port = settings.api_port
host = settings.api_host
# Parse port/host from args if provided (override settings)
if "--port" in sys.argv:
idx = sys.argv.index("--port")
if idx + 1 < len(sys.argv):
port = int(sys.argv[idx + 1])
if "--host" in sys.argv:
idx = sys.argv.index("--host")
if idx + 1 < len(sys.argv):
host = sys.argv[idx + 1]
reload_msg = " (with hot reload)" if dev_mode else ""
logger.info(f"Starting YLFF API server on {host}:{port}{reload_msg}")
logger.info(f"API docs available at http://{host}:{port}/docs")
uvicorn.run(
api_app,
host=host,
port=port,
reload=dev_mode,
log_level=settings.log_level.lower(),
)
else:
# CLI mode (default)
cli_app = get_cli_app()
cli_app()
if __name__ == "__main__":
main()