# api/server.py
"""FastAPI application for the Clare tutoring backend.

Serves the built web frontend, keeps per-user chat sessions in process
memory, and exposes the chat / quiz / upload / feedback / export / summary
endpoints. LangSmith logging is optional and controlled by environment
variables.
"""
import os
import shutil
import tempfile
import time
import threading
from typing import Dict, List, Optional, Any, Tuple

from fastapi import FastAPI, UploadFile, File, Form, Request
from fastapi.responses import FileResponse, JSONResponse
from fastapi.staticfiles import StaticFiles
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel

from api.config import DEFAULT_COURSE_TOPICS, DEFAULT_MODEL
from api.syllabus_utils import extract_course_topics_from_file
from api.rag_engine import build_rag_chunks_from_file, retrieve_relevant_chunks
from api.clare_core import (
    detect_language,
    chat_with_clare,
    update_weaknesses_from_message,
    update_cognitive_state_from_message,
    render_session_status,
    export_conversation,
    summarize_conversation,
)

# ✅ LangSmith (optional)
try:
    from langsmith import Client
except Exception:
    Client = None

# ----------------------------
# Paths / Constants
# ----------------------------
API_DIR = os.path.dirname(__file__)

MODULE10_PATH = os.path.join(API_DIR, "module10_responsible_ai.pdf")
MODULE10_DOC_TYPE = "Literature Review / Paper"

WEB_DIST = os.path.abspath(os.path.join(API_DIR, "..", "web", "build"))
WEB_INDEX = os.path.join(WEB_DIST, "index.html")
WEB_ASSETS = os.path.join(WEB_DIST, "assets")

LS_DATASET_NAME = os.getenv("LS_DATASET_NAME", "clare_user_events").strip()
LS_PROJECT = os.getenv("LANGSMITH_PROJECT", os.getenv("LANGCHAIN_PROJECT", "")).strip()
EXPERIMENT_ID = os.getenv("CLARE_EXPERIMENT_ID", "RESP_AI_W10").strip()

# ----------------------------
# Health / Warmup (cold start mitigation)
# ----------------------------
APP_START_TS = time.time()

WARMUP_DONE = False
WARMUP_ERROR: Optional[str] = None
WARMUP_STARTED = False

CLARE_ENABLE_WARMUP = os.getenv("CLARE_ENABLE_WARMUP", "1").strip() == "1"
CLARE_WARMUP_BLOCK_READY = os.getenv("CLARE_WARMUP_BLOCK_READY", "0").strip() == "1"

# Dataset logging (create_example)
CLARE_ENABLE_LANGSMITH_LOG = os.getenv("CLARE_ENABLE_LANGSMITH_LOG", "0").strip() == "1"
CLARE_LANGSMITH_ASYNC = os.getenv("CLARE_LANGSMITH_ASYNC", "1").strip() == "1"

# Feedback logging (create_feedback -> attach to run_id)
CLARE_ENABLE_LANGSMITH_FEEDBACK = os.getenv("CLARE_ENABLE_LANGSMITH_FEEDBACK", "1").strip() == "1"

# ----------------------------
# App
# ----------------------------
app = FastAPI(title="Clare API")

# NOTE(review): wildcard origins combined with allow_credentials=True is very
# permissive; consider restricting allow_origins to the deployed frontend.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# ----------------------------
# Static hosting (Vite build)
# ----------------------------
if os.path.isdir(WEB_ASSETS):
    app.mount("/assets", StaticFiles(directory=WEB_ASSETS), name="assets")

if os.path.isdir(WEB_DIST):
    app.mount("/static", StaticFiles(directory=WEB_DIST), name="static")


@app.get("/")
def index():
    """Serve the built frontend's index.html, or a 500 JSON error if it is missing."""
    if os.path.exists(WEB_INDEX):
        return FileResponse(WEB_INDEX)
    return JSONResponse(
        {"detail": "web/build not found. Build frontend first (web/build/index.html)."},
        status_code=500,
    )


# ----------------------------
# In-memory session store (MVP)
# ----------------------------
SESSIONS: Dict[str, Dict[str, Any]] = {}


