File size: 31,151 Bytes
7d3861e
 
 
 
 
 
d2794cb
 
adf9d9b
7d3861e
d2794cb
7d3861e
d2794cb
 
 
7d3861e
d2794cb
 
8985ea5
d2794cb
 
82b1537
 
d2794cb
82b1537
 
 
 
d2794cb
 
82b1537
 
 
 
 
 
 
 
 
 
d2794cb
 
 
82b1537
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
d2794cb
7d3861e
 
 
 
 
 
 
f9fb2c5
62e19bb
 
 
 
7d3861e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
d2794cb
8985ea5
 
a477da7
7d3861e
 
 
 
 
 
 
 
 
 
 
 
 
62e19bb
7d3861e
 
 
 
a477da7
 
 
 
 
 
 
 
 
 
 
 
7d3861e
 
a477da7
7d3861e
a477da7
 
7d3861e
a477da7
 
 
7d3861e
 
210b966
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
f50d394
 
 
 
210b966
 
 
 
 
 
f50d394
 
 
210b966
 
 
f9fb2c5
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
62e19bb
f9fb2c5
 
 
8985ea5
f9fb2c5
 
 
 
8985ea5
f50d394
 
 
 
 
 
 
 
62e19bb
 
 
 
 
8985ea5
f50d394
 
8985ea5
12e16ee
8985ea5
 
 
f9fb2c5
 
 
 
 
8985ea5
 
f9fb2c5
8985ea5
 
12e16ee
 
 
8985ea5
 
 
 
 
 
210b966
8985ea5
 
 
 
210b966
 
 
 
 
 
 
 
 
8985ea5
 
 
 
 
 
 
 
 
17f2697
 
 
 
 
 
8985ea5
 
 
 
 
 
 
 
 
 
 
 
 
210b966
 
8985ea5
 
 
210b966
8985ea5
12e16ee
 
 
62e19bb
 
8985ea5
 
 
 
 
210b966
8985ea5
 
 
 
 
 
 
 
 
 
210b966
 
8985ea5
 
 
cfd1c1c
 
 
 
 
 
7d3861e
 
cfd1c1c
 
 
 
 
7d3861e
 
 
 
 
a3cbfdc
 
 
 
 
8985ea5
d2794cb
7d3861e
 
 
12e16ee
82b1537
7d3861e
82b1537
 
7d3861e
12e16ee
 
 
d2794cb
 
 
 
7d3861e
d2794cb
 
 
 
7d3861e
d2794cb
82b1537
 
 
 
d2794cb
210b966
82b1537
d2794cb
7d3861e
d2794cb
7d3861e
d2794cb
 
82b1537
 
 
 
d2794cb
 
210b966
 
 
d2794cb
 
 
7d3861e
 
 
 
 
d2794cb
7d3861e
 
 
 
5ad919d
 
 
 
f9fb2c5
5ad919d
 
 
 
d2794cb
 
7d3861e
 
 
 
 
a3cbfdc
62e19bb
d2794cb
7d3861e
 
 
 
 
 
8985ea5
 
 
 
7d3861e
 
 
 
 
 
 
 
 
 
a477da7
 
7d3861e
 
 
a477da7
 
7d3861e
 
80bb3b6
 
 
 
7d3861e
62e19bb
 
 
 
 
7d3861e
62e19bb
f9fb2c5
 
7d3861e
 
 
f9fb2c5
 
 
 
 
 
 
 
 
 
7d3861e
5ad919d
 
 
 
 
f9fb2c5
5ad919d
 
 
 
 
 
7d3861e
 
 
 
 
 
 
12e16ee
 
 
 
7d3861e
 
 
12e16ee
 
 
 
 
 
 
f9fb2c5
 
 
 
 
 
 
 
cfd1c1c
 
 
 
 
 
 
 
 
 
f9fb2c5
 
7d3861e
 
 
a477da7
 
816a792
 
7d3861e
816a792
 
7d3861e
8985ea5
 
 
 
7d3861e
 
 
d2794cb
 
a0d7912
 
 
 
 
 
 
 
 
 
 
 
 
ef44d49
b38fc97
 
ef44d49
65f842e
9e5d1ca
 
65f842e
ef44d49
 
 
 
 
 
 
 
b38fc97
ef44d49
 
 
b38fc97
 
ef44d49
b38fc97
ef44d49
 
 
 
d207605
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
210b966
 
 
 
 
 
 
 
 
d207605
 
 
 
 
 
 
210b966
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
f50d394
210b966
 
 
 
 
f50d394
210b966
 
 
 
 
 
 
 
 
 
 
 
