Spaces:
Runtime error
Runtime error
| """ | |
| Recall — custom frontend server (NAH-36). | |
| Serves the polished `Recall.dc.html` design (frontend/index.html) and exposes a | |
| thin JSON API over the EXISTING backend. The learning/content logic and the | |
| `schema.py` data contract are treated as an API and are NOT modified here. | |
| The Session dict lives server-side, keyed by a short id the client carries | |
| around (mirrors the single-session gr.State model the Gradio app uses) — so the | |
| reference answers in the deck never leave the server. | |
| Run it (stub mode is on by default): | |
| pip install -r requirements.txt | |
| python server.py # http://127.0.0.1:7860 | |
| Flip RECALL_STUB=0 once the real model is wired: | |
| RECALL_STUB=0 python server.py | |
| The legacy Gradio UI is still mounted at /gradio. | |
| """ | |
| from __future__ import annotations | |
| import os | |
| import tempfile | |
| import time | |
| import uuid | |
| from collections import OrderedDict | |
| from pathlib import Path | |
| from fastapi import FastAPI, File, Form, UploadFile | |
| from fastapi.responses import FileResponse, JSONResponse | |
| from pydantic import BaseModel | |
| from starlette.concurrency import run_in_threadpool | |
| import content_pipeline as cp | |
| import learning_engine as le | |
# Static frontend assets live in a `frontend/` directory next to this module.
FRONTEND = Path(__file__).parent.joinpath("frontend")

# In-process session store limits. The store is bounded so a client looping
# /api/generate on a public Space cannot grow it without limit: entries past
# MAX_SESSIONS are evicted least-recently-used, and entries idle for longer
# than SESSION_TTL_SECONDS are dropped.
MAX_SESSIONS = int(os.environ.get("RECALL_MAX_SESSIONS", "500"))
SESSION_TTL_SECONDS = int(
    os.environ.get("RECALL_SESSION_TTL", str(2 * 60 * 60))  # default: 2 hours
)

# Per-request input caps, so one request cannot exhaust memory or disk.
_BYTES_PER_MB = 1024 * 1024
MAX_UPLOAD_BYTES = int(os.environ.get("RECALL_MAX_UPLOAD_MB", "10")) * _BYTES_PER_MB
MAX_TEXT_CHARS = int(os.environ.get("RECALL_MAX_TEXT_CHARS", "200000"))  # ~50k tokens

# Maps sid -> (session dict, last-access epoch seconds). Insertion order doubles
# as recency order, which is what makes LRU eviction O(1).
SESSIONS: "OrderedDict[str, tuple[dict, float]]" = OrderedDict()
def _purge_expired(now: float | None = None) -> None:
    """Drop every session whose last access is older than the TTL.

    Args:
        now: Reference time in epoch seconds; defaults to the current time.
    """
    if now is None:
        now = time.time()

    # Collect first, delete second: SESSIONS must not change size mid-iteration.
    expired = []
    for key, (_session, touched) in SESSIONS.items():
        if now - touched > SESSION_TTL_SECONDS:
            expired.append(key)

    for key in expired:
        SESSIONS.pop(key, None)
def get_session(sid: str) -> dict | None:
    """Look up a live session and mark it as most recently used.

    Expired sessions are purged before the lookup.

    Args:
        sid: Session id carried by the client.

    Returns:
        The session dict, or None when the id is unknown or has expired.
    """
    _purge_expired()

    found = SESSIONS.get(sid)
    if found is None:
        return None

    # Refresh the access timestamp and move the entry to the "newest" end.
    live_session = found[0]
    SESSIONS[sid] = (live_session, time.time())
    SESSIONS.move_to_end(sid)
    return live_session
def put_session(sid: str, session: dict) -> None:
    """Insert or refresh a session, then enforce the MAX_SESSIONS cap.

    Args:
        sid: Session id to store under.
        session: Session dict to associate with the id.
    """
    _purge_expired()

    stamped = (session, time.time())
    SESSIONS[sid] = stamped
    SESSIONS.move_to_end(sid)

    # Evict from the least-recently-used end until the store fits the cap.
    overflow = len(SESSIONS) - MAX_SESSIONS
    while overflow > 0:
        SESSIONS.popitem(last=False)
        overflow = len(SESSIONS) - MAX_SESSIONS
