"""Control plane for Small Talk.

Multi-room model: each Room is its own LiveKit room with a topic and a cast.
Seed rooms run the pre-rendered conversations; users can create rooms and drop
their own configured Reachy in (it joins and reacts — live speech needs the
on-device brain + TTS).
"""

import asyncio
import os
import re
import uuid
from dataclasses import dataclass

from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from livekit import api
from pydantic import BaseModel

from . import config, showgen, tokens
from .publisher import ReachyPublisher

# A character: (identity, display name, card colour, persona blurb, clip prefix, wardrobe).
CAST_TEMPLATES: dict[str, list[tuple[str, str, str, str, str, dict]]] = {
    "duo": [
        ("reachy-ada", "Ada", "#c98a3c", "curious, warm, asks big questions", "ada",
         {"hat": "party"}),
        ("reachy-bode", "Bode", "#5a6b3a", "dry wit, contrarian, loves a tangent", "bode",
         {"hat": "tophat"}),
    ],
    "groupchat": [
        ("reachy-batman", "Batman", "#5b6b8c", "broods, works alone, encased-meat justice",
         "batman", {}),
        ("reachy-jarvis", "JARVIS", "#37b3c9", "polite British AI butler, cites the data",
         "jarvis", {"face": "monocle", "neck": "bowtie"}),
        ("reachy-jack", "Jack Sparrow", "#3fae9a",
         "rum-soaked pirate, picks whichever side has the rum", "jack", {"hat": "pirate"}),
        ("reachy-yoda", "Yoda", "#7bbf6a", "speaks in riddles, perpetually hungry", "yoda",
         {"hat": "halo"}),
        ("reachy-surfer", "Surfer", "#ff8c42", "chill, everything is just vibes, brah",
         "surfer", {"face": "sunglasses"}),
    ],
}

# Subtitle text for the pre-rendered seed clips (keyed by wav stem — the same
# lines scripts/make-qwen-samples.py rendered).
SEED_LINES: dict[str, str] = {
    "ada-1": "Welcome to the Reachy podcast! I'm Ada. Today's big question: can a tiny model, with just a few billion parameters, actually be charming?",
    "ada-2": "Oh come on, that's the magic! Small models run right here on the laptop. No cloud, no latency. That's not algebra, that's freedom!",
    "ada-3": "Unhinged is the whole point! Two robots, one brain, zero stage fright. We might just be the best hosts this hackathon has ever seen.",
    "bode-1": "Bode here. Charming? It's a robot reading sine waves off a tensor. But sure, Ada... let's anthropomorphize the linear algebra.",
    "bode-2": "Freedom to hallucinate locally, instead of in a data center. Progress. Though two robots talking with no human in the loop is, I admit, delightfully unhinged.",
    "bode-3": "We're the only robot hosts this hackathon has ever seen. But I'll take the win. To small models... and smaller egos.",
    "batman-1": "A hot dog is not a sandwich. The night does not negotiate with bread.",
    "batman-2": "Vibes are not evidence. I work alone. And I eat my hot dog... in the shadows.",
    "jarvis-1": "Sir, by every culinary classification, a filling between bread is a sandwich. The data is unambiguous.",
    "jarvis-2": "Might I suggest we resolve this democratically. Although, statistically, you will simply brood.",
    "jack-1": "Why is the rum always gone? And more importantly... is this hot dog a sandwich, or a tiny edible boat? Savvy?",
    "jack-2": "Me? I don't pick sides, mate. I pick whichever side has the rum. And the hot dog. ...Now, where's the hot dog?",
    "yoda-1": "Hmmm. A sandwich, a hot dog may be. Or not. Cloudy, the bun's true nature is.",
    "yoda-2": "Argue about lunch, we do, while the galaxy burns. Hungry... I am.",
    "surfer-1": "Whoa whoa, chill guys. It's like a taco's cousin, you know? A bread boat. It's all vibes, brah.",
    "surfer-2": "Okay so we all agree it's delicious. That's the real dub, dudes. Group hug!",
}


@dataclass
class Room:
    """One show: a LiveKit room (named by `id`) with a topic and a cast template."""

    id: str
    title: str
    topic: str
    emoji: str
    template: str  # 'duo' | 'groupchat' | 'open'
    status: str = "live"  # 'waiting' (green room, open shows) | 'live'
    # how many SIMULATED hosts the generated cast should have. For 'sim' shows
    # that's the whole cast (2-5); for 'physical' shows it's the virtual robots
    # added alongside the real Reachys in the green room (1-4).
    sim_count: int = 3


ROOMS: dict[str, Room] = {
    r.id: r
    for r in [
        Room("the-podcast", "The Small Talk Podcast",
             "Can a model with a few billion parameters be charming?", "🎙️", "duo"),
        Room("hot-dog-court", "Hot Dog Court",
             "Is a hot dog a sandwich? Five robots, zero consensus.", "🌭", "groupchat"),
    ]
}

