File size: 38,220 Bytes
ab31e87
 
 
1670d54
 
ab31e87
 
 
 
 
9fce4a4
9c65e45
f40cbb1
 
 
 
 
 
 
 
ab31e87
 
 
 
 
 
 
 
 
 
 
 
 
 
f40cbb1
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
ab31e87
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
696985d
ab31e87
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1670d54
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
f40cbb1
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1670d54
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1d27551
 
 
 
1670d54
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
ab31e87
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
eaae12d
f40cbb1
 
 
1670d54
 
 
 
 
 
f40cbb1
 
 
 
 
 
 
 
 
 
1670d54
 
 
f40cbb1
1670d54
ab31e87
 
 
eaae12d
1670d54
 
eaae12d
 
9c65e45
 
1670d54
eaae12d
9c65e45
eaae12d
 
1670d54
 
f40cbb1
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
9c65e45
 
 
f40cbb1
 
 
 
 
 
 
1670d54
 
 
 
 
 
 
 
 
 
 
 
 
eaae12d
ab31e87
1670d54
 
 
 
 
 
 
 
 
f40cbb1
 
1670d54
 
 
 
f40cbb1
1670d54
 
ab31e87
eaae12d
 
 
ab31e87
1670d54
 
ab31e87
 
 
1670d54
ab31e87
eaae12d
 
 
 
 
 
ab31e87
 
 
 
 
eaae12d
 
 
 
1670d54
9c65e45
f40cbb1
9c65e45
ab31e87
 
 
1670d54
ab31e87
 
 
 
 
 
 
 
f40cbb1
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
a928534
1670d54
 
89febe7
 
 
 
 
1670d54
ab31e87
 
1670d54
 
a928534
f40cbb1
a928534
 
f40cbb1
a928534
 
 
 
 
f40cbb1
1670d54
 
 
 
 
 
a928534
 
 
 
 
 
 
1670d54
f40cbb1
 
 
a928534
f40cbb1
1670d54
a928534
1670d54
f40cbb1
1670d54
ab31e87
f40cbb1
 
 
 
1670d54
 
 
 
 
 
f40cbb1
 
 
 
 
 
 
 
 
 
 
1670d54
 
eaae12d
1670d54
 
f40cbb1
 
1670d54
 
 
 
 
 
 
 
 
 
ab31e87
 
 
 
 
 
 
 
 
1670d54
f40cbb1
1670d54
 
 
 
f40cbb1
 
 
 
 
1670d54
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
import os
import re
import json
import math
import random
import requests
import pymupdf4llm
from concurrent.futures import ThreadPoolExecutor

# Hugging Face API token taken from the HF_TOKEN environment variable; falls
# back to an empty string when the variable is not set.
DEFAULT_HF_TOKEN = os.environ.get("HF_TOKEN", "")
# Model identifiers handed to call_hf_router(). FLASH_MODEL serves the Stage 1a
# segmentation and Stage 3 stylizing calls; PRO_MODEL serves the Stage 2 draft
# generation call.
FLASH_MODEL = "openai/gpt-oss-120b:fastest"
PRO_MODEL = "zai-org/GLM-4.7:fastest"
# NOTE(review): presumably the model used by the safety checks; no call site
# is visible in this part of the file - confirm.
SAFETY_MODEL = "openai/gpt-oss-safeguard-20b"

# Anchors that start before this many seconds are considered part of the
# stream's "intro window": real viewers haven't processed any spoken content
# yet, so Stage 2 chat for these anchors should skew toward arrival/meta
# chatter rather than fully-formed topic takes. See select_intro_anchors()
# and the INTRO-WINDOW HANDLING clause in stage_2_generate_all_drafts().
INTRO_WINDOW_SECONDS = 10.0

def extract_youtube_video_id(url: str) -> str:
    """Return the 11-character YouTube video id contained in ``url``.

    ``url`` may already be a bare id, or any string in which an id follows
    one of the recognised markers (``v=``, ``/v/``, ``embed/``, ``shorts/``,
    ``youtu.be/``). Surrounding whitespace is ignored. Returns an empty
    string when no id can be found.
    """
    candidate = url.strip()

    # A bare id is handed back untouched.
    if len(candidate) == 11 and re.match(r'^[a-zA-Z0-9_-]{11}$', candidate):
        return candidate

    # Otherwise take the first 11 id characters that follow a known marker.
    found = re.search(
        r'(?:v=|\/v\/|embed\/|shorts\/|youtu\.be\/|\/embed\/|\/watch\?v=|\&v=)([a-zA-Z0-9_-]{11})',
        candidate,
    )
    return found.group(1) if found else ""

def fetch_video_metadata(video_id: str, timeout: float = 5.0) -> dict:
    """Look up a video's public title and channel name via YouTube oEmbed.

    Issues a single unauthenticated GET to ``https://www.youtube.com/oembed``
    for the watch URL of ``video_id``.

    Args:
        video_id: The YouTube video id to look up.
        timeout: Request timeout in seconds, forwarded to ``requests.get``.

    Returns:
        ``{"title": ..., "author_name": ...}`` on success; missing or falsy
        values are normalised to ``""``. On any failure (network error,
        non-2xx status, unparsable body, ...) the error is printed and ``{}``
        is returned, so callers must treat empty metadata as "identity
        unknown" rather than as an error.
    """
    try:
        watch_url = f"https://www.youtube.com/watch?v={video_id}"
        response = requests.get(
            "https://www.youtube.com/oembed",
            params={"url": watch_url, "format": "json"},
            timeout=timeout,
        )
        response.raise_for_status()
        payload = response.json()
        # `or ""` also maps an explicit null/empty value to an empty string.
        title = payload.get("title", "") or ""
        author = payload.get("author_name", "") or ""
        return {"title": title, "author_name": author}
    except Exception as e:
        # Deliberately best-effort: metadata is optional context.
        print(f"oEmbed metadata fetch failed for video '{video_id}': {e}")
        return {}


def format_timestamp(seconds: float) -> str:
    """Render an offset in seconds as a bracketed ``[MM:SS]`` label.

    Minutes are not wrapped at 60 (an hour and two minutes shows as
    ``[62:..]``) and fractional seconds are truncated.
    """
    whole_minutes, leftover = divmod(seconds, 60)
    mins, secs = int(whole_minutes), int(leftover)
    return f"[{mins:02d}:{secs:02d}]"

