Spaces:
Sleeping
Sleeping
| import os | |
| import json | |
| import time | |
| import random | |
| import html | |
| import gradio as gr | |
| import agents | |
| import voice | |
| import transcribe | |
| import events | |
| import router | |
| import world_state | |
| import campaign | |
| from worlds import get_world, WORLDS, list_worlds | |
# MOCK is True only when the TINYWORLD_MOCK environment variable is exactly "1".
MOCK = os.environ.get("TINYWORLD_MOCK", "0") == "1"
# On a ZeroGPU Space (TINYWORLD_INFER=local) the @spaces.GPU functions must be
# registered at startup or ZeroGPU shuts the Space down. Import inference eagerly
# in that mode only (loads torch but NOT the models — those stay lazy). Skipped on
# mock/modal so dev and tests never need torch/spaces.
if not MOCK and os.environ.get("TINYWORLD_INFER", "modal").lower() == "local":
    try:
        import inference  # noqa: F401 (registers ZeroGPU GPU tasks)
        import threading
        # warm the model into CPU RAM in the background so startup isn't blocked
        threading.Thread(target=inference.warmup, daemon=True).start()
        print("[app] ZeroGPU inference module loaded; warming model in background")
    except Exception as e:
        # Best-effort: a failed pre-import is only logged so startup continues.
        print(f"[app] inference preimport failed: {e}")
# Mood name -> emoji shown next to a character's mood.
MOOD_EMOJI = {
    "happy": "😊",
    "stressed": "😰",
    "bored": "😐",
    "excited": "🤩",
    "hungry": "🍕",
    "tired": "😴",
    "nostalgic": "🥹",
    "curious": "🧐",
    "proud": "😤",
    "embarrassed": "🫣",
}

# Need name -> (start, end) colours of the meter gradient.
VIBE_COLORS = {
    "energy": ("#ff6b6b", "#ffb15f"),
    "hunger": ("#ffd76a", "#ff8a5f"),
    "social": ("#38e8ff", "#9b6bff"),
}

# Per-character colours used when a cast entry carries no colour of its own.
CHAR_COLORS = {
    "Marta Voss": "#ff5fa2",
    "Jay Park": "#ff8a5f",
    "Nia Okafor": "#7CFFB2",
    "Luca Bell": "#38e8ff",
    "Priya Raman": "#9b6bff",
}

# Last-resort colours, picked by cast position.
PALETTE = [
    "#ff5fa2",
    "#ff8a5f",
    "#7CFFB2",
    "#38e8ff",
    "#9b6bff",
]
def esc(value):
    """Return ``str(value)`` HTML-escaped, with quote characters escaped too."""
    text = str(value)
    return html.escape(text, quote=True)
def char_color(world, name):
    """Return the display colour for the cast member called ``name``.

    The first cast entry with a matching name decides the colour: its own
    ``color`` value, else the ``CHAR_COLORS`` entry for the name, else a
    ``PALETTE`` colour chosen by cast position. ``"#9b6bff"`` is returned when
    no cast entry matches.
    """
    for position, member in enumerate(world["cast"]):
        if member["name"] != name:
            continue
        return (
            member.get("color")
            or CHAR_COLORS.get(name)
            or PALETTE[position % len(PALETTE)]
        )
    return "#9b6bff"
| # ---------------------------------------------------------------- canvas payloads | |
def build_world_payload(world_id):
    """Serialize a world's board layout and cast into JSON for the canvas.

    Args:
        world_id: Identifier handed to ``get_world``.

    Returns:
        A JSON string holding the board geometry (``cols``, ``rows``,
        ``roads``, ``plaza``, ``plaza_center``, ``buildings``, ``trees``,
        ``props``), the ``cast`` list, the ``hotspots`` mapping and the
        ``ambient`` value.
    """
    world = get_world(world_id)
    board = world["board"]
    # Read the hotspot table once and treat it as optional everywhere: the
    # payload already defaulted it to {} while the cast loop indexed it
    # directly, so a board without hotspots raised KeyError.
    hotspots = board.get("hotspots_tile", {})

    cast = [
        {
            "name": member["name"],
            "short": member["name"].split()[0],
            "emoji": member.get("emoji", "👤"),
            "color": char_color(world, member["name"]),
            # Unknown home hotspots fall back to tile [6, 6].
            "home": hotspots.get(member.get("home", "square"), [6, 6]),
        }
        for member in world["cast"]
    ]

    payload = {
        "cols": board["cols"],
        "rows": board["rows"],
        "roads": board["roads"],
        "plaza": board["plaza"],
        # Default plaza centre is the middle of the grid.
        "plaza_center": board.get("plaza_center", [board["cols"] / 2, board["rows"] / 2]),
        "buildings": board["buildings"],
        "trees": board.get("trees", []),
        "props": board.get("props", []),
        "cast": cast,
        "hotspots": hotspots,
        "ambient": board.get("ambient", 12),
    }
    return json.dumps(payload)
def build_reactions_payload(world_id, reactions):
    """Serialize a list of reactions into the JSON consumed by the canvas.

    Each reaction is placed at the hotspot named by its ``moved_to`` value (or,
    failing that, the position stored in ``world_state``). When that key is
    not a known hotspot the character's home hotspot is used, then
    ``"square"``, then ``[0, 0]``. The activity label is the reaction's action
    (shortened with an ellipsis when longer than 25 characters) or the board's
    configured activity label for that character.

    Returns:
        A JSON string with a ``ts`` timestamp and a ``reactions`` list.
    """
    world = get_world(world_id)
    board = world["board"]
    hotspots = board["hotspots_tile"]
    activities = board.get("activities", {})
    payload = {"ts": time.time(), "reactions": []}

    for reaction in reactions:
        name = reaction["name"]
        spot_key = reaction.get("moved_to") or world_state.get_position(world_id, name)
        configured = activities.get(name, {})

        action = (reaction.get("action") or "").strip()
        if len(action) > 25:
            short_action = action[:24].rstrip() + "…"
        else:
            short_action = action
        label = short_action or configured.get("label", "")
        vehicle = configured.get("vehicle", "")

        if spot_key and spot_key in hotspots:
            target = hotspots[spot_key]
        else:
            home = next(
                (member.get("home") for member in world["cast"] if member["name"] == name),
                None,
            )
            target = hotspots.get(home) or hotspots.get("square") or [0, 0]

        entry = {
            "name": name,
            "short": name.split()[0],
            "mood": reaction["mood"],
            "moodEmoji": MOOD_EMOJI.get(reaction["mood"], "😐"),
            "text": reaction["text"],
            "target": target,
            "activity": label,
            "vehicle": vehicle,
            "running": False,
        }
        payload["reactions"].append(entry)

    return json.dumps(payload)