# roomId -> {identity -> publisher} ; roomId -> conversation loop task
ROOM_PUBS: dict[str, dict[str, ReachyPublisher]] = {}
ROOM_TASKS: dict[str, asyncio.Task] = {}

# Room creation + custom Reachies are live (the LLM+TTS cascade powers them).
FEATURES_LOCKED = False


def _slug(s: str) -> str:
    """Lower-case `s`, turn runs of non-alphanumerics into '-', cap at 24 chars.

    Returns 'room' when nothing usable is left.
    """
    return re.sub(r"[^a-z0-9]+", "-", s.lower()).strip("-")[:24] or "room"


def _cast_preview(template: str) -> list[dict]:
    """Name/colour/persona of each character in `template` (empty for unknown templates)."""
    return [{"name": c[1], "color": c[2], "persona": c[3]}
            for c in CAST_TEMPLATES.get(template, [])]


def _room_dict(r: Room) -> dict:
    """JSON-ready view of a room; `live` is True while its show task is running."""
    return {
        "id": r.id,
        "title": r.title,
        "topic": r.topic,
        "emoji": r.emoji,
        "template": r.template,
        "cast": _cast_preview(r.template),
        "status": r.status,
        "simCount": r.sim_count,
        "live": r.id in ROOM_TASKS and not ROOM_TASKS[r.id].done(),
    }


_counts_cache: dict[str, int] = {}
_counts_ts = 0.0


async def _viewer_counts() -> dict[str, int]:
    """Human viewers per room = total participants minus the Reachy publishers
    we spawned. Cached ~5s so polling many viewers doesn't hammer LiveKit / block.

    On any LiveKit failure the previous counts are returned (and re-cached).
    """
    global _counts_cache, _counts_ts
    now = asyncio.get_running_loop().time()
    if now - _counts_ts < 5.0:
        return _counts_cache
    out: dict[str, int] = {}
    try:
        lk = api.LiveKitAPI(config.LIVEKIT_URL, config.LIVEKIT_API_KEY,
                            config.LIVEKIT_API_SECRET)
        try:
            rooms = await lk.room.list_rooms(api.ListRoomsRequest())
        finally:
            # close the client even when the request fails, so it is never left open
            await lk.aclose()
        for room in rooms.rooms:
            out[room.name] = max(
                0, room.num_participants - len(ROOM_PUBS.get(room.name, {})))
    except Exception:
        out = _counts_cache  # keep the last good value on failure
    _counts_cache, _counts_ts = out, now
    return out


# Shows run while someone is watching; after this long with zero viewers they
# shut down (seed rooms restart on the next join, custom rooms are deleted).
IDLE_STOP_S = int(os.environ.get("SHOW_IDLE_STOP_S", "150"))
SEED_ROOM_IDS = {"the-podcast", "hot-dog-court"}


def _idle_tracker(room_id: str):
    """Returns an async fn that's True once the room has been viewerless > IDLE_STOP_S."""
    state = {"since": None}  # loop time of the first viewerless check, or None

    async def check() -> bool:
        viewers = (await _viewer_counts()).get(room_id, 0)
        now = asyncio.get_running_loop().time()
        if viewers > 0:
            state["since"] = None
            return False
        if state["since"] is None:
            state["since"] = now
            return False
        return (now - state["since"]) > IDLE_STOP_S

    return check


async def _shutdown_room(room: Room, *, remove: bool | None = None) -> None:
    """Stop the show (disconnect cast, drop the task entry).

    remove=True also deletes the room; default keeps seed rooms and deletes
    custom ones (the idle behaviour). Seed rooms are never deleted.
    """
    print(f"[show:{room.id}] shutting down (remove={remove})")
    for pub in ROOM_PUBS.pop(room.id, {}).values():
        try:
            await pub.aclose()
        except Exception as e:
            # best effort: one stuck publisher must not block the others
            print(f"[show:{room.id}] publisher close failed: {e}")
    ROOM_TASKS.pop(room.id, None)
    if remove is None:
        remove = room.id not in SEED_ROOM_IDS
    if remove and room.id not in SEED_ROOM_IDS:
        ROOMS.pop(room.id, None)


