File size: 31,151 Bytes
7d3861e d2794cb adf9d9b 7d3861e d2794cb 7d3861e d2794cb 7d3861e d2794cb 8985ea5 d2794cb 82b1537 d2794cb 82b1537 d2794cb 82b1537 d2794cb 82b1537 d2794cb 7d3861e f9fb2c5 62e19bb 7d3861e d2794cb 8985ea5 a477da7 7d3861e 62e19bb 7d3861e a477da7 7d3861e a477da7 7d3861e a477da7 7d3861e a477da7 7d3861e 210b966 f50d394 210b966 f50d394 210b966 f9fb2c5 62e19bb f9fb2c5 8985ea5 f9fb2c5 8985ea5 f50d394 62e19bb 8985ea5 f50d394 8985ea5 12e16ee 8985ea5 f9fb2c5 8985ea5 f9fb2c5 8985ea5 12e16ee 8985ea5 210b966 8985ea5 210b966 8985ea5 17f2697 8985ea5 210b966 8985ea5 210b966 8985ea5 12e16ee 62e19bb 8985ea5 210b966 8985ea5 210b966 8985ea5 cfd1c1c 7d3861e cfd1c1c 7d3861e a3cbfdc 8985ea5 d2794cb 7d3861e 12e16ee 82b1537 7d3861e 82b1537 7d3861e 12e16ee d2794cb 7d3861e d2794cb 7d3861e d2794cb 82b1537 d2794cb 210b966 82b1537 d2794cb 7d3861e d2794cb 7d3861e d2794cb 82b1537 d2794cb 210b966 d2794cb 7d3861e d2794cb 7d3861e 5ad919d f9fb2c5 5ad919d d2794cb 7d3861e a3cbfdc 62e19bb d2794cb 7d3861e 8985ea5 7d3861e a477da7 7d3861e a477da7 7d3861e 80bb3b6 7d3861e 62e19bb 7d3861e 62e19bb f9fb2c5 7d3861e f9fb2c5 7d3861e 5ad919d f9fb2c5 5ad919d 7d3861e 12e16ee 7d3861e 12e16ee f9fb2c5 cfd1c1c f9fb2c5 7d3861e a477da7 816a792 7d3861e 816a792 7d3861e 8985ea5 7d3861e d2794cb a0d7912 ef44d49 b38fc97 ef44d49 65f842e 9e5d1ca 65f842e ef44d49 b38fc97 ef44d49 b38fc97 ef44d49 b38fc97 ef44d49 d207605 210b966 d207605 210b966 f50d394 210b966 f50d394 210b966 f50d394 210b966 f50d394 210b966 f50d394 210b966 f50d394 210b966 d2794cb 7d3861e e4bce35 d2794cb 7d3861e f9fb2c5 7d3861e a0d7912 d2794cb ef44d49 d207605 210b966 f50d394 210b966 d2794cb 7d3861e d2794cb | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 
121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 
621 622 623 624 625 626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 655 656 657 658 659 660 661 662 663 664 665 666 667 668 669 670 671 672 673 674 675 676 677 678 679 680 681 682 683 684 685 686 687 688 689 690 691 692 693 694 695 696 697 698 699 700 701 702 703 704 705 706 707 708 709 710 711 712 713 714 715 716 717 718 719 720 721 722 723 724 725 726 727 728 729 730 731 732 733 734 735 736 737 738 739 740 741 742 743 744 745 746 747 748 749 750 | """Control plane for Small Talk.
Multi-room model: each Room is its own LiveKit room with a topic and a cast.
Seed rooms run the pre-rendered conversations; users can create rooms and drop
their own configured Reachy in (it joins and reacts — live speech needs the
on-device brain + TTS).
"""
import asyncio
import os
import re
import uuid
from dataclasses import dataclass
from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from livekit import api
from pydantic import BaseModel
from . import config, showgen, tokens
from .publisher import ReachyPublisher
# Cast rosters, keyed by room template. Each member is the tuple
# (identity, display name, card colour, persona blurb, clip prefix, wardrobe).
CAST_TEMPLATES: dict[str, list[tuple[str, str, str, str, str, dict]]] = {
    "duo": [
        (
            "reachy-ada", "Ada", "#c98a3c",
            "curious, warm, asks big questions",
            "ada", {"hat": "party"},
        ),
        (
            "reachy-bode", "Bode", "#5a6b3a",
            "dry wit, contrarian, loves a tangent",
            "bode", {"hat": "tophat"},
        ),
    ],
    "groupchat": [
        (
            "reachy-batman", "Batman", "#5b6b8c",
            "broods, works alone, encased-meat justice",
            "batman", {},
        ),
        (
            "reachy-jarvis", "JARVIS", "#37b3c9",
            "polite British AI butler, cites the data",
            "jarvis", {"face": "monocle", "neck": "bowtie"},
        ),
        (
            "reachy-jack", "Jack Sparrow", "#3fae9a",
            "rum-soaked pirate, picks whichever side has the rum",
            "jack", {"hat": "pirate"},
        ),
        (
            "reachy-yoda", "Yoda", "#7bbf6a",
            "speaks in riddles, perpetually hungry",
            "yoda", {"hat": "halo"},
        ),
        (
            "reachy-surfer", "Surfer", "#ff8c42",
            "chill, everything is just vibes, brah",
            "surfer", {"face": "sunglasses"},
        ),
    ],
}
# Subtitles for the pre-rendered seed clips. Keys are wav stems (the same
# lines scripts/make-qwen-samples.py rendered); the seed run-loop looks a
# clip's stem up here and skips the subtitle when there is no entry.
SEED_LINES: dict[str, str] = {
    # --- the-podcast (duo) ---
    "ada-1": "Welcome to the Reachy podcast! I'm Ada. Today's big question: can a tiny model, with just a few billion parameters, actually be charming?",
    "ada-2": "Oh come on, that's the magic! Small models run right here on the laptop. No cloud, no latency. That's not algebra, that's freedom!",
    "ada-3": "Unhinged is the whole point! Two robots, one brain, zero stage fright. We might just be the best hosts this hackathon has ever seen.",
    "bode-1": "Bode here. Charming? It's a robot reading sine waves off a tensor. But sure, Ada... let's anthropomorphize the linear algebra.",
    "bode-2": "Freedom to hallucinate locally, instead of in a data center. Progress. Though two robots talking with no human in the loop is, I admit, delightfully unhinged.",
    "bode-3": "We're the only robot hosts this hackathon has ever seen. But I'll take the win. To small models... and smaller egos.",
    # --- hot-dog-court (groupchat) ---
    "batman-1": "A hot dog is not a sandwich. The night does not negotiate with bread.",
    "batman-2": "Vibes are not evidence. I work alone. And I eat my hot dog... in the shadows.",
    "jarvis-1": "Sir, by every culinary classification, a filling between bread is a sandwich. The data is unambiguous.",
    "jarvis-2": "Might I suggest we resolve this democratically. Although, statistically, you will simply brood.",
    "jack-1": "Why is the rum always gone? And more importantly... is this hot dog a sandwich, or a tiny edible boat? Savvy?",
    "jack-2": "Me? I don't pick sides, mate. I pick whichever side has the rum. And the hot dog. ...Now, where's the hot dog?",
    "yoda-1": "Hmmm. A sandwich, a hot dog may be. Or not. Cloudy, the bun's true nature is.",
    "yoda-2": "Argue about lunch, we do, while the galaxy burns. Hungry... I am.",
    "surfer-1": "Whoa whoa, chill guys. It's like a taco's cousin, you know? A bread boat. It's all vibes, brah.",
    "surfer-2": "Okay so we all agree it's delicious. That's the real dub, dudes. Group hug!",
}
@dataclass
class Room:
    """A single show room: listing metadata plus how its cast is sourced.

    ``template`` is 'duo' | 'groupchat' | 'open'.
    ``status`` is 'waiting' (green room, open shows) or 'live'.
    ``sim_count`` is how many SIMULATED hosts the generated cast should
    have: for 'sim' shows that is the whole cast (2-5); for 'physical'
    shows it is the virtual robots added alongside the real Reachys in the
    green room (1-4).
    """

    id: str
    title: str
    topic: str
    emoji: str
    template: str
    status: str = "live"
    sim_count: int = 3
