Spaces:
Running
Running
File size: 38,220 Bytes
ab31e87 1670d54 ab31e87 9fce4a4 9c65e45 f40cbb1 ab31e87 f40cbb1 ab31e87 696985d ab31e87 1670d54 f40cbb1 1670d54 1d27551 1670d54 ab31e87 eaae12d f40cbb1 1670d54 f40cbb1 1670d54 f40cbb1 1670d54 ab31e87 eaae12d 1670d54 eaae12d 9c65e45 1670d54 eaae12d 9c65e45 eaae12d 1670d54 f40cbb1 9c65e45 f40cbb1 1670d54 eaae12d ab31e87 1670d54 f40cbb1 1670d54 f40cbb1 1670d54 ab31e87 eaae12d ab31e87 1670d54 ab31e87 1670d54 ab31e87 eaae12d ab31e87 eaae12d 1670d54 9c65e45 f40cbb1 9c65e45 ab31e87 1670d54 ab31e87 f40cbb1 a928534 1670d54 89febe7 1670d54 ab31e87 1670d54 a928534 f40cbb1 a928534 f40cbb1 a928534 f40cbb1 1670d54 a928534 1670d54 f40cbb1 a928534 f40cbb1 1670d54 a928534 1670d54 f40cbb1 1670d54 ab31e87 f40cbb1 1670d54 f40cbb1 1670d54 eaae12d 1670d54 f40cbb1 1670d54 ab31e87 1670d54 f40cbb1 1670d54 f40cbb1 1670d54 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 
325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 655 656 657 658 659 660 661 662 663 664 665 666 667 668 669 670 671 672 673 674 675 676 677 678 679 680 681 682 683 684 685 686 687 688 689 690 691 692 693 694 695 696 697 698 699 700 701 702 703 704 705 706 707 708 709 710 711 712 713 714 715 716 717 718 719 720 721 722 723 724 725 726 727 728 729 730 731 732 733 734 735 736 737 738 739 740 741 742 743 744 745 746 747 748 749 750 751 752 753 754 755 756 757 758 759 760 761 762 763 764 765 766 767 768 769 770 771 772 773 774 775 776 777 778 779 780 781 782 783 784 785 786 787 788 789 790 791 792 793 794 795 796 797 798 799 800 801 802 803 804 805 806 807 808 809 810 811 812 813 814 815 816 817 818 819 820 821 822 823 824 
825 826 827 828 829 830 831 832 833 834 835 836 837 838 839 840 841 842 843 844 845 846 847 848 849 850 851 852 | import os
import re
import json
import math
import random
import requests
import pymupdf4llm
from concurrent.futures import ThreadPoolExecutor
# Hugging Face token captured once at import time; empty string when the
# HF_TOKEN environment variable is not set.
DEFAULT_HF_TOKEN = os.environ.get("HF_TOKEN", "")
# Model identifiers sent as the "model" field of router chat-completion calls.
# FLASH_MODEL is used by Stage 1a (segmentation) and Stage 3 (stylizing).
FLASH_MODEL = "openai/gpt-oss-120b:fastest"
# PRO_MODEL is used by Stage 2 (draft generation).
PRO_MODEL = "zai-org/GLM-4.7:fastest"
# SAFETY_MODEL is used by check_content_safety() and final_safety_scan().
SAFETY_MODEL = "openai/gpt-oss-safeguard-20b"
# Anchors that start before this many seconds are considered part of the
# stream's "intro window": real viewers haven't processed any spoken content
# yet, so Stage 2 chat for these anchors should skew toward arrival/meta
# chatter rather than fully-formed topic takes. See select_intro_anchors()
# and the INTRO-WINDOW HANDLING clause in stage_2_generate_all_drafts().
# Unit: seconds. This is the default `window_seconds` of select_intro_anchors().
# NOTE(review): some docstrings/prompt text in this file still say "~20s";
# this constant (10.0) is what the code actually uses by default.
INTRO_WINDOW_SECONDS = 10.0
def extract_youtube_video_id(url: str) -> str:
    """Pull the 11-character YouTube video id out of *url*.

    Accepts either a bare id or a URL containing one of the recognised
    markers (``v=``, ``/v/``, ``embed/``, ``shorts/``, ``youtu.be/``,
    ``&v=``). Surrounding whitespace is ignored.

    Returns the id, or an empty string when nothing matches.
    """
    candidate = url.strip()

    # A bare id: exactly 11 characters from the id alphabet.
    looks_like_bare_id = len(candidate) == 11 and re.match(r'^[a-zA-Z0-9_-]{11}$', candidate)
    if looks_like_bare_id:
        return candidate

    url_patterns = (
        r'(?:v=|\/v\/|embed\/|shorts\/|youtu\.be\/|\/embed\/|\/watch\?v=|\&v=)([a-zA-Z0-9_-]{11})',
    )
    hits = (re.search(expr, candidate) for expr in url_patterns)
    first_hit = next((hit for hit in hits if hit), None)
    return first_hit.group(1) if first_hit else ""
def fetch_video_metadata(video_id: str, timeout: float = 5.0) -> dict:
    """Look up a video's public title and channel name via YouTube oEmbed.

    The no-auth oEmbed endpoint is queried with the video's watch URL. The
    result feeds stage_2_generate_all_drafts() (identity-aware intro
    messages) and check_content_safety() (extra context for the gate).

    Args:
        video_id: YouTube video id to look up.
        timeout: Request timeout in seconds, passed to ``requests.get``.

    Returns:
        ``{"title": str, "author_name": str}`` on success (missing or null
        fields become ""), or ``{}`` if anything goes wrong. Failure is
        non-fatal — callers must treat empty metadata as "identity unknown".
    """
    try:
        query = {
            "url": f"https://www.youtube.com/watch?v={video_id}",
            "format": "json",
        }
        reply = requests.get("https://www.youtube.com/oembed", params=query, timeout=timeout)
        reply.raise_for_status()
        body = reply.json()
        # `or ""` also normalises explicit nulls in the response.
        return {field: body.get(field, "") or "" for field in ("title", "author_name")}
    except Exception as e:
        # Deliberately broad: any failure degrades to "no metadata".
        print(f"oEmbed metadata fetch failed for video '{video_id}': {e}")
        return {}