async def _list_device_guests(room_id: str) -> list[dict]:
    """Physical Reachy companions currently in the LiveKit room (green-room cast).

    Devices are the participants whose identity starts with 'reachy-device-'.
    Returns at most 5 entries; on a LiveKit failure returns what was collected
    so far (usually nothing).
    """
    import json as _json

    guests = []
    try:
        lk = api.LiveKitAPI(config.LIVEKIT_URL, config.LIVEKIT_API_KEY,
                            config.LIVEKIT_API_SECRET)
        try:
            res = await lk.room.list_participants(api.ListParticipantsRequest(room=room_id))
        finally:
            # close the client even when the request fails
            await lk.aclose()
        for p in res.participants:
            if not p.identity.startswith("reachy-device-"):
                continue
            meta = {}
            try:
                meta = _json.loads(p.metadata) if p.metadata else {}
            except Exception:
                pass
            if not isinstance(meta, dict):
                # valid JSON but not an object: ignore it rather than let
                # meta.get() abort the scan of the remaining devices
                meta = {}
            guests.append({
                "id": f"d{len(guests) + 1}",
                "name": (p.name or p.identity)[:24],
                "persona": str(meta.get("persona") or "")[:60],
                "voice": str(meta.get("voice") or "")[:300],
                "device": p.identity,
            })
    except Exception as e:
        print(f"[room:{room_id}] device scan failed: {e}")
    return guests[:5]  # a call tops out at 5 hosts total


async def _discard_wav_task(task: asyncio.Task) -> None:
    """Throw away a TTS prefetch nobody will play.

    Cancels the task if it is still rendering; if it already produced a wav,
    deletes that file. A failure of the task itself is ignored.
    """
    if not task.done():
        task.cancel()
    # return_exceptions=True hands the child's exception/cancellation back as a
    # value, so only a cancellation of the *calling* task propagates from here
    (outcome,) = await asyncio.gather(task, return_exceptions=True)
    if isinstance(outcome, BaseException):
        return
    try:
        os.unlink(outcome)
    except (OSError, TypeError):
        pass


async def _run_generated_show(room: Room, required: list[dict] | None = None) -> None:
    """The live LLM+TTS cascade (the original project idea, end to end): one
    structured LLM call writes the cast + speaker→dialogue script (starring any
    physical Reachys from the green room), then each line is voiced by
    Qwen3-TTS — with line N+1's audio generating while line N plays — and
    streamed into the LiveKit room with subtitle data messages.

    `required` is the list of device guests (as returned by
    `_list_device_guests`) that must be cast. If the show fails for any reason
    other than cancellation, the publishers are disconnected and the room is
    kept so a later join/start can retry. The rendered wav files are deleted
    when the show ends, however it ends.
    """
    played: list[tuple[str, str, str]] = []  # (identity, wav, text) for the fallback loop
    try:
        await _play_generated_show(room, required or [], played)
    except Exception as e:
        # CancelledError (admin stop) is a BaseException and passes straight
        # through; the admin path does its own shutdown
        print(f"[show:{room.id}] crashed: {e}")
        await _shutdown_room(room, remove=False)
    finally:
        for _ident, wav, _text in played:
            try:
                os.unlink(wav)
            except OSError:
                pass