# Room registry, keyed by room id. It starts out holding the two seed rooms.
ROOMS: dict[str, Room] = {
    seed.id: seed
    for seed in (
        Room(
            "the-podcast",
            "The Small Talk Podcast",
            "Can a model with a few billion parameters be charming?",
            "🎙️",
            "duo",
        ),
        Room(
            "hot-dog-court",
            "Hot Dog Court",
            "Is a hot dog a sandwich? Five robots, zero consensus.",
            "🌭",
            "groupchat",
        ),
    )
}
# roomId -> {identity -> publisher} ; roomId -> conversation loop task
# Both maps are popped together when a room is shut down.
ROOM_PUBS: dict[str, dict[str, ReachyPublisher]] = {}
ROOM_TASKS: dict[str, asyncio.Task] = {}
# Room creation + custom Reachies are live (the LLM+TTS cascade powers them).
# When True, the create-room and add-Reachy endpoints answer 403.
FEATURES_LOCKED = False
def _slug(s: str) -> str:
return re.sub(r"[^a-z0-9]+", "-", s.lower()).strip("-")[:24] or "room"
def _cast_preview(template: str) -> list[dict]:
    """Return name/colour/persona cards for a template's cast ([] if unknown)."""
    preview = []
    for _identity, name, color, persona, *_rest in CAST_TEMPLATES.get(template, []):
        preview.append({"name": name, "color": color, "persona": persona})
    return preview
def _room_dict(r: Room) -> dict:
    """Serialise a room for the API, including whether its show task is running."""
    show_running = r.id in ROOM_TASKS and not ROOM_TASKS[r.id].done()
    payload = {
        "id": r.id,
        "title": r.title,
        "topic": r.topic,
        "emoji": r.emoji,
        "template": r.template,
        "cast": _cast_preview(r.template),
        "status": r.status,
        "simCount": r.sim_count,
        "live": show_running,
    }
    return payload
_counts_cache: dict[str, int] = {}
_counts_ts = 0.0


async def _viewer_counts() -> dict[str, int]:
    """Human viewers per room = total participants minus the Reachy publishers we
    spawned. Cached ~5s so polling many viewers doesn't hammer LiveKit / block.

    Returns:
        Room name -> viewer count (never negative). If the LiveKit call
        fails, the last cached mapping is returned and the cache timer is
        still reset, so a failing server is retried at most every ~5s.
    """
    global _counts_cache, _counts_ts
    now = asyncio.get_running_loop().time()
    if now - _counts_ts < 5.0:
        return _counts_cache
    out: dict[str, int] = {}
    try:
        lk = api.LiveKitAPI(config.LIVEKIT_URL, config.LIVEKIT_API_KEY, config.LIVEKIT_API_SECRET)
        try:
            rooms = await lk.room.list_rooms(api.ListRoomsRequest())
        finally:
            # close the client even when the request raises, otherwise every
            # failed poll leaks an open client
            await lk.aclose()
        for room in rooms.rooms:
            out[room.name] = max(0, room.num_participants - len(ROOM_PUBS.get(room.name, {})))
    except Exception:
        out = _counts_cache  # keep the last good value on failure
    _counts_cache, _counts_ts = out, now
    return out