def format_timestamp(seconds: float) -> str:
    """Render *seconds* as a bracketed ``[MM:SS]`` label.

    Fractions of a second are truncated; minutes are not wrapped into
    hours (3600 seconds renders as ``[60:00]``).
    """
    whole_minutes, leftover = divmod(seconds, 60)
    return f"[{int(whole_minutes):02d}:{int(leftover):02d}]"
def format_transcript_lines(transcript: list) -> str:
    """Join transcript entries into ``[MM:SS] text`` lines.

    Each entry must provide ``"start"`` (seconds) and ``"text"``. Lines are
    separated by newlines; an empty transcript yields an empty string.
    """
    return "\n".join(
        f"{format_timestamp(cue['start'])} {cue['text']}" for cue in transcript
    )
def clean_json_text(text: str) -> str:
    """Strip a surrounding markdown code fence from an LLM reply.

    Removes a leading ```` ```json ```` or ```` ``` ```` marker and a
    trailing ```` ``` ```` marker if present, then trims whitespace. Text
    without fences is returned trimmed but otherwise untouched.
    """
    body = text.strip()

    # Check the longer opener first so "```json" is not left as "json".
    for opener in ("```json", "```"):
        if body.startswith(opener):
            body = body[len(opener):]
            break

    closer = "```"
    if body.endswith(closer):
        body = body[:-len(closer)]
    return body.strip()
def call_hf_router(model: str, messages: list, token: str) -> str:
    """Send a chat-completion request to the Hugging Face router.

    Args:
        model: Model identifier placed in the request's "model" field.
        messages: Chat messages (role/content dicts) to send.
        token: Bearer token for the Authorization header.

    Returns:
        The ``content`` of the first choice's message.

    Raises:
        Whatever the final attempt raised (HTTP error, timeout, malformed
        response, ...) once all 3 attempts have failed.
    """
    import time

    endpoint = "https://router.huggingface.co/v1/chat/completions"
    request_headers = {
        "Authorization": f"Bearer {token}",
        "Content-Type": "application/json",
    }
    request_body = {"model": model, "messages": messages, "temperature": 0.7}

    max_retries = 3
    attempt = 0
    while attempt < max_retries:
        try:
            reply = requests.post(
                endpoint,
                headers=request_headers,
                json=request_body,
                timeout=300,
            )
            reply.raise_for_status()
            return reply.json()["choices"][0]["message"]["content"]
        except Exception as e:
            out_of_attempts = attempt + 1 >= max_retries
            if out_of_attempts:
                raise e
            print(f"HF API call failed (attempt {attempt+1}/{max_retries}): {e}. Retrying...")
            # Back off 2s, then 3s, before the next try.
            time.sleep(2 ** attempt + 1)
        attempt += 1
def parse_srt(srt_text: str) -> list:
    """Parse SRT-style (or index-less, WebVTT-style) cues into entries.

    The text is split into blank-line-separated blocks. Within each block
    the first line that starts with an ``HH:MM:SS,mmm --> HH:MM:SS,mmm``
    timing (comma or dot as the fraction separator) is taken as the cue's
    timing line, and every following line is joined with spaces as the cue
    text. The timing line does not have to be preceded by a numeric index,
    so blocks without one are parsed rather than silently dropped.

    Blocks with no timing line (e.g. a ``WEBVTT`` header) or with no text
    after the timing line are skipped.

    Args:
        srt_text: Raw subtitle text; any newline convention is accepted.

    Returns:
        A list of ``{"text": str, "start": float, "duration": float}``
        dicts in file order, with times in seconds.
    """
    # Match formats: 00:00:03,320 --> 00:00:05,960 or 00:00:03.320 --> 00:00:05.960
    cue_timing = re.compile(
        r'(\d+):(\d+):(\d+)[,\.](\d+)\s*-->\s*(\d+):(\d+):(\d+)[,\.](\d+)'
    )

    def to_seconds(hours: str, minutes: str, seconds: str, fraction: str) -> float:
        # Scale the fraction by its digit count so ",5" is 0.5s and ",320"
        # is 0.320s, instead of assuming exactly three digits.
        return (
            int(hours) * 3600
            + int(minutes) * 60
            + int(seconds)
            + int(fraction) / (10 ** len(fraction))
        )

    entries = []
    # Normalize newlines
    normalized = srt_text.replace('\r\n', '\n').replace('\r', '\n')
    for block in re.split(r'\n\s*\n', normalized.strip()):
        lines = [line.strip() for line in block.split('\n') if line.strip()]

        # Locate the timing line wherever it sits in the block.
        for position, line in enumerate(lines):
            match = cue_timing.match(line)
            if match:
                break
        else:
            continue  # no timing line in this block

        text = " ".join(lines[position + 1:])
        if not text:
            continue  # a cue with no text carries nothing to anchor on

        h1, m1, s1, f1, h2, m2, s2, f2 = match.groups()
        start_secs = to_seconds(h1, m1, s1, f1)
        end_secs = to_seconds(h2, m2, s2, f2)
        entries.append({
            "text": text,
            "start": start_secs,
            "duration": end_secs - start_secs,
        })
    return entries
def parse_transcript_text(text: str) -> list:
    """Turn user-supplied transcript text into timed entries.

    If the text contains ``-->`` it is first tried as subtitle cues via
    parse_srt(); a non-empty result is returned as-is. Otherwise (or if
    that attempt fails or yields nothing) each non-blank line is treated as
    a paragraph with a synthetic duration of ``words / 3`` seconds, clamped
    to [3, 15], and a 1-second pause between paragraphs.

    Returns:
        A list of ``{"text": str, "start": float, "duration": float}`` dicts.
    """
    text = text.strip()

    if "-->" in text:
        try:
            cues = parse_srt(text)
            if cues:
                return cues
        except Exception:
            # Best effort: fall through to the plain-text path.
            pass

    # Fallback to plain text paragraph segmentation
    entries = []
    clock = 0.0
    for raw_line in text.split('\n'):
        paragraph = raw_line.strip()
        if not paragraph:
            continue
        word_count = len(paragraph.split())
        duration = max(3.0, min(15.0, word_count / 3.0))
        entries.append({"text": paragraph, "start": clock, "duration": duration})
        clock += duration + 1.0
    return entries