def _preload_module10_chunks() -> List[Dict[str, Any]]:
    """Build RAG chunks from the bundled Module 10 PDF.

    Returns an empty list when the PDF is absent or parsing raises; the
    failure is printed rather than propagated so import never fails here.
    """
    if not os.path.exists(MODULE10_PATH):
        return []
    try:
        return build_rag_chunks_from_file(MODULE10_PATH, MODULE10_DOC_TYPE) or []
    except Exception as e:
        print(f"[preload] module10 parse failed: {repr(e)}")
        return []


MODULE10_CHUNKS_CACHE = _preload_module10_chunks()


def _get_session(user_id: str) -> Dict[str, Any]:
    """Return the session dict for ``user_id``, creating a default one on first use.

    A new session starts with empty history/weaknesses, the default course
    outline and model, and its own copy of the preloaded Module 10 chunks.
    """
    if user_id not in SESSIONS:
        SESSIONS[user_id] = {
            "user_id": user_id,
            "name": "",
            "history": [],  # List[Tuple[str, str]]
            "weaknesses": [],
            "cognitive_state": {"confusion": 0, "mastery": 0},
            "course_outline": DEFAULT_COURSE_TOPICS,
            "rag_chunks": list(MODULE10_CHUNKS_CACHE),
            "model_name": DEFAULT_MODEL,
        }
    return SESSIONS[user_id]


# ----------------------------
# Warmup
# ----------------------------
def _do_warmup_once():
    """Run the warmup at most once and record the outcome in the WARMUP_* globals.

    Warmup lists models through the configured client; any exception is
    captured as ``WARMUP_ERROR`` instead of being raised.
    """
    global WARMUP_DONE, WARMUP_ERROR, WARMUP_STARTED
    if WARMUP_STARTED:
        return
    WARMUP_STARTED = True
    try:
        from api.config import client

        client.models.list()
        _ = MODULE10_CHUNKS_CACHE
        WARMUP_DONE = True
        WARMUP_ERROR = None
    except Exception as e:
        WARMUP_DONE = False
        WARMUP_ERROR = repr(e)


def _start_warmup_background():
    """Start the warmup in a daemon thread when warmup is enabled."""
    if not CLARE_ENABLE_WARMUP:
        return
    threading.Thread(target=_do_warmup_once, daemon=True).start()


@app.on_event("startup")
def _on_startup():
    """FastAPI startup hook: kick off the background warmup."""
    _start_warmup_background()


def _is_ready() -> bool:
    """Return the readiness state shared by ``/health`` and ``/ready``.

    Readiness is gated on warmup only when warmup is enabled AND
    ``CLARE_WARMUP_BLOCK_READY`` is set; otherwise the app is always ready.
    """
    if not CLARE_ENABLE_WARMUP or not CLARE_WARMUP_BLOCK_READY:
        return True
    return bool(WARMUP_DONE)


# ----------------------------
# LangSmith helpers
# ----------------------------
_ls_client = None
if (Client is not None) and CLARE_ENABLE_LANGSMITH_LOG:
    try:
        _ls_client = Client()
    except Exception as e:
        print("[langsmith] init failed:", repr(e))
        _ls_client = None


def _log_event_to_langsmith(data: Dict[str, Any]):
    """
    Dataset logging: create_example into LS_DATASET_NAME

    ``question``/``student_id``/``student_name`` become the example inputs,
    ``answer`` the output, and every other key goes into metadata. Does
    nothing when no LangSmith client was initialised. Runs in a daemon thread
    when ``CLARE_LANGSMITH_ASYNC`` is set; failures are printed, never raised.
    """
    if _ls_client is None:
        return

    def _do():
        try:
            inputs = {
                "question": data.get("question", ""),
                "student_id": data.get("student_id", ""),
                "student_name": data.get("student_name", ""),
            }
            outputs = {"answer": data.get("answer", "")}

            # keep metadata clean and JSON-serializable
            metadata = {k: v for k, v in data.items() if k not in ("question", "answer")}

            if LS_PROJECT:
                metadata.setdefault("langsmith_project", LS_PROJECT)

            _ls_client.create_example(
                inputs=inputs,
                outputs=outputs,
                metadata=metadata,
                dataset_name=LS_DATASET_NAME,
            )
        except Exception as e:
            print("[langsmith] log failed:", repr(e))

    if CLARE_LANGSMITH_ASYNC:
        threading.Thread(target=_do, daemon=True).start()
    else:
        _do()


