"""Duel of Nemotron - Hugging Face Space entry point. Hybrid architecture (decoupled, non-blocking): Browser (player attack) │ │ POST /api/pick_move ──▶ Gemma 3 270M + LoRA (CPU, ~100ms) │ reads STRATEGY_CACHE synchronously │ returns move + reasoning │ ▼ STRATEGY_CACHE ◀── background asyncio.Task refreshes every ~11s │ by calling Modal Nemotron (A10) in the │ background. NEVER blocks an attack. │ ▼ Modal Nemotron writes new aggression/defense/... weights + reasoning Duel lifecycle: POST /api/duel/start -- player presses DUEL; starts the refresher + opens an event log. Auto-stops after 5min idle. POST /api/duel/event -- frontend appends each exchange (the "chat log" between Nemotron strategy and Gemma execution). POST /api/duel/end -- freezes the log, returns the full transcript. GET /api/duel/summary -- Nemotron narrates the whole match once. Gradio interface stays at /gradio for the hackathon requirement. """ import asyncio import json import os import time import uuid from pathlib import Path import gradio as gr import httpx from fastapi import FastAPI, Request from fastapi.middleware.cors import CORSMiddleware from fastapi.responses import FileResponse, JSONResponse from fastapi.staticfiles import StaticFiles from gemma_npc import MOVES, get_model, pick_counter_move, make_move_mask, state_to_features, remap_bn_state_to_ln STATIC_DIR = Path(__file__).parent / "static" MODEL_SERVER = os.environ.get("MODEL_SERVER", "") GEMMA_SERVER = os.environ.get("GEMMA_SERVER", "").rstrip("/") ADAPTER_DIR = Path(__file__).parent / "adapters" / "ref" # How often the background task refreshes the strategy cache from Modal. # Jittered 10-12s so we don't hammer a cold-starting container on a fixed # cadence. The tiny model serves every attack from the existing cache in # the meantime, so this latency is invisible to the player. REFRESH_MIN = 10.0 REFRESH_MAX = 12.0 # If no duel is active for this long, the background refresher parks itself. 
DUEL_IDLE_TIMEOUT = 5 * 60

_tiny_model = None  # legacy global; superseded by gemma_npc singleton

# ----------------------------------------------------------------------------
# Strategy cache -- written by the background refresher, read synchronously
# by /api/pick_move. This is what decouples Modal latency from the fast loop.
# ----------------------------------------------------------------------------
DEFAULT_WEIGHTS = {
    "aggression": 0.55,
    "defense": 0.50,
    "parry_affinity": 0.40,
    "kick_affinity": 0.35,
    "grapple_affinity": 0.30,
}

_STRATEGY_LOCK = asyncio.Lock()
_STRATEGY_CACHE: dict = {
    "weights": dict(DEFAULT_WEIGHTS),
    "reasoning": "Initial balanced stance -- waiting for first Nemotron read.",
    "source": "default",
    "updated_at": 0.0,
    "last_sequence": "",
}

# ----------------------------------------------------------------------------
# Duel lifecycle state
# ----------------------------------------------------------------------------
# NOTE(review): this is one process-wide duel; concurrent players on the same
# Space would share (and overwrite) it.
_duel_state: dict = {
    "active": False,       # is a match in progress?
    "match_id": None,
    "started_at": 0.0,
    "last_activity": 0.0,  # for the 5-min idle auto-stop
    "events": [],          # the transcript / chat log
    "ended": False,
}

_summary_cache: dict = {}  # match_id -> summary payload
# Upper bound on remembered summaries so a long-lived process cannot grow the
# cache without limit; the oldest entries are evicted first.
_SUMMARY_CACHE_MAX = 64

# Strong reference to the background refresher task. The event loop only keeps
# weak references to tasks, so an unreferenced task may be garbage-collected
# before it finishes.
_refresher_task = None


def get_model():
    """Lazy-load Gemma 3 270M + the cyber-duel-tiny-users LoRA adapter.

    The real loader lives in gemma_npc.py; this thin wrapper is kept as the
    app-level entry point so the startup hook (`get_model()` in `_startup`)
    and the /health endpoint both work.
    """
    from gemma_npc import get_model as _gemma_get
    return _gemma_get()


