cq-test / app.py
BhargavMN
style(ui): adjust layout of win screen components for improved organization
777432c
Raw
History Blame Contribute Delete
96.5 kB
import os
import time
import uuid
import json
import html
from pathlib import Path
import gradio as gr
try:
    import spaces  # real package; only importable on the Hugging Face Spaces runtime
except ImportError:
    class _spaces:
        """Local stand-in for the ``spaces`` package.

        Keeps ``@spaces.GPU(duration=...)`` statically visible in the source
        while turning the decorator into a no-op when running off Spaces.
        """

        @staticmethod
        def GPU(duration=300):
            # ``duration`` is accepted for signature compatibility and ignored.
            def _identity(fn):
                return fn

            return _identity

    spaces = _spaces
from app.services.retrieval import load_games_dataset, normalize_game_record, retrieve_examples
from app.services.generator import (
generate_game, generate_game_with_model, build_generation_prompt,
unload_nemotron,
)
from app.services.validator import validate_game, repair_game
from app.services.schema_validator import create_minimal_game_template
from app.services.tracing import log_event, log_generation_trace, load_events
from app.services.journal import (
create_journal_entry, save_journal_entry, summarize_journal,
load_journal_entries, detect_mood, assess_story_value,
)
from app.services.scoring import compute_scores, HINT_PENALTY
from app.services.story import build_story_packet, generate_story
from app.services.image_gen import generate_poster_sync
# ── Voice-journal language options ─────────────────────────────────────────
# Shown on the Create Lobby screen as full names; mapped to the ISO codes the
# ASR service expects before being routed into the play-screen audio handlers.
# Display name -> ISO code. Insertion order doubles as the order of the
# choices list derived below.
LANGUAGE_TO_ISO = {
    "English": "en",
    "French": "fr",
    "Spanish": "es",
    "German": "de",
    "Italian": "it",
    "Portuguese": "pt",
    "Dutch": "nl",
    "Japanese": "ja",
    "Chinese (Mandarin)": "zh",
    "Arabic": "ar",
    "Hindi": "hi",
    "Korean": "ko",
    "Russian": "ru",
    "Turkish": "tr",
    "Polish": "pl",
}
# Full language names, in the same order as the mapping above.
LANGUAGE_CHOICES = list(LANGUAGE_TO_ISO)
# ── Load datasets once on startup ──────────────────────────────────────────
# Directory containing this file; dataset paths are resolved relative to it.
BASE_DIR = Path(__file__).resolve().parent

# Source label -> dataset file. Every file is optional: a missing or broken
# dataset is reported and skipped rather than aborting startup.
DATASET_PATHS = {
    "old": BASE_DIR / "app" / "data" / "games_dataset.json",
    "scavenger_hunt": BASE_DIR / "app" / "data" / "scavenger_hunt" / "dataset.json",
    "hide_and_seek": BASE_DIR / "app" / "data" / "hide_and_seek" / "dataset.json",
    "tag": BASE_DIR / "app" / "data" / "tag" / "dataset.json",
}

# Normalized records from every dataset that loaded, used for retrieval.
DATA_RECORDS = []
for src, path in DATASET_PATHS.items():
    try:
        raw = load_games_dataset(str(path))
        normalized = [normalize_game_record(r) for r in raw]
    except FileNotFoundError:
        print(f"[!] Dataset not found at {path}, skipping {src}")
    except Exception as e:
        print(f"[!] Error loading {src} dataset from {path}: {e}")
    else:
        DATA_RECORDS.extend(normalized)
        print(f"[OK] Loaded {len(normalized)} records from {src} dataset ({path.name})")
print(f"[OK] Total: {len(DATA_RECORDS)} game records loaded for retrieval")
# ── Shared multiplayer state lives in app.services.rooms ───────────────────
# SESSION_STORE / ADVENTURE_CODES are owned by the rooms module so the lobby,
# task wall and wait-lobby all read/write the same in-process truth.
from app.services import rooms
from app.services.rooms import SESSION_STORE, ADVENTURE_CODES, save_state, load_state
# ── HF Spaces Zero GPU generation ──────────────────────────────────────────
# This function is statically decorated so HF Spaces detects the GPU
# requirement at startup. The 2.84 GB GGUF model is lazily downloaded
# on first call inside the GPU context.
@spaces.GPU(duration=300)
def _generate_with_gpu(config: dict, retrieved: list[dict]):
    """Generate a game on the GPU (Nemotron 3 Nano 4B via llama.cpp).

    Runs inside ``@spaces.GPU`` so CUDA is available. The model download
    and llama.cpp initialisation happen lazily on first call.

    Args:
        config: Game configuration passed to ``build_generation_prompt``.
        retrieved: Retrieved example records used to build the prompt.

    Returns:
        The parsed game dict when the model output is a JSON object that
        carries every required top-level field; otherwise ``None`` (empty
        output, invalid JSON, non-object JSON, or missing fields).
    """
    prompt = build_generation_prompt(config, retrieved)
    json_str = generate_game_with_model(prompt, model_name="nemotron")
    if not json_str:
        return None

    try:
        game = json.loads(json_str)
    except json.JSONDecodeError as exc:
        # Report the failure instead of swallowing it; the caller falls back.
        print(f"[app] Nemotron output was not valid JSON ({exc}); discarding it")
        return None

    # Valid JSON need not be an object: ``field in 42`` raises TypeError and
    # ``field in [...]`` would search list elements, so require a dict first.
    required_fields = ("game_id", "title", "setup", "tasks", "safety")
    if isinstance(game, dict) and all(field in game for field in required_fields):
        return game
    return None
def _generate_game(config: dict, retrieved: list[dict]):
    """Return a generated game, preferring the GPU path.

    When the GPU path yields ``None`` a notice is printed and the result of
    ``generate_game`` (CPU/mock) is returned instead.
    """
    if spaces is not None:
        gpu_result = _generate_with_gpu(config, retrieved)
        if gpu_result is None:
            print("[app] GPU generation returned None, falling back to CPU/mock")
        else:
            return gpu_result
    return generate_game(config, retrieved)
# ── Pipeline entry point ──────────────────────────────────────────────────────
def run_pipeline(
    game_type: str,
    city: str,
    area: str,
    location_type: str,
    duration_minutes: int,
    num_players: int,
    difficulty: str,
    age_group: str,
    energy_level: str,
):
    """Run retrieval, generation, validation, repair and logging in one go.

    A new session is registered in ``SESSION_STORE`` for the generated game.

    Returns:
        ``(summary, session_id)``: the text built by ``build_summary`` and
        the id of the newly stored session.
    """
    session_id = str(uuid.uuid4())
    config = {
        "game_type": game_type,
        "city": city or "Paris",
        "area": area or "Downtown",
        "location_type": location_type,
        "duration_minutes": int(duration_minutes),
        "num_players": int(num_players),
        "difficulty": difficulty,
        "age_group": age_group,
        "energy_level": energy_level,
        "photo_enabled": True,
        # Fields consumed by the enhanced retrieval scoring
        "landscape_tags": [],
        "theme": "",
        "mobility": "standard",
        "allow_transport": False,
    }

    # 1 -- Retrieval (skipped entirely when no dataset was loaded)
    retrieved = retrieve_examples(config, DATA_RECORDS, k=3) if DATA_RECORDS else []
    state = {"num_retrieved": len(retrieved)}
    if DATA_RECORDS:
        state["retrieved_ids"] = [example["id"] for example in retrieved]

    # 2 -- Generation (GPU if available, CPU/mock fallback otherwise)
    game = _generate_game(config, retrieved)
    state["game_id"] = game["game_id"]
    state["game_title"] = game["title"]
    # Nemotron is no longer needed once the game exists; unloading it frees
    # GPU memory for the FLUX poster / Cohere ASR later.
    unload_nemotron()

    # 3 -- Validation
    is_valid, failures = validate_game(game, config)
    state["validation_passed"] = is_valid
    state["validation_failures"] = failures

    # 4 -- Repair (only when validation failed), then re-validate the result
    repaired = None
    if is_valid:
        state["repair_applied"] = False
    else:
        repaired = repair_game(game, failures, config)
        state["repair_applied"] = True
        repair_ok, still_failing = validate_game(repaired, config)
        state["repair_valid"] = repair_ok
        state["remaining_failures"] = still_failing
    final_game = game if repaired is None else repaired

    # Stamp the selected game type onto the game so it's recorded explicitly
    # (the model output / schema don't carry it) and can't drift from config.
    final_game["game_type"] = config.get("game_type", "scavenger_hunt")

    # 5 -- Log generation trace
    trace_passed = is_valid or (
        repaired is not None and state.get("repair_valid", False)
    )
    log_generation_trace(
        session_id=session_id,
        config=config,
        retrieved_examples=retrieved,
        game=final_game,
        validation_passed=trace_passed,
        validation_failures=failures,
        repaired_game=repaired,
    )

    # 6 -- Reveal every task as an event
    for task in final_game.get("tasks", []):
        reveal_payload = {
            "task_id": task["task_id"],
            "title": task["title"],
            "points": task["points"],
        }
        log_event(session_id, "task_revealed", reveal_payload)

    # 7 -- Store session
    SESSION_STORE[session_id] = {
        "config": config,
        "game": final_game,
        "events": [],
        "journals": [],
    }

    # 8 -- Build summary text
    return build_summary(final_game, state, session_id), session_id
def generate_for_session(session_id: str) -> dict:
    """Generate (and validate/repair) a game for an *existing* lobby session.

    Used when the host presses Start: the room/players already exist, so we
    only run retrieval -> generation -> validation -> repair, store the game
    on the session, and reveal the tasks. Reuses the same pipeline pieces as
    :func:`run_pipeline`.

    Args:
        session_id: Key of an existing entry in ``SESSION_STORE``.

    Returns:
        The final game dict (repaired if a repair was applied).

    Raises:
        ValueError: If ``session_id`` is not a known session.
    """
    session = SESSION_STORE.get(session_id)
    if not session:
        raise ValueError("Unknown session")
    # Idempotent: once a quest exists for this session, never re-roll it. A
    # second call (double-clicked Start, a poll race, a reload mid-flow) must
    # return the SAME tasks — players completing a task should never see a
    # different set of tasks appear under them.
    if session.get("game"):
        return session["game"]

    config = session["config"]
    retrieved = retrieve_examples(config, DATA_RECORDS, k=3) if DATA_RECORDS else []
    game = _generate_game(config, retrieved)
    unload_nemotron()  # free GPU for ASR / MiniCPM / FLUX

    is_valid, failures = validate_game(game, config)
    repaired = None
    repair_valid = False
    if not is_valid:
        repaired = repair_game(game, failures, config)
        if repaired is not None:
            # Re-validate the repaired game (as run_pipeline does) so the
            # trace reports the real outcome instead of assuming that any
            # repair succeeded.
            repair_valid, _remaining = validate_game(repaired, config)
    final_game = repaired if repaired is not None else game

    # Record the selected game type on the game object (authoritative = config).
    final_game["game_type"] = config.get("game_type", "scavenger_hunt")

    log_generation_trace(
        session_id=session_id,
        config=config,
        retrieved_examples=retrieved,
        game=final_game,
        validation_passed=is_valid or repair_valid,
        validation_failures=failures,
        repaired_game=repaired,
    )
    for task in final_game.get("tasks", []):
        log_event(session_id, "task_revealed", {
            "task_id": task["task_id"],
            "title": task["title"],
            "points": task["points"],
        })

    session["game"] = final_game
    # Persist immediately so a process restart (e.g. dev hot-reload) between
    # generation and begin_play() can't lose the quest and trigger a re-roll.
    save_state()
    return final_game
# ── Phase 3: Gameplay helpers ─────────────────────────────────────────────────
def complete_task(session_id: str, task_id: str, team_id: str = "team-a"):
    """Record that ``team_id`` completed ``task_id`` and return a status string.

    The ``task_completed`` event is logged and also appended to the
    session's in-memory event list. Unknown sessions get a warning string.
    """
    if session_id not in SESSION_STORE:
        return "⚠ Unknown session"

    payload = {
        "task_id": task_id,
        "summary": f"Team {team_id} completed {task_id}",
    }
    event = log_event(session_id, "task_completed", payload, team_id=team_id)
    SESSION_STORE[session_id]["events"].append(event)
    return f"✅ Task {task_id} completed!"
def skip_task(session_id: str, task_id: str, team_id: str = "team-a"):
    """Record that ``team_id`` skipped ``task_id`` and return a status string.

    The ``task_skipped`` event is logged and also appended to the session's
    in-memory event list. Unknown sessions get a warning string.
    """
    if session_id not in SESSION_STORE:
        return "⚠ Unknown session"

    payload = {
        "task_id": task_id,
        "summary": f"Team {team_id} skipped {task_id}",
    }
    event = log_event(session_id, "task_skipped", payload, team_id=team_id)
    SESSION_STORE[session_id]["events"].append(event)
    return f"⏭️ Task {task_id} skipped."
def use_hint(session_id: str, task_id: str, team_id: str = "team-a",
             question: str = "", answer: str = ""):
    """Record a ``hint_used`` event and return the team's hint count for the task.

    The stripped question and answer travel on the event payload so a hint
    history can be rendered later. Returns ``0`` for an unknown session;
    otherwise the value of ``rooms.hints_used_count`` after logging.
    """
    if session_id not in SESSION_STORE:
        return 0

    clean_question = (question or "").strip()
    clean_answer = (answer or "").strip()
    event = log_event(
        session_id,
        "hint_used",
        {
            "task_id": task_id,
            "question": clean_question,
            "answer": clean_answer,
            "summary": f"Team {team_id} used a hint for {task_id}",
        },
        team_id=team_id,
    )
    SESSION_STORE[session_id]["events"].append(event)
    return rooms.hints_used_count(session_id, team_id, task_id)
