import os
import time
import uuid
import json
import html
from pathlib import Path

import gradio as gr

try:
    import spaces  # only available on Hugging Face Spaces runtime
except ImportError:
    # Provide a dummy so @spaces.GPU(duration=...) is a no-op locally
    # while keeping the decorator statically visible in the source code.
    class _spaces:
        """Local stand-in for the ``spaces`` package; ``GPU`` decorates nothing."""

        @staticmethod
        def GPU(func=None, duration=300, **_options):
            """No-op replacement for ``spaces.GPU``.

            Works both bare (``@spaces.GPU``) and parameterised
            (``@spaces.GPU(duration=120)``). ``duration`` and any other
            options are accepted and ignored.
            """
            if callable(func):
                # Bare usage: we were handed the function itself.
                return func
            # Parameterised usage (including a positional duration).
            return lambda fn: fn

    spaces = _spaces

from app.services.retrieval import load_games_dataset, normalize_game_record, retrieve_examples
from app.services.generator import (
    generate_game,
    generate_game_with_model,
    build_generation_prompt,
    unload_nemotron,
)
from app.services.validator import validate_game, repair_game
from app.services.schema_validator import create_minimal_game_template
from app.services.tracing import log_event, log_generation_trace, load_events
from app.services.journal import (
    create_journal_entry,
    save_journal_entry,
    summarize_journal,
    load_journal_entries,
    detect_mood,
    assess_story_value,
)
from app.services.scoring import compute_scores, HINT_PENALTY
from app.services.story import build_story_packet, generate_story
from app.services.image_gen import generate_poster_sync

# ── Voice-journal language options ─────────────────────────────────────────
# Shown on the Create Lobby screen as full names; mapped to the ISO codes the
# ASR service expects before being routed into the play-screen audio handlers.
# Display name → ISO 639-1 code. Insertion order is the order shown in the UI.
LANGUAGE_TO_ISO = {
    "English": "en",
    "French": "fr",
    "Spanish": "es",
    "German": "de",
    "Italian": "it",
    "Portuguese": "pt",
    "Dutch": "nl",
    "Japanese": "ja",
    "Chinese (Mandarin)": "zh",
    "Arabic": "ar",
    "Hindi": "hi",
    "Korean": "ko",
    "Russian": "ru",
    "Turkish": "tr",
    "Polish": "pl",
}

# Full language names, in the same order as the mapping above.
LANGUAGE_CHOICES = list(LANGUAGE_TO_ISO)

# ── Load datasets once on startup ──────────────────────────────────────────
BASE_DIR = Path(__file__).resolve().parent

DATASET_PATHS = {
    "old": BASE_DIR / "app/data/games_dataset.json",
    "scavenger_hunt": BASE_DIR / "app/data/scavenger_hunt/dataset.json",
    "hide_and_seek": BASE_DIR / "app/data/hide_and_seek/dataset.json",
    "tag": BASE_DIR / "app/data/tag/dataset.json",
}

# Normalized records from every dataset that could be read; a missing or
# broken dataset is reported and skipped so startup never fails on it.
DATA_RECORDS = []
for src, path in DATASET_PATHS.items():
    try:
        raw = load_games_dataset(str(path))
        normalized = list(map(normalize_game_record, raw))
    except FileNotFoundError:
        print(f"[!] Dataset not found at {path}, skipping {src}")
    except Exception as exc:
        print(f"[!] Error loading {src} dataset from {path}: {exc}")
    else:
        DATA_RECORDS += normalized
        print(f"[OK] Loaded {len(normalized)} records from {src} dataset ({path.name})")

print(f"[OK] Total: {len(DATA_RECORDS)} game records loaded for retrieval")

# ── Shared multiplayer state lives in app.services.rooms ───────────────────
# SESSION_STORE / ADVENTURE_CODES are owned by the rooms module so the lobby,
# task wall and wait-lobby all read/write the same in-process truth.
from app.services import rooms
from app.services.rooms import SESSION_STORE, ADVENTURE_CODES, save_state, load_state