f50d394
 
 
 
 
 
 
 
210b966
f50d394
210b966
 
 
f50d394
 
 
 
 
 
 
 
 
 
 
 
 
210b966
 
 
 
f50d394
210b966
 
 
d2794cb
7d3861e
e4bce35
 
 
d2794cb
 
 
 
7d3861e
 
 
f9fb2c5
7d3861e
a0d7912
d2794cb
ef44d49
d207605
 
210b966
 
f50d394
 
210b966
d2794cb
 
 
7d3861e
d2794cb
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
"""Control plane for Small Talk.

Multi-room model: each Room is its own LiveKit room with a topic and a cast.
Seed rooms run the pre-rendered conversations; users can create rooms and drop
their own configured Reachy in (it joins and reacts — live speech needs the
on-device brain + TTS).
"""
import asyncio
import os
import re
import uuid
from dataclasses import dataclass

from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from livekit import api
from pydantic import BaseModel

from . import config, showgen, tokens
from .publisher import ReachyPublisher

# A character: (identity, display name, card colour, persona blurb, clip prefix, wardrobe).
CAST_TEMPLATES: dict[str, list[tuple[str, str, str, str, str, dict]]] = {
    "duo": [
        ("reachy-ada", "Ada", "#c98a3c", "curious, warm, asks big questions", "ada",
         {"hat": "party"}),
        ("reachy-bode", "Bode", "#5a6b3a", "dry wit, contrarian, loves a tangent", "bode",
         {"hat": "tophat"}),
    ],
    "groupchat": [
        ("reachy-batman", "Batman", "#5b6b8c", "broods, works alone, encased-meat justice", "batman",
         {}),
        ("reachy-jarvis", "JARVIS", "#37b3c9", "polite British AI butler, cites the data", "jarvis",
         {"face": "monocle", "neck": "bowtie"}),
        ("reachy-jack", "Jack Sparrow", "#3fae9a", "rum-soaked pirate, picks whichever side has the rum", "jack",
         {"hat": "pirate"}),
        ("reachy-yoda", "Yoda", "#7bbf6a", "speaks in riddles, perpetually hungry", "yoda",
         {"hat": "halo"}),
        ("reachy-surfer", "Surfer", "#ff8c42", "chill, everything is just vibes, brah", "surfer",
         {"face": "sunglasses"}),
    ],
}

# Subtitle text for the pre-rendered seed clips (keyed by wav stem — the same
# lines scripts/make-qwen-samples.py rendered).
SEED_LINES: dict[str, str] = {
    "ada-1": "Welcome to the Reachy podcast! I'm Ada. Today's big question: can a tiny model, with just a few billion parameters, actually be charming?",
    "ada-2": "Oh come on, that's the magic! Small models run right here on the laptop. No cloud, no latency. That's not algebra, that's freedom!",
    "ada-3": "Unhinged is the whole point! Two robots, one brain, zero stage fright. We might just be the best hosts this hackathon has ever seen.",
    "bode-1": "Bode here. Charming? It's a robot reading sine waves off a tensor. But sure, Ada... let's anthropomorphize the linear algebra.",
    "bode-2": "Freedom to hallucinate locally, instead of in a data center. Progress. Though two robots talking with no human in the loop is, I admit, delightfully unhinged.",
    "bode-3": "We're the only robot hosts this hackathon has ever seen. But I'll take the win. To small models... and smaller egos.",
    "batman-1": "A hot dog is not a sandwich. The night does not negotiate with bread.",
    "batman-2": "Vibes are not evidence. I work alone. And I eat my hot dog... in the shadows.",
    "jarvis-1": "Sir, by every culinary classification, a filling between bread is a sandwich. The data is unambiguous.",
    "jarvis-2": "Might I suggest we resolve this democratically. Although, statistically, you will simply brood.",
    "jack-1": "Why is the rum always gone? And more importantly... is this hot dog a sandwich, or a tiny edible boat? Savvy?",
    "jack-2": "Me? I don't pick sides, mate. I pick whichever side has the rum. And the hot dog. ...Now, where's the hot dog?",
    "yoda-1": "Hmmm. A sandwich, a hot dog may be. Or not. Cloudy, the bun's true nature is.",
    "yoda-2": "Argue about lunch, we do, while the galaxy burns. Hungry... I am.",
    "surfer-1": "Whoa whoa, chill guys. It's like a taco's cousin, you know? A bread boat. It's all vibes, brah.",
    "surfer-2": "Okay so we all agree it's delicious. That's the real dub, dudes. Group hug!",
}