def build_state_payload(world_id):
    """Serialize the current position, mood and activity of every cast member.

    The payload has the same entry layout as ``build_reactions_payload`` but is
    flagged ``silent`` and carries empty ``text`` and ``vehicle`` values.

    Returns:
        A JSON string with ``ts``, ``silent`` and a ``reactions`` list.
    """
    world = get_world(world_id)
    hotspots = world["board"]["hotspots_tile"]
    payload = {"ts": time.time(), "silent": True, "reactions": []}

    for member in world["cast"]:
        name = member["name"]
        spot_key = world_state.get_position(world_id, name) or member.get("home", "square")
        # Fall back from the stored position to home, then "square", then [0, 0].
        tile = (
            hotspots.get(spot_key)
            or hotspots.get(member.get("home"))
            or hotspots.get("square")
            or [0, 0]
        )
        mood = world_state.get_mood(world_id, name)
        entry = {
            "name": name,
            "short": name.split()[0],
            "mood": mood,
            "moodEmoji": MOOD_EMOJI.get(mood, "😐"),
            "text": "",
            "target": tile,
            "activity": world_state.get_activity(world_id, name),
            "vehicle": "",
            "running": False,
        }
        payload["reactions"].append(entry)

    return json.dumps(payload)
| # ---------------------------------------------------------------- roster / ticker / log | |
def render_vibe_bar(label, value, colors):
    """Render one labelled meter row as an HTML snippet.

    Args:
        label: Text or emoji shown in front of the bar (HTML-escaped).
        value: Fill level as a fraction, where 1.0 means 100 percent.
        colors: ``(start, end)`` CSS colours for the gradient fill.

    Returns:
        The HTML for a ``vibe-row`` div.
    """
    c1, c2 = colors
    # round() instead of int(): fractions such as 0.29 become
    # 28.999999999999996 after the multiplication and int() showed them as 28.
    pct = round(value * 100)
    # Keep the CSS width inside the track even for out-of-range values.
    width = max(0, min(100, pct))
    safe_label = html.escape(str(label), quote=True)
    return (
        f'<div class="vibe-row"><span class="vibe-label">{safe_label}</span>'
        f'<div class="vibe-track"><div class="vibe-fill" style="width:{width}%;'
        f'background:linear-gradient(90deg,{c1},{c2});color:{c1}"></div></div>'
        f'<span class="vibe-val">{pct}</span></div>'
    )
def render_roster(world_id):
    """Render the roster rail: one card per cast member with mood and need meters.

    Calls ``world_state.init_cast`` for the world first, then reads moods and
    needs from ``world_state``. Characters without a stored mood show
    ``"curious"``; characters without stored needs show energy 60, hunger 35
    and social 50.

    Returns:
        The HTML for the ``roster-rail`` container.
    """
    world = get_world(world_id)
    world_state.init_cast(world)
    state = world_state.get_state(world_id)
    needs = world_state.get_needs(world_id)
    default_needs = {"energy": 60, "hunger": 35, "social": 50}
    meter_rows = (("⚡", "energy"), ("🍽", "hunger"), ("💬", "social"))

    cards = []
    for member in world["cast"]:
        name = member["name"]
        mood = state["moods"].get(name, "curious")
        mood_e = MOOD_EMOJI.get(mood, "😐")
        levels = needs.get(name, default_needs)
        # Needs are stored on a 0-100 scale; the meter expects a fraction.
        meters = "".join(
            render_vibe_bar(icon, levels[key] / 100, VIBE_COLORS[key])
            for icon, key in meter_rows
        )
        # Name, mood and emoji are escaped like in the other renderers so
        # world or model supplied text cannot inject markup into the card.
        cards.append(
            f'<div class="roster-card" style="border-left-color:{char_color(world, name)}">'
            f'<div class="roster-emoji">{esc(member.get("emoji", "👤"))}</div>'
            f'<div class="roster-name">{esc(name.split()[0])}</div>'
            f'<div class="roster-mood">{mood_e} {esc(mood)}</div>'
            f'<div class="roster-meters">{meters}</div>'
            f'</div>'
        )
    return f'<div class="roster-rail">{"".join(cards)}</div>'
def render_ticker(world_id):
    """Render the top-bar ticker: day and clock, time status, event count, chaos.

    The chaos meter has five cells, filled in proportion to ``state["chaos"]``;
    an unlock icon is appended once chaos reaches 0.5.

    Returns:
        The HTML for the ``progress-ticker`` div.
    """
    state = world_state.get_state(world_id)
    day, game_time, paused = world_state.get_game_time(world_id)
    chaos = state["chaos"]

    lit = min(int(chaos * 5), 5)
    meter = "▰" * lit + "▱" * (5 - lit)
    if chaos >= 0.5:
        unlocked = " 🔓"
    else:
        unlocked = ""
    clock_state = "PAUSED" if paused else "LIVE TIME"

    separator = '<span class="ticker-sep">·</span>'
    segments = [
        f'<span class="ticker-day">DAY {day} · {world_state.format_time(game_time)}</span>',
        f'<span class="ticker-events">{clock_state}</span>',
        f'<span class="ticker-events">⚡ {state["event_count"]}</span>',
        f'<span class="ticker-chaos">CHAOS {meter}{unlocked}</span>',
    ]
    return '<div class="progress-ticker">' + separator.join(segments) + '</div>'