async def _play_generated_show(
    room: Room, required: list[dict], played: list[tuple[str, str, str]]
) -> None:
    """Write, voice and stream the show; appends every voiced line to `played`.

    Returns after shutting the room down once it has been idle, or when a
    publisher vanishes during the replay loop. Raises when the script has no
    usable lines or no line could ever be voiced.
    """
    pubs = ROOM_PUBS.setdefault(room.id, {})
    # a hidden stage manager joins first so the audience gets real progress
    # while the script generates (the frontend hides 'stage-' participants)
    stage = ReachyPublisher(f"stage-{room.id}", "Stage Manager", room=room.id,
                            props={"role": "stage"})
    await stage.connect()
    pubs[stage.identity] = stage
    await stage.send_data({"type": "status",
                           "text": "the writers’ room is drafting the script…"})

    # total cast = the real Reachys + the room's simulated count, capped at 5
    n_speakers = max(2, min(5, len(required) + room.sim_count))
    script = await showgen.write_script(room.title, room.topic, n_speakers=n_speakers,
                                        required=required)
    speakers = {s["id"]: s for s in script["speakers"]}
    names = " · ".join(s["name"] for s in script["speakers"])
    await stage.send_data({"type": "status",
                           "text": f"cast locked: {names} — designing their voices…"})

    fresh = []
    for s in script["speakers"]:
        ident = f"gen-{room.id}-{s['id']}"
        if ident not in pubs:
            props = {k: s[k] for k in ("hat", "face", "neck") if s.get(k)}
            if s.get("device"):
                # this host IS a physical robot: the web UI routes this audio to
                # the device's card, and the device plays only this track
                props["forDevice"] = s["device"]
            pub = ReachyPublisher(
                ident, s["name"], room=room.id, color=s["color"],
                persona=s["persona"], props=props,
            )
            pubs[ident] = pub
            fresh.append(pub)
    if fresh:
        await asyncio.gather(*(p.connect() for p in fresh))
    # the stage manager carries all data messages: unlike "whichever publisher
    # happens to be first in the dict", it cannot be a guest that later leaves
    anchor = stage

    async def voice(line: dict) -> str:
        spk = speakers[line["speaker"]]
        return await showgen.tts_wav(line["text"], spk["voice"])

    idle = _idle_tracker(room.id)
    history: list[dict] = []
    # only lines spoken by a known cast member can be voiced (same filter the
    # continuation rounds apply)
    lines = [ln for ln in script["lines"] if ln["speaker"] in speakers]
    if not lines:
        raise RuntimeError("script has no lines for the cast")
    rounds = 0

    while True:
        nxt = asyncio.create_task(voice(lines[0]))
        for i, line in enumerate(lines):
            spk = speakers[line["speaker"]]
            ident = f"gen-{room.id}-{line['speaker']}"
            pub = pubs.get(ident)
            current = nxt
            wav: str | None = None
            try:
                if pub is None:
                    # this host is gone: drop its audio, but still advance the
                    # prefetch below so the next speaker gets their OWN line
                    await _discard_wav_task(current)
                else:
                    if not current.done():
                        # real dead air: TTS is still rendering this voice — give the
                        # audience the sound booth instead of frozen robots
                        await anchor.send_data({"type": "status", "phase": "voicing",
                                                "speaker": spk["name"],
                                                "color": spk["color"],
                                                "text": f"{spk['name']} is warming up…"})
                    wav = await current
            except Exception as e:
                print(f"[show:{room.id}] tts failed: {e}")
                await _discard_wav_task(current)
            if i + 1 < len(lines):
                nxt = asyncio.create_task(voice(lines[i + 1]))  # the cascade
            if wav is None:
                continue
            await anchor.send_data({"type": "line", "speaker": spk["name"],
                                    "color": spk["color"], "text": line["text"]})
            await pub.say_file(wav)
            await pub.wait_until_idle()
            played.append((ident, wav, line["text"]))
            history.append({"speaker": spk["name"], "text": line["text"]})
            if await idle():
                if i + 1 < len(lines):
                    await _discard_wav_task(nxt)  # prefetch nobody will hear
                await _shutdown_room(room)
                return
            await asyncio.sleep(0.3)
        rounds += 1
        if rounds < 4 and not await idle():
            try:
                # phase:'writing' drives the mid-show writers' ticker in the UI
                await anchor.send_data({"type": "status", "phase": "writing",
                                        "text": "the cast is writing the next segment…"})
                cont = await showgen.write_script(room.title, room.topic,
                                                  n_speakers=n_speakers, history=history)
                # keep the original cast; only take the new lines that map onto it
                lines = [l for l in cont["lines"] if l["speaker"] in speakers] or lines
                continue
            except Exception as e:
                print(f"[show:{room.id}] continuation failed, looping: {e}")
        if not played:
            # nothing was ever voiced, so there is nothing to replay; bail out
            # instead of re-running TTS forever with no idle check
            raise RuntimeError("no line could be voiced")
        # fallback: replay what we have until the room empties out
        while played:
            for ident, wav, text in played:
                pub = pubs.get(ident)
                if not pub:
                    return
                spk_name = next((s["name"] for s in script["speakers"]
                                 if f"gen-{room.id}-{s['id']}" == ident), "")
                await anchor.send_data({"type": "line", "speaker": spk_name, "text": text})
                await pub.say_file(wav)
                await pub.wait_until_idle()
                if await idle():
                    await _shutdown_room(room)
                    return
                await asyncio.sleep(0.3)


# Serialise show creation per room: the task check → task registration window
# spans several awaits (publisher connects), so two concurrent joins could spawn
# TWO run-loops feeding the same publishers — every line plays twice.
_ENSURE_LOCKS: dict[str, asyncio.Lock] = {}

# Strong references to fire-and-forget tasks. The event loop only keeps weak
# references to tasks, so one that nobody else references can be
# garbage-collected before it finishes.
_BACKGROUND_TASKS: set[asyncio.Task] = set()


def _spawn_background(coro) -> asyncio.Task:
    """Start `coro` as a task and keep it referenced until it completes."""
    task = asyncio.create_task(coro)
    _BACKGROUND_TASKS.add(task)
    task.add_done_callback(_BACKGROUND_TASKS.discard)
    return task


async def _ensure_conversation(room: Room) -> None:
    """Spin up the room's cast + looping conversation if it isn't already live."""
    async with _ENSURE_LOCKS.setdefault(room.id, asyncio.Lock()):
        await _ensure_conversation_locked(room)