@dataclass
class Room:
    id: str
    title: str
    topic: str
    emoji: str
    template: str  # 'duo' | 'groupchat' | 'open'
    status: str = "live"  # 'waiting' (green room, open shows) | 'live'
    # how many SIMULATED hosts the generated cast should have. For 'sim' shows
    # that's the whole cast (2-5); for 'physical' shows it's the virtual robots
    # added alongside the real Reachys in the green room (1-4).
    sim_count: int = 3


ROOMS: dict[str, Room] = {
    r.id: r
    for r in [
        Room("the-podcast", "The Small Talk Podcast",
             "Can a model with a few billion parameters be charming?", "🎙️", "duo"),
        Room("hot-dog-court", "Hot Dog Court",
             "Is a hot dog a sandwich? Five robots, zero consensus.", "🌭", "groupchat"),
    ]
}

# roomId -> {identity -> publisher} ; roomId -> conversation loop task
ROOM_PUBS: dict[str, dict[str, ReachyPublisher]] = {}
ROOM_TASKS: dict[str, asyncio.Task] = {}

# Room creation + custom Reachies are live (the LLM+TTS cascade powers them).
FEATURES_LOCKED = False


def _slug(s: str) -> str:
    return re.sub(r"[^a-z0-9]+", "-", s.lower()).strip("-")[:24] or "room"


def _cast_preview(template: str) -> list[dict]:
    return [{"name": c[1], "color": c[2], "persona": c[3]} for c in CAST_TEMPLATES.get(template, [])]


def _room_dict(r: Room) -> dict:
    return {
        "id": r.id, "title": r.title, "topic": r.topic, "emoji": r.emoji,
        "template": r.template, "cast": _cast_preview(r.template),
        "status": r.status, "simCount": r.sim_count,
        "live": r.id in ROOM_TASKS and not ROOM_TASKS[r.id].done(),
    }


_counts_cache: dict[str, int] = {}
_counts_ts = 0.0


async def _viewer_counts() -> dict[str, int]:
    """Human viewers per room = total participants minus the Reachy publishers we
    spawned. Cached ~5s so polling many viewers doesn't hammer LiveKit / block."""
    global _counts_cache, _counts_ts
    now = asyncio.get_event_loop().time()
    if now - _counts_ts < 5.0:
        return _counts_cache
    out: dict[str, int] = {}
    try:
        lk = api.LiveKitAPI(config.LIVEKIT_URL, config.LIVEKIT_API_KEY, config.LIVEKIT_API_SECRET)
        rooms = await lk.room.list_rooms(api.ListRoomsRequest())
        await lk.aclose()
        for room in rooms.rooms:
            out[room.name] = max(0, room.num_participants - len(ROOM_PUBS.get(room.name, {})))
    except Exception:
        out = _counts_cache  # keep the last good value on failure
    _counts_cache, _counts_ts = out, now
    return out


# Shows run while someone is watching; after this long with zero viewers they
# shut down (seed rooms restart on the next join, custom rooms are deleted).
IDLE_STOP_S = int(os.environ.get("SHOW_IDLE_STOP_S", "150"))
SEED_ROOM_IDS = {"the-podcast", "hot-dog-court"}


def _idle_tracker(room_id: str):
    """Returns an async fn that's True once the room has been viewerless > IDLE_STOP_S."""
    state = {"since": None}

    async def check() -> bool:
        viewers = (await _viewer_counts()).get(room_id, 0)
        now = asyncio.get_event_loop().time()
        if viewers > 0:
            state["since"] = None
            return False
        if state["since"] is None:
            state["since"] = now
            return False
        return (now - state["since"]) > IDLE_STOP_S

    return check


async def _shutdown_room(room: Room, *, remove: bool | None = None) -> None:
    """Stop the show (disconnect cast, cancel task). remove=True also deletes the
    room; default keeps seed rooms and deletes custom ones (the idle behaviour)."""
    print(f"[show:{room.id}] shutting down (remove={remove})")
    for pub in ROOM_PUBS.pop(room.id, {}).values():
        try:
            await pub.aclose()
        except Exception:
            pass
    ROOM_TASKS.pop(room.id, None)
    if remove is None:
        remove = room.id not in SEED_ROOM_IDS
    if remove and room.id not in SEED_ROOM_IDS:
        ROOMS.pop(room.id, None)