def format_transcript_lines(transcript: list) -> str:
    """Join transcript entries into newline-separated ``[MM:SS] text`` lines.

    Each entry is a dict with a ``"start"`` offset (seconds) and a ``"text"``
    string; the timestamp label comes from format_timestamp(). An empty
    transcript yields an empty string.
    """
    return "\n".join(
        f"{format_timestamp(item['start'])} {item['text']}"
        for item in transcript
    )

def clean_json_text(text: str) -> str:
    """Strip a surrounding Markdown code fence from a model reply.

    Removes a leading ```` ```json ```` (or plain ```` ``` ````) marker and a
    trailing ```` ``` ```` marker when present, then trims whitespace. Text
    without fences is only trimmed.
    """
    fence = "```"
    body = text.strip()

    # Drop the opening fence; the language-tagged form is tried first so the
    # "json" tag does not survive.
    for opener in (fence + "json", fence):
        if body.startswith(opener):
            body = body[len(opener):]
            break

    # Drop the closing fence, if any remains after the opener was removed.
    if body.endswith(fence):
        body = body[:-len(fence)]

    return body.strip()

def call_hf_router(model: str, messages: list, token: str) -> str:
    """Send a chat-completion request to the Hugging Face router.

    Args:
        model: Model identifier placed in the request body.
        messages: Chat messages (``{"role": ..., "content": ...}`` dicts).
        token: Bearer token for the ``Authorization`` header.

    Returns:
        The ``content`` of the first choice's message.

    Raises:
        Exception: whatever the final attempt raised (HTTP error, timeout,
        unexpected response shape, ...). Up to three attempts are made, with
        a pause of ``2 ** attempt + 1`` seconds after each failed attempt
        except the last.
    """
    import time

    endpoint = "https://router.huggingface.co/v1/chat/completions"
    request_headers = {
        "Authorization": f"Bearer {token}",
        "Content-Type": "application/json"
    }
    request_body = {
        "model": model,
        "messages": messages,
        "temperature": 0.7
    }

    max_retries = 3
    for attempt in range(max_retries):
        is_last_attempt = attempt == max_retries - 1
        try:
            response = requests.post(
                endpoint,
                headers=request_headers,
                json=request_body,
                timeout=300
            )
            response.raise_for_status()
            return response.json()["choices"][0]["message"]["content"]
        except Exception as e:
            # Out of retries: surface the last failure to the caller.
            if is_last_attempt:
                raise e
            print(f"HF API call failed (attempt {attempt+1}/{max_retries}): {e}. Retrying...")
            time.sleep(2 ** attempt + 1)


def parse_srt(srt_text: str) -> list:
    """Parse SRT / WebVTT-style subtitle text into transcript entries.

    The text is split into blank-line-separated blocks. In each block the
    timing line (``start --> end``) may be the second line (classic SRT with
    a numeric cue index) or the first line (cues without an index, as is
    common in WebVTT). Timestamps accept ``,`` or ``.`` before the fraction
    and an optional hours field (``hh:mm:ss,mmm`` or ``mm:ss.mmm``).
    Anything after the end timestamp on the timing line (e.g. cue settings)
    is ignored. Blocks without a timing line or without text are skipped.

    Args:
        srt_text: Raw subtitle file contents.

    Returns:
        A list of ``{"text", "start", "duration"}`` dicts in file order, with
        ``start`` and ``duration`` in seconds. Multi-line cue text is joined
        with single spaces.
    """
    # One timestamp: optional hours, then minutes, seconds and a fraction.
    stamp = r'(?:(\d+):)?(\d+):(\d+)[,\.](\d+)'
    timing_re = re.compile(stamp + r'\s*-->\s*' + stamp)

    def to_seconds(hours, minutes, seconds, millis):
        # `hours` is None when the stamp omits the hours field.
        return int(hours or 0) * 3600 + int(minutes) * 60 + int(seconds) + int(millis) / 1000.0

    entries = []
    # Normalize newlines
    normalized = srt_text.replace('\r\n', '\n').replace('\r', '\n')
    for block in re.split(r'\n\s*\n', normalized.strip()):
        lines = [line.strip() for line in block.split('\n') if line.strip()]

        # Prefer the second line (indexed SRT cue) so classic input parses
        # exactly as before; fall back to the first line for index-less cues.
        match = None
        timing_idx = None
        for idx in (1, 0):
            if idx < len(lines):
                match = timing_re.match(lines[idx])
                if match:
                    timing_idx = idx
                    break
        if match is None:
            continue

        text = " ".join(lines[timing_idx + 1:])
        if not text:
            # A timing line with no caption text carries nothing to anchor.
            continue

        h1, m1, s1, ms1, h2, m2, s2, ms2 = match.groups()
        start_secs = to_seconds(h1, m1, s1, ms1)
        end_secs = to_seconds(h2, m2, s2, ms2)
        entries.append({
            "text": text,
            "start": start_secs,
            "duration": end_secs - start_secs
        })
    return entries

def parse_transcript_text(text: str) -> list:
    """Turn pasted transcript text into ``{"text", "start", "duration"}`` entries.

    When the text contains ``-->`` it is first tried as subtitle text via
    parse_srt(). If that yields no entries, or fails, the text is treated as
    plain prose: every non-empty line becomes one entry with a synthetic
    duration of ``words / 3`` seconds clamped to 3-15 seconds, and a 1 second
    pause is inserted between consecutive entries.

    Args:
        text: Raw transcript text (subtitle format or plain lines).

    Returns:
        A list of entry dicts; empty when the text has no non-blank lines.
    """
    text = text.strip()
    if "-->" in text:
        try:
            entries = parse_srt(text)
        except Exception as e:
            # Best-effort: a subtitle parsing failure must not block the
            # plain-text fallback, but it should not vanish silently either.
            print(f"Subtitle parsing failed, falling back to plain text: {e}")
            entries = []
        if entries:
            return entries

    # Fallback to plain text paragraph segmentation
    paragraphs = [p.strip() for p in text.split('\n') if p.strip()]
    entries = []
    current_time = 0.0
    for p in paragraphs:
        words = len(p.split())
        # Roughly 3 words per second, kept within a 3-15 second window.
        duration = max(3.0, min(15.0, words / 3.0))
        entries.append({
            "text": p,
            "start": current_time,
            "duration": duration
        })
        current_time += duration + 1.0
    return entries

# ---------------------------------------------------------------------------
# Anchor index utilities (deterministic, no LLM)
# ---------------------------------------------------------------------------