@spaces.GPU(duration=120)
def record_journal(
    session_id: str,
    transcript: str = "",
    task_id: str = "",
    location_note: str = "",
    team_id: str = "team-a",
    audio_path: str = "",
    language: str = "en",
):
    """Record a journal entry, summarize it, and return display markdown.

    Decorated with ``@spaces.GPU`` so the Cohere ASR model can run on the
    GPU. On HF ZeroGPU, ``torch.cuda.is_available()`` is ``False`` outside
    a ``@spaces.GPU`` function, so without this the ASR model would load
    on CPU and the voice-journal button would appear to hang.

    Two input paths are supported:

    * **Voice path** — pass ``audio_path`` (e.g. a path returned by
      ``gr.Audio(type="filepath")``). The audio is transcribed with the
      Cohere ASR service (``CohereLabs/cohere-transcribe-03-2026``) and
      the transcript is stored with ASR metadata.
    * **Typed path** — pass ``transcript`` directly. The user can also
      edit an ASR transcript in the UI before submitting; if both
      ``audio_path`` and ``transcript`` are present, the typed
      transcript wins and is treated as a manual correction
      (``transcript_source == "hybrid"``) when it differs from the ASR text.

    Args:
        session_id: Key of an existing entry in ``SESSION_STORE``.
        transcript: Typed (or edited) note; ``None`` is treated as empty.
        task_id: Related task id, or empty for a general entry.
        location_note: Free-text location note stored with the entry.
        team_id: Team the entry belongs to.
        audio_path: Path of a recorded audio file, or empty.
        language: Language code handed to the ASR service.

    Returns:
        Markdown describing the saved entry, or a warning string when the
        session is unknown or no transcript could be obtained.
    """
    if session_id not in SESSION_STORE:
        return "[!] Unknown session"

    # A cleared textbox can arrive as None. Normalise once so a missing
    # typed note cannot raise inside the ASR handling below and be
    # misreported as an ASR failure.
    transcript = transcript or ""

    asr_metadata: dict | None = None
    audio_ref: str | None = None
    transcript_source = "typed"

    # ── 1. Voice path — transcribe audio if provided ───────────────────
    if audio_path:
        audio_ref = audio_path
        asr_text = ""
        try:
            from app.services.asr import transcribe as _asr_transcribe
            asr_result = _asr_transcribe(audio_path, language=language)
            asr_metadata = {
                "model": asr_result.get("model"),
                "language": asr_result.get("language"),
                "status": asr_result.get("status"),
                "error": asr_result.get("error"),
            }
            asr_text = (asr_result.get("transcript") or "").strip()
        except Exception as exc:
            # Best-effort: a failed transcription must not lose a typed note.
            print(f"[app] ASR transcription failed: {type(exc).__name__}: {exc}")
            asr_metadata = {
                "model": None,
                "language": language,
                "status": "error",
                "error": f"{type(exc).__name__}: {exc}",
            }

        # Choose the transcript source outside the try so that only real
        # ASR failures are recorded as ASR errors.
        typed_text = transcript.strip()
        if asr_text and not typed_text:
            transcript = asr_text
            transcript_source = "asr"
        elif asr_text and typed_text and asr_text != typed_text:
            transcript_source = "hybrid"

    if not transcript.strip():
        # Surface the actual error instead of a generic message
        hint = ""
        if asr_metadata:
            err = asr_metadata.get("error") or ""
            status = asr_metadata.get("status") or ""
            if status == "error" and err:
                hint = f"\n\n**ASR error:** {err}"
            elif status == "skipped":
                hint = (
                    "\n\n**ASR was skipped** — set `CITYQUEST_SKIP_MODEL=0` "
                    "and ensure `HF_TOKEN` is configured."
                )
        return (
            "⚠️ No transcript available — record audio or type a note first."
            + hint
        )

    # ── 2. Build journal entry ────────────────────────────────────────
    entry = create_journal_entry(
        transcript=transcript,
        session_id=session_id,
        team_id=team_id,
        task_id=task_id or None,
        location_note=location_note,
        audio_ref=audio_ref,
        asr_metadata=asr_metadata,
        transcript_source=transcript_source,
    )

    # Summarize
    summary = summarize_journal(transcript, task_id=task_id or None, location_note=location_note)
    entry["moment_summary"] = summary["moment_summary"]
    entry["tags"] = summary["tags"]
    entry["story_value"] = summary["story_value"]

    # Persist
    save_journal_entry(entry)

    # Log event
    ev = log_event(session_id, "journal_recorded", {
        "journal_id": entry["journal_id"],
        "mood": entry["mood"],
        "story_value": summary["story_value"],
        "summary": summary["moment_summary"],
        "transcript_source": transcript_source,
        "asr_status": asr_metadata.get("status") if asr_metadata else None,
        "asr_model": asr_metadata.get("model") if asr_metadata else None,
    }, team_id=team_id)
    SESSION_STORE[session_id]["events"].append(ev)
    SESSION_STORE[session_id]["journals"].append(entry)

    # Build display text
    source_label = {
        "asr": "🎙️ (transcribed)",
        "hybrid": "🎙️✏️ (transcribed, edited)",
        "typed": "⌨️ (typed)",
    }.get(transcript_source, transcript_source)
    asr_line = ""
    if asr_metadata and asr_metadata.get("status") != "ok":
        asr_line = f"- ASR status: **{asr_metadata.get('status')}** — {asr_metadata.get('error') or ''}\n"
    display = (
        f"🎙️ **Journal recorded!** {source_label}\n"
        f"- Mood: *{entry['mood']}*\n"
        f"- Story value: **{summary['story_value']}**\n"
        f"- Tags: {', '.join(summary['tags'])}\n"
        f"{asr_line}"
        f"- Summary: {summary['moment_summary']}"
    )
    return display
def upload_photo(
    session_id: str,
    photo_file,
    caption: str = "",
    task_id: str = "",
    team_id: str = "team-a",
):
    """Register an uploaded photo for a session and return confirmation markdown.

    A ``photo_uploaded`` event is logged, the photo is added to the
    session's ``photos`` list (created on first use), and the returned text
    lists every photo recorded so far. Unknown sessions get a warning string.
    """
    if session_id not in SESSION_STORE:
        return "[!] Unknown session"

    # Work out a display name: a string is treated as a path and reduced to
    # its last component; any other object contributes its ``name`` attribute.
    if photo_file is None:
        photo_name = ""
    elif isinstance(photo_file, str):
        photo_name = photo_file.rsplit("/", 1)[-1].rsplit("\\", 1)[-1]
    else:
        photo_name = getattr(photo_file, "name", "photo")

    photo_id = f"photo-{uuid.uuid4().hex[:8]}"
    event_payload = {
        "photo_id": photo_id,
        "photo_name": photo_name,
        "caption": caption,
        "summary": f"Team {team_id} uploaded photo for {task_id or 'general'}",
    }
    if task_id:
        event_payload["task_id"] = task_id
    event = log_event(session_id, "photo_uploaded", event_payload, team_id=team_id)

    session = SESSION_STORE[session_id]
    session["events"].append(event)

    # Keep the photo on the session so the recap can use it
    photos = session.setdefault("photos", [])
    photos.append({
        "photo_id": photo_id,
        "photo_name": photo_name,
        "photo_path": photo_file if isinstance(photo_file, str) else "",
        "caption": caption,
        "task_id": task_id,
        "team_id": team_id,
    })

    # One gallery line per photo recorded so far
    gallery_lines = []
    for p in photos:
        gallery_lines.append(
            f"📸 **{p['photo_id']}** — {p['caption'] or '(no caption)'} [{p['task_id'] or 'general'}]"
        )

    header = (
        f"📸 **Photo uploaded!**\n"
        f"- ID: `{photo_id}`\n"
        f"- Caption: {caption or '(none)'}\n"
        f"- Related task: {task_id or 'general'}\n\n"
        f"**All photos ({len(photos)}):**\n"
    )
    return header + "\n".join(gallery_lines)
def end_game(session_id: str, team_id: str = "team-a"):
    """Finish the game and return the final scoreboard as markdown.

    Logs a ``game_finished`` event, computes scores over the session's
    events plus that event, stores them on the session under ``scores``,
    and formats one section per team. Unknown sessions get a warning string.
    """
    if session_id not in SESSION_STORE:
        return "[!] Unknown session"

    session = SESSION_STORE[session_id]
    game = session["game"]

    # Events are loaded before the finish event is logged, so it is added
    # to the list by hand.
    events = load_events(session_id=session_id)
    finish_event = log_event(
        session_id,
        "game_finished",
        {"summary": f"Game finished — session {session_id}"},
        team_id=team_id,
    )
    events.append(finish_event)

    scores = compute_scores(events, game)
    session["scores"] = scores
    winner = scores.get("winner")

    out = ["# Final Scoreboard\n"]
    for ts in scores.get("team_scores", []):
        marker = " [WINNER]" if ts["team_id"] == winner else ""
        out.extend([
            f"### Team: {ts['team_id']}{marker}",
            f"- **Total points:** {ts['points']}",
            f"- Tasks completed: {ts['completed_tasks']}/{ts['total_tasks']}",
            f"- Hints used: {ts['hints_used']}",
        ])
        if ts.get("bonuses"):
            out.append(f"- Bonuses: {', '.join(ts['bonuses'])}")
        out.append("")
        out.append("**Breakdown:**")
        out.extend(f" * {b}" for b in ts.get("scoring_breakdown", []))
        out.append("")
    if winner:
        out.append(f"**Winner: {winner}**")
    return "\n".join(out)
def generate_recap(session_id: str):
    """Generate the final story recap from all collected session data.

    Args:
        session_id: Key of an existing entry in ``SESSION_STORE``.

    Returns:
        ``(markdown, result)``: the recap text for display and the dict
        returned by ``generate_story``. For an unknown session the pair is
        a warning string and an empty dict.
    """
    if session_id not in SESSION_STORE:
        return "[!] Unknown session", {}

    session = SESSION_STORE[session_id]
    game = session["game"]
    events = load_events(session_id=session_id)
    journals = load_journal_entries(session_id=session_id)
    # Score only when nothing is cached: ``dict.get(key, default)`` evaluates
    # its default eagerly, so the previous form recomputed on every call.
    if "scores" in session:
        scores = session["scores"]
    else:
        scores = compute_scores(events, game)
    photos = session.get("photos", [])

    # Build story packet
    packet = build_story_packet(
        game=game,
        events=events,
        scores=scores,
        journal_entries=journals,
        photo_captions=photos,
    )

    # Generate story
    result = generate_story(packet, session_id=session_id)

    # Format for display (poster goes in dedicated section below)
    lines = [
        "# Episode Recap\n",
        result["short_recap"],
        "",
        result["long_summary"],
        "",
    ]
    # NOTE: poster_prompt is internal — do NOT render to user.
    # NOTE: do not duplicate Final Standings here; left column shows it.
    # TODO: result["long_summary"] may still contain a "Final standings"
    # sub-section produced upstream in generate_story(); if so, strip it
    # in the story generator module rather than here.
    return "\n".join(lines), result
@spaces.GPU(duration=300)
def generate_poster_from_session(session_id: str):
    """Generate (or reuse) the poster image for a session.

    Args:
        session_id: Key of an existing entry in ``SESSION_STORE``.

    Returns:
        ``(poster_url, status)``. ``poster_url`` is ``None`` when the
        session is unknown or has no game; otherwise it is whatever
        ``generate_poster_sync`` returned, or the previously cached value.
    """
    if session_id not in SESSION_STORE:
        return None, "[!] Unknown session - generate a recap first"

    session = SESSION_STORE[session_id]

    # Reuse an earlier poster only if one was actually produced. Testing
    # for the key alone would turn a cached failure (None) into a permanent
    # "Poster ready" answer with no image and no retry.
    cached_url = session.get("poster_url")
    if cached_url:
        return cached_url, "Poster ready from previous generation"

    # Build story packet from stored data
    game = session.get("game")
    if not game:
        return None, "[!] No game data in session"
    events = load_events(session_id=session_id)
    journals = load_journal_entries(session_id=session_id)
    # Score only when nothing is cached (a ``.get`` default is evaluated eagerly).
    if "scores" in session:
        scores = session["scores"]
    else:
        scores = compute_scores(events, game)
    photos = session.get("photos", [])
    packet = build_story_packet(
        game=game, events=events, scores=scores,
        journal_entries=journals, photo_captions=photos,
    )
    result = generate_story(packet, session_id=session_id)
    poster_url, status = generate_poster_sync(
        session_id,
        result["poster_prompt"],
        photo_paths=[p.get("photo_path", "") for p in photos if p.get("photo_path")],
    )
    # Cache successes only, so a failed attempt can be retried later.
    if poster_url:
        session["poster_url"] = poster_url
    return poster_url, status