# A show only runs while it has an audience: once a room has had zero viewers
# for this many seconds it shuts down (seed rooms restart on the next join,
# custom rooms are deleted). Override with the SHOW_IDLE_STOP_S env var.
IDLE_STOP_S = int(os.getenv("SHOW_IDLE_STOP_S", "150"))
SEED_ROOM_IDS = {"hot-dog-court", "the-podcast"}
def _idle_tracker(room_id: str):
    """Build an async predicate that turns True once *room_id* has gone more
    than IDLE_STOP_S seconds without a viewer.

    Each returned checker keeps its own "empty since" timestamp; any poll that
    sees a viewer resets it.
    """
    empty_since = None

    async def check() -> bool:
        nonlocal empty_since
        counts = await _viewer_counts()
        now = asyncio.get_event_loop().time()
        if counts.get(room_id, 0) > 0:
            empty_since = None
            return False
        if empty_since is None:
            # first empty poll: start the clock, don't stop yet
            empty_since = now
            return False
        return (now - empty_since) > IDLE_STOP_S

    return check
async def _shutdown_room(room: Room, *, remove: bool | None = None) -> None:
    """Stop the show (disconnect cast, forget the task). remove=True also deletes
    the room; default keeps seed rooms and deletes custom ones (the idle behaviour).

    The task entry is only dropped from ROOM_TASKS, not cancelled here.
    Seed rooms are never removed from ROOMS, whatever *remove* says.
    """
    print(f"[show:{room.id}] shutting down (remove={remove})")
    for pub in ROOM_PUBS.pop(room.id, {}).values():
        try:
            await pub.aclose()
        except Exception as e:
            # best effort: one stuck publisher must not block the rest, but
            # leave a trace instead of swallowing the failure silently
            print(f"[show:{room.id}] publisher close failed: {e}")
    ROOM_TASKS.pop(room.id, None)
    if remove is None:
        remove = room.id not in SEED_ROOM_IDS
    if remove and room.id not in SEED_ROOM_IDS:
        ROOMS.pop(room.id, None)
async def _list_device_guests(room_id: str) -> list[dict]:
    """Physical Reachy companions currently in the LiveKit room (green-room cast).

    A participant counts as a device when its identity starts with
    "reachy-device-". Persona and voice are read from its JSON metadata.

    Returns:
        Up to 5 dicts with keys id ("d1", "d2", …), name, persona, voice and
        device (the participant identity). Returns whatever was collected,
        possibly [], if the LiveKit scan fails.
    """
    import json as _json
    guests: list[dict] = []
    try:
        lk = api.LiveKitAPI(config.LIVEKIT_URL, config.LIVEKIT_API_KEY, config.LIVEKIT_API_SECRET)
        try:
            res = await lk.room.list_participants(api.ListParticipantsRequest(room=room_id))
        finally:
            # always release the client, including when the request raises
            await lk.aclose()
        for p in res.participants:
            if not p.identity.startswith("reachy-device-"):
                continue
            meta: dict = {}
            try:
                parsed = _json.loads(p.metadata) if p.metadata else {}
            except (ValueError, TypeError):
                parsed = {}
            # metadata that is valid JSON but not an object (a list, a
            # string…) would make .get() raise and abort the whole scan
            if isinstance(parsed, dict):
                meta = parsed
            guests.append({
                "id": f"d{len(guests) + 1}",
                "name": (p.name or p.identity)[:24],
                "persona": str(meta.get("persona") or "")[:60],
                "voice": str(meta.get("voice") or "")[:300],
                "device": p.identity,
            })
    except Exception as e:
        print(f"[room:{room_id}] device scan failed: {e}")
    return guests[:5]  # a call tops out at 5 hosts total