app = FastAPI(title="Nemotron Duel")

# NOTE(review): wildcard origins combined with allow_credentials=True is a
# permissive CORS setup; confirm the frontend actually needs credentials.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# React static files
if STATIC_DIR.exists():
    _assets = STATIC_DIR / "assets"
    if _assets.exists():
        app.mount("/assets", StaticFiles(directory=str(_assets)), name="assets")


async def _json_body(request: Request) -> dict:
    """Return the request's JSON object, or {} if it is missing or invalid.

    A body that parses to something other than a JSON object (list, string,
    number) is also mapped to {} so handlers can call ``.get`` safely.
    """
    try:
        body = await request.json()
    except Exception:  # noqa: BLE001 - any parse failure means "no body"
        return {}
    return body if isinstance(body, dict) else {}


def _normalize_moves(raw) -> list:
    """Coerce a move list (or comma-separated string) into clean strings.

    Accepts a list/tuple of moves or a single "a, b, c" string; anything else
    yields an empty list. Blank and ``None`` entries are dropped.
    """
    if isinstance(raw, str):
        raw = raw.split(",")
    if not isinstance(raw, (list, tuple)):
        return []
    cleaned = []
    for item in raw:
        if item is None:
            continue
        text = str(item).strip()
        if text:
            cleaned.append(text)
    return cleaned


def _merge_weights(raw) -> dict:
    """Overlay numeric values from ``raw`` onto a copy of DEFAULT_WEIGHTS.

    Only keys present in DEFAULT_WEIGHTS are considered. A value that cannot
    be converted with ``float`` leaves the default in place.
    """
    merged = dict(DEFAULT_WEIGHTS)
    if not isinstance(raw, dict):
        return merged
    for key in merged:
        if key in raw:
            try:
                merged[key] = float(raw[key])
            except (TypeError, ValueError):
                # Deliberate best-effort: keep the default for this key.
                pass
    return merged


def _remember_summary(match_id, payload: dict) -> None:
    """Store ``payload`` for ``match_id``, evicting the oldest entries."""
    if not match_id:
        return
    _summary_cache.pop(match_id, None)
    while len(_summary_cache) >= _SUMMARY_CACHE_MAX:
        _summary_cache.pop(next(iter(_summary_cache)))
    _summary_cache[match_id] = payload


@app.on_event("startup")
async def _startup():
    """Load the model and kick off the background strategy refresher."""
    global _refresher_task
    get_model()
    # Keep a reference so the task is not garbage-collected mid-flight, and
    # never start a second loop if the hook runs again.
    if _refresher_task is None or _refresher_task.done():
        _refresher_task = asyncio.create_task(_strategy_refresher_loop())


async def _strategy_refresher_loop():
    """Refresh STRATEGY_CACHE from Modal every ~11s while a duel is active.

    Parks itself (sleeps longer) when no duel is active or after 5 minutes
    of inactivity, so we never wake a cold Modal container for nothing.
    Runs forever; only task cancellation stops it.
    """
    while True:
        try:
            active = _duel_state["active"] and not _duel_state["ended"]
            idle_for = time.time() - _duel_state.get("last_activity", 0)
            if active and idle_for < DUEL_IDLE_TIMEOUT and MODEL_SERVER:
                await _refresh_strategy_from_modal()
                # Jitter in [REFRESH_MIN, REFRESH_MAX) in 1/1000 steps.
                jitter = (uuid.uuid4().int % 1000) / 1000.0
                await asyncio.sleep(REFRESH_MIN + jitter * (REFRESH_MAX - REFRESH_MIN))
            elif active and idle_for >= DUEL_IDLE_TIMEOUT:
                # 5 minutes of inactivity -> auto-stop the duel.
                _duel_state["active"] = False
                await asyncio.sleep(15.0)
            else:
                # No active duel: poll infrequently so /strategize still works
                # if someone calls it manually, but don't burn Modal credits.
                await asyncio.sleep(15.0)
        except asyncio.CancelledError:
            raise
        except Exception as e:  # noqa: BLE001 - the loop must never die
            print(f"[strategy_refresher] error: {e!r}")
            await asyncio.sleep(20.0)