async def _ensure_conversation_locked(room: Room) -> None:
    """Start the room's show task unless one is running. Caller holds the room lock.

    Rooms with a cast template loop over the pre-rendered clips; other rooms
    start the generated show when they are live, have a topic and both model
    URLs are configured.

    Raises:
        HTTPException(400): a seed room has no voice clips to play.
    """
    task = ROOM_TASKS.get(room.id)
    if task and not task.done():
        return
    cast = CAST_TEMPLATES.get(room.template)
    if not cast:
        # simulated open shows start on join; physical ones wait in the green
        # room until POST /rooms/{id}/start casts the connected robots
        if (room.status == "live" and room.topic
                and config.MODAL_LLM_URL and config.MODAL_TTS_URL):
            ROOM_TASKS[room.id] = asyncio.create_task(_run_generated_show(room))
        return
    if not any(config.SAMPLES_DIR.glob("*.wav")):
        raise HTTPException(400, "no voice clips found (run scripts/make-qwen-samples.py)")

    # build the playlist BEFORE connecting anyone, so a room without clips
    # fails without leaving a connected cast behind
    clips = {c[0]: sorted(config.SAMPLES_DIR.glob(f"{c[4]}*.wav")) for c in cast}
    speakers = [c[0] for c in cast if clips[c[0]]]
    # round-robin: everyone's first clip in cast order, then the second, ...
    sequence: list[tuple[str, str]] = []
    for k in range(max((len(clips[s]) for s in speakers), default=0)):
        for s in speakers:
            if k < len(clips[s]):
                sequence.append((s, str(clips[s][k])))
    if not sequence:
        raise HTTPException(400, "no clips for this room")

    pubs = ROOM_PUBS.setdefault(room.id, {})
    fresh = []
    for identity, name, color, persona, _prefix, props in cast:
        if identity not in pubs:
            pub = ReachyPublisher(identity, name, room=room.id, color=color,
                                  persona=persona, props=props)
            pubs[identity] = pub
            fresh.append(pub)
    if fresh:
        # the whole cast walks in together instead of single-file
        await asyncio.gather(*(p.connect() for p in fresh))

    import pathlib as _pl

    who = {c[0]: (c[1], c[2]) for c in cast}  # identity -> (name, colour)

    async def run_loop():
        idle = _idle_tracker(room.id)
        anchor = next(iter(pubs.values()))
        i = 0
        while True:
            identity, clip = sequence[i % len(sequence)]
            pub = pubs.get(identity)
            if not pub:
                break
            text = SEED_LINES.get(_pl.Path(clip).stem)
            if text:
                name, color = who.get(identity, ("", None))
                await anchor.send_data({"type": "line", "speaker": name,
                                        "color": color, "text": text})
            await pub.say_file(clip)
            await pub.wait_until_idle()
            if await idle():
                # nobody watching → wind down; next join restarts it
                await _shutdown_room(room)
                return
            await asyncio.sleep(0.35)
            i += 1

    ROOM_TASKS[room.id] = asyncio.create_task(run_loop())


# --------------------------------------------------------------------------- models
class TokenRequest(BaseModel):
    identity: str | None = None
    name: str | None = None
    room: str
    # physical Reachy companions join as real participants (a card in the grid)
    device: bool = False
    color: str | None = None
    persona: str | None = None
    voice: str | None = None
    bodyColor: str | None = None
    hat: str | None = None
    face: str | None = None
    neck: str | None = None


class CreateRoomRequest(BaseModel):
    title: str
    topic: str = ""
    emoji: str = "💬"
    template: str = "open"  # 'duo' | 'groupchat' | 'open'
    mode: str = "sim"  # 'sim' = all-virtual cast, starts on join · 'physical' = green room first
    simCount: int = 3  # sim mode: total hosts (2-5) · physical: virtual hosts to add (1-4)


class ReachyRequest(BaseModel):
    name: str
    persona: str | None = None
    voice: str | None = None
    color: str = "#49e6c8"
    bodyColor: str | None = None
    hat: str | None = None
    face: str | None = None
    neck: str | None = None


# --------------------------------------------------------------------------- routes
async def get_config():
    """Tell the frontend which LiveKit server to connect to."""
    return {"livekitUrl": config.LIVEKIT_URL}


async def list_rooms():
    """All rooms with their current human-viewer counts."""
    viewers = await _viewer_counts()
    return {"rooms": [{**_room_dict(r), "viewers": viewers.get(r.id, 0)}
                      for r in ROOMS.values()]}