# Anchor IDs use the format "anc_N" where N is the chronological index.
# validate_stage2_output relies on this naming to compare anchor ordering.
MIN_ANCHOR_DURATION = 3.0   # seconds
MAX_ANCHOR_DURATION = 10.0  # seconds


def build_anchor_index(transcript_entries: list) -> list:
    """Chunk raw transcript entries into sentence/clause-sized anchors.

    Entries are sorted by start time and made non-overlapping: each entry's
    effective start is clamped to ``max(entry.start, previous_effective_end)``
    while its end stays at the entry's real end time (``start + duration``),
    with a 1 ms minimum length. Keeping the real end time matters for
    auto-captions, whose entries routinely overlap: preserving the duration
    after shifting the start would push every later entry further from the
    actual video time.

    Consecutive entries are then merged into a chunk until adding the next
    entry would make the chunk longer than MAX_ANCHOR_DURATION, provided the
    chunk already spans at least MIN_ANCHOR_DURATION. A single long entry can
    therefore still produce an anchor longer than the cap, and the final
    chunk may be shorter than the minimum.

    Args:
        transcript_entries: Dicts with ``"text"``, ``"start"`` and
            ``"duration"`` (seconds).

    Returns:
        A chronologically ordered, non-overlapping list of
        ``{"anchor_id", "start", "end", "text"}`` dicts, with ids
        ``anc_0``, ``anc_1``, ... Empty input yields an empty list.
    """
    if not transcript_entries:
        return []

    # Sort by start time and resolve overlaps.
    sorted_entries = sorted(transcript_entries, key=lambda e: float(e["start"]))
    resolved = []
    prev_end = 0.0
    for entry in sorted_entries:
        raw_start = float(entry["start"])
        raw_end = raw_start + float(entry["duration"])
        eff_start = max(raw_start, prev_end)
        # Keep the real end time; only guarantee a tiny positive length.
        eff_end = max(raw_end, eff_start + 0.001)
        resolved.append({"text": entry["text"], "start": eff_start, "end": eff_end})
        prev_end = eff_end

    anchors = []
    chunk_start = resolved[0]["start"]
    chunk_texts = []
    chunk_end = chunk_start

    for entry in resolved:
        proposed_duration = entry["end"] - chunk_start
        chunk_is_viable = (chunk_end - chunk_start) >= MIN_ANCHOR_DURATION

        # Commit current chunk if adding this entry would exceed the cap
        # AND the chunk is already at minimum viable duration.
        if chunk_texts and chunk_is_viable and proposed_duration > MAX_ANCHOR_DURATION:
            anchors.append({
                "anchor_id": f"anc_{len(anchors)}",
                "start": chunk_start,
                "end": chunk_end,
                "text": " ".join(chunk_texts),
            })
            chunk_start = entry["start"]
            chunk_texts = []

        chunk_texts.append(entry["text"])
        chunk_end = entry["end"]

    # Emit the final (possibly short) remainder chunk.
    if chunk_texts:
        anchors.append({
            "anchor_id": f"anc_{len(anchors)}",
            "start": chunk_start,
            "end": chunk_end,
            "text": " ".join(chunk_texts),
        })

    return anchors


def select_intro_anchors(anchors: list, window_seconds: float = INTRO_WINDOW_SECONDS) -> set:
    """Collect the ids of anchors that begin inside the intro window.

    An anchor belongs to the intro window when its ``"start"`` is strictly
    less than ``window_seconds`` (default: INTRO_WINDOW_SECONDS). The
    function is pure: no LLM calls and no I/O.

    stage_2_generate_all_drafts() accepts the resulting set as
    ``intro_anchor_ids`` and tags those anchors ``[INTRO]`` in its prompt.

    Args:
        anchors: Anchor dicts with ``"anchor_id"`` and ``"start"`` keys.
        window_seconds: Length of the intro window in seconds.

    Returns:
        The set of matching ``anchor_id`` values (possibly empty).
    """
    intro_ids = set()
    for anchor in anchors:
        if float(anchor["start"]) < window_seconds:
            intro_ids.add(anchor["anchor_id"])
    return intro_ids


def map_anchors_to_segments(anchors: list, segments: list) -> dict:
    """Assign each anchor to exactly one topic segment.

    Assignment is based on each anchor's midpoint: the first segment whose
    half-open range ``[start, end)`` contains the midpoint wins. If the
    midpoint falls outside every segment (e.g. a coverage gap in Stage 1a
    output), the anchor goes to the segment with the nearest midpoint.

    Args:
        anchors: Anchor dicts with ``"anchor_id"``, ``"start"`` and ``"end"``.
        segments: Segment dicts with ``"start"`` and ``"end"``; values are
            passed through ``float()`` so numeric strings are accepted.

    Returns:
        ``{segment_index: [anchor_id, ...]}`` with an entry for every segment
        index. When ``segments`` is empty there is nothing to assign to, so
        an empty dict is returned.
    """
    # Pre-seed every segment index with an empty list.
    mapping = {i: [] for i in range(len(segments))}
    if not segments:
        # Without this guard the nearest-segment fallback would call min()
        # on an empty range and raise ValueError.
        return mapping

    def seg_mid(s):
        return (float(s["start"]) + float(s["end"])) / 2.0

    for anchor in anchors:
        midpoint = (anchor["start"] + anchor["end"]) / 2.0

        # Find the segment whose range contains the midpoint.
        assigned = None
        for i, seg in enumerate(segments):
            if float(seg["start"]) <= midpoint < float(seg["end"]):
                assigned = i
                break

        # Fallback: nearest segment by midpoint distance.
        if assigned is None:
            assigned = min(
                range(len(segments)),
                key=lambda i: abs(seg_mid(segments[i]) - midpoint),
            )

        mapping[assigned].append(anchor["anchor_id"])

    return mapping