async def _refresh_strategy_from_modal():
    """One background fetch to Modal. Updates STRATEGY_CACHE on success.

    On any failure (network error, non-2xx status, non-object JSON) the cached
    weights are retained and only the cache ``source`` is set to "offline".
    """
    # Remember which match this request was issued for; the duel may end (and
    # a new one start) while the request is in flight.
    match_id = _duel_state.get("match_id")
    sequence = _duel_state.get("last_sequence", "") or _STRATEGY_CACHE.get("last_sequence", "")
    state = {
        "playerHp": _duel_state.get("player_hp", 100),
        "npcHp": _duel_state.get("npc_hp", 100),
        "playerStamina": _duel_state.get("player_stamina", 100),
        "npcStamina": _duel_state.get("npc_stamina", 100),
        "round": _duel_state.get("round", 1),
        "distance": _duel_state.get("distance", "mid"),
    }
    try:
        async with httpx.AsyncClient(timeout=30.0) as client:
            resp = await client.post(
                f"{MODEL_SERVER}/strategize",
                json={"sequence": sequence, "state": state},
            )
            # An error page must not be cached as a fresh strategy.
            resp.raise_for_status()
            data = resp.json()
        if not isinstance(data, dict):
            raise ValueError(f"unexpected /strategize payload: {type(data).__name__}")

        merged = _merge_weights(data.get("weights"))
        reasoning = data.get("reasoning")
        if not isinstance(reasoning, str):
            reasoning = ""

        async with _STRATEGY_LOCK:
            _STRATEGY_CACHE.update({
                "weights": merged,
                # Keep the previous narration if Modal sent none.
                "reasoning": reasoning or _STRATEGY_CACHE["reasoning"],
                "source": "nemotron_modal",
                "updated_at": time.time(),
                "last_sequence": sequence,
            })

        # Record the strategist's read into the event log, but only for the
        # match the request was issued for.
        still_same_match = _duel_state.get("match_id") == match_id
        if _duel_state["active"] and not _duel_state["ended"] and still_same_match:
            _duel_state["events"].append({
                "t": round(time.time() - _duel_state["started_at"], 2),
                "kind": "nemotron_strategy",
                "weights": merged,
                "reasoning": reasoning,
            })
    except Exception as e:  # noqa: BLE001 - keep the cache, just log
        print(f"[strategy_refresher] Modal fetch failed (cache retained): {e!r}")
        async with _STRATEGY_LOCK:
            _STRATEGY_CACHE["source"] = "offline"


async def _current_strategy() -> dict:
    """Return a snapshot copy of the strategy cache taken under the lock."""
    async with _STRATEGY_LOCK:
        return {
            "weights": dict(_STRATEGY_CACHE["weights"]),
            "reasoning": _STRATEGY_CACHE["reasoning"],
            "source": _STRATEGY_CACHE["source"],
            "updated_at": _STRATEGY_CACHE["updated_at"],
        }


@app.get("/")
async def serve_index():
    """Serve the built frontend, or a 500 JSON error if it is missing."""
    idx = STATIC_DIR / "index.html"
    if idx.exists():
        return FileResponse(str(idx))
    return JSONResponse({"error": "Frontend not built"}, status_code=500)


@app.get("/favicon.svg")
async def serve_favicon():
    """Serve static/favicon.svg, or a 404 JSON error if it is missing."""
    fav = STATIC_DIR / "favicon.svg"
    if fav.exists():
        return FileResponse(str(fav))
    return JSONResponse({"error": "Not found"}, status_code=404)


@app.get("/icons.svg")
async def serve_icons():
    """Serve static/icons.svg, or a 404 JSON error if it is missing."""
    ico = STATIC_DIR / "icons.svg"
    if ico.exists():
        return FileResponse(str(ico))
    return JSONResponse({"error": "Not found"}, status_code=404)


