# api/server.py
"""FastAPI backend for Clare.

Responsibilities visible in this module:
- serve the built web frontend (Vite build under web/build);
- keep per-user chat sessions in an in-memory dict (MVP-level persistence);
- chat endpoint with latency breakdown marks, RAG retrieval, and weakness /
  cognitive-state tracking delegated to api.clare_core;
- optional startup warmup of the OpenAI connection (cold-start mitigation);
- optional, best-effort event logging to a LangSmith dataset.
"""
import os
import tempfile
import time
import threading
from types import SimpleNamespace
from typing import Dict, List, Optional, Any, Tuple

from fastapi import FastAPI, UploadFile, File, Form, Request
from fastapi.responses import FileResponse, JSONResponse
from fastapi.staticfiles import StaticFiles
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel

from api.config import DEFAULT_COURSE_TOPICS, DEFAULT_MODEL
from api.syllabus_utils import extract_course_topics_from_file
from api.rag_engine import build_rag_chunks_from_file, retrieve_relevant_chunks
from api.clare_core import (
    detect_language,
    chat_with_clare,
    update_weaknesses_from_message,
    update_cognitive_state_from_message,
    render_session_status,
    export_conversation,
    summarize_conversation,
)

# ✅ LangSmith (optional dependency; logging is skipped when unavailable)
try:
    from langsmith import Client
except Exception:
    Client = None

# ----------------------------
# Paths / Constants
# ----------------------------
API_DIR = os.path.dirname(__file__)
MODULE10_PATH = os.path.join(API_DIR, "module10_responsible_ai.pdf")
MODULE10_DOC_TYPE = "Literature Review / Paper"

WEB_DIST = os.path.abspath(os.path.join(API_DIR, "..", "web", "build"))
WEB_INDEX = os.path.join(WEB_DIST, "index.html")
WEB_ASSETS = os.path.join(WEB_DIST, "assets")

LS_DATASET_NAME = os.getenv("LS_DATASET_NAME", "clare_user_events").strip()
LS_PROJECT = os.getenv("LANGSMITH_PROJECT", os.getenv("LANGCHAIN_PROJECT", "")).strip()  # optional
EXPERIMENT_ID = os.getenv("CLARE_EXPERIMENT_ID", "RESP_AI_W10").strip()

# ----------------------------
# Health / Warmup (cold start mitigation)
# ----------------------------
APP_START_TS = time.time()
WARMUP_DONE = False
WARMUP_ERROR: Optional[str] = None
WARMUP_STARTED = False

# warmup knobs
CLARE_ENABLE_WARMUP = os.getenv("CLARE_ENABLE_WARMUP", "1").strip() == "1"
CLARE_WARMUP_BLOCK_READY = os.getenv("CLARE_WARMUP_BLOCK_READY", "0").strip() == "1"

# langsmith knobs (important for latency)
CLARE_ENABLE_LANGSMITH_LOG = os.getenv("CLARE_ENABLE_LANGSMITH_LOG", "0").strip() == "1"
# If true, logging is done in background thread to avoid blocking /api/chat
CLARE_LANGSMITH_ASYNC = os.getenv("CLARE_LANGSMITH_ASYNC", "1").strip() == "1"

# ----------------------------
# App
# ----------------------------
app = FastAPI(title="Clare API")
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# ----------------------------
# Static hosting (Vite build)
# ----------------------------
if os.path.isdir(WEB_ASSETS):
    app.mount("/assets", StaticFiles(directory=WEB_ASSETS), name="assets")
if os.path.isdir(WEB_DIST):
    app.mount("/static", StaticFiles(directory=WEB_DIST), name="static")


@app.get("/")
def index():
    """Serve the SPA entry point, or a 500 hint if the frontend is not built."""
    if os.path.exists(WEB_INDEX):
        return FileResponse(WEB_INDEX)
    return JSONResponse(
        {"detail": "web/build not found. Build frontend first (web/build/index.html)."},
        status_code=500,
    )


# ----------------------------
# In-memory session store (MVP)
# NOTE(review): plain module-level dict with no locking — fine for a
# single-process MVP, not shared across workers/processes.
# ----------------------------
SESSIONS: Dict[str, Dict[str, Any]] = {}