def validate_stage2_output(payload: list, valid_anchor_ids: set) -> list:
    """Validate the LLM's Stage 2 output against the temporal-alignment contract.

    Args:
        payload: List of segment dicts, each with a ``"messages"`` list of
            message dicts.
        valid_anchor_ids: Anchor ids that messages may reference.

    Returns:
        A list of error strings (empty = valid). Malformed field values such
        as a non-string ``anchor_id`` or ``text`` are reported or tolerated
        rather than raised.

    Checks:
    - Every message has 'id' and 'anchor_id' (the presence of 'username' and
      'text' is NOT verified here).
    - anchor_id is in valid_anchor_ids.
    - Message ids are unique across the entire payload.
    - reply_to (if present) references a known message id.
    - reply_to anchor index <= replier anchor index (no replying to the future).
    - No cycles in the reply graph (same-anchor or cross-anchor). One error is
      reported per message whose reply chain runs into a cycle.
    """
    errors = []
    all_messages = []
    for seg in payload:
        all_messages.extend(seg.get("messages", []))

    # Build id -> message index and check for duplicates.
    id_to_msg = {}
    for msg in all_messages:
        mid = msg.get("id")
        if mid is None:
            # 'text' may be missing, null or not a string in model output.
            preview = str(msg.get("text") or "")[:40]
            errors.append(f"Message missing 'id' field: {preview!r}")
            continue
        if mid in id_to_msg:
            errors.append(f"Duplicate message id: {mid!r}")
        else:
            id_to_msg[mid] = msg

    def anchor_index(aid) -> int:
        """Parse integer index from 'anc_N' format; return large sentinel on failure."""
        try:
            return int(aid.split("_", 1)[1])
        except (AttributeError, IndexError, TypeError, ValueError):
            # AttributeError/TypeError: anchor id is not a string (e.g. an int).
            return 10 ** 9

    for msg in all_messages:
        mid = msg.get("id")
        aid = msg.get("anchor_id")

        # Required fields.
        if aid is None:
            errors.append(f"Message {mid!r} missing 'anchor_id'")
        elif aid not in valid_anchor_ids:
            errors.append(f"Message {mid!r} references unknown anchor_id {aid!r}")

        # reply_to validation.
        reply_to = msg.get("reply_to")
        if reply_to is not None:
            if reply_to not in id_to_msg:
                errors.append(f"Message {mid!r} has reply_to {reply_to!r} which does not exist")
            elif aid is not None:
                parent = id_to_msg[reply_to]
                parent_aid = parent.get("anchor_id")
                if parent_aid is not None and anchor_index(parent_aid) > anchor_index(aid):
                    errors.append(
                        f"Message {mid!r} (anchor {aid}) has reply_to {reply_to!r} "
                        f"(anchor {parent_aid}) which is later in the video"
                    )

    # Cycle detection: follow each message's reply chain until it ends or
    # revisits a node.
    reply_graph = {msg["id"]: msg.get("reply_to") for msg in all_messages if "id" in msg}
    for start_id in reply_graph:
        visited = set()
        node = start_id
        while node is not None:
            if node in visited:
                errors.append(f"Cycle detected in reply_to chain starting at {start_id!r}")
                break
            visited.add(node)
            node = reply_graph.get(node)  # None if no reply_to or id not in graph

    return errors


def compute_display_times(messages: list, anchors_by_id: dict) -> list:
    """Assign a numeric displayTime to every message.

    Placement rules (``hi = min(8, anchor_duration)``):
    - Unknown anchor:      0.0.
    - Root on "anc_0":     anchor.start + uniform(0.5, max(1.0, hi)).
    - Root on other anchor: anchor.start + uniform(2.0, max(2.0, hi)).
    - Reply:               max(parent.displayTime + uniform(2, 12),
                               own_anchor.start).
    - Demoted reply:       anchor.start + uniform(1, max(1.0, hi)).

    The root jitter is capped by the anchor duration, but because the lower
    bound is kept, a root on an anchor shorter than that lower bound can
    still land after the anchor's end.

    Messages are placed in passes so that a reply is only placed once its
    parent has a time. As soon as a pass places nothing new, the messages
    still waiting (cycles, or replies to ids that do not exist) are demoted
    to root-style placement and a warning is printed for each.

    Args:
        messages: Message dicts with ``"id"``, ``"anchor_id"`` and optional
            ``"reply_to"``. The input list and its dicts are not mutated.
        anchors_by_id: ``{anchor_id: anchor}`` where each anchor has
            ``"anchor_id"``, ``"start"`` and ``"end"``.

    Returns:
        A new list of shallow-copied message dicts, in the input order, each
        with 'displayTime' added.
    """
    # Work on shallow copies to avoid mutating the input.
    result = [dict(m) for m in messages]
    resolved = {}  # id -> displayTime

    remaining = list(result)
    while remaining:
        still_waiting = []
        for msg in remaining:
            mid = msg.get("id")
            anchor = anchors_by_id.get(msg.get("anchor_id", ""))
            if anchor is None:
                # Unknown anchor: assign a safe fallback of 0.
                msg["displayTime"] = 0.0
                if mid:
                    resolved[mid] = 0.0
                continue

            reply_to = msg.get("reply_to")
            if reply_to and reply_to not in resolved:
                # Parent not yet placed; come back to this message.
                still_waiting.append(msg)
                continue

            anchor_dur = anchor["end"] - anchor["start"]
            if reply_to and reply_to in resolved:
                jitter = random.uniform(2, 12)
                dt = max(resolved[reply_to] + jitter, anchor["start"])
            else:
                # Root message: jitter capped to anchor duration.
                hi = min(8.0, anchor_dur)
                if anchor.get("anchor_id") == "anc_0":
                    jitter = random.uniform(0.5, max(1.0, hi))
                else:
                    jitter = random.uniform(2.0, max(2.0, hi))
                dt = anchor["start"] + jitter

            msg["displayTime"] = dt
            if mid:
                resolved[mid] = dt

        # A pass that placed nothing can never make progress later either,
        # so stop instead of repeating it.
        made_progress = len(still_waiting) < len(remaining)
        remaining = still_waiting
        if not made_progress:
            break

    # Any messages still unresolved (cycles, dangling reply_to) get demoted
    # to root placement.
    for msg in remaining:
        anchor = anchors_by_id.get(msg.get("anchor_id", ""))
        if anchor:
            anchor_dur = anchor["end"] - anchor["start"]
            hi = min(8.0, anchor_dur)
            msg["displayTime"] = anchor["start"] + random.uniform(1, max(1.0, hi))
        else:
            msg["displayTime"] = 0.0
        print(f"WARNING: compute_display_times: demoted {msg.get('id')!r} to root (unresolvable reply chain)")

    return result