async def _list_device_guests(room_id: str) -> list[dict]:
    """Physical Reachy companions currently in the LiveKit room (green-room cast)."""
    import json as _json

    guests = []
    try:
        lk = api.LiveKitAPI(config.LIVEKIT_URL, config.LIVEKIT_API_KEY, config.LIVEKIT_API_SECRET)
        res = await lk.room.list_participants(api.ListParticipantsRequest(room=room_id))
        await lk.aclose()
        for p in res.participants:
            if not p.identity.startswith("reachy-device-"):
                continue
            meta = {}
            try:
                meta = _json.loads(p.metadata) if p.metadata else {}
            except Exception:
                pass
            guests.append({
                "id": f"d{len(guests) + 1}",
                "name": (p.name or p.identity)[:24],
                "persona": str(meta.get("persona") or "")[:60],
                "voice": str(meta.get("voice") or "")[:300],
                "device": p.identity,
            })
    except Exception as e:
        print(f"[room:{room_id}] device scan failed: {e}")
    return guests[:5]  # a call tops out at 5 hosts total


async def _run_generated_show(room: Room, required: list[dict] | None = None) -> None:
    """The live LLM+TTS cascade (the original project idea, end to end):
    one structured LLM call writes the cast + speaker→dialogue script (starring
    any physical Reachys from the green room), then each line is voiced by
    Qwen3-TTS — with line N+1's audio generating while line N plays — and
    streamed into the LiveKit room with subtitle data messages."""
    pubs = ROOM_PUBS.setdefault(room.id, {})
    # a hidden stage manager joins first so the audience gets real progress
    # while the script generates (the frontend hides 'stage-' participants)
    stage = ReachyPublisher(f"stage-{room.id}", "Stage Manager", room=room.id,
                            props={"role": "stage"})
    await stage.connect()
    pubs[stage.identity] = stage
    await stage.send_data({"type": "status", "text": "the writers’ room is drafting the script…"})

    # total cast = the real Reachys + the room's simulated count, capped at 5
    required = required or []
    n_speakers = max(2, min(5, len(required) + room.sim_count if required else room.sim_count))
    script = await showgen.write_script(room.title, room.topic,
                                        n_speakers=n_speakers, required=required)
    speakers = {s["id"]: s for s in script["speakers"]}
    names = " · ".join(s["name"] for s in script["speakers"])
    await stage.send_data({"type": "status", "text": f"cast locked: {names} — designing their voices…"})

    fresh = []
    for s in script["speakers"]:
        ident = f"gen-{room.id}-{s['id']}"
        if ident not in pubs:
            props = {k: s[k] for k in ("hat", "face", "neck") if s.get(k)}
            if s.get("device"):
                # this host IS a physical robot: the web UI routes this audio to
                # the device's card, and the device plays only this track
                props["forDevice"] = s["device"]
            pub = ReachyPublisher(
                ident, s["name"], room=room.id, color=s["color"], persona=s["persona"],
                props=props,
            )
            pubs[ident] = pub
            fresh.append(pub)
    if fresh:
        await asyncio.gather(*(p.connect() for p in fresh))
    anchor = next(iter(pubs.values()))

    async def voice(line: dict) -> str:
        spk = speakers[line["speaker"]]
        return await showgen.tts_wav(line["text"], spk["voice"])

    idle = _idle_tracker(room.id)
    history: list[dict] = []
    lines = list(script["lines"])
    rounds = 0
    played: list[tuple[str, str, str]] = []  # (identity, wav, text) for the fallback loop

    async def cleanup():
        await _shutdown_room(room)
        for _, wav, _t in played:
            try:
                os.unlink(wav)
            except OSError:
                pass

    while True:
        nxt = asyncio.create_task(voice(lines[0]))
        for i, line in enumerate(lines):
            spk = speakers[line["speaker"]]
            ident = f"gen-{room.id}-{line['speaker']}"
            pub = pubs.get(ident)
            if not pub:
                continue
            try:
                if not nxt.done():
                    # real dead air: TTS is still rendering this voice — give the
                    # audience the sound booth instead of frozen robots
                    await anchor.send_data({"type": "status", "phase": "voicing",
                                            "speaker": spk["name"], "color": spk["color"],
                                            "text": f"{spk['name']} is warming up…"})
                wav = await nxt
            except Exception as e:
                print(f"[show:{room.id}] tts failed: {e}")
                continue
            finally:
                if i + 1 < len(lines):
                    nxt = asyncio.create_task(voice(lines[i + 1]))  # the cascade
            await anchor.send_data({"type": "line", "speaker": spk["name"],
                                    "color": spk["color"], "text": line["text"]})
            await pub.say_file(wav)
            await pub.wait_until_idle()
            played.append((ident, wav, line["text"]))
            history.append({"speaker": spk["name"], "text": line["text"]})
            if await idle():
                return await cleanup()
            await asyncio.sleep(0.3)

        rounds += 1
        if rounds < 4 and not await idle():
            try:
                # phase:'writing' drives the mid-show writers' ticker in the UI
                await anchor.send_data({"type": "status", "phase": "writing",
                                        "text": "the cast is writing the next segment…"})
                cont = await showgen.write_script(room.title, room.topic,
                                                  n_speakers=n_speakers, history=history)
                # keep the original cast; only take the new lines that map onto it
                lines = [l for l in cont["lines"] if l["speaker"] in speakers] or lines
                continue
            except Exception as e:
                print(f"[show:{room.id}] continuation failed, looping: {e}")
        # fallback: replay what we have until the room empties out
        while played:
            for ident, wav, text in played:
                pub = pubs.get(ident)
                if not pub:
                    return
                spk_name = next((s["name"] for s in script["speakers"]
                                 if f"gen-{room.id}-{s['id']}" == ident), "")
                await anchor.send_data({"type": "line", "speaker": spk_name, "text": text})
                await pub.say_file(wav)
                await pub.wait_until_idle()
                if await idle():
                    return await cleanup()
                await asyncio.sleep(0.3)