def _preload_module10_chunks() -> List[Dict[str, Any]]:
    """Parse the bundled module-10 PDF into RAG chunks; best-effort, never raises."""
    if os.path.exists(MODULE10_PATH):
        try:
            return build_rag_chunks_from_file(MODULE10_PATH, MODULE10_DOC_TYPE) or []
        except Exception as e:
            print(f"[preload] module10 parse failed: {repr(e)}")
            return []
    return []


# Preload at import time (fast path for requests)
MODULE10_CHUNKS_CACHE = _preload_module10_chunks()


def _get_session(user_id: str) -> Dict[str, Any]:
    """Return the session dict for *user_id*, creating a fresh default one if absent.

    Each new session gets its own copy of the preloaded module-10 chunks so
    per-user uploads do not mutate the shared cache.
    """
    if user_id not in SESSIONS:
        SESSIONS[user_id] = {
            "user_id": user_id,
            "name": "",
            "history": [],  # List[Tuple[str, str]]
            "weaknesses": [],
            "cognitive_state": {"confusion": 0, "mastery": 0},
            "course_outline": DEFAULT_COURSE_TOPICS,
            "rag_chunks": list(MODULE10_CHUNKS_CACHE),
            "model_name": DEFAULT_MODEL,
        }
    return SESSIONS[user_id]


# ----------------------------
# Warmup (runs once, background)
# ----------------------------
def _do_warmup_once():
    """
    Warm OpenAI connection + touch module10 chunks cache.
    Best-effort; should never crash the app.
    """
    global WARMUP_DONE, WARMUP_ERROR, WARMUP_STARTED
    if WARMUP_STARTED:
        return
    WARMUP_STARTED = True
    try:
        # Warm OpenAI network / TLS / keep-alive
        from api.config import client

        # cheapest call: models.list() (no token usage)
        client.models.list()
        # Touch module10 cache (already loaded at import; this is just a safety)
        _ = MODULE10_CHUNKS_CACHE
        WARMUP_DONE = True
        WARMUP_ERROR = None
    except Exception as e:
        WARMUP_DONE = False
        WARMUP_ERROR = repr(e)


def _start_warmup_background():
    """Kick off warmup in a daemon thread unless disabled via CLARE_ENABLE_WARMUP."""
    if not CLARE_ENABLE_WARMUP:
        return
    threading.Thread(target=_do_warmup_once, daemon=True).start()


# NOTE(review): @app.on_event is deprecated in newer FastAPI in favor of
# lifespan handlers; kept as-is to avoid changing startup semantics.
@app.on_event("startup")
def _on_startup():
    _start_warmup_background()


# ----------------------------
# LangSmith helpers (optional; default OFF)
# ----------------------------
_ls_client = None
if (Client is not None) and CLARE_ENABLE_LANGSMITH_LOG:
    try:
        _ls_client = Client()
    except Exception as e:
        print("[langsmith] init failed:", repr(e))
        _ls_client = None


def _log_event_to_langsmith(data: Dict[str, Any]):
    """
    Create an Example in LangSmith Dataset.
    Best-effort and non-blocking by default (async thread).
    """
    if _ls_client is None:
        return

    def _do():
        try:
            inputs = {
                "question": data.get("question", ""),
                "student_id": data.get("student_id", ""),
                "student_name": data.get("student_name", ""),
            }
            outputs = {"answer": data.get("answer", "")}
            # everything except question/answer rides along as metadata
            metadata = {k: v for k, v in data.items() if k not in ("question", "answer")}
            if LS_PROJECT:
                metadata.setdefault("langsmith_project", LS_PROJECT)
            _ls_client.create_example(
                inputs=inputs,
                outputs=outputs,
                metadata=metadata,
                dataset_name=LS_DATASET_NAME,
            )
        except Exception as e:
            print("[langsmith] log failed:", repr(e))

    if CLARE_LANGSMITH_ASYNC:
        threading.Thread(target=_do, daemon=True).start()
    else:
        _do()