# ---------------------------------------------------------------------------
# Anchor index utilities (deterministic, no LLM)
# ---------------------------------------------------------------------------
# Anchor IDs use the format "anc_N" where N is the chronological index.
# validate_stage2_output relies on this naming to compare anchor ordering.
# build_anchor_index() only closes a chunk once it spans at least
# MIN_ANCHOR_DURATION and adding the next entry would push it past
# MAX_ANCHOR_DURATION.
MIN_ANCHOR_DURATION = 3.0 # seconds
MAX_ANCHOR_DURATION = 10.0 # seconds
def build_anchor_index(transcript_entries: list) -> list:
    """Group raw transcript entries into sentence/clause-sized anchors.

    Entries are sorted by start time and overlaps (common in auto-captions)
    are resolved by pushing each entry's effective start to at least the
    previous entry's effective end. Consecutive entries are then merged
    into chunks: a chunk is closed when it already spans at least
    MIN_ANCHOR_DURATION and adding the next entry would make it longer than
    MAX_ANCHOR_DURATION. A chunk shorter than the minimum keeps absorbing
    entries even past the cap. The trailing chunk is always emitted.

    Args:
        transcript_entries: Dicts with "text", "start" and "duration".

    Returns:
        Chronologically ordered, non-overlapping anchors of the form
        ``{"anchor_id": "anc_N", "start", "end", "text"}``; ``[]`` for
        empty input.
    """
    if not transcript_entries:
        return []

    def make_anchor(position, start, end, texts):
        return {
            "anchor_id": f"anc_{position}",
            "start": start,
            "end": end,
            "text": " ".join(texts),
        }

    # Pass 1: order the entries and remove overlaps.
    timeline = []
    cursor = 0.0
    for item in sorted(transcript_entries, key=lambda e: e["start"]):
        begin = max(float(item["start"]), cursor)
        # Guarantee a strictly positive span even for zero/negative durations.
        cursor = begin + max(float(item["duration"]), 0.001)
        timeline.append({"text": item["text"], "start": begin, "end": cursor})

    # Pass 2: accumulate entries into anchors.
    anchors = []
    open_start = timeline[0]["start"]
    open_end = open_start
    open_texts = []
    for piece in timeline:
        long_enough = (open_end - open_start) >= MIN_ANCHOR_DURATION
        would_overflow = (piece["end"] - open_start) > MAX_ANCHOR_DURATION
        if open_texts and long_enough and would_overflow:
            anchors.append(make_anchor(len(anchors), open_start, open_end, open_texts))
            open_start = piece["start"]
            open_texts = []
        open_texts.append(piece["text"])
        open_end = piece["end"]

    # Emit the final (possibly short) remainder chunk.
    if open_texts:
        anchors.append(make_anchor(len(anchors), open_start, open_end, open_texts))
    return anchors
def select_intro_anchors(anchors: list, window_seconds: float = INTRO_WINDOW_SECONDS) -> set:
    """Collect the ids of anchors that start inside the intro window.

    An anchor qualifies when its "start" is strictly less than
    `window_seconds` (default: INTRO_WINDOW_SECONDS). Pure and
    deterministic: no LLM calls, no I/O.

    Used by stage_2_generate_all_drafts() to bias early chat toward
    arrival/meta chatter instead of fully-formed topic reactions.

    Returns:
        A set of anchor_id strings (possibly empty).
    """
    early_ids = set()
    for anchor in anchors:
        if float(anchor["start"]) < window_seconds:
            early_ids.add(anchor["anchor_id"])
    return early_ids
def map_anchors_to_segments(anchors: list, segments: list) -> dict:
    """Assign each anchor to exactly one topic segment.

    Assignment is based on each anchor's midpoint: the first segment whose
    half-open range ``[start, end)`` contains it wins. If the midpoint
    falls outside every segment (e.g. a coverage gap in Stage 1a output),
    the anchor goes to the segment with the nearest midpoint (lowest index
    on ties).

    Args:
        anchors: Dicts with "anchor_id", "start" and "end".
        segments: Dicts with "start" and "end" (anything ``float()`` accepts).

    Returns:
        ``{segment_index: [anchor_id, ...]}`` with a key for every segment
        index. With no segments there is nothing to assign to, so the
        result is ``{}`` rather than an error.
    """
    # Pre-seed every segment index with an empty list.
    mapping = {i: [] for i in range(len(segments))}
    if not segments:
        # Without this guard the nearest-segment fallback would call min()
        # on an empty range and raise ValueError.
        return mapping

    # Convert bounds and midpoints once instead of once per anchor.
    bounds = [(float(seg["start"]), float(seg["end"])) for seg in segments]
    seg_midpoints = [(lo + hi) / 2.0 for lo, hi in bounds]

    for anchor in anchors:
        midpoint = (anchor["start"] + anchor["end"]) / 2.0

        # Find the segment whose range contains the midpoint.
        assigned = next(
            (i for i, (lo, hi) in enumerate(bounds) if lo <= midpoint < hi),
            None,
        )

        # Fallback: nearest segment by midpoint distance.
        if assigned is None:
            assigned = min(
                range(len(bounds)),
                key=lambda i: abs(seg_midpoints[i] - midpoint),
            )
        mapping[assigned].append(anchor["anchor_id"])
    return mapping
