recall / server.py
arturogp3's picture
Upload server.py with huggingface_hub
c1871d1 verified
Raw
History Blame Contribute Delete
12.7 kB
"""
Recall — custom frontend server (NAH-36).
Serves the polished `Recall.dc.html` design (frontend/index.html) and exposes a
thin JSON API over the EXISTING backend. The learning/content logic and the
`schema.py` data contract are treated as an API and are NOT modified here.
The Session dict lives server-side, keyed by a short id the client carries
around (mirrors the single-session gr.State model the Gradio app uses) — so the
reference answers in the deck never leave the server.
Run it (stub mode is on by default):

    pip install -r requirements.txt
    python server.py  # http://127.0.0.1:7860

Flip RECALL_STUB=0 once the real model is wired:

    RECALL_STUB=0 python server.py

The legacy Gradio UI is still mounted at /gradio.
"""
from __future__ import annotations
import os
import tempfile
import time
import uuid
from collections import OrderedDict
from pathlib import Path
from fastapi import FastAPI, File, Form, UploadFile
from fastapi.responses import FileResponse, JSONResponse
from pydantic import BaseModel
from starlette.concurrency import run_in_threadpool
import content_pipeline as cp
import learning_engine as le
# Directory holding the static frontend bundle, resolved next to this module.
FRONTEND = Path(__file__).parent.joinpath("frontend")

# In-memory, single-process session store limits. The store is bounded so a
# client hammering /api/generate on a public Space cannot exhaust memory:
# beyond MAX_SESSIONS the least-recently-used session is evicted, and any
# session idle for more than SESSION_TTL_SECONDS is dropped.
MAX_SESSIONS = int(os.environ.get("RECALL_MAX_SESSIONS", "500"))
SESSION_TTL_SECONDS = int(os.environ.get("RECALL_SESSION_TTL", "7200"))  # 2 hours

# Input size caps, so one request cannot exhaust memory or disk.
MAX_UPLOAD_BYTES = int(os.environ.get("RECALL_MAX_UPLOAD_MB", "10")) * (1024 * 1024)
MAX_TEXT_CHARS = int(os.environ.get("RECALL_MAX_TEXT_CHARS", "200000"))  # roughly 50k tokens

# Maps sid -> (session dict, last-access epoch seconds). Insertion order is
# used as recency order, which makes least-recently-used eviction O(1).
SESSIONS: "OrderedDict[str, tuple[dict, float]]" = OrderedDict()
def _purge_expired(now: float | None = None) -> None:
    """Drop every session that has been idle longer than SESSION_TTL_SECONDS.

    `now` is the reference epoch time in seconds; when omitted the current
    `time.time()` is used.
    """
    if now is None:
        now = time.time()
    # Collect the keys first and delete afterwards: the store must not change
    # size while it is being iterated.
    expired = []
    for key, entry in SESSIONS.items():
        last_access = entry[1]
        if now - last_access > SESSION_TTL_SECONDS:
            expired.append(key)
    for key in expired:
        SESSIONS.pop(key, None)
def get_session(sid: str) -> dict | None:
    """Look up the live session stored under `sid` and refresh its recency.

    Expired sessions are purged first. Returns None when the id is unknown
    (which includes sessions that were just purged); callers turn that into
    their own 'session expired' response. On a hit the entry's last-access
    time is reset to now and it moves to the most-recently-used end.
    """
    _purge_expired()
    try:
        session = SESSIONS[sid][0]
    except KeyError:
        return None
    SESSIONS[sid] = (session, time.time())
    SESSIONS.move_to_end(sid)
    return session
def put_session(sid: str, session: dict) -> None:
    """Insert or refresh `session` under `sid` as the most-recently-used entry.

    Expired sessions are purged first; afterwards, entries are evicted from
    the least-recently-used end until the store is back within MAX_SESSIONS.
    """
    _purge_expired()
    SESSIONS[sid] = (session, time.time())
    SESSIONS.move_to_end(sid)
    # Number of entries over the cap; range() of a non-positive count is empty.
    overflow = len(SESSIONS) - MAX_SESSIONS
    for _ in range(overflow):
        SESSIONS.popitem(last=False)
