"""FastAPI entrypoint for the ClassLens backend.""" from __future__ import annotations import os import json import asyncio from pathlib import Path from contextlib import asynccontextmanager from typing import Optional, AsyncIterator from chatkit.server import StreamingResult from fastapi import FastAPI, Request from fastapi.middleware.cors import CORSMiddleware from fastapi.responses import JSONResponse, Response, StreamingResponse, FileResponse from fastapi.staticfiles import StaticFiles from pydantic import BaseModel from .server import ClassLensChatServer from .oauth import router as oauth_router from .database import init_database from .google_sheets import fetch_google_form_responses, parse_csv_responses from .email_service import send_email_report from .status_tracker import get_status, subscribe, unsubscribe # Static files directory (for production deployment) STATIC_DIR = Path(__file__).parent.parent / "static" # Report template path (check multiple possible locations) # Docker: /home/user/app/report-template.html (parent.parent from app/main.py) # Dev: project_root/report-template.html (parent.parent.parent from backend/app/main.py) def find_report_template(): """Find report template in common locations.""" possible_paths = [ Path(__file__).parent.parent / "report-template.html", # Docker location Path(__file__).parent.parent.parent / "report-template.html", # Dev location ] for path in possible_paths: if path.exists(): return path return None REPORT_TEMPLATE_PATH = find_report_template() @asynccontextmanager async def lifespan(app: FastAPI): """Initialize database on startup.""" await init_database() yield app = FastAPI( title="ClassLens API", description="AI-powered exam analysis for teachers", version="1.0.0", lifespan=lifespan, ) app.add_middleware( CORSMiddleware, allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) # Include OAuth routes app.include_router(oauth_router) # ChatKit server instance chatkit_server = ClassLensChatServer() # ============================================================================= # ChatKit Endpoint # ============================================================================= @app.get("/api/models") async def get_available_models(): """Get list of available AI models.""" from .server import AVAILABLE_MODELS, DEFAULT_MODEL return { "models": AVAILABLE_MODELS, "default": DEFAULT_MODEL, } @app.post("/chatkit") async def chatkit_endpoint(request: Request) -> Response: """Proxy the ChatKit web component payload to the server implementation.""" payload = await request.body() # Try to extract model from multiple sources model = ( request.headers.get("X-Model") or request.query_params.get("model") or # Check if model is in cookies (set by frontend) request.cookies.get("selected_model") ) context = { "request": request, "model": model, # Pass model in context } # Also try to extract from payload if it's JSON try: import json payload_json = json.loads(payload) if isinstance(payload_json, dict): # Check various possible locations for model payload_model = ( payload_json.get("metadata", {}).get("model") or payload_json.get("model") or payload_json.get("config", {}).get("model") ) if payload_model: context["model"] = payload_model except: pass result = await chatkit_server.process(payload, context) if isinstance(result, StreamingResult): return StreamingResponse(result, media_type="text/event-stream") if hasattr(result, "json"): return Response(content=result.json, media_type="application/json") return JSONResponse(result) # ============================================================================= # Workflow Status Endpoints # ============================================================================= @app.get("/api/status/{session_id}") async def get_workflow_status(session_id: str): """Get current workflow status for a session.""" return get_status(session_id) async def status_stream(session_id: str) -> AsyncIterator[str]: """Generate SSE events for status updates.""" queue = await subscribe(session_id) try: # Send initial status status = get_status(session_id) yield f"data: {json.dumps(status)}\n\n" while True: try: # Wait for updates with timeout status = await asyncio.wait_for(queue.get(), timeout=30.0) yield f"data: {json.dumps(status)}\n\n" except asyncio.TimeoutError: # Send keepalive yield f": keepalive\n\n" except asyncio.CancelledError: pass finally: unsubscribe(session_id, queue) @app.get("/api/status/{session_id}/stream") async def stream_workflow_status(session_id: str): """Stream workflow status updates via Server-Sent Events (SSE).""" return StreamingResponse( status_stream(session_id), media_type="text/event-stream", headers={ "Cache-Control": "no-cache", "Connection": "keep-alive", "X-Accel-Buffering": "no", } ) # ============================================================================= # Direct API Endpoints (for non-ChatKit integrations) # ============================================================================= class FetchResponsesRequest(BaseModel): google_form_url: str teacher_email: str answer_key: Optional[dict] = None class ParseCSVRequest(BaseModel): csv_content: str answer_key: Optional[dict] = None class SendEmailRequest(BaseModel): email: str subject: str body_markdown: str @app.post("/api/fetch_google_form_responses") async def api_fetch_responses(request: FetchResponsesRequest): """ Fetch and normalize responses from a Google Form/Sheets URL. This endpoint requires the teacher to have connected their Google account. """ result = await fetch_google_form_responses( request.google_form_url, request.teacher_email, request.answer_key ) return result @app.post("/api/parse_csv") async def api_parse_csv(request: ParseCSVRequest): """ Parse CSV content directly (fallback when Google OAuth is not available). """ result = parse_csv_responses(request.csv_content, request.answer_key) return result @app.post("/api/send_email_report") async def api_send_email(request: SendEmailRequest): """ Send an exam analysis report via email. """ result = await send_email_report( request.email, request.subject, request.body_markdown ) return result @app.get("/api/health") @app.get("/health") async def health_check(): """Health check endpoint for HF Spaces.""" return {"status": "healthy", "service": "ClassLens"} @app.get("/test-routes") async def test_routes(): """Test endpoint to verify routes are working.""" return { "message": "Routes are working!", "report_demo_route": "/report-demo", "available_paths": [ "/health", "/api/health", "/report-demo", "/test-routes" ] } @app.get("/report-template.html") async def serve_report_template(): """Serve the report template HTML file.""" template_path = Path(__file__).parent.parent / "static" / "report-template.html" if template_path.exists(): return FileResponse( template_path, media_type="text/html", headers={"Cache-Control": "public, max-age=3600"} ) return Response(status_code=404, content="Report template not found") # ============================================================================= # Report Demo Route # ============================================================================= @app.get("/report-demo", response_class=Response) async def serve_report_demo(): """Serve the HTML report template demo - publicly accessible.""" # Try multiple possible locations (in order of preference) base_dir = Path(__file__).parent.parent # /home/user/app in Docker possible_paths = [ base_dir / "report-template.html", # Docker: /home/user/app/report-template.html REPORT_TEMPLATE_PATH, # From find_report_template() (fallback) STATIC_DIR / "report-template.html" if STATIC_DIR.exists() else None, # Static dir Path(__file__).parent.parent.parent / "report-template.html", # Dev fallback ] # Filter out None values possible_paths = [p for p in possible_paths if p is not None] for template_path in possible_paths: if template_path and template_path.exists(): # Read file content and return with explicit headers try: content = template_path.read_text(encoding='utf-8') return Response( content=content, media_type="text/html; charset=utf-8", headers={ "Cache-Control": "public, max-age=3600", "X-Content-Type-Options": "nosniff", "Access-Control-Allow-Origin": "*", } ) except Exception as e: # If reading fails, try FileResponse as fallback return FileResponse( template_path, media_type="text/html; charset=utf-8", headers={ "Cache-Control": "public, max-age=3600", "Access-Control-Allow-Origin": "*", } ) # If not found, return helpful error message with debug info current_dir = base_dir files_in_dir = list(current_dir.glob("*")) if current_dir.exists() else [] searched_paths = [str(p) for p in possible_paths] debug_info = { "searched_paths": searched_paths, "current_dir": str(current_dir), "files_in_dir": [str(f.name) for f in files_in_dir[:10]], "static_dir": str(STATIC_DIR) if STATIC_DIR.exists() else "does not exist", "__file__": str(Path(__file__)), } return Response( content=f"Report template not found.\nDebug info:\n{json.dumps(debug_info, indent=2)}", status_code=404, media_type="text/plain", headers={"Access-Control-Allow-Origin": "*"} ) # ============================================================================= # Static File Serving (Production) # ============================================================================= # Mount static files if directory exists (production Docker build) if STATIC_DIR.exists(): # Serve static assets (JS, CSS, images) app.mount("/assets", StaticFiles(directory=STATIC_DIR / "assets"), name="assets") @app.get("/favicon.ico") async def favicon(): favicon_path = STATIC_DIR / "favicon.ico" if favicon_path.exists(): return FileResponse(favicon_path) return Response(status_code=404) @app.get("/") async def serve_spa(): """Serve the React SPA.""" return FileResponse(STATIC_DIR / "index.html") @app.get("/{full_path:path}") async def serve_spa_routes(full_path: str): """Catch-all route to serve React SPA for client-side routing.""" # Don't serve SPA for API routes or specific endpoints excluded_paths = ("api/", "auth/", "chatkit", "report-demo", "test-routes", "health") if full_path.startswith(excluded_paths) or full_path in excluded_paths: return Response(status_code=404) # Check if it's a static file file_path = STATIC_DIR / full_path if file_path.exists() and file_path.is_file(): return FileResponse(file_path) # Otherwise serve index.html for SPA routing return FileResponse(STATIC_DIR / "index.html") else: # Development mode - show API info @app.get("/") async def root(): """Root endpoint with API information (dev mode).""" return { "name": "ClassLens API", "version": "1.0.0", "mode": "development", "description": "AI-powered exam analysis for teachers", "note": "Frontend served separately on port 3000", "endpoints": { "chatkit": "/chatkit", "oauth_start": "/auth/start?teacher_email=...", "oauth_callback": "/auth/callback", "auth_status": "/auth/status?teacher_email=...", "workflow_status": "/api/status/{session_id}", "workflow_stream": "/api/status/{session_id}/stream", "fetch_responses": "/api/fetch_google_form_responses", "parse_csv": "/api/parse_csv", "send_email": "/api/send_email_report", "health": "/api/health", } }