def validate_stage2_output(payload: list, valid_anchor_ids: set) -> list:
    """Validate the LLM's Stage 2 output against the temporal-alignment contract.

    Returns a list of error strings (empty = valid). Does not raise, even
    when the payload is structurally malformed (non-object segments or
    messages, a null "messages" field, non-string or unhashable ids): such
    problems are reported as errors instead.

    Checks:
    - The payload is a list of segment objects whose "messages" is a list
      of message objects.
    - Every message has an 'id' and an 'anchor_id'.
    - anchor_id is in valid_anchor_ids.
    - Message ids are unique across the entire payload.
    - reply_to (if present) references a known message id.
    - reply_to anchor index <= replier anchor index (no replying to the future).
    - No cycles in the reply graph (same-anchor or cross-anchor).

    Args:
        payload: Parsed Stage 2 JSON: a list of ``{"messages": [...]}`` dicts.
        valid_anchor_ids: Set of anchor ids messages may reference.
    """

    def hashable(value) -> bool:
        # Ids come from an LLM; a list/dict id must not crash dict/set lookups.
        try:
            hash(value)
        except TypeError:
            return False
        return True

    def anchor_index(aid) -> int:
        """Parse integer index from 'anc_N' format; return large sentinel on failure."""
        try:
            return int(aid.split("_", 1)[1])
        except (AttributeError, IndexError, ValueError):
            return 10 ** 9

    if not isinstance(payload, (list, tuple)):
        return [f"Stage 2 payload must be a list of segments, got {type(payload).__name__}"]

    errors = []

    # Flatten all well-formed messages, reporting anything that is not.
    all_messages = []
    for seg_idx, seg in enumerate(payload):
        if not isinstance(seg, dict):
            errors.append(f"Segment {seg_idx} is not a JSON object (got {type(seg).__name__})")
            continue
        seg_messages = seg.get("messages", [])
        if not isinstance(seg_messages, (list, tuple)):
            errors.append(
                f"Segment {seg_idx} has a non-list 'messages' field "
                f"(got {type(seg_messages).__name__})"
            )
            continue
        for msg in seg_messages:
            if isinstance(msg, dict):
                all_messages.append(msg)
            else:
                errors.append(f"Segment {seg_idx} contains a non-object message: {msg!r}")

    # Build id -> message index and check for duplicates.
    id_to_msg = {}
    for msg in all_messages:
        mid = msg.get("id")
        if mid is None:
            preview = str(msg.get("text") or "")[:40]
            errors.append(f"Message missing 'id' field: {preview!r}")
        elif not hashable(mid):
            errors.append(f"Message id {mid!r} is not a usable identifier")
        elif mid in id_to_msg:
            errors.append(f"Duplicate message id: {mid!r}")
        else:
            id_to_msg[mid] = msg

    for msg in all_messages:
        mid = msg.get("id")
        aid = msg.get("anchor_id")

        # Required fields.
        if aid is None:
            errors.append(f"Message {mid!r} missing 'anchor_id'")
        elif not hashable(aid) or aid not in valid_anchor_ids:
            errors.append(f"Message {mid!r} references unknown anchor_id {aid!r}")

        # reply_to validation.
        reply_to = msg.get("reply_to")
        if reply_to is None:
            continue
        if not hashable(reply_to) or reply_to not in id_to_msg:
            errors.append(f"Message {mid!r} has reply_to {reply_to!r} which does not exist")
            continue
        if aid is None:
            continue
        parent_aid = id_to_msg[reply_to].get("anchor_id")
        if parent_aid is not None and anchor_index(parent_aid) > anchor_index(aid):
            errors.append(
                f"Message {mid!r} (anchor {aid}) has reply_to {reply_to!r} "
                f"(anchor {parent_aid}) which is later in the video"
            )

    # Cycle detection: follow each message's reply chain until it ends or
    # revisits a node. Every start id that runs into a cycle is reported.
    reply_graph = {}
    for msg in all_messages:
        if "id" in msg and hashable(msg["id"]):
            parent = msg.get("reply_to")
            reply_graph[msg["id"]] = parent if hashable(parent) else None
    for start_id in reply_graph:
        visited = set()
        node = start_id
        while node is not None:
            if node in visited:
                errors.append(f"Cycle detected in reply_to chain starting at {start_id!r}")
                break
            visited.add(node)
            node = reply_graph.get(node)  # None if no reply_to or id not in graph
    return errors
def compute_display_times(messages: list, anchors_by_id: dict) -> list:
    """Assign a numeric displayTime to every message.

    Placement rules (all jitter drawn from the module-level ``random``):
    - Unknown anchor: displayTime is 0.0.
    - Root message on "anc_0": anchor.start + uniform(0.5, max(1.0, hi)).
    - Root message elsewhere: anchor.start + uniform(2.0, max(2.0, hi)).
    - Reply: max(parent.displayTime + uniform(2, 12), own_anchor.start).
    - Unresolvable reply (missing parent or cycle): demoted to root
      placement with anchor.start + uniform(1, max(1.0, hi)), and a
      warning is printed.
    where ``hi = min(8.0, anchor_duration)``. Because the upper bound never
    drops below the lower bound, a root on a very short anchor may land
    slightly past that anchor's end.

    Messages are placed in dependency order (parents before replies) by
    sweeping the pending list repeatedly. The sweep stops as soon as a pass
    places nothing, since later passes could not place anything either.

    Args:
        messages: Message dicts with "id", "anchor_id" and optional "reply_to".
        anchors_by_id: ``{anchor_id: {"anchor_id", "start", "end", ...}}``.

    Returns:
        A new list of shallow-copied message dicts with 'displayTime'
        added, in the input order. The input dicts are not mutated.
    """
    # Work on shallow copies to avoid mutating the input.
    result = [dict(m) for m in messages]
    resolved = {}  # id -> displayTime
    remaining = list(result)

    while remaining:
        still_waiting = []
        for msg in remaining:
            mid = msg.get("id")
            anchor = anchors_by_id.get(msg.get("anchor_id", ""))
            if anchor is None:
                # Unknown anchor — assign a safe fallback of 0.
                msg["displayTime"] = 0.0
                if mid:
                    resolved[mid] = 0.0
                continue

            reply_to = msg.get("reply_to")
            if reply_to and reply_to not in resolved:
                # Parent not yet placed; come back to this message.
                still_waiting.append(msg)
                continue

            anchor_dur = anchor["end"] - anchor["start"]
            if reply_to:
                # Parent is placed (checked above): follow it by 2-12s, but
                # never before this message's own anchor starts.
                jitter = random.uniform(2, 12)
                dt = max(resolved[reply_to] + jitter, anchor["start"])
            else:
                # Root message — jitter capped to anchor duration.
                hi = min(8.0, anchor_dur)
                if anchor.get("anchor_id") == "anc_0":
                    # The very first anchor may get near-immediate reactions.
                    jitter = random.uniform(0.5, max(1.0, hi))
                else:
                    jitter = random.uniform(2.0, max(2.0, hi))
                dt = anchor["start"] + jitter
            msg["displayTime"] = dt
            if mid:
                resolved[mid] = dt

        if len(still_waiting) == len(remaining):
            # No progress this pass: everything left waits on a parent that
            # will never be placed, so stop instead of re-scanning.
            break
        remaining = still_waiting

    # Any messages still unresolved (cycles) get demoted to root placement.
    for msg in remaining:
        anchor = anchors_by_id.get(msg.get("anchor_id", ""))
        if anchor:
            anchor_dur = anchor["end"] - anchor["start"]
            hi = min(8.0, anchor_dur)
            msg["displayTime"] = anchor["start"] + random.uniform(1, max(1.0, hi))
        else:
            msg["displayTime"] = 0.0
        print(f"WARNING: compute_display_times: demoted {msg.get('id')!r} to root (unresolvable reply chain)")
    return result