# ── HF Spaces Zero GPU generation ──────────────────────────────────────────
# This function is statically decorated so HF Spaces detects the GPU
# requirement at startup. The 2.84 GB GGUF model is lazily downloaded
# on first call inside the GPU context.
@spaces.GPU(duration=300) def _generate_with_gpu(config: dict, retrieved: list[dict]): """Generate game using GPU (Nemotron 3 Nano 4B via llama.cpp). Runs inside @spaces.GPU so CUDA is available. The model download and llama.cpp initialisation happen lazily on first call. """ import json prompt = build_generation_prompt(config, retrieved) json_str = generate_game_with_model(prompt, model_name="nemotron") if json_str: try: game = json.loads(json_str) if all(field in game for field in ["game_id", "title", "setup", "tasks", "safety"]): return game except json.JSONDecodeError: pass return None def _generate_game(config: dict, retrieved: list[dict]): """Try GPU generation; fall back to CPU/mock if unavailable.""" if spaces is not None: game = _generate_with_gpu(config, retrieved) if game is not None: return game print("[app] GPU generation returned None, falling back to CPU/mock") return generate_game(config, retrieved) # ── Pipeline entry point ────────────────────────────────────────────────────── def run_pipeline( game_type: str, city: str, area: str, location_type: str, duration_minutes: int, num_players: int, difficulty: str, age_group: str, energy_level: str, ): """Run the full AI generation pipeline end-to-end.""" session_id = str(uuid.uuid4()) config = { "game_type": game_type, "city": city or "Paris", "area": area or "Downtown", "location_type": location_type, "duration_minutes": int(duration_minutes), "num_players": int(num_players), "difficulty": difficulty, "age_group": age_group, "energy_level": energy_level, "photo_enabled": True, # New fields for enhanced retrieval scoring "landscape_tags": [], "theme": "", "mobility": "standard", "allow_transport": False, } state = {} # 1 ── Retrieval state["num_retrieved"] = 0 if DATA_RECORDS: retrieved = retrieve_examples(config, DATA_RECORDS, k=3) state["num_retrieved"] = len(retrieved) state["retrieved_ids"] = [r["id"] for r in retrieved] else: retrieved = [] # 2 ── Generation (GPU if available, CPU/mock fallback 
otherwise) game = _generate_game(config, retrieved) state["game_id"] = game["game_id"] state["game_title"] = game["title"] # Free GPU memory — Nemotron is no longer needed after generation. # This makes room for FLUX poster / Cohere ASR later. unload_nemotron() # 3 ── Validation is_valid, failures = validate_game(game, config) state["validation_passed"] = is_valid state["validation_failures"] = failures # 4 ── Repair (if needed) repaired = None if not is_valid: repaired = repair_game(game, failures, config) state["repair_applied"] = True is_valid2, failures2 = validate_game(repaired, config) state["repair_valid"] = is_valid2 state["remaining_failures"] = failures2 else: state["repair_applied"] = False final_game = repaired if repaired is not None else game # Stamp the selected game type onto the game so it's recorded explicitly # (the model output / schema don't carry it) and can't drift from config. final_game["game_type"] = config.get("game_type", "scavenger_hunt") # 5 ── Log generation trace log_generation_trace( session_id=session_id, config=config, retrieved_examples=retrieved, game=final_game, validation_passed=is_valid or (repaired is not None and state.get("repair_valid", False)), validation_failures=failures, repaired_game=repaired, ) # 6 ── Reveal all tasks as events for task in final_game.get("tasks", []): log_event(session_id, "task_revealed", { "task_id": task["task_id"], "title": task["title"], "points": task["points"], }) # 7 ── Store session SESSION_STORE[session_id] = { "config": config, "game": final_game, "events": [], "journals": [], } # 8 ── Build summary text summary = build_summary(final_game, state, session_id) return summary, session_id def generate_for_session(session_id: str) -> dict: """Generate (and validate/repair) a game for an *existing* lobby session. Used when the host presses Start: the room/players already exist, so we only run retrieval → generation → validation → repair, store the game on the session, and reveal the tasks. 
Returns the final game dict. Reuses the same pipeline pieces as :func:`run_pipeline`. """ session = SESSION_STORE.get(session_id) if not session: raise ValueError("Unknown session") # Idempotent: once a quest exists for this session, never re-roll it. A # second call (double-clicked Start, a poll race, a reload mid-flow) must # return the SAME tasks — players completing a task should never see a # different set of tasks appear under them. if session.get("game"): return session["game"] config = session["config"] retrieved = retrieve_examples(config, DATA_RECORDS, k=3) if DATA_RECORDS else [] game = _generate_game(config, retrieved) unload_nemotron() # free GPU for ASR / MiniCPM / FLUX is_valid, failures = validate_game(game, config) repaired = repair_game(game, failures, config) if not is_valid else None final_game = repaired if repaired is not None else game # Record the selected game type on the game object (authoritative = config). final_game["game_type"] = config.get("game_type", "scavenger_hunt") log_generation_trace( session_id=session_id, config=config, retrieved_examples=retrieved, game=final_game, validation_passed=is_valid or (repaired is not None), validation_failures=failures, repaired_game=repaired, ) for task in final_game.get("tasks", []): log_event(session_id, "task_revealed", { "task_id": task["task_id"], "title": task["title"], "points": task["points"], }) session["game"] = final_game # Persist immediately so a process restart (e.g. dev hot-reload) between # generation and begin_play() can't lose the quest and trigger a re-roll. 
save_state() return final_game # ── Phase 3: Gameplay helpers ───────────────────────────────────────────────── def complete_task(session_id: str, task_id: str, team_id: str = "team-a"): """Log a task completion event and return updated scoreboard.""" if session_id not in SESSION_STORE: return "⚠ Unknown session" ev = log_event(session_id, "task_completed", { "task_id": task_id, "summary": f"Team {team_id} completed {task_id}", }, team_id=team_id) SESSION_STORE[session_id]["events"].append(ev) return f"✅ Task {task_id} completed!" def skip_task(session_id: str, task_id: str, team_id: str = "team-a"): """Log a task skip event.""" if session_id not in SESSION_STORE: return "⚠ Unknown session" ev = log_event(session_id, "task_skipped", { "task_id": task_id, "summary": f"Team {team_id} skipped {task_id}", }, team_id=team_id) SESSION_STORE[session_id]["events"].append(ev) return f"⏭️ Task {task_id} skipped." def use_hint(session_id: str, task_id: str, team_id: str = "team-a", question: str = "", answer: str = ""): """Log a hint usage event and return the team's new hint count for this task. The question/answer are stored on the event so the detail view can show a hint history. Scoring (``compute_scores``) deducts ``HINT_PENALTY`` per ``hint_used`` event, so no separate score mutation is needed here.""" if session_id not in SESSION_STORE: return 0 ev = log_event(session_id, "hint_used", { "task_id": task_id, "question": (question or "").strip(), "answer": (answer or "").strip(), "summary": f"Team {team_id} used a hint for {task_id}", }, team_id=team_id) SESSION_STORE[session_id]["events"].append(ev) return rooms.hints_used_count(session_id, team_id, task_id) @spaces.GPU(duration=120) def record_journal( session_id: str, transcript: str = "", task_id: str = "", location_note: str = "", team_id: str = "team-a", audio_path: str = "", language: str = "en", ): """Record a journal entry, summarize it, and return the result. 
Decorated with ``@spaces.GPU`` so the Cohere ASR model can run on the GPU. On HF ZeroGPU, ``torch.cuda.is_available()`` is ``False`` outside a ``@spaces.GPU`` function, so without this the ASR model would load on CPU and the voice-journal button would appear to hang. Two input paths are supported: * **Voice path** — pass ``audio_path`` (e.g. a path returned by ``gr.Audio(type="filepath")``). The audio is transcribed with the Cohere ASR service (``CohereLabs/cohere-transcribe-03-2026``) and the transcript is stored with ASR metadata. * **Typed path** — pass ``transcript`` directly. The user can also edit an ASR transcript in the UI before submitting; if both ``audio_path`` and ``transcript`` are present, the typed transcript wins and is treated as a manual correction (``transcript_source == "hybrid"``). """ if session_id not in SESSION_STORE: return "[!] Unknown session" asr_metadata: dict | None = None audio_ref: str | None = None transcript_source = "typed" # ── 1. Voice path — transcribe audio if provided ─────────────────── if audio_path: audio_ref = audio_path try: from app.services.asr import transcribe as _asr_transcribe asr_result = _asr_transcribe(audio_path, language=language) asr_metadata = { "model": asr_result.get("model"), "language": asr_result.get("language"), "status": asr_result.get("status"), "error": asr_result.get("error"), } asr_text = (asr_result.get("transcript") or "").strip() if asr_text and not transcript.strip(): transcript = asr_text transcript_source = "asr" elif asr_text and transcript.strip() and asr_text != transcript.strip(): transcript_source = "hybrid" except Exception as exc: print(f"[app] ASR transcription failed: {type(exc).__name__}: {exc}") asr_metadata = { "model": None, "language": language, "status": "error", "error": f"{type(exc).__name__}: {exc}", } if not transcript or not transcript.strip(): # Surface the actual error instead of a generic message hint = "" if asr_metadata: err = asr_metadata.get("error") or "" status = 
asr_metadata.get("status") or "" if status == "error" and err: hint = f"\n\n**ASR error:** {err}" elif status == "skipped": hint = ( "\n\n**ASR was skipped** — set `CITYQUEST_SKIP_MODEL=0` " "and ensure `HF_TOKEN` is configured." ) return ( "⚠️ No transcript available — record audio or type a note first." + hint ) # ── 2. Build journal entry ──────────────────────────────────────── entry = create_journal_entry( transcript=transcript, session_id=session_id, team_id=team_id, task_id=task_id or None, location_note=location_note, audio_ref=audio_ref, asr_metadata=asr_metadata, transcript_source=transcript_source, ) # Summarize summary = summarize_journal(transcript, task_id=task_id or None, location_note=location_note) entry["moment_summary"] = summary["moment_summary"] entry["tags"] = summary["tags"] entry["story_value"] = summary["story_value"] # Persist save_journal_entry(entry) # Log event ev = log_event(session_id, "journal_recorded", { "journal_id": entry["journal_id"], "mood": entry["mood"], "story_value": summary["story_value"], "summary": summary["moment_summary"], "transcript_source": transcript_source, "asr_status": asr_metadata.get("status") if asr_metadata else None, "asr_model": asr_metadata.get("model") if asr_metadata else None, }, team_id=team_id) SESSION_STORE[session_id]["events"].append(ev) SESSION_STORE[session_id]["journals"].append(entry) # Build display text source_label = { "asr": "🎙️ (transcribed)", "hybrid": "🎙️✏️ (transcribed, edited)", "typed": "⌨️ (typed)", }.get(transcript_source, transcript_source) asr_line = "" if asr_metadata and asr_metadata.get("status") != "ok": asr_line = f"- ASR status: **{asr_metadata.get('status')}** — {asr_metadata.get('error') or ''}\n" display = ( f"🎙️ **Journal recorded!** {source_label}\n" f"- Mood: *{entry['mood']}*\n" f"- Story value: **{summary['story_value']}**\n" f"- Tags: {', '.join(summary['tags'])}\n" f"{asr_line}" f"- Summary: {summary['moment_summary']}" ) return display def upload_photo( 
session_id: str, photo_file, caption: str = "", task_id: str = "", team_id: str = "team-a", ): """Log a photo upload event and return a confirmation.""" if session_id not in SESSION_STORE: return "[!] Unknown session" photo_name = "" if photo_file is not None: # Gradio 4+ returns a filepath string or a PIL image if isinstance(photo_file, str): photo_name = photo_file.split("/")[-1].split("\\")[-1] else: photo_name = getattr(photo_file, "name", "photo") photo_id = f"photo-{uuid.uuid4().hex[:8]}" payload = { "photo_id": photo_id, "photo_name": photo_name, "caption": caption, "summary": f"Team {team_id} uploaded photo for {task_id or 'general'}", } if task_id: payload["task_id"] = task_id ev = log_event(session_id, "photo_uploaded", payload, team_id=team_id) SESSION_STORE[session_id]["events"].append(ev) # Track photo in session store for recap if "photos" not in SESSION_STORE[session_id]: SESSION_STORE[session_id]["photos"] = [] SESSION_STORE[session_id]["photos"].append({ "photo_id": photo_id, "photo_name": photo_name, "photo_path": photo_file if isinstance(photo_file, str) else "", "caption": caption, "task_id": task_id, "team_id": team_id, }) # Build gallery list photos = SESSION_STORE[session_id]["photos"] gallery_lines = [f"📸 **{p['photo_id']}** — {p['caption'] or '(no caption)'} [{p['task_id'] or 'general'}]" for p in photos] display = ( f"📸 **Photo uploaded!**\n" f"- ID: `{photo_id}`\n" f"- Caption: {caption or '(none)'}\n" f"- Related task: {task_id or 'general'}\n\n" f"**All photos ({len(photos)}):**\n" + "\n".join(gallery_lines) ) return display def end_game(session_id: str, team_id: str = "team-a"): """End the game, compute scores, and return a scoreboard.""" if session_id not in SESSION_STORE: return "[!] 
Unknown session" session = SESSION_STORE[session_id] game = session["game"] events = load_events(session_id=session_id) ev = log_event(session_id, "game_finished", { "summary": f"Game finished — session {session_id}", }, team_id=team_id) events.append(ev) scores = compute_scores(events, game) session["scores"] = scores lines = ["# Final Scoreboard\n"] for ts in scores.get("team_scores", []): marker = " [WINNER]" if ts["team_id"] == scores.get("winner") else "" lines.append(f"### Team: {ts['team_id']}{marker}") lines.append(f"- **Total points:** {ts['points']}") lines.append(f"- Tasks completed: {ts['completed_tasks']}/{ts['total_tasks']}") lines.append(f"- Hints used: {ts['hints_used']}") if ts.get("bonuses"): lines.append(f"- Bonuses: {', '.join(ts['bonuses'])}") lines.append("") lines.append("**Breakdown:**") for b in ts.get("scoring_breakdown", []): lines.append(f" * {b}") lines.append("") if scores.get("winner"): lines.append(f"**Winner: {scores['winner']}**") return "\n".join(lines) def generate_recap(session_id: str): """Generate the final story recap from all collected session data.""" if session_id not in SESSION_STORE: return "[!] Unknown session", {} session = SESSION_STORE[session_id] game = session["game"] events = load_events(session_id=session_id) journals = load_journal_entries(session_id=session_id) scores = session.get("scores", compute_scores(events, game)) photos = session.get("photos", []) # Build story packet packet = build_story_packet( game=game, events=events, scores=scores, journal_entries=journals, photo_captions=photos, ) # Generate story result = generate_story(packet, session_id=session_id) # Format for display (poster goes in dedicated section below) lines = [ "# Episode Recap\n", result["short_recap"], "", result["long_summary"], "", ] # NOTE: poster_prompt is internal — do NOT render to user. # NOTE: do not duplicate Final Standings here; left column shows it. 
# TODO: result["long_summary"] may still contain a "Final standings" # sub-section produced upstream in generate_story(); if so, strip it # in the story generator module rather than here. return "\n".join(lines), result @spaces.GPU(duration=300) def generate_poster_from_session(session_id: str): """Generate a poster image from an existing session's recap data.""" if session_id not in SESSION_STORE: return None, "[!] Unknown session - generate a recap first" session = SESSION_STORE[session_id] # Check if we have a recap result or can build one if "poster_url" in session: return session["poster_url"], "Poster ready from previous generation" # Build story packet from stored data game = session.get("game") if not game: return None, "[!] No game data in session" events = load_events(session_id=session_id) journals = load_journal_entries(session_id=session_id) scores = session.get("scores", compute_scores(events, game)) photos = session.get("photos", []) packet = build_story_packet( game=game, events=events, scores=scores, journal_entries=journals, photo_captions=photos, ) result = generate_story(packet, session_id=session_id) poster_url, status = generate_poster_sync( session_id, result["poster_prompt"], photo_paths=[p.get("photo_path", "") for p in photos if p.get("photo_path")], ) session["poster_url"] = poster_url return poster_url, status # ── Build summary text ──────────────────────────────────────────────────────── def build_summary(game: dict, state: dict, session_id: str = "") -> str: """Format a human-readable game summary for the UI.""" lines = [] lines.append(f"# 🎮 {game.get('title', 'Untitled')}") lines.append("") if session_id: lines.append(f"> Session `{session_id[:12]}…` — paste this in the **Play** tab to log progress.") lines.append("") lines.append("## 📋 Setup") setup = game.get("setup", {}) lines.append(f"- **Location:** {setup.get('city', '?')} — {setup.get('area', '?')}") lines.append(f"- **Meeting point:** {setup.get('meeting_point', '?')}") 
lines.append(f"- **Duration:** {setup.get('duration_minutes', '?')} min") lines.append(f"- **Players:** {setup.get('num_players', '?')}") lines.append("") lines.append("## 📜 Rules") for i, rule in enumerate(game.get("rules", []), 1): lines.append(f" {i}. {rule}") lines.append("") lines.append("## 🎯 Tasks") for t in game.get("tasks", []): time_str = f"{t.get('time_limit_minutes', '∞')} min" if t.get("time_limit_minutes") else "No time limit" lines.append(f" **{t.get('task_id', '?')}:** {t.get('title', '?')}") lines.append(f" - *{t.get('description', '')[:80]}*") lines.append(f" - 🏆 {t.get('points', 0)} pts | ⏱ {time_str} | 📸 {t.get('proof_type', '?')}") lines.append(f" - 💡 {t.get('hint', '')[:70]}") lines.append(f" - 🛡 {t.get('safety_note', '')[:70]}") lines.append("") lines.append("## 💡 Global Hints") for h in game.get("global_hints", []): lines.append(f" • {h}") lines.append("") lines.append("## 🔒 Safety") safety = game.get("safety", {}) lines.append(f"- **Zone:** {safety.get('allowed_zone', '?')}") lines.append(f"- **Supervision required:** {'Yes' if safety.get('adult_supervision') else 'No'}") lines.append(f"- **Forbidden:** {', '.join(safety.get('forbidden_behaviors', []))}") lines.append("") lines.append("## 📊 Scoring") for s in game.get("score_rules", []): lines.append(f" • {s}") lines.append(f"- **Tie-breaker:** {game.get('tie_breaker', '?')}") lines.append("") lines.append("## 📖 Story Seed") seed = game.get("story_seed", {}) lines.append(f"- **Tone:** {seed.get('tone', '?')}") lines.append(f"- **Motifs:** {', '.join(seed.get('motifs', []))}") lines.append(f"- **Recap style:** {seed.get('recap_style', '?')}") lines.append("") # Pipeline trace lines.append("---") lines.append("### 🔍 Pipeline trace") lines.append(f"- Retrieval: {state.get('num_retrieved', 0)} examples found") if state.get("retrieved_ids"): lines.append(f"- Retrieved IDs: {', '.join(state['retrieved_ids'])}") lines.append(f"- Game ID: {state.get('game_id', '?')}") if 
state.get("validation_passed"): lines.append(f"- ✅ Validation passed") else: lines.append(f"- ❌ Validation failed ({len(state.get('validation_failures', []))} issues)") if state.get("repair_applied"): lines.append(f"- 🔧 Repair applied → {'✅ Passed' if state.get('repair_valid') else '❌ Still has issues'}") for f in state.get("validation_failures", [])[:5]: lines.append(f" - {f}") leftover = state.get("remaining_failures", []) if leftover: for f in leftover[:5]: lines.append(f" - ⚠ {f}") return "\n".join(lines) # ══════════════════════════════════════════════════════════════════════════ # LIQUID-FLOW MULTIPLAYER UI # ══════════════════════════════════════════════════════════════════════════ from app.ui.theme import CSS, JS_INIT, gradio_theme rooms.load_state() # restore live rooms after a restart ORDER = ["home", "create", "join", "lobby", "play", "wait", "win"] def vis(target: str): """Visibility updates for the 7 top-level screen groups.""" return tuple(gr.update(visible=(name == target)) for name in ORDER) def _mmss(secs) -> str: m, s = divmod(max(0, int(secs)), 60) return f"{m:02d}:{s:02d}" # ── HTML fragment renderers (polled) ────────────────────────────────────── def code_html(session_id: str) -> str: s = SESSION_STORE.get(session_id) or {} code = s.get("room_code", "——————") return ( "
" "
Share this code with your friends to join
" f"
{code}
" ) def copy_btn_html(session_id: str) -> str: """A clipboard button beside the room code. Kept in its own (non-polled) HTML component so the 'Copied!' state isn't reset by the 1.5s lobby poll. Pure inline JS — Gradio can't reach the clipboard from Python.""" s = SESSION_STORE.get(session_id) or {} code = s.get("room_code", "") js = ( "var b=this;navigator.clipboard.writeText('" + code + "').then(function(){" "b.classList.add('copied');var l=b.querySelector('.cq-copy-label');" "if(l)l.textContent='✓ Copied!';" "setTimeout(function(){b.classList.remove('copied');" "if(l)l.textContent='📋 Copy';},2000);});" ) return ( "
" f"
" ) def _teams_ready(session_id: str): """(teams_with_at_least_one_player, total_teams) for the lobby.""" s = SESSION_STORE.get(session_id) or {} teams = s.get("teams", []) have = {p["team_id"] for p in s.get("players", [])} ready = sum(1 for t in teams if t["id"] in have) return ready, len(teams) def roster_html(session_id: str, me: str = "") -> str: s = SESSION_STORE.get(session_id) or {} host = s.get("host_name", "") ready, total = _teams_ready(session_id) pct = int(round(100 * ready / total)) if total else 0 parts = ["
"] parts.append( "
" f"
{ready} / {total} teams ready
" "
" f"
" ) for t in s.get("teams", []): members = [p for p in s.get("players", []) if p["team_id"] == t["id"]] if members: chips = "".join( f"" f"{p['name']}{' ⭐' if p['name'] == host else ''}" for p in members ) else: chips = "waiting…" parts.append( "
" f"
" f"{t['name']} · {t['role']}
" f"
{chips}
" ) parts.append("
") return "".join(parts) def timer_html(session_id: str, team_id: str) -> str: ph = rooms.team_phase(session_id, team_id) meta = rooms.team_meta(session_id, team_id) if ph["phase"] == "waiting": label, val, sub = "Head start in progress — you launch in", _mmss(ph["seconds_to_start"]), "Sit tight, your quest is about to open" elif ph["phase"] == "overtime": label, val, sub = "Time's up — wrap up and finish", "00:00", "" else: label, val, sub = "Time remaining", _mmss(ph["seconds_remaining"]), "" return ( "
" f"
" f"{meta['name']} · {meta['role']}
" f"
{label}
" f"
{val}
" f"
{sub}
" ) def status_strip_html(session_id: str, my_team: str = "") -> str: s = SESSION_STORE.get(session_id) or {} pills = [] for t in s.get("teams", []): tid = t["id"] if rooms.team_finished(session_id, tid): cls, txt = "done", f"{t['name']} · finished 🏁" elif rooms.is_unlocked(session_id, tid): cls, txt = "live", f"{t['name']} · exploring" else: ph = rooms.team_phase(session_id, tid) cls, txt = "wait", f"{t['name']} · starts in {_mmss(ph['seconds_to_start'])}" me = " (you)" if tid == my_team else "" pills.append(f"{txt}{me}") return "
" + "".join(pills) + "
" def scoreboard_md(session_id: str) -> str: s = SESSION_STORE.get(session_id) or {} game = s.get("game") or {} events = load_events(session_id=session_id) scores = compute_scores(events, game) s["scores"] = scores winner = scores.get("winner") ts_sorted = sorted(scores.get("team_scores", []), key=lambda x: x.get("points", 0), reverse=True) medals = ["🥇", "🥈", "🥉"] lines = ["# 🏆 Final Standings", ""] for i, ts in enumerate(ts_sorted): meta = rooms.team_meta(session_id, ts["team_id"]) medal = medals[i] if i < 3 else "•" crown = " — Champion" if ts["team_id"] == winner else "" lines.append(f"### {medal} {meta['name']}{crown}") lines.append( f"- **{ts['points']} points** · {ts['completed_tasks']}/{ts['total_tasks']} tasks · " f"{ts['hints_used']} hints used" ) lines.append("") return "\n".join(lines) def _task_id_list(session_id: str) -> list[str]: s = SESSION_STORE.get(session_id) or {} return [t["task_id"] for t in (s.get("game") or {}).get("tasks", [])[:12]] def _task_by_id(session_id: str, task_id: str) -> dict: s = SESSION_STORE.get(session_id) or {} return next((x for x in (s.get("game") or {}).get("tasks", []) if x["task_id"] == task_id), {}) def task_detail_md(session_id: str, team_id: str, task_id: str) -> str: t = _task_by_id(session_id, task_id) tl = t.get("time_limit_minutes") tls = f"{tl} min" if tl else "no time limit" return ( f"## {t.get('title', 'Task')}\n\n" f"**🏆 {t.get('points', 0)} pts · ⏱ {tls} · 📸 {t.get('proof_type', 'photo')}**\n\n" f"{t.get('description', '')}" ) def task_hint_html(session_id: str, team_id: str, task_id: str) -> str: """Hint rendered with its own class so it reads as 'discovery' (amber), visually distinct from a safety warning.""" hint = (_task_by_id(session_id, task_id).get("hint") or "").strip() if not hint: return "" return f"
💡 {html.escape(hint)}
" def task_safety_html(session_id: str, team_id: str, task_id: str) -> str: """Safety flag rendered with a red 'warning' treatment, distinct from a hint.""" note = (_task_by_id(session_id, task_id).get("safety_note") or "Stay aware of your surroundings.").strip() return f"
🛡 {html.escape(note)}
" def task_gallery(session_id: str, team_id: str, task_id: str): return [ (p["photo_path"], p.get("caption") or "") for p in rooms.team_photos(session_id, team_id, task_id) if p.get("photo_path") ] def proof_md(session_id: str, team_id: str, task_id: str) -> str: n_photos, n_journals = rooms.proof_count(session_id, team_id, task_id) if n_photos + n_journals > 0: return f"✅ Proof captured — **{n_photos}** photo(s), **{n_journals}** journal(s). You can complete this task." return "🔒 Add at least **one photo** or **one voice journal** to unlock **Mark Completed**." def journal_log_md(session_id: str, team_id: str, task_id: str) -> str: """List the voice/typed journals already saved for this task so players can keep adding more recordings.""" entries = rooms.team_journals(session_id, team_id, task_id) if not entries: return "" lines = [f"**🎙️ Journals for this task ({len(entries)})** — record more anytime:"] for i, e in enumerate(entries, 1): txt = (e.get("transcript") or "").strip().replace("\n", " ") if len(txt) > 90: txt = txt[:90] + "…" lines.append(f"{i}. {txt or '*(no text)*'}") return "\n".join(lines) def _hint_points_update(session_id: str, team_id: str, task_id: str): """Live 'available points' for this task: base points minus the per-hint penalty for every hint this team has revealed, clamped at 0. 
Turns orange-red once any hint has been spent.""" base = _task_by_id(session_id, task_id).get("points", 0) used = rooms.hints_used_count(session_id, team_id, task_id) avail = max(0, base - used * HINT_PENALTY) cls = ["points-display"] + (["reduced"] if used > 0 else []) return gr.update(value=f"⭐ {avail} pts available (−{HINT_PENALTY} pts per hint)", elem_classes=cls) def _hint_btn_state(remaining: int): """Hint button label/interactivity from the number of hints still allowed.""" if remaining <= 0: return gr.update(value="No hints left", interactive=False, elem_classes=["btn-hint", "spent"]) return gr.update(value=f"Get a hint −{HINT_PENALTY} pts ({remaining} left)", interactive=True, elem_classes=["btn-hint"]) def _hint_btn_update(session_id: str, team_id: str, task_id: str): used = rooms.hints_used_count(session_id, team_id, task_id) return _hint_btn_state(max(0, rooms.HINT_CAP - used)) def hint_log_md(session_id: str, team_id: str, task_id: str) -> str: """History of hints already revealed for this task so players don't have to re-buy a hint they've seen — shows the guide's answer for each.""" hints = rooms.team_hints(session_id, team_id, task_id) if not hints: return "" total = len(hints) * HINT_PENALTY lines = [f"**Hints used ({len(hints)} of {rooms.HINT_CAP}) — −{total} pts total**"] for i, p in enumerate(hints, 1): ans = (p.get("answer") or "").strip().replace("\n", " ") if len(ans) > 140: ans = ans[:140] + "…" lines.append(f"💡 Hint {i}: {ans or '*(no answer captured)*'}") return "\n\n".join(lines) def _detail_tab_updates(session_id: str, team_id: str, active_tid: str): """Updates for the 12 horizontal task-tab buttons in the detail view: each shows ✅/○ + name + points, the completed ones flagged 'done', and the one you're viewing flagged 'active'.""" s = SESSION_STORE.get(session_id) or {} tasks = (s.get("game") or {}).get("tasks", [])[:12] wall = rooms.task_wall_state(session_id, team_id) ups = [] for i in range(12): if i < len(tasks): t = tasks[i] tid = 
def _task_position(session_id: str, active_tid: str) -> str:
    """Return the "Task N of M" label for the task currently being viewed.

    Only the first 12 tasks of the session's game are considered. Returns ""
    when there are no tasks; an unknown ``active_tid`` is reported as task 1.
    """
    session = SESSION_STORE.get(session_id) or {}
    game = session.get("game") or {}
    shown = game.get("tasks", [])[:12]
    ids = [task["task_id"] for task in shown]
    if not ids:
        return ""
    try:
        position = ids.index(active_tid)
    except ValueError:
        # Not in the list — fall back to the first task.
        position = 0
    return f"Task {position + 1} of {len(ids)}"
POLL_FAST = 1.5
POLL_SLOW = 3.0
POLL_IDLE_LIMIT = 4  # identical ticks before easing off


def poll_tick(session_id, screen, team_id, player_name, is_host, poll_state):
    """Refresh the current screen's live widgets and pick the next poll interval.

    Only the branch matching ``screen`` produces real values; every other
    output stays a no-op ``gr.update()``. The only navigation performed here
    is lobby -> play, taken when the session status is "playing".

    ``poll_state`` is the ``(signature, idle_ticks, interval)`` tuple returned
    by the previous tick; a falsy value is treated as ``("", 0, POLL_FAST)``.

    Returns a flat tuple: the 7 ``nav`` updates, the new screen name, then
    lobby_code, lobby_roster, lobby_start, lobby_start_anyway, lobby_hint,
    play_timer, play_status, wait_status, wait_line, wait_results_btn,
    win_score, the timer update and the new poll state.
    """
    # Defaults: every output is a no-op update unless a branch below fills it.
    nav = [gr.update()] * 7
    new_screen = screen
    # NOTE: win_score is never reassigned in this function, so it is always a no-op.
    lobby_code = lobby_roster = play_timer = play_status = wait_status = win_score = gr.update()
    lobby_start = lobby_start_anyway = lobby_hint = gr.update()
    wait_line = wait_results_btn = gr.update()
    if session_id:
        status = rooms.session_status(session_id)
        # Canonical team id — never "" — so same-team members converge on one id.
        team_id = rooms.resolve_team_id(session_id, player_name) or team_id
        if screen == "lobby":
            lobby_code = code_html(session_id)
            lobby_roster = roster_html(session_id, player_name)
            # Start gate (Model B): the primary Start enables only when the whole
            # party is here (full AND min_ok). The host-only "Start anyway"
            # override covers the short-handed-but-legal case (min_ok, not full).
            pcs = rooms.player_count_status(session_id)
            full, min_ok = pcs["full"], pcs["min_ok"]
            lobby_start = gr.update(interactive=(full and min_ok))
            if is_host:
                lobby_start_anyway = gr.update(
                    visible=(not full),
                    interactive=(min_ok and not full),
                    value=f"▶ Start anyway ({pcs['joined']}/{pcs['target']} here)")
            lobby_hint = lobby_hint_text(session_id, is_host)
            if status == "playing":
                # The game has started: switch this client to the play screen
                # and populate its timer/status strip in the same tick.
                new_screen = "play"
                nav = list(vis("play"))
                play_timer = timer_html(session_id, team_id)
                play_status = status_strip_html(session_id, team_id)
        elif screen == "play":
            play_timer = timer_html(session_id, team_id)
            play_status = status_strip_html(session_id, team_id)
        elif screen == "wait":
            # The wait screen NEVER navigates on its own — for any player, under
            # any condition. Each tick only UPDATES: the team strip, the
            # "who's still playing" line, and the results button's enabled state.
            # The wait->win transition happens ONLY when a player clicks the
            # "See final results" button (see do_see_results).
            wait_status = status_strip_html(session_id, team_id)
            wait_line = wait_playing_md(session_id)
            pcs = rooms.player_count_status(session_id)
            can_results = rooms.all_finished(session_id) and pcs["min_ok"]
            wait_results_btn = gr.update(interactive=can_results)

    # ── Decide next poll interval from whether anything changed ──
    # Only string outputs take part in the signature; gr.update() objects
    # (no-ops and button updates) are folded to "".
    def _s(x):
        return x if isinstance(x, str) else ""
    sig = "|".join([new_screen, _s(lobby_code), _s(lobby_roster), _s(play_timer),
                    _s(play_status), _s(wait_status), _s(wait_line), _s(win_score)])
    last_sig, idle, cur_iv = poll_state or ("", 0, POLL_FAST)
    if sig != last_sig:
        idle, target_iv = 0, POLL_FAST
    else:
        idle += 1
        target_iv = POLL_SLOW if idle >= POLL_IDLE_LIMIT else POLL_FAST
    # Only emit a Timer change when the interval actually flips, so we don't
    # reset the running countdown every tick.
    timer_update = gr.Timer(target_iv) if target_iv != cur_iv else gr.update()
    new_state = (sig, idle, target_iv)

    return (*nav, new_screen, lobby_code, lobby_roster, lobby_start, lobby_start_anyway,
            lobby_hint, play_timer, play_status, wait_status, wait_line, wait_results_btn,
            win_score, timer_update, new_state)
def lobby_hint_text(session_id: str, is_host: bool) -> str:
    """Pick the lobby instruction line for this viewer.

    Guests always get the fixed "waiting for the host" copy. The host gets
    one of three messages depending on the player-count gates: ready (full
    and minimum met), short-handed (minimum met, not full), or still waiting
    (minimum not met) — the latter two filled with joined/target counts.
    """
    if is_host:
        counts = rooms.player_count_status(session_id)
        if counts["min_ok"]:
            if counts["full"]:
                return HINT_HOST_READY
            template = HINT_HOST_SHORT
        else:
            template = HINT_HOST_WAIT
        return template.format(joined=counts["joined"], target=counts["target"])
    return HINT_GUEST
def _team_btn_updates(session_id, selected_id):
    """Build the updates for the 4 team-pick buttons in the lobby.

    Each existing team gets a visible button labelled "name · role"; the team
    whose id equals ``selected_id`` also carries the 'selected' class. Slots
    beyond the number of teams are hidden. Always returns exactly 4 updates.
    """
    slot_count = 4
    updates = []
    for team in rooms.list_teams(session_id)[:slot_count]:
        classes = ["team-pick"]
        if team["id"] == selected_id:
            classes.append("selected")
        updates.append(gr.update(value=f"{team['name']} · {team['role']}",
                                 visible=True, elem_classes=classes))
    # Pad with hidden buttons so the output count always matches the 4 slots.
    while len(updates) < slot_count:
        updates.append(gr.update(visible=False))
    return updates
def do_join(code, name):
    """Join an existing room by code and land the player in its lobby.

    On failure the join screen stays up with the warning text and all lobby
    outputs are left untouched/hidden. On success the lobby is shown with the
    room code, roster and team-pick buttons.

    Returns the nav updates from ``vis`` followed by: screen name, session
    id, player name, team id, is-host flag, join feedback text, lobby code,
    roster, Start-button update, copy button, lobby hint, and the 4 team
    button updates (same arity on both paths).
    """
    session_id, msg, ok = rooms.join_room(code, name)
    if not ok:
        gr.Warning(msg)
        return (*vis("join"), "join", "", "", "", False, f"⚠ {msg}",
                gr.update(), gr.update(), gr.update(visible=False),
                gr.update(), gr.update(), *([gr.update(visible=False)] * 4))
    gr.Info(f"Joined room — {msg}")
    name = (name or "").strip()
    teams = rooms.list_teams(session_id)
    # Use the team the rooms module actually has this player under (the same
    # canonical lookup poll_tick does every tick) rather than assuming the
    # first team, so the stored team id and the highlighted button agree with
    # the roster. Falls back to the first team, or "" if no teams exist.
    team_id = rooms.resolve_team_id(session_id, name) or (teams[0]["id"] if teams else "")
    tb = _team_btn_updates(session_id, team_id)
    return (*vis("lobby"), "lobby", session_id, name, team_id, False,
            f"✅ {msg}", code_html(session_id), roster_html(session_id, name),
            gr.update(visible=False), copy_btn_html(session_id), HINT_GUEST, *tb)
✓ Quest ready
" return yield "
Preparing your quest — warming Nemotron & generating tasks…
" try: generate_for_session(session_id) yield "
✓ Quest ready — start whenever everyone's in
" except Exception as e: print(f"[prepare_game] {type(e).__name__}: {e}") yield "
Couldn't pre-generate — it'll generate when you start.
" def do_start(session_id, team_id=""): s = SESSION_STORE.get(session_id) or {} # Render the starter's OWN team strip/timer, not always team A — a host who # picked a later team shouldn't see Team A's strip. Never empty: fall back # to the first team (mirrors rooms.resolve_team_id's fallback). team_id = team_id or (s.get("teams") or [{"id": "team-a"}])[0]["id"] if not s.get("game"): # Fallback: lobby pre-generation didn't finish — show a loader. yield (*vis("lobby"), "lobby", gr.update(), gr.update(), "
Summoning your quest…
# Number of outputs produced by _open_task / consumed by DETAIL_OPEN_OUTPUTS.
# 19 fixed slots + 12 task-tab buttons.
_N_DETAIL_OUTPUTS = 19 + 12


def _open_task(session_id, team_id, tid):
    """Render the full task-detail view for one task.

    Shared by the play-screen entry, the horizontal task tabs, and the
    prev/next arrows so every entry point produces an identical, consistent
    detail screen.

    Returns a tuple of ``_N_DETAIL_OUTPUTS`` values: the 19 fixed slots listed
    below followed by the 12 task-tab button updates. The capture inputs
    (transcript, audio, tagline, ask answer, capture status) are reset to
    empty every time a task is opened.
    """
    # NOTE: tasks are session-global (session["game"]["tasks"]) and are the SAME
    # for everyone — never slice/filter them per player. team_id here only keys
    # the per-team completion/proof/hint OVERLAY, so same-team members (sharing
    # one resolved team_id) see one shared task/proof view. Do not regress this.
    # Whether this team may mark the task complete; drives both the button
    # and the instruction line above it.
    can = rooms.can_complete(session_id, team_id, tid)
    return (
        tid,
        gr.update(visible=True),                      # detail view (always shown on the play screen)
        task_detail_md(session_id, team_id, tid),
        task_hint_html(session_id, team_id, tid),
        task_safety_html(session_id, team_id, tid),
        task_gallery(session_id, team_id, tid),
        proof_md(session_id, team_id, tid),
        _complete_btn_update(can),
        _completion_instruction_update(can),
        "", None, "", "", "",                         # transcript, audio, tagline, ask answer, capture status
        journal_log_md(session_id, team_id, tid),
        _task_position(session_id, tid),
        _hint_points_update(session_id, team_id, tid),
        hint_log_md(session_id, team_id, tid),
        _hint_btn_update(session_id, team_id, tid),
        *_detail_tab_updates(session_id, team_id, tid),   # the 12 task-tab buttons
    )
def go_adjacent_factory(delta):
    """Build the handler for the prev (delta=-1) / next (delta=+1) arrows.

    The returned handler moves ``delta`` positions from the current task in
    the JSON-encoded id list, clamps to the first/last task, and opens the
    resulting task in place. With an empty id list it returns no-op updates.
    """
    def _go(session_id, team_id, taskids, cur_tid):
        task_ids = json.loads(taskids or "[]")
        if not task_ids:
            return (gr.update(),) * _N_DETAIL_OUTPUTS
        # An unknown current task is treated as position 0.
        here = task_ids.index(cur_tid) if cur_tid in task_ids else 0
        last = len(task_ids) - 1
        target = here + delta
        if target < 0:
            target = 0
        elif target > last:
            target = last
        return _open_task(session_id, team_id, task_ids[target])
    return _go
# Placeholder reply used when the guide model is unavailable. Kept as a module
# constant so on_get_hint can tell it apart from a real answer and avoid
# charging the team for it.
_GUIDE_RESTING = ("*(The guide is resting — re-read the hint and scan for the named landmark. "
                  "Try again in a moment.)*")


@spaces.GPU(duration=120)
def _ask_gpu(question, session_id, team_id, task_id):
    """Ask the in-game guide model a question about one task.

    Builds a short context string (city, task title/description, task hint)
    from the session's game and passes it with the question to
    ``minicpm.ask``. Never raises: on any failure or an empty reply it
    returns the ``_GUIDE_RESTING`` placeholder instead.
    """
    s = SESSION_STORE.get(session_id) or {}
    game = s.get("game") or {}
    t = next((x for x in game.get("tasks", []) if x["task_id"] == task_id), {})
    city = (game.get("setup") or {}).get("city", "")
    ctx = f"City: {city}. Task: {t.get('title','')} — {t.get('description','')}. Hint: {t.get('hint','')}"
    ans = None
    try:
        # Imported lazily so the model module is only loaded when a hint is asked for.
        from app.services import minicpm
        ans = minicpm.ask(question, context=ctx)
    except Exception as e:
        # Best effort: log and fall through to the placeholder reply.
        print(f"[ask] failed: {e}")
    return ans or _GUIDE_RESTING


def on_get_hint(question, session_id, team_id, task_id):
    """Reveal a hint from the in-game guide and charge the team for it.

    Outputs (4): guide answer, available-points display, hint button, hint log.
    Points are only charged once a real answer is produced — an empty question,
    a spent task, or a guide that could not answer costs nothing.
    """
    used = rooms.hints_used_count(session_id, team_id, task_id)
    remaining = max(0, rooms.HINT_CAP - used)
    if remaining <= 0:
        gr.Warning("No hints left for this task.")
        yield (gr.update(value="You've used all the hints for this task — check the hint history below."),
               gr.update(), _hint_btn_state(0), gr.update())
        return
    if not (question or "").strip():
        yield ("Tell me where you're stuck, or ask for a clue — then tap the button.",
               gr.update(), gr.update(), gr.update())
        return
    # Thinking: disable the button so the hint can't be double-charged; don't
    # deduct until we actually have an answer.
    yield ("🌀 The guide is thinking…", gr.update(),
           gr.update(value="Thinking…", interactive=False), gr.update())
    try:
        answer = _ask_gpu(question, session_id, team_id, task_id)
    except Exception as e:
        # The GPU wrapper itself can fail; without this the button would stay
        # stuck on "Thinking…" because the final yield is never reached.
        print(f"[on_get_hint] {type(e).__name__}: {e}")
        answer = _GUIDE_RESTING
    if not answer or answer == _GUIDE_RESTING:
        # No real answer: show the placeholder, restore the button, charge nothing.
        gr.Warning("The guide couldn't answer — no points were deducted.")
        yield (_GUIDE_RESTING, gr.update(), _hint_btn_state(remaining), gr.update())
        return
    used_now = use_hint(session_id, task_id, team_id, question=question, answer=answer)
    remaining_now = max(0, rooms.HINT_CAP - used_now)
    gr.Info(f"−{HINT_PENALTY} pts · Hint used · {remaining_now} hint(s) left")
    yield (f"💡 {answer}",
           _hint_points_update(session_id, team_id, task_id),
           _hint_btn_state(remaining_now),
           hint_log_md(session_id, team_id, task_id))
Fired by the 'See final results' button (gated/enabled by poll_tick), never automatic.""" return (*vis("win"), "win", scoreboard_md(session_id)) def _gather_moments(session_id, team_id): bits = [j.get("transcript", "") for j in rooms.team_journals(session_id, team_id)] bits += [p.get("caption", "") for p in rooms.team_photos(session_id, team_id)] return [b for b in bits if b][:12] @spaces.GPU(duration=180) def _funny_recap_gpu(moments): prompt = ( "Our playful city adventure moments: " + "; ".join(moments) + ". Write a short, funny, warm recap (4-6 sentences). Keep it light and silly. " "Do NOT mention winning, losing, scores, or rankings." ) ans = None try: from app.services import minicpm ans = minicpm.ask(prompt, context="", max_tokens=320) except Exception as e: print(f"[funny_recap] {e}") if not ans: ans = "### A few ridiculous highlights\n\n- " + "\n- ".join(moments) return ans def _stream_text(full: str): """Yield a string progressively for a narrative reveal (UI pacing only). Reveals paragraph-by-paragraph, falling back to sentence chunks for a single long block, accumulating so the Markdown stays well-formed at every step. The text itself is produced by the backend and is never altered. """ full = (full or "").strip() if not full: yield full return paras = [p for p in full.split("\n\n") if p.strip()] if len(paras) <= 1: # One block — chunk by sentence so long recaps still feel alive. 
import re paras = re.findall(r"[^.!?]+[.!?]*\s*", full) or [full] sep = "" else: sep = "\n\n" acc = "" for i, chunk in enumerate(paras): acc += (sep if i else "") + chunk yield acc time.sleep(0.12) def do_funny_recap(session_id, team_id): moments = _gather_moments(session_id, team_id) if not moments: yield "_Snap a photo or record a voice note first — then I'll spin a silly recap!_" return yield "🌀 Spinning a silly recap…" yield from _stream_text(_funny_recap_gpu(moments)) @spaces.GPU(duration=300) def _funny_post_gpu(session_id, team_id): paths = [p["photo_path"] for p in rooms.team_photos(session_id, team_id) if p.get("photo_path")] prompt = ("A whimsical, funny travel scrapbook poster, warm pastel beige tones, playful " "hand-drawn doodles, lighthearted city adventure, cozy and silly, no text") return generate_poster_sync(f"{session_id}-funny", prompt, photo_paths=paths) def do_funny_post(session_id, team_id): yield None, "🎨 Painting your funny poster…" yield _funny_post_gpu(session_id, team_id) @spaces.GPU(duration=180) def _win_recap_gpu(session_id): md, _ = generate_recap(session_id) return md def do_win_recap(session_id): yield "🌀 Writing your winning recap…" yield from _stream_text(_win_recap_gpu(session_id)) @spaces.GPU(duration=300) def _win_poster_gpu(session_id, kind): s = SESSION_STORE.get(session_id) or {} paths = [p["photo_path"] for p in s.get("photos", []) if p.get("photo_path")] prompts = { "ending": ("A triumphant golden-hour ending poster for a city adventure, warm beige and " "teal tones, celebratory and cinematic, no text"), "group": ("A joyful group celebration scrapbook poster, warm pastel beige tones, friends " "adventuring together, confetti, no text"), } return generate_poster_sync(f"{session_id}-{kind}", prompts.get(kind, prompts["ending"]), photo_paths=paths) def do_win_endpost(session_id): yield None, "🎨 Painting your ending poster…" yield _win_poster_gpu(session_id, "ending") def do_win_grouppost(session_id): yield None, "🎨 Painting 
def do_new():
    """Return to the home screen and clear the per-browser session values.

    Outputs: the nav updates for "home", the screen name, three emptied
    string states and the host flag reset to False.
    """
    home_nav = vis("home")
    cleared = ("", "", "")
    return (*home_nav, "home", *cleared, False)
{pill}{name_pill}
" valid = (n == _CODE_LEN and not bad and has_name) return info, gr.update(interactive=valid) # Button busy-state helpers for .then() chaining around LLM/IO calls. # Disabling during the call prevents double-submits and gives a visual cue; # the in-component text loaders ("🌀 …") still communicate progress. def _btn_off(): return gr.update(interactive=False) def _btn_on(): return gr.update(interactive=True) _GTYPE_LABELS = { "scavenger_hunt": "Scavenger hunt", "hide_and_seek": "Hide & seek", "tag": "Tag", } def create_summary(name, gtype, city, area, teams, duration, players): """Live one-line recap of the create form. Pure string formatting.""" bits = [ _GTYPE_LABELS.get(gtype, gtype), (city or "Paris").strip(), ] if (area or "").strip(): bits.append(area.strip()) bits.append(f"{int(teams)} team{'s' if int(teams) != 1 else ''}") bits.append(f"{int(duration)} min") bits.append(f"{int(players)} players") summary = " · ".join(bits) host = (name or "").strip() who = f"Hosted by {host} — " if host else "" return ( "
" f"{who}" f"{summary}
@spaces.GPU(duration=120)
def transcribe_only(audio_path: str, language: str = "en") -> str:
    """Transcribe a recording without saving anything.

    Returns the stripped transcript, or "" when there is no audio path, the
    ASR result has no transcript, or the ASR call fails (the failure is
    printed, not raised). A falsy ``language`` is sent to ASR as "en".
    """
    transcript = ""
    if audio_path:
        try:
            # Imported lazily so the ASR module is only loaded when needed.
            from app.services.asr import transcribe as run_asr
            outcome = run_asr(audio_path, language=language or "en")
            cleaned = (outcome.get("transcript") or "").strip()
            print(f"[transcribe_only] lang={language} status={outcome.get('status')} "
                  f"len={len(cleaned)} preview={cleaned[:80]!r}")
            transcript = cleaned
        except Exception as err:
            print(f"[transcribe_only] {type(err).__name__}: {err}")
    return transcript
" "🌲 CityQuest · AI" "
explore your city, the unhurried way
" ) # ── SCREEN: HOME ────────────────────────────────────────────────────── with gr.Group(visible=True, elem_classes=["cq-screen", "cq-home"]) as g_home: gr.HTML("
" "
A calm, playful city adventure
" "
") with gr.Row(): with gr.Column(): with gr.Group(elem_classes=["cq-card", "cq-home-card"]): gr.Markdown("Host a new adventure and invite your friends.", elem_classes=["landing-description"]) home_create = gr.Button("Create a room", variant="primary", size="lg", elem_id="home-create", elem_classes=["landing-btn"]) with gr.Column(): with gr.Group(elem_classes=["cq-card", "cq-home-card"]): gr.Markdown("Got a code from a friend? Hop in.", elem_classes=["landing-description"]) home_join = gr.Button("Join a room", variant="secondary", size="lg", elem_id="home-join", elem_classes=["landing-btn"]) # ── SCREEN: CREATE ──────────────────────────────────────────────────── with gr.Group(visible=False, elem_classes=["cq-screen", "cq-create"]) as g_create: with gr.Column(elem_classes=["cq-card"]): gr.Markdown("## Host a new adventure") c_name = gr.Textbox(label="What to call you?", placeholder="e.g. Arjun", value="") with gr.Row(): c_type = gr.Dropdown(["scavenger_hunt", "hide_and_seek", "tag"], value="scavenger_hunt", label="Game type", filterable=False, elem_classes=["cq-dropdown"]) c_teams = gr.Slider(2, 4, value=2, step=1, label="Teams (min 2)") with gr.Row(): c_city = gr.Textbox(label="City", placeholder="Paris") c_area = gr.Textbox(label="Area / neighbourhood", placeholder="Le Marais") with gr.Row(): c_duration = gr.Slider(15, 120, value=45, step=5, label="Duration (min)") # Starts at 0 / hidden because the default game type is # scavenger_hunt (no head start). toggle_head_start raises it to # 60 and reveals it when a staggered-start game type is picked. 
c_headstart = gr.Slider(0, 300, value=0, step=15, label="Head start between teams (sec)", visible=False) with gr.Row(): c_diff = gr.Dropdown(["easy", "medium", "hard"], value="medium", label="Difficulty", elem_classes=["cq-dropdown"]) c_age = gr.Dropdown(["kids", "teens", "adults", "all"], value="all", label="Age group", elem_classes=["cq-dropdown"]) with gr.Row(): c_energy = gr.Dropdown(["chill", "balanced", "high"], value="balanced", label="Energy", elem_classes=["cq-dropdown"]) c_players = gr.Slider(2, 20, value=4, step=1, label="Players") with gr.Row(): c_language = gr.Dropdown(LANGUAGE_CHOICES, value="English", label="Voice journal language", filterable=False, elem_classes=["cq-dropdown"]) create_summary_html = gr.HTML("", elem_classes=["game-summary"]) with gr.Row(): create_back = gr.Button("← Back", variant="secondary", elem_id="create-back", elem_classes=["back-btn", "create-btn"]) create_submit = gr.Button("Open the lobby →", variant="primary", elem_id="create-submit", elem_classes=["open-lobby-btn", "create-btn"]) # ── SCREEN: JOIN ────────────────────────────────────────────────────── with gr.Group(visible=False, elem_classes=["cq-screen"]) as g_join: with gr.Column(elem_classes=["cq-card"]): gr.Markdown("## Join a room") j_code = gr.Textbox(label="Room code", placeholder="e.g. WAVE42") join_code_info = gr.HTML("") j_name = gr.Textbox(label="Your name", placeholder="e.g. Maya") join_feedback = gr.Markdown("") with gr.Row(): join_back = gr.Button("← Back", variant="secondary", elem_id="join-back") join_submit = gr.Button("Join →", variant="primary", interactive=False, elem_id="join-submit") # ── SCREEN: LOBBY ───────────────────────────────────────────────────── with gr.Group(visible=False, elem_classes=["cq-screen"]) as g_lobby: with gr.Column(elem_classes=["cq-card"]): lobby_code = gr.HTML("") lobby_copy = gr.HTML("") gr.HTML("
Pick your team
") with gr.Row(): team_btn = [gr.Button("", variant="secondary", visible=False, elem_classes=["team-pick"]) for _ in range(4)] gr.HTML("
Who's here
")
# NOTE(review): the statements down to `lobby_start_anyway` are the tail of the
# lobby screen, whose enclosing `with` blocks open above this chunk. The
# indentation below was rebuilt from a whitespace-collapsed source, so re-nest
# these lines under that container (and everything here under the enclosing
# gr.Blocks context) when merging.
lobby_roster = gr.HTML("")
lobby_prep = gr.HTML("", elem_classes=["lobby-prep"])
lobby_hint = gr.Markdown(HINT_GUEST, elem_classes=["lobby-hint"])
with gr.Row():
    lobby_leave = gr.Button("Leave", variant="secondary", elem_id="lobby-leave")
    lobby_start = gr.Button("Start the adventure →", variant="primary",
                            visible=False, interactive=False,
                            elem_id="lobby-start",
                            elem_classes=["lobby-start-btn"])
# Host-only short-handed override, sitting just below Start. It is a
# gr.Button carrying the "lobby-start-link" class (presumably styled as a
# green underlined text link — confirm in the CSS). It starts hidden and
# disabled; poll_tick lists it among its outputs and, per the original note,
# reveals it when min_ok and not full. The normal Start covers the full case.
lobby_start_anyway = gr.Button("▶ Start anyway", variant="secondary",
                               visible=False, interactive=False,
                               elem_id="lobby-start-anyway",
                               elem_classes=["lobby-start-link"])

# ── SCREEN: PLAY ──────────────────────────────────────────────────────
with gr.Group(visible=False, elem_classes=["cq-screen"]) as g_play:
    play_timer = gr.HTML("")
    # NOTE(review): this literal (and the two panel headers further down)
    # spans physical lines inside a plain double-quoted string; markup appears
    # to have been stripped from the source — restore the original HTML.
    gr.HTML("
Teams
")
    play_status = gr.HTML("")

    # The play screen lands directly in the task detail; the horizontal task
    # tabs below are the only task switcher (there is no separate vertical
    # task wall on this screen).
    with gr.Group(visible=True, elem_classes=["cq-reveal"]) as g_detail:
        # Wrap everything in a master row with spacers to align the ENTIRE page
        with gr.Row():
            # 1. Left global spacer
            gr.Column(scale=1, min_width=0)

            # 2. Main Central Column (Aligns Tabs, Text, and Boxes perfectly)
            with gr.Column(scale=10, elem_classes=["play-card"]):
                # --- Tabs & Nav ---
                with gr.Row(elem_classes=["task-tab-row"]):
                    # Twelve pre-built, initially hidden tab buttons.
                    detail_tab_btn = [gr.Button("", visible=False, size="sm",
                                                elem_classes=["task-tab"])
                                      for _ in range(12)]
                with gr.Row(elem_classes=["task-nav-row"]):
                    prev_task_btn = gr.Button("← Prev", size="sm", scale=0,
                                              elem_id="td-prev",
                                              elem_classes=["task-nav-btn"])
                    task_position_label = gr.Markdown("", elem_classes=["task-position"])
                    next_task_btn = gr.Button("Next →", size="sm", scale=0,
                                              elem_id="td-next",
                                              elem_classes=["task-nav-btn"])

                # --- Text Content ---
                detail_md = gr.Markdown("")
                detail_hint = gr.HTML("")
                detail_safety = gr.HTML("")

                # --- Voice & Photos ---
                with gr.Row(equal_height=True, elem_classes=["panels-row"]):
                    with gr.Column(scale=1, min_width=250, elem_classes=["proof-col"]):
                        gr.HTML("
🎙️ Voice journal
")
                        d_audio = gr.Audio(sources=["microphone", "upload"],
                                           type="filepath", format="wav",
                                           show_label=False,
                                           elem_classes=["cq-audio"],
                                           waveform_options=gr.WaveformOptions(
                                               show_recording_waveform=False))
                        d_transcript = gr.Textbox(label="Your notes", lines=3,
                                                  placeholder="Transcribe your audio, or type a note…")
                        with gr.Row():
                            d_transcribe = gr.Button("Transcribe", variant="secondary",
                                                     elem_id="td-transcribe",
                                                     elem_classes=["btn-secondary-action"])
                            d_savejournal = gr.Button("Save journal", variant="primary",
                                                      elem_id="td-savejournal",
                                                      elem_classes=["btn-primary-action"])
                        d_journal_log = gr.Markdown("")
                    with gr.Column(scale=1, min_width=250, elem_classes=["proof-col"]):
                        gr.HTML("
📸 Photos & taglines
")
                        d_photo = gr.Image(type="filepath", show_label=False, height=170)
                        d_tagline = gr.Textbox(label="Caption",
                                               placeholder="A caption for this photo",
                                               lines=3)
                        with gr.Row():
                            d_addphoto = gr.Button("Add photo", variant="primary",
                                                   elem_id="td-addphoto",
                                                   elem_classes=["btn-primary-action"])
                        d_gallery = gr.Gallery(label="Photos for this task",
                                               columns=3, height=170)

                # --- Footer Actions ---
                d_capture_status = gr.Markdown("")
                with gr.Row(elem_classes=["hints-header"]):
                    gr.Markdown("💡 Stuck? Take a hint..",
                                elem_classes=["guide-header", "hints-label"])
                    d_points = gr.Markdown("", elem_classes=["points-display"])
                with gr.Row(elem_classes=["hints-input-row"]):
                    d_ask_q = gr.Textbox(show_label=False, scale=1,
                                         placeholder="Ask for a clue, or tell me where you're stuck…",
                                         elem_classes=["hint-input"])
                    # The label embeds the hint cost from HINT_PENALTY.
                    d_ask_btn = gr.Button(f"Get a hint −{HINT_PENALTY} pts",
                                          scale=0, min_width=220,
                                          elem_id="td-ask",
                                          elem_classes=["btn-hint"])
                d_ask_a = gr.Markdown("")
                d_hint_log = gr.Markdown("")
                d_proof = gr.Markdown("")
                completion_instruction = gr.Markdown(
                    "🔒 Add a photo or voice note to unlock completion",
                    elem_classes=["completion-instruction", "locked"])
                # Created disabled; the handlers that list d_complete among
                # their outputs are the ones able to change that state.
                d_complete = gr.Button("🔒 Mark completed", variant="primary",
                                       interactive=False, elem_id="td-complete",
                                       elem_classes=["mark-complete-btn", "complete-btn", "locked"])
                d_finish = gr.Button("🏁 Finish & wait for others", variant="primary",
                                     elem_id="td-finish",
                                     elem_classes=["finish-btn"])

            # 3. Right global spacer
            gr.Column(scale=1, min_width=0)

# ── SCREEN: WAIT-LOBBY ────────────────────────────────────────────────
with gr.Group(visible=False, elem_classes=["cq-screen"]) as g_wait:
    with gr.Column(elem_classes=["cq-card"]):
        gr.Markdown("## 🌿 You're done — nicely paced!")
        gr.Markdown("*Waiting for the other teams to wrap up. Meanwhile, make something silly:*")
        wait_status = gr.HTML("")
        # Live "who's still playing" line + manual advance to results. The
        # button stays DISABLED until the results gate passes (all active
        # teams finished AND min_ok); poll_tick flips its enabled state. The
        # wait screen NEVER auto-navigates — this click is the only path.
        wait_status_line = gr.Markdown("", elem_classes=["lobby-hint"])
        wait_see_results = gr.Button("🏆 See final results", variant="primary",
                                     interactive=False,
                                     elem_id="wait-see-results",
                                     elem_classes=["lobby-start-btn"])
        with gr.Row():
            wait_funny_recap = gr.Button("😄 Funny recap", variant="secondary",
                                         elem_id="wait-funny-recap")
            wait_funny_post = gr.Button("🎨 Funny poster", variant="secondary",
                                        elem_id="wait-funny-post")
        wait_recap_md = gr.Markdown("")
        wait_post_img = gr.Image(label="Funny poster", height=320, visible=True,
                                 elem_classes=["cq-poster"])
        wait_post_status = gr.Markdown("")

# ── SCREEN: WIN ───────────────────────────────────────────────────────
with gr.Group(visible=False, elem_classes=["cq-screen", "cq-win"]) as g_win:
    with gr.Column(elem_classes=["cq-win-standings"]):
        win_scoreboard = gr.Markdown("Final standings will appear here.")
    with gr.Group(elem_classes=["cq-card", "cq-win-celebrate"]):
        gr.Markdown("### 🎉 Celebrate the run")
        win_recap = gr.Button("📖 Winning recap", variant="primary",
                              elem_id="win-recap",
                              elem_classes=["cq-win-btn"])
        win_recap_md = gr.Markdown("")
        with gr.Row():
            win_endpost = gr.Button("🌅 Ending poster", variant="secondary",
                                    elem_id="win-endpost",
                                    elem_classes=["cq-win-btn"])
            win_grouppost = gr.Button("👯 Group poster", variant="secondary",
                                      elem_id="win-grouppost",
                                      elem_classes=["cq-win-btn"])
        # Both poster buttons write into this single image + status pair.
        win_poster_img = gr.Image(label="Poster", height=320, visible=True)
        win_poster_status = gr.Markdown("")
    # NOTE(review): the collapsed source does not show whether this button sat
    # inside the celebrate card or directly under g_win — confirm placement.
    win_new = gr.Button("✨ Start a new adventure", variant="primary",
                        elem_id="win-new",
                        elem_classes=["cq-win-btn", "cq-win-new"])

# ── Shared poll timer ─────────────────────────────────────────────────
# One 1.5-second timer drives poll_tick. Its outputs cover every screen
# group plus the lobby, play, wait and win widgets, and also the timer
# itself and s_poll_state.
poll = gr.Timer(1.5)
nav_groups = [g_home, g_create, g_join, g_lobby, g_play, g_wait, g_win]
poll.tick(poll_tick,
          inputs=[s_session, s_screen, s_team, s_player, s_host, s_poll_state],
          outputs=nav_groups + [s_screen, lobby_code, lobby_roster,
                                lobby_start, lobby_start_anyway, lobby_hint,
                                play_timer, play_status,
                                wait_status, wait_status_line, wait_see_results,
                                win_scoreboard, poll, s_poll_state])

# ── Navigation ────────────────────────────────────────────────────────
# Each handler returns vis(<screen>) unpacked across nav_groups (presumably
# one visibility update per screen group — confirm against vis) followed by
# the new value for s_screen.
create_inputs = [c_name, c_type, c_city, c_area, c_teams, c_duration, c_players]
home_create.click(lambda: (*vis("create"), "create"),
                  outputs=nav_groups + [s_screen]
                  ).then(create_summary, inputs=create_inputs,
                         outputs=[create_summary_html])
home_join.click(lambda: (*vis("join"), "join"),
                outputs=nav_groups + [s_screen])
create_back.click(lambda: (*vis("home"), "home"),
                  outputs=nav_groups + [s_screen])
join_back.click(lambda: (*vis("home"), "home"),
                outputs=nav_groups + [s_screen])

# ── Head start visibility: hide for scavenger hunt (irrelevant there) ──
def toggle_head_start(game_type):
    """Return the update applied to the head-start control for a game type.

    For "scavenger_hunt" the control is hidden and its value set to 0; for
    any other game type it is shown and its value set to 60.
    """
    if game_type == "scavenger_hunt":
        return gr.update(visible=False, value=0)
    return gr.update(visible=True, value=60)

c_type.change(toggle_head_start, inputs=[c_type], outputs=[c_headstart])

# ── Live form validation (pure UI) ────────────────────────────────────
# Any change to a create-form field re-renders the summary; any change to
# either join field re-runs validate_join_code.
for comp in create_inputs:
    comp.change(create_summary, inputs=create_inputs,
                outputs=[create_summary_html])
for comp in (j_code, j_name):
    comp.change(validate_join_code, inputs=[j_code, j_name],
                outputs=[join_code_info, join_submit])

# Route the host's chosen voice-journal language (full name) into the ISO
# code held in s_lang, which the play-screen audio handlers consume.
# NOTE(review): indentation rebuilt from a whitespace-collapsed source; these
# listeners presumably live inside the enclosing gr.Blocks context — re-indent
# to match it when merging.

# Mirror the language dropdown into s_lang: the dropdown holds a full language
# name, s_lang receives its ISO code ("en" when the name is not in the map).
c_language.change(
    lambda name: LANGUAGE_TO_ISO.get(name, "en"),
    inputs=[c_language],
    outputs=[s_lang],
)

# Output groups shared by the lobby-entry handlers below.
session_state_outputs = [s_screen, s_session, s_player, s_team, s_host]
lobby_view_outputs = [lobby_code, lobby_roster, lobby_start, lobby_copy, lobby_hint]

# Create: disable the button, run do_create then prepare_game, re-enable it.
create_flow = create_submit.click(_btn_off, None, [create_submit])
create_flow = create_flow.then(
    do_create,
    inputs=[
        c_name, c_type, c_city, c_area, c_duration, c_teams,
        c_headstart, c_diff, c_age, c_energy, c_players,
    ],
    outputs=[*nav_groups, *session_state_outputs, *lobby_view_outputs, *team_btn],
)
create_flow = create_flow.then(prepare_game, inputs=[s_session], outputs=[lobby_prep])
create_flow.then(_btn_on, None, [create_submit])

# Join: same outputs as create, with join_feedback slotted in after s_host.
join_submit.click(
    do_join,
    inputs=[j_code, j_name],
    outputs=[
        *nav_groups,
        *session_state_outputs,
        join_feedback,
        *lobby_view_outputs,
        *team_btn,
    ],
)

# One pick handler per team button, built by make_pick from the button index.
for team_index, team_button in enumerate(team_btn):
    team_button.click(
        make_pick(team_index),
        inputs=[s_session, s_player],
        outputs=[s_team, lobby_roster, *team_btn],
    )

lobby_leave.click(do_leave, outputs=[*nav_groups, *session_state_outputs])

# Both Start and Start-anyway funnel through the SAME do_start path — no
# duplicated start logic; only the gating (in poll_tick) differs.
for start_button in (lobby_start, lobby_start_anyway):
    start_button.click(_btn_off, None, [start_button]).then(
        do_start,
        inputs=[s_session, s_team],
        outputs=[*nav_groups, s_screen, play_timer, play_status, lobby_prep],
    ).then(_btn_on, None, [start_button])

# ── Task tabs → detail ────────────────────────────────────────────────
# Order must match _open_task's return tuple (19 fixed + 12 tab buttons).
# NOTE(review): indentation rebuilt from a whitespace-collapsed source; these
# listeners presumably live inside the enclosing gr.Blocks context — re-indent
# to match it when merging.

# Nineteen fixed detail widgets followed by every task-tab button; the task
# handlers wired below all write to this same ordered list.
detail_open_outputs = [
    s_task, g_detail, detail_md, detail_hint, detail_safety,
    d_gallery, d_proof, d_complete, completion_instruction,
    d_transcript, d_audio, d_tagline,
    d_ask_a, d_capture_status, d_journal_log,
    task_position_label, d_points, d_hint_log, d_ask_btn,
    *detail_tab_btn,
]

# Keep the task-id list fresh and auto-open the first task when a client
# first reaches the play screen (host via Start, guests via poll).
poll.tick(
    play_tick,
    inputs=[s_session, s_screen, s_team, s_taskids, s_task],
    outputs=detail_open_outputs + [s_taskids],
)

# The in-detail horizontal task tabs index into s_taskids.
for tab_index, tab_button in enumerate(detail_tab_btn):
    tab_button.click(
        open_task_factory(tab_index),
        inputs=[s_session, s_team, s_taskids],
        outputs=detail_open_outputs,
    )

# Prev / Next share one wiring; only the step handed to the factory differs.
for nav_button, step in ((prev_task_btn, -1), (next_task_btn, +1)):
    nav_button.click(
        go_adjacent_factory(step),
        inputs=[s_session, s_team, s_taskids, s_task],
        outputs=detail_open_outputs,
    )

d_finish.click(
    do_finish,
    inputs=[s_session, s_team],
    outputs=nav_groups + [s_screen, wait_status],
)

# Manual transcription: disable the button, run do_transcribe, re-enable it.
transcribe_flow = d_transcribe.click(_btn_off, None, [d_transcribe])
transcribe_flow = transcribe_flow.then(
    do_transcribe, inputs=[d_audio, s_lang], outputs=[d_transcript]
)
transcribe_flow.then(_btn_on, None, [d_transcribe])

# Auto-transcribe once the clip is actually on the server. stop_recording
# (mic) and upload (file) fire *after* Gradio has committed/uploaded the
# audio, so do_transcribe always sees a real filepath. This removes the
# race where clicking "Transcribe" immediately after recording sent a
# stale/None path and seemed to do nothing until you clicked around.
# NOTE(review): indentation rebuilt from a whitespace-collapsed source; these
# listeners presumably live inside the enclosing gr.Blocks context — re-indent
# to match it when merging.

# Finishing a recording and uploading a file trigger the same chain: disable
# the Transcribe button, run do_transcribe, re-enable the button.
for audio_event in (d_audio.stop_recording, d_audio.upload):
    audio_event(_btn_off, None, [d_transcribe]).then(
        do_transcribe,
        inputs=[d_audio, s_lang],
        outputs=[d_transcript],
    ).then(_btn_on, None, [d_transcribe])

# Save journal: the button is disabled while do_save_journal runs. The
# handler also writes to d_audio and d_transcript, so it controls both inputs.
journal_flow = d_savejournal.click(_btn_off, None, [d_savejournal])
journal_flow = journal_flow.then(
    do_save_journal,
    inputs=[s_session, d_transcript, d_audio, s_lang, s_task, s_team],
    outputs=[
        d_capture_status, d_proof, d_complete, completion_instruction,
        d_journal_log, d_audio, d_transcript,
    ],
)
journal_flow.then(_btn_on, None, [d_savejournal])

# Add photo: a single-step handler that also writes to d_photo and d_tagline.
d_addphoto.click(
    do_add_photo,
    inputs=[s_session, d_photo, d_tagline, s_task, s_team],
    outputs=[
        d_capture_status, d_gallery, d_proof, d_complete,
        completion_instruction, d_photo, d_tagline,
    ],
)

# on_get_hint manages the button itself (disable while thinking, then set
# the "(N left)"/"No hints left" state), so it does NOT use the generic
# _btn_off/_btn_on toggle — that would re-enable a spent button.
d_ask_btn.click(
    on_get_hint,
    inputs=[d_ask_q, s_session, s_team, s_task],
    outputs=[d_ask_a, d_points, d_ask_btn, d_hint_log],
)

# After completing, re-render the same task in place so the ✓ shows on its
# tab and the completed state is reflected (no wall to bounce back to).
d_complete.click(
    do_complete,
    inputs=[s_session, s_team, s_task],
    outputs=detail_open_outputs,
)

# ── Wait-lobby actions ────────────────────────────────────────────────
# Manual, click-only advance to the win screen (gate/enabled by poll_tick).
# NOTE(review): indentation rebuilt from a whitespace-collapsed source; the
# listeners below presumably live inside the enclosing gr.Blocks context,
# while the entry-point guard is module level — re-indent when merging.

wait_see_results.click(
    do_see_results,
    inputs=[s_session],
    outputs=nav_groups + [s_screen, win_scoreboard],
)

# Each wait-lobby extra disables its own button, runs the handler, then
# re-enables the button.
for guarded_btn, guarded_fn, guarded_out in (
    (wait_funny_recap, do_funny_recap, [wait_recap_md]),
    (wait_funny_post, do_funny_post, [wait_post_img, wait_post_status]),
):
    guarded_btn.click(_btn_off, None, [guarded_btn]).then(
        guarded_fn, inputs=[s_session, s_team], outputs=guarded_out
    ).then(_btn_on, None, [guarded_btn])

# ── Winning actions ───────────────────────────────────────────────────
# Same disable → run → enable chain; both poster buttons write to the one
# win_poster_img / win_poster_status pair.
for guarded_btn, guarded_fn, guarded_out in (
    (win_recap, do_win_recap, [win_recap_md]),
    (win_endpost, do_win_endpost, [win_poster_img, win_poster_status]),
    (win_grouppost, do_win_grouppost, [win_poster_img, win_poster_status]),
):
    guarded_btn.click(_btn_off, None, [guarded_btn]).then(
        guarded_fn, inputs=[s_session], outputs=guarded_out
    ).then(_btn_on, None, [guarded_btn])

win_new.click(
    do_new,
    outputs=nav_groups + [s_screen, s_session, s_player, s_team, s_host],
)

# ── Entry point ────────────────────────────────────────────────────────────
if __name__ == "__main__":
    # ssr_mode=False avoids the Node SSR proxy that emits spurious
    # `sse_stream 404` errors with our 1.5s polling timer.
    launch_options = {
        "theme": gradio_theme(),
        "css": CSS,
        "js": JS_INIT,
        "ssr_mode": False,
        "share": True,
    }
    demo.launch(**launch_options)