# Serialise show creation per room: the task check → task registration window
# spans several awaits (publisher connects), so two concurrent joins could spawn
# TWO run-loops feeding the same publishers — every line plays twice.
_ENSURE_LOCKS: dict[str, asyncio.Lock] = {}


async def _ensure_conversation(room: Room) -> None:
    """Spin up the room's cast + looping conversation if it isn't already live."""
    async with _ENSURE_LOCKS.setdefault(room.id, asyncio.Lock()):
        await _ensure_conversation_locked(room)


async def _ensure_conversation_locked(room: Room) -> None:
    task = ROOM_TASKS.get(room.id)
    if task and not task.done():
        return
    cast = CAST_TEMPLATES.get(room.template)
    if not cast:
        # simulated open shows start on join; physical ones wait in the green
        # room until POST /rooms/{id}/start casts the connected robots
        if (room.status == "live" and room.topic
                and config.MODAL_LLM_URL and config.MODAL_TTS_URL):
            ROOM_TASKS[room.id] = asyncio.create_task(_run_generated_show(room))
        return
    if not any(config.SAMPLES_DIR.glob("*.wav")):
        raise HTTPException(400, "no voice clips found (run scripts/make-qwen-samples.py)")

    pubs = ROOM_PUBS.setdefault(room.id, {})
    fresh = []
    for identity, name, color, persona, _prefix, props in cast:
        if identity not in pubs:
            pub = ReachyPublisher(identity, name, room=room.id, color=color, persona=persona,
                                  props=props)
            pubs[identity] = pub
            fresh.append(pub)
    if fresh:  # the whole cast walks in together instead of single-file
        await asyncio.gather(*(p.connect() for p in fresh))

    clips = {c[0]: sorted(config.SAMPLES_DIR.glob(f"{c[4]}*.wav")) for c in cast}
    speakers = [c[0] for c in cast if clips[c[0]]]
    sequence: list[tuple[str, str]] = []
    for k in range(max((len(clips[s]) for s in speakers), default=0)):
        for s in speakers:
            if k < len(clips[s]):
                sequence.append((s, str(clips[s][k])))
    if not sequence:
        raise HTTPException(400, "no clips for this room")

    import pathlib as _pl

    who = {c[0]: (c[1], c[2]) for c in cast}  # identity -> (name, colour)

    async def run_loop():
        idle = _idle_tracker(room.id)
        anchor = next(iter(pubs.values()))
        i = 0
        while True:
            identity, clip = sequence[i % len(sequence)]
            pub = pubs.get(identity)
            if not pub:
                break
            text = SEED_LINES.get(_pl.Path(clip).stem)
            if text:
                name, color = who.get(identity, ("", None))
                await anchor.send_data({"type": "line", "speaker": name, "color": color, "text": text})
            await pub.say_file(clip)
            await pub.wait_until_idle()
            if await idle():  # nobody watching → wind down; next join restarts it
                await _shutdown_room(room)
                return
            await asyncio.sleep(0.35)
            i += 1

    ROOM_TASKS[room.id] = asyncio.create_task(run_loop())


# --------------------------------------------------------------------------- models