def _write_feedback_to_langsmith_run(
    run_id: str,
    rating: str,
    comment: str = "",
    tags: Optional[List[str]] = None,
    metadata: Optional[Dict[str, Any]] = None,
) -> bool:
    """
    Run-level feedback: create_feedback attached to a specific run_id.
    This is separate from dataset create_example logging.

    Args:
        run_id: LangSmith run to attach the feedback to; blank means no-op.
        rating: "helpful" maps to score 1, anything else to score 0.
        comment: Optional free-text comment.
        tags: Optional tags, stored under the "tags" metadata key.
        metadata: Optional extra metadata; the caller's dict is not modified.

    Returns:
        True when the feedback was written; False when feedback is disabled,
        LangSmith is unavailable, ``run_id`` is blank, or the call failed.
    """
    if not CLARE_ENABLE_LANGSMITH_FEEDBACK:
        return False
    if Client is None:
        return False

    rid = (run_id or "").strip()
    if not rid:
        return False

    try:
        # Reuse the module-level client when dataset logging already built one.
        ls = _ls_client if _ls_client is not None else Client()
        score = 1 if rating == "helpful" else 0

        # Copy so the tags/project keys added below never leak into the caller's dict.
        meta = dict(metadata or {})
        if tags is not None:
            meta["tags"] = tags
        if LS_PROJECT:
            meta.setdefault("langsmith_project", LS_PROJECT)

        ls.create_feedback(
            run_id=rid,
            key="ui_rating",
            score=score,
            comment=comment or "",
            metadata=meta,
        )
        return True
    except Exception as e:
        print("[langsmith] create_feedback failed:", repr(e))
        return False


# ----------------------------
# Health endpoints
# ----------------------------
@app.get("/health")
def health():
    """Report uptime, warmup state, readiness and LangSmith feature flags."""
    return {
        "ok": True,
        "uptime_s": round(time.time() - APP_START_TS, 3),
        "warmup_enabled": CLARE_ENABLE_WARMUP,
        "warmup_started": bool(WARMUP_STARTED),
        "warmup_done": bool(WARMUP_DONE),
        "warmup_error": WARMUP_ERROR,
        # Same rule as /ready, so the two endpoints can never disagree.
        "ready": _is_ready(),
        "langsmith_enabled": bool(CLARE_ENABLE_LANGSMITH_LOG),
        "langsmith_async": bool(CLARE_LANGSMITH_ASYNC),
        "langsmith_feedback_enabled": bool(CLARE_ENABLE_LANGSMITH_FEEDBACK),
        "ts": int(time.time()),
    }


@app.get("/ready")
def ready():
    """Return ``{"ready": True}``, or a 503 with the warmup error while not ready."""
    if _is_ready():
        return {"ready": True}
    return JSONResponse({"ready": False, "error": WARMUP_ERROR}, status_code=503)


# ----------------------------
# Quiz (Micro-Quiz) Instruction
# ----------------------------
MICRO_QUIZ_INSTRUCTION = (
    "We are running a short micro-quiz session based ONLY on **Module 10 – "
    "Responsible AI (Alto, 2024, Chapter 12)** and the pre-loaded materials.\n\n"
    "Step 1 – Before asking any content question:\n"
    "• First ask me which quiz style I prefer right now:\n"
    " - (1) Multiple-choice questions\n"
    " - (2) Short-answer / open-ended questions\n"
    "• Ask me explicitly: \"Which quiz style do you prefer now: 1) Multiple-choice or 2) Short-answer? "
    "Please reply with 1 or 2.\"\n"
    "• Do NOT start a content question until I have answered 1 or 2.\n\n"
    "Step 2 – After I choose the style:\n"
    "• If I choose 1 (multiple-choice):\n"
    " - Ask ONE multiple-choice question at a time, based on Module 10 concepts "
    "(Responsible AI definition, risk types, mitigation layers, EU AI Act, etc.).\n"
    " - Provide 3–4 options (A, B, C, D) and make only one option clearly correct.\n"
    "• If I choose 2 (short-answer):\n"
    " - Ask ONE short-answer question at a time, also based on Module 10 concepts.\n"
    " - Do NOT show the answer when you ask the question.\n\n"
    "Step 3 – For each answer I give:\n"
    "• Grade my answer (correct / partially correct / incorrect).\n"
    "• Give a brief explanation and the correct answer.\n"
    "• Then ask if I want another question of the SAME style.\n"
    "• Continue this pattern until I explicitly say to stop.\n\n"
    "Please start by asking me which quiz style I prefer (1 = multiple-choice, 2 = short-answer). "
    "Do not ask any content question before I choose."
)