# ── Build summary text ────────────────────────────────────────────────────────
def build_summary(game: dict, state: dict, session_id: str = "") -> str:
    """Format a human-readable game summary (markdown) for the UI.

    Args:
        game: Generated game dict; every section is read with ``.get`` so
            missing keys fall back to placeholders.
        state: Pipeline trace values (``num_retrieved``, ``retrieved_ids``,
            ``game_id``, ``validation_passed``, ``validation_failures``,
            ``repair_applied``, ``repair_valid``, ``remaining_failures``).
        session_id: When non-empty, its first 12 characters are shown so the
            player can paste the id into the Play tab.

    Returns:
        The summary as a single newline-joined markdown string.
    """
    lines = []
    lines.append(f"# 🎮 {game.get('title', 'Untitled')}")
    lines.append("")
    if session_id:
        lines.append(f"> Session `{session_id[:12]}…` — paste this in the **Play** tab to log progress.")
        lines.append("")

    lines.append("## 📋 Setup")
    setup = game.get("setup", {})
    # City and area used to be concatenated with nothing between them
    # ("ParisDowntown"); separate them so the location is readable.
    lines.append(f"- **Location:** {setup.get('city', '?')}, {setup.get('area', '?')}")
    lines.append(f"- **Meeting point:** {setup.get('meeting_point', '?')}")
    lines.append(f"- **Duration:** {setup.get('duration_minutes', '?')} min")
    lines.append(f"- **Players:** {setup.get('num_players', '?')}")
    lines.append("")

    lines.append("## 📜 Rules")
    for i, rule in enumerate(game.get("rules", []), 1):
        lines.append(f" {i}. {rule}")
    lines.append("")

    lines.append("## 🎯 Tasks")
    for t in game.get("tasks", []):
        limit = t.get("time_limit_minutes")
        time_str = f"{limit} min" if limit else "No time limit"
        # ``or ''`` guards against explicit None values, which cannot be sliced.
        description = (t.get("description") or "")[:80]
        hint = (t.get("hint") or "")[:70]
        safety_note = (t.get("safety_note") or "")[:70]
        lines.append(f" **{t.get('task_id', '?')}:** {t.get('title', '?')}")
        lines.append(f" - *{description}*")
        lines.append(f" - 🏆 {t.get('points', 0)} pts | ⏱ {time_str} | 📸 {t.get('proof_type', '?')}")
        lines.append(f" - 💡 {hint}")
        lines.append(f" - 🛡 {safety_note}")
        lines.append("")

    lines.append("## 💡 Global Hints")
    for h in game.get("global_hints", []):
        lines.append(f" • {h}")
    lines.append("")

    lines.append("## 🔒 Safety")
    safety = game.get("safety", {})
    lines.append(f"- **Zone:** {safety.get('allowed_zone', '?')}")
    lines.append(f"- **Supervision required:** {'Yes' if safety.get('adult_supervision') else 'No'}")
    lines.append(f"- **Forbidden:** {', '.join(safety.get('forbidden_behaviors', []))}")
    lines.append("")

    lines.append("## 📊 Scoring")
    for s in game.get("score_rules", []):
        lines.append(f" • {s}")
    lines.append(f"- **Tie-breaker:** {game.get('tie_breaker', '?')}")
    lines.append("")

    lines.append("## 📖 Story Seed")
    seed = game.get("story_seed", {})
    lines.append(f"- **Tone:** {seed.get('tone', '?')}")
    lines.append(f"- **Motifs:** {', '.join(seed.get('motifs', []))}")
    lines.append(f"- **Recap style:** {seed.get('recap_style', '?')}")
    lines.append("")

    # Pipeline trace
    lines.append("---")
    lines.append("### 🔍 Pipeline trace")
    lines.append(f"- Retrieval: {state.get('num_retrieved', 0)} examples found")
    if state.get("retrieved_ids"):
        lines.append(f"- Retrieved IDs: {', '.join(state['retrieved_ids'])}")
    lines.append(f"- Game ID: {state.get('game_id', '?')}")
    if state.get("validation_passed"):
        lines.append("- ✅ Validation passed")
    else:
        lines.append(f"- ❌ Validation failed ({len(state.get('validation_failures', []))} issues)")
        if state.get("repair_applied"):
            lines.append(f"- 🔧 Repair applied → {'✅ Passed' if state.get('repair_valid') else '❌ Still has issues'}")
        # Show at most five original failures and five that survived repair.
        for f in state.get("validation_failures", [])[:5]:
            lines.append(f" - {f}")
        leftover = state.get("remaining_failures", [])
        if leftover:
            for f in leftover[:5]:
                lines.append(f" - ⚠ {f}")
    return "\n".join(lines)
# ══════════════════════════════════════════════════════════════════════════
# LIQUID-FLOW MULTIPLAYER UI
# ══════════════════════════════════════════════════════════════════════════
from app.ui.theme import CSS, JS_INIT, gradio_theme
rooms.load_state() # restore live rooms after a restart
# Names of the top-level screen groups. vis() yields one visibility update
# per entry, in this order.
ORDER = ["home", "create", "join", "lobby", "play", "wait", "win"]
def vis(target: str):
    """Return one ``gr.update`` per screen in ``ORDER``; only ``target`` is visible."""
    updates = []
    for screen in ORDER:
        updates.append(gr.update(visible=(screen == target)))
    return tuple(updates)
def _mmss(secs) -> str:
m, s = divmod(max(0, int(secs)), 60)
return f"{m:02d}:{s:02d}"
# ── HTML fragment renderers (polled) ──────────────────────────────────────
def code_html(session_id: str) -> str:
    """Render the room-code card; a row of dashes stands in when no code is stored."""
    session = SESSION_STORE.get(session_id) or {}
    room_code = session.get("room_code", "——————")
    pieces = [
        "<div style='text-align:center'>",
        "<div class='cq-code-hint'>Share this code with your friends to join</div>",
        f"<div class='cq-code'>{room_code}</div></div>",
    ]
    return "".join(pieces)
def copy_btn_html(session_id: str) -> str:
    """Render the clipboard button that sits beside the room code.

    It lives in its own (non-polled) HTML component so the 'Copied!' state
    isn't wiped by the 1.5s lobby poll. The click handler is inline JS
    because the clipboard cannot be reached from Python.
    """
    session = SESSION_STORE.get(session_id) or {}
    code = session.get("room_code", "")

    # Inline onclick handler: copy the code, then flash a confirmation
    # label for two seconds before restoring the original text.
    write_call = "var b=this;navigator.clipboard.writeText('" + code + "').then(function(){"
    js = "".join([
        write_call,
        "b.classList.add('copied');var l=b.querySelector('.cq-copy-label');",
        "if(l)l.textContent='✓ Copied!';",
        "setTimeout(function(){b.classList.remove('copied');",
        "if(l)l.textContent='📋 Copy';},2000);});",
    ])

    button = f"<button type='button' class='cq-copy-btn' onclick=\"{js}\">"
    label = "<span class='cq-copy-label'>📋 Copy</span></button></div>"
    return "<div class='cq-copy-wrap'>" + button + label
def _teams_ready(session_id: str):
    """Return ``(teams with at least one player, total teams)`` for the lobby."""
    session = SESSION_STORE.get(session_id) or {}
    teams = session.get("teams", [])

    # Ids of every team that currently has a player on it
    occupied = set()
    for player in session.get("players", []):
        occupied.add(player["team_id"])

    ready_count = len([team for team in teams if team["id"] in occupied])
    return ready_count, len(teams)
def roster_html(session_id: str, me: str = "") -> str:
    """Render the lobby roster: a readiness bar plus one card per team.

    Player names, team names and roles are HTML-escaped before being
    interpolated, because this markup is rendered as raw HTML and those
    strings are not controlled by this module (player names presumably come
    from the join form — confirm against the caller).

    Args:
        session_id: Session whose teams/players are rendered; an unknown id
            renders an empty roster.
        me: Name of the viewing player; their chip gets the ``live`` class.

    Returns:
        The roster as an HTML string.
    """
    s = SESSION_STORE.get(session_id) or {}
    host = s.get("host_name", "")
    players = s.get("players", [])
    ready, total = _teams_ready(session_id)
    pct = int(round(100 * ready / total)) if total else 0

    parts = ["<div style='display:flex;flex-direction:column;gap:14px'>"]
    parts.append(
        "<div class='cq-progress'>"
        f"<div class='cq-progress-label'>{ready} / {total} teams ready</div>"
        "<div class='cq-progress-track'>"
        f"<div class='cq-progress-fill' style='width:{pct}%'></div></div></div>"
    )
    for t in s.get("teams", []):
        members = [p for p in players if p["team_id"] == t["id"]]
        if members:
            # Comparisons use the raw name; only the rendered text is escaped.
            chips = "".join(
                f"<span class='cq-pill{' live' if p['name'] == me else ''}'>"
                f"{html.escape(str(p['name']))}{' ⭐' if p['name'] == host else ''}</span>"
                for p in members
            )
        else:
            chips = "<span class='cq-pill wait'>waiting…</span>"
        team_name = html.escape(str(t["name"]))
        team_role = html.escape(str(t["role"]))
        parts.append(
            "<div class='cq-soft'>"
            f"<div style=\"font-family:'Fraunces',serif;font-weight:600;color:#FFC107\">"
            f"{team_name} <span style='color:#CCCCCC;font-size:13px'>· {team_role}</span></div>"
            f"<div class='cq-strip' style='margin-top:8px'>{chips}</div></div>"
        )
    parts.append("</div>")
    return "".join(parts)
def timer_html(session_id: str, team_id: str) -> str:
    """Render the team's timer card for its current phase.

    ``waiting`` shows a countdown to the team's start, ``overtime`` shows a
    fixed ``00:00``, and any other phase shows the time remaining.
    """
    phase_info = rooms.team_phase(session_id, team_id)
    meta = rooms.team_meta(session_id, team_id)
    phase = phase_info["phase"]

    sub = ""
    if phase == "waiting":
        label = "Head start in progress — you launch in"
        val = _mmss(phase_info["seconds_to_start"])
        sub = "Sit tight, your quest is about to open"
    elif phase == "overtime":
        label = "Time's up — wrap up and finish"
        val = "00:00"
    else:
        label = "Time remaining"
        val = _mmss(phase_info["seconds_remaining"])

    heading = (
        f"<div style=\"font-family:'Fraunces',serif;font-weight:600;font-size:20px\">"
        f"{meta['name']} <span style='color:#CCCCCC;font-size:14px'>· {meta['role']}</span></div>"
    )
    body = (
        f"<div class='section-label' style='justify-content:center;margin-top:10px'>{label}</div>"
        f"<div class='cq-timer'>{val}</div>"
        f"<div style='color:#CCCCCC;font-size:13px'>{sub}</div></div>"
    )
    return "<div class='cq-card' style='text-align:center'>" + heading + body
def status_strip_html(session_id: str, my_team: str = "") -> str:
    """Render one status pill per team: finished, exploring, or counting down.

    The pill for ``my_team`` carries a " (you)" suffix.
    """
    session = SESSION_STORE.get(session_id) or {}
    html_out = "<div class='cq-strip'>"
    for team in session.get("teams", []):
        tid = team["id"]
        if rooms.team_finished(session_id, tid):
            css_class = "done"
            text = f"{team['name']} · finished 🏁"
        elif rooms.is_unlocked(session_id, tid):
            css_class = "live"
            text = f"{team['name']} · exploring"
        else:
            # The phase is only looked up for teams that are still locked.
            phase_info = rooms.team_phase(session_id, tid)
            css_class = "wait"
            text = f"{team['name']} · starts in {_mmss(phase_info['seconds_to_start'])}"
        suffix = " (you)" if tid == my_team else ""
        html_out += f"<span class='cq-pill {css_class}'>{text}{suffix}</span>"
    return html_out + "</div>"
def scoreboard_md(session_id: str) -> str:
    """Compute current scores and return the final standings as markdown.

    Scores are computed from the session's logged events and stored on the
    session dict under ``scores``. Teams are listed by points, highest
    first; the top three get medals and the winner a "Champion" tag.
    """
    session = SESSION_STORE.get(session_id) or {}
    game = session.get("game") or {}
    events = load_events(session_id=session_id)
    scores = compute_scores(events, game)
    session["scores"] = scores

    winner = scores.get("winner")
    ranked = list(scores.get("team_scores", []))
    ranked.sort(key=lambda entry: entry.get("points", 0), reverse=True)

    medals = ["🥇", "🥈", "🥉"]
    out = ["# 🏆 Final Standings", ""]
    for rank, ts in enumerate(ranked):
        meta = rooms.team_meta(session_id, ts["team_id"])
        medal = "•" if rank >= 3 else medals[rank]
        crown = " — Champion" if ts["team_id"] == winner else ""
        out.append(f"### {medal} {meta['name']}{crown}")
        out.append(
            f"- **{ts['points']} points** · {ts['completed_tasks']}/{ts['total_tasks']} tasks · "
            f"{ts['hints_used']} hints used"
        )
        out.append("")
    return "\n".join(out)
def _task_id_list(session_id: str) -> list[str]:
    """Return the ids of the session's first twelve tasks (empty if none)."""
    session = SESSION_STORE.get(session_id) or {}
    game = session.get("game") or {}
    task_ids = []
    for task in game.get("tasks", [])[:12]:
        task_ids.append(task["task_id"])
    return task_ids
def _task_by_id(session_id: str, task_id: str) -> dict:
    """Return the first task whose ``task_id`` matches, or ``{}`` if none does."""
    session = SESSION_STORE.get(session_id) or {}
    game = session.get("game") or {}
    for task in game.get("tasks", []):
        if task["task_id"] == task_id:
            return task
    return {}
