""" Office transcript labeler — DeepSeek V4 Flash (deepseek-chat). Cost math (all 4055 candidate scenes): ~300 tokens input + ~180 tokens output per scene = ~$0.32 total at deepseek-v4-flash pricing ($0.14/M in, $0.28/M out) Concurrency: 50 simultaneous requests (limit is 2500 — zero 429 risk). Speed: ~50 scenes/wave × ~2s/wave = all 4055 done in ~3 minutes. Usage: uv run python scripts/build_transcripts.py Resume-safe: already-labeled scenes are skipped on re-run. """ import asyncio import json import os import random import re import subprocess import sys from collections import defaultdict from pathlib import Path from datasets import load_dataset from loguru import logger from openai import AsyncOpenAI # ── Config ───────────────────────────────────────────────────────────────────── DEEPSEEK_BASE = "https://api.deepseek.com" MODEL = "deepseek-chat" # = deepseek-v4-flash: $0.14/M in, $0.28/M out, 2500 concurrency CONCURRENCY = 50 # Safe: limit is 2500 TARGET_SCENES = 4055 # Label the whole candidate pool (~$0.32 total) OUTPUT_PATH = "data/transcripts/office_generated.txt" VALID_ARCHETYPES = { "status_assertion", "self_delusion", "power_inversion", "anxiety_escalation", "social_fail", "misplaced_conf", } VALID_TENSIONS = { "social_embarrass", "existential", "status_threat", "identity_expose", "logic_collapse", } VALID_DISTANCES = {"mild", "moderate", "sharp"} VALID_REGISTERS = {"resigned", "gleeful", "deadpan", "panicked", "oblivious", "indignant"} COMEDY_CHARACTERS = { "Michael", "Dwight", "Jim", "Pam", "Ryan", "Andy", "Kevin", "Angela", "Oscar", "Phyllis", "Stanley", "Creed", "Meredith", "Kelly", "Toby", "Jan", "Darryl", "Gabe", "Robert", } LABEL_PROMPT = """\ You are labeling a scene from The Office (US) for a comedy retrieval engine. SCENE (Season {season}, Episode {episode}): {scene_text} Identify the single FUNNIEST line and return ONLY this JSON (no markdown): {{ "character": "speaker of the funniest line", "setup": "one sentence: situation that makes the line funny (no spoilers)", "response": "exact verbatim funniest line from the scene", "archetype": "status_assertion | self_delusion | power_inversion | anxiety_escalation | social_fail | misplaced_conf", "tension_type": "social_embarrass | existential | status_threat | identity_expose | logic_collapse", "violation_distance": "mild | moderate | sharp", "why_it_works": "one sentence: what expectation is violated and why it lands", "emotional_register": "resigned | gleeful | deadpan | panicked | oblivious | indignant" }} If no genuinely funny line exists, return {{"skip": true}}""" # ── Dataset ───────────────────────────────────────────────────────────────────── def load_office_lines(): logger.info("Loading jxm/the_office_lines from HuggingFace...") ds = load_dataset("jxm/the_office_lines", split="train") logger.info(f"Loaded {len(ds)} lines") return ds def group_into_scenes(ds) -> list[dict]: scene_map = defaultdict(list) for row in ds: if row.get("deleted"): continue key = (row["season"], row["episode"], row["scene"]) scene_map[key].append(row) candidates = [] for (season, episode, scene_num), lines in scene_map.items(): speakers = {l["speaker"] for l in lines} if not (speakers & COMEDY_CHARACTERS): continue if not (3 <= len(lines) <= 15): continue if not any(len(l["line_text"].split()) > 7 for l in lines): continue candidates.append({ "season": season, "episode": episode, "scene": scene_num, "lines": lines, }) logger.info(f"Candidate scenes: {len(candidates)}") return candidates def format_scene(lines: list[dict]) -> str: return "\n".join( f"{l['speaker']}: {l['line_text'].strip()}" for l in lines if l["line_text"].strip() ) def load_already_done(path: str) -> set: done = set() p = Path(path) if not p.exists(): return done for line in p.read_text().splitlines(): m = re.match(r"# S(\d+)E(\d+)Scene(\d+)", line) if m: done.add((int(m.group(1)), int(m.group(2)), int(m.group(3)))) return done def to_pipe(scene: dict, label: dict) -> str: reg = label.get("emotional_register", "deadpan") if reg not in VALID_REGISTERS: reg = "deadpan" return ( f"The Office|{label['character']}|{label['setup']}|{label['response']}" f"|{label['archetype']}|{label['tension_type']}|{label['violation_distance']}" f"|{label['why_it_works']}|{reg}" ) # ── API ───────────────────────────────────────────────────────────────────────── def get_api_key() -> str: for var in ["DEEPSEEK_API_KEY", "DEEPSEEK_KEY"]: key = os.environ.get(var) if key: return key # Fallback: extract from ~/.zshrc for var in ["DEEPSEEK_API_KEY", "DEEPSEEK_KEY"]: try: r = subprocess.run( ["bash", "-c", f"source ~/.zshrc 2>/dev/null; echo ${var}"], capture_output=True, text=True, timeout=5, ) val = r.stdout.strip() if val and not val.startswith("$"): logger.info(f"Loaded {var} from ~/.zshrc") return val except Exception: pass logger.error("No DEEPSEEK_API_KEY or DEEPSEEK_KEY found in env or ~/.zshrc") sys.exit(1) def parse_label(raw: str) -> dict | None: cleaned = re.sub(r"```(?:json)?|```", "", raw).strip() m = re.search(r"\{.*\}", cleaned, re.DOTALL) if not m: return None try: d = json.loads(m.group()) if d.get("skip"): return None if d.get("archetype") not in VALID_ARCHETYPES: return None if d.get("tension_type") not in VALID_TENSIONS: return None if d.get("violation_distance") not in VALID_DISTANCES: return None for f in ["character", "setup", "response", "why_it_works"]: if not d.get(f): return None d[f] = d[f].replace("|", " ").strip() d["emotional_register"] = d.get("emotional_register", "deadpan").replace("|", " ").strip() return d except (json.JSONDecodeError, KeyError): return None async def label_one( client: AsyncOpenAI, scene: dict, sem: asyncio.Semaphore, n_done: list, # mutable counter [int] n_total: int, out_f, lock: asyncio.Lock, ) -> None: prompt = LABEL_PROMPT.format( season=scene["season"], episode=scene["episode"], scene_text=format_scene(scene["lines"]), ) try: async with sem: resp = await client.chat.completions.create( model=MODEL, messages=[{"role": "user", "content": prompt}], temperature=0.3, max_tokens=380, ) label = parse_label(resp.choices[0].message.content or "") except Exception as e: logger.warning(f"S{scene['season']}E{scene['episode']}: {str(e)[:60]}") return if label is None: return sid = (scene["season"], scene["episode"], scene["scene"]) line = f"# S{sid[0]}E{sid[1]}Scene{sid[2]}\n{to_pipe(scene, label)}\n" async with lock: out_f.write(line) out_f.flush() n_done[0] += 1 if n_done[0] % 50 == 0 or n_done[0] <= 5: logger.info( f"[{n_done[0]}/{n_total}] S{sid[0]}E{sid[1]} " f"→ {label['archetype']} | {label['character']}: {label['response'][:55]}..." ) async def main_async(): Path(OUTPUT_PATH).parent.mkdir(parents=True, exist_ok=True) ds = load_office_lines() candidates = group_into_scenes(ds) already_done = load_already_done(OUTPUT_PATH) logger.info(f"Already labeled: {len(already_done)}") todo = [ c for c in candidates if (c["season"], c["episode"], c["scene"]) not in already_done ] random.seed(42) random.shuffle(todo) todo = todo[:TARGET_SCENES] if not todo: logger.success("Nothing to do — all scenes already labeled.") return # Cost estimate n = len(todo) est_cost = n * (300 * 0.14 + 180 * 0.28) / 1_000_000 est_min = round(n / CONCURRENCY * 2 / 60, 1) # ~2s per wave logger.info( f"Labeling {n} scenes | est. cost ≈ ${est_cost:.3f} | " f"est. time ≈ {est_min} min @ concurrency={CONCURRENCY}" ) api_key = get_api_key() client = AsyncOpenAI(api_key=api_key, base_url=DEEPSEEK_BASE) # Quick connectivity check try: test = await client.chat.completions.create( model=MODEL, messages=[{"role": "user", "content": "Say OK"}], temperature=0.1, max_tokens=5, ) logger.success(f"API OK — {test.choices[0].message.content!r}") except Exception as e: logger.error(f"API failed: {e}") sys.exit(1) sem = asyncio.Semaphore(CONCURRENCY) lock = asyncio.Lock() n_done = [0] with open(OUTPUT_PATH, "a", encoding="utf-8") as out_f: tasks = [ label_one(client, scene, sem, n_done, n, out_f, lock) for scene in todo ] await asyncio.gather(*tasks) total = len(already_done) + n_done[0] logger.success(f"Done. Labeled={n_done[0]}, total in file={total}") if __name__ == "__main__": asyncio.run(main_async())