# ----------------------------
# Schemas
# ----------------------------
class LoginReq(BaseModel):
    """Body of POST /api/login."""

    name: str
    user_id: str


class ChatReq(BaseModel):
    """Body of POST /api/chat."""

    user_id: str
    message: str
    learning_mode: str
    language_preference: str = "Auto"
    doc_type: str = "Syllabus"


class QuizStartReq(BaseModel):
    """Body of POST /api/quiz/start."""

    user_id: str
    language_preference: str = "Auto"
    doc_type: str = MODULE10_DOC_TYPE
    learning_mode: str = "quiz"


class ExportReq(BaseModel):
    """Body of POST /api/export."""

    user_id: str
    learning_mode: str


class SummaryReq(BaseModel):
    """Body of POST /api/summary."""

    user_id: str
    learning_mode: str
    language_preference: str = "Auto"


class FeedbackReq(BaseModel):
    """Body of POST /api/feedback."""

    # IMPORTANT: allow extra fields so FE can evolve without breaking backend
    class Config:
        extra = "ignore"

    user_id: str
    rating: str  # "helpful" | "not_helpful"

    # NEW: attach feedback to a specific LangSmith run
    run_id: Optional[str] = None

    assistant_message_id: Optional[str] = None
    assistant_text: str
    user_text: Optional[str] = ""

    comment: Optional[str] = ""

    # optional structured fields
    tags: Optional[List[str]] = []
    refs: Optional[List[str]] = []

    learning_mode: Optional[str] = None
    doc_type: Optional[str] = None
    timestamp_ms: Optional[int] = None


# ----------------------------
# API Routes
# ----------------------------
@app.post("/api/login")
def login(req: LoginReq):
    """Create or fetch the user's session and store the display name.

    Returns 400 when either ``name`` or ``user_id`` is blank.
    """
    user_id = (req.user_id or "").strip()
    name = (req.name or "").strip()
    if not user_id or not name:
        return JSONResponse({"ok": False, "error": "Missing name/user_id"}, status_code=400)

    sess = _get_session(user_id)
    sess["name"] = name
    return {"ok": True, "user": {"name": name, "user_id": user_id}}