def task_detail_md(session_id: str, team_id: str, task_id: str) -> str:
    """Markdown for a task's detail view: title, a points/time/proof line, description."""
    task = _task_by_id(session_id, task_id)
    limit = task.get("time_limit_minutes")
    limit_text = "no time limit" if not limit else f"{limit} min"
    heading = f"## {task.get('title', 'Task')}\n\n"
    stats = f"**🏆 {task.get('points', 0)} pts · ⏱ {limit_text} · 📸 {task.get('proof_type', 'photo')}**\n\n"
    return heading + stats + f"{task.get('description', '')}"
def task_hint_html(session_id: str, team_id: str, task_id: str) -> str:
    """Render the task's hint in its own 'discovery' styled element.

    The separate class keeps it visually distinct from a safety warning.
    Returns an empty string when the task has no hint text.
    """
    task = _task_by_id(session_id, task_id)
    hint_text = (task.get("hint") or "").strip()
    if hint_text:
        return f"<div class='hint-item'>💡 {html.escape(hint_text)}</div>"
    return ""
def task_safety_html(session_id: str, team_id: str, task_id: str) -> str:
    """Render the task's safety note in its 'warning' styled element.

    A generic reminder is used when the task carries no note.
    """
    task = _task_by_id(session_id, task_id)
    note = task.get("safety_note") or "Stay aware of your surroundings."
    note = note.strip()
    return f"<div class='safety-item'>🛡 {html.escape(note)}</div>"
def task_gallery(session_id: str, team_id: str, task_id: str):
    """Return ``(path, caption)`` pairs for the team's photos on this task.

    Photos without a stored path are left out; a missing caption becomes "".
    """
    items = []
    for photo in rooms.team_photos(session_id, team_id, task_id):
        if not photo.get("photo_path"):
            continue
        items.append((photo["photo_path"], photo.get("caption") or ""))
    return items
def proof_md(session_id: str, team_id: str, task_id: str) -> str:
    """Tell the player whether the task has enough proof to be completed."""
    n_photos, n_journals = rooms.proof_count(session_id, team_id, task_id)
    if n_photos + n_journals <= 0:
        return "🔒 Add at least **one photo** or **one voice journal** to unlock **Mark Completed**."
    return f"✅ Proof captured — **{n_photos}** photo(s), **{n_journals}** journal(s). You can complete this task."
def journal_log_md(session_id: str, team_id: str, task_id: str) -> str:
    """List the journals already saved for this task as numbered markdown.

    Each transcript is flattened to one line and cut to 90 characters plus
    an ellipsis. Returns an empty string when there are no journals.
    """
    entries = rooms.team_journals(session_id, team_id, task_id)
    if not entries:
        return ""

    out = [f"**🎙️ Journals for this task ({len(entries)})** — record more anytime:"]
    number = 0
    for entry in entries:
        number += 1
        text = (entry.get("transcript") or "").strip().replace("\n", " ")
        text = text[:90] + "…" if len(text) > 90 else text
        out.append(f"{number}. {text or '*(no text)*'}")
    return "\n".join(out)
def _hint_points_update(session_id: str, team_id: str, task_id: str):
    """Build the update for the live 'available points' label.

    Available points are the task's base points minus ``HINT_PENALTY`` per
    hint this team has used on it, never below zero. The ``reduced`` class
    is added as soon as at least one hint has been used.
    """
    base_points = _task_by_id(session_id, task_id).get("points", 0)
    hints_used = rooms.hints_used_count(session_id, team_id, task_id)
    available = max(0, base_points - hints_used * HINT_PENALTY)

    classes = ["points-display"]
    if hints_used > 0:
        classes.append("reduced")

    text = f"⭐ {available} pts available (−{HINT_PENALTY} pts per hint)"
    return gr.update(value=text, elem_classes=classes)
def _hint_btn_state(remaining: int):
    """Build the hint-button update for the number of hints still allowed.

    With hints left the button stays interactive and shows the cost and
    the remaining count; otherwise it is disabled and marked ``spent``.
    """
    if remaining > 0:
        return gr.update(
            value=f"Get a hint −{HINT_PENALTY} pts ({remaining} left)",
            interactive=True,
            elem_classes=["btn-hint"],
        )
    return gr.update(
        value="No hints left",
        interactive=False,
        elem_classes=["btn-hint", "spent"],
    )
def _hint_btn_update(session_id: str, team_id: str, task_id: str):
    """Hint-button update for a task, based on the team's hints used so far."""
    spent = rooms.hints_used_count(session_id, team_id, task_id)
    left = rooms.HINT_CAP - spent
    # Clamp at zero so an over-spent task still renders as "no hints left".
    return _hint_btn_state(left if left > 0 else 0)
def hint_log_md(session_id: str, team_id: str, task_id: str) -> str:
    """Markdown history of the hints this team has already revealed for a task.

    Shows a header with the count and total penalty, then each hint's answer
    flattened to one line and cut to 140 characters. Returns ``""`` when no
    hint has been used.
    """
    revealed = rooms.team_hints(session_id, team_id, task_id)
    if not revealed:
        return ""

    count = len(revealed)
    blocks = [
        f"**Hints used ({count} of {rooms.HINT_CAP}) — −{count * HINT_PENALTY} pts total**"
    ]
    for number, hint in enumerate(revealed, 1):
        answer = (hint.get("answer") or "").strip().replace("\n", " ")
        if len(answer) > 140:
            answer = answer[:140] + "…"
        blocks.append(f"💡 Hint {number}: {answer or '*(no answer captured)*'}")
    return "\n\n".join(blocks)
def _detail_tab_updates(session_id: str, team_id: str, active_tid: str):
    """Updates for the 12 horizontal task-tab buttons of the detail view.

    A tab backed by a task shows ✅/○, the (truncated) title and the points;
    completed tasks get the ``task-tab-done`` class and the task being viewed
    gets ``task-tab-active``. Slots beyond the task list are hidden. Always
    returns exactly 12 updates.
    """
    session = SESSION_STORE.get(session_id) or {}
    tasks = (session.get("game") or {}).get("tasks", [])[:12]
    wall = rooms.task_wall_state(session_id, team_id)

    def _tab(task):
        # Build the visible-tab update for a single task.
        tid = task["task_id"]
        done = wall.get(tid, False)
        title = task.get("title", "Task")
        if len(title) > 22:
            title = title[:21] + "…"
        classes = ["task-tab"]
        if done:
            classes.append("task-tab-done")
        if tid == active_tid:
            classes.append("task-tab-active")
        return gr.update(
            value=f"{'✅' if done else '○'} {title} · {task.get('points', 0)}pts",
            visible=True, elem_classes=classes,
        )

    shown = [_tab(task) for task in tasks]
    hidden = [gr.update(visible=False) for _ in range(12 - len(shown))]
    return shown + hidden
def _task_position(session_id: str, active_tid: str) -> str:
    """'Task N of M' label for the task being viewed.

    Only the first 12 tasks are considered. An unknown task id is reported as
    position 1, and an empty task list yields ``""``.
    """
    session = SESSION_STORE.get(session_id) or {}
    tasks = (session.get("game") or {}).get("tasks", [])[:12]
    ids = [task["task_id"] for task in tasks]
    if not ids:
        return ""
    try:
        position = ids.index(active_tid)
    except ValueError:
        position = 0
    return f"Task {position + 1} of {len(ids)}"
def _complete_btn_update(can: bool):
    """Update for the Mark Completed button: enabled/unlocked or disabled/locked."""
    if can:
        icon, state, clickable = "✅", "unlocked", True
    else:
        icon, state, clickable = "🔒", "locked", False
    return gr.update(
        interactive=clickable,
        value=f"{icon} Mark completed",
        elem_classes=["mark-complete-btn", "complete-btn", state],
    )
def _completion_instruction_update(can: bool):
    """Update for the instruction line shown just above Mark Completed."""
    if not can:
        return gr.update(
            value="🔒 Add a photo or voice note to unlock completion",
            elem_classes=["completion-instruction", "locked"],
        )
    return gr.update(
        value="✅ Proof added — you can mark this complete!",
        elem_classes=["completion-instruction", "unlocked"],
    )
# ── Polling dispatchers (attached to the shared gr.Timer) ─────────────────
# Adaptive polling: stay snappy (FAST) while anything is changing, ease to
# SLOW after a stretch of identical ticks. The countdown timer on the play
# screen changes every second, so play naturally stays FAST; lobby/wait back
# off when idle. Capped at SLOW so screen transitions still arrive within ~3s.
# poll_tick() compares a per-tick signature of the rendered strings to decide
# which of the two intervals to use next.
POLL_FAST = 1.5  # timer interval (seconds) while the signature keeps changing
POLL_SLOW = 3.0  # timer interval (seconds) once the UI has been idle long enough
POLL_IDLE_LIMIT = 4 # identical ticks before easing off
def poll_tick(session_id, screen, team_id, player_name, is_host, poll_state):
    """Shared-timer tick: refresh the widgets of whichever screen is showing.

    Only the outputs belonging to the current screen receive real values;
    every other output is a no-op ``gr.update()``. The lobby is the only
    screen that navigates on a tick (to "play" once the session is playing).
    The tick also adapts the polling interval: it drops back to ``POLL_FAST``
    whenever the rendered strings change and eases to ``POLL_SLOW`` after
    ``POLL_IDLE_LIMIT`` identical ticks.

    Returns the 7 nav updates, the (possibly new) screen name, the lobby /
    play / wait / win widget updates, the timer update, and the new poll
    state ``(signature, idle_ticks, interval)``.
    """
    nav = [gr.update()] * 7
    next_screen = screen
    untouched = gr.update()
    lobby_code = lobby_roster = lobby_hint = untouched
    lobby_start = lobby_start_anyway = untouched
    play_timer = play_status = untouched
    wait_status = wait_line = wait_results_btn = untouched
    win_score = untouched

    if session_id:
        status = rooms.session_status(session_id)
        # Canonical team id — never "" — so same-team members converge on one id.
        team_id = rooms.resolve_team_id(session_id, player_name) or team_id

        if screen == "lobby":
            lobby_code = code_html(session_id)
            lobby_roster = roster_html(session_id, player_name)
            # Start gate: the primary Start needs the whole party (full AND
            # min_ok); the host-only "Start anyway" covers the case where the
            # minimum is met but the room is not full.
            counts = rooms.player_count_status(session_id)
            full, min_ok = counts["full"], counts["min_ok"]
            lobby_start = gr.update(interactive=(full and min_ok))
            if is_host:
                lobby_start_anyway = gr.update(
                    visible=(not full), interactive=(min_ok and not full),
                    value=f"▶ Start anyway ({counts['joined']}/{counts['target']} here)")
            lobby_hint = lobby_hint_text(session_id, is_host)
            if status == "playing":
                next_screen = "play"
                nav = list(vis("play"))
                play_timer = timer_html(session_id, team_id)
                play_status = status_strip_html(session_id, team_id)
        elif screen == "play":
            play_timer = timer_html(session_id, team_id)
            play_status = status_strip_html(session_id, team_id)
        elif screen == "wait":
            # The wait screen never navigates on its own; each tick only
            # refreshes the team strip, the "who's still playing" line and the
            # results button's enabled state. The wait->win transition happens
            # only via the "See final results" button (see do_see_results).
            wait_status = status_strip_html(session_id, team_id)
            wait_line = wait_playing_md(session_id)
            counts = rooms.player_count_status(session_id)
            results_ready = rooms.all_finished(session_id) and counts["min_ok"]
            wait_results_btn = gr.update(interactive=results_ready)

    # ── Decide next poll interval from whether anything changed ──
    # Non-string values (no-op updates) contribute an empty field.
    rendered = (lobby_code, lobby_roster, play_timer, play_status,
                wait_status, wait_line, win_score)
    signature = "|".join(
        [next_screen] + [item if isinstance(item, str) else "" for item in rendered]
    )

    prev_signature, idle_ticks, current_iv = poll_state or ("", 0, POLL_FAST)
    if signature == prev_signature:
        idle_ticks += 1
        wanted_iv = POLL_SLOW if idle_ticks >= POLL_IDLE_LIMIT else POLL_FAST
    else:
        idle_ticks, wanted_iv = 0, POLL_FAST

    # Only emit a Timer change when the interval actually flips, so the
    # running countdown is not reset on every tick.
    timer_update = gr.update() if wanted_iv == current_iv else gr.Timer(wanted_iv)
    next_state = (signature, idle_ticks, wanted_iv)

    return (*nav, next_screen, lobby_code, lobby_roster, lobby_start, lobby_start_anyway,
            lobby_hint, play_timer, play_status, wait_status, wait_line,
            wait_results_btn, win_score, timer_update, next_state)
def play_tick(session_id, screen, team_id, taskids, cur_task):
    """Keep the play screen's task-id list current and open task 1 on first entry.

    Off the play screen (or without a session) nothing changes. On the play
    screen the id list is refreshed every tick; the detail view is rendered
    only when no task is open yet, so an in-progress photo/transcript is never
    reset by a tick. This covers both the host (who clicks Start) and guests
    (who arrive on play via the poll).

    Returns the full detail-open tuple followed by the ``s_taskids`` value.
    """
    untouched = (gr.update(),) * _N_DETAIL_OUTPUTS
    if not session_id or screen != "play":
        return untouched + (taskids,)

    ids = _task_id_list(session_id)
    ids_json = json.dumps(ids)
    if ids and not cur_task:
        # First time on the play screen for this client: open task 1.
        return _open_task(session_id, team_id, ids[0]) + (ids_json,)
    # Already viewing a task — just sync the id list.
    return untouched + (ids_json,)