def render_daily_log(world_id):
    """Render one row per cast member with their last four timeline entries.

    Entries are HTML-escaped and joined with `` · ``; a character without
    entries shows ``No entries yet.``.

    Returns:
        The HTML for the ``daily-log-inner`` container.
    """
    world = get_world(world_id)
    timeline = world_state.get_timeline(world_id)

    def row_for(member):
        name = member["name"]
        recent = timeline.get(name, [])[-4:]
        if recent:
            text = " · ".join(esc(entry) for entry in recent)
        else:
            text = "No entries yet."
        return (
            f'<div class="daily-row"><b style="color:{char_color(world, name)}">{esc(name.split()[0])}</b> '
            f'<span>{text}</span></div>'
        )

    rows = [row_for(member) for member in world["cast"]]
    return f'<div class="daily-log-inner">{"".join(rows)}</div>'
def render_mode_badge(status=None):
    """Render the runtime badge (live / waking / error / offline mock).

    Args:
        status: Runtime status mapping with optional ``mode``, ``model``,
            ``latency`` and ``error`` keys. When falsy, the status is fetched
            from ``agents.get_runtime_status()``.

    Returns:
        The HTML for the ``mode-badge`` div; the label text is HTML-escaped.
    """
    if not status:
        status = agents.get_runtime_status()
    mode = status.get("mode")
    model = status.get("model") or "unknown"
    latency = status.get("latency")
    error = status.get("error")

    if mode == "live":
        # Only the last path segment of the model id is displayed.
        pieces = [f"🟢 Live · {model.split('/')[-1]}"]
        if latency is not None:
            pieces.append(f" · {latency:.1f}s")
        label, css_class = "".join(pieces), "mode-live"
    elif mode == "waking":
        label, css_class = f"🟡 Waking · {model.split('/')[-1]}", "mode-waking"
    elif mode == "error":
        detail = f" · {error}" if error else ""
        label, css_class = f"🔴 LLM error{detail}", "mode-error"
    else:
        label, css_class = "🟡 Offline demo (mock)", "mode-mock"

    return f'<div class="mode-badge {css_class}">{esc(label)}</div>'
def render_town_log(world_id, reactions=None, followup=None):
    """Render the town log for a batch of reactions.

    Besides one entry per reaction and an optional follow-up line, this may
    append a "word spreads" gossip line: each reaction is independently kept
    as a gossip candidate with probability 0.3, one candidate is picked at
    random, and a random other cast member "hears" the first 60 characters.
    That snippet is also recorded through ``world_state.add_gossip``, so
    calling this function can change world state.

    Args:
        world_id: Identifier handed to ``get_world``.
        reactions: Reaction dicts with ``name``, ``text`` and ``mood``.
        followup: Optional mapping whose ``text`` is shown as a linked line.

    Returns:
        The HTML for the ``town-log-inner`` container.
    """
    world = get_world(world_id)
    if not reactions:
        return ('<div class="town-log-inner"><div class="town-log-empty">'
                'The town log is empty.<br>Throw an event to start the story.</div></div>')

    entries = []
    for reaction in reactions:
        member = next(
            (c for c in world["cast"] if c["name"] == reaction["name"]), None
        )
        if member:
            emoji = member.get("emoji", "👤")
        else:
            emoji = "👤"
        name = esc(reaction["name"])
        text = esc(reaction["text"])
        mood = esc(reaction["mood"])
        entries.append(
            f'<div class="log-entry"><div class="log-head">'
            f'<span class="log-emoji">{emoji}</span><span class="log-name">{name}</span>'
            f'<span class="log-mood">{MOOD_EMOJI.get(reaction["mood"], "😐")}</span></div>'
            f'<div class="log-text" data-mood="{mood}">"{text}"</div></div>'
        )

    if followup:
        entries.append(f'<div class="log-entry log-followup"><div class="log-text">🔗 <em>{esc(followup["text"])}</em></div></div>')

    # One random draw per reaction, in order, decides the gossip candidates.
    candidates = [reaction for reaction in reactions if random.random() < 0.3]
    if candidates:
        source = random.choice(candidates)
        listeners = [c["name"] for c in world["cast"] if c["name"] != source["name"]]
        if listeners:
            target = random.choice(listeners)
            spoken = source["text"]
            snippet = spoken[:60] + ("…" if len(spoken) > 60 else "")
            entries.append(f'<div class="log-entry log-gossip"><div class="log-text">🗣️ '
                           f'<em>Word spreads: {esc(target.split()[0])} hears "{esc(snippet)}"</em></div></div>')
            world_state.add_gossip(world_id, target, snippet)

    return f'<div class="town-log-inner">{"".join(entries)}</div>'
def render_explainer(world_id, reactions=None, focus=None):
    """Render the teaching panel that explains each character's reaction.

    Reactions whose name is not in the world's cast are skipped. A row uses
    the reaction's ``understanding`` / ``action`` when either is present, and
    otherwise falls back to the cast member's ``catchphrase_hint``.

    Args:
        world_id: Identifier handed to ``get_world``.
        reactions: Reaction dicts with ``name``, ``mood`` and optionally
            ``understanding`` and ``action``.
        focus: Optional prompt shown as the panel header.

    Returns:
        The HTML for the ``explain-inner`` container.
    """
    world = get_world(world_id)
    if not reactions:
        return ('<div class="explain-inner"><div class="explain-empty">'
                'Pick a <b>scenario</b> or throw an event, then this panel explains '
                '<b>why each person reacted the way they did</b> — great for teaching how '
                'different people respond to the same situation.</div></div>')

    if focus:
        header = f'<div class="explain-focus">🎓 <b>Think about:</b> {esc(focus)}</div>'
    else:
        header = '<div class="explain-focus">🎓 <b>Why did they react this way?</b></div>'
    parts = [header]

    for reaction in reactions:
        member = next((x for x in world["cast"] if x["name"] == reaction["name"]), None)
        if not member:
            continue

        # Only the first listed trait is mentioned.
        trait = (member.get("traits") or ["distinct"])[0]
        hint = member.get("catchphrase_hint", "responds in their own way")
        color = char_color(world, reaction["name"])
        understanding = esc((reaction.get("understanding") or "").strip())
        action = esc((reaction.get("action") or "").strip())
        first_name = esc(reaction["name"].split()[0])
        mood = esc(reaction["mood"])
        trait_text = esc(trait)
        who = f'<div class="explain-row"><b style="color:{color}">{first_name}</b> '

        if not (understanding or action):
            parts.append(
                who
                + f'feels <em>{mood}</em> and is naturally <em>{trait_text}</em>, '
                + f'so they {esc(hint)}.</div>'
            )
            continue

        reason = ""
        if understanding:
            reason += f'reads it as: <em>{understanding}</em> '
        if action:
            reason += f'so they <b>{action}</b>.'
        parts.append(
            who
            + f'(feeling <em>{mood}</em>, naturally <em>{trait_text}</em>) {reason}</div>'
        )

    return f'<div class="explain-inner">{"".join(parts)}</div>'
