""" Recall — custom frontend server (NAH-36). Serves the polished `Recall.dc.html` design (frontend/index.html) and exposes a thin JSON API over the EXISTING backend. The learning/content logic and the `schema.py` data contract are treated as an API and are NOT modified here. The Session dict lives server-side, keyed by a short id the client carries around (mirrors the single-session gr.State model the Gradio app uses) — so the reference answers in the deck never leave the server. Run it (stub mode is on by default): pip install -r requirements.txt python server.py # http://127.0.0.1:7860 Flip RECALL_STUB=0 once the real model is wired: RECALL_STUB=0 python server.py The legacy Gradio UI is still mounted at /gradio. """ from __future__ import annotations import os import tempfile import time import uuid from collections import OrderedDict from pathlib import Path from fastapi import FastAPI, File, Form, UploadFile from fastapi.responses import FileResponse, JSONResponse from pydantic import BaseModel from starlette.concurrency import run_in_threadpool import content_pipeline as cp import learning_engine as le FRONTEND = Path(__file__).parent / "frontend" # Single-process session store. Bounded so a public Space can't be OOM'd by a # client looping /api/generate: sessions are evicted least-recently-used past # MAX_SESSIONS and expire after SESSION_TTL_SECONDS of inactivity. MAX_SESSIONS = int(os.getenv("RECALL_MAX_SESSIONS", "500")) SESSION_TTL_SECONDS = int(os.getenv("RECALL_SESSION_TTL", str(2 * 60 * 60))) # 2h # Caps on input size so a single request can't exhaust memory/disk. MAX_UPLOAD_BYTES = int(os.getenv("RECALL_MAX_UPLOAD_MB", "10")) * 1024 * 1024 MAX_TEXT_CHARS = int(os.getenv("RECALL_MAX_TEXT_CHARS", "200000")) # ~50k tokens # sid -> (session, last_access_epoch). OrderedDict gives O(1) LRU eviction. SESSIONS: "OrderedDict[str, tuple[dict, float]]" = OrderedDict() def _purge_expired(now: float | None = None) -> None: now = time.time() if now is None else now stale = [sid for sid, (_, ts) in SESSIONS.items() if now - ts > SESSION_TTL_SECONDS] for sid in stale: SESSIONS.pop(sid, None) def get_session(sid: str) -> dict | None: """Fetch a live session and mark it most-recently-used, or None if it's unknown/expired (callers already return a friendly 'session expired').""" _purge_expired() entry = SESSIONS.get(sid) if entry is None: return None session, _ = entry SESSIONS[sid] = (session, time.time()) SESSIONS.move_to_end(sid) return session def put_session(sid: str, session: dict) -> None: """Store/refresh a session, evicting the least-recently-used past the cap.""" _purge_expired() SESSIONS[sid] = (session, time.time()) SESSIONS.move_to_end(sid) while len(SESSIONS) > MAX_SESSIONS: SESSIONS.popitem(last=False) # The photosynthesis notes the design's "sample" affordances load. Lets the # Upload screen's sample chip work even with no real PDF on disk. SAMPLE_NOTES = ( "Photosynthesis happens in the chloroplast. The light-dependent reactions " "occur in the thylakoid membranes, where water is split, ATP and NADPH are " "produced, and oxygen is released. The Calvin cycle takes place in the " "stroma, where the enzyme RuBisCO fixes CO2 onto RuBP. Cellular respiration " "occurs in the mitochondria; most ATP is made during oxidative " "phosphorylation, as the electron transport chain pumps protons and oxygen " "acts as the final electron acceptor, forming water." ) # Reused verbatim from content_pipeline's image-only branch so the "scanned PDF" # sample chip demonstrates the real error copy even in stub mode. IMAGE_ONLY_MSG = ( "This PDF has no selectable text (looks scanned/image-only). " "Try a text-based PDF, or paste the notes instead." ) app = FastAPI(title="Recall") # ---- serialization --------------------------------------------------------- def _card_out(card: dict | None) -> dict | None: """The client never needs (or should see) the reference answer or the raw source chunk — strip the card down to what the UI renders.""" if not card: return None return { "id": card["id"], "question": card["question"], "topic": card["topic"], "difficulty": card["difficulty"], "parent_id": card.get("parent_id"), } def _view(session: dict) -> dict: """Display state the header / mastery bars / deck rail are built from.""" deck = session["deck"] history = session["history"] answered = len(history) total = len(deck) stats: dict[str, dict] = {} for h in history: s = stats.setdefault(h["topic"], {"correct": 0, "total": 0}) s["total"] += 1 if h["grade"] >= 3: s["correct"] += 1 return { "total": total, "answered": answered, "posDisplay": min(answered + 1, total) if total else 0, "streak": session["streak"], "topicStats": stats, "rail": [ {"id": c["id"], "topic": c["topic"], "injected": bool(c.get("parent_id"))} for c in deck ], } # ---- request models -------------------------------------------------------- class SidBody(BaseModel): sid: str class GradeBody(BaseModel): sid: str answer: str = "" class RegenBody(BaseModel): sid: str direction: str # "harder" | "easier" # ---- API ------------------------------------------------------------------- @app.post("/api/generate") async def api_generate( text: str = Form(""), sample: str = Form(""), file: UploadFile | None = File(None), ): # The scanned-slides sample shows the real image-only error state. if sample == "scan": return JSONResponse({"error": IMAGE_ONLY_MSG}, status_code=422) need_more = ("I need a little more to work with — paste a paragraph of " "notes or pick a PDF, and I'll build your deck.") source = "" if text and text.strip(): source = text.strip()[:MAX_TEXT_CHARS] # cap to bound chunking work if len(source) < 40: return JSONResponse({"error": need_more}, status_code=400) elif sample == "bio": source = SAMPLE_NOTES elif file is not None: suffix = Path(file.filename or "upload").suffix or ".txt" # Stream to a temp file in capped chunks so an oversized upload never # gets fully buffered in memory. too_large = False with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as tmp: tmp_path = tmp.name size = 0 while chunk := await file.read(1024 * 1024): size += len(chunk) if size > MAX_UPLOAD_BYTES: too_large = True break tmp.write(chunk) try: if too_large: return JSONResponse( {"error": f"That file is too large (limit " f"{MAX_UPLOAD_BYTES // (1024 * 1024)} MB). Upload a " "smaller PDF or paste the notes instead."}, status_code=413, ) # Extraction (PDF parsing) is blocking — keep it off the event loop. source = await run_in_threadpool(cp.extract_text, tmp_path) except cp.ExtractionError as e: return JSONResponse({"error": str(e)}, status_code=422) finally: os.unlink(tmp_path) else: return JSONResponse({"error": need_more}, status_code=400) try: # Deck generation hits the model — run it in a worker thread so a slow # call doesn't block every other request on the event loop. deck = await run_in_threadpool(cp.generate_deck, source) except Exception as e: # noqa: BLE001 — surface as friendly copy, never crash return JSONResponse( {"error": f"Couldn't build a deck from that ({type(e).__name__}). " "Try different material."}, status_code=422, ) if not deck: return JSONResponse( {"error": "Couldn't generate questions from that. Try different material."}, status_code=422, ) session = le.init_session(deck) card = le.next_card(session) sid = uuid.uuid4().hex put_session(sid, session) return {"sid": sid, "card": _card_out(card), "view": _view(session)} @app.post("/api/grade") async def api_grade(body: GradeBody): session = get_session(body.sid) if session is None: return JSONResponse({"error": "session expired"}, status_code=404) # Grading + follow-up generation hit the model; run the whole study step in # a worker thread so it doesn't block the event loop. grade, fups = await run_in_threadpool(le.grade_and_adapt, session, body.answer or "") if grade is None: return {"done": True, "view": _view(session)} injected_ids = [f["id"] for f in fups] put_session(body.sid, session) return { "grade": { "score": grade["score"], "correct": grade["correct"], "explanation": grade["explanation"], "missed": grade["missed_concept"], }, "injectedIds": injected_ids, "view": _view(session), } @app.post("/api/regenerate") async def api_regenerate(body: RegenBody): session = get_session(body.sid) if session is None: return JSONResponse({"error": "session expired"}, status_code=404) card = le.next_card(session) if card is None: return {"card": None, "view": _view(session)} new = await run_in_threadpool(cp.regenerate, card, body.direction) # hits the model session = le.replace_card(session, card["id"], new) put_session(body.sid, session) out = _card_out(new) out["diffLabel"] = "harder" if body.direction == "harder" else "easier" return {"card": out, "view": _view(session)} @app.post("/api/next") async def api_next(body: SidBody): session = get_session(body.sid) if session is None: return JSONResponse({"error": "session expired"}, status_code=404) card = le.next_card(session) return {"card": _card_out(card), "view": _view(session)} @app.post("/api/recap") async def api_recap(body: SidBody): session = get_session(body.sid) if session is None: return JSONResponse({"error": "session expired"}, status_code=404) r = await run_in_threadpool(le.recap, session) # reflection line hits the model return {"recap": r, "view": _view(session)} @app.post("/api/restart") async def api_restart(body: SidBody): """Study the same source deck again from a clean session (Recap → restart).""" session = get_session(body.sid) if session is None: return JSONResponse({"error": "session expired"}, status_code=404) # Rebuild from the original (non-injected) cards only. base = [c for c in session["deck"] if not c.get("parent_id")] fresh = le.init_session(base) card = le.next_card(fresh) put_session(body.sid, fresh) return {"card": _card_out(card), "view": _view(fresh)} # ---- frontend -------------------------------------------------------------- @app.get("/") async def index(): return FileResponse(FRONTEND / "index.html") # Keep the original Gradio Blocks app available as a fallback / debug surface. # Optional: the custom frontend + API don't need Gradio, so a broken/missing # Gradio install never takes the new server down. try: import gradio as gr from app import demo as _gradio_demo app = gr.mount_gradio_app(app, _gradio_demo, path="/gradio") # On HuggingFace Spaces, the Gradio SDK looks for a module-level `demo` # object. `gr.mount_gradio_app` returns a FastAPI app, not a Gradio demo, # so the SDK would fall back to running `python server.py` — which would # then call uvicorn and collide with the Space's own port. Expose the # underlying Gradio Blocks as `demo` so the SDK picks it up directly. demo = _gradio_demo except Exception as _e: # noqa: BLE001 print(f"[recall] legacy Gradio UI not mounted ({type(_e).__name__}: {_e})") if __name__ == "__main__": import uvicorn # On HuggingFace Spaces the Gradio SDK serves the app — running uvicorn # here would collide with the Space's own server on the same port. # Locally (no SPACE_ID), start uvicorn directly. if not os.getenv("SPACE_ID"): uvicorn.run(app, host="0.0.0.0", port=int(os.getenv("PORT", "7860")))