@app.get("/health")
async def health():
    """Health + readiness.

    `ready` is what the frontend probes to decide whether to call the
    backend or fall back to the client-side mock.
    """
    # Imported at call time so the current value of the module attribute is
    # read on every probe.
    from gemma_npc import _model as _gemma_loaded
    return {
        "status": "ok",
        "ready": _gemma_loaded is not None,
        "tiny_model_loaded": _gemma_loaded is not None,
        "model_server_configured": bool(MODEL_SERVER),
        "duel_active": _duel_state["active"],
        "moves": list(MOVES),
    }


# ---------------------------------------------------------------------------
# Real-time move selection -- Gemma 3 270M + LoRA (replaces TinyFighter MLP)
# ---------------------------------------------------------------------------
@app.post("/api/pick_move")
async def pick_move(request: Request):
    """Real-time NPC move selection using Gemma 3 270M + LoRA.

    Reads STRATEGY_CACHE (whatever Nemotron last wrote). The model takes the
    player's recent move sequence and returns a counter-move plus its
    reasoning.

    Request body (JSON object, all optional): ``state`` (or the state fields
    at top level) carrying ``lastPlayerMoves`` / ``last_player_moves`` /
    ``lastMoves`` as a list or comma-separated string, and ``strategy`` to
    override the cached weights.
    """
    body = await _json_body(request)
    state = body.get("state", body)
    if not isinstance(state, dict):
        state = {}

    # One snapshot, so the reported weights and source always belong together.
    cached = await _current_strategy()
    # Use cached strategy unless the caller explicitly overrides it.
    strategy = body.get("strategy") or cached["weights"]

    # Build the move-sequence prompt the model was trained on. The state
    # payload carries lastPlayerMoves/lastMoves -- take the most recent ones.
    last_player = _normalize_moves(
        state.get("lastPlayerMoves")
        or state.get("last_player_moves")
        or state.get("lastMoves")
        or []
    )
    sequence = ",".join(last_player[-5:]) or "jab"

    # NOTE(review): this call runs inline on the event loop; if inference is
    # slow it delays other requests.
    move, reasoning, source = pick_counter_move(sequence)

    # Build a one-hot confidence shape that matches the previous API so the
    # frontend's confidence UI keeps working. A move outside MOVES is still
    # listed, with probability 0.0 (unchanged legacy behaviour).
    one_hot = {m: 0.0 for m in MOVES}
    one_hot[move] = 1.0 if move in one_hot else 0.0
    confidence = 1.0 if source == "gemma_lora" else 0.25

    return {
        "move": move,
        "confidence": confidence,
        "top5": [{"move": move, "prob": confidence}],
        "all_probs": one_hot,
        "reasoning": reasoning,
        "strategy_used": strategy,
        "strategy_source": cached["source"],
    }


# ---------------------------------------------------------------------------
# Duel lifecycle
# ---------------------------------------------------------------------------
@app.post("/api/duel/start")
async def duel_start(request: Request):
    """Player pressed DUEL.

    Open the event log and ensure the background refresher is awake.
    Nemotron begins reading the fight from here so that by the time the
    player reaches the action, the cache is warm.
    """
    body = await _json_body(request)
    requested = body.get("matchId")
    if isinstance(requested, (str, int)) and not isinstance(requested, bool) and requested:
        match_id = requested
    else:
        match_id = uuid.uuid4().hex[:12]

    # A reused id must not serve the previous match's narration.
    _summary_cache.pop(match_id, None)

    now = time.time()
    _duel_state.update({
        "active": True,
        "ended": False,
        "match_id": match_id,
        "started_at": now,
        "last_activity": now,
        "events": [{
            "t": 0.0,
            "kind": "duel_start",
            "playerCharacter": body.get("playerCharacter"),
            "npcCharacter": body.get("npcCharacter"),
        }],
        "player_hp": 100,
        "npc_hp": 100,
        "player_stamina": 100,
        "npc_stamina": 100,
        "round": 1,
        "distance": "mid",
        # Do not let the previous match's moves seed this match's strategy.
        "last_sequence": "",
    })
    return {"matchId": match_id, "started": True, "modelServer": bool(MODEL_SERVER)}