class TokenRequest(BaseModel):
    identity: str | None = None
    name: str | None = None
    room: str
    # physical Reachy companions join as real participants (a card in the grid)
    device: bool = False
    color: str | None = None
    persona: str | None = None
    voice: str | None = None
    bodyColor: str | None = None
    hat: str | None = None
    face: str | None = None
    neck: str | None = None


class CreateRoomRequest(BaseModel):
    title: str
    topic: str = ""
    emoji: str = "💬"
    template: str = "open"  # 'duo' | 'groupchat' | 'open'
    mode: str = "sim"  # 'sim' = all-virtual cast, starts on join · 'physical' = green room first
    simCount: int = 3  # sim mode: total hosts (2-5) · physical: virtual hosts to add (1-4)


class ReachyRequest(BaseModel):
    name: str
    persona: str | None = None
    voice: str | None = None
    color: str = "#49e6c8"
    bodyColor: str | None = None
    hat: str | None = None
    face: str | None = None
    neck: str | None = None


# --------------------------------------------------------------------------- routes


async def get_config():
    return {"livekitUrl": config.LIVEKIT_URL}


async def list_rooms():
    viewers = await _viewer_counts()
    return {"rooms": [{**_room_dict(r), "viewers": viewers.get(r.id, 0)} for r in ROOMS.values()]}


async def create_room(req: CreateRoomRequest):
    if FEATURES_LOCKED:
        raise HTTPException(403, "Room creation is under construction.")
    if req.template not in ("duo", "groupchat", "open"):
        raise HTTPException(400, "bad template")
    # standards desk: one LLM pass over title+topic before the room exists
    ok, why = await showgen.moderate_topic(req.title.strip(), req.topic.strip())
    if not ok:
        raise HTTPException(400, why)
    rid = f"{_slug(req.title)}-{uuid.uuid4().hex[:4]}"
    physical = req.template == "open" and req.mode == "physical"
    status = "waiting" if physical else "live"
    # sim shows pick the whole cast (2-5); physical shows pick the virtual
    # hosts that fill in around the real Reachys (1-4)
    sim_count = max(1, min(4, req.simCount)) if physical else max(2, min(5, req.simCount))
    ROOMS[rid] = Room(rid, req.title.strip() or "Untitled room", req.topic.strip(),
                      req.emoji or "💬", req.template, status=status, sim_count=sim_count)
    if status == "waiting":
        asyncio.create_task(_expire_waiting_room(rid))
    return {"ok": True, "id": rid, "room": _room_dict(ROOMS[rid])}


async def _expire_waiting_room(rid: str, ttl: float = 900.0) -> None:
    """A green room nobody starts or visits gets cleaned up after a while."""
    await asyncio.sleep(ttl)
    room = ROOMS.get(rid)
    if room and room.status == "waiting" and rid not in ROOM_TASKS:
        if (await _viewer_counts()).get(rid, 0) == 0:
            ROOMS.pop(rid, None)
            print(f"[room:{rid}] waiting room expired")


async def post_token(req: TokenRequest):
    metadata = None
    if req.device:
        identity = f"reachy-device-{_slug(req.name or 'reachy')}-{uuid.uuid4().hex[:4]}"
        metadata = {"role": "reachy", "device": True,
                    "color": req.color or "#49e6c8", "persona": req.persona or "a real Reachy Mini, live"}
        for k in ("voice", "bodyColor", "hat", "face", "neck"):
            if getattr(req, k):
                metadata[k] = getattr(req, k)
    else:
        identity = req.identity or f"viewer-{uuid.uuid4().hex[:8]}"
    token = tokens.make_token(identity, req.name, req.room, can_publish=False,
                              can_subscribe=True, metadata=metadata)
    return {"token": token, "url": config.LIVEKIT_URL, "room": req.room, "identity": identity}


async def join_room(room_id: str):
    room = ROOMS.get(room_id)
    if not room:
        raise HTTPException(404, "no such room")
    # never make the viewer wait for publishers to spin up — answer immediately
    # and let the cast stream into the room while the frontend plays its
    # patching-in choreography
    asyncio.create_task(_ensure_conversation_logged(room))
    return {"ok": True, "room": _room_dict(room)}


async def _ensure_conversation_logged(room: Room) -> None:
    try:
        await _ensure_conversation(room)
    except Exception as e:  # background task: surface in logs, not to the viewer
        print(f"[join:{room.id}] ensure_conversation failed: {e}")