| # ---------------------------------------------------------------- crisis mode UI | |
def render_crisis_hud(world_id):
    """Render the campaign HUD: crisis title, progress, requirement chips, bars.

    When the campaign is inactive or has no crisis, an idle hint is returned
    instead.

    Returns:
        The HTML for the ``crisis-hud`` div.
    """
    view = campaign.view(get_world(world_id))
    crisis = view["crisis"]
    if not view["active"] or not crisis:
        return '<div class="crisis-hud crisis-hud-idle">No crisis active — switch to 🎬 Campaign and press <b>Start Campaign</b>.</div>'

    def chip(req):
        # Met requirements get a check mark, open ones an hourglass.
        if req["met"]:
            css_class, icon = "met", "✅"
        else:
            css_class, icon = "open", "⏳"
        suffix = " (optional)" if req["optional"] else ""
        return f'<span class="crisis-chip {css_class}">{icon} {esc(req["label"])}{esc(suffix)}</span>'

    chips = [chip(req) for req in crisis["requirements"]]
    chaos_pct = int(view["chaos"] * 100)
    resilience = int(view["town_resilience"])

    header = (
        f'<div class="crisis-top"><span class="crisis-title">🚨 {esc(crisis["title"])}</span>'
        f'<span class="crisis-meta">Chapter {view["chapter_index"] + 1}/{view["chapter_count"]} · '
        f'Round {crisis["round"]}/{crisis["time_limit"]}</span></div>'
    )
    chaos_bar = (
        f'<div class="crisis-bar"><span>🔥 Chaos</span><div class="cbar-track">'
        f'<div class="cbar-fill chaos" style="width:{chaos_pct}%"></div></div><b>{chaos_pct}</b></div>'
    )
    resilience_bar = (
        f'<div class="crisis-bar"><span>🛡 Resilience</span><div class="cbar-track">'
        f'<div class="cbar-fill res" style="width:{resilience}%"></div></div><b>{resilience}</b></div>'
    )
    return (
        f'<div class="crisis-hud">'
        f'{header}'
        f'<div class="crisis-chips">{"".join(chips)}</div>'
        f'<div class="crisis-bars">'
        f'{chaos_bar}'
        f'{resilience_bar}'
        f'</div></div>'
    )
def render_story_card(kind="dispatch", title="", body="", extra=""):
    """Render a story card, or a hidden placeholder when there is nothing to show.

    Args:
        kind: Suffix of the ``story-<kind>`` CSS class.
        title: Card title; HTML-escaped.
        body: Card body; HTML-escaped.
        extra: Pre-built HTML appended as-is (not escaped).

    Returns:
        The HTML for the ``story-card`` div.
    """
    if not (title or body or extra):
        return '<div class="story-card story-hidden"></div>'

    sections = []
    if title:
        sections.append(f'<div class="story-title">{esc(title)}</div>')
    if body:
        sections.append(f'<div class="story-body">{esc(body)}</div>')
    inner = "".join(sections)
    return f'<div class="story-card story-{kind}">{inner}{extra}</div>'
def build_crisis_payload(world_id):
    """Serialize the campaign/crisis state into JSON for the canvas.

    The payload always carries ``ts``, ``active``, ``status`` and ``chaos``.
    When a crisis exists it also carries the crisis ``kind``, ``title``,
    ``tile`` (the affected hotspot's tile, or ``None``), ``round``,
    ``time_limit`` and the ``requirements`` list.

    Returns:
        A JSON string.
    """
    world = get_world(world_id)
    view = campaign.view(world)
    crisis = view["crisis"]

    payload = {
        "ts": time.time(),
        "active": bool(view["active"] and crisis),
        "status": view["status"],
        "chaos": view["chaos"],
    }
    if not crisis:
        return json.dumps(payload)

    hotspots = world["board"]["hotspots_tile"]
    if crisis["affected_hotspot"]:
        tile = hotspots.get(crisis["affected_hotspot"])
    else:
        tile = None

    payload["kind"] = crisis["kind"]
    payload["title"] = crisis["title"]
    payload["tile"] = tile
    payload["round"] = crisis["round"]
    payload["time_limit"] = crisis["time_limit"]
    payload["requirements"] = [
        {"id": req["id"], "label": req["label"], "met": req["met"]}
        for req in crisis["requirements"]
    ]
    return json.dumps(payload)
def _outcome_card(res):
    """Turn a round-resolution result into the matching story card.

    ``res["outcome"]`` selects the card: ``won``, ``lost_crisis``,
    ``campaign_lost`` or ``finale``. Any other outcome yields a plain dispatch
    card built from ``res["dispatch"]``.
    """
    outcome = res.get("outcome")
    grade = res.get("grade")
    outro = res.get("outro", "")

    if outcome in ("won", "lost_crisis"):
        if outcome == "won":
            kind, title = "win", f"✅ Chapter cleared · Grade {grade}"
        else:
            kind, title = "loss", f"❌ Chapter failed · Grade {grade}"
        return render_story_card(kind, title, outro)

    if outcome == "campaign_lost":
        return render_story_card("gameover", "🏚 The town fell",
                                 f'{outro} Town Resilience hit zero.')

    if outcome == "finale":
        # The reveal is escaped here because render_story_card inserts
        # `extra` without escaping it.
        reveal = f'<div class="story-reveal">{esc(res.get("reveal", ""))}</div>'
        return render_story_card("finale", "🎬 The Long Summer — Finale", outro, reveal)

    return render_story_card("dispatch", "", res.get("dispatch", ""))