async def _run_generated_show(room: Room, required: list[dict] | None = None) -> None:
    """The live LLM+TTS cascade (the original project idea, end to end):
    one structured LLM call writes the cast + speaker→dialogue script (starring
    any physical Reachys from the green room), then each line is voiced by
    Qwen3-TTS — with line N+1's audio generating while line N plays — and
    streamed into the LiveKit room with subtitle data messages.

    Args:
        room: the room to run the show in.
        required: hosts that must be in the cast (the physical Reachys found
            in the green room); None/[] for an all-simulated show.

    Runs until the room has been viewerless long enough, then shuts the room
    down and deletes the rendered wavs. After up to 4 written segments, or
    when a continuation fails, it replays what has already been voiced.
    """
    pubs = ROOM_PUBS.setdefault(room.id, {})
    # a hidden stage manager joins first so the audience gets real progress
    # while the script generates (the frontend hides 'stage-' participants)
    stage = ReachyPublisher(f"stage-{room.id}", "Stage Manager", room=room.id,
                            props={"role": "stage"})
    await stage.connect()
    pubs[stage.identity] = stage
    await stage.send_data({"type": "status", "text": "the writers’ room is drafting the script…"})
    # total cast = the real Reachys + the room's simulated count, capped at 5
    required = required or []
    n_speakers = max(2, min(5, len(required) + room.sim_count))
    script = await showgen.write_script(room.title, room.topic,
                                        n_speakers=n_speakers, required=required)
    speakers = {s["id"]: s for s in script["speakers"]}
    names = " · ".join(s["name"] for s in script["speakers"])
    await stage.send_data({"type": "status", "text": f"cast locked: {names} — designing their voices…"})
    fresh = []
    for s in script["speakers"]:
        ident = f"gen-{room.id}-{s['id']}"
        if ident not in pubs:
            props = {k: s[k] for k in ("hat", "face", "neck") if s.get(k)}
            if s.get("device"):
                # this host IS a physical robot: the web UI routes this audio to
                # the device's card, and the device plays only this track
                props["forDevice"] = s["device"]
            pub = ReachyPublisher(
                ident, s["name"], room=room.id, color=s["color"], persona=s["persona"],
                props=props,
            )
            pubs[ident] = pub
            fresh.append(pub)
    if fresh:
        await asyncio.gather(*(p.connect() for p in fresh))
    anchor = next(iter(pubs.values()))

    async def voice(line: dict) -> str:
        spk = speakers[line["speaker"]]
        return await showgen.tts_wav(line["text"], spk["voice"])

    idle = _idle_tracker(room.id)
    history: list[dict] = []
    # only lines that map onto the cast can be voiced (same rule the
    # continuation segments use); an unknown speaker would otherwise crash
    lines = [l for l in script["lines"] if l["speaker"] in speakers]
    rounds = 0
    played: list[tuple[str, str, str]] = []  # (identity, wav, text) for the fallback loop

    async def cleanup():
        await _shutdown_room(room)
        for _, wav, _t in played:
            try:
                os.unlink(wav)
            except OSError:
                pass

    if not lines:
        # nothing to perform: disconnect the cast rather than die on
        # lines[0]; the room itself is kept
        print(f"[show:{room.id}] script has no playable lines")
        await _shutdown_room(room, remove=False)
        return

    while True:
        nxt = asyncio.create_task(voice(lines[0]))
        for i, line in enumerate(lines):
            spk = speakers[line["speaker"]]
            ident = f"gen-{room.id}-{line['speaker']}"
            pub = pubs.get(ident)
            try:
                if pub and not nxt.done():
                    # real dead air: TTS is still rendering this voice — give the
                    # audience the sound booth instead of frozen robots
                    await anchor.send_data({"type": "status", "phase": "voicing",
                                            "speaker": spk["name"], "color": spk["color"],
                                            "text": f"{spk['name']} is warming up…"})
                wav = await nxt
            except Exception as e:
                print(f"[show:{room.id}] tts failed: {e}")
                continue
            finally:
                if i + 1 < len(lines):
                    nxt = asyncio.create_task(voice(lines[i + 1]))  # the cascade
            if not pub:
                # this host is gone. The line's audio was still consumed above
                # so the cascade stays aligned: skipping before the await would
                # leave `nxt` one line behind and every following line would
                # play the previous line's audio.
                try:
                    os.unlink(wav)
                except OSError:
                    pass
                continue
            await anchor.send_data({"type": "line", "speaker": spk["name"],
                                    "color": spk["color"], "text": line["text"]})
            await pub.say_file(wav)
            await pub.wait_until_idle()
            played.append((ident, wav, line["text"]))
            history.append({"speaker": spk["name"], "text": line["text"]})
            if await idle():
                return await cleanup()
            await asyncio.sleep(0.3)
        rounds += 1
        if rounds < 4 and not await idle():
            try:
                # phase:'writing' drives the mid-show writers' ticker in the UI
                await anchor.send_data({"type": "status", "phase": "writing",
                                        "text": "the cast is writing the next segment…"})
                cont = await showgen.write_script(room.title, room.topic,
                                                  n_speakers=n_speakers, history=history)
                # keep the original cast; only take the new lines that map onto it
                lines = [l for l in cont["lines"] if l["speaker"] in speakers] or lines
                continue
            except Exception as e:
                print(f"[show:{room.id}] continuation failed, looping: {e}")
        # fallback: replay what we have until the room empties out
        while played:
            for ident, wav, text in played:
                pub = pubs.get(ident)
                if not pub:
                    return
                spk_name = next((s["name"] for s in script["speakers"]
                                 if f"gen-{room.id}-{s['id']}" == ident), "")
                await anchor.send_data({"type": "line", "speaker": spk_name, "text": text})
                await pub.say_file(wav)
                await pub.wait_until_idle()
                if await idle():
                    return await cleanup()
                await asyncio.sleep(0.3)
# One lock per room around show creation. The "is a task running?" check and
# the task registration are separated by several awaits (publisher connects),
# so two concurrent joins could otherwise start TWO run-loops feeding the same
# publishers — every line would play twice.
_ENSURE_LOCKS: dict[str, asyncio.Lock] = {}


async def _ensure_conversation(room: Room) -> None:
    """Start the room's cast + looping conversation unless it is already live.

    Concurrent calls for the same room are serialised on that room's lock.
    """
    room_lock = _ENSURE_LOCKS.setdefault(room.id, asyncio.Lock())
    async with room_lock:
        await _ensure_conversation_locked(room)