@app.post("/api/duel/event")
async def duel_event(request: Request):
    """Append one exchange to the transcript.

    This is the "chat" between Nemotron (strategy) and Tiny (execution) that
    the post-match summary narrates. Also threads live HP/stamina/round into
    the refresher. Responds 400 when no duel is active.
    """
    if not _duel_state["active"] or _duel_state["ended"]:
        return JSONResponse({"ok": False, "reason": "no_active_duel"}, status_code=400)

    body = await _json_body(request)
    _duel_state["last_activity"] = time.time()

    # Keep the live snapshot fresh for the background refresher.
    for k in ("player_hp", "npc_hp", "player_stamina", "npc_stamina", "round", "distance"):
        if k in body:
            _duel_state[k] = body[k]

    # Same list-or-string handling as /api/pick_move.
    seq = ",".join(_normalize_moves(body.get("lastMoves")))
    if seq:
        _duel_state["last_sequence"] = seq

    entry = {
        "t": round(time.time() - _duel_state["started_at"], 2),
        "kind": body.get("kind", "exchange"),
        "playerMove": body.get("playerMove"),
        "npcMove": body.get("npcMove"),
        "outcome": body.get("outcome"),  # "hit" | "blocked" | "parried" | "whiff"
        "damage": body.get("damage"),
        "playerHp": body.get("player_hp", _duel_state.get("player_hp")),
        "npcHp": body.get("npc_hp", _duel_state.get("npc_hp")),
        "round": body.get("round", _duel_state.get("round")),
    }
    # Optional: the tiny model's pick at this moment, if the client sent it.
    if "tinyMove" in body:
        entry["tinyMove"] = body["tinyMove"]
        entry["tinyConfidence"] = body.get("tinyConfidence")
    _duel_state["events"].append(entry)
    return {"ok": True, "count": len(_duel_state["events"])}


@app.post("/api/duel/end")
async def duel_end(request: Request):
    """Freeze the transcript and return it for the summary screen.

    Idempotent: once a duel has ended, further calls return the frozen
    transcript (and the recorded winner) without appending another
    ``duel_end`` event.
    """
    body = await _json_body(request)

    if _duel_state["ended"]:
        recorded = next(
            (e for e in reversed(_duel_state["events"]) if e.get("kind") == "duel_end"),
            {},
        )
        return {
            "matchId": _duel_state["match_id"],
            "events": _duel_state["events"],
            "winner": recorded.get("winner"),
        }

    _duel_state["ended"] = True
    _duel_state["active"] = False
    winner = body.get("winner")
    # started_at is 0.0 when no duel was ever started; avoid an epoch-sized t.
    started_at = _duel_state["started_at"]
    elapsed = round(time.time() - started_at, 2) if started_at else 0.0
    _duel_state["events"].append({
        "t": elapsed,
        "kind": "duel_end",
        "winner": winner,
        "playerScore": body.get("playerScore"),
        "npcScore": body.get("npcScore"),
    })
    # Any summary cached before the end event was recorded is now stale.
    match_id = _duel_state["match_id"]
    if match_id:
        _summary_cache.pop(match_id, None)
    return {
        "matchId": match_id,
        "events": _duel_state["events"],
        "winner": winner,
    }