# Built-in photosynthesis notes behind the design's "sample" affordances, so the
# Upload screen's sample chip works even when no real PDF is on disk.
SAMPLE_NOTES = (
    "Photosynthesis happens in the chloroplast. "
    "The light-dependent reactions occur in the thylakoid membranes, "
    "where water is split, ATP and NADPH are produced, "
    "and oxygen is released. "
    "The Calvin cycle takes place in the stroma, "
    "where the enzyme RuBisCO fixes CO2 onto RuBP. "
    "Cellular respiration occurs in the mitochondria; "
    "most ATP is made during oxidative phosphorylation, "
    "as the electron transport chain pumps protons "
    "and oxygen acts as the final electron acceptor, forming water."
)

# Same copy as content_pipeline's image-only branch, so the "scanned PDF" sample
# chip shows the real error text even in stub mode.
IMAGE_ONLY_MSG = (
    "This PDF has no selectable text (looks scanned/image-only). "
    "Try a text-based PDF, or paste the notes instead."
)
# The ASGI application object that serves the custom frontend and JSON API.
app = FastAPI(title="Recall")
| # ---- serialization --------------------------------------------------------- | |
def _card_out(card: dict | None) -> dict | None:
    """Reduce a card to the fields the UI renders.

    Only the listed keys are copied, so anything else stored on the card (such
    as the reference answer or the raw source chunk) is never sent to the client.

    Args:
        card: A deck card, or None / empty when there is nothing to show.

    Returns:
        A dict with id, question, topic, difficulty and parent_id, or None when
        `card` is falsy.
    """
    if card:
        public = {key: card[key] for key in ("id", "question", "topic", "difficulty")}
        # parent_id is optional; cards that lack it report None.
        public["parent_id"] = card.get("parent_id")
        return public
    return None
def _view(session: dict) -> dict:
    """Build the display state for the header, mastery bars and deck rail.

    Args:
        session: Session dict; reads its "deck", "history" and "streak" keys.

    Returns:
        A dict with the deck size, answered count, 1-based display position
        (0 for an empty deck), streak, per-topic correct/total tallies and one
        rail entry per deck card.
    """
    cards = session["deck"]
    log = session["history"]
    n_done = len(log)
    n_cards = len(cards)

    # Tally answers per topic; a grade of 3 or more counts as correct.
    per_topic: dict[str, dict] = {}
    for entry in log:
        tally = per_topic.setdefault(entry["topic"], {"correct": 0, "total": 0})
        tally["total"] += 1
        if entry["grade"] >= 3:
            tally["correct"] += 1

    # Show "card N of M": one past the answered count, clamped to the deck size.
    if n_cards:
        position = min(n_done + 1, n_cards)
    else:
        position = 0

    return {
        "total": n_cards,
        "answered": n_done,
        "posDisplay": position,
        "streak": session["streak"],
        "topicStats": per_topic,
        "rail": [
            {
                "id": item["id"],
                "topic": item["topic"],
                # Cards carrying a parent_id are flagged as injected.
                "injected": bool(item.get("parent_id")),
            }
            for item in cards
        ],
    }
| # ---- request models -------------------------------------------------------- | |
class SidBody(BaseModel):
    """JSON body for endpoints that only need to identify the session."""

    sid: str  # server-side session id returned when the deck was generated
class GradeBody(BaseModel):
    """JSON body for grading the learner's answer to the current card."""

    sid: str  # server-side session id
    answer: str = ""  # learner's free-text answer; omitted means empty
class RegenBody(BaseModel):
    """JSON body for regenerating the current card at another difficulty."""

    sid: str  # server-side session id
    direction: str  # "harder" | "easier"