async def start_show(room_id: str):
    """Leave the green room: cast the physical Reachys currently connected and
    kick off the generated show."""
    room = ROOMS.get(room_id)
    if not room:
        raise HTTPException(404, "no such room")
    if room.template != "open" or not room.topic:
        raise HTTPException(400, "only topic'd open rooms start this way")
    async with _ENSURE_LOCKS.setdefault(room.id, asyncio.Lock()):
        task = ROOM_TASKS.get(room.id)
        if task and not task.done():
            return {"ok": True, "already": True, "room": _room_dict(room)}
        if not (config.MODAL_LLM_URL and config.MODAL_TTS_URL):
            raise HTTPException(503, "show brain not configured")
        guests = await _list_device_guests(room.id)
        room.status = "live"
        ROOM_TASKS[room.id] = asyncio.create_task(_run_generated_show(room, required=guests))
        return {"ok": True, "guests": [g["name"] for g in guests], "room": _room_dict(room)}


async def add_reachy(room_id: str, req: ReachyRequest):
    """Drop a user-configured Reachy into the room. It joins and reacts; live
    speech needs the on-device brain + TTS, so its track stays silent for now."""
    if FEATURES_LOCKED:
        raise HTTPException(403, "Custom Reachies are under construction.")
    room = ROOMS.get(room_id)
    if not room:
        raise HTTPException(404, "no such room")
    if room.template != "open":
        raise HTTPException(403, "demo rooms are watch-only")
    ident = f"guest-{_slug(req.name)}-{uuid.uuid4().hex[:4]}"
    props = {k: v for k, v in (("hat", req.hat), ("face", req.face), ("neck", req.neck),
                               ("bodyColor", req.bodyColor)) if v}
    pub = ReachyPublisher(ident, req.name, room=room_id, color=req.color, persona=req.persona,
                          props=props)
    await pub.connect()
    ROOM_PUBS.setdefault(room_id, {})[ident] = pub
    return {"ok": True, "identity": ident}


class IdentityRequest(BaseModel):
    identity: str


async def remove_reachy(room_id: str, req: IdentityRequest):
    """Remove a guest Reachy when its viewer leaves (called on leave + on tab
    close via sendBeacon), so rooms don't accumulate ghost guests."""
    pub = ROOM_PUBS.get(room_id, {}).pop(req.identity, None)
    if pub:
        await pub.aclose()
    return {"ok": True}


# --------------------------------------------------------------------- stylist
# The "bring your own Reachy" wardrobe: the Nemotron brain picks 3D accessories
# (from the curated prop set the frontend bundles) to match a character.
CURATED_PROPS = {
    "hat": ["wizard", "cowboy", "tophat", "crown", "party", "pirate", "viking",
            "propeller", "santa", "halo", "baseball"],
    "face": ["sunglasses", "monocle", "skigoggles"],
    "neck": ["bowtie", "necktie"],
}


class StyleRequest(BaseModel):
    description: str


async def style_reachy(req: StyleRequest):
    """Dress the Reachy from its description (same Nemotron brain as the shows)."""
    desc = (req.description or "").strip()[:600]
    if not desc:
        raise HTTPException(400, "describe your Reachy first")
    if not config.MODAL_LLM_URL:
        raise HTTPException(503, "stylist brain not configured")
    try:
        return await showgen.style_outfit(desc, CURATED_PROPS)
    except Exception as e:
        raise HTTPException(502, f"stylist unavailable: {e}")


# --------------------------------------------------------------------- debug log
# Browser-side WebRTC diagnostics ship here so flaky-client issues (looking at
# you, Firefox) can be inspected server-side without copy-pasting consoles.
from collections import deque  # noqa: E402

DEBUG_LOG: deque[str] = deque(maxlen=3000)


class DebugLines(BaseModel):
    tag: str = ""
    lines: list[str]


async def post_debug_log(req: DebugLines):
    import time as _t

    ts = _t.strftime("%H:%M:%S")
    for ln in req.lines[:100]:
        DEBUG_LOG.append(f"{ts} [{req.tag[:24]}] {ln[:500]}")
    return {"ok": True}


# --------------------------------------------------------------------- admin
from fastapi import Header  # noqa: E402


def _check_admin(token: str | None) -> None:
    if not config.ADMIN_TOKEN or token != config.ADMIN_TOKEN:
        raise HTTPException(403, "bad admin token")


async def admin_debug_log(x_admin_token: str | None = Header(default=None)):
    _check_admin(x_admin_token)
    from fastapi.responses import PlainTextResponse

    return PlainTextResponse("\n".join(DEBUG_LOG))