# Built-in photosynthesis notes behind the design's "sample" affordances, so
# the Upload screen's sample chip works even with no real PDF on disk.
SAMPLE_NOTES = (
    "Photosynthesis happens in the chloroplast. "
    "The light-dependent reactions occur in the thylakoid membranes, "
    "where water is split, ATP and NADPH are produced, "
    "and oxygen is released. "
    "The Calvin cycle takes place in the stroma, "
    "where the enzyme RuBisCO fixes CO2 onto RuBP. "
    "Cellular respiration occurs in the mitochondria; "
    "most ATP is made during oxidative phosphorylation, "
    "as the electron transport chain pumps protons "
    "and oxygen acts as the final electron acceptor, forming water."
)

# Same wording as content_pipeline's image-only branch, so the "scanned PDF"
# sample chip shows the real error copy even in stub mode.
IMAGE_ONLY_MSG = (
    "This PDF has no selectable text (looks scanned/image-only). Try a "
    "text-based PDF, or paste the notes instead."
)
# ASGI application object; the @app.post / @app.get decorators in this module
# register their handlers on it.
app = FastAPI(title="Recall")
# ---- serialization ---------------------------------------------------------
def _card_out(card: dict | None) -> dict | None:
    """Project a card onto the fields the UI renders.

    Anything else on the card (such as the reference answer or the raw source
    chunk) is left out so it never reaches the client. Returns None for a
    missing or empty card. `parent_id` is None when the card has no such key.
    """
    if not card:
        return None
    public = {
        field: card[field]
        for field in ("id", "question", "topic", "difficulty")
    }
    public["parent_id"] = card.get("parent_id")
    return public
def _view(session: dict) -> dict:
    """Summarise a session into the display state the header, mastery bars
    and deck rail are built from.

    Reads `deck`, `history` and `streak` from the session. A history entry
    counts as correct for its topic when its grade is 3 or higher.
    `posDisplay` is the 1-based position of the current card, clamped to the
    deck size, or 0 for an empty deck.
    """
    deck = session["deck"]
    history = session["history"]
    n_cards = len(deck)
    n_answered = len(history)

    per_topic: dict[str, dict] = {}
    for entry in history:
        bucket = per_topic.setdefault(entry["topic"], {"correct": 0, "total": 0})
        bucket["total"] += 1
        if entry["grade"] >= 3:
            bucket["correct"] += 1

    if n_cards:
        position = min(n_answered + 1, n_cards)
    else:
        position = 0

    view = {
        "total": n_cards,
        "answered": n_answered,
        "posDisplay": position,
        "streak": session["streak"],
        "topicStats": per_topic,
    }
    # A card counts as "injected" when it carries a truthy parent_id.
    view["rail"] = [
        {
            "id": card["id"],
            "topic": card["topic"],
            "injected": bool(card.get("parent_id")),
        }
        for card in deck
    ]
    return view
# ---- request models --------------------------------------------------------
class SidBody(BaseModel):
    """Request body for endpoints that only need the session id
    (/api/next, /api/recap, /api/restart)."""
    sid: str
class GradeBody(BaseModel):
    """Request body for /api/grade: the session id plus the learner's answer.
    `answer` defaults to the empty string when omitted."""
    sid: str
    answer: str = ""
class RegenBody(BaseModel):
    """Request body for /api/regenerate: the session id plus the direction in
    which to regenerate the current card."""
    sid: str
    # Expected values are "harder" | "easier"; the model itself accepts any
    # string, and the endpoint labels anything other than "harder" as "easier".
    direction: str