def scenario_choices(world_id):
    """Return ``(title, id)`` pairs for the world's scenarios, for a dropdown."""
    scenarios = get_world(world_id).get("scenarios", [])
    choices = []
    for scenario in scenarios:
        choices.append((scenario["title"], scenario["id"]))
    return choices
def render_stage_shell():
    """Return the static HTML shell that hosts the game canvas and its vignette."""
    canvas = '<canvas id="tw-canvas"></canvas>'
    vignette = '<div class="stage-vignette"></div>'
    return '<div class="stage" id="tw-stage">' + canvas + vignette + '</div>'
| # ---------------------------------------------------------------- assets | |
def _read(path):
    """Return the text of an asset file next to this module, or "" if missing.

    Args:
        path: File path, resolved relative to this module's directory.

    Returns:
        The file contents decoded as UTF-8, or an empty string when the file
        does not exist.
    """
    full = os.path.join(os.path.dirname(__file__), path)
    try:
        # Decode explicitly as UTF-8: without an encoding, open() uses the
        # locale default, which breaks assets containing non-ASCII text on
        # hosts whose locale is not UTF-8.
        with open(full, encoding="utf-8") as f:
            return f.read()
    except FileNotFoundError:
        return ""
# Stylesheet and canvas script loaded from the assets directory; each is ""
# when its file is missing.
css_content = _read("assets/style.css")
js_content = _read("assets/game.js")
# Force dark mode on load. HF Spaces otherwise loads light mode, which renders
# Gradio's dark text on top of our neon-dark panels — unreadable. With ?__theme=dark
# Gradio uses its light-on-dark palette and everything is legible.
THEME_JS = (
    "(function(){try{var u=new URL(window.location.href);"
    "if(u.searchParams.get('__theme')!=='dark'){"
    "u.searchParams.set('__theme','dark');window.location.replace(u.href);}}catch(e){}})();\n"
)
# The theme redirect is prepended so it comes first in the combined script.
js_content = THEME_JS + js_content
# World shown at startup; override with the TW_DEFAULT_WORLD environment variable.
DEFAULT_WORLD = os.environ.get("TW_DEFAULT_WORLD", "maple_street")
world_list = list_worlds()
# World id -> display name, used to label the World dropdown.
world_names = {w["id"]: w["name"] for w in world_list}
| # ---------------------------------------------------------------- UI | |
# NOTE(review): the Row/Column nesting below is reconstructed from the order of
# the statements because indentation is not visible in this view — confirm it
# against the running app (in particular that the Daily Log sits in the third
# panel column).
with gr.Blocks(title="TinyWorld — AI Neighborhood Game") as demo:
    # State holders: the selected world id and the most recent reactions list.
    current_world_id = gr.State(DEFAULT_WORLD)
    last_reactions_state = gr.State([])
    # Top bar: logo, world picker, ticker and runtime badge.
    with gr.Row():
        with gr.Column(scale=7):
            gr.HTML('<div class="tw-topbar"><div class="tw-logo">TINYWORLD</div>'
                    '<div class="tw-tagline">the town that remembers</div></div>')
        with gr.Column(scale=2, min_width=160):
            world_picker = gr.Dropdown(
                choices=[(world_names.get(w["id"], w["id"]), w["id"]) for w in world_list],
                value=DEFAULT_WORLD, label="World")
        with gr.Column(scale=3, min_width=210):
            ticker_html = gr.HTML(render_ticker(DEFAULT_WORLD))
        with gr.Column(scale=2, min_width=210):
            mode_badge_html = gr.HTML(render_mode_badge())
    # the canvas stage (rendered once, never re-rendered → game loop persists)
    gr.HTML(render_stage_shell())
    # hidden data channels for the canvas
    world_box = gr.Textbox(value=build_world_payload(DEFAULT_WORLD), elem_id="tw-world",
                           elem_classes="tw-data", label="", interactive=True)
    reactions_box = gr.Textbox(value="", elem_id="tw-reactions", elem_classes="tw-data",
                               label="", interactive=True)
    crisis_box = gr.Textbox(value=build_crisis_payload(DEFAULT_WORLD), elem_id="tw-crisis",
                            elem_classes="tw-data", label="", interactive=True)
    # Event console: free-text event input plus the throw button.
    with gr.Row(elem_id="tw-console"):
        with gr.Column(scale=8):
            event_input = gr.Textbox(
                placeholder="Type an event to throw at the neighborhood… (e.g. a UFO lands in the park)",
                label="", show_label=False, lines=1)
        with gr.Column(scale=2, min_width=150):
            trigger_btn = gr.Button("⚡ THROW", variant="primary", elem_id="throw-btn")
    # Action row: scenario picker, random event, and time controls.
    with gr.Row(elem_id="tw-actions"):
        with gr.Column(scale=6, min_width=260):
            scenario_dd = gr.Dropdown(choices=scenario_choices(DEFAULT_WORLD), value=None,
                                      label="🎓 Teaching scenario — a real situation to explore")
        with gr.Column(scale=2, min_width=140):
            run_scenario_btn = gr.Button("🎓 Run Scenario")
        with gr.Column(scale=2, min_width=140):
            random_btn = gr.Button("🎲 Random Chaos")
        with gr.Column(scale=1, min_width=120):
            pause_btn = gr.Button("⏯ Time")
        with gr.Column(scale=1, min_width=120):
            step_btn = gr.Button("⏭ Step")
    # Mode row: sandbox/campaign switch and the campaign buttons.
    with gr.Row(elem_id="tw-mode"):
        with gr.Column(scale=5, min_width=240):
            mode_radio = gr.Radio(
                ["🧪 Sandbox", "🎬 Campaign"], value="🧪 Sandbox",
                label="Mode", show_label=True)
        with gr.Column(scale=2, min_width=160):
            start_campaign_btn = gr.Button("🎬 Start Campaign", variant="primary")
        with gr.Column(scale=2, min_width=160):
            next_round_btn = gr.Button("▶ Play Round")
    crisis_hud_html = gr.HTML(render_crisis_hud(DEFAULT_WORLD))
    story_card_html = gr.HTML(render_story_card())
    # Main panels: roster, town log, explainer and daily log.
    with gr.Row(equal_height=False):
        with gr.Column(scale=3, min_width=220):
            gr.HTML('<div class="panel-title">🧑🤝🧑 Townsfolk</div>')
            roster_html = gr.HTML(render_roster(DEFAULT_WORLD))
        with gr.Column(scale=4, min_width=240):
            gr.HTML('<div class="panel-title">📜 Town Log</div>')
            town_log_html = gr.HTML(render_town_log(DEFAULT_WORLD))
        with gr.Column(scale=4, min_width=240):
            gr.HTML('<div class="panel-title">🎓 Learn — Why did they react?</div>')
            explainer_html = gr.HTML(render_explainer(DEFAULT_WORLD))
            gr.HTML('<div class="panel-title" style="margin-top:6px">🕒 Daily Log</div>')
            daily_log_html = gr.HTML(render_daily_log(DEFAULT_WORLD))
    # Voice section: microphone input, auto-voice output and per-character replay.
    gr.HTML('<div class="panel-title" style="margin-top:6px">🔊 Voice</div>')
    with gr.Row(elem_id="tw-audio", equal_height=False):
        with gr.Column(scale=4, min_width=240):
            mic_input = gr.Audio(sources=["microphone"], type="filepath", label="🎙️ 1 · Record your event")
            transcribe_btn = gr.Button("📝 2 · Transcribe to text")
            mic_status = gr.HTML('<div class="audio-cap">Record → press <b>Transcribe</b>: your words fill the '
                                 'event box, then press <b>⚡ THROW</b>. <i>Mic only works on '
                                 'http://localhost:7860 (blocked on 0.0.0.0).</i></div>')
        with gr.Column(scale=4, min_width=240):
            voice_output = gr.Audio(label="🔊 Auto-voice (plays after each event)", autoplay=True)
            gr.HTML('<div class="audio-cap">The <b>most dramatic</b> reaction is read aloud automatically.</div>')
        with gr.Column(scale=4, min_width=240):
            hear_name = gr.Dropdown(choices=[], label="🎧 Replay a character's voice")
            hear_btn = gr.Button("▶ Play their last line")
            gr.HTML('<div class="audio-cap">Pick any character and hear their <b>last line</b> again.</div>')
    gr.HTML('<div class="tw-footer">Built for <strong>Build Small Hackathon 2026</strong> · '
            'Hugging Face × Gradio · Thousand Token Wood<br>'
            'Models: <strong>Nemotron-Mini-4B</strong> (NVIDIA) + <strong>VoxCPM2</strong> (OpenBMB) + '
            '<strong>Whisper</strong> · running on <strong>Hugging Face ZeroGPU</strong><br>'
            'Built with <strong>OpenAI Codex</strong><br>'
            '<span class="footer-muted">Switch maps with the <b>World</b> picker, top-right · '
            'open at <b>http://localhost:7860</b> for microphone access.</span></div>')
    # Simulation timer; its interval comes from TINYWORLD_TIME_SCALE (default "6").
    tick_timer = gr.Timer(value=float(os.environ.get("TINYWORLD_TIME_SCALE", "6")), active=True)