# --- STAGE 1a: Segment transcript ---
def stage_1a_segment_transcript(transcript_text: str, token: str) -> list:
    """Ask the Flash model to split a timestamped transcript into topic segments.

    Args:
        transcript_text: Transcript already formatted with timestamps.
        token: Hugging Face token forwarded to call_hf_router().

    Returns:
        The parsed JSON reply, which the prompt requests to be a list of
        ``{"start", "end", "text"}`` segments. The shape is not validated.

    Raises:
        json.JSONDecodeError if the reply is not valid JSON, plus anything
        call_hf_router() raises.
    """
    instruction_parts = [
        "You are an AI assistant that segments a video transcript into logical topic segments. ",
        "You are given a transcript formatted with timestamps.\n",
        "Your task is to group consecutive lines into segments of about 30 to 60 seconds (but align with natural topic boundaries).\n",
        "For each segment, output:\n",
        "- start: the start time in seconds (float or int)\n",
        "- end: the end time in seconds (float or int)\n",
        "- text: the concatenated text in this segment\n\n",
        "Return ONLY a JSON list of segments. Do not include markdown wraps or conversational text outside the JSON.",
    ]
    conversation = [
        {"role": "system", "content": "".join(instruction_parts)},
        {"role": "user", "content": f"Transcript:\n{transcript_text}"},
    ]
    raw_reply = call_hf_router(FLASH_MODEL, conversation, token)
    return json.loads(clean_json_text(raw_reply))
# --- STAGE 2: Generate all draft comments (Pro model) ---
def stage_2_generate_all_drafts(segments: list, doc_text: str, token: str, anchor_map: dict = None,
                                all_anchors: list = None, intro_anchor_ids: set = None,
                                video_metadata: dict = None) -> list:
    """Generate draft chat messages for all segments with one Pro-model call.

    Args:
        segments: Topic segments, each with "start", "end" and "text".
        doc_text: Reference document text placed in the user prompt.
        token: Hugging Face token forwarded to call_hf_router().
        anchor_map: {seg_idx: [anchor_id, ...]} from map_anchors_to_segments.
        all_anchors: Full anchor list from build_anchor_index (for text
            lookup). When both anchor_map and all_anchors are provided,
            each segment's anchor list is injected into the prompt so the
            model can pick a concrete timestamped moment per message.
        intro_anchor_ids: Set of anchor_ids from select_intro_anchors().
            These are tagged [INTRO] in the anchor lists, and a non-empty
            set adds the INTRO-WINDOW HANDLING clause to the system prompt.
            The window length quoted in that clause is INTRO_WINDOW_SECONDS.
        video_metadata: Optional {"title": ..., "author_name": ...} from
            fetch_video_metadata(). With a non-empty title/channel the
            model may generate up to 1-2 identity-aware intro messages if
            the speaker's identity is unambiguous; otherwise it is told
            not to guess.

    Returns:
        The parsed JSON reply (requested shape: one
        ``{"_internal_logic", "messages": [...]}`` object per segment).
        The shape is not validated here.

    Raises:
        json.JSONDecodeError if the reply is not valid JSON (the raw reply
        is printed first), plus anything call_hf_router() raises.
    """
    # Build anchor lookup if available.
    anchors_by_id = {a["anchor_id"]: a for a in (all_anchors or [])}
    intro_anchor_ids = intro_anchor_ids or set()

    system_prompt = (
        "You are simulating audience chat reactions for a livestream of an educational or historical video.\n"
        "You are given:\n"
        "1. A reference document.\n"
        "2. A chronological list of video segments. Each segment lists its ANCHOR MOMENTS: timestamped transcript"
        " chunks at the sentence/clause level.\n\n"
        "Your task is to generate 8 to 15 draft chat messages from different users reacting to EACH video segment.\n\n"
        "Crucially, you must follow these steps:\n"
        "1. Identify the conceptual relationships (e.g., direct validations, unintentional contradictions, theoretical bridges) between the video and the document.\n"
        "2. Frame reactions to each caption segment by drawing on the identified conceptual relationships without direct reference or quotes. AVOID academic jargon.\n"
        "3. Generate chat messages for each segment. Every message MUST be anchored to a specific moment from that segment's anchor list.\n\n"
        "QUOTAS AND PERSONAS (STRICTLY ENFORCED):\n"
        "- Include at least 50% on-topic contributions reflecting agreement, disagreement, jokes, observations, predictions, corrections, hype and skepticism to complement occasional off-topic remarks, each with an anchor_id.\n"
        "- Ensure diverse, non-repetitive usernames across the entire video. Do not use the same usernames repeatedly for the same types of comments.\n"
        "- Maintain diverse livestream audience personas: some are experts reading deeply into philosophical tension, some take things entirely at face value, some only react emotionally or to video aesthetics, some use sarcasm/memes.\n\n"
        "REPLY THREADING (optional):\n"
        "- A message MAY include a 'reply_to' field containing another message's 'id' from the same or immediately preceding anchor.\n"
        "- A reply must NEVER reference a message anchored to a later anchor than itself.\n"
    )

    if intro_anchor_ids:
        # `or ""` guards against explicit None values in the metadata dict.
        title = ((video_metadata or {}).get("title", "") or "").strip()
        author = ((video_metadata or {}).get("author_name", "") or "").strip()
        if title or author:
            channel_part = f" from channel \"{author}\"" if author else ""
            identity_clause = (
                f"- IDENTITY-AWARE INTRO MESSAGES (OPTIONAL): The video title is \"{title or 'unknown'}\"{channel_part}. "
                "If — and only if — this title, channel, or the reference document make the speaker's "
                "identity or significance unambiguous, you MAY include up to 1-2 [INTRO]-anchored "
                "messages reacting to who the speaker is or why they matter (e.g. recognizing a "
                "well-known figure). Do not guess or assert identity beyond what these sources support.\n"
            )
        else:
            identity_clause = (
                "- IDENTITY-AWARE INTRO MESSAGES: No reliable video title/channel metadata is available. "
                "Do NOT include any [INTRO]-anchored messages that guess or assert who the speaker is — "
                "keep all [INTRO] messages generic arrival/meta chatter.\n"
            )
        # Quote the configured window so the prompt cannot drift from the
        # constant that defines which anchors count as intro.
        intro_window = f"{INTRO_WINDOW_SECONDS:g}"
        system_prompt += (
            "\nINTRO-WINDOW HANDLING:\n"
            "- Anchors marked [INTRO] in the ANCHOR MOMENTS lists below fall within the stream's first "
            f"~{intro_window} seconds. Real viewers haven't processed any spoken content yet at this point — they're "
            "still arriving, reading the title, or reacting to who/what the speaker is sharing.\n"
            "- For messages anchored to an [INTRO] anchor, 80-90% should be arrival/meta chatter NOT engagement "
            "with the spoken content. This OVERRIDES the normal off-topic quota for these messages.\n"
            "- Set '_internal_logic' to \"None\" for any segment whose messages are dominated by "
            "[INTRO] anchors, regardless of any document tie.\n"
            f"{identity_clause}"
        )

    system_prompt += (
        "\nFORMAT: Return a JSON list of objects, one per segment:\n"
        "{\n"
        " \"_internal_logic\": \"How this segment relates to document sub-claims, or 'None' if off-topic.\",\n"
        " \"messages\": [\n"
        " {\n"
        " \"id\": \"<unique string, e.g. m1>\",\n"
        " \"username\": \"<username>\",\n"
        " \"text\": \"<message text>\",\n"
        " \"anchor_id\": \"<anchor_id from this segment's anchor list>\",\n"
        " \"reply_to\": \"<optional: another message's id>\"\n"
        " }\n"
        " ]\n"
        "}\n\n"
        "CRITICAL: Do not include 'timestamp'. Do not omit 'anchor_id' or 'id'. "
        "Return ONLY a valid JSON list. Do not include markdown wraps or other text."
    )

    def format_segment(i, seg):
        # Segment header plus, when anchors are available, one line per
        # anchor (text truncated to 80 characters to bound prompt size).
        seg_header = f"Segment {i} ({seg['start']}s - {seg['end']}s):\n{seg['text']}"
        if anchor_map and all_anchors:
            anchor_ids = anchor_map.get(i, [])
            anchor_lines = []
            for aid in anchor_ids:
                a = anchors_by_id.get(aid)
                if a:
                    intro_tag = " [INTRO]" if aid in intro_anchor_ids else ""
                    anchor_lines.append(f" [{aid}]{intro_tag} {a['start']:.1f}s: {a['text'][:80]}")
            if anchor_lines:
                seg_header += "\nANCHOR MOMENTS (pick one per message):\n" + "\n".join(anchor_lines)
        return seg_header

    segments_text = "\n\n".join(format_segment(i, seg) for i, seg in enumerate(segments))
    user_prompt = (
        f"Reference Document:\n{doc_text}\n\n"
        f"Video Segments:\n{segments_text}\n\n"
        "Generate the JSON list of draft comments for all segments."
    )
    llm_messages = [
        {"role": "system", "content": system_prompt},
        {"role": "user", "content": user_prompt}
    ]
    content = call_hf_router(PRO_MODEL, llm_messages, token)
    cleaned = clean_json_text(content)
    try:
        return json.loads(cleaned)
    except Exception:
        # Log the raw reply for debugging, then let the parse error propagate.
        print(f"Failed to parse JSON from Stage 2. Raw content: {content}")
        raise