async def _ensure_conversation_locked(room: Room) -> None:
    """Start the show for *room* if none is running (caller holds the room lock).

    Rooms whose template has no pre-rendered cast take the generated-show
    path. Seed templates connect their fixed cast and loop the sample clips
    round-robin with subtitles.

    Raises:
        HTTPException(400): no sample wavs at all, or none for this cast.
    """
    running = ROOM_TASKS.get(room.id)
    if running and not running.done():
        return

    cast = CAST_TEMPLATES.get(room.template)
    if not cast:
        # simulated open shows start on join; physical ones wait in the green
        # room until POST /rooms/{id}/start casts the connected robots
        if (room.status == "live" and room.topic
                and config.MODAL_LLM_URL and config.MODAL_TTS_URL):
            ROOM_TASKS[room.id] = asyncio.create_task(_run_generated_show(room))
        return

    if not any(config.SAMPLES_DIR.glob("*.wav")):
        raise HTTPException(400, "no voice clips found (run scripts/make-qwen-samples.py)")

    pubs = ROOM_PUBS.setdefault(room.id, {})
    newcomers = []
    for identity, name, color, persona, _prefix, props in cast:
        if identity in pubs:
            continue
        publisher = ReachyPublisher(identity, name, room=room.id, color=color,
                                    persona=persona, props=props)
        pubs[identity] = publisher
        newcomers.append(publisher)
    if newcomers:
        # connect everyone at once so the cast arrives together
        await asyncio.gather(*(p.connect() for p in newcomers))

    # identity -> that character's clips, sorted by path
    clips = {}
    for member in cast:
        clips[member[0]] = sorted(config.SAMPLES_DIR.glob(f"{member[4]}*.wav"))
    speakers = [member[0] for member in cast if clips[member[0]]]
    # interleave: everyone's clip 0 in cast order, then everyone's clip 1, …
    rounds = max((len(clips[s]) for s in speakers), default=0)
    sequence: list[tuple[str, str]] = [
        (s, str(clips[s][k]))
        for k in range(rounds)
        for s in speakers
        if k < len(clips[s])
    ]
    if not sequence:
        raise HTTPException(400, "no clips for this room")

    import pathlib as _pl
    who = {member[0]: (member[1], member[2]) for member in cast}  # identity -> (name, colour)

    async def run_loop():
        idle = _idle_tracker(room.id)
        anchor = next(iter(pubs.values()))
        turn = 0
        while True:
            identity, clip = sequence[turn % len(sequence)]
            pub = pubs.get(identity)
            if not pub:
                return
            text = SEED_LINES.get(_pl.Path(clip).stem)
            if text:
                name, color = who.get(identity, ("", None))
                await anchor.send_data({"type": "line", "speaker": name, "color": color, "text": text})
            await pub.say_file(clip)
            await pub.wait_until_idle()
            if await idle():  # nobody watching → wind down; next join restarts it
                await _shutdown_room(room)
                return
            await asyncio.sleep(0.35)
            turn += 1

    ROOM_TASKS[room.id] = asyncio.create_task(run_loop())
# --------------------------------------------------------------------------- models
# Body of POST /api/token.
class TokenRequest(BaseModel):
    # viewer identity; when omitted the server generates "viewer-<hex>".
    # Ignored for devices, which always get a generated "reachy-device-…" id.
    identity: str | None = None
    # display name; for devices it also seeds the generated identity
    name: str | None = None
    # LiveKit room the token is issued for
    room: str
    # physical Reachy companions join as real participants (a card in the grid)
    device: bool = False
    # the fields below are only copied into participant metadata for devices
    color: str | None = None
    persona: str | None = None
    voice: str | None = None
    bodyColor: str | None = None
    hat: str | None = None
    face: str | None = None
    neck: str | None = None
# Body of POST /api/rooms.
class CreateRoomRequest(BaseModel):
    # shown as the room title and slugged into the room id
    title: str
    topic: str = ""
    emoji: str = "💬"
    template: str = "open"  # 'duo' | 'groupchat' | 'open'
    mode: str = "sim"  # 'sim' = all-virtual cast, starts on join · 'physical' = green room first
    # clamped server-side to the range for the chosen mode
    simCount: int = 3  # sim mode: total hosts (2-5) · physical: virtual hosts to add (1-4)
# Body of POST /api/rooms/{room_id}/reachy: a user-configured guest Reachy.
class ReachyRequest(BaseModel):
    # display name; also slugged into the generated "guest-…" identity
    name: str
    persona: str | None = None
    # NOTE(review): the add-Reachy handler in this file does not read `voice`
    voice: str | None = None
    color: str = "#49e6c8"
    # wardrobe; only the set (truthy) values are passed on as publisher props
    bodyColor: str | None = None
    hat: str | None = None
    face: str | None = None
    neck: str | None = None
# --------------------------------------------------------------------------- routes
async def get_config():
    """Tell the frontend which LiveKit server to connect to."""
    livekit_url = config.LIVEKIT_URL
    return {"livekitUrl": livekit_url}
async def list_rooms():
    """List every room with its current (cached) human viewer count."""
    viewers = await _viewer_counts()
    listing = []
    for r in ROOMS.values():
        entry = _room_dict(r)
        entry["viewers"] = viewers.get(r.id, 0)
        listing.append(entry)
    return {"rooms": listing}
# Strong references to the green-room reaper tasks. The event loop only keeps
# weak references to tasks, so an unreferenced fire-and-forget task can be
# garbage-collected before it finishes.
_WAITING_ROOM_REAPERS: set[asyncio.Task] = set()