| # ---------------------------------------------------------- handlers | |
def _run_event(world_id, event_text, focus=None, route=None):
    """Run one event through the agents and build the ten sandbox outputs.

    Args:
        world_id: Identifier handed to ``get_world``.
        event_text: The event description passed to ``agents.react``.
        focus: Optional teaching prompt forwarded to ``render_explainer``.
        route: Optional routing result forwarded to ``agents.react``.

    Returns:
        A 10-tuple in ``trig_out`` order: town log HTML, roster HTML, ticker
        HTML, voice audio (or ``None``), replay dropdown, reactions list,
        reactions JSON payload, explainer HTML, mode badge HTML and daily log
        HTML.
    """
    world = get_world(world_id)
    world_state.init_cast(world)
    result = agents.react(world_id, event_text, route=route)
    reactions = result["reactions"]
    runtime = result.get("runtime") or agents.get_runtime_status()
    followup = agents.generate_followup(reactions, event_text)

    audio = None
    # Guard the empty case like _run_crisis_round does: max() raises
    # ValueError on an empty list, which would fail the whole handler.
    if reactions:
        top = max(reactions, key=lambda r: r["drama"])
        try:
            cd = next(c for c in world["cast"] if c["name"] == top["name"])
            audio = voice.generate_voice(top["text"], cd["voice_description"])
        except Exception as e:
            # Voice is best-effort; the text outputs are still returned.
            print(f"[app] voice failed: {e}")

    names = [r["name"] for r in reactions]
    return (render_town_log(world_id, reactions, followup), render_roster(world_id),
            render_ticker(world_id), audio,
            gr.Dropdown(choices=names, value=names[0] if names else None),
            reactions, build_reactions_payload(world_id, reactions),
            render_explainer(world_id, reactions, focus), render_mode_badge(runtime),
            render_daily_log(world_id))
def _sandbox_tail(world_id):
    """Return the three campaign outputs in their idle state for sandbox events.

    The tuple is (crisis HUD HTML, hidden story card HTML, crisis JSON payload).
    """
    hud = render_crisis_hud(world_id)
    card = render_story_card()
    crisis_json = build_crisis_payload(world_id)
    return (hud, card, crisis_json)