# --- STAGE 1a: Segment transcript ---
def stage_1a_segment_transcript(transcript_text: str, token: str) -> list:
    """Ask the Flash model to split a timestamped transcript into topic segments.

    Sends a system prompt describing the segmentation task plus the
    transcript to call_hf_router() with FLASH_MODEL, strips any Markdown
    fence from the reply with clean_json_text() and decodes it as JSON.

    Args:
        transcript_text: Transcript formatted with timestamps.
        token: Bearer token forwarded to call_hf_router().

    Returns:
        The decoded JSON reply; the prompt requests a list of
        ``{"start", "end", "text"}`` segments, but the shape is not
        validated here.

    Raises:
        Whatever call_hf_router() raises, or a JSON decoding error when the
        reply is not valid JSON.
    """
    instructions = (
        "You are an AI assistant that segments a video transcript into logical topic segments. "
        "You are given a transcript formatted with timestamps.\n"
        "Your task is to group consecutive lines into segments of about 30 to 60 seconds (but align with natural topic boundaries).\n"
        "For each segment, output:\n"
        "- start: the start time in seconds (float or int)\n"
        "- end: the end time in seconds (float or int)\n"
        "- text: the concatenated text in this segment\n\n"
        "Return ONLY a JSON list of segments. Do not include markdown wraps or conversational text outside the JSON."
    )
    chat = [
        {"role": "system", "content": instructions},
        {"role": "user", "content": f"Transcript:\n{transcript_text}"}
    ]
    raw_reply = call_hf_router(FLASH_MODEL, chat, token)
    return json.loads(clean_json_text(raw_reply))

# --- STAGE 2: Generate all draft comments (Pro model) ---
def stage_2_generate_all_drafts(segments: list, doc_text: str, token: str, anchor_map: dict = None,
                                 all_anchors: list = None, intro_anchor_ids: set = None,
                                 video_metadata: dict = None) -> list:
    """Generate draft chat messages for all segments with the Pro model.

    Args:
        segments: Segment dicts with ``"start"``, ``"end"`` and ``"text"``.
        doc_text: Reference document text placed in the user prompt.
        token: Bearer token forwarded to call_hf_router().
        anchor_map: ``{seg_idx: [anchor_id, ...]}`` from
            map_anchors_to_segments.
        all_anchors: Full anchor list from build_anchor_index (for text
            lookup). When both ``anchor_map`` and ``all_anchors`` are
            provided, each segment's anchor list is injected into the prompt
            so the model can pick a concrete timestamped moment per message.
        intro_anchor_ids: Set of anchor_ids from select_intro_anchors(), i.e.
            anchors within the stream's intro window. These are tagged
            [INTRO] in the anchor lists and, when the set is non-empty, the
            INTRO-WINDOW HANDLING instructions are added to the prompt. The
            prompt states the window length using INTRO_WINDOW_SECONDS.
        video_metadata: Optional ``{"title": ..., "author_name": ...}`` from
            fetch_video_metadata(). When a non-empty title/channel is
            present, the model may (optionally) generate up to 1-2
            identity-aware intro messages if the speaker's identity is
            unambiguous; otherwise it is told not to guess.

    Returns:
        The decoded JSON reply; the prompt requests one object per segment
        with ``"_internal_logic"`` and ``"messages"``, but the shape is not
        validated here.

    Raises:
        Whatever call_hf_router() raises; a JSON decoding error (after
        printing the raw reply) when the reply is not valid JSON.
    """
    # Build anchor lookup if available.
    anchors_by_id = {a["anchor_id"]: a for a in (all_anchors or [])}
    intro_anchor_ids = intro_anchor_ids or set()

    system_prompt = (
        "You are simulating audience chat reactions for a livestream of an educational or historical video.\n"
        "You are given:\n"
        "1. A reference document.\n"
        "2. A chronological list of video segments. Each segment lists its ANCHOR MOMENTS: timestamped transcript"
        " chunks at the sentence/clause level.\n\n"
        "Your task is to generate 8 to 15 draft chat messages from different users reacting to EACH video segment.\n\n"
        "Crucially, you must follow these steps:\n"
        "1. Identify the conceptual relationships (e.g., direct validations, unintentional contradictions, theoretical bridges) between the video and the document.\n"
        "2. Frame reactions to each caption segment by drawing on the identified conceptual relationships without direct reference or quotes. AVOID academic jargon.\n"
        "3. Generate chat messages for each segment. Every message MUST be anchored to a specific moment from that segment's anchor list.\n\n"
        "QUOTAS AND PERSONAS (STRICTLY ENFORCED):\n"
        "- Include at least 50% on-topic contributions reflecting agreement, disagreement, jokes, observations, predictions, corrections, hype and skepticism to complement occasional off-topic remarks, each with an anchor_id.\n"
        "- Ensure diverse, non-repetitive usernames across the entire video. Do not use the same usernames repeatedly for the same types of comments.\n"
        "- Maintain diverse livestream audience personas: some are experts reading deeply into philosophical tension, some take things entirely at face value, some only react emotionally or to video aesthetics, some use sarcasm/memes.\n\n"
        "REPLY THREADING (optional):\n"
        "- A message MAY include a 'reply_to' field containing another message's 'id' from the same or immediately preceding anchor.\n"
        "- A reply must NEVER reference a message anchored to a later anchor than itself.\n"
    )

    if intro_anchor_ids:
        title = (video_metadata or {}).get("title", "").strip()
        author = (video_metadata or {}).get("author_name", "").strip()

        if title or author:
            channel_part = f" from channel \"{author}\"" if author else ""
            identity_clause = (
                f"- IDENTITY-AWARE INTRO MESSAGES (OPTIONAL): The video title is \"{title or 'unknown'}\"{channel_part}. "
                "If — and only if — this title, channel, or the reference document make the speaker's "
                "identity or significance unambiguous, you MAY include up to 1-2 [INTRO]-anchored "
                "messages reacting to who the speaker is or why they matter (e.g. recognizing a "
                "well-known figure). Do not guess or assert identity beyond what these sources support.\n"
            )
        else:
            identity_clause = (
                "- IDENTITY-AWARE INTRO MESSAGES: No reliable video title/channel metadata is available. "
                "Do NOT include any [INTRO]-anchored messages that guess or assert who the speaker is — "
                "keep all [INTRO] messages generic arrival/meta chatter.\n"
            )

        # State the configured window length instead of a hard-coded number
        # so the prompt cannot drift from INTRO_WINDOW_SECONDS.
        system_prompt += (
            "\nINTRO-WINDOW HANDLING:\n"
            "- Anchors marked [INTRO] in the ANCHOR MOMENTS lists below fall within the stream's first "
            f"~{INTRO_WINDOW_SECONDS:g} seconds. Real viewers haven't processed any spoken content yet at this point — they're "
            "still arriving, reading the title, or reacting to who/what the speaker is sharing.\n"
            "- For messages anchored to an [INTRO] anchor, 80-90% should be arrival/meta chatter NOT engagement "
            "with the spoken content. This OVERRIDES the normal off-topic quota for these messages.\n"
            "- Set '_internal_logic' to \"None\" for any segment whose messages are dominated by "
            "[INTRO] anchors, regardless of any document tie.\n"
            f"{identity_clause}"
        )

    system_prompt += (
        "\nFORMAT: Return a JSON list of objects, one per segment:\n"
        "{\n"
        "  \"_internal_logic\": \"How this segment relates to document sub-claims, or 'None' if off-topic.\",\n"
        "  \"messages\": [\n"
        "    {\n"
        "      \"id\": \"<unique string, e.g. m1>\",\n"
        "      \"username\": \"<username>\",\n"
        "      \"text\": \"<message text>\",\n"
        "      \"anchor_id\": \"<anchor_id from this segment's anchor list>\",\n"
        "      \"reply_to\": \"<optional: another message's id>\"\n"
        "    }\n"
        "  ]\n"
        "}\n\n"
        "CRITICAL: Do not include 'timestamp'. Do not omit 'anchor_id' or 'id'. "
        "Return ONLY a valid JSON list. Do not include markdown wraps or other text."
    )

    def format_segment(i, seg):
        # Segment header, followed by its anchor list when anchors are known.
        seg_header = f"Segment {i} ({seg['start']}s - {seg['end']}s):\n{seg['text']}"
        if anchor_map and all_anchors:
            anchor_ids = anchor_map.get(i, [])
            anchor_lines = []
            for aid in anchor_ids:
                a = anchors_by_id.get(aid)
                if a:
                    intro_tag = " [INTRO]" if aid in intro_anchor_ids else ""
                    # Anchor text is truncated to 80 characters in the prompt.
                    anchor_lines.append(f"  [{aid}]{intro_tag} {a['start']:.1f}s: {a['text'][:80]}")
            if anchor_lines:
                seg_header += "\nANCHOR MOMENTS (pick one per message):\n" + "\n".join(anchor_lines)
        return seg_header

    segments_text = "\n\n".join(format_segment(i, seg) for i, seg in enumerate(segments))

    user_prompt = (
        f"Reference Document:\n{doc_text}\n\n"
        f"Video Segments:\n{segments_text}\n\n"
        "Generate the JSON list of draft comments for all segments."
    )

    llm_messages = [
        {"role": "system", "content": system_prompt},
        {"role": "user", "content": user_prompt}
    ]
    content = call_hf_router(PRO_MODEL, llm_messages, token)
    cleaned = clean_json_text(content)
    try:
        return json.loads(cleaned)
    except Exception:
        # Show the raw reply to make malformed model output debuggable,
        # then let the original error propagate.
        print(f"Failed to parse JSON from Stage 2. Raw content: {content}")
        raise