# ---- API -------------------------------------------------------------------
@app.post("/api/generate")
async def api_generate(
    text: str = Form(""),
    sample: str = Form(""),
    file: UploadFile | None = File(None),
):
    """Build a deck from pasted text, a built-in sample, or an uploaded file,
    and open a new server-side session for it.

    Source precedence: the "scan" sample (always answers with the image-only
    error), then non-blank `text`, then the "bio" sample, then `file`.

    Returns ``{"sid", "card", "view"}`` on success. Otherwise returns a JSON
    ``{"error": ...}`` body with status 400 (not enough input), 413 (upload
    larger than MAX_UPLOAD_BYTES) or 422 (extraction or generation failed).
    """
    # The scanned-slides sample shows the real image-only error state.
    if sample == "scan":
        return JSONResponse({"error": IMAGE_ONLY_MSG}, status_code=422)

    need_more = ("I need a little more to work with — paste a paragraph of "
                 "notes or pick a PDF, and I'll build your deck.")

    pasted = (text or "").strip()
    source = ""
    if pasted:
        source = pasted[:MAX_TEXT_CHARS]  # cap to bound chunking work
        if len(source) < 40:
            return JSONResponse({"error": need_more}, status_code=400)
    elif sample == "bio":
        source = SAMPLE_NOTES
    elif file is not None:
        suffix = Path(file.filename or "upload").suffix or ".txt"
        # delete=False because extraction reopens the file by path after it
        # has been closed; removing it is therefore this function's job.
        tmp = tempfile.NamedTemporaryFile(delete=False, suffix=suffix)
        tmp_path = tmp.name
        # The try/finally starts as soon as the file exists, so a failure
        # while receiving or writing the upload (client disconnect, disk
        # full) cannot leave the temp file behind.
        try:
            too_large = False
            with tmp:
                # Stream in capped chunks so an oversized upload never gets
                # fully buffered in memory.
                size = 0
                while chunk := await file.read(1024 * 1024):
                    size += len(chunk)
                    if size > MAX_UPLOAD_BYTES:
                        too_large = True
                        break
                    tmp.write(chunk)
            if too_large:
                return JSONResponse(
                    {"error": f"That file is too large (limit "
                              f"{MAX_UPLOAD_BYTES // (1024 * 1024)} MB). Upload a "
                              "smaller PDF or paste the notes instead."},
                    status_code=413,
                )
            # Extraction (PDF parsing) is blocking — keep it off the event loop.
            source = await run_in_threadpool(cp.extract_text, tmp_path)
        except cp.ExtractionError as e:
            return JSONResponse({"error": str(e)}, status_code=422)
        finally:
            os.unlink(tmp_path)
    else:
        return JSONResponse({"error": need_more}, status_code=400)

    try:
        # Deck generation hits the model — run it in a worker thread so a slow
        # call doesn't block every other request on the event loop.
        deck = await run_in_threadpool(cp.generate_deck, source)
    except Exception as e:  # noqa: BLE001 — surface as friendly copy, never crash
        return JSONResponse(
            {"error": f"Couldn't build a deck from that ({type(e).__name__}). "
                      "Try different material."},
            status_code=422,
        )
    if not deck:
        return JSONResponse(
            {"error": "Couldn't generate questions from that. Try different material."},
            status_code=422,
        )

    session = le.init_session(deck)
    card = le.next_card(session)
    sid = uuid.uuid4().hex
    put_session(sid, session)
    return {"sid": sid, "card": _card_out(card), "view": _view(session)}
@app.post("/api/grade")
async def api_grade(body: GradeBody):
    """Grade the learner's answer and return the feedback plus refreshed view
    state.

    Responds 404 with ``{"error": "session expired"}`` for an unknown session,
    and ``{"done": True, "view": ...}`` when the study step yields no grade.
    """
    session = get_session(body.sid)
    if session is None:
        return JSONResponse({"error": "session expired"}, status_code=404)

    # Grading + follow-up generation hit the model; the whole study step runs
    # in a worker thread so the event loop stays free.
    answer = body.answer or ""
    grade, fups = await run_in_threadpool(le.grade_and_adapt, session, answer)
    if grade is None:
        return {"done": True, "view": _view(session)}

    injected_ids = [followup["id"] for followup in fups]
    put_session(body.sid, session)

    # (response key, key on the grade result); only "missed" is renamed.
    field_map = (
        ("score", "score"),
        ("correct", "correct"),
        ("explanation", "explanation"),
        ("missed", "missed_concept"),
    )
    feedback = {out_key: grade[src_key] for out_key, src_key in field_map}
    return {
        "grade": feedback,
        "injectedIds": injected_ids,
        "view": _view(session),
    }