# ── Screen-action handlers ────────────────────────────────────────────────
# Lobby instruction copy — differs for the host vs. everyone else. The host
# copy shows progress (joined/target); HINT_HOST_WAIT/HINT_HOST_SHORT are
# format strings filled by lobby_hint_text().
HINT_HOST_WAIT = "⏳ Waiting for players… {joined}/{target} joined"  # host, minimum not met yet
HINT_HOST_SHORT = "▶ {joined}/{target} here — start now, or wait for the rest."  # host, minimum met but not full
HINT_HOST_READY = "✅ Everyone's in — start the adventure whenever you're ready."  # host, full and minimum met
HINT_GUEST = "⏳ Waiting for the host to start the adventure…"  # every non-host player
def lobby_hint_text(session_id: str, is_host: bool) -> str:
    """Instruction line for the lobby.

    Guests always get the waiting-on-host copy. The host gets the ready copy
    when the room is full and the minimum is met, the short-handed copy when
    only the minimum is met, and the waiting copy otherwise.
    """
    if is_host:
        counts = rooms.player_count_status(session_id)
        if counts["full"] and counts["min_ok"]:
            return HINT_HOST_READY
        template = HINT_HOST_SHORT if counts["min_ok"] else HINT_HOST_WAIT
        return template.format(joined=counts["joined"], target=counts["target"])
    return HINT_GUEST
def wait_playing_md(session_id: str) -> str:
    """Wait-screen line naming the active teams that have not finished yet."""
    pending = [
        tid for tid in rooms.active_team_ids(session_id)
        if not rooms.team_finished(session_id, tid)
    ]
    if pending:
        names = ", ".join(rooms.team_meta(session_id, tid)["name"] for tid in pending)
        return f"Waiting on: {names}"
    return "Everyone's finished — see results whenever you're ready."
def _team_btn_updates(session_id, selected_id):
    """Updates for the 4 team-pick buttons.

    Each existing team gets a visible "name · role" button, with the
    ``selected`` class on the player's current team; unused slots are hidden.
    Always returns exactly 4 updates.
    """
    updates = []
    for _, team in zip(range(4), rooms.list_teams(session_id)):
        classes = ["team-pick"]
        if team["id"] == selected_id:
            classes.append("selected")
        updates.append(gr.update(value=f"{team['name']} · {team['role']}",
                                 visible=True, elem_classes=classes))
    while len(updates) < 4:
        updates.append(gr.update(visible=False))
    return updates
def do_create(name, gtype, city, area, duration, teams_n, head_start,
              difficulty, age_group, energy, players_n):
    """Create a room from the Create form and move the host into the lobby.

    The host name defaults to "Host", the city to "Paris" and the area to
    "Downtown". Fewer than 2 teams is raised to 2 with a warning. Returns the
    lobby navigation updates, the per-browser state values (screen, session
    id, player name, team id, host flag), the lobby widgets and the 4
    team-pick button updates.
    """
    host_name = (name or "Host").strip() or "Host"

    # Defense-in-depth: the slider min is 2, but guard here too so a game can
    # never be created with a single team.
    team_count = int(teams_n)
    if team_count < 2:
        gr.Warning("Minimum 2 teams required — using 2.")
        team_count = 2

    session_id = str(uuid.uuid4())
    config = {
        "game_type": gtype,
        "city": city or "Paris",
        "area": area or "Downtown",
        "location_type": "mixed",
        "duration_minutes": int(duration),
        "num_players": int(players_n),
        "num_teams": int(team_count),
        "head_start_seconds": int(head_start),
        "difficulty": difficulty,
        "age_group": age_group,
        "energy_level": energy,
        "photo_enabled": True,
        "landscape_tags": [],
        "theme": "",
        "mobility": "standard",
        "allow_transport": False,
    }
    rooms.create_room(host_name, config, session_id)

    # The host's team state is the first team's real id, NOT "", so host
    # proof/journal/finish state and final standings key off the same id as
    # every other same-team member.
    host_team = rooms.list_teams(session_id)[0]["id"]
    team_buttons = _team_btn_updates(session_id, host_team)
    return (*vis("lobby"), "lobby", session_id, host_name, host_team, True,
            code_html(session_id), roster_html(session_id, host_name),
            gr.update(visible=True, interactive=False),
            copy_btn_html(session_id), lobby_hint_text(session_id, True), *team_buttons)
def do_join(code, name):
    """Join a room by code and move the player into the lobby.

    On failure the player stays on the join screen with a warning and cleared
    session state. On success the lobby is shown with the first team
    pre-selected and the host-only start button hidden.
    """
    session_id, msg, ok = rooms.join_room(code, name)
    if ok:
        gr.Info(f"Joined room — {msg}")
        player = (name or "").strip()
        first_team = rooms.list_teams(session_id)[0]["id"]
        team_buttons = _team_btn_updates(session_id, first_team)
        return (*vis("lobby"), "lobby", session_id, player, first_team, False, f"✅ {msg}",
                code_html(session_id), roster_html(session_id, player),
                gr.update(visible=False),
                copy_btn_html(session_id), HINT_GUEST, *team_buttons)

    gr.Warning(msg)
    hidden_team_buttons = [gr.update(visible=False)] * 4
    return (*vis("join"), "join", "", "", "", False, f"⚠ {msg}",
            gr.update(), gr.update(), gr.update(visible=False),
            gr.update(), gr.update(),
            *hidden_team_buttons)
def make_pick(idx):
    """Build the click handler for the team-pick button in slot ``idx``.

    The handler files the player under that team and returns the new team id,
    the refreshed roster and the 4 team-button updates. A slot with no team
    behind it changes nothing.
    """
    def _pick(session_id, player):
        teams = rooms.list_teams(session_id)
        if idx < len(teams):
            chosen = teams[idx]["id"]
            rooms.pick_team(session_id, player, chosen)
            return (chosen, roster_html(session_id, player),
                    *_team_btn_updates(session_id, chosen))
        return (gr.update(), gr.update(), *([gr.update()] * 4))
    return _pick
def prepare_game(session_id):
    """Generate the game during the lobby wait so Start is instant.

    Triggered right after the host creates the room. Yields status HTML for
    the lobby: nothing for an unknown session, "ready" if a game already
    exists, otherwise a loader followed by "ready" or, if generation raised,
    a note that the game will be generated on start instead.
    """
    session = SESSION_STORE.get(session_id) if session_id else None
    if not session:
        yield ""
        return

    if session.get("game"):
        yield "<div class='cq-ready'>✓ Quest ready</div>"
        return

    yield "<div class='cq-loading'>Preparing your quest — warming Nemotron &amp; generating tasks…</div>"
    try:
        generate_for_session(session_id)
    except Exception as e:
        # Best-effort: do_start() generates the game itself if it is missing.
        print(f"[prepare_game] {type(e).__name__}: {e}")
        yield "<div class='cq-loading'>Couldn't pre-generate — it'll generate when you start.</div>"
    else:
        yield "<div class='cq-ready'>✓ Quest ready — start whenever everyone's in</div>"
def do_start(session_id, team_id=""):
    """Start the game and move the starter onto the play screen.

    If the lobby pre-generation has not produced a game yet, a loader is
    yielded first and the game is generated here. The final yield switches to
    the play screen with the starter's own timer and team strip.
    """
    session = SESSION_STORE.get(session_id) or {}
    # Render the starter's OWN team strip/timer, not always the first team's.
    # Never empty: fall back to the first team, or "team-a" if none is stored.
    if not team_id:
        fallback_teams = session.get("teams") or [{"id": "team-a"}]
        team_id = fallback_teams[0]["id"]

    if not session.get("game"):
        # Fallback: lobby pre-generation didn't finish — show a loader.
        yield (*vis("lobby"), "lobby", gr.update(), gr.update(),
               "<div class='cq-loading'>Summoning your quest…</div>")
        generate_for_session(session_id)

    game = SESSION_STORE[session_id].get("game")
    rooms.begin_play(session_id, game)
    yield (*vis("play"), "play", timer_html(session_id, team_id),
           status_strip_html(session_id, team_id), "")
def do_leave():
    """Return to the home screen and clear the per-browser session state."""
    cleared_state = ("home", "", "", "", False)
    return (*vis("home"), *cleared_state)
# Number of outputs produced by _open_task / consumed by DETAIL_OPEN_OUTPUTS.
# 19 fixed slots + 12 task-tab buttons.
_N_DETAIL_OUTPUTS = 19 + 12
def _open_task(session_id, team_id, tid):
    """Render the full task-detail view for one task.

    Shared by the play-screen entry, the horizontal task tabs, the prev/next
    arrows and task completion, so every entry point produces the same
    detail screen. Returns 19 fixed values followed by the 12 task-tab
    updates.
    """
    # NOTE: tasks are session-global (session["game"]["tasks"]) and are the SAME
    # for everyone — never slice/filter them per player. team_id here only keys
    # the per-team completion/proof/hint OVERLAY, so same-team members (sharing
    # one resolved team_id) see one shared task/proof view. Do not regress this.
    unlocked = rooms.can_complete(session_id, team_id, tid)
    key = (session_id, team_id, tid)

    fixed = (
        tid,
        gr.update(visible=True),  # detail view (always shown on the play screen)
        task_detail_md(*key),
        task_hint_html(*key),
        task_safety_html(*key),
        task_gallery(*key),
        proof_md(*key),
        _complete_btn_update(unlocked),
        _completion_instruction_update(unlocked),
        "",    # transcript
        None,  # audio
        "",    # tagline
        "",    # ask answer
        "",    # capture status
        journal_log_md(*key),
        _task_position(session_id, tid),
        _hint_points_update(*key),
        hint_log_md(*key),
        _hint_btn_update(*key),
    )
    tabs = _detail_tab_updates(*key)
    return (*fixed, *tabs)
def open_task_factory(idx):
    """Build the handler that opens the task at position ``idx``.

    The position indexes into the JSON task-id list held in ``s_taskids``; a
    position past the end of the list changes nothing.
    """
    def _open(session_id, team_id, taskids):
        ids = json.loads(taskids or "[]")
        if idx < len(ids):
            return _open_task(session_id, team_id, ids[idx])
        return (gr.update(),) * _N_DETAIL_OUTPUTS
    return _open
def go_adjacent_factory(delta):
    """Build the prev/next handler that steps ``delta`` tasks from the current one.

    The target position is clamped to the ends of the list; an unknown current
    task is treated as the first one, and an empty list changes nothing.
    """
    def _go(session_id, team_id, taskids, cur_tid):
        ids = json.loads(taskids or "[]")
        if not ids:
            return (gr.update(),) * _N_DETAIL_OUTPUTS
        try:
            here = ids.index(cur_tid)
        except ValueError:
            here = 0
        target = min(len(ids) - 1, here + delta)
        if target < 0:
            target = 0
        return _open_task(session_id, team_id, ids[target])
    return _go
def do_transcribe(audio_path, language):
    """Transcribe a recording into the transcript box, with a loader first.

    Yields a prompt to record when there is no audio, otherwise a loader and
    then either the transcript or a "no speech detected" message.
    """
    if audio_path:
        yield "🌀 Transcribing your note…"
        transcript = transcribe_only(audio_path, language or "en")
        yield transcript or "(No speech detected — pick the right spoken language, or type your note.)"
    else:
        yield "No recording yet — tap record above, then transcribe."
def do_save_journal(session_id, transcript, audio_path, language, task_id, team_id):
    """Save a voice/typed journal for a task and refresh the proof widgets.

    7 outputs: capture status, proof, complete-btn, completion instruction,
    journal log, audio, transcript. A loader is yielded first; the final
    yield clears the recorder and transcript so another note can be recorded
    straight away.
    """
    yield ("🌀 Saving your journal…", *(gr.update() for _ in range(6)))

    outcome = record_journal(
        session_id,
        transcript=transcript or "",
        task_id=task_id,
        team_id=team_id,
        audio_path=audio_path or "",
        language=language or "en",
    )
    # record_journal signals a rejected save with a leading warning sign.
    rejected = outcome.lstrip().startswith("⚠")
    if rejected:
        gr.Warning("Couldn't save — record audio or type a note first.")
    else:
        gr.Info("Journal saved 🎙️")

    unlocked = rooms.can_complete(session_id, team_id, task_id)
    yield (outcome, proof_md(session_id, team_id, task_id), _complete_btn_update(unlocked),
           _completion_instruction_update(unlocked),
           journal_log_md(session_id, team_id, task_id), None, "")
def do_add_photo(session_id, image, tagline, task_id, team_id):
    """Attach a photo (with its tagline) to a task and refresh the proof widgets.

    7 outputs: capture status, gallery, proof, complete-btn, completion
    instruction, photo, tagline. Without an image nothing is uploaded and the
    photo/tagline inputs are left as they were; after a successful upload both
    inputs are cleared.
    """
    if image is not None:
        upload_photo(session_id, image, caption=tagline or "", task_id=task_id, team_id=team_id)
        unlocked = rooms.can_complete(session_id, team_id, task_id)
        gr.Info("Photo added 📸")
        return ("📸 Photo added with its tagline.", task_gallery(session_id, team_id, task_id),
                proof_md(session_id, team_id, task_id), _complete_btn_update(unlocked),
                _completion_instruction_update(unlocked), None, "")

    gr.Warning("Choose a photo first.")
    return ("Choose a photo first.", task_gallery(session_id, team_id, task_id),
            proof_md(session_id, team_id, task_id), gr.update(), gr.update(), image, tagline)