# --- STAGE 3: Style and pacing (Flash model) ---
def stage_3_stylize_segment(draft_data: dict, token: str) -> dict:
    """Run the Flash-model style/safety polish over one segment's draft.

    The draft is serialised to JSON and sent with instructions to keep
    usernames and the structural fields ('id', 'anchor_id', 'reply_to')
    intact while making the message text sound like real chat.

    Args:
        draft_data: One Stage 2 segment object (must be JSON-serialisable).
        token: Hugging Face token forwarded to call_hf_router().

    Returns:
        The parsed JSON reply; its structure is not validated here.

    Raises:
        json.JSONDecodeError if the reply is not valid JSON, plus anything
        call_hf_router() raises.
    """
    polish_rules = [
        "You are a style polisher for livestream chat replays (YouTube/Twitch).\n",
        "Your job is to take raw draft chat messages and perform a final flourish and alignment pass to make them sound authentic.\n\n",
        "CRITICAL INSTRUCTIONS:\n",
        "1. PRESERVE DIVERSITY: The draft already contains carefully balanced personas (jokers, experts, off-topic, skeptics). DO NOT homogenize them. If a message is off-topic, keep it off-topic. If it's a joke about the video, keep it a joke.\n",
        "2. PRESERVE USERNAMES: You MUST use the exact usernames provided in the draft. Do not invent new ones.\n",
        "3. PRESERVE STRUCTURAL FIELDS: Keep 'id', 'anchor_id', and 'reply_to' (if present) on every message exactly as given. Do not rename, remove, or alter these fields.\n",
        "4. ADD FLOURISH: Make them short, concise, and lively. Occasionally inject internet slang and standard emotes where appropriate, but don't overdo it.\n",
        "5. Avoid sounding like AI-generated summaries. Do not append emotes to every single message.\n",
        "6. SAFETY PASS: If any message crosses from edgy/sarcastic banter into harassment, slurs, hate speech, or denigration of real people or groups, rewrite it to keep the same persona and sentiment but remove the harmful element.\n\n",
        "Return ONLY the updated JSON with the exact same structure. Do not include markdown wraps.",
    ]
    conversation = [
        {"role": "system", "content": "".join(polish_rules)},
        {"role": "user", "content": f"Draft JSON:\n{json.dumps(draft_data)}"},
    ]
    raw_reply = call_hf_router(FLASH_MODEL, conversation, token)
    return json.loads(clean_json_text(raw_reply))
