Spaces:
Running on Zero
Running on Zero
| import os | |
| import time | |
| import uuid | |
| import json | |
| import html | |
| from pathlib import Path | |
| import gradio as gr | |
| try: | |
| import spaces # only available on Hugging Face Spaces runtime | |
| except ImportError: | |
| # Provide a dummy so @spaces.GPU(duration=...) is a no-op locally | |
| # while keeping the decorator statically visible in the source code. | |
| class _spaces: | |
| def GPU(duration=300): | |
| return lambda fn: fn | |
| spaces = _spaces | |
| from app.services.retrieval import load_games_dataset, normalize_game_record, retrieve_examples | |
| from app.services.generator import ( | |
| generate_game, generate_game_with_model, build_generation_prompt, | |
| unload_nemotron, | |
| ) | |
| from app.services.validator import validate_game, repair_game | |
| from app.services.schema_validator import create_minimal_game_template | |
| from app.services.tracing import log_event, log_generation_trace, load_events | |
| from app.services.journal import ( | |
| create_journal_entry, save_journal_entry, summarize_journal, | |
| load_journal_entries, detect_mood, assess_story_value, | |
| ) | |
| from app.services.scoring import compute_scores, HINT_PENALTY | |
| from app.services.story import build_story_packet, generate_story | |
| from app.services.image_gen import generate_poster_sync | |
| # ── Voice-journal language options ───────────────────────────────────────── | |
| # Shown on the Create Lobby screen as full names; mapped to the ISO codes the | |
| # ASR service expects before being routed into the play-screen audio handlers. | |
| LANGUAGE_CHOICES = [ | |
| "English", | |
| "French", | |
| "Spanish", | |
| "German", | |
| "Italian", | |
| "Portuguese", | |
| "Dutch", | |
| "Japanese", | |
| "Chinese (Mandarin)", | |
| "Arabic", | |
| "Hindi", | |
| "Korean", | |
| "Russian", | |
| "Turkish", | |
| "Polish", | |
| ] | |
| LANGUAGE_TO_ISO = { | |
| "English": "en", "French": "fr", "Spanish": "es", | |
| "German": "de", "Italian": "it", "Portuguese": "pt", | |
| "Dutch": "nl", "Japanese": "ja", "Chinese (Mandarin)": "zh", | |
| "Arabic": "ar", "Hindi": "hi", "Korean": "ko", | |
| "Russian": "ru", "Turkish": "tr", "Polish": "pl", | |
| } | |
| # ── Load datasets once on startup ────────────────────────────────────────── | |
| BASE_DIR = Path(__file__).resolve().parent | |
| DATASET_PATHS = { | |
| "old": BASE_DIR / "app/data/games_dataset.json", | |
| "scavenger_hunt": BASE_DIR / "app/data/scavenger_hunt/dataset.json", | |
| "hide_and_seek": BASE_DIR / "app/data/hide_and_seek/dataset.json", | |
| "tag": BASE_DIR / "app/data/tag/dataset.json", | |
| } | |
| DATA_RECORDS = [] | |
| for src, path in DATASET_PATHS.items(): | |
| try: | |
| raw = load_games_dataset(str(path)) | |
| normalized = [normalize_game_record(r) for r in raw] | |
| DATA_RECORDS.extend(normalized) | |
| print(f"[OK] Loaded {len(normalized)} records from {src} dataset ({path.name})") | |
| except FileNotFoundError: | |
| print(f"[!] Dataset not found at {path}, skipping {src}") | |
| except Exception as e: | |
| print(f"[!] Error loading {src} dataset from {path}: {e}") | |
| print(f"[OK] Total: {len(DATA_RECORDS)} game records loaded for retrieval") | |
| # ── Shared multiplayer state lives in app.services.rooms ─────────────────── | |
| # SESSION_STORE / ADVENTURE_CODES are owned by the rooms module so the lobby, | |
| # task wall and wait-lobby all read/write the same in-process truth. | |
| from app.services import rooms | |
| from app.services.rooms import SESSION_STORE, ADVENTURE_CODES, save_state, load_state | |
| # ── HF Spaces Zero GPU generation ────────────────────────────────────────── | |
| # This function is statically decorated so HF Spaces detects the GPU | |
| # requirement at startup. The 2.84 GB GGUF model is lazily downloaded | |
| # on first call inside the GPU context. | |
| def _generate_with_gpu(config: dict, retrieved: list[dict]): | |
| """Generate game using GPU (Nemotron 3 Nano 4B via llama.cpp). | |
| Runs inside @spaces.GPU so CUDA is available. The model download | |
| and llama.cpp initialisation happen lazily on first call. | |
| """ | |
| import json | |
| prompt = build_generation_prompt(config, retrieved) | |
| json_str = generate_game_with_model(prompt, model_name="nemotron") | |
| if json_str: | |
| try: | |
| game = json.loads(json_str) | |
| if all(field in game for field in ["game_id", "title", "setup", "tasks", "safety"]): | |
| return game | |
| except json.JSONDecodeError: | |
| pass | |
| return None | |
| def _generate_game(config: dict, retrieved: list[dict]): | |
| """Try GPU generation; fall back to CPU/mock if unavailable.""" | |
| if spaces is not None: | |
| game = _generate_with_gpu(config, retrieved) | |
| if game is not None: | |
| return game | |
| print("[app] GPU generation returned None, falling back to CPU/mock") | |
| return generate_game(config, retrieved) | |
| # ── Pipeline entry point ────────────────────────────────────────────────────── | |
| def run_pipeline( | |
| game_type: str, | |
| city: str, | |
| area: str, | |
| location_type: str, | |
| duration_minutes: int, | |
| num_players: int, | |
| difficulty: str, | |
| age_group: str, | |
| energy_level: str, | |
| ): | |
| """Run the full AI generation pipeline end-to-end.""" | |
| session_id = str(uuid.uuid4()) | |
| config = { | |
| "game_type": game_type, | |
| "city": city or "Paris", | |
| "area": area or "Downtown", | |
| "location_type": location_type, | |
| "duration_minutes": int(duration_minutes), | |
| "num_players": int(num_players), | |
| "difficulty": difficulty, | |
| "age_group": age_group, | |
| "energy_level": energy_level, | |
| "photo_enabled": True, | |
| # New fields for enhanced retrieval scoring | |
| "landscape_tags": [], | |
| "theme": "", | |
| "mobility": "standard", | |
| "allow_transport": False, | |
| } | |
| state = {} | |
| # 1 ── Retrieval | |
| state["num_retrieved"] = 0 | |
| if DATA_RECORDS: | |
| retrieved = retrieve_examples(config, DATA_RECORDS, k=3) | |
| state["num_retrieved"] = len(retrieved) | |
| state["retrieved_ids"] = [r["id"] for r in retrieved] | |
| else: | |
| retrieved = [] | |
| # 2 ── Generation (GPU if available, CPU/mock fallback otherwise) | |
| game = _generate_game(config, retrieved) | |
| state["game_id"] = game["game_id"] | |
| state["game_title"] = game["title"] | |
| # Free GPU memory — Nemotron is no longer needed after generation. | |
| # This makes room for FLUX poster / Cohere ASR later. | |
| unload_nemotron() | |
| # 3 ── Validation | |
| is_valid, failures = validate_game(game, config) | |
| state["validation_passed"] = is_valid | |
| state["validation_failures"] = failures | |
| # 4 ── Repair (if needed) | |
| repaired = None | |
| if not is_valid: | |
| repaired = repair_game(game, failures, config) | |
| state["repair_applied"] = True | |
| is_valid2, failures2 = validate_game(repaired, config) | |
| state["repair_valid"] = is_valid2 | |
| state["remaining_failures"] = failures2 | |
| else: | |
| state["repair_applied"] = False | |
| final_game = repaired if repaired is not None else game | |
| # Stamp the selected game type onto the game so it's recorded explicitly | |
| # (the model output / schema don't carry it) and can't drift from config. | |
| final_game["game_type"] = config.get("game_type", "scavenger_hunt") | |
| # 5 ── Log generation trace | |
| log_generation_trace( | |
| session_id=session_id, | |
| config=config, | |
| retrieved_examples=retrieved, | |
| game=final_game, | |
| validation_passed=is_valid or (repaired is not None and state.get("repair_valid", False)), | |
| validation_failures=failures, | |
| repaired_game=repaired, | |
| ) | |
| # 6 ── Reveal all tasks as events | |
| for task in final_game.get("tasks", []): | |
| log_event(session_id, "task_revealed", { | |
| "task_id": task["task_id"], | |
| "title": task["title"], | |
| "points": task["points"], | |
| }) | |
| # 7 ── Store session | |
| SESSION_STORE[session_id] = { | |
| "config": config, | |
| "game": final_game, | |
| "events": [], | |
| "journals": [], | |
| } | |
| # 8 ── Build summary text | |
| summary = build_summary(final_game, state, session_id) | |
| return summary, session_id | |
| def generate_for_session(session_id: str) -> dict: | |
| """Generate (and validate/repair) a game for an *existing* lobby session. | |
| Used when the host presses Start: the room/players already exist, so we | |
| only run retrieval → generation → validation → repair, store the game on | |
| the session, and reveal the tasks. Returns the final game dict. | |
| Reuses the same pipeline pieces as :func:`run_pipeline`. | |
| """ | |
| session = SESSION_STORE.get(session_id) | |
| if not session: | |
| raise ValueError("Unknown session") | |
| # Idempotent: once a quest exists for this session, never re-roll it. A | |
| # second call (double-clicked Start, a poll race, a reload mid-flow) must | |
| # return the SAME tasks — players completing a task should never see a | |
| # different set of tasks appear under them. | |
| if session.get("game"): | |
| return session["game"] | |
| config = session["config"] | |
| retrieved = retrieve_examples(config, DATA_RECORDS, k=3) if DATA_RECORDS else [] | |
| game = _generate_game(config, retrieved) | |
| unload_nemotron() # free GPU for ASR / MiniCPM / FLUX | |
| is_valid, failures = validate_game(game, config) | |
| repaired = repair_game(game, failures, config) if not is_valid else None | |
| final_game = repaired if repaired is not None else game | |
| # Record the selected game type on the game object (authoritative = config). | |
| final_game["game_type"] = config.get("game_type", "scavenger_hunt") | |
| log_generation_trace( | |
| session_id=session_id, config=config, retrieved_examples=retrieved, | |
| game=final_game, | |
| validation_passed=is_valid or (repaired is not None), | |
| validation_failures=failures, repaired_game=repaired, | |
| ) | |
| for task in final_game.get("tasks", []): | |
| log_event(session_id, "task_revealed", { | |
| "task_id": task["task_id"], "title": task["title"], | |
| "points": task["points"], | |
| }) | |
| session["game"] = final_game | |
| # Persist immediately so a process restart (e.g. dev hot-reload) between | |
| # generation and begin_play() can't lose the quest and trigger a re-roll. | |
| save_state() | |
| return final_game | |
| # ── Phase 3: Gameplay helpers ───────────────────────────────────────────────── | |
| def complete_task(session_id: str, task_id: str, team_id: str = "team-a"): | |
| """Log a task completion event and return updated scoreboard.""" | |
| if session_id not in SESSION_STORE: | |
| return "⚠ Unknown session" | |
| ev = log_event(session_id, "task_completed", { | |
| "task_id": task_id, | |
| "summary": f"Team {team_id} completed {task_id}", | |
| }, team_id=team_id) | |
| SESSION_STORE[session_id]["events"].append(ev) | |
| return f"✅ Task {task_id} completed!" | |
| def skip_task(session_id: str, task_id: str, team_id: str = "team-a"): | |
| """Log a task skip event.""" | |
| if session_id not in SESSION_STORE: | |
| return "⚠ Unknown session" | |
| ev = log_event(session_id, "task_skipped", { | |
| "task_id": task_id, | |
| "summary": f"Team {team_id} skipped {task_id}", | |
| }, team_id=team_id) | |
| SESSION_STORE[session_id]["events"].append(ev) | |
| return f"⏭️ Task {task_id} skipped." | |
| def use_hint(session_id: str, task_id: str, team_id: str = "team-a", | |
| question: str = "", answer: str = ""): | |
| """Log a hint usage event and return the team's new hint count for this task. | |
| The question/answer are stored on the event so the detail view can show a | |
| hint history. Scoring (``compute_scores``) deducts ``HINT_PENALTY`` per | |
| ``hint_used`` event, so no separate score mutation is needed here.""" | |
| if session_id not in SESSION_STORE: | |
| return 0 | |
| ev = log_event(session_id, "hint_used", { | |
| "task_id": task_id, | |
| "question": (question or "").strip(), | |
| "answer": (answer or "").strip(), | |
| "summary": f"Team {team_id} used a hint for {task_id}", | |
| }, team_id=team_id) | |
| SESSION_STORE[session_id]["events"].append(ev) | |
| return rooms.hints_used_count(session_id, team_id, task_id) | |
| def record_journal( | |
| session_id: str, | |
| transcript: str = "", | |
| task_id: str = "", | |
| location_note: str = "", | |
| team_id: str = "team-a", | |
| audio_path: str = "", | |
| language: str = "en", | |
| ): | |
| """Record a journal entry, summarize it, and return the result. | |
| Decorated with ``@spaces.GPU`` so the Cohere ASR model can run on the | |
| GPU. On HF ZeroGPU, ``torch.cuda.is_available()`` is ``False`` outside | |
| a ``@spaces.GPU`` function, so without this the ASR model would load | |
| on CPU and the voice-journal button would appear to hang. | |
| Two input paths are supported: | |
| * **Voice path** — pass ``audio_path`` (e.g. a path returned by | |
| ``gr.Audio(type="filepath")``). The audio is transcribed with the | |
| Cohere ASR service (``CohereLabs/cohere-transcribe-03-2026``) and | |
| the transcript is stored with ASR metadata. | |
| * **Typed path** — pass ``transcript`` directly. The user can also | |
| edit an ASR transcript in the UI before submitting; if both | |
| ``audio_path`` and ``transcript`` are present, the typed | |
| transcript wins and is treated as a manual correction | |
| (``transcript_source == "hybrid"``). | |
| """ | |
| if session_id not in SESSION_STORE: | |
| return "[!] Unknown session" | |
| asr_metadata: dict | None = None | |
| audio_ref: str | None = None | |
| transcript_source = "typed" | |
| # ── 1. Voice path — transcribe audio if provided ─────────────────── | |
| if audio_path: | |
| audio_ref = audio_path | |
| try: | |
| from app.services.asr import transcribe as _asr_transcribe | |
| asr_result = _asr_transcribe(audio_path, language=language) | |
| asr_metadata = { | |
| "model": asr_result.get("model"), | |
| "language": asr_result.get("language"), | |
| "status": asr_result.get("status"), | |
| "error": asr_result.get("error"), | |
| } | |
| asr_text = (asr_result.get("transcript") or "").strip() | |
| if asr_text and not transcript.strip(): | |
| transcript = asr_text | |
| transcript_source = "asr" | |
| elif asr_text and transcript.strip() and asr_text != transcript.strip(): | |
| transcript_source = "hybrid" | |
| except Exception as exc: | |
| print(f"[app] ASR transcription failed: {type(exc).__name__}: {exc}") | |
| asr_metadata = { | |
| "model": None, | |
| "language": language, | |
| "status": "error", | |
| "error": f"{type(exc).__name__}: {exc}", | |
| } | |
| if not transcript or not transcript.strip(): | |
| # Surface the actual error instead of a generic message | |
| hint = "" | |
| if asr_metadata: | |
| err = asr_metadata.get("error") or "" | |
| status = asr_metadata.get("status") or "" | |
| if status == "error" and err: | |
| hint = f"\n\n**ASR error:** {err}" | |
| elif status == "skipped": | |
| hint = ( | |
| "\n\n**ASR was skipped** — set `CITYQUEST_SKIP_MODEL=0` " | |
| "and ensure `HF_TOKEN` is configured." | |
| ) | |
| return ( | |
| "⚠️ No transcript available — record audio or type a note first." | |
| + hint | |
| ) | |
| # ── 2. Build journal entry ──────────────────────────────────────── | |
| entry = create_journal_entry( | |
| transcript=transcript, | |
| session_id=session_id, | |
| team_id=team_id, | |
| task_id=task_id or None, | |
| location_note=location_note, | |
| audio_ref=audio_ref, | |
| asr_metadata=asr_metadata, | |
| transcript_source=transcript_source, | |
| ) | |
| # Summarize | |
| summary = summarize_journal(transcript, task_id=task_id or None, location_note=location_note) | |
| entry["moment_summary"] = summary["moment_summary"] | |
| entry["tags"] = summary["tags"] | |
| entry["story_value"] = summary["story_value"] | |
| # Persist | |
| save_journal_entry(entry) | |
| # Log event | |
| ev = log_event(session_id, "journal_recorded", { | |
| "journal_id": entry["journal_id"], | |
| "mood": entry["mood"], | |
| "story_value": summary["story_value"], | |
| "summary": summary["moment_summary"], | |
| "transcript_source": transcript_source, | |
| "asr_status": asr_metadata.get("status") if asr_metadata else None, | |
| "asr_model": asr_metadata.get("model") if asr_metadata else None, | |
| }, team_id=team_id) | |
| SESSION_STORE[session_id]["events"].append(ev) | |
| SESSION_STORE[session_id]["journals"].append(entry) | |
| # Build display text | |
| source_label = { | |
| "asr": "🎙️ (transcribed)", | |
| "hybrid": "🎙️✏️ (transcribed, edited)", | |
| "typed": "⌨️ (typed)", | |
| }.get(transcript_source, transcript_source) | |
| asr_line = "" | |
| if asr_metadata and asr_metadata.get("status") != "ok": | |
| asr_line = f"- ASR status: **{asr_metadata.get('status')}** — {asr_metadata.get('error') or ''}\n" | |
| display = ( | |
| f"🎙️ **Journal recorded!** {source_label}\n" | |
| f"- Mood: *{entry['mood']}*\n" | |
| f"- Story value: **{summary['story_value']}**\n" | |
| f"- Tags: {', '.join(summary['tags'])}\n" | |
| f"{asr_line}" | |
| f"- Summary: {summary['moment_summary']}" | |
| ) | |
| return display | |
| def upload_photo( | |
| session_id: str, | |
| photo_file, | |
| caption: str = "", | |
| task_id: str = "", | |
| team_id: str = "team-a", | |
| ): | |
| """Log a photo upload event and return a confirmation.""" | |
| if session_id not in SESSION_STORE: | |
| return "[!] Unknown session" | |
| photo_name = "" | |
| if photo_file is not None: | |
| # Gradio 4+ returns a filepath string or a PIL image | |
| if isinstance(photo_file, str): | |
| photo_name = photo_file.split("/")[-1].split("\\")[-1] | |
| else: | |
| photo_name = getattr(photo_file, "name", "photo") | |
| photo_id = f"photo-{uuid.uuid4().hex[:8]}" | |
| payload = { | |
| "photo_id": photo_id, | |
| "photo_name": photo_name, | |
| "caption": caption, | |
| "summary": f"Team {team_id} uploaded photo for {task_id or 'general'}", | |
| } | |
| if task_id: | |
| payload["task_id"] = task_id | |
| ev = log_event(session_id, "photo_uploaded", payload, team_id=team_id) | |
| SESSION_STORE[session_id]["events"].append(ev) | |
| # Track photo in session store for recap | |
| if "photos" not in SESSION_STORE[session_id]: | |
| SESSION_STORE[session_id]["photos"] = [] | |
| SESSION_STORE[session_id]["photos"].append({ | |
| "photo_id": photo_id, | |
| "photo_name": photo_name, | |
| "photo_path": photo_file if isinstance(photo_file, str) else "", | |
| "caption": caption, | |
| "task_id": task_id, | |
| "team_id": team_id, | |
| }) | |
| # Build gallery list | |
| photos = SESSION_STORE[session_id]["photos"] | |
| gallery_lines = [f"📸 **{p['photo_id']}** — {p['caption'] or '(no caption)'} [{p['task_id'] or 'general'}]" for p in photos] | |
| display = ( | |
| f"📸 **Photo uploaded!**\n" | |
| f"- ID: `{photo_id}`\n" | |
| f"- Caption: {caption or '(none)'}\n" | |
| f"- Related task: {task_id or 'general'}\n\n" | |
| f"**All photos ({len(photos)}):**\n" + "\n".join(gallery_lines) | |
| ) | |
| return display | |
| def end_game(session_id: str, team_id: str = "team-a"): | |
| """End the game, compute scores, and return a scoreboard.""" | |
| if session_id not in SESSION_STORE: | |
| return "[!] Unknown session" | |
| session = SESSION_STORE[session_id] | |
| game = session["game"] | |
| events = load_events(session_id=session_id) | |
| ev = log_event(session_id, "game_finished", { | |
| "summary": f"Game finished — session {session_id}", | |
| }, team_id=team_id) | |
| events.append(ev) | |
| scores = compute_scores(events, game) | |
| session["scores"] = scores | |
| lines = ["# Final Scoreboard\n"] | |
| for ts in scores.get("team_scores", []): | |
| marker = " [WINNER]" if ts["team_id"] == scores.get("winner") else "" | |
| lines.append(f"### Team: {ts['team_id']}{marker}") | |
| lines.append(f"- **Total points:** {ts['points']}") | |
| lines.append(f"- Tasks completed: {ts['completed_tasks']}/{ts['total_tasks']}") | |
| lines.append(f"- Hints used: {ts['hints_used']}") | |
| if ts.get("bonuses"): | |
| lines.append(f"- Bonuses: {', '.join(ts['bonuses'])}") | |
| lines.append("") | |
| lines.append("**Breakdown:**") | |
| for b in ts.get("scoring_breakdown", []): | |
| lines.append(f" * {b}") | |
| lines.append("") | |
| if scores.get("winner"): | |
| lines.append(f"**Winner: {scores['winner']}**") | |
| return "\n".join(lines) | |
| def generate_recap(session_id: str): | |
| """Generate the final story recap from all collected session data.""" | |
| if session_id not in SESSION_STORE: | |
| return "[!] Unknown session", {} | |
| session = SESSION_STORE[session_id] | |
| game = session["game"] | |
| events = load_events(session_id=session_id) | |
| journals = load_journal_entries(session_id=session_id) | |
| scores = session.get("scores", compute_scores(events, game)) | |
| photos = session.get("photos", []) | |
| # Build story packet | |
| packet = build_story_packet( | |
| game=game, | |
| events=events, | |
| scores=scores, | |
| journal_entries=journals, | |
| photo_captions=photos, | |
| ) | |
| # Generate story | |
| result = generate_story(packet, session_id=session_id) | |
| # Format for display (poster goes in dedicated section below) | |
| lines = [ | |
| "# Episode Recap\n", | |
| result["short_recap"], | |
| "", | |
| result["long_summary"], | |
| "", | |
| ] | |
| # NOTE: poster_prompt is internal — do NOT render to user. | |
| # NOTE: do not duplicate Final Standings here; left column shows it. | |
| # TODO: result["long_summary"] may still contain a "Final standings" | |
| # sub-section produced upstream in generate_story(); if so, strip it | |
| # in the story generator module rather than here. | |
| return "\n".join(lines), result | |
| def generate_poster_from_session(session_id: str): | |
| """Generate a poster image from an existing session's recap data.""" | |
| if session_id not in SESSION_STORE: | |
| return None, "[!] Unknown session - generate a recap first" | |
| session = SESSION_STORE[session_id] | |
| # Check if we have a recap result or can build one | |
| if "poster_url" in session: | |
| return session["poster_url"], "Poster ready from previous generation" | |
| # Build story packet from stored data | |
| game = session.get("game") | |
| if not game: | |
| return None, "[!] No game data in session" | |
| events = load_events(session_id=session_id) | |
| journals = load_journal_entries(session_id=session_id) | |
| scores = session.get("scores", compute_scores(events, game)) | |
| photos = session.get("photos", []) | |
| packet = build_story_packet( | |
| game=game, events=events, scores=scores, | |
| journal_entries=journals, photo_captions=photos, | |
| ) | |
| result = generate_story(packet, session_id=session_id) | |
| poster_url, status = generate_poster_sync( | |
| session_id, | |
| result["poster_prompt"], | |
| photo_paths=[p.get("photo_path", "") for p in photos if p.get("photo_path")], | |
| ) | |
| session["poster_url"] = poster_url | |
| return poster_url, status | |
| # ── Build summary text ──────────────────────────────────────────────────────── | |
| def build_summary(game: dict, state: dict, session_id: str = "") -> str: | |
| """Format a human-readable game summary for the UI.""" | |
| lines = [] | |
| lines.append(f"# 🎮 {game.get('title', 'Untitled')}") | |
| lines.append("") | |
| if session_id: | |
| lines.append(f"> Session `{session_id[:12]}…` — paste this in the **Play** tab to log progress.") | |
| lines.append("") | |
| lines.append("## 📋 Setup") | |
| setup = game.get("setup", {}) | |
| lines.append(f"- **Location:** {setup.get('city', '?')} — {setup.get('area', '?')}") | |
| lines.append(f"- **Meeting point:** {setup.get('meeting_point', '?')}") | |
| lines.append(f"- **Duration:** {setup.get('duration_minutes', '?')} min") | |
| lines.append(f"- **Players:** {setup.get('num_players', '?')}") | |
| lines.append("") | |
| lines.append("## 📜 Rules") | |
| for i, rule in enumerate(game.get("rules", []), 1): | |
| lines.append(f" {i}. {rule}") | |
| lines.append("") | |
| lines.append("## 🎯 Tasks") | |
| for t in game.get("tasks", []): | |
| time_str = f"{t.get('time_limit_minutes', '∞')} min" if t.get("time_limit_minutes") else "No time limit" | |
| lines.append(f" **{t.get('task_id', '?')}:** {t.get('title', '?')}") | |
| lines.append(f" - *{t.get('description', '')[:80]}*") | |
| lines.append(f" - 🏆 {t.get('points', 0)} pts | ⏱ {time_str} | 📸 {t.get('proof_type', '?')}") | |
| lines.append(f" - 💡 {t.get('hint', '')[:70]}") | |
| lines.append(f" - 🛡 {t.get('safety_note', '')[:70]}") | |
| lines.append("") | |
| lines.append("## 💡 Global Hints") | |
| for h in game.get("global_hints", []): | |
| lines.append(f" • {h}") | |
| lines.append("") | |
| lines.append("## 🔒 Safety") | |
| safety = game.get("safety", {}) | |
| lines.append(f"- **Zone:** {safety.get('allowed_zone', '?')}") | |
| lines.append(f"- **Supervision required:** {'Yes' if safety.get('adult_supervision') else 'No'}") | |
| lines.append(f"- **Forbidden:** {', '.join(safety.get('forbidden_behaviors', []))}") | |
| lines.append("") | |
| lines.append("## 📊 Scoring") | |
| for s in game.get("score_rules", []): | |
| lines.append(f" • {s}") | |
| lines.append(f"- **Tie-breaker:** {game.get('tie_breaker', '?')}") | |
| lines.append("") | |
| lines.append("## 📖 Story Seed") | |
| seed = game.get("story_seed", {}) | |
| lines.append(f"- **Tone:** {seed.get('tone', '?')}") | |
| lines.append(f"- **Motifs:** {', '.join(seed.get('motifs', []))}") | |
| lines.append(f"- **Recap style:** {seed.get('recap_style', '?')}") | |
| lines.append("") | |
| # Pipeline trace | |
| lines.append("---") | |
| lines.append("### 🔍 Pipeline trace") | |
| lines.append(f"- Retrieval: {state.get('num_retrieved', 0)} examples found") | |
| if state.get("retrieved_ids"): | |
| lines.append(f"- Retrieved IDs: {', '.join(state['retrieved_ids'])}") | |
| lines.append(f"- Game ID: {state.get('game_id', '?')}") | |
| if state.get("validation_passed"): | |
| lines.append(f"- ✅ Validation passed") | |
| else: | |
| lines.append(f"- ❌ Validation failed ({len(state.get('validation_failures', []))} issues)") | |
| if state.get("repair_applied"): | |
| lines.append(f"- 🔧 Repair applied → {'✅ Passed' if state.get('repair_valid') else '❌ Still has issues'}") | |
| for f in state.get("validation_failures", [])[:5]: | |
| lines.append(f" - {f}") | |
| leftover = state.get("remaining_failures", []) | |
| if leftover: | |
| for f in leftover[:5]: | |
| lines.append(f" - ⚠ {f}") | |
| return "\n".join(lines) | |
| # ══════════════════════════════════════════════════════════════════════════ | |
| # LIQUID-FLOW MULTIPLAYER UI | |
| # ══════════════════════════════════════════════════════════════════════════ | |
| from app.ui.theme import CSS, JS_INIT, gradio_theme | |
| rooms.load_state() # restore live rooms after a restart | |
| ORDER = ["home", "create", "join", "lobby", "play", "wait", "win"] | |
| def vis(target: str): | |
| """Visibility updates for the 7 top-level screen groups.""" | |
| return tuple(gr.update(visible=(name == target)) for name in ORDER) | |
| def _mmss(secs) -> str: | |
| m, s = divmod(max(0, int(secs)), 60) | |
| return f"{m:02d}:{s:02d}" | |
| # ── HTML fragment renderers (polled) ────────────────────────────────────── | |
| def code_html(session_id: str) -> str: | |
| s = SESSION_STORE.get(session_id) or {} | |
| code = s.get("room_code", "——————") | |
| return ( | |
| "<div style='text-align:center'>" | |
| "<div class='cq-code-hint'>Share this code with your friends to join</div>" | |
| f"<div class='cq-code'>{code}</div></div>" | |
| ) | |
| def copy_btn_html(session_id: str) -> str: | |
| """A clipboard button beside the room code. Kept in its own (non-polled) | |
| HTML component so the 'Copied!' state isn't reset by the 1.5s lobby poll. | |
| Pure inline JS — Gradio can't reach the clipboard from Python.""" | |
| s = SESSION_STORE.get(session_id) or {} | |
| code = s.get("room_code", "") | |
| js = ( | |
| "var b=this;navigator.clipboard.writeText('" + code + "').then(function(){" | |
| "b.classList.add('copied');var l=b.querySelector('.cq-copy-label');" | |
| "if(l)l.textContent='✓ Copied!';" | |
| "setTimeout(function(){b.classList.remove('copied');" | |
| "if(l)l.textContent='📋 Copy';},2000);});" | |
| ) | |
| return ( | |
| "<div class='cq-copy-wrap'>" | |
| f"<button type='button' class='cq-copy-btn' onclick=\"{js}\">" | |
| "<span class='cq-copy-label'>📋 Copy</span></button></div>" | |
| ) | |
| def _teams_ready(session_id: str): | |
| """(teams_with_at_least_one_player, total_teams) for the lobby.""" | |
| s = SESSION_STORE.get(session_id) or {} | |
| teams = s.get("teams", []) | |
| have = {p["team_id"] for p in s.get("players", [])} | |
| ready = sum(1 for t in teams if t["id"] in have) | |
| return ready, len(teams) | |
| def roster_html(session_id: str, me: str = "") -> str: | |
| s = SESSION_STORE.get(session_id) or {} | |
| host = s.get("host_name", "") | |
| ready, total = _teams_ready(session_id) | |
| pct = int(round(100 * ready / total)) if total else 0 | |
| parts = ["<div style='display:flex;flex-direction:column;gap:14px'>"] | |
| parts.append( | |
| "<div class='cq-progress'>" | |
| f"<div class='cq-progress-label'>{ready} / {total} teams ready</div>" | |
| "<div class='cq-progress-track'>" | |
| f"<div class='cq-progress-fill' style='width:{pct}%'></div></div></div>" | |
| ) | |
| for t in s.get("teams", []): | |
| members = [p for p in s.get("players", []) if p["team_id"] == t["id"]] | |
| if members: | |
| chips = "".join( | |
| f"<span class='cq-pill{' live' if p['name'] == me else ''}'>" | |
| f"{p['name']}{' ⭐' if p['name'] == host else ''}</span>" | |
| for p in members | |
| ) | |
| else: | |
| chips = "<span class='cq-pill wait'>waiting…</span>" | |
| parts.append( | |
| "<div class='cq-soft'>" | |
| f"<div style=\"font-family:'Fraunces',serif;font-weight:600;color:#FFC107\">" | |
| f"{t['name']} <span style='color:#CCCCCC;font-size:13px'>· {t['role']}</span></div>" | |
| f"<div class='cq-strip' style='margin-top:8px'>{chips}</div></div>" | |
| ) | |
| parts.append("</div>") | |
| return "".join(parts) | |
| def timer_html(session_id: str, team_id: str) -> str: | |
| ph = rooms.team_phase(session_id, team_id) | |
| meta = rooms.team_meta(session_id, team_id) | |
| if ph["phase"] == "waiting": | |
| label, val, sub = "Head start in progress — you launch in", _mmss(ph["seconds_to_start"]), "Sit tight, your quest is about to open" | |
| elif ph["phase"] == "overtime": | |
| label, val, sub = "Time's up — wrap up and finish", "00:00", "" | |
| else: | |
| label, val, sub = "Time remaining", _mmss(ph["seconds_remaining"]), "" | |
| return ( | |
| "<div class='cq-card' style='text-align:center'>" | |
| f"<div style=\"font-family:'Fraunces',serif;font-weight:600;font-size:20px\">" | |
| f"{meta['name']} <span style='color:#CCCCCC;font-size:14px'>· {meta['role']}</span></div>" | |
| f"<div class='section-label' style='justify-content:center;margin-top:10px'>{label}</div>" | |
| f"<div class='cq-timer'>{val}</div>" | |
| f"<div style='color:#CCCCCC;font-size:13px'>{sub}</div></div>" | |
| ) | |
| def status_strip_html(session_id: str, my_team: str = "") -> str: | |
| s = SESSION_STORE.get(session_id) or {} | |
| pills = [] | |
| for t in s.get("teams", []): | |
| tid = t["id"] | |
| if rooms.team_finished(session_id, tid): | |
| cls, txt = "done", f"{t['name']} · finished 🏁" | |
| elif rooms.is_unlocked(session_id, tid): | |
| cls, txt = "live", f"{t['name']} · exploring" | |
| else: | |
| ph = rooms.team_phase(session_id, tid) | |
| cls, txt = "wait", f"{t['name']} · starts in {_mmss(ph['seconds_to_start'])}" | |
| me = " (you)" if tid == my_team else "" | |
| pills.append(f"<span class='cq-pill {cls}'>{txt}{me}</span>") | |
| return "<div class='cq-strip'>" + "".join(pills) + "</div>" | |
| def scoreboard_md(session_id: str) -> str: | |
| s = SESSION_STORE.get(session_id) or {} | |
| game = s.get("game") or {} | |
| events = load_events(session_id=session_id) | |
| scores = compute_scores(events, game) | |
| s["scores"] = scores | |
| winner = scores.get("winner") | |
| ts_sorted = sorted(scores.get("team_scores", []), key=lambda x: x.get("points", 0), reverse=True) | |
| medals = ["🥇", "🥈", "🥉"] | |
| lines = ["# 🏆 Final Standings", ""] | |
| for i, ts in enumerate(ts_sorted): | |
| meta = rooms.team_meta(session_id, ts["team_id"]) | |
| medal = medals[i] if i < 3 else "•" | |
| crown = " — Champion" if ts["team_id"] == winner else "" | |
| lines.append(f"### {medal} {meta['name']}{crown}") | |
| lines.append( | |
| f"- **{ts['points']} points** · {ts['completed_tasks']}/{ts['total_tasks']} tasks · " | |
| f"{ts['hints_used']} hints used" | |
| ) | |
| lines.append("") | |
| return "\n".join(lines) | |
| def _task_id_list(session_id: str) -> list[str]: | |
| s = SESSION_STORE.get(session_id) or {} | |
| return [t["task_id"] for t in (s.get("game") or {}).get("tasks", [])[:12]] | |
| def _task_by_id(session_id: str, task_id: str) -> dict: | |
| s = SESSION_STORE.get(session_id) or {} | |
| return next((x for x in (s.get("game") or {}).get("tasks", []) if x["task_id"] == task_id), {}) | |
| def task_detail_md(session_id: str, team_id: str, task_id: str) -> str: | |
| t = _task_by_id(session_id, task_id) | |
| tl = t.get("time_limit_minutes") | |
| tls = f"{tl} min" if tl else "no time limit" | |
| return ( | |
| f"## {t.get('title', 'Task')}\n\n" | |
| f"**🏆 {t.get('points', 0)} pts · ⏱ {tls} · 📸 {t.get('proof_type', 'photo')}**\n\n" | |
| f"{t.get('description', '')}" | |
| ) | |
| def task_hint_html(session_id: str, team_id: str, task_id: str) -> str: | |
| """Hint rendered with its own class so it reads as 'discovery' (amber), | |
| visually distinct from a safety warning.""" | |
| hint = (_task_by_id(session_id, task_id).get("hint") or "").strip() | |
| if not hint: | |
| return "" | |
| return f"<div class='hint-item'>💡 {html.escape(hint)}</div>" | |
| def task_safety_html(session_id: str, team_id: str, task_id: str) -> str: | |
| """Safety flag rendered with a red 'warning' treatment, distinct from a hint.""" | |
| note = (_task_by_id(session_id, task_id).get("safety_note") | |
| or "Stay aware of your surroundings.").strip() | |
| return f"<div class='safety-item'>🛡 {html.escape(note)}</div>" | |
| def task_gallery(session_id: str, team_id: str, task_id: str): | |
| return [ | |
| (p["photo_path"], p.get("caption") or "") | |
| for p in rooms.team_photos(session_id, team_id, task_id) | |
| if p.get("photo_path") | |
| ] | |
| def proof_md(session_id: str, team_id: str, task_id: str) -> str: | |
| n_photos, n_journals = rooms.proof_count(session_id, team_id, task_id) | |
| if n_photos + n_journals > 0: | |
| return f"✅ Proof captured — **{n_photos}** photo(s), **{n_journals}** journal(s). You can complete this task." | |
| return "🔒 Add at least **one photo** or **one voice journal** to unlock **Mark Completed**." | |
| def journal_log_md(session_id: str, team_id: str, task_id: str) -> str: | |
| """List the voice/typed journals already saved for this task so players | |
| can keep adding more recordings.""" | |
| entries = rooms.team_journals(session_id, team_id, task_id) | |
| if not entries: | |
| return "" | |
| lines = [f"**🎙️ Journals for this task ({len(entries)})** — record more anytime:"] | |
| for i, e in enumerate(entries, 1): | |
| txt = (e.get("transcript") or "").strip().replace("\n", " ") | |
| if len(txt) > 90: | |
| txt = txt[:90] + "…" | |
| lines.append(f"{i}. {txt or '*(no text)*'}") | |
| return "\n".join(lines) | |
| def _hint_points_update(session_id: str, team_id: str, task_id: str): | |
| """Live 'available points' for this task: base points minus the per-hint | |
| penalty for every hint this team has revealed, clamped at 0. Turns | |
| orange-red once any hint has been spent.""" | |
| base = _task_by_id(session_id, task_id).get("points", 0) | |
| used = rooms.hints_used_count(session_id, team_id, task_id) | |
| avail = max(0, base - used * HINT_PENALTY) | |
| cls = ["points-display"] + (["reduced"] if used > 0 else []) | |
| return gr.update(value=f"⭐ {avail} pts available (−{HINT_PENALTY} pts per hint)", | |
| elem_classes=cls) | |
| def _hint_btn_state(remaining: int): | |
| """Hint button label/interactivity from the number of hints still allowed.""" | |
| if remaining <= 0: | |
| return gr.update(value="No hints left", interactive=False, | |
| elem_classes=["btn-hint", "spent"]) | |
| return gr.update(value=f"Get a hint −{HINT_PENALTY} pts ({remaining} left)", | |
| interactive=True, elem_classes=["btn-hint"]) | |
| def _hint_btn_update(session_id: str, team_id: str, task_id: str): | |
| used = rooms.hints_used_count(session_id, team_id, task_id) | |
| return _hint_btn_state(max(0, rooms.HINT_CAP - used)) | |
| def hint_log_md(session_id: str, team_id: str, task_id: str) -> str: | |
| """History of hints already revealed for this task so players don't have to | |
| re-buy a hint they've seen — shows the guide's answer for each.""" | |
| hints = rooms.team_hints(session_id, team_id, task_id) | |
| if not hints: | |
| return "" | |
| total = len(hints) * HINT_PENALTY | |
| lines = [f"**Hints used ({len(hints)} of {rooms.HINT_CAP}) — −{total} pts total**"] | |
| for i, p in enumerate(hints, 1): | |
| ans = (p.get("answer") or "").strip().replace("\n", " ") | |
| if len(ans) > 140: | |
| ans = ans[:140] + "…" | |
| lines.append(f"💡 Hint {i}: {ans or '*(no answer captured)*'}") | |
| return "\n\n".join(lines) | |
| def _detail_tab_updates(session_id: str, team_id: str, active_tid: str): | |
| """Updates for the 12 horizontal task-tab buttons in the detail view: each | |
| shows ✅/○ + name + points, the completed ones flagged 'done', and the one | |
| you're viewing flagged 'active'.""" | |
| s = SESSION_STORE.get(session_id) or {} | |
| tasks = (s.get("game") or {}).get("tasks", [])[:12] | |
| wall = rooms.task_wall_state(session_id, team_id) | |
| ups = [] | |
| for i in range(12): | |
| if i < len(tasks): | |
| t = tasks[i] | |
| tid = t["task_id"] | |
| done = wall.get(tid, False) | |
| name = t.get("title", "Task") | |
| if len(name) > 22: | |
| name = name[:21] + "…" | |
| cls = ["task-tab"] | |
| if done: | |
| cls.append("task-tab-done") | |
| if tid == active_tid: | |
| cls.append("task-tab-active") | |
| ups.append(gr.update( | |
| value=f"{'✅' if done else '○'} {name} · {t.get('points', 0)}pts", | |
| visible=True, elem_classes=cls, | |
| )) | |
| else: | |
| ups.append(gr.update(visible=False)) | |
| return ups | |
| def _task_position(session_id: str, active_tid: str) -> str: | |
| s = SESSION_STORE.get(session_id) or {} | |
| ids = [t["task_id"] for t in (s.get("game") or {}).get("tasks", [])[:12]] | |
| if not ids: | |
| return "" | |
| idx = ids.index(active_tid) if active_tid in ids else 0 | |
| return f"Task {idx + 1} of {len(ids)}" | |
| def _complete_btn_update(can: bool): | |
| """Locked (grey, disabled) vs unlocked (green, active) Mark Completed button.""" | |
| if can: | |
| return gr.update(interactive=True, value="✅ Mark completed", | |
| elem_classes=["mark-complete-btn", "complete-btn", "unlocked"]) | |
| return gr.update(interactive=False, value="🔒 Mark completed", | |
| elem_classes=["mark-complete-btn", "complete-btn", "locked"]) | |
| def _completion_instruction_update(can: bool): | |
| """The elevated unlock instruction shown just above Mark Completed.""" | |
| if can: | |
| return gr.update(value="✅ Proof added — you can mark this complete!", | |
| elem_classes=["completion-instruction", "unlocked"]) | |
| return gr.update(value="🔒 Add a photo or voice note to unlock completion", | |
| elem_classes=["completion-instruction", "locked"]) | |
| # ── Polling dispatchers (attached to the shared gr.Timer) ───────────────── | |
| # Adaptive polling: stay snappy (FAST) while anything is changing, ease to | |
| # SLOW after a stretch of identical ticks. The countdown timer on the play | |
| # screen changes every second, so play naturally stays FAST; lobby/wait back | |
| # off when idle. Capped at SLOW so screen transitions still arrive within ~3s. | |
| POLL_FAST = 1.5 | |
| POLL_SLOW = 3.0 | |
| POLL_IDLE_LIMIT = 4 # identical ticks before easing off | |
| def poll_tick(session_id, screen, team_id, player_name, is_host, poll_state): | |
| nav = [gr.update()] * 7 | |
| new_screen = screen | |
| lobby_code = lobby_roster = play_timer = play_status = wait_status = win_score = gr.update() | |
| lobby_start = lobby_start_anyway = lobby_hint = gr.update() | |
| wait_line = wait_results_btn = gr.update() | |
| if session_id: | |
| status = rooms.session_status(session_id) | |
| # Canonical team id — never "" — so same-team members converge on one id. | |
| team_id = rooms.resolve_team_id(session_id, player_name) or team_id | |
| if screen == "lobby": | |
| lobby_code = code_html(session_id) | |
| lobby_roster = roster_html(session_id, player_name) | |
| # Start gate (Model B): the primary Start enables only when the whole | |
| # party is here (full AND min_ok). The host-only "Start anyway" | |
| # override covers the short-handed-but-legal case (min_ok, not full). | |
| pcs = rooms.player_count_status(session_id) | |
| full, min_ok = pcs["full"], pcs["min_ok"] | |
| lobby_start = gr.update(interactive=(full and min_ok)) | |
| if is_host: | |
| lobby_start_anyway = gr.update( | |
| visible=(not full), interactive=(min_ok and not full), | |
| value=f"▶ Start anyway ({pcs['joined']}/{pcs['target']} here)") | |
| lobby_hint = lobby_hint_text(session_id, is_host) | |
| if status == "playing": | |
| new_screen = "play" | |
| nav = list(vis("play")) | |
| play_timer = timer_html(session_id, team_id) | |
| play_status = status_strip_html(session_id, team_id) | |
| elif screen == "play": | |
| play_timer = timer_html(session_id, team_id) | |
| play_status = status_strip_html(session_id, team_id) | |
| elif screen == "wait": | |
| # The wait screen NEVER navigates on its own — for any player, under | |
| # any condition. Each tick only UPDATES: the team strip, the | |
| # "who's still playing" line, and the results button's enabled state. | |
| # The wait->win transition happens ONLY when a player clicks the | |
| # "See final results" button (see do_see_results). | |
| wait_status = status_strip_html(session_id, team_id) | |
| wait_line = wait_playing_md(session_id) | |
| pcs = rooms.player_count_status(session_id) | |
| can_results = rooms.all_finished(session_id) and pcs["min_ok"] | |
| wait_results_btn = gr.update(interactive=can_results) | |
| # ── Decide next poll interval from whether anything changed ── | |
| def _s(x): | |
| return x if isinstance(x, str) else "" | |
| sig = "|".join([new_screen, _s(lobby_code), _s(lobby_roster), _s(play_timer), | |
| _s(play_status), _s(wait_status), _s(wait_line), _s(win_score)]) | |
| last_sig, idle, cur_iv = poll_state or ("", 0, POLL_FAST) | |
| if sig != last_sig: | |
| idle, target_iv = 0, POLL_FAST | |
| else: | |
| idle += 1 | |
| target_iv = POLL_SLOW if idle >= POLL_IDLE_LIMIT else POLL_FAST | |
| # Only emit a Timer change when the interval actually flips, so we don't | |
| # reset the running countdown every tick. | |
| timer_update = gr.Timer(target_iv) if target_iv != cur_iv else gr.update() | |
| new_state = (sig, idle, target_iv) | |
| return (*nav, new_screen, lobby_code, lobby_roster, lobby_start, lobby_start_anyway, | |
| lobby_hint, play_timer, play_status, wait_status, wait_line, | |
| wait_results_btn, win_score, timer_update, new_state) | |
| def play_tick(session_id, screen, team_id, taskids, cur_task): | |
| """On the play screen keep the task-id list current and, on first entry | |
| (no task open yet), open the first task in place. This covers both the host | |
| (who clicks Start) and guests (who auto-transition to play on the poll), so | |
| the detail view is always populated without a separate landing screen. | |
| Returns the full detail-open tuple plus the refreshed ``s_taskids``.""" | |
| if screen != "play" or not session_id: | |
| return (gr.update(),) * _N_DETAIL_OUTPUTS + (taskids,) | |
| ids = _task_id_list(session_id) | |
| ids_json = json.dumps(ids) | |
| if not cur_task and ids: | |
| # First time on the play screen for this client: open task 1. | |
| return _open_task(session_id, team_id, ids[0]) + (ids_json,) | |
| # Already viewing a task — leave the detail view untouched (don't reset the | |
| # player's in-progress photo/transcript every tick), just sync the id list. | |
| return (gr.update(),) * _N_DETAIL_OUTPUTS + (ids_json,) | |
| # ── Screen-action handlers ──────────────────────────────────────────────── | |
| # Lobby instruction copy — differs for the host vs. everyone else. The host | |
| # copy shows progress (joined/target); HINT_HOST_WAIT/HINT_HOST_SHORT are | |
| # format strings filled by lobby_hint_text(). | |
| HINT_HOST_WAIT = "⏳ Waiting for players… {joined}/{target} joined" | |
| HINT_HOST_SHORT = "▶ {joined}/{target} here — start now, or wait for the rest." | |
| HINT_HOST_READY = "✅ Everyone's in — start the adventure whenever you're ready." | |
| HINT_GUEST = "⏳ Waiting for the host to start the adventure…" | |
| def lobby_hint_text(session_id: str, is_host: bool) -> str: | |
| """Lobby instruction line. Guests always wait on the host; the host sees | |
| progress and which gate (full vs. structural minimum) they're at.""" | |
| if not is_host: | |
| return HINT_GUEST | |
| pcs = rooms.player_count_status(session_id) | |
| if pcs["full"] and pcs["min_ok"]: | |
| return HINT_HOST_READY | |
| if pcs["min_ok"]: | |
| return HINT_HOST_SHORT.format(joined=pcs["joined"], target=pcs["target"]) | |
| return HINT_HOST_WAIT.format(joined=pcs["joined"], target=pcs["target"]) | |
| def wait_playing_md(session_id: str) -> str: | |
| """Live 'who's still playing' line for the wait screen — active teams | |
| (>=1 player) that haven't finished yet.""" | |
| active = rooms.active_team_ids(session_id) | |
| still = [tid for tid in active if not rooms.team_finished(session_id, tid)] | |
| if not still: | |
| return "Everyone's finished — see results whenever you're ready." | |
| names = ", ".join(rooms.team_meta(session_id, tid)["name"] for tid in still) | |
| return f"Waiting on: {names}" | |
| def _team_btn_updates(session_id, selected_id): | |
| """Button updates for the 4 team-pick slots: label + visibility, with the | |
| player's current team flagged 'selected' so it reads as chosen (green).""" | |
| teams = rooms.list_teams(session_id) | |
| ups = [] | |
| for i in range(4): | |
| if i < len(teams): | |
| cls = ["team-pick", "selected"] if teams[i]["id"] == selected_id else ["team-pick"] | |
| ups.append(gr.update(value=f"{teams[i]['name']} · {teams[i]['role']}", | |
| visible=True, elem_classes=cls)) | |
| else: | |
| ups.append(gr.update(visible=False)) | |
| return ups | |
| def do_create(name, gtype, city, area, duration, teams_n, head_start, | |
| difficulty, age_group, energy, players_n): | |
| name = (name or "Host").strip() or "Host" | |
| # Defense-in-depth: the slider min is 2, but guard here too so a game can | |
| # never be created with a single team (build_teams also floors at 2). | |
| teams_n = int(teams_n) | |
| if teams_n < 2: | |
| gr.Warning("Minimum 2 teams required — using 2.") | |
| teams_n = 2 | |
| session_id = str(uuid.uuid4()) | |
| config = { | |
| "game_type": gtype, "city": city or "Paris", "area": area or "Downtown", | |
| "location_type": "mixed", "duration_minutes": int(duration), | |
| "num_players": int(players_n), "num_teams": int(teams_n), | |
| "head_start_seconds": int(head_start), "difficulty": difficulty, | |
| "age_group": age_group, "energy_level": energy, "photo_enabled": True, | |
| "landscape_tags": [], "theme": "", "mobility": "standard", "allow_transport": False, | |
| } | |
| rooms.create_room(name, config, session_id) | |
| teams = rooms.list_teams(session_id) | |
| tb = _team_btn_updates(session_id, teams[0]["id"]) | |
| # Host's s_team is their real team id (teams[0] — create_room files the host | |
| # under it too), NOT "", so host proof/journal/finish state and final | |
| # standings all key off the same id as every other same-team member. | |
| return (*vis("lobby"), "lobby", session_id, name, teams[0]["id"], True, | |
| code_html(session_id), roster_html(session_id, name), | |
| gr.update(visible=True, interactive=False), | |
| copy_btn_html(session_id), lobby_hint_text(session_id, True), *tb) | |
| def do_join(code, name): | |
| session_id, msg, ok = rooms.join_room(code, name) | |
| if not ok: | |
| gr.Warning(msg) | |
| return (*vis("join"), "join", "", "", "", False, f"⚠ {msg}", | |
| gr.update(), gr.update(), gr.update(visible=False), | |
| gr.update(), gr.update(), | |
| *([gr.update(visible=False)] * 4)) | |
| gr.Info(f"Joined room — {msg}") | |
| name = (name or "").strip() | |
| teams = rooms.list_teams(session_id) | |
| tb = _team_btn_updates(session_id, teams[0]["id"]) | |
| return (*vis("lobby"), "lobby", session_id, name, teams[0]["id"], False, f"✅ {msg}", | |
| code_html(session_id), roster_html(session_id, name), | |
| gr.update(visible=False), | |
| copy_btn_html(session_id), HINT_GUEST, *tb) | |
| def make_pick(idx): | |
| def _pick(session_id, player): | |
| teams = rooms.list_teams(session_id) | |
| if idx >= len(teams): | |
| return (gr.update(), gr.update(), *([gr.update()] * 4)) | |
| tid = teams[idx]["id"] | |
| rooms.pick_team(session_id, player, tid) | |
| return (tid, roster_html(session_id, player), | |
| *_team_btn_updates(session_id, tid)) | |
| return _pick | |
| def prepare_game(session_id): | |
| """Generate the game during the lobby wait so Start is instant. | |
| Triggered right after the host creates the room. Config is fully known at | |
| creation time (teams/players don't affect Nemotron), so we download + | |
| generate here, hidden behind the lobby. Runs in a normal Gradio event | |
| (request context) so the inner @spaces.GPU generation gets a real GPU. | |
| """ | |
| if not session_id: | |
| yield "" | |
| return | |
| s = SESSION_STORE.get(session_id) | |
| if not s: | |
| yield "" | |
| return | |
| if s.get("game"): | |
| yield "<div class='cq-ready'>✓ Quest ready</div>" | |
| return | |
| yield "<div class='cq-loading'>Preparing your quest — warming Nemotron & generating tasks…</div>" | |
| try: | |
| generate_for_session(session_id) | |
| yield "<div class='cq-ready'>✓ Quest ready — start whenever everyone's in</div>" | |
| except Exception as e: | |
| print(f"[prepare_game] {type(e).__name__}: {e}") | |
| yield "<div class='cq-loading'>Couldn't pre-generate — it'll generate when you start.</div>" | |
| def do_start(session_id, team_id=""): | |
| s = SESSION_STORE.get(session_id) or {} | |
| # Render the starter's OWN team strip/timer, not always team A — a host who | |
| # picked a later team shouldn't see Team A's strip. Never empty: fall back | |
| # to the first team (mirrors rooms.resolve_team_id's fallback). | |
| team_id = team_id or (s.get("teams") or [{"id": "team-a"}])[0]["id"] | |
| if not s.get("game"): | |
| # Fallback: lobby pre-generation didn't finish — show a loader. | |
| yield (*vis("lobby"), "lobby", gr.update(), gr.update(), | |
| "<div class='cq-loading'>Summoning your quest…</div>") | |
| generate_for_session(session_id) | |
| rooms.begin_play(session_id, SESSION_STORE[session_id].get("game")) | |
| yield (*vis("play"), "play", timer_html(session_id, team_id), | |
| status_strip_html(session_id, team_id), "") | |
| def do_leave(): | |
| return (*vis("home"), "home", "", "", "", False) | |
| # Number of outputs produced by _open_task / consumed by DETAIL_OPEN_OUTPUTS. | |
| # 17 fixed slots + 12 task-tab buttons. | |
| _N_DETAIL_OUTPUTS = 19 + 12 | |
| def _open_task(session_id, team_id, tid): | |
| """Render the full task-detail view for one task. Shared by the play-screen | |
| entry, the horizontal task tabs, and the prev/next arrows so every entry | |
| point produces an identical, consistent detail screen.""" | |
| # NOTE: tasks are session-global (session["game"]["tasks"]) and are the SAME | |
| # for everyone — never slice/filter them per player. team_id here only keys | |
| # the per-team completion/proof/hint OVERLAY, so same-team members (sharing | |
| # one resolved team_id) see one shared task/proof view. Do not regress this. | |
| can = rooms.can_complete(session_id, team_id, tid) | |
| return ( | |
| tid, | |
| gr.update(visible=True), # detail view (always shown on the play screen) | |
| task_detail_md(session_id, team_id, tid), | |
| task_hint_html(session_id, team_id, tid), | |
| task_safety_html(session_id, team_id, tid), | |
| task_gallery(session_id, team_id, tid), | |
| proof_md(session_id, team_id, tid), | |
| _complete_btn_update(can), | |
| _completion_instruction_update(can), | |
| "", None, "", "", "", # transcript, audio, tagline, ask answer, capture status | |
| journal_log_md(session_id, team_id, tid), | |
| _task_position(session_id, tid), | |
| _hint_points_update(session_id, team_id, tid), | |
| hint_log_md(session_id, team_id, tid), | |
| _hint_btn_update(session_id, team_id, tid), | |
| *_detail_tab_updates(session_id, team_id, tid), | |
| ) | |
| def open_task_factory(idx): | |
| """Open the task at position ``idx`` in the team's task list. Used by both | |
| the wall cards and the horizontal task tabs (both index into s_taskids).""" | |
| def _open(session_id, team_id, taskids): | |
| ids = json.loads(taskids or "[]") | |
| if idx >= len(ids): | |
| return (gr.update(),) * _N_DETAIL_OUTPUTS | |
| return _open_task(session_id, team_id, ids[idx]) | |
| return _open | |
| def go_adjacent_factory(delta): | |
| """Step to the previous (delta=-1) or next (delta=+1) task, clamped to the | |
| list ends, and open it in place.""" | |
| def _go(session_id, team_id, taskids, cur_tid): | |
| ids = json.loads(taskids or "[]") | |
| if not ids: | |
| return (gr.update(),) * _N_DETAIL_OUTPUTS | |
| i = ids.index(cur_tid) if cur_tid in ids else 0 | |
| i = max(0, min(len(ids) - 1, i + delta)) | |
| return _open_task(session_id, team_id, ids[i]) | |
| return _go | |
| def do_transcribe(audio_path, language): | |
| if not audio_path: | |
| yield "No recording yet — tap record above, then transcribe." | |
| return | |
| yield "🌀 Transcribing your note…" | |
| text = transcribe_only(audio_path, language or "en") | |
| yield text or "(No speech detected — pick the right spoken language, or type your note.)" | |
| def do_save_journal(session_id, transcript, audio_path, language, task_id, team_id): | |
| # 7 outputs: capture status, proof, complete-btn, completion instruction, | |
| # journal log, audio, transcript | |
| yield ("🌀 Saving your journal…", gr.update(), gr.update(), | |
| gr.update(), gr.update(), gr.update(), gr.update()) | |
| msg = record_journal(session_id, transcript=transcript or "", task_id=task_id, | |
| team_id=team_id, audio_path=audio_path or "", language=language or "en") | |
| if msg.lstrip().startswith("⚠"): | |
| gr.Warning("Couldn't save — record audio or type a note first.") | |
| else: | |
| gr.Info("Journal saved 🎙️") | |
| can = rooms.can_complete(session_id, team_id, task_id) | |
| # Clear the recorder + transcript so the player can immediately record another. | |
| yield (msg, proof_md(session_id, team_id, task_id), _complete_btn_update(can), | |
| _completion_instruction_update(can), | |
| journal_log_md(session_id, team_id, task_id), None, "") | |
| def do_add_photo(session_id, image, tagline, task_id, team_id): | |
| # 7 outputs: capture status, gallery, proof, complete-btn, completion | |
| # instruction, photo, tagline | |
| if image is None: | |
| gr.Warning("Choose a photo first.") | |
| return ("Choose a photo first.", task_gallery(session_id, team_id, task_id), | |
| proof_md(session_id, team_id, task_id), gr.update(), gr.update(), image, tagline) | |
| upload_photo(session_id, image, caption=tagline or "", task_id=task_id, team_id=team_id) | |
| can = rooms.can_complete(session_id, team_id, task_id) | |
| gr.Info("Photo added 📸") | |
| return ("📸 Photo added with its tagline.", task_gallery(session_id, team_id, task_id), | |
| proof_md(session_id, team_id, task_id), _complete_btn_update(can), | |
| _completion_instruction_update(can), None, "") | |
| def _ask_gpu(question, session_id, team_id, task_id): | |
| s = SESSION_STORE.get(session_id) or {} | |
| game = s.get("game") or {} | |
| t = next((x for x in game.get("tasks", []) if x["task_id"] == task_id), {}) | |
| city = game.get("setup", {}).get("city", "") | |
| ctx = f"City: {city}. Task: {t.get('title','')} — {t.get('description','')}. Hint: {t.get('hint','')}" | |
| ans = None | |
| try: | |
| from app.services import minicpm | |
| ans = minicpm.ask(question, context=ctx) | |
| except Exception as e: | |
| print(f"[ask] failed: {e}") | |
| if not ans: | |
| ans = "*(The guide is resting — re-read the hint and scan for the named landmark. Try again in a moment.)*" | |
| return ans | |
| def on_get_hint(question, session_id, team_id, task_id): | |
| """Reveal a hint from the in-game guide and charge the team for it. | |
| Outputs (4): guide answer, available-points display, hint button, hint log. | |
| Points are only charged once a real answer is produced — an empty question | |
| or a spent task costs nothing. | |
| """ | |
| used = rooms.hints_used_count(session_id, team_id, task_id) | |
| remaining = max(0, rooms.HINT_CAP - used) | |
| if remaining <= 0: | |
| gr.Warning("No hints left for this task.") | |
| yield (gr.update(value="You've used all the hints for this task — check the hint history below."), | |
| gr.update(), _hint_btn_state(0), gr.update()) | |
| return | |
| if not (question or "").strip(): | |
| yield ("Tell me where you're stuck, or ask for a clue — then tap the button.", | |
| gr.update(), gr.update(), gr.update()) | |
| return | |
| # Thinking: disable the button so the hint can't be double-charged; don't | |
| # deduct until we actually have an answer. | |
| yield ("🌀 The guide is thinking…", gr.update(), | |
| gr.update(value="Thinking…", interactive=False), gr.update()) | |
| answer = _ask_gpu(question, session_id, team_id, task_id) | |
| used_now = use_hint(session_id, task_id, team_id, question=question, answer=answer) | |
| remaining_now = max(0, rooms.HINT_CAP - used_now) | |
| gr.Info(f"−{HINT_PENALTY} pts · Hint used · {remaining_now} hint(s) left") | |
| yield (f"💡 {answer}", | |
| _hint_points_update(session_id, team_id, task_id), | |
| _hint_btn_state(remaining_now), | |
| hint_log_md(session_id, team_id, task_id)) | |
| def do_complete(session_id, team_id, task_id): | |
| if rooms.can_complete(session_id, team_id, task_id): | |
| complete_task(session_id, task_id, team_id) | |
| gr.Info("Task completed — nice work! 🎉") | |
| # Re-render the same task so its tab shows ✓ and the completed state | |
| # is reflected in place (there's no wall to return to). | |
| return _open_task(session_id, team_id, task_id) | |
| gr.Warning("Add a photo or voice journal first to complete this task.") | |
| return (gr.update(),) * _N_DETAIL_OUTPUTS | |
| def do_finish(session_id, team_id): | |
| log_event(session_id, "game_finished", {"summary": f"{team_id} finished"}, team_id=team_id) | |
| rooms.finish_team(session_id, team_id) | |
| return (*vis("wait"), "wait", status_strip_html(session_id, team_id)) | |
| def do_see_results(session_id): | |
| """Manual wait->win advance — the ONLY path to the win screen. Fired by the | |
| 'See final results' button (gated/enabled by poll_tick), never automatic.""" | |
| return (*vis("win"), "win", scoreboard_md(session_id)) | |
| def _gather_moments(session_id, team_id): | |
| bits = [j.get("transcript", "") for j in rooms.team_journals(session_id, team_id)] | |
| bits += [p.get("caption", "") for p in rooms.team_photos(session_id, team_id)] | |
| return [b for b in bits if b][:12] | |
| def _funny_recap_gpu(moments): | |
| prompt = ( | |
| "Our playful city adventure moments: " + "; ".join(moments) + | |
| ". Write a short, funny, warm recap (4-6 sentences). Keep it light and silly. " | |
| "Do NOT mention winning, losing, scores, or rankings." | |
| ) | |
| ans = None | |
| try: | |
| from app.services import minicpm | |
| ans = minicpm.ask(prompt, context="", max_tokens=320) | |
| except Exception as e: | |
| print(f"[funny_recap] {e}") | |
| if not ans: | |
| ans = "### A few ridiculous highlights\n\n- " + "\n- ".join(moments) | |
| return ans | |
| def _stream_text(full: str): | |
| """Yield a string progressively for a narrative reveal (UI pacing only). | |
| Reveals paragraph-by-paragraph, falling back to sentence chunks for a | |
| single long block, accumulating so the Markdown stays well-formed at every | |
| step. The text itself is produced by the backend and is never altered. | |
| """ | |
| full = (full or "").strip() | |
| if not full: | |
| yield full | |
| return | |
| paras = [p for p in full.split("\n\n") if p.strip()] | |
| if len(paras) <= 1: | |
| # One block — chunk by sentence so long recaps still feel alive. | |
| import re | |
| paras = re.findall(r"[^.!?]+[.!?]*\s*", full) or [full] | |
| sep = "" | |
| else: | |
| sep = "\n\n" | |
| acc = "" | |
| for i, chunk in enumerate(paras): | |
| acc += (sep if i else "") + chunk | |
| yield acc | |
| time.sleep(0.12) | |
| def do_funny_recap(session_id, team_id): | |
| moments = _gather_moments(session_id, team_id) | |
| if not moments: | |
| yield "_Snap a photo or record a voice note first — then I'll spin a silly recap!_" | |
| return | |
| yield "🌀 Spinning a silly recap…" | |
| yield from _stream_text(_funny_recap_gpu(moments)) | |
| def _funny_post_gpu(session_id, team_id): | |
| paths = [p["photo_path"] for p in rooms.team_photos(session_id, team_id) if p.get("photo_path")] | |
| prompt = ("A whimsical, funny travel scrapbook poster, warm pastel beige tones, playful " | |
| "hand-drawn doodles, lighthearted city adventure, cozy and silly, no text") | |
| return generate_poster_sync(f"{session_id}-funny", prompt, photo_paths=paths) | |
| def do_funny_post(session_id, team_id): | |
| yield None, "🎨 Painting your funny poster…" | |
| yield _funny_post_gpu(session_id, team_id) | |
| def _win_recap_gpu(session_id): | |
| md, _ = generate_recap(session_id) | |
| return md | |
| def do_win_recap(session_id): | |
| yield "🌀 Writing your winning recap…" | |
| yield from _stream_text(_win_recap_gpu(session_id)) | |
| def _win_poster_gpu(session_id, kind): | |
| s = SESSION_STORE.get(session_id) or {} | |
| paths = [p["photo_path"] for p in s.get("photos", []) if p.get("photo_path")] | |
| prompts = { | |
| "ending": ("A triumphant golden-hour ending poster for a city adventure, warm beige and " | |
| "teal tones, celebratory and cinematic, no text"), | |
| "group": ("A joyful group celebration scrapbook poster, warm pastel beige tones, friends " | |
| "adventuring together, confetti, no text"), | |
| } | |
| return generate_poster_sync(f"{session_id}-{kind}", prompts.get(kind, prompts["ending"]), photo_paths=paths) | |
| def do_win_endpost(session_id): | |
| yield None, "🎨 Painting your ending poster…" | |
| yield _win_poster_gpu(session_id, "ending") | |
| def do_win_grouppost(session_id): | |
| yield None, "🎨 Painting your group poster…" | |
| yield _win_poster_gpu(session_id, "group") | |
| def do_new(): | |
| return (*vis("home"), "home", "", "", "", False) | |
| # ── Live form validation (pure UI, no backend calls) ────────────────────── | |
| # Mirror the room-code charset/length from rooms so the live hint matches what | |
| # join_room() will accept (it upper-cases + strips before checking). | |
| _CODE_LEN = 6 | |
| _CODE_CHARS = getattr(rooms, "_CODE_ALPHABET", "ABCDEFGHJKMNPQRSTUVWXYZ23456789") | |
| def validate_join_code(code: str, name: str): | |
| """Live feedback for the join form: code counter/validity + submit gating. | |
| Returns (info_html, submit_update). Purely presentational — never calls | |
| the backend; join_room() still does the authoritative check on submit. | |
| """ | |
| raw = (code or "").strip().upper() | |
| n = len(raw) | |
| bad = [c for c in raw if c not in _CODE_CHARS] | |
| has_name = bool((name or "").strip()) | |
| if n == 0: | |
| pill = "<span class='cq-pill wait'>enter your 6-character code</span>" | |
| elif n < _CODE_LEN: | |
| pill = f"<span class='cq-pill wait'>{n}/{_CODE_LEN} characters</span>" | |
| elif n > _CODE_LEN or bad: | |
| detail = "too long" if n > _CODE_LEN else f"unexpected: {' '.join(sorted(set(bad)))}" | |
| pill = f"<span class='cq-pill alert'>{n}/{_CODE_LEN} · {detail}</span>" | |
| else: | |
| pill = f"<span class='cq-pill live'>✓ {n}/{_CODE_LEN} · looks good</span>" | |
| name_pill = "" if has_name else "<span class='cq-pill wait'>add your name</span>" | |
| info = f"<div class='cq-strip' style='margin:4px 0 2px'>{pill}{name_pill}</div>" | |
| valid = (n == _CODE_LEN and not bad and has_name) | |
| return info, gr.update(interactive=valid) | |
| # Button busy-state helpers for .then() chaining around LLM/IO calls. | |
| # Disabling during the call prevents double-submits and gives a visual cue; | |
| # the in-component text loaders ("🌀 …") still communicate progress. | |
| def _btn_off(): | |
| return gr.update(interactive=False) | |
| def _btn_on(): | |
| return gr.update(interactive=True) | |
| _GTYPE_LABELS = { | |
| "scavenger_hunt": "Scavenger hunt", | |
| "hide_and_seek": "Hide & seek", | |
| "tag": "Tag", | |
| } | |
| def create_summary(name, gtype, city, area, teams, duration, players): | |
| """Live one-line recap of the create form. Pure string formatting.""" | |
| bits = [ | |
| _GTYPE_LABELS.get(gtype, gtype), | |
| (city or "Paris").strip(), | |
| ] | |
| if (area or "").strip(): | |
| bits.append(area.strip()) | |
| bits.append(f"{int(teams)} team{'s' if int(teams) != 1 else ''}") | |
| bits.append(f"{int(duration)} min") | |
| bits.append(f"{int(players)} players") | |
| summary = " · ".join(bits) | |
| host = (name or "").strip() | |
| who = f"Hosted by {host} — " if host else "" | |
| return ( | |
| "<div class='cq-soft' style='padding:12px 16px'>" | |
| f"<span style='color:#CCCCCC;font-size:13px'>{who}</span>" | |
| f"<span style='font-weight:600'>{summary}</span></div>" | |
| ) | |
| # ── ASR transcribe-only helper (no save) ────────────────────────────────── | |
| def transcribe_only(audio_path: str, language: str = "en") -> str: | |
| if not audio_path: | |
| return "" | |
| try: | |
| from app.services.asr import transcribe as _t | |
| result = _t(audio_path, language=language or "en") | |
| text = (result.get("transcript") or "").strip() | |
| print(f"[transcribe_only] lang={language} status={result.get('status')} " | |
| f"len={len(text)} preview={text[:80]!r}") | |
| return text | |
| except Exception as e: | |
| print(f"[transcribe_only] {type(e).__name__}: {e}") | |
| return "" | |
| # ══════════════════════════════════════════════════════════════════════════ | |
| # BLOCKS | |
| # ══════════════════════════════════════════════════════════════════════════ | |
| with gr.Blocks(title="CityQuest · AI") as demo: | |
| # per-browser state | |
| s_session = gr.State("") | |
| s_player = gr.State("") | |
| s_team = gr.State("") | |
| s_host = gr.State(False) | |
| s_screen = gr.State("home") | |
| s_task = gr.State("") | |
| s_taskids = gr.State("[]") | |
| s_lang = gr.State("en") # voice-journal ISO code, set on the Create screen | |
| s_poll_state = gr.State(("", 0, POLL_FAST)) # (signature, idle_ticks, interval) | |
| gr.HTML( | |
| "<div style='text-align:center;padding:22px 0 4px'>" | |
| "<span class='cq-title' style='font-size:30px'>🌲 CityQuest<span style='color:#4CAF50'> · AI</span></span>" | |
| "<div style='color:#CCCCCC;font-size:14px;margin-top:4px'>explore your city, the unhurried way</div></div>" | |
| ) | |
| # ── SCREEN: HOME ────────────────────────────────────────────────────── | |
| with gr.Group(visible=True, elem_classes=["cq-screen", "cq-home"]) as g_home: | |
| gr.HTML("<div style='max-width:680px;margin:24px auto 6px;text-align:center'>" | |
| "<div class='cq-title' style='font-size:clamp(2rem,5vw,3rem)'>A calm, playful city adventure</div>" | |
| "</div>") | |
| with gr.Row(): | |
| with gr.Column(): | |
| with gr.Group(elem_classes=["cq-card", "cq-home-card"]): | |
| gr.Markdown("Host a new adventure and invite your friends.", elem_classes=["landing-description"]) | |
| home_create = gr.Button("Create a room", variant="primary", size="lg", elem_id="home-create", elem_classes=["landing-btn"]) | |
| with gr.Column(): | |
| with gr.Group(elem_classes=["cq-card", "cq-home-card"]): | |
| gr.Markdown("Got a code from a friend? Hop in.", elem_classes=["landing-description"]) | |
| home_join = gr.Button("Join a room", variant="secondary", size="lg", elem_id="home-join", elem_classes=["landing-btn"]) | |
| # ── SCREEN: CREATE ──────────────────────────────────────────────────── | |
| with gr.Group(visible=False, elem_classes=["cq-screen", "cq-create"]) as g_create: | |
| with gr.Column(elem_classes=["cq-card"]): | |
| gr.Markdown("## Host a new adventure") | |
| c_name = gr.Textbox(label="What to call you?", placeholder="e.g. Arjun", value="") | |
| with gr.Row(): | |
| c_type = gr.Dropdown(["scavenger_hunt", "hide_and_seek", "tag"], | |
| value="scavenger_hunt", label="Game type", | |
| filterable=False, elem_classes=["cq-dropdown"]) | |
| c_teams = gr.Slider(2, 4, value=2, step=1, label="Teams (min 2)") | |
| with gr.Row(): | |
| c_city = gr.Textbox(label="City", placeholder="Paris") | |
| c_area = gr.Textbox(label="Area / neighbourhood", placeholder="Le Marais") | |
| with gr.Row(): | |
| c_duration = gr.Slider(15, 120, value=45, step=5, label="Duration (min)") | |
| # Starts at 0 / hidden because the default game type is | |
| # scavenger_hunt (no head start). toggle_head_start raises it to | |
| # 60 and reveals it when a staggered-start game type is picked. | |
| c_headstart = gr.Slider(0, 300, value=0, step=15, | |
| label="Head start between teams (sec)", | |
| visible=False) | |
| with gr.Row(): | |
| c_diff = gr.Dropdown(["easy", "medium", "hard"], value="medium", label="Difficulty", | |
| elem_classes=["cq-dropdown"]) | |
| c_age = gr.Dropdown(["kids", "teens", "adults", "all"], value="all", label="Age group", | |
| elem_classes=["cq-dropdown"]) | |
| with gr.Row(): | |
| c_energy = gr.Dropdown(["chill", "balanced", "high"], value="balanced", label="Energy", | |
| elem_classes=["cq-dropdown"]) | |
| c_players = gr.Slider(2, 20, value=4, step=1, label="Players") | |
| with gr.Row(): | |
| c_language = gr.Dropdown(LANGUAGE_CHOICES, value="English", | |
| label="Voice journal language", | |
| filterable=False, elem_classes=["cq-dropdown"]) | |
| create_summary_html = gr.HTML("", elem_classes=["game-summary"]) | |
| with gr.Row(): | |
| create_back = gr.Button("← Back", variant="secondary", elem_id="create-back", | |
| elem_classes=["back-btn", "create-btn"]) | |
| create_submit = gr.Button("Open the lobby →", variant="primary", elem_id="create-submit", | |
| elem_classes=["open-lobby-btn", "create-btn"]) | |
| # ── SCREEN: JOIN ────────────────────────────────────────────────────── | |
| with gr.Group(visible=False, elem_classes=["cq-screen"]) as g_join: | |
| with gr.Column(elem_classes=["cq-card"]): | |
| gr.Markdown("## Join a room") | |
| j_code = gr.Textbox(label="Room code", placeholder="e.g. WAVE42") | |
| join_code_info = gr.HTML("") | |
| j_name = gr.Textbox(label="Your name", placeholder="e.g. Maya") | |
| join_feedback = gr.Markdown("") | |
| with gr.Row(): | |
| join_back = gr.Button("← Back", variant="secondary", elem_id="join-back") | |
| join_submit = gr.Button("Join →", variant="primary", | |
| interactive=False, elem_id="join-submit") | |
| # ── SCREEN: LOBBY ───────────────────────────────────────────────────── | |
| with gr.Group(visible=False, elem_classes=["cq-screen"]) as g_lobby: | |
| with gr.Column(elem_classes=["cq-card"]): | |
| lobby_code = gr.HTML("") | |
| lobby_copy = gr.HTML("") | |
| gr.HTML("<div class='section-label'>Pick your team</div>") | |
| with gr.Row(): | |
| team_btn = [gr.Button("", variant="secondary", visible=False, | |
| elem_classes=["team-pick"]) for _ in range(4)] | |
| gr.HTML("<div class='section-label' style='margin-top:18px'>Who's here</div>") | |
| lobby_roster = gr.HTML("") | |
| lobby_prep = gr.HTML("", elem_classes=["lobby-prep"]) | |
| lobby_hint = gr.Markdown(HINT_GUEST, elem_classes=["lobby-hint"]) | |
| with gr.Row(): | |
| lobby_leave = gr.Button("Leave", variant="secondary", elem_id="lobby-leave") | |
| lobby_start = gr.Button("Start the adventure →", variant="primary", | |
| visible=False, interactive=False, | |
| elem_id="lobby-start", elem_classes=["lobby-start-btn"]) | |
| # Host-only short-handed override, sitting just below Start as a green | |
| # underlined text link (not a button). Hidden until poll_tick reveals | |
| # it (min_ok and not full); the normal Start covers the full case. | |
| lobby_start_anyway = gr.Button("▶ Start anyway", variant="secondary", | |
| visible=False, interactive=False, | |
| elem_id="lobby-start-anyway", | |
| elem_classes=["lobby-start-link"]) | |
| # ── SCREEN: PLAY ────────────────────────────────────────────────────── | |
| with gr.Group(visible=False, elem_classes=["cq-screen"]) as g_play: | |
| play_timer = gr.HTML("") | |
| gr.HTML("<div class='section-label' style='margin-top:14px'>Teams</div>") | |
| play_status = gr.HTML("") | |
| # The play screen lands directly in the task detail; the horizontal task | |
| # tabs below are the only task switcher (the old vertical task wall was | |
| with gr.Group(visible=True, elem_classes=["cq-reveal"]) as g_detail: | |
| # Wrap everything in a master row with spacers to align the ENTIRE page | |
| with gr.Row(): | |
| # 1. Left global spacer | |
| gr.Column(scale=1, min_width=0) | |
| # 2. Main Central Column (Aligns Tabs, Text, and Boxes perfectly) | |
| with gr.Column(scale=10, elem_classes=["play-card"]): | |
| # --- Tabs & Nav --- | |
| with gr.Row(elem_classes=["task-tab-row"]): | |
| detail_tab_btn = [gr.Button("", visible=False, size="sm", | |
| elem_classes=["task-tab"]) for _ in range(12)] | |
| with gr.Row(elem_classes=["task-nav-row"]): | |
| prev_task_btn = gr.Button("← Prev", size="sm", scale=0, | |
| elem_id="td-prev", elem_classes=["task-nav-btn"]) | |
| task_position_label = gr.Markdown("", elem_classes=["task-position"]) | |
| next_task_btn = gr.Button("Next →", size="sm", scale=0, | |
| elem_id="td-next", elem_classes=["task-nav-btn"]) | |
| # --- Text Content --- | |
| detail_md = gr.Markdown("") | |
| detail_hint = gr.HTML("") | |
| detail_safety = gr.HTML("") | |
| # --- Voice & Photos --- | |
| with gr.Row(equal_height=True, elem_classes=["panels-row"]): | |
| with gr.Column(scale=1, min_width=250, elem_classes=["proof-col"]): | |
| gr.HTML("<div class='section-label panel-header'>🎙️ Voice journal</div>") | |
| d_audio = gr.Audio(sources=["microphone", "upload"], type="filepath", | |
| format="wav", show_label=False, | |
| elem_classes=["cq-audio"], | |
| waveform_options=gr.WaveformOptions( | |
| show_recording_waveform=False)) | |
| d_transcript = gr.Textbox(label="Your notes", lines=3, | |
| placeholder="Transcribe your audio, or type a note…") | |
| with gr.Row(): | |
| d_transcribe = gr.Button("Transcribe", variant="secondary", | |
| elem_id="td-transcribe", | |
| elem_classes=["btn-secondary-action"]) | |
| d_savejournal = gr.Button("Save journal", variant="primary", | |
| elem_id="td-savejournal", | |
| elem_classes=["btn-primary-action"]) | |
| d_journal_log = gr.Markdown("") | |
| with gr.Column(scale=1, min_width=250, elem_classes=["proof-col"]): | |
| gr.HTML("<div class='section-label panel-header'>📸 Photos & taglines</div>") | |
| d_photo = gr.Image(type="filepath", show_label=False, height=170) | |
| d_tagline = gr.Textbox(label="Caption", placeholder="A caption for this photo", lines=3) | |
| with gr.Row(): | |
| d_addphoto = gr.Button("Add photo", variant="primary", | |
| elem_id="td-addphoto", | |
| elem_classes=["btn-primary-action"]) | |
| d_gallery = gr.Gallery(label="Photos for this task", columns=3, height=170) | |
| # --- Footer Actions --- | |
| d_capture_status = gr.Markdown("") | |
| with gr.Row(elem_classes=["hints-header"]): | |
| gr.Markdown("💡 Stuck? Take a hint..", elem_classes=["guide-header", "hints-label"]) | |
| d_points = gr.Markdown("", elem_classes=["points-display"]) | |
| with gr.Row(elem_classes=["hints-input-row"]): | |
| d_ask_q = gr.Textbox(show_label=False, scale=1, | |
| placeholder="Ask for a clue, or tell me where you're stuck…", | |
| elem_classes=["hint-input"]) | |
| d_ask_btn = gr.Button(f"Get a hint −{HINT_PENALTY} pts", scale=0, | |
| min_width=220, | |
| elem_id="td-ask", elem_classes=["btn-hint"]) | |
| d_ask_a = gr.Markdown("") | |
| d_hint_log = gr.Markdown("") | |
| d_proof = gr.Markdown("") | |
| completion_instruction = gr.Markdown( | |
| "🔒 Add a photo or voice note to unlock completion", | |
| elem_classes=["completion-instruction", "locked"]) | |
| d_complete = gr.Button("🔒 Mark completed", variant="primary", | |
| interactive=False, elem_id="td-complete", | |
| elem_classes=["mark-complete-btn", "complete-btn", "locked"]) | |
| d_finish = gr.Button("🏁 Finish & wait for others", variant="primary", | |
| elem_id="td-finish", elem_classes=["finish-btn"]) | |
| # 3. Right global spacer | |
| gr.Column(scale=1, min_width=0) | |
| # ── SCREEN: WAIT-LOBBY ──────────────────────────────────────────────── | |
| with gr.Group(visible=False, elem_classes=["cq-screen"]) as g_wait: | |
| with gr.Column(elem_classes=["cq-card"]): | |
| gr.Markdown("## 🌿 You're done — nicely paced!") | |
| gr.Markdown("*Waiting for the other teams to wrap up. Meanwhile, make something silly:*") | |
| wait_status = gr.HTML("") | |
| # Live "who's still playing" line + manual advance to results. The | |
| # button stays DISABLED until the results gate passes (all active | |
| # teams finished AND min_ok); poll_tick flips its enabled state. The | |
| # wait screen NEVER auto-navigates — this click is the only path. | |
| wait_status_line = gr.Markdown("", elem_classes=["lobby-hint"]) | |
| wait_see_results = gr.Button("🏆 See final results", variant="primary", | |
| interactive=False, elem_id="wait-see-results", | |
| elem_classes=["lobby-start-btn"]) | |
| with gr.Row(): | |
| wait_funny_recap = gr.Button("😄 Funny recap", variant="secondary", elem_id="wait-funny-recap") | |
| wait_funny_post = gr.Button("🎨 Funny poster", variant="secondary", elem_id="wait-funny-post") | |
| wait_recap_md = gr.Markdown("") | |
| wait_post_img = gr.Image(label="Funny poster", height=320, visible=True, elem_classes=["cq-poster"]) | |
| wait_post_status = gr.Markdown("") | |
| # ── SCREEN: WIN ─────────────────────────────────────────────────────── | |
| with gr.Group(visible=False, elem_classes=["cq-screen", "cq-win"]) as g_win: | |
| with gr.Column(elem_classes=["cq-win-standings"]): | |
| win_scoreboard = gr.Markdown("Final standings will appear here.") | |
| with gr.Group(elem_classes=["cq-card", "cq-win-celebrate"]): | |
| gr.Markdown("### 🎉 Celebrate the run") | |
| win_recap = gr.Button("📖 Winning recap", variant="primary", elem_id="win-recap", elem_classes=["cq-win-btn"]) | |
| win_recap_md = gr.Markdown("") | |
| with gr.Row(): | |
| win_endpost = gr.Button("🌅 Ending poster", variant="secondary", elem_id="win-endpost", elem_classes=["cq-win-btn"]) | |
| win_grouppost = gr.Button("👯 Group poster", variant="secondary", elem_id="win-grouppost", elem_classes=["cq-win-btn"]) | |
| win_poster_img = gr.Image(label="Poster", height=320, visible=True) | |
| win_poster_status = gr.Markdown("") | |
| win_new = gr.Button("✨ Start a new adventure", variant="primary", elem_id="win-new", elem_classes=["cq-win-btn", "cq-win-new"]) | |
| # ── Shared poll timer ───────────────────────────────────────────────── | |
| poll = gr.Timer(1.5) | |
| nav_groups = [g_home, g_create, g_join, g_lobby, g_play, g_wait, g_win] | |
| poll.tick(poll_tick, inputs=[s_session, s_screen, s_team, s_player, s_host, s_poll_state], | |
| outputs=nav_groups + [s_screen, lobby_code, lobby_roster, lobby_start, | |
| lobby_start_anyway, lobby_hint, | |
| play_timer, play_status, wait_status, wait_status_line, | |
| wait_see_results, win_scoreboard, | |
| poll, s_poll_state]) | |
| # ── Navigation ──────────────────────────────────────────────────────── | |
| create_inputs = [c_name, c_type, c_city, c_area, c_teams, c_duration, c_players] | |
| home_create.click(lambda: (*vis("create"), "create"), outputs=nav_groups + [s_screen] | |
| ).then(create_summary, inputs=create_inputs, outputs=[create_summary_html]) | |
| home_join.click(lambda: (*vis("join"), "join"), outputs=nav_groups + [s_screen]) | |
| create_back.click(lambda: (*vis("home"), "home"), outputs=nav_groups + [s_screen]) | |
| join_back.click(lambda: (*vis("home"), "home"), outputs=nav_groups + [s_screen]) | |
| # ── Head start visibility: hide for scavenger hunt (irrelevant there) ── | |
| def toggle_head_start(game_type): | |
| if game_type == "scavenger_hunt": | |
| return gr.update(visible=False, value=0) | |
| return gr.update(visible=True, value=60) | |
| c_type.change(toggle_head_start, inputs=[c_type], outputs=[c_headstart]) | |
| # ── Live form validation (pure UI) ──────────────────────────────────── | |
| for comp in create_inputs: | |
| comp.change(create_summary, inputs=create_inputs, outputs=[create_summary_html]) | |
| for comp in (j_code, j_name): | |
| comp.change(validate_join_code, inputs=[j_code, j_name], | |
| outputs=[join_code_info, join_submit]) | |
| # Route the host's chosen voice-journal language (full name) into the ISO | |
| # code held in s_lang, which the play-screen audio handlers consume. | |
| c_language.change(lambda name: LANGUAGE_TO_ISO.get(name, "en"), | |
| inputs=[c_language], outputs=[s_lang]) | |
| create_submit.click(_btn_off, None, [create_submit]).then( | |
| do_create, | |
| inputs=[c_name, c_type, c_city, c_area, c_duration, c_teams, c_headstart, | |
| c_diff, c_age, c_energy, c_players], | |
| outputs=nav_groups + [s_screen, s_session, s_player, s_team, s_host, | |
| lobby_code, lobby_roster, lobby_start, | |
| lobby_copy, lobby_hint] + team_btn, | |
| ).then(prepare_game, inputs=[s_session], outputs=[lobby_prep] | |
| ).then(_btn_on, None, [create_submit]) | |
| join_submit.click( | |
| do_join, inputs=[j_code, j_name], | |
| outputs=nav_groups + [s_screen, s_session, s_player, s_team, s_host, | |
| join_feedback, lobby_code, lobby_roster, lobby_start, | |
| lobby_copy, lobby_hint] + team_btn, | |
| ) | |
| for i, tb in enumerate(team_btn): | |
| tb.click(make_pick(i), inputs=[s_session, s_player], | |
| outputs=[s_team, lobby_roster] + team_btn) | |
| lobby_leave.click(do_leave, outputs=nav_groups + [s_screen, s_session, s_player, s_team, s_host]) | |
| # Both Start and Start-anyway funnel through the SAME do_start path — no | |
| # duplicated start logic; only the gating (in poll_tick) differs. | |
| lobby_start.click(_btn_off, None, [lobby_start]).then( | |
| do_start, inputs=[s_session, s_team], | |
| outputs=nav_groups + [s_screen, play_timer, play_status, lobby_prep] | |
| ).then(_btn_on, None, [lobby_start]) | |
| lobby_start_anyway.click(_btn_off, None, [lobby_start_anyway]).then( | |
| do_start, inputs=[s_session, s_team], | |
| outputs=nav_groups + [s_screen, play_timer, play_status, lobby_prep] | |
| ).then(_btn_on, None, [lobby_start_anyway]) | |
| # ── Task tabs → detail ──────────────────────────────────────────────── | |
| # Order must match _open_task's return tuple (19 fixed + 12 tab buttons). | |
| detail_open_outputs = [s_task, g_detail, detail_md, detail_hint, detail_safety, | |
| d_gallery, d_proof, d_complete, completion_instruction, | |
| d_transcript, d_audio, d_tagline, d_ask_a, d_capture_status, | |
| d_journal_log, task_position_label, | |
| d_points, d_hint_log, d_ask_btn] + detail_tab_btn | |
| # Keep the task-id list fresh and auto-open the first task when a client | |
| # first reaches the play screen (host via Start, guests via poll). | |
| poll.tick(play_tick, inputs=[s_session, s_screen, s_team, s_taskids, s_task], | |
| outputs=detail_open_outputs + [s_taskids]) | |
| # The in-detail horizontal task tabs index into s_taskids. | |
| for i, dtb in enumerate(detail_tab_btn): | |
| dtb.click(open_task_factory(i), inputs=[s_session, s_team, s_taskids], outputs=detail_open_outputs) | |
| prev_task_btn.click(go_adjacent_factory(-1), inputs=[s_session, s_team, s_taskids, s_task], | |
| outputs=detail_open_outputs) | |
| next_task_btn.click(go_adjacent_factory(+1), inputs=[s_session, s_team, s_taskids, s_task], | |
| outputs=detail_open_outputs) | |
| d_finish.click(do_finish, inputs=[s_session, s_team], | |
| outputs=nav_groups + [s_screen, wait_status]) | |
| d_transcribe.click(_btn_off, None, [d_transcribe]).then( | |
| do_transcribe, inputs=[d_audio, s_lang], outputs=[d_transcript] | |
| ).then(_btn_on, None, [d_transcribe]) | |
| # Auto-transcribe once the clip is actually on the server. stop_recording | |
| # (mic) and upload (file) fire *after* Gradio has committed/uploaded the | |
| # audio, so do_transcribe always sees a real filepath. This removes the | |
| # race where clicking "Transcribe" immediately after recording sent a | |
| # stale/None path and seemed to do nothing until you clicked around. | |
| d_audio.stop_recording(_btn_off, None, [d_transcribe]).then( | |
| do_transcribe, inputs=[d_audio, s_lang], outputs=[d_transcript] | |
| ).then(_btn_on, None, [d_transcribe]) | |
| d_audio.upload(_btn_off, None, [d_transcribe]).then( | |
| do_transcribe, inputs=[d_audio, s_lang], outputs=[d_transcript] | |
| ).then(_btn_on, None, [d_transcribe]) | |
| d_savejournal.click(_btn_off, None, [d_savejournal]).then( | |
| do_save_journal, | |
| inputs=[s_session, d_transcript, d_audio, s_lang, s_task, s_team], | |
| outputs=[d_capture_status, d_proof, d_complete, completion_instruction, | |
| d_journal_log, d_audio, d_transcript] | |
| ).then(_btn_on, None, [d_savejournal]) | |
| d_addphoto.click(do_add_photo, inputs=[s_session, d_photo, d_tagline, s_task, s_team], | |
| outputs=[d_capture_status, d_gallery, d_proof, d_complete, | |
| completion_instruction, d_photo, d_tagline]) | |
| # on_get_hint manages the button itself (disable while thinking, then set | |
| # the "(N left)"/"No hints left" state), so it does NOT use the generic | |
| # _btn_off/_btn_on toggle — that would re-enable a spent button. | |
| d_ask_btn.click( | |
| on_get_hint, inputs=[d_ask_q, s_session, s_team, s_task], | |
| outputs=[d_ask_a, d_points, d_ask_btn, d_hint_log] | |
| ) | |
| # After completing, re-render the same task in place so the ✓ shows on its | |
| # tab and the completed state is reflected (no wall to bounce back to). | |
| d_complete.click(do_complete, inputs=[s_session, s_team, s_task], | |
| outputs=detail_open_outputs) | |
| # ── Wait-lobby actions ──────────────────────────────────────────────── | |
| # Manual, click-only advance to the win screen (gate/enabled by poll_tick). | |
| wait_see_results.click(do_see_results, inputs=[s_session], | |
| outputs=nav_groups + [s_screen, win_scoreboard]) | |
| wait_funny_recap.click(_btn_off, None, [wait_funny_recap]).then( | |
| do_funny_recap, inputs=[s_session, s_team], outputs=[wait_recap_md] | |
| ).then(_btn_on, None, [wait_funny_recap]) | |
| wait_funny_post.click(_btn_off, None, [wait_funny_post]).then( | |
| do_funny_post, inputs=[s_session, s_team], outputs=[wait_post_img, wait_post_status] | |
| ).then(_btn_on, None, [wait_funny_post]) | |
| # ── Winning actions ─────────────────────────────────────────────────── | |
| win_recap.click(_btn_off, None, [win_recap]).then( | |
| do_win_recap, inputs=[s_session], outputs=[win_recap_md] | |
| ).then(_btn_on, None, [win_recap]) | |
| win_endpost.click(_btn_off, None, [win_endpost]).then( | |
| do_win_endpost, inputs=[s_session], outputs=[win_poster_img, win_poster_status] | |
| ).then(_btn_on, None, [win_endpost]) | |
| win_grouppost.click(_btn_off, None, [win_grouppost]).then( | |
| do_win_grouppost, inputs=[s_session], outputs=[win_poster_img, win_poster_status] | |
| ).then(_btn_on, None, [win_grouppost]) | |
| win_new.click(do_new, outputs=nav_groups + [s_screen, s_session, s_player, s_team, s_host]) | |
| # ── Entry point ──────────────────────────────────────────────────────────── | |
| if __name__ == "__main__": | |
| # ssr_mode=False avoids the Node SSR proxy that emits spurious | |
| # `sse_stream 404` errors with our 1.5s polling timer. | |
| demo.launch(theme=gradio_theme(), css=CSS, js=JS_INIT, ssr_mode=False,share=True) |