chih.yikuan
Fix: Add explicit CORS headers and improve file serving for report-demo route
6884131
"""FastAPI entrypoint for the ClassLens backend."""
from __future__ import annotations
import os
import json
import asyncio
from pathlib import Path
from contextlib import asynccontextmanager
from typing import Optional, AsyncIterator
from chatkit.server import StreamingResult
from fastapi import FastAPI, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse, Response, StreamingResponse, FileResponse
from fastapi.staticfiles import StaticFiles
from pydantic import BaseModel
from .server import ClassLensChatServer
from .oauth import router as oauth_router
from .database import init_database
from .google_sheets import fetch_google_form_responses, parse_csv_responses
from .email_service import send_email_report
from .status_tracker import get_status, subscribe, unsubscribe
# Static files directory (for production deployment)
STATIC_DIR = Path(__file__).parent.parent / "static"
# Report template path (check multiple possible locations)
# Docker: /home/user/app/report-template.html (parent.parent from app/main.py)
# Dev: project_root/report-template.html (parent.parent.parent from backend/app/main.py)
def find_report_template():
"""Find report template in common locations."""
possible_paths = [
Path(__file__).parent.parent / "report-template.html", # Docker location
Path(__file__).parent.parent.parent / "report-template.html", # Dev location
]
for path in possible_paths:
if path.exists():
return path
return None
REPORT_TEMPLATE_PATH = find_report_template()
@asynccontextmanager
async def lifespan(app: FastAPI):
"""Initialize database on startup."""
await init_database()
yield
app = FastAPI(
title="ClassLens API",
description="AI-powered exam analysis for teachers",
version="1.0.0",
lifespan=lifespan,
)
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# Include OAuth routes
app.include_router(oauth_router)
# ChatKit server instance
chatkit_server = ClassLensChatServer()
# =============================================================================
# ChatKit Endpoint
# =============================================================================
@app.get("/api/models")
async def get_available_models():
"""Get list of available AI models."""
from .server import AVAILABLE_MODELS, DEFAULT_MODEL
return {
"models": AVAILABLE_MODELS,
"default": DEFAULT_MODEL,
}
@app.post("/chatkit")
async def chatkit_endpoint(request: Request) -> Response:
"""Proxy the ChatKit web component payload to the server implementation."""
payload = await request.body()
# Try to extract model from multiple sources
model = (
request.headers.get("X-Model") or
request.query_params.get("model") or
# Check if model is in cookies (set by frontend)
request.cookies.get("selected_model")
)
context = {
"request": request,
"model": model, # Pass model in context
}
# Also try to extract from payload if it's JSON
try:
import json
payload_json = json.loads(payload)
if isinstance(payload_json, dict):
# Check various possible locations for model
payload_model = (
payload_json.get("metadata", {}).get("model") or
payload_json.get("model") or
payload_json.get("config", {}).get("model")
)
if payload_model:
context["model"] = payload_model
except:
pass
result = await chatkit_server.process(payload, context)
if isinstance(result, StreamingResult):
return StreamingResponse(result, media_type="text/event-stream")
if hasattr(result, "json"):
return Response(content=result.json, media_type="application/json")
return JSONResponse(result)
# =============================================================================
# Workflow Status Endpoints
# =============================================================================
@app.get("/api/status/{session_id}")
async def get_workflow_status(session_id: str):
"""Get current workflow status for a session."""
return get_status(session_id)
async def status_stream(session_id: str) -> AsyncIterator[str]:
"""Generate SSE events for status updates."""
queue = await subscribe(session_id)
try:
# Send initial status
status = get_status(session_id)
yield f"data: {json.dumps(status)}\n\n"
while True:
try:
# Wait for updates with timeout
status = await asyncio.wait_for(queue.get(), timeout=30.0)
yield f"data: {json.dumps(status)}\n\n"
except asyncio.TimeoutError:
# Send keepalive
yield f": keepalive\n\n"
except asyncio.CancelledError:
pass
finally:
unsubscribe(session_id, queue)
@app.get("/api/status/{session_id}/stream")
async def stream_workflow_status(session_id: str):
"""Stream workflow status updates via Server-Sent Events (SSE)."""
return StreamingResponse(
status_stream(session_id),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"Connection": "keep-alive",
"X-Accel-Buffering": "no",
}
)
# =============================================================================
# Direct API Endpoints (for non-ChatKit integrations)
# =============================================================================
class FetchResponsesRequest(BaseModel):
google_form_url: str
teacher_email: str
answer_key: Optional[dict] = None
class ParseCSVRequest(BaseModel):
csv_content: str
answer_key: Optional[dict] = None
class SendEmailRequest(BaseModel):
email: str
subject: str
body_markdown: str
@app.post("/api/fetch_google_form_responses")
async def api_fetch_responses(request: FetchResponsesRequest):
"""
Fetch and normalize responses from a Google Form/Sheets URL.
This endpoint requires the teacher to have connected their Google account.
"""
result = await fetch_google_form_responses(
request.google_form_url,
request.teacher_email,
request.answer_key
)
return result
@app.post("/api/parse_csv")
async def api_parse_csv(request: ParseCSVRequest):
"""
Parse CSV content directly (fallback when Google OAuth is not available).
"""
result = parse_csv_responses(request.csv_content, request.answer_key)
return result
@app.post("/api/send_email_report")
async def api_send_email(request: SendEmailRequest):
"""
Send an exam analysis report via email.
"""
result = await send_email_report(
request.email,
request.subject,
request.body_markdown
)
return result
@app.get("/api/health")
@app.get("/health")
async def health_check():
"""Health check endpoint for HF Spaces."""
return {"status": "healthy", "service": "ClassLens"}
@app.get("/test-routes")
async def test_routes():
"""Test endpoint to verify routes are working."""
return {
"message": "Routes are working!",
"report_demo_route": "/report-demo",
"available_paths": [
"/health",
"/api/health",
"/report-demo",
"/test-routes"
]
}
@app.get("/report-template.html")
async def serve_report_template():
"""Serve the report template HTML file."""
template_path = Path(__file__).parent.parent / "static" / "report-template.html"
if template_path.exists():
return FileResponse(
template_path,
media_type="text/html",
headers={"Cache-Control": "public, max-age=3600"}
)
return Response(status_code=404, content="Report template not found")
# =============================================================================
# Report Demo Route
# =============================================================================
@app.get("/report-demo", response_class=Response)
async def serve_report_demo():
"""Serve the HTML report template demo - publicly accessible."""
# Try multiple possible locations (in order of preference)
base_dir = Path(__file__).parent.parent # /home/user/app in Docker
possible_paths = [
base_dir / "report-template.html", # Docker: /home/user/app/report-template.html
REPORT_TEMPLATE_PATH, # From find_report_template() (fallback)
STATIC_DIR / "report-template.html" if STATIC_DIR.exists() else None, # Static dir
Path(__file__).parent.parent.parent / "report-template.html", # Dev fallback
]
# Filter out None values
possible_paths = [p for p in possible_paths if p is not None]
for template_path in possible_paths:
if template_path and template_path.exists():
# Read file content and return with explicit headers
try:
content = template_path.read_text(encoding='utf-8')
return Response(
content=content,
media_type="text/html; charset=utf-8",
headers={
"Cache-Control": "public, max-age=3600",
"X-Content-Type-Options": "nosniff",
"Access-Control-Allow-Origin": "*",
}
)
except Exception as e:
# If reading fails, try FileResponse as fallback
return FileResponse(
template_path,
media_type="text/html; charset=utf-8",
headers={
"Cache-Control": "public, max-age=3600",
"Access-Control-Allow-Origin": "*",
}
)
# If not found, return helpful error message with debug info
current_dir = base_dir
files_in_dir = list(current_dir.glob("*")) if current_dir.exists() else []
searched_paths = [str(p) for p in possible_paths]
debug_info = {
"searched_paths": searched_paths,
"current_dir": str(current_dir),
"files_in_dir": [str(f.name) for f in files_in_dir[:10]],
"static_dir": str(STATIC_DIR) if STATIC_DIR.exists() else "does not exist",
"__file__": str(Path(__file__)),
}
return Response(
content=f"Report template not found.\nDebug info:\n{json.dumps(debug_info, indent=2)}",
status_code=404,
media_type="text/plain",
headers={"Access-Control-Allow-Origin": "*"}
)
# =============================================================================
# Static File Serving (Production)
# =============================================================================
# Mount static files if directory exists (production Docker build)
if STATIC_DIR.exists():
# Serve static assets (JS, CSS, images)
app.mount("/assets", StaticFiles(directory=STATIC_DIR / "assets"), name="assets")
@app.get("/favicon.ico")
async def favicon():
favicon_path = STATIC_DIR / "favicon.ico"
if favicon_path.exists():
return FileResponse(favicon_path)
return Response(status_code=404)
@app.get("/")
async def serve_spa():
"""Serve the React SPA."""
return FileResponse(STATIC_DIR / "index.html")
@app.get("/{full_path:path}")
async def serve_spa_routes(full_path: str):
"""Catch-all route to serve React SPA for client-side routing."""
# Don't serve SPA for API routes or specific endpoints
excluded_paths = ("api/", "auth/", "chatkit", "report-demo", "test-routes", "health")
if full_path.startswith(excluded_paths) or full_path in excluded_paths:
return Response(status_code=404)
# Check if it's a static file
file_path = STATIC_DIR / full_path
if file_path.exists() and file_path.is_file():
return FileResponse(file_path)
# Otherwise serve index.html for SPA routing
return FileResponse(STATIC_DIR / "index.html")
else:
# Development mode - show API info
@app.get("/")
async def root():
"""Root endpoint with API information (dev mode)."""
return {
"name": "ClassLens API",
"version": "1.0.0",
"mode": "development",
"description": "AI-powered exam analysis for teachers",
"note": "Frontend served separately on port 3000",
"endpoints": {
"chatkit": "/chatkit",
"oauth_start": "/auth/start?teacher_email=...",
"oauth_callback": "/auth/callback",
"auth_status": "/auth/status?teacher_email=...",
"workflow_status": "/api/status/{session_id}",
"workflow_stream": "/api/status/{session_id}/stream",
"fetch_responses": "/api/fetch_google_form_responses",
"parse_csv": "/api/parse_csv",
"send_email": "/api/send_email_report",
"health": "/api/health",
}
}