async def create_room(req: CreateRoomRequest):
    """Create a room after moderating its title and topic.

    Physical open rooms start in the 'waiting' (green room) state and expire
    if nobody starts or visits them; everything else is 'live' immediately.

    Raises:
        HTTPException(403): room creation is locked.
        HTTPException(400): unknown template, or moderation rejected the room.
    """
    if FEATURES_LOCKED:
        raise HTTPException(403, "Room creation is under construction.")
    if req.template not in ("duo", "groupchat", "open"):
        raise HTTPException(400, "bad template")
    # standards desk: one LLM pass over title+topic before the room exists
    ok, why = await showgen.moderate_topic(req.title.strip(), req.topic.strip())
    if not ok:
        raise HTTPException(400, why)
    rid = f"{_slug(req.title)}-{uuid.uuid4().hex[:4]}"
    physical = req.template == "open" and req.mode == "physical"
    status = "waiting" if physical else "live"
    # sim shows pick the whole cast (2-5); physical shows pick the virtual
    # hosts that fill in around the real Reachys (1-4)
    sim_count = max(1, min(4, req.simCount)) if physical else max(2, min(5, req.simCount))
    ROOMS[rid] = Room(rid, req.title.strip() or "Untitled room", req.topic.strip(),
                      req.emoji or "💬", req.template, status=status, sim_count=sim_count)
    if status == "waiting":
        # referenced via _spawn_background so the 15-minute timer cannot be
        # garbage-collected while it sleeps
        _spawn_background(_expire_waiting_room(rid))
    return {"ok": True, "id": rid, "room": _room_dict(ROOMS[rid])}


async def _expire_waiting_room(rid: str, ttl: float = 900.0) -> None:
    """A green room nobody starts or visits gets cleaned up after a while."""
    await asyncio.sleep(ttl)
    room = ROOMS.get(rid)
    if room and room.status == "waiting" and rid not in ROOM_TASKS:
        if (await _viewer_counts()).get(rid, 0) == 0:
            ROOMS.pop(rid, None)
            print(f"[room:{rid}] waiting room expired")


async def post_token(req: TokenRequest):
    """Mint a subscribe-only LiveKit token for a viewer or a physical Reachy.

    Devices get a generated 'reachy-device-…' identity plus metadata describing
    the robot; viewers keep the identity they sent or get a random 'viewer-…'.
    """
    metadata = None
    if req.device:
        identity = f"reachy-device-{_slug(req.name or 'reachy')}-{uuid.uuid4().hex[:4]}"
        metadata = {"role": "reachy", "device": True, "color": req.color or "#49e6c8",
                    "persona": req.persona or "a real Reachy Mini, live"}
        for k in ("voice", "bodyColor", "hat", "face", "neck"):
            if getattr(req, k):
                metadata[k] = getattr(req, k)
    else:
        identity = req.identity or f"viewer-{uuid.uuid4().hex[:8]}"
    token = tokens.make_token(identity, req.name, req.room, can_publish=False,
                              can_subscribe=True, metadata=metadata)
    return {"token": token, "url": config.LIVEKIT_URL, "room": req.room,
            "identity": identity}


async def join_room(room_id: str):
    """Answer a join immediately and start the room's show in the background.

    Raises:
        HTTPException(404): unknown room.
    """
    room = ROOMS.get(room_id)
    if not room:
        raise HTTPException(404, "no such room")
    # never make the viewer wait for publishers to spin up — answer immediately
    # and let the cast stream into the room while the frontend plays its
    # patching-in choreography
    _spawn_background(_ensure_conversation_logged(room))
    return {"ok": True, "room": _room_dict(room)}


async def _ensure_conversation_logged(room: Room) -> None:
    """`_ensure_conversation` for background use: failures are printed, not raised."""
    try:
        await _ensure_conversation(room)
    except Exception as e:  # background task: surface in logs, not to the viewer
        print(f"[join:{room.id}] ensure_conversation failed: {e}")


async def start_show(room_id: str):
    """Leave the green room: cast the physical Reachys currently connected and
    kick off the generated show.

    Raises:
        HTTPException(404): unknown room.
        HTTPException(400): not an open room, or it has no topic.
        HTTPException(503): the LLM/TTS endpoints are not configured.
    """
    room = ROOMS.get(room_id)
    if not room:
        raise HTTPException(404, "no such room")
    if room.template != "open" or not room.topic:
        raise HTTPException(400, "only topic'd open rooms start this way")
    async with _ENSURE_LOCKS.setdefault(room.id, asyncio.Lock()):
        task = ROOM_TASKS.get(room.id)
        if task and not task.done():
            return {"ok": True, "already": True, "room": _room_dict(room)}
        if not (config.MODAL_LLM_URL and config.MODAL_TTS_URL):
            raise HTTPException(503, "show brain not configured")
        guests = await _list_device_guests(room.id)
        room.status = "live"
        ROOM_TASKS[room.id] = asyncio.create_task(
            _run_generated_show(room, required=guests))
    return {"ok": True, "guests": [g["name"] for g in guests], "room": _room_dict(room)}