@spaces.GPU(duration=120)
def _ask_gpu(question, session_id, team_id, task_id):
    """Ask the in-game guide a question about one task.

    Builds a short context (city, task title/description, stored hint) from
    the session's game and passes it with the question to ``minicpm.ask``.
    If the model call fails or returns nothing, a canned "guide is resting"
    message is returned instead, so the caller always gets a non-empty string.

    ``team_id`` is accepted for signature parity with the caller and is not
    used here.
    """
    session = SESSION_STORE.get(session_id) or {}
    game = session.get("game") or {}
    task = next((x for x in game.get("tasks", []) if x["task_id"] == task_id), {})
    # `setup` may be present but None; treat that like a missing setup.
    city = (game.get("setup") or {}).get("city", "")

    # Separate title and description explicitly — concatenating them directly
    # produced run-together text such as "Fountain huntFind the fountain…".
    title = (task.get("title") or "").strip()
    description = (task.get("description") or "").strip()
    task_text = " — ".join(part for part in (title, description) if part)
    ctx = f"City: {city}. Task: {task_text}. Hint: {task.get('hint', '')}"

    ans = None
    try:
        from app.services import minicpm
        ans = minicpm.ask(question, context=ctx)
    except Exception as e:
        # Deliberate best-effort: a failing guide must not break the play screen.
        print(f"[ask] failed: {e}")
    if not ans:
        ans = "*(The guide is resting — re-read the hint and scan for the named landmark. Try again in a moment.)*"
    return ans
def on_get_hint(question, session_id, team_id, task_id):
    """Reveal a hint from the in-game guide and charge the team for it.

    Outputs (4): guide answer, available-points display, hint button, hint log.

    Nothing is charged when the task has no hints left, when the question is
    empty, or when the guide call raises. In that last case the hint button
    is restored to its previous state instead of being left disabled on
    "Thinking…".

    NOTE(review): ``_ask_gpu`` substitutes a canned "guide is resting" message
    when the model fails without raising, and that message is still recorded
    and charged as a hint here — confirm whether that is intended.
    """
    used = rooms.hints_used_count(session_id, team_id, task_id)
    remaining = max(0, rooms.HINT_CAP - used)
    if remaining <= 0:
        gr.Warning("No hints left for this task.")
        yield (gr.update(value="You've used all the hints for this task — check the hint history below."),
               gr.update(), _hint_btn_state(0), gr.update())
        return
    if not (question or "").strip():
        yield ("Tell me where you're stuck, or ask for a clue — then tap the button.",
               gr.update(), gr.update(), gr.update())
        return

    # Thinking: disable the button so the hint can't be double-charged; don't
    # deduct until we actually have an answer.
    yield ("🌀 The guide is thinking…", gr.update(),
           gr.update(value="Thinking…", interactive=False), gr.update())

    try:
        answer = _ask_gpu(question, session_id, team_id, task_id)
    except Exception as e:
        # Without this, a failure (e.g. no GPU available) would end the
        # generator with the button stuck disabled on "Thinking…".
        print(f"[hint] {type(e).__name__}: {e}")
        gr.Warning("The guide couldn't answer — no points were charged.")
        yield ("*(The guide couldn't answer just now — no points were charged. Try again in a moment.)*",
               gr.update(), _hint_btn_state(remaining), gr.update())
        return

    used_now = use_hint(session_id, task_id, team_id, question=question, answer=answer)
    remaining_now = max(0, rooms.HINT_CAP - used_now)
    gr.Info(f"−{HINT_PENALTY} pts · Hint used · {remaining_now} hint(s) left")
    yield (f"💡 {answer}",
           _hint_points_update(session_id, team_id, task_id),
           _hint_btn_state(remaining_now),
           hint_log_md(session_id, team_id, task_id))
def do_complete(session_id, team_id, task_id):
    """Mark a task completed if the team has proof for it.

    On success the same task is re-rendered so its tab and completed state
    update in place; without proof a warning is shown and nothing changes.
    """
    if not rooms.can_complete(session_id, team_id, task_id):
        gr.Warning("Add a photo or voice journal first to complete this task.")
        return (gr.update(),) * _N_DETAIL_OUTPUTS

    complete_task(session_id, task_id, team_id)
    gr.Info("Task completed — nice work! 🎉")
    return _open_task(session_id, team_id, task_id)
def do_finish(session_id, team_id):
    """Record that the team has finished and move the player to the wait screen."""
    log_event(
        session_id,
        "game_finished",
        {"summary": f"{team_id} finished"},
        team_id=team_id,
    )
    rooms.finish_team(session_id, team_id)
    strip = status_strip_html(session_id, team_id)
    return (*vis("wait"), "wait", strip)
def do_see_results(session_id):
    """Manual wait->win advance, fired by the 'See final results' button.

    The button's enabled state is driven by poll_tick; the transition itself
    is never automatic.
    """
    scoreboard = scoreboard_md(session_id)
    return (*vis("win"), "win", scoreboard)
def _gather_moments(session_id, team_id):
    """Collect up to 12 non-empty text snippets for the recap.

    Journal transcripts come first, followed by photo captions.
    """
    transcripts = [j.get("transcript", "") for j in rooms.team_journals(session_id, team_id)]
    captions = [p.get("caption", "") for p in rooms.team_photos(session_id, team_id)]
    moments = [text for text in transcripts + captions if text]
    return moments[:12]
@spaces.GPU(duration=180)
def _funny_recap_gpu(moments):
    """Ask the model for a short, silly recap of the team's moments.

    If the model call fails or returns nothing, the moments are returned as a
    Markdown bullet list under a heading instead.
    """
    joined = "; ".join(moments)
    prompt = (
        f"Our playful city adventure moments: {joined}"
        ". Write a short, funny, warm recap (4-6 sentences). Keep it light and silly. "
        "Do NOT mention winning, losing, scores, or rankings."
    )

    recap = None
    try:
        from app.services import minicpm
        recap = minicpm.ask(prompt, context="", max_tokens=320)
    except Exception as e:
        print(f"[funny_recap] {e}")

    if recap:
        return recap
    return "### A few ridiculous highlights\n\n- " + "\n- ".join(moments)
def _stream_text(full: str):
"""Yield a string progressively for a narrative reveal (UI pacing only).
Reveals paragraph-by-paragraph, falling back to sentence chunks for a
single long block, accumulating so the Markdown stays well-formed at every
step. The text itself is produced by the backend and is never altered.
"""
full = (full or "").strip()
if not full:
yield full
return
paras = [p for p in full.split("\n\n") if p.strip()]
if len(paras) <= 1:
# One block — chunk by sentence so long recaps still feel alive.
import re
paras = re.findall(r"[^.!?]+[.!?]*\s*", full) or [full]
sep = ""
else:
sep = "\n\n"
acc = ""
for i, chunk in enumerate(paras):
acc += (sep if i else "") + chunk
yield acc
time.sleep(0.12)
def do_funny_recap(session_id, team_id):
    """Stream a silly recap of the team's photos and journals.

    With no moments captured yet, a single prompt to add some is yielded.
    Otherwise a loader is shown and the recap is then revealed progressively.
    """
    moments = _gather_moments(session_id, team_id)
    if moments:
        yield "🌀 Spinning a silly recap…"
        for partial in _stream_text(_funny_recap_gpu(moments)):
            yield partial
    else:
        yield "_Snap a photo or record a voice note first — then I'll spin a silly recap!_"
@spaces.GPU(duration=300)
def _funny_post_gpu(session_id, team_id):
    """Generate the team's funny scrapbook poster from its photos.

    Returns whatever ``generate_poster_sync`` returns for the
    ``"<session_id>-funny"`` poster key.
    """
    photo_paths = []
    for photo in rooms.team_photos(session_id, team_id):
        if photo.get("photo_path"):
            photo_paths.append(photo["photo_path"])
    prompt = ("A whimsical, funny travel scrapbook poster, warm pastel beige tones, playful "
              "hand-drawn doodles, lighthearted city adventure, cozy and silly, no text")
    poster_key = f"{session_id}-funny"
    return generate_poster_sync(poster_key, prompt, photo_paths=photo_paths)
def do_funny_post(session_id, team_id):
    """Show a painting loader, then yield the funny poster result."""
    loading = (None, "🎨 Painting your funny poster…")
    yield loading
    poster = _funny_post_gpu(session_id, team_id)
    yield poster
@spaces.GPU(duration=180)
def _win_recap_gpu(session_id):
    """Generate the win-screen recap and return only its Markdown part."""
    recap_md, _unused = generate_recap(session_id)
    return recap_md
def do_win_recap(session_id):
    """Show a loader, then reveal the winning recap progressively."""
    yield "🌀 Writing your winning recap…"
    for partial in _stream_text(_win_recap_gpu(session_id)):
        yield partial
@spaces.GPU(duration=300)
def _win_poster_gpu(session_id, kind):
    """Generate a win-screen poster of the given kind from the session's photos.

    ``kind`` selects the prompt ("ending" or "group"); any other value falls
    back to the "ending" prompt. The poster key is ``"<session_id>-<kind>"``.
    """
    session = SESSION_STORE.get(session_id) or {}
    photo_paths = []
    for photo in session.get("photos", []):
        if photo.get("photo_path"):
            photo_paths.append(photo["photo_path"])

    prompts = {
        "ending": ("A triumphant golden-hour ending poster for a city adventure, warm beige and "
                   "teal tones, celebratory and cinematic, no text"),
        "group": ("A joyful group celebration scrapbook poster, warm pastel beige tones, friends "
                  "adventuring together, confetti, no text"),
    }
    prompt = prompts[kind] if kind in prompts else prompts["ending"]
    return generate_poster_sync(f"{session_id}-{kind}", prompt, photo_paths=photo_paths)
def do_win_endpost(session_id):
    """Show a painting loader, then yield the ending poster result."""
    loading = (None, "🎨 Painting your ending poster…")
    yield loading
    poster = _win_poster_gpu(session_id, "ending")
    yield poster
def do_win_grouppost(session_id):
    """Show a painting loader, then yield the group poster result."""
    loading = (None, "🎨 Painting your group poster…")
    yield loading
    poster = _win_poster_gpu(session_id, "group")
    yield poster
def do_new():
    """Go back to the home screen with cleared per-browser session state."""
    cleared_state = ("home", "", "", "", False)
    return (*vis("home"), *cleared_state)
# ── Live form validation (pure UI, no backend calls) ──────────────────────
# Mirror the room-code charset/length from rooms so the live hint matches what
# join_room() will accept (it upper-cases + strips before checking).
# Expected number of characters in a room code, used by validate_join_code().
_CODE_LEN = 6
# Allowed code characters: taken from rooms._CODE_ALPHABET when that attribute
# exists, otherwise this literal fallback alphabet is used.
_CODE_CHARS = getattr(rooms, "_CODE_ALPHABET", "ABCDEFGHJKMNPQRSTUVWXYZ23456789")
def validate_join_code(code: str, name: str):
    """Live feedback for the join form: code counter/validity + submit gating.

    Returns ``(info_html, submit_update)``. Purely presentational — never
    calls the backend; join_room() still does the authoritative check on
    submit. The code is stripped and upper-cased before checking. The submit
    button is enabled only when the code has exactly ``_CODE_LEN`` allowed
    characters and a non-blank name is present.

    Unexpected characters echoed back in the hint are HTML-escaped, because
    they come straight from the text box and are rendered as HTML.
    """
    raw = (code or "").strip().upper()
    n = len(raw)
    bad = [c for c in raw if c not in _CODE_CHARS]
    has_name = bool((name or "").strip())

    if n == 0:
        pill = f"<span class='cq-pill wait'>enter your {_CODE_LEN}-character code</span>"
    elif n < _CODE_LEN:
        pill = f"<span class='cq-pill wait'>{n}/{_CODE_LEN} characters</span>"
    elif n > _CODE_LEN or bad:
        if n > _CODE_LEN:
            detail = "too long"
        else:
            detail = "unexpected: " + html.escape(" ".join(sorted(set(bad))))
        pill = f"<span class='cq-pill alert'>{n}/{_CODE_LEN} · {detail}</span>"
    else:
        pill = f"<span class='cq-pill live'>✓ {n}/{_CODE_LEN} · looks good</span>"

    name_pill = "" if has_name else "<span class='cq-pill wait'>add your name</span>"
    info = f"<div class='cq-strip' style='margin:4px 0 2px'>{pill}{name_pill}</div>"
    valid = (n == _CODE_LEN and not bad and has_name)
    return info, gr.update(interactive=valid)
# Button busy-state helpers for .then() chaining around LLM/IO calls.
# Disabling during the call prevents double-submits and gives a visual cue;
# the in-component text loaders ("🌀 …") still communicate progress.
def _btn_off():
    """Update that disables a button while a long call is running."""
    busy = gr.update(interactive=False)
    return busy
def _btn_on():
    """Update that re-enables a button once the long call has finished."""
    ready = gr.update(interactive=True)
    return ready
# Display labels for the game types; values are already HTML (note "&amp;").
_GTYPE_LABELS = {
    "scavenger_hunt": "Scavenger hunt",
    "hide_and_seek": "Hide &amp; seek",
    "tag": "Tag",
}