@app.get("/api/duel/summary")
async def duel_summary():
    """Ask Nemotron to narrate the whole match once (cached per match).

    Falls back to a locally-generated summary when MODEL_SERVER is unset or
    the call fails, so the UI always has something to show. Summaries are
    cached only once the duel is no longer active, and a failed Modal call is
    not cached so a later request can still obtain Nemotron's narration.
    """
    match_id = _duel_state.get("match_id")
    if match_id and match_id in _summary_cache:
        return _summary_cache[match_id]

    events = list(_duel_state.get("events", []))
    transcript = _format_transcript(events)
    # While the duel is active the log keeps growing, so do not freeze it.
    cacheable = bool(match_id) and not _duel_state.get("active")

    if not MODEL_SERVER:
        payload = _local_summary(events, transcript)
        if cacheable:
            _remember_summary(match_id, payload)
        return payload

    try:
        async with httpx.AsyncClient(timeout=60.0) as client:
            resp = await client.post(
                f"{MODEL_SERVER}/summarize",
                json={"transcript": transcript, "events": events},
            )
            # Treat HTTP errors as failures instead of narrating an error body.
            resp.raise_for_status()
            data = resp.json()
        if not isinstance(data, dict):
            raise ValueError(f"unexpected /summarize payload: {type(data).__name__}")
        payload = {
            "summary": data.get("summary") or _local_summary(events, transcript)["summary"],
            "moments": data.get("moments") or [],
            "transcript": transcript,
            "source": "nemotron_modal",
        }
    except Exception as e:  # noqa: BLE001 - the UI must always get a summary
        local = _local_summary(events, transcript)
        return {**local, "source": "offline", "error": str(e)[:120]}

    if cacheable:
        _remember_summary(match_id, payload)
    return payload


def _format_transcript(events: list) -> str:
    """Render the event log as a compact text transcript for Nemotron.

    Only ``nemotron_strategy``, ``exchange`` and ``duel_end`` events produce a
    line; every other kind is skipped.
    """
    lines = []
    for e in events:
        kind = e.get("kind", "?")
        stamp = f"[{e.get('t', 0):.1f}s]"
        if kind == "nemotron_strategy":
            w = e.get("weights") or {}
            # reasoning may be stored as None; never slice a non-string.
            reasoning = e.get("reasoning") or ""
            lines.append(
                f"{stamp} STRATEGY agg={w.get('aggression', 0):.2f} "
                f"def={w.get('defense', 0):.2f} parry={w.get('parry_affinity', 0):.2f} "
                f"kick={w.get('kick_affinity', 0):.2f} "
                f"grapple={w.get('grapple_affinity', 0):.2f} :: {str(reasoning)[:80]}"
            )
        elif kind == "exchange":
            line = (
                f"{stamp} player={e.get('playerMove')} npc={e.get('npcMove')} "
                f"-> {e.get('outcome')} dmg={e.get('damage')} "
                f"(playerHp={e.get('playerHp')} npcHp={e.get('npcHp')})"
            )
            if e.get("tinyMove"):
                line += f" tinyPicked={e.get('tinyMove')}"
            lines.append(line)
        elif kind == "duel_end":
            lines.append(
                f"{stamp} MATCH END winner={e.get('winner')} "
                f"score={e.get('playerScore')}-{e.get('npcScore')}"
            )
    return "\n".join(lines)


def _local_summary(events: list, transcript: str) -> dict:
    """Heuristic summary used when Nemotron is unavailable.

    Returns a dict with ``summary``, ``moments`` (always empty),
    ``transcript`` and ``source`` ("local_fallback").
    """
    exchanges = [e for e in events if e.get("kind") == "exchange"]
    strat_count = sum(1 for e in events if e.get("kind") == "nemotron_strategy")
    end = next((e for e in reversed(events) if e.get("kind") == "duel_end"), {})
    winner = end.get("winner", "unknown")

    # damage is client-supplied; count only real numbers so a stray string or
    # null cannot break the summary.
    total_dmg = sum(
        e["damage"]
        for e in exchanges
        if isinstance(e.get("damage"), (int, float)) and not isinstance(e.get("damage"), bool)
    )

    last_w = next(
        (e.get("weights", {}) for e in reversed(events) if e.get("kind") == "nemotron_strategy"),
        {},
    )
    if last_w:
        if last_w.get("aggression", 0.5) > 0.6:
            stance = "aggressive"
        elif last_w.get("defense", 0.5) > 0.6:
            stance = "defensive"
        else:
            stance = "balanced"
    else:
        stance = "balanced (local fallback -- Nemotron not connected)"

    if winner == "player":
        verdict = "The player carried the duel."
    elif winner == "npc":
        verdict = "The NPC prevailed."
    else:
        verdict = "The match ended."

    summary = (
        f"The duel ran {len(exchanges)} exchanges across the match, dealing "
        f"~{total_dmg} total damage. Nemotron's strategist issued {strat_count} "
        f"reads and settled into a {stance} stance. "
        f"{verdict} "
        f"(Generated locally -- connect MODEL_SERVER for Nemotron's own narration.)"
    )
    return {
        "summary": summary,
        "moments": [],
        "transcript": transcript,
        "source": "local_fallback",
    }