# ----------------------------
# Health endpoints (pure lightweight)
# ----------------------------
@app.get("/health")
def health():
    """Liveness probe with warmup/langsmith status; must stay cheap."""
    # do not touch LLM/RAG/disk heavy work here
    return {
        "ok": True,
        "uptime_s": round(time.time() - APP_START_TS, 3),
        "warmup_enabled": CLARE_ENABLE_WARMUP,
        "warmup_started": bool(WARMUP_STARTED),
        "warmup_done": bool(WARMUP_DONE),
        "warmup_error": WARMUP_ERROR,
        "ready": bool(WARMUP_DONE) if CLARE_WARMUP_BLOCK_READY else True,
        "langsmith_enabled": bool(CLARE_ENABLE_LANGSMITH_LOG),
        "langsmith_async": bool(CLARE_LANGSMITH_ASYNC),
        "ts": int(time.time()),
    }


@app.get("/ready")
def ready():
    """Readiness probe: optionally blocks (503) until warmup completes."""
    if not CLARE_ENABLE_WARMUP or not CLARE_WARMUP_BLOCK_READY:
        return {"ready": True}
    if WARMUP_DONE:
        return {"ready": True}
    return JSONResponse({"ready": False, "error": WARMUP_ERROR}, status_code=503)


# ----------------------------
# Schemas
# ----------------------------
class LoginReq(BaseModel):
    name: str
    user_id: str


class ChatReq(BaseModel):
    user_id: str
    message: str
    learning_mode: str
    language_preference: str = "Auto"
    doc_type: str = "Syllabus"


class ExportReq(BaseModel):
    user_id: str
    learning_mode: str


class SummaryReq(BaseModel):
    user_id: str
    learning_mode: str
    language_preference: str = "Auto"


class FeedbackReq(BaseModel):
    user_id: str
    rating: str  # "helpful" | "not_helpful"
    assistant_message_id: Optional[str] = None
    assistant_text: str
    user_text: Optional[str] = ""
    comment: Optional[str] = ""
    # NOTE(review): mutable default is safe here only because pydantic
    # deep-copies field defaults per instance.
    refs: Optional[List[str]] = []
    learning_mode: Optional[str] = None
    doc_type: Optional[str] = None
    timestamp_ms: Optional[int] = None


# ----------------------------
# API Routes
# ----------------------------
@app.post("/api/login")
def login(req: LoginReq):
    """Create/refresh a session keyed by user_id and remember the display name."""
    user_id = (req.user_id or "").strip()
    name = (req.name or "").strip()
    if not user_id or not name:
        return JSONResponse({"ok": False, "error": "Missing name/user_id"}, status_code=400)
    sess = _get_session(user_id)
    sess["name"] = name
    return {"ok": True, "user": {"name": name, "user_id": user_id}}