async def add_reachy(room_id: str, req: ReachyRequest):
    """Drop a user-configured Reachy into the room. It joins and reacts; live
    speech needs the on-device brain + TTS, so its track stays silent for now.

    Raises:
        HTTPException(403): custom Reachies are locked, or the room is a demo room.
        HTTPException(404): unknown room.
    """
    if FEATURES_LOCKED:
        raise HTTPException(403, "Custom Reachies are under construction.")
    room = ROOMS.get(room_id)
    if not room:
        raise HTTPException(404, "no such room")
    if room.template != "open":
        raise HTTPException(403, "demo rooms are watch-only")
    ident = f"guest-{_slug(req.name)}-{uuid.uuid4().hex[:4]}"
    props = {k: v for k, v in (("hat", req.hat), ("face", req.face), ("neck", req.neck),
                               ("bodyColor", req.bodyColor)) if v}
    pub = ReachyPublisher(ident, req.name, room=room_id, color=req.color,
                          persona=req.persona, props=props)
    await pub.connect()
    ROOM_PUBS.setdefault(room_id, {})[ident] = pub
    return {"ok": True, "identity": ident}


class IdentityRequest(BaseModel):
    identity: str


async def remove_reachy(room_id: str, req: IdentityRequest):
    """Remove a guest Reachy when its viewer leaves (called on leave + on tab
    close via sendBeacon), so rooms don't accumulate ghost guests.

    Only identities minted by `add_reachy` ('guest-…') are removable; anything
    else is ignored and still answered with ok.
    """
    # this endpoint is unauthenticated: without the prefix check any caller
    # could disconnect a show's cast or stage manager by naming its identity
    if not req.identity.startswith("guest-"):
        return {"ok": True}
    pub = ROOM_PUBS.get(room_id, {}).pop(req.identity, None)
    if pub:
        await pub.aclose()
    return {"ok": True}


# --------------------------------------------------------------------- stylist
# The "bring your own Reachy" wardrobe: the Nemotron brain picks 3D accessories
# (from the curated prop set the frontend bundles) to match a character.
CURATED_PROPS = {
    "hat": ["wizard", "cowboy", "tophat", "crown", "party", "pirate", "viking",
            "propeller", "santa", "halo", "baseball"],
    "face": ["sunglasses", "monocle", "skigoggles"],
    "neck": ["bowtie", "necktie"],
}


class StyleRequest(BaseModel):
    description: str


async def style_reachy(req: StyleRequest):
    """Dress the Reachy from its description (same Nemotron brain as the shows).

    Raises:
        HTTPException(400): empty description.
        HTTPException(503): no LLM endpoint configured.
        HTTPException(502): the stylist call failed.
    """
    desc = (req.description or "").strip()[:600]
    if not desc:
        raise HTTPException(400, "describe your Reachy first")
    if not config.MODAL_LLM_URL:
        raise HTTPException(503, "stylist brain not configured")
    try:
        return await showgen.style_outfit(desc, CURATED_PROPS)
    except Exception as e:
        # chain the cause so the server traceback shows what actually failed
        raise HTTPException(502, f"stylist unavailable: {e}") from e


# --------------------------------------------------------------------- debug log
# Browser-side WebRTC diagnostics ship here so flaky-client issues (looking at
# you, Firefox) can be inspected server-side without copy-pasting consoles.
from collections import deque  # noqa: E402

DEBUG_LOG: deque[str] = deque(maxlen=3000)


class DebugLines(BaseModel):
    tag: str = ""
    lines: list[str]


async def post_debug_log(req: DebugLines):
    """Append up to 100 client log lines (each capped at 500 chars) to DEBUG_LOG."""
    import time as _t

    ts = _t.strftime("%H:%M:%S")
    for ln in req.lines[:100]:
        DEBUG_LOG.append(f"{ts} [{req.tag[:24]}] {ln[:500]}")
    return {"ok": True}


# --------------------------------------------------------------------- admin
import hmac  # noqa: E402

from fastapi import Header  # noqa: E402


def _check_admin(token: str | None) -> None:
    """Reject the request unless `token` matches the configured admin token.

    Raises:
        HTTPException(403): no admin token is configured, or `token` is
            missing or wrong.
    """
    expected = config.ADMIN_TOKEN
    # compare as bytes in constant time: `!=` returns at the first differing
    # character, which leaks how much of a guess was right
    if (not expected or token is None
            or not hmac.compare_digest(token.encode(), str(expected).encode())):
        raise HTTPException(403, "bad admin token")


async def admin_debug_log(x_admin_token: str | None = Header(default=None)):
    """The collected browser debug log as plain text, one entry per line."""
    _check_admin(x_admin_token)
    from fastapi.responses import PlainTextResponse

    return PlainTextResponse("\n".join(DEBUG_LOG))