# ---------------------------------------------------------------------------
# Legacy /strategize -- kept for the Gradio panel and manual probing.
# Returns the cache instantly if fresh, else does one synchronous fetch.
# ---------------------------------------------------------------------------
@app.post("/strategize")
async def strategize(request: Request):
    """Return strategy weights, from the cache when fresh or else from Modal.

    Request body (JSON object, optional): ``sequence`` (player move string)
    and ``state`` (forwarded to Modal unchanged).

    Resolution order:
      1. cache is younger than REFRESH_MIN and no ``sequence`` was given
         -> cached strategy;
      2. MODEL_SERVER unset -> DEFAULT_WEIGHTS with source "local_fallback";
      3. otherwise one fetch to ``{MODEL_SERVER}/strategize``; on success the
         cache is updated, on any failure DEFAULT_WEIGHTS are returned with
         source "offline".
    """
    try:
        body = await request.json()
    except Exception:  # noqa: BLE001 - a missing/invalid body means "no args"
        body = {}
    if not isinstance(body, dict):
        body = {}

    sequence = body.get("sequence", "")
    if not isinstance(sequence, str):
        # The sequence is sliced for messages below, so it must be text.
        sequence = "" if sequence is None else str(sequence)
    state = body.get("state", {})

    cached = await _current_strategy()
    fresh = bool(cached["updated_at"]) and (time.time() - cached["updated_at"] < REFRESH_MIN)
    if fresh and not sequence:
        return JSONResponse({
            "weights": cached["weights"],
            "reasoning": cached["reasoning"],
            "source": cached["source"],
        })

    if not MODEL_SERVER:
        return JSONResponse({
            "weights": dict(DEFAULT_WEIGHTS),
            "reasoning": f"Local fallback mode -- adapting to: {sequence[:60]}",
            "source": "local_fallback",
        })

    try:
        async with httpx.AsyncClient(timeout=120.0) as client:
            resp = await client.post(
                f"{MODEL_SERVER}/strategize",
                json={"sequence": sequence, "state": state},
            )
            # An HTTP error body must not be cached as a Nemotron strategy.
            resp.raise_for_status()
            data = resp.json()
        if not isinstance(data, dict):
            raise ValueError(f"unexpected /strategize payload: {type(data).__name__}")

        # Same tolerance as the background refresher: null/non-dict weights
        # simply mean "use the defaults".
        weights = data.get("weights")
        if not isinstance(weights, dict):
            weights = {}
        merged = dict(DEFAULT_WEIGHTS)
        for k in merged:
            if k in weights:
                try:
                    merged[k] = float(weights[k])
                except (TypeError, ValueError):
                    # Deliberate best-effort: keep the default for this key.
                    pass

        reasoning = data.get("reasoning")
        async with _STRATEGY_LOCK:
            _STRATEGY_CACHE.update({
                "weights": merged,
                "reasoning": reasoning if isinstance(reasoning, str) else cached["reasoning"],
                "source": "nemotron_modal",
                "updated_at": time.time(),
                "last_sequence": sequence,
            })
        return JSONResponse({
            "weights": merged,
            "reasoning": reasoning if isinstance(reasoning, str) else "",
            "source": "nemotron_modal",
        })
    except Exception as e:  # noqa: BLE001 - endpoint always answers with weights
        return JSONResponse({
            "weights": dict(DEFAULT_WEIGHTS),
            "reasoning": f"Offline mode: {str(e)[:80]}",
            "source": "offline",
        })