@app.post("/api/chat")
def chat(req: ChatReq):
    """One chat turn: language detect -> state updates -> RAG -> LLM.

    Records a per-stage latency breakdown and optionally logs the turn to
    LangSmith (async by default, so it does not add to response latency).
    """
    user_id = (req.user_id or "").strip()
    msg = (req.message or "").strip()
    if not user_id:
        return JSONResponse({"error": "Missing user_id"}, status_code=400)
    sess = _get_session(user_id)
    if not msg:
        # empty message: no LLM call, just echo current session status
        return {
            "reply": "",
            "session_status_md": render_session_status(
                req.learning_mode, sess["weaknesses"], sess["cognitive_state"]
            ),
            "refs": [],
            "latency_ms": 0.0,
        }

    # ----------------------------
    # Latency breakdown marks (ms)
    # ----------------------------
    t0 = time.time()
    marks_ms: Dict[str, float] = {"start": 0.0}

    # language detect
    resolved_lang = detect_language(msg, req.language_preference)
    marks_ms["language_detect_done"] = (time.time() - t0) * 1000.0

    # weakness update
    sess["weaknesses"] = update_weaknesses_from_message(msg, sess["weaknesses"])
    marks_ms["weakness_update_done"] = (time.time() - t0) * 1000.0

    # cognitive update
    sess["cognitive_state"] = update_cognitive_state_from_message(msg, sess["cognitive_state"])
    marks_ms["cognitive_update_done"] = (time.time() - t0) * 1000.0

    # rag retrieve (optional micro-gate for very short messages)
    if len(msg) < 20 and ("?" not in msg):
        rag_context_text, rag_used_chunks = "", []
    else:
        rag_context_text, rag_used_chunks = retrieve_relevant_chunks(msg, sess["rag_chunks"])
    marks_ms["rag_retrieve_done"] = (time.time() - t0) * 1000.0

    # llm
    try:
        answer, new_history = chat_with_clare(
            message=msg,
            history=sess["history"],
            model_name=sess["model_name"],
            language_preference=resolved_lang,
            learning_mode=req.learning_mode,
            doc_type=req.doc_type,
            course_outline=sess["course_outline"],
            weaknesses=sess["weaknesses"],
            cognitive_state=sess["cognitive_state"],
            rag_context=rag_context_text,
        )
    except Exception as e:
        print(f"[chat] error: {repr(e)}")
        return JSONResponse({"error": f"chat failed: {repr(e)}"}, status_code=500)
    marks_ms["llm_done"] = (time.time() - t0) * 1000.0
    total_ms = marks_ms["llm_done"]

    # segments (delta between consecutive marks)
    ordered = [
        "start",
        "language_detect_done",
        "weakness_update_done",
        "cognitive_update_done",
        "rag_retrieve_done",
        "llm_done",
    ]
    segments_ms: Dict[str, float] = {}
    for i in range(1, len(ordered)):
        a = ordered[i - 1]
        b = ordered[i]
        segments_ms[b] = max(0.0, marks_ms.get(b, 0.0) - marks_ms.get(a, 0.0))
    latency_breakdown = {"marks_ms": marks_ms, "segments_ms": segments_ms, "total_ms": total_ms}

    sess["history"] = new_history
    refs = [
        {"source_file": c.get("source_file"), "section": c.get("section")}
        for c in (rag_used_chunks or [])
    ]

    # extra metadata fields
    rag_context_chars = len(rag_context_text or "")
    rag_used_chunks_count = len(rag_used_chunks or [])
    history_len = len(sess["history"])

    # ✅ log chat_turn to LangSmith (optional; async by default)
    _log_event_to_langsmith(
        {
            "experiment_id": EXPERIMENT_ID,
            "student_id": user_id,
            "student_name": sess.get("name", ""),
            "event_type": "chat_turn",
            "timestamp": time.time(),
            "latency_ms": total_ms,
            "latency_breakdown": latency_breakdown,
            "rag_context_chars": rag_context_chars,
            "rag_used_chunks_count": rag_used_chunks_count,
            "history_len": history_len,
            "question": msg,
            "answer": answer,
            "model_name": sess["model_name"],
            "language": resolved_lang,
            "learning_mode": req.learning_mode,
            "doc_type": req.doc_type,
            "refs": refs,
        }
    )

    return {
        "reply": answer,
        "session_status_md": render_session_status(
            req.learning_mode, sess["weaknesses"], sess["cognitive_state"]
        ),
        "refs": refs,
        "latency_ms": total_ms,
    }


@app.post("/api/upload")
async def upload(
    user_id: str = Form(...),
    doc_type: str = Form(...),
    file: UploadFile = File(...),
):
    """Accept a document upload, persist it to a temp file, and index it for RAG.

    A "Syllabus" upload additionally refreshes the session's course outline.
    Parse failures are logged and tolerated (best-effort).
    """
    user_id = (user_id or "").strip()
    doc_type = (doc_type or "").strip()
    if not user_id:
        return JSONResponse({"ok": False, "error": "Missing user_id"}, status_code=400)
    if not file or not file.filename:
        return JSONResponse({"ok": False, "error": "Missing file"}, status_code=400)
    sess = _get_session(user_id)

    # basename + ".." scrub to keep the write inside the temp dir
    safe_name = os.path.basename(file.filename).replace("..", "_")
    # fix: use tempfile.gettempdir() instead of hardcoded "/tmp" (POSIX-only)
    tmp_path = os.path.join(tempfile.gettempdir(), safe_name)
    content = await file.read()
    with open(tmp_path, "wb") as f:
        f.write(content)

    if doc_type == "Syllabus":
        # extract_course_topics_from_file expects an object exposing .name
        fo = SimpleNamespace(name=tmp_path)
        try:
            sess["course_outline"] = extract_course_topics_from_file(fo, doc_type)
        except Exception as e:
            print(f"[upload] syllabus parse error: {repr(e)}")

    try:
        new_chunks = build_rag_chunks_from_file(tmp_path, doc_type) or []
        sess["rag_chunks"] = (sess["rag_chunks"] or []) + new_chunks
    except Exception as e:
        print(f"[upload] rag build error: {repr(e)}")
        new_chunks = []

    status_md = f"✅ Loaded base reading + uploaded {doc_type} file."
    _log_event_to_langsmith(
        {
            "experiment_id": EXPERIMENT_ID,
            "student_id": user_id,
            "student_name": sess.get("name", ""),
            "event_type": "upload",
            "timestamp": time.time(),
            "doc_type": doc_type,
            "filename": safe_name,
            "added_chunks": len(new_chunks),
            "question": f"[upload] {safe_name}",
            "answer": status_md,
        }
    )
    return {"ok": True, "added_chunks": len(new_chunks), "status_md": status_md}