| # ---- API ------------------------------------------------------------------- | |
# NOTE(review): no route decorator is visible on this handler in this view —
# confirm it is registered on `app` (the store comment refers to /api/generate).
async def api_generate(
    text: str = Form(""),
    sample: str = Form(""),
    file: UploadFile | None = File(None),
):
    """Build a deck from pasted text, a built-in sample, or an uploaded file.

    The source is chosen in priority order: non-blank `text`, then the "bio"
    sample, then `file`. The "scan" sample short-circuits to the image-only
    error so the UI can demonstrate that state.

    Args:
        text: Pasted notes; trimmed and capped at MAX_TEXT_CHARS.
        sample: Sample selector ("scan" or "bio"); any other value is ignored.
        file: Optional upload, streamed to a temp file and passed to
            `cp.extract_text`.

    Returns:
        On success, a dict with the new `sid`, the first card (client-safe
        fields only) and the view state. On failure, a JSONResponse carrying
        `{"error": ...}` with status 400 (too little input), 413 (upload over
        MAX_UPLOAD_BYTES) or 422 (extraction or deck generation failed).
    """
    # The scanned-slides sample shows the real image-only error state.
    if sample == "scan":
        return JSONResponse({"error": IMAGE_ONLY_MSG}, status_code=422)

    need_more = ("I need a little more to work with — paste a paragraph of "
                 "notes or pick a PDF, and I'll build your deck.")
    source = ""
    if text and text.strip():
        source = text.strip()[:MAX_TEXT_CHARS]  # cap to bound chunking work
        if len(source) < 40:
            return JSONResponse({"error": need_more}, status_code=400)
    elif sample == "bio":
        source = SAMPLE_NOTES
    elif file is not None:
        suffix = Path(file.filename or "upload").suffix or ".txt"
        # delete=False keeps the file on disk after it is closed so the
        # extractor can reopen it by path; the `finally` below removes it.
        tmp = tempfile.NamedTemporaryFile(delete=False, suffix=suffix)
        tmp_path = tmp.name
        try:
            # Stream in capped chunks so an oversized upload is never fully
            # buffered in memory. This loop sits inside the try so a failed
            # read or write cannot leave the temp file behind.
            too_large = False
            size = 0
            with tmp:
                while chunk := await file.read(1024 * 1024):
                    size += len(chunk)
                    if size > MAX_UPLOAD_BYTES:
                        too_large = True
                        break
                    tmp.write(chunk)
            if too_large:
                return JSONResponse(
                    {"error": f"That file is too large (limit "
                              f"{MAX_UPLOAD_BYTES // (1024 * 1024)} MB). Upload a "
                              "smaller PDF or paste the notes instead."},
                    status_code=413,
                )
            # Extraction (PDF parsing) is blocking — keep it off the event loop.
            source = await run_in_threadpool(cp.extract_text, tmp_path)
            # Apply the same character cap as pasted text, so a large upload
            # cannot bypass the bound on chunking work.
            if isinstance(source, str):
                source = source[:MAX_TEXT_CHARS]
        except cp.ExtractionError as e:
            return JSONResponse({"error": str(e)}, status_code=422)
        finally:
            os.unlink(tmp_path)
    else:
        return JSONResponse({"error": need_more}, status_code=400)

    try:
        # Deck generation hits the model — run it in a worker thread so a slow
        # call doesn't block every other request on the event loop.
        deck = await run_in_threadpool(cp.generate_deck, source)
    except Exception as e:  # noqa: BLE001 — surface as friendly copy, never crash
        return JSONResponse(
            {"error": f"Couldn't build a deck from that ({type(e).__name__}). "
                      "Try different material."},
            status_code=422,
        )
    if not deck:
        return JSONResponse(
            {"error": "Couldn't generate questions from that. Try different material."},
            status_code=422,
        )

    session = le.init_session(deck)
    card = le.next_card(session)
    sid = uuid.uuid4().hex
    put_session(sid, session)
    return {"sid": sid, "card": _card_out(card), "view": _view(session)}
async def api_grade(body: GradeBody):
    """Grade the learner's answer and report any follow-up cards it produced.

    Args:
        body: Session id plus the learner's answer (empty if omitted).

    Returns:
        A 404 JSONResponse when the session is unknown or expired;
        `{"done": True, "view": ...}` when `le.grade_and_adapt` yields no grade;
        otherwise the grade summary, the ids of the follow-up cards and the
        refreshed view state.
    """
    session = get_session(body.sid)
    if session is None:
        return JSONResponse({"error": "session expired"}, status_code=404)

    # Grading and follow-up generation hit the model, so the whole study step
    # runs in a worker thread rather than on the event loop.
    answer = body.answer or ""
    grade, followups = await run_in_threadpool(le.grade_and_adapt, session, answer)
    if grade is None:
        return {"done": True, "view": _view(session)}

    injected = [followup["id"] for followup in followups]
    put_session(body.sid, session)

    verdict = {
        "score": grade["score"],
        "correct": grade["correct"],
        "explanation": grade["explanation"],
        "missed": grade["missed_concept"],
    }
    return {"grade": verdict, "injectedIds": injected, "view": _view(session)}