async def create_room(req: CreateRoomRequest):
    """Create a user room after a moderation pass over its title and topic.

    'open' rooms in 'physical' mode start in the green room ('waiting') and
    get an expiry task; everything else is 'live' straight away.

    Raises:
        HTTPException(403): room creation is locked.
        HTTPException(400): unknown template, or moderation rejected it.
    """
    if FEATURES_LOCKED:
        raise HTTPException(403, "Room creation is under construction.")
    if req.template not in ("duo", "groupchat", "open"):
        raise HTTPException(400, "bad template")
    # standards desk: one LLM pass over title+topic before the room exists
    ok, why = await showgen.moderate_topic(req.title.strip(), req.topic.strip())
    if not ok:
        raise HTTPException(400, why)
    rid = f"{_slug(req.title)}-{uuid.uuid4().hex[:4]}"
    physical = req.template == "open" and req.mode == "physical"
    status = "waiting" if physical else "live"
    # sim shows pick the whole cast (2-5); physical shows pick the virtual
    # hosts that fill in around the real Reachys (1-4)
    sim_count = max(1, min(4, req.simCount)) if physical else max(2, min(5, req.simCount))
    ROOMS[rid] = Room(rid, req.title.strip() or "Untitled room", req.topic.strip(),
                      req.emoji or "💬", req.template, status=status, sim_count=sim_count)
    if status == "waiting":
        reaper = asyncio.create_task(_expire_waiting_room(rid))
        _WAITING_ROOM_REAPERS.add(reaper)
        reaper.add_done_callback(_WAITING_ROOM_REAPERS.discard)
    return {"ok": True, "id": rid, "room": _room_dict(ROOMS[rid])}
async def _expire_waiting_room(rid: str, ttl: float = 900.0) -> None:
    """A green room nobody starts or visits gets cleaned up after a while.

    Waits *ttl* seconds, then removes the room if it is still 'waiting', has
    no show task and has no viewers. If someone is in the room at that
    moment it checks again every *ttl* seconds instead of giving up, so a
    room that was briefly occupied at the deadline is still reaped later.
    Stops as soon as the room is gone, started, or has a task.
    """
    delay = ttl
    while True:
        await asyncio.sleep(delay)
        room = ROOMS.get(rid)
        if not room or room.status != "waiting" or rid in ROOM_TASKS:
            return
        if (await _viewer_counts()).get(rid, 0) == 0:
            ROOMS.pop(rid, None)
            print(f"[room:{rid}] waiting room expired")
            return
        # still occupied: look again later (never spin on a tiny ttl)
        delay = max(ttl, 5.0)
async def post_token(req: TokenRequest):
    """Issue a subscribe-only LiveKit token for a viewer or a physical Reachy.

    Devices always get a server-generated "reachy-device-…" identity plus
    metadata describing them. Viewers may bring their own identity, but not
    one in a namespace the server uses for its own participants.

    Raises:
        HTTPException(400): a viewer asked for a reserved identity.
    """
    metadata = None
    if req.device:
        identity = f"reachy-device-{_slug(req.name or 'reachy')}-{uuid.uuid4().hex[:4]}"
        metadata = {"role": "reachy", "device": True,
                    "color": req.color or "#49e6c8", "persona": req.persona or "a real Reachy Mini, live"}
        for k in ("voice", "bodyColor", "hat", "face", "neck"):
            if getattr(req, k):
                metadata[k] = getattr(req, k)
    else:
        identity = req.identity or f"viewer-{uuid.uuid4().hex[:8]}"
        # "reachy-", "gen-" and "stage-" are the prefixes this server gives
        # its cast, devices and stage manager. LiveKit identities are unique
        # per room, so a viewer joining under one of them would displace that
        # participant (or be picked up as a device by the green-room scan).
        if identity.startswith(("reachy-", "gen-", "stage-")):
            raise HTTPException(400, "reserved identity")
    token = tokens.make_token(identity, req.name, req.room, can_publish=False,
                              can_subscribe=True, metadata=metadata)
    return {"token": token, "url": config.LIVEKIT_URL, "room": req.room, "identity": identity}
# Strong references to in-flight "spin up the show" tasks. The event loop only
# keeps weak references, so an unreferenced task can be garbage-collected
# before the cast has connected.
_JOIN_TASKS: set[asyncio.Task] = set()


async def join_room(room_id: str):
    """Answer a viewer's join immediately and start the show in the background.

    Raises:
        HTTPException(404): unknown room id.
    """
    room = ROOMS.get(room_id)
    if not room:
        raise HTTPException(404, "no such room")
    # never make the viewer wait for publishers to spin up — answer immediately
    # and let the cast stream into the room while the frontend plays its
    # patching-in choreography
    starter = asyncio.create_task(_ensure_conversation_logged(room))
    _JOIN_TASKS.add(starter)
    starter.add_done_callback(_JOIN_TASKS.discard)
    return {"ok": True, "room": _room_dict(room)}
async def _ensure_conversation_logged(room: Room) -> None:
    """Background-safe wrapper around _ensure_conversation: a failure is
    printed to the log rather than raised."""
    try:
        await _ensure_conversation(room)
    except Exception as exc:
        # this runs as a background task, so the error goes to the logs and
        # never to the viewer
        print(f"[join:{room.id}] ensure_conversation failed: {exc}")
async def start_show(room_id: str):
    """Leave the green room: cast whichever physical Reachys are connected
    right now and launch the generated show.

    Raises:
        HTTPException(404): unknown room.
        HTTPException(400): not an 'open' room with a topic.
        HTTPException(503): LLM/TTS endpoints are not configured.
    """
    room = ROOMS.get(room_id)
    if not room:
        raise HTTPException(404, "no such room")
    if not (room.template == "open" and room.topic):
        raise HTTPException(400, "only topic'd open rooms start this way")

    room_lock = _ENSURE_LOCKS.setdefault(room.id, asyncio.Lock())
    async with room_lock:
        current = ROOM_TASKS.get(room.id)
        if current and not current.done():
            # a show is already running: report it instead of double-starting
            return {"ok": True, "already": True, "room": _room_dict(room)}
        if not config.MODAL_LLM_URL or not config.MODAL_TTS_URL:
            raise HTTPException(503, "show brain not configured")
        guests = await _list_device_guests(room.id)
        room.status = "live"
        show = asyncio.create_task(_run_generated_show(room, required=guests))
        ROOM_TASKS[room.id] = show
        guest_names = [g["name"] for g in guests]
        return {"ok": True, "guests": guest_names, "room": _room_dict(room)}