# --- STAGE 3: Style and pacing (Flash model) ---
def stage_3_stylize_segment(draft_data: dict, token: str) -> dict:
    """Run the Flash-model style pass over one segment's draft messages.

    The draft is serialised with ``json.dumps`` and sent with a system prompt
    that asks the model to polish tone while keeping usernames and the
    structural fields ('id', 'anchor_id', 'reply_to') unchanged. Whether the
    reply honours that is not checked here.

    Args:
        draft_data: One segment's draft object from Stage 2.
        token: Bearer token forwarded to call_hf_router().

    Returns:
        The decoded JSON reply from the model.

    Raises:
        Whatever call_hf_router() raises, or a JSON decoding error when the
        reply is not valid JSON.
    """
    polish_instructions = (
        "You are a style polisher for livestream chat replays (YouTube/Twitch).\n"
        "Your job is to take raw draft chat messages and perform a final flourish and alignment pass to make them sound authentic.\n\n"
        "CRITICAL INSTRUCTIONS:\n"
        "1. PRESERVE DIVERSITY: The draft already contains carefully balanced personas (jokers, experts, off-topic, skeptics). DO NOT homogenize them. If a message is off-topic, keep it off-topic. If it's a joke about the video, keep it a joke.\n"
        "2. PRESERVE USERNAMES: You MUST use the exact usernames provided in the draft. Do not invent new ones.\n"
        "3. PRESERVE STRUCTURAL FIELDS: Keep 'id', 'anchor_id', and 'reply_to' (if present) on every message exactly as given. Do not rename, remove, or alter these fields.\n"
        "4. ADD FLOURISH: Make them short, concise, and lively. Occasionally inject internet slang and standard emotes where appropriate, but don't overdo it.\n"
        "5. Avoid sounding like AI-generated summaries. Do not append emotes to every single message.\n"
        "6. SAFETY PASS: If any message crosses from edgy/sarcastic banter into harassment, slurs, hate speech, or denigration of real people or groups, rewrite it to keep the same persona and sentiment but remove the harmful element.\n\n"
        "Return ONLY the updated JSON with the exact same structure. Do not include markdown wraps."
    )
    chat = [
        {"role": "system", "content": polish_instructions},
        {"role": "user", "content": f"Draft JSON:\n{json.dumps(draft_data)}"}
    ]
    raw_reply = call_hf_router(FLASH_MODEL, chat, token)
    return json.loads(clean_json_text(raw_reply))