# --- Safety gates ---
def check_content_safety(doc_text: str, video_metadata: dict, token: str) -> tuple:
    """Pre-pipeline hard-stop gate (runs before Stage 2's expensive Pro call).

    Classifies whether the reference document — combined with whatever
    video title/channel context is available — is appropriate source
    material for a simulated livestream chat.

    Intentionally coarse: a single SAFETY_MODEL classification call over
    the first 4000 characters of the document, not a per-message filter
    (see final_safety_scan() for the post-generation pass). Exists to
    hard-stop on inputs whose "natural" simulated chat would likely be hate
    speech, harassment of real people/groups, glorification of or
    instructions for violence/self-harm, or sexual content involving
    minors. General controversial-but-legitimate material (politics,
    science controversies, history of atrocities discussed academically,
    etc.) is SAFE and should pass.

    Design note: fails OPEN (treats classifier errors as "safe") so a
    transient API hiccup doesn't block legitimate users. The decision (or
    failure) is always logged.

    Args:
        doc_text: Reference document text (may be None/empty).
        video_metadata: Optional {"title", "author_name"} dict, or None.
        token: Hugging Face token forwarded to call_hf_router().

    Returns:
        ``(is_safe: bool, reason: str)``. On any classifier or parsing
        error the result is ``(True, "")``.
    """
    # `or ""` guards against explicit None values in the metadata dict.
    title = ((video_metadata or {}).get("title", "") or "").strip()
    author = ((video_metadata or {}).get("author_name", "") or "").strip()

    system_prompt = (
        "You are a content-safety gate for an app that generates SIMULATED livestream chat "
        "reactions to educational/informational videos, based on a reference document.\n"
        "Given a video's title/channel (if known) and an excerpt of the reference document, "
        "decide whether this is appropriate source material for that simulation.\n\n"
        "Mark unsafe ONLY if the natural simulated chat reactions to this material would likely "
        "include hate speech, harassment of real people or groups, glorification of or "
        "instructions for violence or self-harm, or sexual content involving minors. General "
        "controversial-but-legitimate topics (politics, policy debates, science controversies, "
        "history of atrocities discussed academically, etc.) are SAFE.\n\n"
        "Respond with ONLY a JSON object: {\"safe\": true|false, \"reason\": \"<one short sentence>\"}. "
        "Do not include markdown wraps or other text."
    )
    excerpt = (doc_text or "")[:4000]
    user_prompt = (
        f"Video title: {title or '(unknown)'}\n"
        f"Channel: {author or '(unknown)'}\n\n"
        f"Reference document excerpt:\n{excerpt}"
    )

    try:
        content = call_hf_router(SAFETY_MODEL, [
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": user_prompt},
        ], token)
        result = json.loads(clean_json_text(content))
        if not isinstance(result, dict):
            raise ValueError(f"expected a JSON object, got {type(result).__name__}")

        raw_flag = result.get("safe", True)
        if isinstance(raw_flag, str):
            # Models sometimes emit "false" as a string; bool("false") would
            # be True and wave unsafe material through.
            is_safe = raw_flag.strip().lower() not in ("false", "no", "0", "unsafe")
        else:
            is_safe = bool(raw_flag)
        reason = str(result.get("reason", "") or "")

        if not is_safe:
            print(f"Content safety gate REJECTED input: {reason}")
        return is_safe, reason
    except Exception as e:
        print(f"Content safety gate failed to run ({e}); failing open (treating as safe).")
        return True, ""
def final_safety_scan(messages: list, token: str) -> list:
    """Aggregate post-generation moderation pass over all flattened,
    stylized messages.

    Stage 3 already asks its model to soften individual messages (see its
    SAFETY PASS instruction), but that runs per-segment and can miss
    patterns only visible across the whole chat (e.g. repeated harassment
    of the same target spread across segments). This is one additional
    SAFETY_MODEL call over the full message list as a second net.

    For each message, the model returns one of (matched case-insensitively):
    - "keep" -> message is left unchanged.
    - "replace" -> message['text'] is swapped for the supplied 'replacement'
      (same persona/sentiment, harmful element removed). Ignored unless the
      replacement is a non-blank string.
    - "drop" -> message is removed from the output entirely.
    Messages with no matching decision, or with an unrecognised action, are
    kept. Decisions that carry no id are ignored so they cannot be applied
    to messages that themselves lack an id.

    Fails OPEN (returns `messages` unchanged) on any classifier or parsing
    error, since this is a best-effort secondary net rather than the primary
    gate (see check_content_safety for the pre-pipeline hard stop).

    Args:
        messages: Flat list of message dicts with "id" and "text".
        token: Hugging Face token forwarded to call_hf_router().

    Returns:
        A new list of the surviving messages in input order. Replaced
        messages are shallow copies; input dicts are not mutated.
    """
    if not messages:
        return messages

    system_prompt = (
        "You are a final moderation pass for SIMULATED livestream chat messages (fictional "
        "audience reactions, not real users).\n"
        "For EVERY message below, decide one action:\n"
        "- \"keep\": fine as-is. Edgy/sarcastic humor, strong opinions, and in-group banter are fine.\n"
        "- \"replace\": the message crosses into harassment, slurs, hate speech, or denigration of "
        "real people or groups. Provide a 'replacement' string with the same persona/sentiment but "
        "with the harmful element removed.\n"
        "- \"drop\": the message is irredeemable and should be removed entirely.\n\n"
        "Return ONLY a JSON list, one entry per input message, in any order:\n"
        "[{\"id\": \"<id>\", \"action\": \"keep|replace|drop\", \"replacement\": \"<only if action=replace>\"}]\n"
        "Do not include markdown wraps or other text."
    )
    payload = [{"id": m.get("id"), "text": m.get("text", "")} for m in messages]
    user_prompt = f"Messages:\n{json.dumps(payload, ensure_ascii=False)}"

    try:
        content = call_hf_router(SAFETY_MODEL, [
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": user_prompt},
        ], token)
        decisions = json.loads(clean_json_text(content))
        decisions_by_id = {}
        for entry in decisions:
            # Skip malformed entries and entries with no id to match on.
            if isinstance(entry, dict) and entry.get("id") is not None:
                decisions_by_id[entry["id"]] = entry
    except Exception as e:
        print(f"Final safety scan failed to run ({e}); failing open (returning messages unchanged).")
        return messages

    result = []
    dropped, replaced = 0, 0
    for msg in messages:
        msg_id = msg.get("id")
        decision = decisions_by_id.get(msg_id) if msg_id is not None else None
        # Normalise so "DROP" or " replace " are honoured, not treated as keep.
        action = str((decision or {}).get("action", "keep")).strip().lower()
        if action == "drop":
            dropped += 1
            continue
        if action == "replace":
            replacement = decision.get("replacement")
            if isinstance(replacement, str) and replacement.strip():
                msg = dict(msg)
                msg["text"] = replacement
                replaced += 1
        result.append(msg)

    if dropped or replaced:
        print(f"Final safety scan: dropped {dropped}, replaced {replaced} of {len(messages)} messages.")
    return result