# ---------------------------------------------------------------------------
# Gradio interface (hackathon requirement)
# ---------------------------------------------------------------------------
def gradio_predict(
    player_hp, npc_hp, player_stamina, npc_stamina,
    distance, aggression, defense, parry_affinity,
    kick_affinity, grapple_affinity, round_num,
    last_npc_moves_str, last_player_moves_str,
):
    """Gradio demo: ask Gemma+LoRA for a counter-move to the last player moves.

    Only ``last_player_moves_str`` (comma-separated moves) is used; every
    other argument exists to match the UI inputs and is ignored.

    Returns:
        ``(move, detail)`` where ``detail`` is "source: <source>" followed by
        the model's reasoning on the next line.
    """
    # Treat a cleared/None textbox as empty instead of failing on ``.split``.
    raw = last_player_moves_str or ""
    last_player = [m.strip() for m in raw.split(",") if m.strip()]
    sequence = ",".join(last_player[-5:]) or "jab"
    move, reasoning, source = pick_counter_move(sequence)
    return move, f"source: {source}\n{reasoning}"


with gr.Blocks(title="Nemotron Duel -- Gemma NPC") as demo:
    gr.Markdown(
        "# Nemotron Duel -- Gemma 3 270M + LoRA NPC\n"
        "Real-time NPC counter-move generation.\n"
        "Full 3D game at root path. This Gradio panel demonstrates the model directly."
    )
    with gr.Row():
        with gr.Column():
            player_hp = gr.Slider(0, 100, value=80, label="Player HP (informational)")
            npc_hp = gr.Slider(0, 100, value=50, label="NPC HP (informational)")
            player_stamina = gr.Slider(0, 100, value=60, label="Player Stamina (informational)")
            npc_stamina = gr.Slider(0, 100, value=40, label="NPC Stamina (informational)")
            distance = gr.Radio(["near", "mid", "far"], value="mid", label="Distance (informational)")
            round_num = gr.Slider(1, 10, value=3, step=1, label="Round (informational)")
        with gr.Column():
            aggression = gr.Slider(0, 1, value=0.7, label="Aggression (Nemotron, informational)")
            defense = gr.Slider(0, 1, value=0.3, label="Defense (Nemotron, informational)")
            parry_affinity = gr.Slider(0, 1, value=0.4, label="Parry Affinity (informational)")
            kick_affinity = gr.Slider(0, 1, value=0.6, label="Kick Affinity (informational)")
            grapple_affinity = gr.Slider(0, 1, value=0.2, label="Grapple Affinity (informational)")
    last_npc_moves_str = gr.Textbox("jab, block, kick", label="Last NPC moves (ignored)")
    last_player_moves_str = gr.Textbox("jab, jab, jab", label="Last player moves")
    btn = gr.Button("Pick Counter Move", variant="primary")
    move_out = gr.Textbox(label="Selected Move")
    detail_out = gr.Textbox(label="Model output", lines=4)
    # The input order must match gradio_predict's positional parameters.
    btn.click(
        gradio_predict,
        inputs=[
            player_hp, npc_hp, player_stamina, npc_stamina,
            distance, aggression, defense, parry_affinity,
            kick_affinity, grapple_affinity, round_num,
            last_npc_moves_str, last_player_moves_str,
        ],
        outputs=[move_out, detail_out],
    )

# Eagerly load the model at startup.
get_model()

# Mount the Gradio demo onto the FastAPI app under /gradio.
app = gr.mount_gradio_app(app, demo, path="/gradio")

if __name__ == "__main__":
    import uvicorn

    print(f"Starting Nemotron Duel Space on port {os.environ.get('PORT', 7860)}")
    print(f"  NPC model: Gemma 3 270M + LoRA adapter at {ADAPTER_DIR}")
    print(f"  Gemma server (Modal): {GEMMA_SERVER or 'NOT SET (local CPU fallback)'}")
    print(f"  Model server (Nemotron Modal): {MODEL_SERVER or 'NOT SET (local fallback)'}")
    uvicorn.run(app, host="0.0.0.0", port=int(os.environ.get("PORT", 7860)))