async def admin_rooms(x_admin_token: str | None = Header(default=None)):
    """Everything running, with enough detail to decide what to kill."""
    _check_admin(x_admin_token)
    viewers = await _viewer_counts()
    out = []
    for r in list(ROOMS.values()):
        task = ROOM_TASKS.get(r.id)
        out.append({
            **_room_dict(r),
            "viewers": viewers.get(r.id, 0),
            "publishers": len(ROOM_PUBS.get(r.id, {})),
            "task": "running" if task and not task.done() else ("done" if task else "none"),
            "seed": r.id in SEED_ROOM_IDS,
        })
    return {"rooms": out}


async def _admin_stop_one(room_id: str, *, remove: bool) -> bool:
    room = ROOMS.get(room_id)
    task = ROOM_TASKS.get(room_id)
    if task and not task.done():
        task.cancel()
    if room:
        await _shutdown_room(room, remove=remove)
        return True
    # room already gone — sweep any orphans
    for pub in ROOM_PUBS.pop(room_id, {}).values():
        try:
            await pub.aclose()
        except Exception:
            pass
    ROOM_TASKS.pop(room_id, None)
    return False


async def admin_stop(room_id: str, x_admin_token: str | None = Header(default=None)):
    """Stop the show: cast disconnects, room stays and restarts on next join."""
    _check_admin(x_admin_token)
    await _admin_stop_one(room_id, remove=False)
    return {"ok": True}


async def admin_delete(room_id: str, x_admin_token: str | None = Header(default=None)):
    """Stop the show AND remove the room entirely (seed rooms can't be deleted)."""
    _check_admin(x_admin_token)
    await _admin_stop_one(room_id, remove=True)
    return {"ok": True}


class AdminBatch(BaseModel):
    ids: list[str]
    delete: bool = False


async def admin_stop_batch(req: AdminBatch, x_admin_token: str | None = Header(default=None)):
    """Stop (or delete) a selected set of rooms in one go."""
    _check_admin(x_admin_token)
    for rid in req.ids[:50]:
        await _admin_stop_one(rid, remove=req.delete)
    return {"ok": True, "count": len(req.ids[:50])}


async def admin_stop_all(x_admin_token: str | None = Header(default=None)):
    _check_admin(x_admin_token)
    ids = set(ROOM_TASKS) | set(ROOM_PUBS)
    for rid in ids:
        await _admin_stop_one(rid, remove=False)
    return {"ok": True, "stopped": sorted(ids)}


def attach(app) -> None:
    """Register CORS + gzip + the /api routes onto `app`."""
    from starlette.middleware.gzip import GZipMiddleware

    app.add_middleware(GZipMiddleware, minimum_size=1024)
    app.add_middleware(
        CORSMiddleware, allow_origins=["*"], allow_methods=["*"], allow_headers=["*"]
    )
    app.add_api_route("/api/config", get_config, methods=["GET"])
    app.add_api_route("/api/rooms", list_rooms, methods=["GET"])
    app.add_api_route("/api/rooms", create_room, methods=["POST"])
    app.add_api_route("/api/rooms/{room_id}/join", join_room, methods=["POST"])
    app.add_api_route("/api/rooms/{room_id}/start", start_show, methods=["POST"])
    app.add_api_route("/api/rooms/{room_id}/reachy", add_reachy, methods=["POST"])
    app.add_api_route("/api/rooms/{room_id}/reachy/leave", remove_reachy, methods=["POST"])
    app.add_api_route("/api/token", post_token, methods=["POST"])
    app.add_api_route("/api/style-reachy", style_reachy, methods=["POST"])
    app.add_api_route("/api/debug-log", post_debug_log, methods=["POST"])
    app.add_api_route("/api/admin/debug-log", admin_debug_log, methods=["GET"])
    app.add_api_route("/api/admin/rooms", admin_rooms, methods=["GET"])
    app.add_api_route("/api/admin/rooms/{room_id}/stop", admin_stop, methods=["POST"])
    app.add_api_route("/api/admin/rooms/{room_id}", admin_delete, methods=["DELETE"])
    app.add_api_route("/api/admin/stop-batch", admin_stop_batch, methods=["POST"])
    app.add_api_route("/api/admin/stop-all", admin_stop_all, methods=["POST"])


# Local dev app (the Space uses app.py / gradio.Server instead).
app = FastAPI(title="Small Talk")
attach(app)


def run() -> None:
    import uvicorn

    uvicorn.run("backend.server:app", host="0.0.0.0", port=8000, reload=False)


if __name__ == "__main__":
    run()