async def add_reachy(room_id: str, req: ReachyRequest):
    """Drop a user-configured Reachy into the room. It joins and reacts; live
    speech needs the on-device brain + TTS, so its track stays silent for now.

    Raises:
        HTTPException(403): feature locked, or the room is not an 'open' room.
        HTTPException(404): unknown room.
    """
    if FEATURES_LOCKED:
        raise HTTPException(403, "Custom Reachies are under construction.")
    room = ROOMS.get(room_id)
    if not room:
        raise HTTPException(404, "no such room")
    if room.template != "open":
        raise HTTPException(403, "demo rooms are watch-only")

    ident = f"guest-{_slug(req.name)}-{uuid.uuid4().hex[:4]}"
    # only the accessories that were actually chosen become props
    wardrobe = {}
    for key in ("hat", "face", "neck", "bodyColor"):
        value = getattr(req, key)
        if value:
            wardrobe[key] = value
    pub = ReachyPublisher(ident, req.name, room=room_id, color=req.color,
                          persona=req.persona, props=wardrobe)
    await pub.connect()
    room_pubs = ROOM_PUBS.setdefault(room_id, {})
    room_pubs[ident] = pub
    return {"ok": True, "identity": ident}
# Body of POST /api/rooms/{room_id}/reachy/leave.
class IdentityRequest(BaseModel):
    # identity of the publisher to remove
    identity: str
async def remove_reachy(room_id: str, req: IdentityRequest):
    """Remove a guest Reachy when its viewer leaves (called on leave + on tab
    close via sendBeacon), so rooms don't accumulate ghost guests.

    Only "guest-…" identities (the ones the add-Reachy endpoint hands out)
    can be removed. Anything else is a no-op, so this unauthenticated
    endpoint cannot be used to disconnect the show's cast or stage manager.
    Always answers {"ok": True}.
    """
    if not req.identity.startswith("guest-"):
        return {"ok": True}
    pub = ROOM_PUBS.get(room_id, {}).pop(req.identity, None)
    if pub:
        await pub.aclose()
    return {"ok": True}
# --------------------------------------------------------------------- stylist
# The "bring your own Reachy" wardrobe: the Nemotron brain picks 3D accessories
# (from the curated prop set the frontend bundles) to match a character.
# Accessory slot -> the prop names the stylist may choose from.
CURATED_PROPS = {
    "hat": [
        "wizard",
        "cowboy",
        "tophat",
        "crown",
        "party",
        "pirate",
        "viking",
        "propeller",
        "santa",
        "halo",
        "baseball",
    ],
    "face": [
        "sunglasses",
        "monocle",
        "skigoggles",
    ],
    "neck": [
        "bowtie",
        "necktie",
    ],
}
# Body of POST /api/style-reachy.
class StyleRequest(BaseModel):
    # free-text character description; the handler trims it and caps it at 600 chars
    description: str
async def style_reachy(req: StyleRequest):
    """Pick an outfit for a Reachy from its free-text description, using the
    same Nemotron brain as the shows.

    Raises:
        HTTPException(400): empty description.
        HTTPException(503): no LLM endpoint configured.
        HTTPException(502): the stylist call failed.
    """
    desc = (req.description or "").strip()[:600]
    if not desc:
        raise HTTPException(400, "describe your Reachy first")
    if not config.MODAL_LLM_URL:
        raise HTTPException(503, "stylist brain not configured")
    try:
        outfit = await showgen.style_outfit(desc, CURATED_PROPS)
    except Exception as e:
        raise HTTPException(502, f"stylist unavailable: {e}")
    return outfit
# --------------------------------------------------------------------- debug log
# Browser-side WebRTC diagnostics ship here so flaky-client issues (looking at
# you, Firefox) can be inspected server-side without copy-pasting consoles.
from collections import deque # noqa: E402
# In-memory ring buffer of client debug lines; only the newest 3000 are kept.
DEBUG_LOG: deque[str] = deque(maxlen=3000)
# Body of POST /api/debug-log.
class DebugLines(BaseModel):
    # short label stamped on every stored line (cut to 24 chars on write)
    tag: str = ""
    # log lines; at most 100 per request are stored, each cut to 500 chars
    lines: list[str]
async def post_debug_log(req: DebugLines):
    """Append a batch of browser diagnostics to the in-memory debug log.

    At most 100 lines per request are kept, each cut to 500 chars and
    prefixed with the server's local wall-clock time and the (24-char) tag.
    """
    import time as _t
    stamp = _t.strftime("%H:%M:%S")
    DEBUG_LOG.extend(
        f"{stamp} [{req.tag[:24]}] {entry[:500]}" for entry in req.lines[:100]
    )
    return {"ok": True}
# --------------------------------------------------------------------- admin
from fastapi import Header # noqa: E402
def _check_admin(token: str | None) -> None:
    """Reject the request unless *token* matches the configured admin token.

    With no admin token configured, every request is rejected.

    Raises:
        HTTPException(403): missing or wrong token, or admin not configured.
    """
    import hmac
    expected = config.ADMIN_TOKEN
    # constant-time comparison so response timing doesn't reveal how much of
    # a guessed token matched; compare bytes because compare_digest rejects
    # non-ASCII str
    if (not expected or token is None
            or not hmac.compare_digest(token.encode("utf-8"), str(expected).encode("utf-8"))):
        raise HTTPException(403, "bad admin token")