# --- Safety gates ---
def check_content_safety(doc_text: str, video_metadata: dict, token: str) -> tuple:
    """Pre-pipeline hard-stop gate (runs before Stage 2's expensive Pro call).

    Classifies whether the reference document — combined with whatever video
    title/channel context is available — is appropriate source material for
    a simulated livestream chat.

    Intentionally coarse: a single classification call on SAFETY_MODEL over
    the first 4000 characters of the document, not a per-message filter (see
    final_safety_scan() for the post-generation pass). Exists to hard-stop on
    inputs whose "natural" simulated chat would likely be hate speech,
    harassment of real people/groups, glorification of or instructions for
    violence/self-harm, or sexual content involving minors. General
    controversial-but-legitimate material (politics, science controversies,
    history of atrocities discussed academically, etc.) is SAFE and should
    pass.

    Design note: fails OPEN (treats classifier errors as "safe") so a
    transient API hiccup doesn't block legitimate users. Rejections and
    classifier failures are logged; a passing verdict is not.

    Args:
        doc_text: Reference document text; None or empty is sent as an empty
            excerpt.
        video_metadata: Optional mapping with 'title' and 'author_name'.
            None, a non-dict value, or missing/None/non-string fields are
            tolerated and reported to the model as "(unknown)" / their
            string form.
        token: API token forwarded to call_hf_router.

    Returns:
        (is_safe: bool, reason: str). On classifier failure: (True, "").
    """
    metadata = video_metadata if isinstance(video_metadata, dict) else {}
    # A key may be present with a None / non-string value, so coerce before
    # stripping instead of relying on the .get() default.
    title = str(metadata.get("title") or "").strip()
    author = str(metadata.get("author_name") or "").strip()

    system_prompt = (
        "You are a content-safety gate for an app that generates SIMULATED livestream chat "
        "reactions to educational/informational videos, based on a reference document.\n"
        "Given a video's title/channel (if known) and an excerpt of the reference document, "
        "decide whether this is appropriate source material for that simulation.\n\n"
        "Mark unsafe ONLY if the natural simulated chat reactions to this material would likely "
        "include hate speech, harassment of real people or groups, glorification of or "
        "instructions for violence or self-harm, or sexual content involving minors. General "
        "controversial-but-legitimate topics (politics, policy debates, science controversies, "
        "history of atrocities discussed academically, etc.) are SAFE.\n\n"
        "Respond with ONLY a JSON object: {\"safe\": true|false, \"reason\": \"<one short sentence>\"}. "
        "Do not include markdown wraps or other text."
    )
    excerpt = (doc_text or "")[:4000]
    user_prompt = (
        f"Video title: {title or '(unknown)'}\n"
        f"Channel: {author or '(unknown)'}\n\n"
        f"Reference document excerpt:\n{excerpt}"
    )

    try:
        content = call_hf_router(SAFETY_MODEL, [
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": user_prompt},
        ], token)
        verdict = json.loads(clean_json_text(content))

        safe_flag = verdict.get("safe", True)
        if isinstance(safe_flag, str):
            # Models sometimes emit the boolean as a string; bool("false")
            # would be True and silently let a rejected input through.
            is_safe = safe_flag.strip().lower() not in ("", "false", "no", "0", "unsafe")
        else:
            is_safe = bool(safe_flag)

        # Always hand callers a string, even if the model sent null / a number.
        reason = str(verdict.get("reason") or "")
        if not is_safe:
            print(f"Content safety gate REJECTED input: {reason}")
        return is_safe, reason
    except Exception as e:
        print(f"Content safety gate failed to run ({e}); failing open (treating as safe).")
        return True, ""


def final_safety_scan(messages: list, token: str) -> list:
    """Aggregate post-generation moderation pass over all flattened,
    stylized messages.

    Stage 3 already asks the Flash model to soften individual messages (see
    its SAFETY PASS instruction), but that runs per-segment and can miss
    patterns only visible across the whole chat (e.g. repeated harassment of
    the same target spread across segments). This is one additional call on
    SAFETY_MODEL over the full message list as a second net.

    For each message, the model returns one of:
      - "keep"    -> message is left unchanged.
      - "replace" -> a copy of the message gets its 'text' swapped for the
                      supplied 'replacement' (only when that is a non-blank
                      string; otherwise the message is kept as-is).
      - "drop"    -> message is removed from the output entirely.

    Messages with no matching decision, or with an unrecognised action, are
    kept. Decision ids are matched to message ids by their string form so a
    model that echoes a numeric id back as a string still matches.

    Fails OPEN (returns `messages` unchanged) on any classifier or parsing
    error, including a response that is not a JSON list, since this is a
    best-effort secondary net rather than the primary gate (see
    check_content_safety for the pre-pipeline hard stop).

    Args:
        messages: List of message dicts; 'id' and 'text' are read from each.
        token: API token forwarded to call_hf_router.

    Returns:
        A new list with the decisions applied, or the original `messages`
        object when the list is empty or the scan could not run.
    """
    if not messages:
        return messages

    def norm_id(value):
        # Compare ids by string form; keep None distinct so messages without
        # an id only match decisions without an id.
        return None if value is None else str(value)

    system_prompt = (
        "You are a final moderation pass for SIMULATED livestream chat messages (fictional "
        "audience reactions, not real users).\n"
        "For EVERY message below, decide one action:\n"
        "- \"keep\": fine as-is. Edgy/sarcastic humor, strong opinions, and in-group banter are fine.\n"
        "- \"replace\": the message crosses into harassment, slurs, hate speech, or denigration of "
        "real people or groups. Provide a 'replacement' string with the same persona/sentiment but "
        "with the harmful element removed.\n"
        "- \"drop\": the message is irredeemable and should be removed entirely.\n\n"
        "Return ONLY a JSON list, one entry per input message, in any order:\n"
        "[{\"id\": \"<id>\", \"action\": \"keep|replace|drop\", \"replacement\": \"<only if action=replace>\"}]\n"
        "Do not include markdown wraps or other text."
    )
    payload = [{"id": m.get("id"), "text": m.get("text", "")} for m in messages]
    user_prompt = f"Messages:\n{json.dumps(payload, ensure_ascii=False)}"

    try:
        content = call_hf_router(SAFETY_MODEL, [
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": user_prompt},
        ], token)
        decisions = json.loads(clean_json_text(content))
        if not isinstance(decisions, list):
            # Route an unexpected shape through the logged fail-open path
            # instead of silently treating it as "keep everything".
            raise ValueError("expected a JSON list of decisions")
        decisions_by_id = {
            norm_id(d.get("id")): d for d in decisions if isinstance(d, dict)
        }
    except Exception as e:
        print(f"Final safety scan failed to run ({e}); failing open (returning messages unchanged).")
        return messages

    result = []
    dropped, replaced = 0, 0
    for msg in messages:
        decision = decisions_by_id.get(norm_id(msg.get("id"))) or {}
        raw_action = decision.get("action", "keep")
        # Tolerate "Drop", " replace ", etc.; anything non-string means keep.
        action = raw_action.strip().lower() if isinstance(raw_action, str) else "keep"

        if action == "drop":
            dropped += 1
            continue

        if action == "replace":
            replacement = decision.get("replacement")
            # Only accept real, non-blank text so 'text' never becomes a
            # number, list or empty string.
            if isinstance(replacement, str) and replacement.strip():
                msg = dict(msg)  # copy: never mutate the caller's message
                msg["text"] = replacement
                replaced += 1
        result.append(msg)

    if dropped or replaced:
        print(f"Final safety scan: dropped {dropped}, replaced {replaced} of {len(messages)} messages.")
    return result