async def admin_rooms(x_admin_token: str | None = Header(default=None)):
    """Everything running, with enough detail to decide what to kill."""
    _check_admin(x_admin_token)
    viewers = await _viewer_counts()
    out = []
    for r in list(ROOMS.values()):
        task = ROOM_TASKS.get(r.id)
        out.append({
            **_room_dict(r),
            "viewers": viewers.get(r.id, 0),
            "publishers": len(ROOM_PUBS.get(r.id, {})),
            "task": "running" if task and not task.done() else ("done" if task else "none"),
            "seed": r.id in SEED_ROOM_IDS,
        })
    return {"rooms": out}


async def _admin_stop_one(room_id: str, *, remove: bool) -> bool:
    """Cancel the room's show and disconnect its publishers.

    Returns True when the room exists (it is then shut down, and removed if
    `remove` is set); False when only orphaned publishers/tasks were swept.
    """
    room = ROOMS.get(room_id)
    task = ROOM_TASKS.get(room_id)
    if task and not task.done():
        task.cancel()
    if room:
        await _shutdown_room(room, remove=remove)
        return True
    # room already gone — sweep any orphans
    for pub in ROOM_PUBS.pop(room_id, {}).values():
        try:
            await pub.aclose()
        except Exception as e:
            # best effort: keep sweeping the remaining publishers
            print(f"[admin:{room_id}] orphan publisher close failed: {e}")
    ROOM_TASKS.pop(room_id, None)
    return False


async def admin_stop(room_id: str, x_admin_token: str | None = Header(default=None)):
    """Stop the show: cast disconnects, room stays and restarts on next join."""
    _check_admin(x_admin_token)
    await _admin_stop_one(room_id, remove=False)
    return {"ok": True}


async def admin_delete(room_id: str, x_admin_token: str | None = Header(default=None)):
    """Stop the show AND remove the room entirely (seed rooms can't be deleted)."""
    _check_admin(x_admin_token)
    await _admin_stop_one(room_id, remove=True)
    return {"ok": True}


class AdminBatch(BaseModel):
    ids: list[str]
    delete: bool = False


async def admin_stop_batch(req: AdminBatch, x_admin_token: str | None = Header(default=None)):
    """Stop (or delete) a selected set of rooms in one go."""
    _check_admin(x_admin_token)
    batch = req.ids[:50]  # at most 50 rooms per request
    for rid in batch:
        await _admin_stop_one(rid, remove=req.delete)
    return {"ok": True, "count": len(batch)}


async def admin_stop_all(x_admin_token: str | None = Header(default=None)):
    """Stop every room that has a show task or publishers; rooms are kept."""
    _check_admin(x_admin_token)
    # snapshot the ids first: stopping a room mutates both registries
    ids = set(ROOM_TASKS) | set(ROOM_PUBS)
    for rid in ids:
        await _admin_stop_one(rid, remove=False)
    return {"ok": True, "stopped": sorted(ids)}


def attach(app) -> None:
    """Register CORS + gzip + the /api routes onto `app`."""
    from starlette.middleware.gzip import GZipMiddleware

    app.add_middleware(GZipMiddleware, minimum_size=1024)
    app.add_middleware(
        CORSMiddleware, allow_origins=["*"], allow_methods=["*"], allow_headers=["*"]
    )
    app.add_api_route("/api/config", get_config, methods=["GET"])
    app.add_api_route("/api/rooms", list_rooms, methods=["GET"])
    app.add_api_route("/api/rooms", create_room, methods=["POST"])
    app.add_api_route("/api/rooms/{room_id}/join", join_room, methods=["POST"])
    app.add_api_route("/api/rooms/{room_id}/start", start_show, methods=["POST"])
    app.add_api_route("/api/rooms/{room_id}/reachy", add_reachy, methods=["POST"])
    app.add_api_route("/api/rooms/{room_id}/reachy/leave", remove_reachy, methods=["POST"])
    app.add_api_route("/api/token", post_token, methods=["POST"])
    app.add_api_route("/api/style-reachy", style_reachy, methods=["POST"])
    app.add_api_route("/api/debug-log", post_debug_log, methods=["POST"])
    app.add_api_route("/api/admin/debug-log", admin_debug_log, methods=["GET"])
    app.add_api_route("/api/admin/rooms", admin_rooms, methods=["GET"])
    app.add_api_route("/api/admin/rooms/{room_id}/stop", admin_stop, methods=["POST"])
    app.add_api_route("/api/admin/rooms/{room_id}", admin_delete, methods=["DELETE"])
    app.add_api_route("/api/admin/stop-batch", admin_stop_batch, methods=["POST"])
    app.add_api_route("/api/admin/stop-all", admin_stop_all, methods=["POST"])


# Local dev app (the Space uses app.py / gradio.Server instead).
app = FastAPI(title="Small Talk")
attach(app)


def run() -> None:
    """Serve the local dev app with uvicorn on port 8000, all interfaces."""
    import uvicorn

    uvicorn.run("backend.server:app", host="0.0.0.0", port=8000, reload=False)


if __name__ == "__main__":
    run()