def _empty_sandbox(world_id):
    """Return the full 13-output tuple for a throw that produced no reactions.

    The first ten values mirror ``_run_event`` with empty reactions; the last
    three come from ``_sandbox_tail``.
    """
    head = (
        render_town_log(world_id),
        render_roster(world_id),
        render_ticker(world_id),
        None,
        gr.Dropdown(choices=[], value=None),
        [],
        "",
        render_explainer(world_id),
        render_mode_badge(),
        render_daily_log(world_id),
    )
    return head + _sandbox_tail(world_id)
def _sandbox_throw(event_text, world_id):
    """Handle a sandbox throw: classify the text, then run it as an event.

    Blank input, or input the router classifies as ``noop``, yields the empty
    sandbox outputs.
    """
    cleaned = event_text.strip() if event_text else ""
    if not cleaned:
        return _empty_sandbox(world_id)

    route = router.classify(cleaned, get_world(world_id))
    if route["type"] == "noop":
        return _empty_sandbox(world_id)

    return _run_event(world_id, cleaned, route=route) + _sandbox_tail(world_id)
def do_throw(event_text, world_id, mode):
    """Dispatch a throw to the campaign round or to the sandbox.

    A crisis round is played only when the selected mode label starts with
    the campaign emoji and the campaign is active with status ``playing``.
    """
    view = campaign.view(get_world(world_id))
    campaign_selected = str(mode).startswith("🎬")
    if campaign_selected and view["active"] and view["status"] == "playing":
        return _run_crisis_round(world_id, event_text)
    return _sandbox_throw(event_text, world_id)
def start_campaign(world_id):
    """Reset the world, start the campaign and return the 13 refreshed outputs.

    The story card shows the campaign title and the chapter intro returned by
    ``campaign.start``.
    """
    world = get_world(world_id)
    world_state.reset_world(world_id)
    world_state.init_cast(world)
    view = campaign.start(world)
    intro_card = render_story_card("intro", f"🎬 {view['title']}", view.get("chapter_intro", ""))

    outputs = [
        render_town_log(world_id),
        render_roster(world_id),
        render_ticker(world_id),
        None,
        gr.Dropdown(choices=[], value=None),
        [],
        build_state_payload(world_id),
        render_explainer(world_id),
        render_mode_badge(),
        render_daily_log(world_id),
        render_crisis_hud(world_id),
        intro_card,
        build_crisis_payload(world_id),
    ]
    return tuple(outputs)
def _run_crisis_round(world_id, player_text):
    """Play one campaign round and return the 13 refreshed outputs.

    The round's event text comes from the campaign. The player's text, if any,
    is only used to derive a route for ``agents.react`` (and is ignored when
    the router classifies it as ``noop``). When no crisis is being played, the
    idle outputs are returned with a hint to start the campaign.
    """
    world = get_world(world_id)
    before = campaign.view(world)
    playing = before["active"] and before["crisis"] and before["status"] == "playing"
    if not playing:
        return (render_town_log(world_id), render_roster(world_id), render_ticker(world_id),
                None, gr.Dropdown(choices=[], value=None), [], build_state_payload(world_id),
                render_explainer(world_id), render_mode_badge(), render_daily_log(world_id),
                render_crisis_hud(world_id),
                render_story_card("dispatch", "", "No active crisis — press Start Campaign."),
                build_crisis_payload(world_id))

    crisis = campaign.current_crisis(world)
    focus = crisis.focus if crisis else ""
    event_text = campaign.round_event_text(world)

    route = None
    if player_text and player_text.strip():
        classified = router.classify(player_text.strip(), world)
        if classified["type"] != "noop":
            route = classified

    result = agents.react(world_id, event_text, route=route)
    reactions = result["reactions"]
    runtime = result.get("runtime") or agents.get_runtime_status()
    res = campaign.resolve_round(world, reactions)

    audio = None
    if reactions:
        # Voice the reaction with the highest drama value.
        top = max(reactions, key=lambda item: item["drama"])
        try:
            speaker = next(c for c in world["cast"] if c["name"] == top["name"])
            audio = voice.generate_voice(top["text"], speaker["voice_description"])
        except Exception as e:
            print(f"[app] voice failed: {e}")

    names = [item["name"] for item in reactions]
    if res.get("dispatch"):
        dispatch = {"text": res.get("dispatch", "")}
    else:
        dispatch = None
    first_name = names[0] if names else None

    return (render_town_log(world_id, reactions, dispatch), render_roster(world_id),
            render_ticker(world_id), audio,
            gr.Dropdown(choices=names, value=first_name),
            reactions, build_reactions_payload(world_id, reactions),
            render_explainer(world_id, reactions, focus), render_mode_badge(runtime),
            render_daily_log(world_id), render_crisis_hud(world_id),
            _outcome_card(res), build_crisis_payload(world_id))
def run_scenario(scenario_id, world_id):
    """Run a predefined teaching scenario as a world event.

    Returns:
        An 11-tuple: the scenario's event text (for the event box) followed by
        the ten ``_run_event`` outputs. When the scenario id is unknown, eleven
        ``gr.update()`` values are returned instead.
    """
    scenarios = get_world(world_id).get("scenarios", [])
    scen = next((s for s in scenarios if s["id"] == scenario_id), None)
    if not scen:
        # One update per output: the event box plus the ten trig_out components.
        return tuple(gr.update() for _ in range(11))

    event = scen["event"]
    route = {"type": "world_event", "text": event, "instruction": event, "addressees": [], "goto": None}
    return (event,) + _run_event(world_id, event, scen.get("focus"), route)
def random_chaos(world_id):
    """Return a random event: one of the world's own, else a generic one."""
    world = get_world(world_id)
    own_events = world.get("events") if world else None
    if not own_events:
        return events.random_event()
    return random.choice(own_events)
def switch_world(world_id):
    """Switch to another world and return the 13 refreshed outputs.

    When the world exists its state is reset and its cast re-initialised. The
    returned tuple follows the output list wired to ``world_picker.change``.
    """
    world = get_world(world_id)
    if world:
        world_state.reset_world(world_id)
        world_state.init_cast(world)

    board_json = build_world_payload(world_id)
    town_log = render_town_log(world_id)
    roster = render_roster(world_id)
    ticker = render_ticker(world_id)
    scenario_picker = gr.Dropdown(choices=scenario_choices(world_id), value=None)
    explainer = render_explainer(world_id)
    badge = render_mode_badge()
    daily_log = render_daily_log(world_id)
    hud = render_crisis_hud(world_id)
    card = render_story_card()
    crisis_json = build_crisis_payload(world_id)
    # The empty string clears the reactions data channel.
    return (board_json, "", town_log, roster, ticker, scenario_picker,
            explainer, world_id, badge, daily_log, hud, card, crisis_json)