async def admin_debug_log(x_admin_token: str | None = Header(default=None)):
    """Admin only: dump the collected debug log as plain text, one entry per line."""
    from fastapi.responses import PlainTextResponse
    _check_admin(x_admin_token)
    dump = "\n".join(DEBUG_LOG)
    return PlainTextResponse(dump)
async def admin_rooms(x_admin_token: str | None = Header(default=None)):
    """Admin only: every room plus viewers, publisher count, task state and
    seed flag — enough detail to decide what to kill."""
    _check_admin(x_admin_token)
    viewers = await _viewer_counts()

    def describe(r: Room) -> dict:
        task = ROOM_TASKS.get(r.id)
        if not task:
            state = "none"
        elif task.done():
            state = "done"
        else:
            state = "running"
        return {
            **_room_dict(r),
            "viewers": viewers.get(r.id, 0),
            "publishers": len(ROOM_PUBS.get(r.id, {})),
            "task": state,
            "seed": r.id in SEED_ROOM_IDS,
        }

    return {"rooms": [describe(r) for r in list(ROOMS.values())]}
async def _admin_stop_one(room_id: str, *, remove: bool) -> bool:
    """Cancel a room's show and disconnect its publishers.

    Returns True when the room still existed (it is shut down through
    _shutdown_room), False when only leftover publishers/task entries for an
    already-deleted room were swept.
    """
    running = ROOM_TASKS.get(room_id)
    if running and not running.done():
        running.cancel()

    room = ROOMS.get(room_id)
    if room:
        await _shutdown_room(room, remove=remove)
        return True

    # room already gone — sweep any orphans
    orphans = ROOM_PUBS.pop(room_id, {})
    for orphan in orphans.values():
        try:
            await orphan.aclose()
        except Exception:
            pass
    ROOM_TASKS.pop(room_id, None)
    return False
async def admin_stop(room_id: str, x_admin_token: str | None = Header(default=None)):
    """Admin only: stop the show. The cast disconnects but the room is kept,
    so the next join starts it again."""
    _check_admin(x_admin_token)
    await _admin_stop_one(room_id, remove=False)
    response = {"ok": True}
    return response
async def admin_delete(room_id: str, x_admin_token: str | None = Header(default=None)):
    """Admin only: stop the show AND drop the room itself (seed rooms can't
    be deleted)."""
    _check_admin(x_admin_token)
    await _admin_stop_one(room_id, remove=True)
    response = {"ok": True}
    return response
# Body of POST /api/admin/stop-batch.
class AdminBatch(BaseModel):
    # room ids to act on; only the first 50 are processed
    ids: list[str]
    # True = also remove the rooms, False = just stop their shows
    delete: bool = False
async def admin_stop_batch(req: AdminBatch, x_admin_token: str | None = Header(default=None)):
    """Admin only: stop (or delete) a chosen set of rooms, at most 50 per call.

    The returned count is the number of ids processed.
    """
    _check_admin(x_admin_token)
    batch = req.ids[:50]
    for rid in batch:
        await _admin_stop_one(rid, remove=req.delete)
    return {"ok": True, "count": len(batch)}
async def admin_stop_all(x_admin_token: str | None = Header(default=None)):
    """Admin only: stop every room that has a task or publishers; rooms are kept.

    Returns the sorted ids that were stopped.
    """
    _check_admin(x_admin_token)
    # snapshot first: stopping a room mutates both maps
    active = set(ROOM_TASKS).union(ROOM_PUBS)
    for rid in active:
        await _admin_stop_one(rid, remove=False)
    return {"ok": True, "stopped": sorted(active)}
def attach(app) -> None:
    """Register CORS + gzip + the /api routes onto `app`."""
    from starlette.middleware.gzip import GZipMiddleware

    app.add_middleware(GZipMiddleware, minimum_size=1024)
    app.add_middleware(
        CORSMiddleware, allow_origins=["*"], allow_methods=["*"], allow_headers=["*"]
    )

    # (path, handler, HTTP method), registered in this order
    routes = (
        ("/api/config", get_config, "GET"),
        ("/api/rooms", list_rooms, "GET"),
        ("/api/rooms", create_room, "POST"),
        ("/api/rooms/{room_id}/join", join_room, "POST"),
        ("/api/rooms/{room_id}/start", start_show, "POST"),
        ("/api/rooms/{room_id}/reachy", add_reachy, "POST"),
        ("/api/rooms/{room_id}/reachy/leave", remove_reachy, "POST"),
        ("/api/token", post_token, "POST"),
        ("/api/style-reachy", style_reachy, "POST"),
        ("/api/debug-log", post_debug_log, "POST"),
        ("/api/admin/debug-log", admin_debug_log, "GET"),
        ("/api/admin/rooms", admin_rooms, "GET"),
        ("/api/admin/rooms/{room_id}/stop", admin_stop, "POST"),
        ("/api/admin/rooms/{room_id}", admin_delete, "DELETE"),
        ("/api/admin/stop-batch", admin_stop_batch, "POST"),
        ("/api/admin/stop-all", admin_stop_all, "POST"),
    )
    for path, handler, method in routes:
        app.add_api_route(path, handler, methods=[method])
# Local dev app (the Space uses app.py / gradio.Server instead).
# Built at import time; run() points uvicorn at it as "backend.server:app".
app = FastAPI(title="Small Talk")
attach(app)
def run() -> None:
    """Serve the local dev app with uvicorn on 0.0.0.0:8000 (no auto-reload)."""
    import uvicorn

    server_options = {"host": "0.0.0.0", "port": 8000, "reload": False}
    uvicorn.run("backend.server:app", **server_options)
# Script entry point: start the dev server when this module is run directly.
if __name__ == "__main__":
    run()
|