@app.post("/api/chat")
def chat(req: ChatReq):
    """Handle one chat turn.

    Updates the session's weaknesses and cognitive state from the message,
    retrieves RAG context (skipped for short messages without a "?"), calls
    the model, stores the new history, logs the turn to LangSmith and returns
    the reply with refs, latency and run_id. Returns 400 for a blank
    ``user_id`` and 500 when the model call raises; a blank message returns
    an empty reply without calling the model.
    """
    user_id = (req.user_id or "").strip()
    msg = (req.message or "").strip()
    if not user_id:
        return JSONResponse({"error": "Missing user_id"}, status_code=400)

    sess = _get_session(user_id)

    if not msg:
        return {
            "reply": "",
            "session_status_md": render_session_status(
                req.learning_mode, sess["weaknesses"], sess["cognitive_state"]
            ),
            "refs": [],
            "latency_ms": 0.0,
            "run_id": None,
        }

    # marks_ms holds cumulative milliseconds since t0 at each pipeline stage.
    t0 = time.time()
    marks_ms: Dict[str, float] = {"start": 0.0}

    resolved_lang = detect_language(msg, req.language_preference)
    marks_ms["language_detect_done"] = (time.time() - t0) * 1000.0

    sess["weaknesses"] = update_weaknesses_from_message(msg, sess["weaknesses"])
    marks_ms["weakness_update_done"] = (time.time() - t0) * 1000.0

    sess["cognitive_state"] = update_cognitive_state_from_message(msg, sess["cognitive_state"])
    marks_ms["cognitive_update_done"] = (time.time() - t0) * 1000.0

    # Short messages that are not questions skip retrieval entirely.
    if len(msg) < 20 and ("?" not in msg):
        rag_context_text, rag_used_chunks = "", []
    else:
        rag_context_text, rag_used_chunks = retrieve_relevant_chunks(msg, sess["rag_chunks"])
    marks_ms["rag_retrieve_done"] = (time.time() - t0) * 1000.0

    try:
        answer, new_history, run_id = chat_with_clare(
            message=msg,
            history=sess["history"],
            model_name=sess["model_name"],
            language_preference=resolved_lang,
            learning_mode=req.learning_mode,
            doc_type=req.doc_type,
            course_outline=sess["course_outline"],
            weaknesses=sess["weaknesses"],
            cognitive_state=sess["cognitive_state"],
            rag_context=rag_context_text,
        )
    except Exception as e:
        print(f"[chat] error: {repr(e)}")
        return JSONResponse({"error": f"chat failed: {repr(e)}"}, status_code=500)

    marks_ms["llm_done"] = (time.time() - t0) * 1000.0
    total_ms = marks_ms["llm_done"]

    ordered = [
        "start",
        "language_detect_done",
        "weakness_update_done",
        "cognitive_update_done",
        "rag_retrieve_done",
        "llm_done",
    ]
    # Per-stage duration = difference between consecutive marks, clamped at 0.
    segments_ms: Dict[str, float] = {
        cur: max(0.0, marks_ms.get(cur, 0.0) - marks_ms.get(prev, 0.0))
        for prev, cur in zip(ordered, ordered[1:])
    }

    latency_breakdown = {"marks_ms": marks_ms, "segments_ms": segments_ms, "total_ms": total_ms}

    sess["history"] = new_history

    refs = [
        {"source_file": c.get("source_file"), "section": c.get("section")}
        for c in (rag_used_chunks or [])
    ]

    rag_context_chars = len(rag_context_text or "")
    rag_used_chunks_count = len(rag_used_chunks or [])
    history_len = len(sess["history"])

    _log_event_to_langsmith(
        {
            "experiment_id": EXPERIMENT_ID,
            "student_id": user_id,
            "student_name": sess.get("name", ""),
            "event_type": "chat_turn",
            "timestamp": time.time(),
            "latency_ms": total_ms,
            "latency_breakdown": latency_breakdown,
            "rag_context_chars": rag_context_chars,
            "rag_used_chunks_count": rag_used_chunks_count,
            "history_len": history_len,
            "question": msg,
            "answer": answer,
            "model_name": sess["model_name"],
            "language": resolved_lang,
            "learning_mode": req.learning_mode,
            "doc_type": req.doc_type,
            "refs": refs,
            "run_id": run_id,  # NEW: keep in dataset metadata for debugging
        }
    )

    return {
        "reply": answer,
        "session_status_md": render_session_status(
            req.learning_mode, sess["weaknesses"], sess["cognitive_state"]
        ),
        "refs": refs,
        "latency_ms": total_ms,
        "run_id": run_id,  # NEW: FE attaches feedback to this run
    }


@app.post("/api/quiz/start")
def quiz_start(req: QuizStartReq):
    """Start a micro-quiz by sending ``MICRO_QUIZ_INSTRUCTION`` as the message.

    The instruction is appended to the session's existing history, so later
    answers sent through /api/chat continue the quiz. Returns 400 for a blank
    ``user_id`` and 500 when the model call raises.
    """
    user_id = (req.user_id or "").strip()
    if not user_id:
        return JSONResponse({"error": "Missing user_id"}, status_code=400)

    sess = _get_session(user_id)

    # Start from the quiz instruction. weaknesses/cognitive_state are NOT
    # updated here, so this "system instruction" does not pollute learner state.
    quiz_instruction = MICRO_QUIZ_INSTRUCTION
    t0 = time.time()

    # Language: with "Auto", detect_language decides; otherwise use the given one.
    resolved_lang = detect_language(quiz_instruction, req.language_preference)

    # RAG: always use the module10 / current-session chunks with a stable query.
    rag_context_text, rag_used_chunks = retrieve_relevant_chunks(
        "Module 10 quiz", sess["rag_chunks"]
    )

    try:
        answer, new_history, run_id = chat_with_clare(
            message=quiz_instruction,
            history=sess["history"],  # continue directly from the current session history
            model_name=sess["model_name"],
            language_preference=resolved_lang,
            learning_mode=req.learning_mode,  # defaults to "quiz"
            doc_type=req.doc_type,
            course_outline=sess["course_outline"],
            weaknesses=sess["weaknesses"],
            cognitive_state=sess["cognitive_state"],
            rag_context=rag_context_text,
        )
    except Exception as e:
        print(f"[quiz_start] error: {repr(e)}")
        return JSONResponse({"error": f"quiz_start failed: {repr(e)}"}, status_code=500)

    total_ms = (time.time() - t0) * 1000.0

    # Write back the session history (subsequent user answers go through
    # /api/chat and continue the quiz context).
    sess["history"] = new_history

    refs = [
        {"source_file": c.get("source_file"), "section": c.get("section")}
        for c in (rag_used_chunks or [])
    ]

    _log_event_to_langsmith(
        {
            "experiment_id": EXPERIMENT_ID,
            "student_id": user_id,
            "student_name": sess.get("name", ""),
            "event_type": "micro_quiz_start",
            "timestamp": time.time(),
            "latency_ms": total_ms,
            "question": "[micro_quiz_start] " + quiz_instruction[:200],
            "answer": answer,
            "model_name": sess["model_name"],
            "language": resolved_lang,
            "learning_mode": req.learning_mode,
            "doc_type": req.doc_type,
            "refs": refs,
            "rag_used_chunks_count": len(rag_used_chunks or []),
            "history_len": len(sess["history"]),
            "run_id": run_id,  # NEW
        }
    )

    return {
        "reply": answer,
        "session_status_md": render_session_status(
            req.learning_mode, sess["weaknesses"], sess["cognitive_state"]
        ),
        "refs": refs,
        "latency_ms": total_ms,
        "run_id": run_id,  # NEW
    }