@app.post("/api/feedback")
def api_feedback(req: FeedbackReq):
    """Record a helpful/not_helpful rating for an assistant message (LangSmith)."""
    user_id = (req.user_id or "").strip()
    if not user_id:
        return JSONResponse({"ok": False, "error": "Missing user_id"}, status_code=400)
    sess = _get_session(user_id)
    student_name = sess.get("name", "")
    rating = (req.rating or "").strip().lower()
    if rating not in ("helpful", "not_helpful"):
        return JSONResponse({"ok": False, "error": "Invalid rating"}, status_code=400)
    _log_event_to_langsmith(
        {
            "experiment_id": EXPERIMENT_ID,
            "student_id": user_id,
            "student_name": student_name,
            "event_type": "feedback",
            "timestamp": time.time(),
            "rating": rating,
            "assistant_message_id": req.assistant_message_id,
            "question": (req.user_text or "").strip(),
            "answer": (req.assistant_text or "").strip(),
            "comment": (req.comment or "").strip(),
            "refs": req.refs or [],
            "learning_mode": req.learning_mode,
            "doc_type": req.doc_type,
            "timestamp_ms": req.timestamp_ms,
        }
    )
    return {"ok": True}


@app.post("/api/export")
def api_export(req: ExportReq):
    """Render the full conversation as markdown for download."""
    user_id = (req.user_id or "").strip()
    if not user_id:
        return JSONResponse({"error": "Missing user_id"}, status_code=400)
    sess = _get_session(user_id)
    md = export_conversation(
        sess["history"],
        sess["course_outline"],
        req.learning_mode,
        sess["weaknesses"],
        sess["cognitive_state"],
    )
    return {"markdown": md}


@app.post("/api/summary")
def api_summary(req: SummaryReq):
    """Produce an LLM-generated markdown summary of the conversation."""
    user_id = (req.user_id or "").strip()
    if not user_id:
        return JSONResponse({"error": "Missing user_id"}, status_code=400)
    sess = _get_session(user_id)
    md = summarize_conversation(
        sess["history"],
        sess["course_outline"],
        sess["weaknesses"],
        sess["cognitive_state"],
        sess["model_name"],
        req.language_preference,
    )
    return {"markdown": md}


@app.get("/api/memoryline")
def memoryline(user_id: str):
    """Placeholder spaced-repetition status (hardcoded values for the MVP)."""
    _ = _get_session((user_id or "").strip())
    return {"next_review_label": "T+7", "progress_pct": 0.4}


# ----------------------------
# SPA Fallback
# ----------------------------
@app.get("/{full_path:path}")
def spa_fallback(full_path: str, request: Request):
    """Route any unknown non-API path back to the SPA index (client-side routing)."""
    if (
        full_path.startswith("api/")
        or full_path.startswith("assets/")
        or full_path.startswith("static/")
    ):
        return JSONResponse({"detail": "Not Found"}, status_code=404)
    if os.path.exists(WEB_INDEX):
        return FileResponse(WEB_INDEX)
    return JSONResponse(
        {"detail": "web/build not found. Build frontend first (web/build/index.html)."},
        status_code=500,
    )