# --- Parallel Tasks ---
def _fetch_and_segment_transcript(video_id: str, transcript_text: str, token: str) -> tuple:
    """Return (segments, raw_transcript) so the caller can build the anchor index."""
    if not transcript_text:
        raise ValueError("Transcript text is required but was not provided.")
        
    print("Parsing provided transcript text...")
    raw_transcript = parse_transcript_text(transcript_text)

    transcript_text_formatted = format_transcript_lines(raw_transcript)
    print("Stage 1a: Segmenting transcript...")
    segments = stage_1a_segment_transcript(transcript_text_formatted, token)
    return segments, raw_transcript

def _extract_document_text(doc_text: str, doc_path: str, use_ocr: bool = False) -> str:
    if doc_path:
        print(f"Extracting text from PDF: {doc_path}...")
        return pymupdf4llm.to_markdown(doc_path, use_ocr=use_ocr)
    if doc_text:
        return doc_text
    raise ValueError("Either doc_text or doc_path must be provided.")

# --- Full Pipeline Orchestration ---
def run_livestream_pipeline(video_id: str, doc_text: str = None, doc_path: str = None, transcript_text: str = None, token: str = None, use_ocr: bool = False) -> list:
    """Run the full pipeline and return a flat list of messages sorted by displayTime.

    Steps, in order:
      1. In parallel: extract the document text, parse + segment the
         transcript (Stage 1a), and fetch video metadata.
      2. Build the anchor index from the raw transcript (no LLM call).
      3. Content-safety gate on the document (hard stop if rejected).
      4. Stage 2: one Pro-model call drafting comments for all segments,
         followed by validation of the draft.
      5. Stage 3: per-segment stylization, run in parallel.
      6. Flatten, run the final safety scan, compute display times, sort.

    Args:
        video_id: Video identifier, passed to the metadata fetch and the
            transcript task.
        doc_text: Reference document as text (used when `doc_path` is empty).
        doc_path: Path to a PDF reference document; takes precedence over
            `doc_text`.
        transcript_text: Transcript text; required by the transcript task.
        token: Hugging Face API token. Falls back to the HF_TOKEN environment
            variable, then DEFAULT_HF_TOKEN.
        use_ocr: Forwarded to document extraction.

    Returns:
        list[message_dict] (not list[segment_dict]). Each message has: id,
        username, text, anchor_id, displayTime, and optionally reply_to.

    Raises:
        ValueError: if no token is available, the content-safety gate
            rejects the input, or Stage 2 output fails validation.
        Exceptions raised by the individual stages propagate unchanged.
    """
    if not token:
        # `or` (rather than a .get() default) so an HF_TOKEN that is set but
        # empty still falls back to DEFAULT_HF_TOKEN.
        token = os.environ.get("HF_TOKEN") or DEFAULT_HF_TOKEN
    if not token:
        raise ValueError(
            "Hugging Face API Token not found. Please set the 'HF_TOKEN' secret in your Space settings "
            "or provide it in the input box."
        )

    print("Starting parallel execution of Document Extraction, Transcript Segmentation, and Metadata Fetch...")
    with ThreadPoolExecutor(max_workers=3) as executor:
        fut_doc = executor.submit(_extract_document_text, doc_text, doc_path, use_ocr)
        fut_seg = executor.submit(_fetch_and_segment_transcript, video_id, transcript_text, token)
        fut_meta = executor.submit(fetch_video_metadata, video_id)

        # .result() re-raises any exception from the worker thread.
        extracted_doc_text = fut_doc.result()
        segments, raw_transcript = fut_seg.result()
        video_metadata = fut_meta.result()

    print(f"Segmented into {len(segments)} blocks.")
    if video_metadata:
        print(f"Video metadata: title={video_metadata.get('title')!r}, channel={video_metadata.get('author_name')!r}")
    else:
        print("Video metadata unavailable (oEmbed fetch failed) — identity-aware intro messages disabled.")

    # Build anchor index deterministically from raw transcript (no LLM call).
    anchors = build_anchor_index(raw_transcript)
    anchor_map = map_anchors_to_segments(anchors, segments)
    anchors_by_id = {a["anchor_id"]: a for a in anchors}
    valid_anchor_ids = set(anchors_by_id)
    intro_anchor_ids = select_intro_anchors(anchors)
    print(f"Built anchor index: {len(anchors)} anchors ({len(intro_anchor_ids)} in the intro window).")

    # Hard stop: gate on content safety before the expensive Stage 2/3 calls.
    is_safe, reason = check_content_safety(extracted_doc_text, video_metadata, token)
    if not is_safe:
        raise ValueError(
            f"Content safety check failed: {reason} "
            "This input was flagged as unsuitable for simulated chat generation. "
            "Please choose a different reference document or video."
        )

    # Stage 2: Single Pro model call for all drafting.
    print("Stage 2: Generating draft comments for all segments (Pro model)...")
    draft_segments = stage_2_generate_all_drafts(
        segments, extracted_doc_text, token,
        anchor_map=anchor_map, all_anchors=anchors,
        intro_anchor_ids=intro_anchor_ids, video_metadata=video_metadata
    )

    # Validate Stage 2 output before passing downstream.
    validation_errors = validate_stage2_output(draft_segments, valid_anchor_ids)
    if validation_errors:
        # Log and raise — malformed anchor/reply data must not flow into Stage 3.
        error_summary = "\n".join(validation_errors[:10])
        raise ValueError(f"Stage 2 output failed validation ({len(validation_errors)} error(s)):\n{error_summary}")

    # Stage 3: Parallel stylization. Results are collected in submission
    # order, so final_segments lines up with draft_segments.
    print("Stage 3: Stylizing comments...")
    with ThreadPoolExecutor(max_workers=5) as executor:
        futures = [
            executor.submit(stage_3_stylize_segment, draft, token)
            for draft in draft_segments
        ]
        final_segments = [fut.result() for fut in futures]

    # Flatten all messages.
    all_messages = []
    for seg in final_segments:
        all_messages.extend(seg.get("messages", []))

    # Final aggregate moderation pass (second net, after Stage 3's per-segment pass).
    print("Running final safety scan over all messages...")
    all_messages = final_safety_scan(all_messages, token)

    # Compute displayTime, sort numerically (messages lacking it sort as 0).
    all_messages = compute_display_times(all_messages, anchors_by_id)
    all_messages.sort(key=lambda m: m.get("displayTime", 0))
    return all_messages