@app.post("/api/upload")
async def upload(
    user_id: str = Form(...),
    doc_type: str = Form(...),
    file: UploadFile = File(...),
):
    """Ingest an uploaded document into the user's session.

    The upload is written to a private temporary directory, parsed, and the
    directory is removed afterwards. A "Syllabus" upload also replaces the
    session's course outline. Parse failures are printed and reported as
    ``added_chunks: 0`` rather than raised. Returns 400 for a blank
    ``user_id`` or a missing file.
    """
    user_id = (user_id or "").strip()
    doc_type = (doc_type or "").strip()

    if not user_id:
        return JSONResponse({"ok": False, "error": "Missing user_id"}, status_code=400)
    if not file or not file.filename:
        return JSONResponse({"ok": False, "error": "Missing file"}, status_code=400)

    sess = _get_session(user_id)

    # basename() drops any client-supplied directory parts; the fallback covers
    # names that reduce to an empty string (e.g. a filename ending in "/").
    safe_name = os.path.basename(file.filename).replace("..", "_") or "upload"

    # One directory per upload: same-named files uploaded concurrently cannot
    # overwrite each other, and nothing is left behind on disk.
    tmp_dir = tempfile.mkdtemp(prefix="clare_upload_")
    tmp_path = os.path.join(tmp_dir, safe_name)
    new_chunks: List[Dict[str, Any]] = []

    try:
        content = await file.read()
        with open(tmp_path, "wb") as f:
            f.write(content)

        if doc_type == "Syllabus":
            # extract_course_topics_from_file is handed an object exposing `.name`.
            class _F:
                pass

            fo = _F()
            fo.name = tmp_path
            try:
                sess["course_outline"] = extract_course_topics_from_file(fo, doc_type)
            except Exception as e:
                print(f"[upload] syllabus parse error: {repr(e)}")

        try:
            new_chunks = build_rag_chunks_from_file(tmp_path, doc_type) or []
            sess["rag_chunks"] = (sess["rag_chunks"] or []) + new_chunks
        except Exception as e:
            print(f"[upload] rag build error: {repr(e)}")
            new_chunks = []
    finally:
        shutil.rmtree(tmp_dir, ignore_errors=True)

    status_md = f"✅ Loaded base reading + uploaded {doc_type} file."

    _log_event_to_langsmith(
        {
            "experiment_id": EXPERIMENT_ID,
            "student_id": user_id,
            "student_name": sess.get("name", ""),
            "event_type": "upload",
            "timestamp": time.time(),
            "doc_type": doc_type,
            "filename": safe_name,
            "added_chunks": len(new_chunks),
            "question": f"[upload] {safe_name}",
            "answer": status_md,
        }
    )

    return {"ok": True, "added_chunks": len(new_chunks), "status_md": status_md}