def create_summary(name, gtype, city, area, teams, duration, players):
    """Live one-line HTML recap of the create form. Pure string formatting.

    The recap lists the game type, city (defaulting to "Paris" when blank),
    the optional area, and the team / duration / player counts, prefixed by
    "Hosted by <name>" when a host name is given.

    All free-text values (host name, city, area, and any game type without a
    known label) are HTML-escaped, since the result is rendered as HTML.
    """
    label = _GTYPE_LABELS.get(gtype)
    if label is None:
        # Unknown/missing game type: show it as escaped text, or omit it.
        label = html.escape(str(gtype or ""))

    bits = [label, html.escape((city or "").strip() or "Paris")]
    area_text = (area or "").strip()
    if area_text:
        bits.append(html.escape(area_text))

    n_teams = int(teams)
    bits.append(f"{n_teams} team{'s' if n_teams != 1 else ''}")
    bits.append(f"{int(duration)} min")
    bits.append(f"{int(players)} players")
    summary = " · ".join(bit for bit in bits if bit)

    host = html.escape((name or "").strip())
    who = f"Hosted by {host} — " if host else ""
    return (
        "<div class='cq-soft' style='padding:12px 16px'>"
        f"<span style='color:#CCCCCC;font-size:13px'>{who}</span>"
        f"<span style='font-weight:600'>{summary}</span></div>"
    )
# ── ASR transcribe-only helper (no save) ──────────────────────────────────
@spaces.GPU(duration=120)
def transcribe_only(audio_path: str, language: str = "en") -> str:
    """Transcribe a recording and return the stripped transcript text.

    Returns ``""`` when there is no audio path or when transcription raises;
    failures are logged to stdout rather than propagated. Nothing is saved.
    """
    transcript = ""
    if audio_path:
        try:
            from app.services.asr import transcribe as run_asr
            outcome = run_asr(audio_path, language=language or "en")
            text = (outcome.get("transcript") or "").strip()
            print(f"[transcribe_only] lang={language} status={outcome.get('status')} "
                  f"len={len(text)} preview={text[:80]!r}")
            transcript = text
        except Exception as err:
            print(f"[transcribe_only] {type(err).__name__}: {err}")
    return transcript