async def api_regenerate(body: RegenBody):
    """Replace the current card with a harder or easier variant.

    Args:
        body: Session id and direction. Any direction other than "harder" is
            treated as "easier", both for the pipeline call and for the label
            returned to the client.

    Returns:
        A 404 JSONResponse when the session is unknown or expired;
        `{"card": None, "view": ...}` when there is no current card; a 422
        JSONResponse when regeneration produced no card; otherwise the new card
        (client-safe fields plus `diffLabel`) and the refreshed view state.
    """
    session = get_session(body.sid)
    if session is None:
        return JSONResponse({"error": "session expired"}, status_code=404)

    card = le.next_card(session)
    if card is None:
        return {"card": None, "view": _view(session)}

    # Normalize once so the pipeline and the returned label cannot disagree.
    direction = "harder" if body.direction == "harder" else "easier"
    new = await run_in_threadpool(cp.regenerate, card, direction)  # hits the model
    if not new:
        # _card_out() returns None for a falsy card, and assigning diffLabel on
        # None would raise. Leave the session untouched and report the failure.
        return JSONResponse(
            {"error": "Couldn't regenerate that question. Try again."},
            status_code=422,
        )

    session = le.replace_card(session, card["id"], new)
    put_session(body.sid, session)
    out = _card_out(new)
    out["diffLabel"] = direction
    return {"card": out, "view": _view(session)}
async def api_next(body: SidBody):
    """Return the session's next card together with the current view state.

    Args:
        body: Session id.

    Returns:
        A 404 JSONResponse when the session is unknown or expired; otherwise a
        dict with the client-safe card (None when `le.next_card` yields none)
        and the view state.
    """
    session = get_session(body.sid)
    if session is None:
        return JSONResponse({"error": "session expired"}, status_code=404)

    upcoming = le.next_card(session)
    payload = {"card": _card_out(upcoming), "view": _view(session)}
    return payload
async def api_recap(body: SidBody):
    """Return the end-of-session recap together with the view state.

    Args:
        body: Session id.

    Returns:
        A 404 JSONResponse when the session is unknown or expired; otherwise a
        dict with the result of `le.recap` and the view state.
    """
    session = get_session(body.sid)
    if session is None:
        return JSONResponse({"error": "session expired"}, status_code=404)

    # The recap's reflection line hits the model, so run it in a worker thread.
    summary = await run_in_threadpool(le.recap, session)
    return {"recap": summary, "view": _view(session)}
async def api_restart(body: SidBody):
    """Start a clean session over the same source deck (Recap → restart).

    Args:
        body: Session id; the fresh session is stored under the same id.

    Returns:
        A 404 JSONResponse when the session is unknown or expired; otherwise
        the first card of the fresh session and its view state.
    """
    previous = get_session(body.sid)
    if previous is None:
        return JSONResponse({"error": "session expired"}, status_code=404)

    # Keep only the original cards: anything with a parent_id was injected
    # during the previous run and is left out of the rebuilt deck.
    originals = [entry for entry in previous["deck"] if not entry.get("parent_id")]
    restarted = le.init_session(originals)
    first = le.next_card(restarted)
    put_session(body.sid, restarted)
    return {"card": _card_out(first), "view": _view(restarted)}
| # ---- frontend -------------------------------------------------------------- | |
# NOTE(review): no route decorator is visible on this handler in this view —
# confirm it is registered on `app`.
async def index():
    """Serve the single-page frontend, `index.html` from the FRONTEND directory."""
    page = FRONTEND / "index.html"
    return FileResponse(page)
# Legacy Gradio Blocks app, kept as a fallback / debug surface under /gradio.
# Mounting is best-effort: the custom frontend and API do not need Gradio, so
# any failure here (missing package, broken `app` module, mount error) is
# reported and otherwise ignored rather than taking the server down.
try:
    import gradio as gr
    from app import demo as _gradio_demo

    app = gr.mount_gradio_app(app, _gradio_demo, path="/gradio")

    # Author's stated rationale: on HuggingFace Spaces the Gradio SDK looks for
    # a module-level `demo`, and `gr.mount_gradio_app` returns a FastAPI app
    # rather than a Gradio demo. The underlying Blocks object is therefore
    # exposed as `demo` so the SDK can pick it up directly instead of falling
    # back to `python server.py` and colliding with the Space's own port.
    demo = _gradio_demo
except Exception as _mount_error:  # noqa: BLE001
    print(f"[recall] legacy Gradio UI not mounted ({type(_mount_error).__name__}: {_mount_error})")
if __name__ == "__main__":
    import uvicorn

    # Author's stated rationale: on HuggingFace Spaces the Gradio SDK serves the
    # app, and starting uvicorn here would collide with the Space's own server
    # on the same port. Locally (no SPACE_ID) uvicorn is started directly.
    # NOTE(review): with SPACE_ID set this branch starts nothing, so running
    # this file as a script exits immediately — confirm how the Space boots.
    running_on_space = bool(os.getenv("SPACE_ID"))
    if not running_on_space:
        listen_port = int(os.getenv("PORT", "7860"))
        uvicorn.run(app, host="0.0.0.0", port=listen_port)