def sim_tick(world_id):
    """Tick the world by one hour and return the refreshed outputs.

    Returns:
        (ticker HTML, roster HTML, state JSON payload, daily log HTML).
    """
    world = get_world(world_id)
    if world:
        world_state.tick(world, hours=1.0)
    ticker = render_ticker(world_id)
    roster = render_roster(world_id)
    state_json = build_state_payload(world_id)
    daily_log = render_daily_log(world_id)
    return (ticker, roster, state_json, daily_log)
def toggle_time(world_id):
    """Flip the world's paused flag, then refresh through ``sim_tick``."""
    paused = world_state.get_game_time(world_id)[2]
    world_state.set_paused(world_id, not paused)
    return sim_tick(world_id)
def step_time(world_id):
    """Tick the world by one hour with ``force=True`` and return the outputs.

    Returns:
        (ticker HTML, roster HTML, state JSON payload, daily log HTML).
    """
    world = get_world(world_id)
    if world:
        # force=True is passed through to world_state.tick — presumably so the
        # step applies even while time is paused; confirm in world_state.
        world_state.tick(world, hours=1.0, force=True)
    ticker = render_ticker(world_id)
    roster = render_roster(world_id)
    state_json = build_state_payload(world_id)
    daily_log = render_daily_log(world_id)
    return (ticker, roster, state_json, daily_log)
def transcribe_audio(audio_path):
    """Transcribe a recording and return ``(text, status_html)``.

    The text is empty when there is no recording or the transcription comes
    back empty; the status HTML explains what happened.
    """
    if not audio_path:
        missing = ('<div class="audio-cap">⚠ No recording found — press the mic ● record button first. '
                   '<i>The mic only works on http://localhost:7860.</i></div>')
        return ("", missing)

    text = transcribe.transcribe(audio_path)
    if text:
        heard = f'<div class="audio-cap">✅ Heard: "<b>{esc(text)}</b>" — now press <b>⚡ THROW</b>.</div>'
        return (text, heard)

    return ("", '<div class="audio-cap">⚠ Could not transcribe. Try again, or just type the event.</div>')
def hear_reaction(name, world_id, reactions):
    """Re-voice the named character's line from the last reactions.

    Returns:
        The value returned by ``voice.generate_voice``, or ``None`` when there
        is nothing to play, the character has no reaction, or voicing fails.
    """
    if not (reactions and name):
        return None

    for candidate in reactions:
        if candidate["name"] == name:
            line = candidate
            break
    else:
        return None
    if not line:
        return None

    world = get_world(world_id)
    try:
        speaker = next(c for c in world["cast"] if c["name"] == name)
        return voice.generate_voice(line["text"], speaker["voice_description"])
    except Exception as e:
        # Covers a missing cast entry as well as voice failures.
        print(f"[app] hear failed: {e}")
        return None
def play_round(world_id):
    """Play the next campaign round without any player text."""
    no_player_text = ""
    return _run_crisis_round(world_id, no_player_text)
# NOTE(review): these listener registrations need to execute inside the
# `gr.Blocks` context (`demo`); the nesting is not visible in this view —
# confirm before moving this section.

# The ten components filled by `_run_event`, in its return order.
trig_out = [town_log_html, roster_html, ticker_html, voice_output,
            hear_name, last_reactions_state, reactions_box, explainer_html, mode_badge_html,
            daily_log_html]
# trig_out plus the three campaign outputs (HUD, story card, crisis payload).
full_out = trig_out + [crisis_hud_html, story_card_html, crisis_box]
# Throwing an event: button click and Enter in the text box share one handler.
trigger_btn.click(do_throw, [event_input, current_world_id, mode_radio], full_out)
event_input.submit(do_throw, [event_input, current_world_id, mode_radio], full_out)
start_campaign_btn.click(start_campaign, [current_world_id], full_out)
next_round_btn.click(play_round, [current_world_id], full_out)
# run_scenario also writes the scenario text back into the event box.
run_scenario_btn.click(run_scenario, [scenario_dd, current_world_id], [event_input] + trig_out)
# Random chaos and transcription only fill the event box; the user still throws.
random_btn.click(random_chaos, [current_world_id], [event_input])
transcribe_btn.click(transcribe_audio, [mic_input], [event_input, mic_status])
hear_btn.click(hear_reaction, [hear_name, current_world_id, last_reactions_state], [voice_output])
# The output order here must match the tuple returned by switch_world.
world_picker.change(switch_world, [world_picker],
                    [world_box, reactions_box, town_log_html, roster_html, ticker_html,
                     scenario_dd, explainer_html, current_world_id, mode_badge_html, daily_log_html,
                     crisis_hud_html, story_card_html, crisis_box])
# Time controls and the timer all refresh the same four outputs.
pause_btn.click(toggle_time, [current_world_id], [ticker_html, roster_html, reactions_box, daily_log_html])
step_btn.click(step_time, [current_world_id], [ticker_html, roster_html, reactions_box, daily_log_html])
tick_timer.tick(sim_tick, [current_world_id], [ticker_html, roster_html, reactions_box, daily_log_html])
if __name__ == "__main__":
    # The port comes from GRADIO_SERVER_PORT and defaults to 7860.
    port = int(os.environ.get("GRADIO_SERVER_PORT", "7860"))
    print("\n TinyWorld is starting…")
    print(f" ▶ Open http://localhost:{port} (use localhost, not 0.0.0.0, so the mic works)\n")
    # Bind to all interfaces; `js` is passed as None if the script text is empty.
    demo.launch(server_name="0.0.0.0", server_port=port, css=css_content, js=js_content or None)