@app.post("/api/feedback")
def api_feedback(req: FeedbackReq):
    """Record a helpful / not_helpful rating for an assistant message.

    Always logs a dataset event; additionally attaches run-level feedback
    when ``run_id`` is supplied. Returns 400 for a blank ``user_id`` or a
    rating other than "helpful" / "not_helpful".
    """
    user_id = (req.user_id or "").strip()
    if not user_id:
        return JSONResponse({"ok": False, "error": "Missing user_id"}, status_code=400)

    sess = _get_session(user_id)
    student_name = sess.get("name", "")

    rating = (req.rating or "").strip().lower()
    if rating not in ("helpful", "not_helpful"):
        return JSONResponse({"ok": False, "error": "Invalid rating"}, status_code=400)

    # normalize fields
    assistant_text = (req.assistant_text or "").strip()
    user_text = (req.user_text or "").strip()
    comment = (req.comment or "").strip()
    refs = req.refs or []
    tags = req.tags or []
    timestamp_ms = int(req.timestamp_ms or int(time.time() * 1000))

    # 1) Dataset event log (what you already have)
    _log_event_to_langsmith(
        {
            "experiment_id": EXPERIMENT_ID,
            "student_id": user_id,
            "student_name": student_name,
            "event_type": "feedback",
            "timestamp": time.time(),
            "timestamp_ms": timestamp_ms,
            "rating": rating,
            "assistant_message_id": req.assistant_message_id,
            "run_id": req.run_id,  # NEW
            # Keep the Example readable:
            "question": user_text,  # what user asked (optional)
            "answer": assistant_text,  # the assistant response being rated
            # metadata
            "comment": comment,
            "tags": tags,
            "refs": refs,
            "learning_mode": req.learning_mode,
            "doc_type": req.doc_type,
        }
    )

    # 2) Run-level feedback (attach to actual LangSmith run)
    # Only works when FE provides run_id and LangSmith credentials are configured.
    wrote_run_feedback = False
    if req.run_id:
        wrote_run_feedback = _write_feedback_to_langsmith_run(
            run_id=req.run_id,
            rating=rating,
            comment=comment,
            tags=tags,
            metadata={
                "experiment_id": EXPERIMENT_ID,
                "student_id": user_id,
                "student_name": student_name,
                "assistant_message_id": req.assistant_message_id,
                "learning_mode": req.learning_mode,
                "doc_type": req.doc_type,
                "refs": refs,
                "timestamp_ms": timestamp_ms,
            },
        )

    return {"ok": True, "run_feedback_written": wrote_run_feedback}


@app.post("/api/export")
def api_export(req: ExportReq):
    """Return the session's conversation as markdown; 400 for a blank ``user_id``."""
    user_id = (req.user_id or "").strip()
    if not user_id:
        return JSONResponse({"error": "Missing user_id"}, status_code=400)

    sess = _get_session(user_id)
    md = export_conversation(
        sess["history"],
        sess["course_outline"],
        req.learning_mode,
        sess["weaknesses"],
        sess["cognitive_state"],
    )
    return {"markdown": md}


@app.post("/api/summary")
def api_summary(req: SummaryReq):
    """Return a markdown summary of the session; 400 for a blank ``user_id``."""
    user_id = (req.user_id or "").strip()
    if not user_id:
        return JSONResponse({"error": "Missing user_id"}, status_code=400)

    sess = _get_session(user_id)
    md = summarize_conversation(
        sess["history"],
        sess["course_outline"],
        sess["weaknesses"],
        sess["cognitive_state"],
        sess["model_name"],
        req.language_preference,
    )
    return {"markdown": md}


@app.get("/api/memoryline")
def memoryline(user_id: str):
    """Return fixed placeholder review data, creating the session if needed."""
    _ = _get_session((user_id or "").strip())
    return {"next_review_label": "T+7", "progress_pct": 0.4}


# ----------------------------
# SPA Fallback
# ----------------------------
@app.get("/{full_path:path}")
def spa_fallback(full_path: str, request: Request):
    """Serve index.html for unmatched GET paths so client-side routing works.

    Paths under api/, assets/ or static/ get a JSON 404 instead; a missing
    frontend build yields a 500.
    """
    if full_path.startswith(("api/", "assets/", "static/")):
        return JSONResponse({"detail": "Not Found"}, status_code=404)

    if os.path.exists(WEB_INDEX):
        return FileResponse(WEB_INDEX)

    return JSONResponse(
        {"detail": "web/build not found. Build frontend first (web/build/index.html)."},
        status_code=500,
    )