# ══════════════════════════════════════════════════════════════════════════
# BLOCKS
# ══════════════════════════════════════════════════════════════════════════
# Builds the whole UI: per-browser State values, one gr.Group per screen (only
# Home starts visible), a shared polling Timer, and then all event wiring.
with gr.Blocks(title="CityQuest · AI") as demo:
    # per-browser state
    s_session = gr.State("")
    s_player = gr.State("")
    s_team = gr.State("")
    s_host = gr.State(False)
    s_screen = gr.State("home")
    s_task = gr.State("")
    s_taskids = gr.State("[]")
    s_lang = gr.State("en") # voice-journal ISO code, set on the Create screen
    s_poll_state = gr.State(("", 0, POLL_FAST)) # (signature, idle_ticks, interval)
    gr.HTML(
        "<div style='text-align:center;padding:22px 0 4px'>"
        "<span class='cq-title' style='font-size:30px'>🌲 CityQuest<span style='color:#4CAF50'> · AI</span></span>"
        "<div style='color:#CCCCCC;font-size:14px;margin-top:4px'>explore your city, the unhurried way</div></div>"
    )
    # ── SCREEN: HOME ──────────────────────────────────────────────────────
    with gr.Group(visible=True, elem_classes=["cq-screen", "cq-home"]) as g_home:
        gr.HTML("<div style='max-width:680px;margin:24px auto 6px;text-align:center'>"
                "<div class='cq-title' style='font-size:clamp(2rem,5vw,3rem)'>A calm, playful city adventure</div>"
                "</div>")
        with gr.Row():
            with gr.Column():
                with gr.Group(elem_classes=["cq-card", "cq-home-card"]):
                    gr.Markdown("Host a new adventure and invite your friends.", elem_classes=["landing-description"])
                    home_create = gr.Button("Create a room", variant="primary", size="lg", elem_id="home-create", elem_classes=["landing-btn"])
            with gr.Column():
                with gr.Group(elem_classes=["cq-card", "cq-home-card"]):
                    gr.Markdown("Got a code from a friend? Hop in.", elem_classes=["landing-description"])
                    home_join = gr.Button("Join a room", variant="secondary", size="lg", elem_id="home-join", elem_classes=["landing-btn"])
    # ── SCREEN: CREATE ────────────────────────────────────────────────────
    with gr.Group(visible=False, elem_classes=["cq-screen", "cq-create"]) as g_create:
        with gr.Column(elem_classes=["cq-card"]):
            gr.Markdown("## Host a new adventure")
            c_name = gr.Textbox(label="What to call you?", placeholder="e.g. Arjun", value="")
            with gr.Row():
                c_type = gr.Dropdown(["scavenger_hunt", "hide_and_seek", "tag"],
                                     value="scavenger_hunt", label="Game type",
                                     filterable=False, elem_classes=["cq-dropdown"])
                c_teams = gr.Slider(2, 4, value=2, step=1, label="Teams (min 2)")
            with gr.Row():
                c_city = gr.Textbox(label="City", placeholder="Paris")
                c_area = gr.Textbox(label="Area / neighbourhood", placeholder="Le Marais")
            with gr.Row():
                c_duration = gr.Slider(15, 120, value=45, step=5, label="Duration (min)")
                # Starts at 0 / hidden because the default game type is
                # scavenger_hunt (no head start). toggle_head_start raises it to
                # 60 and reveals it when a staggered-start game type is picked.
                c_headstart = gr.Slider(0, 300, value=0, step=15,
                                        label="Head start between teams (sec)",
                                        visible=False)
            with gr.Row():
                c_diff = gr.Dropdown(["easy", "medium", "hard"], value="medium", label="Difficulty",
                                     elem_classes=["cq-dropdown"])
                c_age = gr.Dropdown(["kids", "teens", "adults", "all"], value="all", label="Age group",
                                    elem_classes=["cq-dropdown"])
            with gr.Row():
                c_energy = gr.Dropdown(["chill", "balanced", "high"], value="balanced", label="Energy",
                                       elem_classes=["cq-dropdown"])
                c_players = gr.Slider(2, 20, value=4, step=1, label="Players")
            with gr.Row():
                c_language = gr.Dropdown(LANGUAGE_CHOICES, value="English",
                                         label="Voice journal language",
                                         filterable=False, elem_classes=["cq-dropdown"])
            create_summary_html = gr.HTML("", elem_classes=["game-summary"])
            with gr.Row():
                create_back = gr.Button("← Back", variant="secondary", elem_id="create-back",
                                        elem_classes=["back-btn", "create-btn"])
                create_submit = gr.Button("Open the lobby →", variant="primary", elem_id="create-submit",
                                          elem_classes=["open-lobby-btn", "create-btn"])
    # ── SCREEN: JOIN ──────────────────────────────────────────────────────
    with gr.Group(visible=False, elem_classes=["cq-screen"]) as g_join:
        with gr.Column(elem_classes=["cq-card"]):
            gr.Markdown("## Join a room")
            j_code = gr.Textbox(label="Room code", placeholder="e.g. WAVE42")
            join_code_info = gr.HTML("")
            j_name = gr.Textbox(label="Your name", placeholder="e.g. Maya")
            join_feedback = gr.Markdown("")
            with gr.Row():
                join_back = gr.Button("← Back", variant="secondary", elem_id="join-back")
                # Starts disabled; the live validation wired below enables it.
                join_submit = gr.Button("Join →", variant="primary",
                                        interactive=False, elem_id="join-submit")
    # ── SCREEN: LOBBY ─────────────────────────────────────────────────────
    with gr.Group(visible=False, elem_classes=["cq-screen"]) as g_lobby:
        with gr.Column(elem_classes=["cq-card"]):
            lobby_code = gr.HTML("")
            lobby_copy = gr.HTML("")
            gr.HTML("<div class='section-label'>Pick your team</div>")
            with gr.Row():
                # Four hidden placeholders, matching the Teams slider maximum of 4.
                team_btn = [gr.Button("", variant="secondary", visible=False,
                                      elem_classes=["team-pick"]) for _ in range(4)]
            gr.HTML("<div class='section-label' style='margin-top:18px'>Who's here</div>")
            lobby_roster = gr.HTML("")
            lobby_prep = gr.HTML("", elem_classes=["lobby-prep"])
            lobby_hint = gr.Markdown(HINT_GUEST, elem_classes=["lobby-hint"])
            with gr.Row():
                lobby_leave = gr.Button("Leave", variant="secondary", elem_id="lobby-leave")
                lobby_start = gr.Button("Start the adventure →", variant="primary",
                                        visible=False, interactive=False,
                                        elem_id="lobby-start", elem_classes=["lobby-start-btn"])
            # Host-only short-handed override, sitting just below Start as a green
            # underlined text link (not a button). Hidden until poll_tick reveals
            # it (min_ok and not full); the normal Start covers the full case.
            lobby_start_anyway = gr.Button("▶ Start anyway", variant="secondary",
                                           visible=False, interactive=False,
                                           elem_id="lobby-start-anyway",
                                           elem_classes=["lobby-start-link"])
    # ── SCREEN: PLAY ──────────────────────────────────────────────────────
    with gr.Group(visible=False, elem_classes=["cq-screen"]) as g_play:
        play_timer = gr.HTML("")
        gr.HTML("<div class='section-label' style='margin-top:14px'>Teams</div>")
        play_status = gr.HTML("")
        # The play screen lands directly in the task detail; the horizontal task
        # tabs below are the only task switcher (the old vertical task wall was
        # removed).
        with gr.Group(visible=True, elem_classes=["cq-reveal"]) as g_detail:
            # Wrap everything in a master row with spacers to align the ENTIRE page
            with gr.Row():
                # 1. Left global spacer
                gr.Column(scale=1, min_width=0)
                # 2. Main Central Column (Aligns Tabs, Text, and Boxes perfectly)
                with gr.Column(scale=10, elem_classes=["play-card"]):
                    # --- Tabs & Nav ---
                    with gr.Row(elem_classes=["task-tab-row"]):
                        # Twelve hidden tab placeholders (the "12 tab buttons"
                        # counted in detail_open_outputs below).
                        detail_tab_btn = [gr.Button("", visible=False, size="sm",
                                                    elem_classes=["task-tab"]) for _ in range(12)]
                    with gr.Row(elem_classes=["task-nav-row"]):
                        prev_task_btn = gr.Button("← Prev", size="sm", scale=0,
                                                  elem_id="td-prev", elem_classes=["task-nav-btn"])
                        task_position_label = gr.Markdown("", elem_classes=["task-position"])
                        next_task_btn = gr.Button("Next →", size="sm", scale=0,
                                                  elem_id="td-next", elem_classes=["task-nav-btn"])
                    # --- Text Content ---
                    detail_md = gr.Markdown("")
                    detail_hint = gr.HTML("")
                    detail_safety = gr.HTML("")
                    # --- Voice & Photos ---
                    with gr.Row(equal_height=True, elem_classes=["panels-row"]):
                        with gr.Column(scale=1, min_width=250, elem_classes=["proof-col"]):
                            gr.HTML("<div class='section-label panel-header'>🎙️ Voice journal</div>")
                            d_audio = gr.Audio(sources=["microphone", "upload"], type="filepath",
                                               format="wav", show_label=False,
                                               elem_classes=["cq-audio"],
                                               waveform_options=gr.WaveformOptions(
                                                   show_recording_waveform=False))
                            d_transcript = gr.Textbox(label="Your notes", lines=3,
                                                      placeholder="Transcribe your audio, or type a note…")
                            with gr.Row():
                                d_transcribe = gr.Button("Transcribe", variant="secondary",
                                                         elem_id="td-transcribe",
                                                         elem_classes=["btn-secondary-action"])
                                d_savejournal = gr.Button("Save journal", variant="primary",
                                                          elem_id="td-savejournal",
                                                          elem_classes=["btn-primary-action"])
                            d_journal_log = gr.Markdown("")
                        with gr.Column(scale=1, min_width=250, elem_classes=["proof-col"]):
                            gr.HTML("<div class='section-label panel-header'>📸 Photos &amp; taglines</div>")
                            d_photo = gr.Image(type="filepath", show_label=False, height=170)
                            d_tagline = gr.Textbox(label="Caption", placeholder="A caption for this photo", lines=3)
                            with gr.Row():
                                d_addphoto = gr.Button("Add photo", variant="primary",
                                                       elem_id="td-addphoto",
                                                       elem_classes=["btn-primary-action"])
                            d_gallery = gr.Gallery(label="Photos for this task", columns=3, height=170)
                    # --- Footer Actions ---
                    d_capture_status = gr.Markdown("")
                    with gr.Row(elem_classes=["hints-header"]):
                        gr.Markdown("💡 Stuck? Take a hint..", elem_classes=["guide-header", "hints-label"])
                        d_points = gr.Markdown("", elem_classes=["points-display"])
                    with gr.Row(elem_classes=["hints-input-row"]):
                        d_ask_q = gr.Textbox(show_label=False, scale=1,
                                             placeholder="Ask for a clue, or tell me where you're stuck…",
                                             elem_classes=["hint-input"])
                        d_ask_btn = gr.Button(f"Get a hint  −{HINT_PENALTY} pts", scale=0,
                                              min_width=220,
                                              elem_id="td-ask", elem_classes=["btn-hint"])
                    d_ask_a = gr.Markdown("")
                    d_hint_log = gr.Markdown("")
                    d_proof = gr.Markdown("")
                    completion_instruction = gr.Markdown(
                        "🔒 Add a photo or voice note to unlock completion",
                        elem_classes=["completion-instruction", "locked"])
                    # Starts locked/disabled; handlers that list d_complete in
                    # their outputs update it.
                    d_complete = gr.Button("🔒 Mark completed", variant="primary",
                                           interactive=False, elem_id="td-complete",
                                           elem_classes=["mark-complete-btn", "complete-btn", "locked"])
                    d_finish = gr.Button("🏁 Finish & wait for others", variant="primary",
                                         elem_id="td-finish", elem_classes=["finish-btn"])
                # 3. Right global spacer
                gr.Column(scale=1, min_width=0)
    # ── SCREEN: WAIT-LOBBY ────────────────────────────────────────────────
    with gr.Group(visible=False, elem_classes=["cq-screen"]) as g_wait:
        with gr.Column(elem_classes=["cq-card"]):
            gr.Markdown("## 🌿 You're done — nicely paced!")
            gr.Markdown("*Waiting for the other teams to wrap up. Meanwhile, make something silly:*")
            wait_status = gr.HTML("")
            # Live "who's still playing" line + manual advance to results. The
            # button stays DISABLED until the results gate passes (all active
            # teams finished AND min_ok); poll_tick flips its enabled state. The
            # wait screen NEVER auto-navigates — this click is the only path.
            wait_status_line = gr.Markdown("", elem_classes=["lobby-hint"])
            wait_see_results = gr.Button("🏆 See final results", variant="primary",
                                         interactive=False, elem_id="wait-see-results",
                                         elem_classes=["lobby-start-btn"])
            with gr.Row():
                wait_funny_recap = gr.Button("😄 Funny recap", variant="secondary", elem_id="wait-funny-recap")
                wait_funny_post = gr.Button("🎨 Funny poster", variant="secondary", elem_id="wait-funny-post")
            wait_recap_md = gr.Markdown("")
            wait_post_img = gr.Image(label="Funny poster", height=320, visible=True, elem_classes=["cq-poster"])
            wait_post_status = gr.Markdown("")
    # ── SCREEN: WIN ───────────────────────────────────────────────────────
    with gr.Group(visible=False, elem_classes=["cq-screen", "cq-win"]) as g_win:
        with gr.Column(elem_classes=["cq-win-standings"]):
            win_scoreboard = gr.Markdown("Final standings will appear here.")
        with gr.Group(elem_classes=["cq-card", "cq-win-celebrate"]):
            gr.Markdown("### 🎉 Celebrate the run")
            win_recap = gr.Button("📖 Winning recap", variant="primary", elem_id="win-recap", elem_classes=["cq-win-btn"])
            win_recap_md = gr.Markdown("")
            with gr.Row():
                win_endpost = gr.Button("🌅 Ending poster", variant="secondary", elem_id="win-endpost", elem_classes=["cq-win-btn"])
                win_grouppost = gr.Button("👯 Group poster", variant="secondary", elem_id="win-grouppost", elem_classes=["cq-win-btn"])
            win_poster_img = gr.Image(label="Poster", height=320, visible=True)
            win_poster_status = gr.Markdown("")
        win_new = gr.Button("✨ Start a new adventure", variant="primary", elem_id="win-new", elem_classes=["cq-win-btn", "cq-win-new"])
    # ── Shared poll timer ─────────────────────────────────────────────────
    poll = gr.Timer(1.5)
    # Every navigation handler lists these seven groups first in its outputs,
    # in this order, so the order here must not change independently.
    nav_groups = [g_home, g_create, g_join, g_lobby, g_play, g_wait, g_win]
    # The timer itself and s_poll_state are outputs, so poll_tick can update
    # both (presumably to retune the polling interval — confirm in poll_tick).
    poll.tick(poll_tick, inputs=[s_session, s_screen, s_team, s_player, s_host, s_poll_state],
              outputs=nav_groups + [s_screen, lobby_code, lobby_roster, lobby_start,
                                    lobby_start_anyway, lobby_hint,
                                    play_timer, play_status, wait_status, wait_status_line,
                                    wait_see_results, win_scoreboard,
                                    poll, s_poll_state])
    # ── Navigation ────────────────────────────────────────────────────────
    # Inputs of create_summary, in its parameter order.
    create_inputs = [c_name, c_type, c_city, c_area, c_teams, c_duration, c_players]
    home_create.click(lambda: (*vis("create"), "create"), outputs=nav_groups + [s_screen]
                      ).then(create_summary, inputs=create_inputs, outputs=[create_summary_html])
    home_join.click(lambda: (*vis("join"), "join"), outputs=nav_groups + [s_screen])
    create_back.click(lambda: (*vis("home"), "home"), outputs=nav_groups + [s_screen])
    join_back.click(lambda: (*vis("home"), "home"), outputs=nav_groups + [s_screen])
    # ── Head start visibility: hide for scavenger hunt (irrelevant there) ──
    def toggle_head_start(game_type):
        """Hide and zero the head-start slider for scavenger hunts; otherwise show it at 60."""
        if game_type == "scavenger_hunt":
            return gr.update(visible=False, value=0)
        return gr.update(visible=True, value=60)
    c_type.change(toggle_head_start, inputs=[c_type], outputs=[c_headstart])
    # ── Live form validation (pure UI) ────────────────────────────────────
    for comp in create_inputs:
        comp.change(create_summary, inputs=create_inputs, outputs=[create_summary_html])
    for comp in (j_code, j_name):
        comp.change(validate_join_code, inputs=[j_code, j_name],
                    outputs=[join_code_info, join_submit])
    # Route the host's chosen voice-journal language (full name) into the ISO
    # code held in s_lang, which the play-screen audio handlers consume.
    # NOTE(review): s_lang is per-browser State and this is the only place in
    # this block that writes it (neither do_create nor do_join lists it as an
    # output), so players who join keep the default "en" — confirm intended.
    c_language.change(lambda name: LANGUAGE_TO_ISO.get(name, "en"),
                      inputs=[c_language], outputs=[s_lang])
    # NOTE(review): the input order here (duration before teams, plus
    # head start / difficulty / age / energy) differs from create_inputs —
    # confirm it matches do_create's signature.
    create_submit.click(_btn_off, None, [create_submit]).then(
        do_create,
        inputs=[c_name, c_type, c_city, c_area, c_duration, c_teams, c_headstart,
                c_diff, c_age, c_energy, c_players],
        outputs=nav_groups + [s_screen, s_session, s_player, s_team, s_host,
                              lobby_code, lobby_roster, lobby_start,
                              lobby_copy, lobby_hint] + team_btn,
    ).then(prepare_game, inputs=[s_session], outputs=[lobby_prep]
    ).then(_btn_on, None, [create_submit])
    join_submit.click(
        do_join, inputs=[j_code, j_name],
        outputs=nav_groups + [s_screen, s_session, s_player, s_team, s_host,
                              join_feedback, lobby_code, lobby_roster, lobby_start,
                              lobby_copy, lobby_hint] + team_btn,
    )
    # make_pick(i) builds the click handler for the i-th team button.
    for i, tb in enumerate(team_btn):
        tb.click(make_pick(i), inputs=[s_session, s_player],
                 outputs=[s_team, lobby_roster] + team_btn)
    lobby_leave.click(do_leave, outputs=nav_groups + [s_screen, s_session, s_player, s_team, s_host])
    # Both Start and Start-anyway funnel through the SAME do_start path — no
    # duplicated start logic; only the gating (in poll_tick) differs.
    lobby_start.click(_btn_off, None, [lobby_start]).then(
        do_start, inputs=[s_session, s_team],
        outputs=nav_groups + [s_screen, play_timer, play_status, lobby_prep]
    ).then(_btn_on, None, [lobby_start])
    lobby_start_anyway.click(_btn_off, None, [lobby_start_anyway]).then(
        do_start, inputs=[s_session, s_team],
        outputs=nav_groups + [s_screen, play_timer, play_status, lobby_prep]
    ).then(_btn_on, None, [lobby_start_anyway])
    # ── Task tabs → detail ────────────────────────────────────────────────
    # Order must match _open_task's return tuple (19 fixed + 12 tab buttons).
    detail_open_outputs = [s_task, g_detail, detail_md, detail_hint, detail_safety,
                           d_gallery, d_proof, d_complete, completion_instruction,
                           d_transcript, d_audio, d_tagline, d_ask_a, d_capture_status,
                           d_journal_log, task_position_label,
                           d_points, d_hint_log, d_ask_btn] + detail_tab_btn
    # Keep the task-id list fresh and auto-open the first task when a client
    # first reaches the play screen (host via Start, guests via poll).
    # This is a second listener on the same timer, alongside poll_tick above.
    poll.tick(play_tick, inputs=[s_session, s_screen, s_team, s_taskids, s_task],
              outputs=detail_open_outputs + [s_taskids])
    # The in-detail horizontal task tabs index into s_taskids.
    for i, dtb in enumerate(detail_tab_btn):
        dtb.click(open_task_factory(i), inputs=[s_session, s_team, s_taskids], outputs=detail_open_outputs)
    prev_task_btn.click(go_adjacent_factory(-1), inputs=[s_session, s_team, s_taskids, s_task],
                        outputs=detail_open_outputs)
    next_task_btn.click(go_adjacent_factory(+1), inputs=[s_session, s_team, s_taskids, s_task],
                        outputs=detail_open_outputs)
    d_finish.click(do_finish, inputs=[s_session, s_team],
                   outputs=nav_groups + [s_screen, wait_status])
    d_transcribe.click(_btn_off, None, [d_transcribe]).then(
        do_transcribe, inputs=[d_audio, s_lang], outputs=[d_transcript]
    ).then(_btn_on, None, [d_transcribe])
    # Auto-transcribe once the clip is actually on the server. stop_recording
    # (mic) and upload (file) fire *after* Gradio has committed/uploaded the
    # audio, so do_transcribe always sees a real filepath. This removes the
    # race where clicking "Transcribe" immediately after recording sent a
    # stale/None path and seemed to do nothing until you clicked around.
    d_audio.stop_recording(_btn_off, None, [d_transcribe]).then(
        do_transcribe, inputs=[d_audio, s_lang], outputs=[d_transcript]
    ).then(_btn_on, None, [d_transcribe])
    d_audio.upload(_btn_off, None, [d_transcribe]).then(
        do_transcribe, inputs=[d_audio, s_lang], outputs=[d_transcript]
    ).then(_btn_on, None, [d_transcribe])
    d_savejournal.click(_btn_off, None, [d_savejournal]).then(
        do_save_journal,
        inputs=[s_session, d_transcript, d_audio, s_lang, s_task, s_team],
        outputs=[d_capture_status, d_proof, d_complete, completion_instruction,
                 d_journal_log, d_audio, d_transcript]
    ).then(_btn_on, None, [d_savejournal])
    d_addphoto.click(do_add_photo, inputs=[s_session, d_photo, d_tagline, s_task, s_team],
                     outputs=[d_capture_status, d_gallery, d_proof, d_complete,
                              completion_instruction, d_photo, d_tagline])
    # on_get_hint manages the button itself (disable while thinking, then set
    # the "(N left)"/"No hints left" state), so it does NOT use the generic
    # _btn_off/_btn_on toggle — that would re-enable a spent button.
    d_ask_btn.click(
        on_get_hint, inputs=[d_ask_q, s_session, s_team, s_task],
        outputs=[d_ask_a, d_points, d_ask_btn, d_hint_log]
    )
    # After completing, re-render the same task in place so the ✓ shows on its
    # tab and the completed state is reflected (no wall to bounce back to).
    d_complete.click(do_complete, inputs=[s_session, s_team, s_task],
                     outputs=detail_open_outputs)
    # ── Wait-lobby actions ────────────────────────────────────────────────
    # Manual, click-only advance to the win screen (gate/enabled by poll_tick).
    wait_see_results.click(do_see_results, inputs=[s_session],
                           outputs=nav_groups + [s_screen, win_scoreboard])
    wait_funny_recap.click(_btn_off, None, [wait_funny_recap]).then(
        do_funny_recap, inputs=[s_session, s_team], outputs=[wait_recap_md]
    ).then(_btn_on, None, [wait_funny_recap])
    wait_funny_post.click(_btn_off, None, [wait_funny_post]).then(
        do_funny_post, inputs=[s_session, s_team], outputs=[wait_post_img, wait_post_status]
    ).then(_btn_on, None, [wait_funny_post])
    # ── Winning actions ───────────────────────────────────────────────────
    win_recap.click(_btn_off, None, [win_recap]).then(
        do_win_recap, inputs=[s_session], outputs=[win_recap_md]
    ).then(_btn_on, None, [win_recap])
    # Both poster buttons write to the same image/status pair.
    win_endpost.click(_btn_off, None, [win_endpost]).then(
        do_win_endpost, inputs=[s_session], outputs=[win_poster_img, win_poster_status]
    ).then(_btn_on, None, [win_endpost])
    win_grouppost.click(_btn_off, None, [win_grouppost]).then(
        do_win_grouppost, inputs=[s_session], outputs=[win_poster_img, win_poster_status]
    ).then(_btn_on, None, [win_grouppost])
    win_new.click(do_new, outputs=nav_groups + [s_screen, s_session, s_player, s_team, s_host])
# ── Entry point ────────────────────────────────────────────────────────────
if __name__ == "__main__":
    # Launch options gathered in one place, then passed through unchanged.
    launch_options = {
        "theme": gradio_theme(),
        "css": CSS,
        "js": JS_INIT,
        # ssr_mode=False avoids the Node SSR proxy that emits spurious
        # `sse_stream 404` errors with our 1.5s polling timer.
        "ssr_mode": False,
        "share": True,
    }
    demo.launch(**launch_options)