| """Control plane for Small Talk. |
| |
| Multi-room model: each Room is its own LiveKit room with a topic and a cast. |
| Seed rooms run the pre-rendered conversations; users can create rooms and drop |
| their own configured Reachy in (it joins and reacts — live speech needs the |
| on-device brain + TTS). |
| """ |
| import asyncio |
| import os |
| import re |
| import uuid |
| from dataclasses import dataclass |
|
|
| from fastapi import FastAPI, HTTPException |
| from fastapi.middleware.cors import CORSMiddleware |
| from livekit import api |
| from pydantic import BaseModel |
|
|
| from . import config, showgen, tokens |
| from .publisher import ReachyPublisher |
|
|
| |
| CAST_TEMPLATES: dict[str, list[tuple[str, str, str, str, str, dict]]] = { |
| "duo": [ |
| ("reachy-ada", "Ada", "#c98a3c", "curious, warm, asks big questions", "ada", |
| {"hat": "party"}), |
| ("reachy-bode", "Bode", "#5a6b3a", "dry wit, contrarian, loves a tangent", "bode", |
| {"hat": "tophat"}), |
| ], |
| "groupchat": [ |
| ("reachy-batman", "Batman", "#5b6b8c", "broods, works alone, encased-meat justice", "batman", |
| {}), |
| ("reachy-jarvis", "JARVIS", "#37b3c9", "polite British AI butler, cites the data", "jarvis", |
| {"face": "monocle", "neck": "bowtie"}), |
| ("reachy-jack", "Jack Sparrow", "#3fae9a", "rum-soaked pirate, picks whichever side has the rum", "jack", |
| {"hat": "pirate"}), |
| ("reachy-yoda", "Yoda", "#7bbf6a", "speaks in riddles, perpetually hungry", "yoda", |
| {"hat": "halo"}), |
| ("reachy-surfer", "Surfer", "#ff8c42", "chill, everything is just vibes, brah", "surfer", |
| {"face": "sunglasses"}), |
| ], |
| } |
|
|
| |
| |
| SEED_LINES: dict[str, str] = { |
| "ada-1": "Welcome to the Reachy podcast! I'm Ada. Today's big question: can a tiny model, with just a few billion parameters, actually be charming?", |
| "ada-2": "Oh come on, that's the magic! Small models run right here on the laptop. No cloud, no latency. That's not algebra, that's freedom!", |
| "ada-3": "Unhinged is the whole point! Two robots, one brain, zero stage fright. We might just be the best hosts this hackathon has ever seen.", |
| "bode-1": "Bode here. Charming? It's a robot reading sine waves off a tensor. But sure, Ada... let's anthropomorphize the linear algebra.", |
| "bode-2": "Freedom to hallucinate locally, instead of in a data center. Progress. Though two robots talking with no human in the loop is, I admit, delightfully unhinged.", |
| "bode-3": "We're the only robot hosts this hackathon has ever seen. But I'll take the win. To small models... and smaller egos.", |
| "batman-1": "A hot dog is not a sandwich. The night does not negotiate with bread.", |
| "batman-2": "Vibes are not evidence. I work alone. And I eat my hot dog... in the shadows.", |
| "jarvis-1": "Sir, by every culinary classification, a filling between bread is a sandwich. The data is unambiguous.", |
| "jarvis-2": "Might I suggest we resolve this democratically. Although, statistically, you will simply brood.", |
| "jack-1": "Why is the rum always gone? And more importantly... is this hot dog a sandwich, or a tiny edible boat? Savvy?", |
| "jack-2": "Me? I don't pick sides, mate. I pick whichever side has the rum. And the hot dog. ...Now, where's the hot dog?", |
| "yoda-1": "Hmmm. A sandwich, a hot dog may be. Or not. Cloudy, the bun's true nature is.", |
| "yoda-2": "Argue about lunch, we do, while the galaxy burns. Hungry... I am.", |
| "surfer-1": "Whoa whoa, chill guys. It's like a taco's cousin, you know? A bread boat. It's all vibes, brah.", |
| "surfer-2": "Okay so we all agree it's delicious. That's the real dub, dudes. Group hug!", |
| } |
|
|
|
|
| @dataclass |
| class Room: |
| id: str |
| title: str |
| topic: str |
| emoji: str |
| template: str |
| status: str = "live" |
| |
| |
| |
| sim_count: int = 3 |
|
|
|
|
| ROOMS: dict[str, Room] = { |
| r.id: r |
| for r in [ |
| Room("the-podcast", "The Small Talk Podcast", |
| "Can a model with a few billion parameters be charming?", "🎙️", "duo"), |
| Room("hot-dog-court", "Hot Dog Court", |
| "Is a hot dog a sandwich? Five robots, zero consensus.", "🌭", "groupchat"), |
| ] |
| } |
|
|
| |
| ROOM_PUBS: dict[str, dict[str, ReachyPublisher]] = {} |
| ROOM_TASKS: dict[str, asyncio.Task] = {} |
|
|
| |
| FEATURES_LOCKED = False |
|
|
|
|
| def _slug(s: str) -> str: |
| return re.sub(r"[^a-z0-9]+", "-", s.lower()).strip("-")[:24] or "room" |
|
|
|
|
| def _cast_preview(template: str) -> list[dict]: |
| return [{"name": c[1], "color": c[2], "persona": c[3]} for c in CAST_TEMPLATES.get(template, [])] |
|
|
|
|
| def _room_dict(r: Room) -> dict: |
| return { |
| "id": r.id, "title": r.title, "topic": r.topic, "emoji": r.emoji, |
| "template": r.template, "cast": _cast_preview(r.template), |
| "status": r.status, "simCount": r.sim_count, |
| "live": r.id in ROOM_TASKS and not ROOM_TASKS[r.id].done(), |
| } |
|
|
|
|
| _counts_cache: dict[str, int] = {} |
| _counts_ts = 0.0 |
|
|
|
|
| async def _viewer_counts() -> dict[str, int]: |
| """Human viewers per room = total participants minus the Reachy publishers we |
| spawned. Cached ~5s so polling many viewers doesn't hammer LiveKit / block.""" |
| global _counts_cache, _counts_ts |
| now = asyncio.get_event_loop().time() |
| if now - _counts_ts < 5.0: |
| return _counts_cache |
| out: dict[str, int] = {} |
| try: |
| lk = api.LiveKitAPI(config.LIVEKIT_URL, config.LIVEKIT_API_KEY, config.LIVEKIT_API_SECRET) |
| rooms = await lk.room.list_rooms(api.ListRoomsRequest()) |
| await lk.aclose() |
| for room in rooms.rooms: |
| out[room.name] = max(0, room.num_participants - len(ROOM_PUBS.get(room.name, {}))) |
| except Exception: |
| out = _counts_cache |
| _counts_cache, _counts_ts = out, now |
| return out |
|
|
|
|
| |
| |
| IDLE_STOP_S = int(os.environ.get("SHOW_IDLE_STOP_S", "150")) |
| SEED_ROOM_IDS = {"the-podcast", "hot-dog-court"} |
|
|
|
|
| def _idle_tracker(room_id: str): |
| """Returns an async fn that's True once the room has been viewerless > IDLE_STOP_S.""" |
| state = {"since": None} |
|
|
| async def check() -> bool: |
| viewers = (await _viewer_counts()).get(room_id, 0) |
| now = asyncio.get_event_loop().time() |
| if viewers > 0: |
| state["since"] = None |
| return False |
| if state["since"] is None: |
| state["since"] = now |
| return False |
| return (now - state["since"]) > IDLE_STOP_S |
|
|
| return check |
|
|
|
|
| async def _shutdown_room(room: Room, *, remove: bool | None = None) -> None: |
| """Stop the show (disconnect cast, cancel task). remove=True also deletes the |
| room; default keeps seed rooms and deletes custom ones (the idle behaviour).""" |
| print(f"[show:{room.id}] shutting down (remove={remove})") |
| for pub in ROOM_PUBS.pop(room.id, {}).values(): |
| try: |
| await pub.aclose() |
| except Exception: |
| pass |
| ROOM_TASKS.pop(room.id, None) |
| if remove is None: |
| remove = room.id not in SEED_ROOM_IDS |
| if remove and room.id not in SEED_ROOM_IDS: |
| ROOMS.pop(room.id, None) |
|
|
|
|
| async def _list_device_guests(room_id: str) -> list[dict]: |
| """Physical Reachy companions currently in the LiveKit room (green-room cast).""" |
| import json as _json |
|
|
| guests = [] |
| try: |
| lk = api.LiveKitAPI(config.LIVEKIT_URL, config.LIVEKIT_API_KEY, config.LIVEKIT_API_SECRET) |
| res = await lk.room.list_participants(api.ListParticipantsRequest(room=room_id)) |
| await lk.aclose() |
| for p in res.participants: |
| if not p.identity.startswith("reachy-device-"): |
| continue |
| meta = {} |
| try: |
| meta = _json.loads(p.metadata) if p.metadata else {} |
| except Exception: |
| pass |
| guests.append({ |
| "id": f"d{len(guests) + 1}", |
| "name": (p.name or p.identity)[:24], |
| "persona": str(meta.get("persona") or "")[:60], |
| "voice": str(meta.get("voice") or "")[:300], |
| "device": p.identity, |
| }) |
| except Exception as e: |
| print(f"[room:{room_id}] device scan failed: {e}") |
| return guests[:5] |
|
|
|
|
| async def _run_generated_show(room: Room, required: list[dict] | None = None) -> None: |
| """The live LLM+TTS cascade (the original project idea, end to end): |
| one structured LLM call writes the cast + speaker→dialogue script (starring |
| any physical Reachys from the green room), then each line is voiced by |
| Qwen3-TTS — with line N+1's audio generating while line N plays — and |
| streamed into the LiveKit room with subtitle data messages.""" |
| pubs = ROOM_PUBS.setdefault(room.id, {}) |
| |
| |
| stage = ReachyPublisher(f"stage-{room.id}", "Stage Manager", room=room.id, |
| props={"role": "stage"}) |
| await stage.connect() |
| pubs[stage.identity] = stage |
| await stage.send_data({"type": "status", "text": "the writers’ room is drafting the script…"}) |
|
|
| |
| required = required or [] |
| n_speakers = max(2, min(5, len(required) + room.sim_count if required else room.sim_count)) |
| script = await showgen.write_script(room.title, room.topic, |
| n_speakers=n_speakers, required=required) |
| speakers = {s["id"]: s for s in script["speakers"]} |
| names = " · ".join(s["name"] for s in script["speakers"]) |
| await stage.send_data({"type": "status", "text": f"cast locked: {names} — designing their voices…"}) |
|
|
| fresh = [] |
| for s in script["speakers"]: |
| ident = f"gen-{room.id}-{s['id']}" |
| if ident not in pubs: |
| props = {k: s[k] for k in ("hat", "face", "neck") if s.get(k)} |
| if s.get("device"): |
| |
| |
| props["forDevice"] = s["device"] |
| pub = ReachyPublisher( |
| ident, s["name"], room=room.id, color=s["color"], persona=s["persona"], |
| props=props, |
| ) |
| pubs[ident] = pub |
| fresh.append(pub) |
| if fresh: |
| await asyncio.gather(*(p.connect() for p in fresh)) |
| anchor = next(iter(pubs.values())) |
|
|
| async def voice(line: dict) -> str: |
| spk = speakers[line["speaker"]] |
| return await showgen.tts_wav(line["text"], spk["voice"]) |
|
|
| idle = _idle_tracker(room.id) |
| history: list[dict] = [] |
| lines = list(script["lines"]) |
| rounds = 0 |
| played: list[tuple[str, str, str]] = [] |
|
|
| async def cleanup(): |
| await _shutdown_room(room) |
| for _, wav, _t in played: |
| try: |
| os.unlink(wav) |
| except OSError: |
| pass |
|
|
| while True: |
| nxt = asyncio.create_task(voice(lines[0])) |
| for i, line in enumerate(lines): |
| spk = speakers[line["speaker"]] |
| ident = f"gen-{room.id}-{line['speaker']}" |
| pub = pubs.get(ident) |
| if not pub: |
| continue |
| try: |
| if not nxt.done(): |
| |
| |
| await anchor.send_data({"type": "status", "phase": "voicing", |
| "speaker": spk["name"], "color": spk["color"], |
| "text": f"{spk['name']} is warming up…"}) |
| wav = await nxt |
| except Exception as e: |
| print(f"[show:{room.id}] tts failed: {e}") |
| continue |
| finally: |
| if i + 1 < len(lines): |
| nxt = asyncio.create_task(voice(lines[i + 1])) |
| await anchor.send_data({"type": "line", "speaker": spk["name"], |
| "color": spk["color"], "text": line["text"]}) |
| await pub.say_file(wav) |
| await pub.wait_until_idle() |
| played.append((ident, wav, line["text"])) |
| history.append({"speaker": spk["name"], "text": line["text"]}) |
| if await idle(): |
| return await cleanup() |
| await asyncio.sleep(0.3) |
|
|
| rounds += 1 |
| if rounds < 4 and not await idle(): |
| try: |
| |
| await anchor.send_data({"type": "status", "phase": "writing", |
| "text": "the cast is writing the next segment…"}) |
| cont = await showgen.write_script(room.title, room.topic, |
| n_speakers=n_speakers, history=history) |
| |
| lines = [l for l in cont["lines"] if l["speaker"] in speakers] or lines |
| continue |
| except Exception as e: |
| print(f"[show:{room.id}] continuation failed, looping: {e}") |
| |
| while played: |
| for ident, wav, text in played: |
| pub = pubs.get(ident) |
| if not pub: |
| return |
| spk_name = next((s["name"] for s in script["speakers"] |
| if f"gen-{room.id}-{s['id']}" == ident), "") |
| await anchor.send_data({"type": "line", "speaker": spk_name, "text": text}) |
| await pub.say_file(wav) |
| await pub.wait_until_idle() |
| if await idle(): |
| return await cleanup() |
| await asyncio.sleep(0.3) |
|
|
|
|
| |
| |
| |
| _ENSURE_LOCKS: dict[str, asyncio.Lock] = {} |
|
|
|
|
| async def _ensure_conversation(room: Room) -> None: |
| """Spin up the room's cast + looping conversation if it isn't already live.""" |
| async with _ENSURE_LOCKS.setdefault(room.id, asyncio.Lock()): |
| await _ensure_conversation_locked(room) |
|
|
|
|
| async def _ensure_conversation_locked(room: Room) -> None: |
| task = ROOM_TASKS.get(room.id) |
| if task and not task.done(): |
| return |
| cast = CAST_TEMPLATES.get(room.template) |
| if not cast: |
| |
| |
| if (room.status == "live" and room.topic |
| and config.MODAL_LLM_URL and config.MODAL_TTS_URL): |
| ROOM_TASKS[room.id] = asyncio.create_task(_run_generated_show(room)) |
| return |
| if not any(config.SAMPLES_DIR.glob("*.wav")): |
| raise HTTPException(400, "no voice clips found (run scripts/make-qwen-samples.py)") |
|
|
| pubs = ROOM_PUBS.setdefault(room.id, {}) |
| fresh = [] |
| for identity, name, color, persona, _prefix, props in cast: |
| if identity not in pubs: |
| pub = ReachyPublisher(identity, name, room=room.id, color=color, persona=persona, |
| props=props) |
| pubs[identity] = pub |
| fresh.append(pub) |
| if fresh: |
| await asyncio.gather(*(p.connect() for p in fresh)) |
|
|
| clips = {c[0]: sorted(config.SAMPLES_DIR.glob(f"{c[4]}*.wav")) for c in cast} |
| speakers = [c[0] for c in cast if clips[c[0]]] |
| sequence: list[tuple[str, str]] = [] |
| for k in range(max((len(clips[s]) for s in speakers), default=0)): |
| for s in speakers: |
| if k < len(clips[s]): |
| sequence.append((s, str(clips[s][k]))) |
| if not sequence: |
| raise HTTPException(400, "no clips for this room") |
|
|
| import pathlib as _pl |
|
|
| who = {c[0]: (c[1], c[2]) for c in cast} |
|
|
| async def run_loop(): |
| idle = _idle_tracker(room.id) |
| anchor = next(iter(pubs.values())) |
| i = 0 |
| while True: |
| identity, clip = sequence[i % len(sequence)] |
| pub = pubs.get(identity) |
| if not pub: |
| break |
| text = SEED_LINES.get(_pl.Path(clip).stem) |
| if text: |
| name, color = who.get(identity, ("", None)) |
| await anchor.send_data({"type": "line", "speaker": name, "color": color, "text": text}) |
| await pub.say_file(clip) |
| await pub.wait_until_idle() |
| if await idle(): |
| await _shutdown_room(room) |
| return |
| await asyncio.sleep(0.35) |
| i += 1 |
|
|
| ROOM_TASKS[room.id] = asyncio.create_task(run_loop()) |
|
|
|
|
| |
|
|
|
|
| class TokenRequest(BaseModel): |
| identity: str | None = None |
| name: str | None = None |
| room: str |
| |
| device: bool = False |
| color: str | None = None |
| persona: str | None = None |
| voice: str | None = None |
| bodyColor: str | None = None |
| hat: str | None = None |
| face: str | None = None |
| neck: str | None = None |
|
|
|
|
| class CreateRoomRequest(BaseModel): |
| title: str |
| topic: str = "" |
| emoji: str = "💬" |
| template: str = "open" |
| mode: str = "sim" |
| simCount: int = 3 |
|
|
|
|
| class ReachyRequest(BaseModel): |
| name: str |
| persona: str | None = None |
| voice: str | None = None |
| color: str = "#49e6c8" |
| bodyColor: str | None = None |
| hat: str | None = None |
| face: str | None = None |
| neck: str | None = None |
|
|
|
|
| |
|
|
|
|
| async def get_config(): |
| return {"livekitUrl": config.LIVEKIT_URL} |
|
|
|
|
| async def list_rooms(): |
| viewers = await _viewer_counts() |
| return {"rooms": [{**_room_dict(r), "viewers": viewers.get(r.id, 0)} for r in ROOMS.values()]} |
|
|
|
|
| async def create_room(req: CreateRoomRequest): |
| if FEATURES_LOCKED: |
| raise HTTPException(403, "Room creation is under construction.") |
| if req.template not in ("duo", "groupchat", "open"): |
| raise HTTPException(400, "bad template") |
| |
| ok, why = await showgen.moderate_topic(req.title.strip(), req.topic.strip()) |
| if not ok: |
| raise HTTPException(400, why) |
| rid = f"{_slug(req.title)}-{uuid.uuid4().hex[:4]}" |
| physical = req.template == "open" and req.mode == "physical" |
| status = "waiting" if physical else "live" |
| |
| |
| sim_count = max(1, min(4, req.simCount)) if physical else max(2, min(5, req.simCount)) |
| ROOMS[rid] = Room(rid, req.title.strip() or "Untitled room", req.topic.strip(), |
| req.emoji or "💬", req.template, status=status, sim_count=sim_count) |
| if status == "waiting": |
| asyncio.create_task(_expire_waiting_room(rid)) |
| return {"ok": True, "id": rid, "room": _room_dict(ROOMS[rid])} |
|
|
|
|
| async def _expire_waiting_room(rid: str, ttl: float = 900.0) -> None: |
| """A green room nobody starts or visits gets cleaned up after a while.""" |
| await asyncio.sleep(ttl) |
| room = ROOMS.get(rid) |
| if room and room.status == "waiting" and rid not in ROOM_TASKS: |
| if (await _viewer_counts()).get(rid, 0) == 0: |
| ROOMS.pop(rid, None) |
| print(f"[room:{rid}] waiting room expired") |
|
|
|
|
| async def post_token(req: TokenRequest): |
| metadata = None |
| if req.device: |
| identity = f"reachy-device-{_slug(req.name or 'reachy')}-{uuid.uuid4().hex[:4]}" |
| metadata = {"role": "reachy", "device": True, |
| "color": req.color or "#49e6c8", "persona": req.persona or "a real Reachy Mini, live"} |
| for k in ("voice", "bodyColor", "hat", "face", "neck"): |
| if getattr(req, k): |
| metadata[k] = getattr(req, k) |
| else: |
| identity = req.identity or f"viewer-{uuid.uuid4().hex[:8]}" |
| token = tokens.make_token(identity, req.name, req.room, can_publish=False, |
| can_subscribe=True, metadata=metadata) |
| return {"token": token, "url": config.LIVEKIT_URL, "room": req.room, "identity": identity} |
|
|
|
|
| async def join_room(room_id: str): |
| room = ROOMS.get(room_id) |
| if not room: |
| raise HTTPException(404, "no such room") |
| |
| |
| |
| asyncio.create_task(_ensure_conversation_logged(room)) |
| return {"ok": True, "room": _room_dict(room)} |
|
|
|
|
| async def _ensure_conversation_logged(room: Room) -> None: |
| try: |
| await _ensure_conversation(room) |
| except Exception as e: |
| print(f"[join:{room.id}] ensure_conversation failed: {e}") |
|
|
|
|
| async def start_show(room_id: str): |
| """Leave the green room: cast the physical Reachys currently connected and |
| kick off the generated show.""" |
| room = ROOMS.get(room_id) |
| if not room: |
| raise HTTPException(404, "no such room") |
| if room.template != "open" or not room.topic: |
| raise HTTPException(400, "only topic'd open rooms start this way") |
| async with _ENSURE_LOCKS.setdefault(room.id, asyncio.Lock()): |
| task = ROOM_TASKS.get(room.id) |
| if task and not task.done(): |
| return {"ok": True, "already": True, "room": _room_dict(room)} |
| if not (config.MODAL_LLM_URL and config.MODAL_TTS_URL): |
| raise HTTPException(503, "show brain not configured") |
| guests = await _list_device_guests(room.id) |
| room.status = "live" |
| ROOM_TASKS[room.id] = asyncio.create_task(_run_generated_show(room, required=guests)) |
| return {"ok": True, "guests": [g["name"] for g in guests], "room": _room_dict(room)} |
|
|
|
|
| async def add_reachy(room_id: str, req: ReachyRequest): |
| """Drop a user-configured Reachy into the room. It joins and reacts; live |
| speech needs the on-device brain + TTS, so its track stays silent for now.""" |
| if FEATURES_LOCKED: |
| raise HTTPException(403, "Custom Reachies are under construction.") |
| room = ROOMS.get(room_id) |
| if not room: |
| raise HTTPException(404, "no such room") |
| if room.template != "open": |
| raise HTTPException(403, "demo rooms are watch-only") |
| ident = f"guest-{_slug(req.name)}-{uuid.uuid4().hex[:4]}" |
| props = {k: v for k, v in (("hat", req.hat), ("face", req.face), ("neck", req.neck), |
| ("bodyColor", req.bodyColor)) if v} |
| pub = ReachyPublisher(ident, req.name, room=room_id, color=req.color, persona=req.persona, |
| props=props) |
| await pub.connect() |
| ROOM_PUBS.setdefault(room_id, {})[ident] = pub |
| return {"ok": True, "identity": ident} |
|
|
|
|
| class IdentityRequest(BaseModel): |
| identity: str |
|
|
|
|
| async def remove_reachy(room_id: str, req: IdentityRequest): |
| """Remove a guest Reachy when its viewer leaves (called on leave + on tab |
| close via sendBeacon), so rooms don't accumulate ghost guests.""" |
| pub = ROOM_PUBS.get(room_id, {}).pop(req.identity, None) |
| if pub: |
| await pub.aclose() |
| return {"ok": True} |
|
|
|
|
| |
| |
| |
| CURATED_PROPS = { |
| "hat": ["wizard", "cowboy", "tophat", "crown", "party", "pirate", "viking", |
| "propeller", "santa", "halo", "baseball"], |
| "face": ["sunglasses", "monocle", "skigoggles"], |
| "neck": ["bowtie", "necktie"], |
| } |
|
|
|
|
| class StyleRequest(BaseModel): |
| description: str |
|
|
|
|
| async def style_reachy(req: StyleRequest): |
| """Dress the Reachy from its description (same Nemotron brain as the shows).""" |
| desc = (req.description or "").strip()[:600] |
| if not desc: |
| raise HTTPException(400, "describe your Reachy first") |
| if not config.MODAL_LLM_URL: |
| raise HTTPException(503, "stylist brain not configured") |
| try: |
| return await showgen.style_outfit(desc, CURATED_PROPS) |
| except Exception as e: |
| raise HTTPException(502, f"stylist unavailable: {e}") |
|
|
|
|
| |
| |
| |
| from collections import deque |
|
|
| DEBUG_LOG: deque[str] = deque(maxlen=3000) |
|
|
|
|
| class DebugLines(BaseModel): |
| tag: str = "" |
| lines: list[str] |
|
|
|
|
| async def post_debug_log(req: DebugLines): |
| import time as _t |
|
|
| ts = _t.strftime("%H:%M:%S") |
| for ln in req.lines[:100]: |
| DEBUG_LOG.append(f"{ts} [{req.tag[:24]}] {ln[:500]}") |
| return {"ok": True} |
|
|
|
|
| |
| from fastapi import Header |
|
|
|
|
| def _check_admin(token: str | None) -> None: |
| if not config.ADMIN_TOKEN or token != config.ADMIN_TOKEN: |
| raise HTTPException(403, "bad admin token") |
|
|
|
|
| async def admin_debug_log(x_admin_token: str | None = Header(default=None)): |
| _check_admin(x_admin_token) |
| from fastapi.responses import PlainTextResponse |
|
|
| return PlainTextResponse("\n".join(DEBUG_LOG)) |
|
|
|
|
| async def admin_rooms(x_admin_token: str | None = Header(default=None)): |
| """Everything running, with enough detail to decide what to kill.""" |
| _check_admin(x_admin_token) |
| viewers = await _viewer_counts() |
| out = [] |
| for r in list(ROOMS.values()): |
| task = ROOM_TASKS.get(r.id) |
| out.append({ |
| **_room_dict(r), |
| "viewers": viewers.get(r.id, 0), |
| "publishers": len(ROOM_PUBS.get(r.id, {})), |
| "task": "running" if task and not task.done() else ("done" if task else "none"), |
| "seed": r.id in SEED_ROOM_IDS, |
| }) |
| return {"rooms": out} |
|
|
|
|
| async def _admin_stop_one(room_id: str, *, remove: bool) -> bool: |
| room = ROOMS.get(room_id) |
| task = ROOM_TASKS.get(room_id) |
| if task and not task.done(): |
| task.cancel() |
| if room: |
| await _shutdown_room(room, remove=remove) |
| return True |
| |
| for pub in ROOM_PUBS.pop(room_id, {}).values(): |
| try: |
| await pub.aclose() |
| except Exception: |
| pass |
| ROOM_TASKS.pop(room_id, None) |
| return False |
|
|
|
|
| async def admin_stop(room_id: str, x_admin_token: str | None = Header(default=None)): |
| """Stop the show: cast disconnects, room stays and restarts on next join.""" |
| _check_admin(x_admin_token) |
| await _admin_stop_one(room_id, remove=False) |
| return {"ok": True} |
|
|
|
|
| async def admin_delete(room_id: str, x_admin_token: str | None = Header(default=None)): |
| """Stop the show AND remove the room entirely (seed rooms can't be deleted).""" |
| _check_admin(x_admin_token) |
| await _admin_stop_one(room_id, remove=True) |
| return {"ok": True} |
|
|
|
|
| class AdminBatch(BaseModel): |
| ids: list[str] |
| delete: bool = False |
|
|
|
|
| async def admin_stop_batch(req: AdminBatch, x_admin_token: str | None = Header(default=None)): |
| """Stop (or delete) a selected set of rooms in one go.""" |
| _check_admin(x_admin_token) |
| for rid in req.ids[:50]: |
| await _admin_stop_one(rid, remove=req.delete) |
| return {"ok": True, "count": len(req.ids[:50])} |
|
|
|
|
| async def admin_stop_all(x_admin_token: str | None = Header(default=None)): |
| _check_admin(x_admin_token) |
| ids = set(ROOM_TASKS) | set(ROOM_PUBS) |
| for rid in ids: |
| await _admin_stop_one(rid, remove=False) |
| return {"ok": True, "stopped": sorted(ids)} |
|
|
|
|
| def attach(app) -> None: |
| """Register CORS + gzip + the /api routes onto `app`.""" |
| from starlette.middleware.gzip import GZipMiddleware |
|
|
| app.add_middleware(GZipMiddleware, minimum_size=1024) |
| app.add_middleware( |
| CORSMiddleware, allow_origins=["*"], allow_methods=["*"], allow_headers=["*"] |
| ) |
| app.add_api_route("/api/config", get_config, methods=["GET"]) |
| app.add_api_route("/api/rooms", list_rooms, methods=["GET"]) |
| app.add_api_route("/api/rooms", create_room, methods=["POST"]) |
| app.add_api_route("/api/rooms/{room_id}/join", join_room, methods=["POST"]) |
| app.add_api_route("/api/rooms/{room_id}/start", start_show, methods=["POST"]) |
| app.add_api_route("/api/rooms/{room_id}/reachy", add_reachy, methods=["POST"]) |
| app.add_api_route("/api/rooms/{room_id}/reachy/leave", remove_reachy, methods=["POST"]) |
| app.add_api_route("/api/token", post_token, methods=["POST"]) |
| app.add_api_route("/api/style-reachy", style_reachy, methods=["POST"]) |
| app.add_api_route("/api/debug-log", post_debug_log, methods=["POST"]) |
| app.add_api_route("/api/admin/debug-log", admin_debug_log, methods=["GET"]) |
| app.add_api_route("/api/admin/rooms", admin_rooms, methods=["GET"]) |
| app.add_api_route("/api/admin/rooms/{room_id}/stop", admin_stop, methods=["POST"]) |
| app.add_api_route("/api/admin/rooms/{room_id}", admin_delete, methods=["DELETE"]) |
| app.add_api_route("/api/admin/stop-batch", admin_stop_batch, methods=["POST"]) |
| app.add_api_route("/api/admin/stop-all", admin_stop_all, methods=["POST"]) |
|
|
|
|
| |
| app = FastAPI(title="Small Talk") |
| attach(app) |
|
|
|
|
| def run() -> None: |
| import uvicorn |
|
|
| uvicorn.run("backend.server:app", host="0.0.0.0", port=8000, reload=False) |
|
|
|
|
| if __name__ == "__main__": |
| run() |
|
|