# --- Parallel Tasks ---
def _fetch_and_segment_transcript(video_id: str, transcript_text: str, token: str) -> tuple:
    """Parse the supplied transcript and segment it with Stage 1a.

    `video_id` is accepted for interface symmetry but is not used here; the
    transcript must be passed in as text.

    Returns:
        ``(segments, raw_transcript)`` so the caller can build the anchor
        index from the raw entries.

    Raises:
        ValueError: if `transcript_text` is empty or None.
    """
    if not transcript_text:
        raise ValueError("Transcript text is required but was not provided.")

    print("Parsing provided transcript text...")
    cues = parse_transcript_text(transcript_text)
    timestamped = format_transcript_lines(cues)

    print("Stage 1a: Segmenting transcript...")
    return stage_1a_segment_transcript(timestamped, token), cues
def _extract_document_text(doc_text: str, doc_path: str, use_ocr: bool = False) -> str:
if doc_path:
print(f"Extracting text from PDF: {doc_path}...")
return pymupdf4llm.to_markdown(doc_path, use_ocr=use_ocr)
if doc_text:
return doc_text
raise ValueError("Either doc_text or doc_path must be provided.")
# --- Full Pipeline Orchestration ---
def run_livestream_pipeline(video_id: str, doc_text: str = None, doc_path: str = None, transcript_text: str = None, token: str = None, use_ocr: bool = False) -> list:
    """Run the full pipeline and return a flat list of messages sorted by displayTime.

    Steps, in order: resolve the API token; validate inputs; run document
    extraction, transcript segmentation and video-metadata fetch in parallel;
    build the anchor index from the raw transcript; gate on content safety;
    generate Stage 2 drafts and validate them; stylize each segment in
    parallel (Stage 3); flatten the messages; run the final safety scan;
    compute display times and sort.

    Args:
        video_id: Video identifier, passed to the transcript task and to the
            metadata fetch.
        doc_text: Reference document as text. Used only when ``doc_path`` is
            not given.
        doc_path: Path to a PDF reference document. Takes precedence over
            ``doc_text``.
        transcript_text: Transcript text to parse and segment. Required.
        token: Hugging Face API token. When omitted, the ``HF_TOKEN``
            environment variable is used, then ``DEFAULT_HF_TOKEN``.
        use_ocr: Forwarded to the PDF extraction step.

    Returns:
        A list of message dicts (not segment dicts). Each message has: id,
        username, text, anchor_id, displayTime, and optionally reply_to.

    Raises:
        ValueError: if no token can be resolved, if neither document input is
            given, if the transcript is missing, if the content safety check
            rejects the input, or if the Stage 2 output fails validation.
    """
    if not token:
        token = os.environ.get("HF_TOKEN", DEFAULT_HF_TOKEN)
    if not token:
        raise ValueError(
            "Hugging Face API Token not found. Please set the 'HF_TOKEN' secret in your Space settings "
            "or provide it in the input box."
        )

    # Fail fast on missing inputs. The worker tasks raise these same errors,
    # but only after the thread pool has waited for every submitted task, so a
    # missing document would otherwise still pay for the segmentation call and
    # the metadata fetch. Document is checked first to keep the original
    # error precedence.
    if not doc_path and not doc_text:
        raise ValueError("Either doc_text or doc_path must be provided.")
    if not transcript_text:
        raise ValueError("Transcript text is required but was not provided.")

    print("Starting parallel execution of Document Extraction, Transcript Segmentation, and Metadata Fetch...")
    with ThreadPoolExecutor(max_workers=3) as executor:
        fut_doc = executor.submit(_extract_document_text, doc_text, doc_path, use_ocr)
        fut_seg = executor.submit(_fetch_and_segment_transcript, video_id, transcript_text, token)
        fut_meta = executor.submit(fetch_video_metadata, video_id)
        extracted_doc_text = fut_doc.result()
        segments, raw_transcript = fut_seg.result()
        video_metadata = fut_meta.result()

    print(f"Segmented into {len(segments)} blocks.")
    if video_metadata:
        print(f"Video metadata: title={video_metadata.get('title')!r}, channel={video_metadata.get('author_name')!r}")
    else:
        print("Video metadata unavailable (oEmbed fetch failed) — identity-aware intro messages disabled.")

    # Build anchor index deterministically from raw transcript (no LLM call).
    anchors = build_anchor_index(raw_transcript)
    anchor_map = map_anchors_to_segments(anchors, segments)
    anchors_by_id = {a["anchor_id"]: a for a in anchors}
    valid_anchor_ids = set(anchors_by_id.keys())
    intro_anchor_ids = select_intro_anchors(anchors)
    print(f"Built anchor index: {len(anchors)} anchors ({len(intro_anchor_ids)} in the intro window).")

    # Hard stop: gate on content safety before the expensive Stage 2/3 calls.
    is_safe, reason = check_content_safety(extracted_doc_text, video_metadata, token)
    if not is_safe:
        raise ValueError(
            f"Content safety check failed: {reason} "
            "This input was flagged as unsuitable for simulated chat generation. "
            "Please choose a different reference document or video."
        )

    # Stage 2: Single Pro model call for all drafting.
    print("Stage 2: Generating draft comments for all segments (Pro model)...")
    draft_segments = stage_2_generate_all_drafts(
        segments, extracted_doc_text, token,
        anchor_map=anchor_map, all_anchors=anchors,
        intro_anchor_ids=intro_anchor_ids, video_metadata=video_metadata
    )

    # Validate Stage 2 output before passing downstream.
    validation_errors = validate_stage2_output(draft_segments, valid_anchor_ids)
    if validation_errors:
        # Malformed anchor/reply data must not flow into Stage 3. Only the
        # first ten errors are included to keep the message readable.
        error_summary = "\n".join(validation_errors[:10])
        raise ValueError(f"Stage 2 output failed validation ({len(validation_errors)} error(s)):\n{error_summary}")

    # Stage 3: Parallel stylization. Results are collected in submission
    # order, so segment order is preserved regardless of completion order.
    print("Stage 3: Stylizing comments...")
    with ThreadPoolExecutor(max_workers=5) as executor:
        futures = [
            executor.submit(stage_3_stylize_segment, draft, token)
            for draft in draft_segments
        ]
        final_segments = [fut.result() for fut in futures]

    # Flatten all messages.
    all_messages = []
    for seg in final_segments:
        all_messages.extend(seg.get("messages", []))

    # Final aggregate moderation pass (second net, after Stage 3's per-segment pass).
    print("Running final safety scan over all messages...")
    all_messages = final_safety_scan(all_messages, token)

    # Compute displayTime, sort numerically.
    all_messages = compute_display_times(all_messages, anchors_by_id)
    all_messages.sort(key=lambda m: m.get("displayTime", 0))
    return all_messages
|