@app.post("/api/regenerate")
async def api_regenerate(body: RegenBody):
    """Replace the current card with a harder or easier variant.

    Responds 404 with ``{"error": "session expired"}`` for an unknown session,
    ``{"card": None, "view": ...}`` when there is no current card, and 422
    with a friendly ``{"error": ...}`` when the regeneration call raises or
    yields nothing. In the 422 case the deck is left untouched.
    """
    session = get_session(body.sid)
    if session is None:
        return JSONResponse({"error": "session expired"}, status_code=404)
    card = le.next_card(session)
    if card is None:
        return {"card": None, "view": _view(session)}

    try:
        # Hits the model — keep it off the event loop.
        new = await run_in_threadpool(cp.regenerate, card, body.direction)
    except Exception as e:  # noqa: BLE001 — surface as friendly copy, never crash
        return JSONResponse(
            {"error": f"Couldn't regenerate that card ({type(e).__name__}). "
                      "Try again."},
            status_code=422,
        )
    if not new:
        # _card_out() returns None for an empty card, so continuing would
        # fail on the diffLabel assignment below; report it instead.
        return JSONResponse(
            {"error": "Couldn't regenerate that card. Try again."},
            status_code=422,
        )

    session = le.replace_card(session, card["id"], new)
    put_session(body.sid, session)
    out = _card_out(new)
    # Any direction other than "harder" is labelled "easier".
    out["diffLabel"] = "harder" if body.direction == "harder" else "easier"
    return {"card": out, "view": _view(session)}
@app.post("/api/next")
async def api_next(body: SidBody):
    """Return the session's next card together with refreshed view state, or
    a 404 ``{"error": "session expired"}`` for an unknown session."""
    session = get_session(body.sid)
    if session is not None:
        upcoming = le.next_card(session)
        return {"card": _card_out(upcoming), "view": _view(session)}
    return JSONResponse({"error": "session expired"}, status_code=404)
@app.post("/api/recap")
async def api_recap(body: SidBody):
    """Return the end-of-session recap together with refreshed view state, or
    a 404 ``{"error": "session expired"}`` for an unknown session."""
    session = get_session(body.sid)
    if session is None:
        return JSONResponse({"error": "session expired"}, status_code=404)
    # The recap's reflection line hits the model, so it runs in a worker thread.
    summary = await run_in_threadpool(le.recap, session)
    payload = {"recap": summary}
    payload["view"] = _view(session)
    return payload
@app.post("/api/restart")
async def api_restart(body: SidBody):
    """Study the same source deck again from a clean session (Recap → restart).

    The new session is stored under the same sid. Responds 404 with
    ``{"error": "session expired"}`` for an unknown session.
    """
    previous = get_session(body.sid)
    if previous is None:
        return JSONResponse({"error": "session expired"}, status_code=404)

    # Keep only the original cards: anything carrying a truthy parent_id was
    # injected during the previous run and is dropped.
    originals = [
        card for card in previous["deck"] if not card.get("parent_id")
    ]
    restarted = le.init_session(originals)
    first_card = le.next_card(restarted)
    put_session(body.sid, restarted)
    return {"card": _card_out(first_card), "view": _view(restarted)}
# ---- frontend --------------------------------------------------------------
@app.get("/")
async def index():
    """Serve the frontend page, `index.html` from the FRONTEND directory."""
    page = FRONTEND / "index.html"
    return FileResponse(page)
# Keep the original Gradio Blocks app available as a fallback / debug surface.
# Mounting is best-effort: the custom frontend + API don't need Gradio, so a
# broken/missing Gradio install (or a failing `app` module import) only skips
# the legacy UI and never takes the new server down.
try:
    import gradio as gr
    from app import demo as _gradio_demo
    # Rebinds the module-level `app` to whatever the mount call returns.
    app = gr.mount_gradio_app(app, _gradio_demo, path="/gradio")
    # On HuggingFace Spaces, the Gradio SDK looks for a module-level `demo`
    # object. `gr.mount_gradio_app` returns a FastAPI app, not a Gradio demo,
    # so the SDK would fall back to running `python server.py` — which would
    # then call uvicorn and collide with the Space's own port. Expose the
    # underlying Gradio Blocks as `demo` so the SDK picks it up directly.
    demo = _gradio_demo
except Exception as _e:  # noqa: BLE001
    # Deliberately broad: any failure above is reported on stdout and ignored.
    print(f"[recall] legacy Gradio UI not mounted ({type(_e).__name__}: {_e})")
if __name__ == "__main__":
    import uvicorn

    # On HuggingFace Spaces (SPACE_ID set) the Gradio SDK serves the app, and
    # starting uvicorn here would collide with the Space's own server on the
    # same port. Only a local run, with no SPACE_ID, starts uvicorn directly.
    running_on_space = bool(os.getenv("SPACE_ID"))
    if not running_on_space:
        listen_port = int(os.getenv("PORT", "7860"))
        uvicorn.run(app, host="0.0.0.0", port=listen_port)