WitGym / scripts /build_transcripts.py
Akshay Babbar
interim commit-v2
f1cf4a9
Raw
History Blame Contribute Delete
10 kB
"""
Office transcript labeler — DeepSeek V4 Flash (deepseek-chat).
Cost math (all 4055 candidate scenes):
~300 tokens input + ~180 tokens output per scene
= ~$0.32 total at deepseek-v4-flash pricing ($0.14/M in, $0.28/M out)
Concurrency: 50 simultaneous requests (limit is 2500 — zero 429 risk).
Speed: ~50 scenes/wave × ~2s/wave = all 4055 done in ~3 minutes.
Usage:
uv run python scripts/build_transcripts.py
Resume-safe: already-labeled scenes are skipped on re-run.
"""
import asyncio
import json
import os
import random
import re
import subprocess
import sys
from collections import defaultdict
from pathlib import Path
from datasets import load_dataset
from loguru import logger
from openai import AsyncOpenAI
# ── Config ─────────────────────────────────────────────────────────────────────
# Base URL passed to the OpenAI-compatible async client.
DEEPSEEK_BASE = "https://api.deepseek.com"
MODEL = "deepseek-chat" # = deepseek-v4-flash: $0.14/M in, $0.28/M out, 2500 concurrency
# Size of the semaphore that bounds in-flight API requests.
CONCURRENCY = 50 # Safe: limit is 2500
# Upper bound on scenes labeled per run (applied after shuffling the todo list).
TARGET_SCENES = 4055 # Label the whole candidate pool (~$0.37 at ~300 in / ~180 out tokens per scene)
# Append-only output file; also read back on start-up to find finished scenes.
OUTPUT_PATH = "data/transcripts/office_generated.txt"
# Closed vocabularies: a model label whose archetype / tension_type /
# violation_distance is outside these sets is rejected by parse_label.
VALID_ARCHETYPES = {
    "status_assertion", "self_delusion", "power_inversion",
    "anxiety_escalation", "social_fail", "misplaced_conf",
}
VALID_TENSIONS = {
    "social_embarrass", "existential", "status_threat",
    "identity_expose", "logic_collapse",
}
VALID_DISTANCES = {"mild", "moderate", "sharp"}
# An emotional_register outside this set is replaced by "deadpan" in to_pipe
# rather than causing the label to be rejected.
VALID_REGISTERS = {"resigned", "gleeful", "deadpan", "panicked", "oblivious", "indignant"}
# A scene is only a candidate if at least one of its speakers is in this set.
COMEDY_CHARACTERS = {
    "Michael", "Dwight", "Jim", "Pam", "Ryan", "Andy", "Kevin",
    "Angela", "Oscar", "Phyllis", "Stanley", "Creed", "Meredith",
    "Kelly", "Toby", "Jan", "Darryl", "Gabe", "Robert",
}
# Prompt template filled in with str.format(season=..., episode=..., scene_text=...).
# Literal JSON braces are doubled ({{ }}) so that str.format leaves them intact.
# The enumerated values listed here mirror the VALID_* sets used for validation.
LABEL_PROMPT = """\
You are labeling a scene from The Office (US) for a comedy retrieval engine.
SCENE (Season {season}, Episode {episode}):
{scene_text}
Identify the single FUNNIEST line and return ONLY this JSON (no markdown):
{{
"character": "speaker of the funniest line",
"setup": "one sentence: situation that makes the line funny (no spoilers)",
"response": "exact verbatim funniest line from the scene",
"archetype": "status_assertion | self_delusion | power_inversion | anxiety_escalation | social_fail | misplaced_conf",
"tension_type": "social_embarrass | existential | status_threat | identity_expose | logic_collapse",
"violation_distance": "mild | moderate | sharp",
"why_it_works": "one sentence: what expectation is violated and why it lands",
"emotional_register": "resigned | gleeful | deadpan | panicked | oblivious | indignant"
}}
If no genuinely funny line exists, return {{"skip": true}}"""
# ── Dataset ─────────────────────────────────────────────────────────────────────
def load_office_lines():
    """Fetch the ``train`` split of ``jxm/the_office_lines`` from HuggingFace.

    Returns:
        The dataset object produced by ``load_dataset``. Its length is logged
        as the number of lines loaded.
    """
    logger.info("Loading jxm/the_office_lines from HuggingFace...")
    dataset = load_dataset("jxm/the_office_lines", split="train")
    line_count = len(dataset)
    logger.info(f"Loaded {line_count} lines")
    return dataset
def group_into_scenes(ds) -> list[dict]:
    """Bucket dataset rows into scenes and keep only labeling candidates.

    Rows with a truthy ``deleted`` value are dropped. The remaining rows are
    grouped by ``(season, episode, scene)`` in the order they are encountered.
    A scene is kept when all of the following hold:

    * at least one speaker is in ``COMEDY_CHARACTERS``;
    * it has between 3 and 15 lines (inclusive);
    * at least one line has more than 7 whitespace-separated words.

    Args:
        ds: Iterable of row mappings with ``season``, ``episode``, ``scene``,
            ``speaker`` and ``line_text`` keys (``deleted`` is optional).

    Returns:
        One dict per surviving scene with keys ``season``, ``episode``,
        ``scene`` and ``lines`` (the list of rows in that scene).
    """
    rows_by_scene = defaultdict(list)
    for row in ds:
        if not row.get("deleted"):
            rows_by_scene[(row["season"], row["episode"], row["scene"])].append(row)

    def _qualifies(scene_rows) -> bool:
        # Same three filters as documented above, evaluated in the same order.
        cast = {entry["speaker"] for entry in scene_rows}
        if COMEDY_CHARACTERS.isdisjoint(cast):
            return False
        if len(scene_rows) < 3 or len(scene_rows) > 15:
            return False
        return any(len(entry["line_text"].split()) > 7 for entry in scene_rows)

    candidates = [
        {"season": key[0], "episode": key[1], "scene": key[2], "lines": scene_rows}
        for key, scene_rows in rows_by_scene.items()
        if _qualifies(scene_rows)
    ]
    logger.info(f"Candidate scenes: {len(candidates)}")
    return candidates
def format_scene(lines: list[dict]) -> str:
    """Render a scene as ``Speaker: text`` lines joined by newlines.

    Each line's text is stripped of surrounding whitespace; lines whose text
    is empty after stripping are omitted.

    Args:
        lines: Row mappings with ``speaker`` and ``line_text`` keys.

    Returns:
        The rendered scene, or an empty string when nothing remains.
    """
    rendered = []
    for entry in lines:
        text = entry["line_text"].strip()
        if not text:
            continue
        rendered.append(f"{entry['speaker']}: {text}")
    return "\n".join(rendered)
def load_already_done(path: str) -> set:
    """Collect the scene ids already present in the output file.

    The output file marks every labeled scene with a header line of the form
    ``# S<season>E<episode>Scene<scene>``; every other line is ignored.

    Args:
        path: Location of the output file. It may not exist yet.

    Returns:
        A set of ``(season, episode, scene)`` integer tuples; empty when the
        file does not exist or contains no header lines.
    """
    done = set()
    p = Path(path)
    if not p.exists():
        return done
    header = re.compile(r"# S(\d+)E(\d+)Scene(\d+)")
    # The file is written as UTF-8, so read it as UTF-8 rather than with the
    # platform default. errors="replace" keeps resume working even if a
    # previous run was killed mid-write and left a truncated final character;
    # header lines are plain ASCII and are unaffected by the replacement.
    with p.open(encoding="utf-8", errors="replace") as fh:
        for line in fh:
            m = header.match(line)
            if m:
                done.add((int(m.group(1)), int(m.group(2)), int(m.group(3))))
    return done
def to_pipe(scene: dict, label: dict) -> str:
    """Serialize a label as one pipe-delimited record.

    Field order: show name (always ``The Office``), character, setup,
    response, archetype, tension_type, violation_distance, why_it_works,
    emotional register. A register that is missing or not in
    ``VALID_REGISTERS`` is written as ``deadpan``.

    Args:
        scene: The scene the label belongs to (not used in the record).
        label: Label mapping; all fields except ``emotional_register`` are
            required.

    Returns:
        The record as a single string without a trailing newline.

    Raises:
        KeyError: If a required label field is missing.
    """
    register = label.get("emotional_register", "deadpan")
    register = register if register in VALID_REGISTERS else "deadpan"
    columns = [
        "The Office",
        label["character"],
        label["setup"],
        label["response"],
        label["archetype"],
        label["tension_type"],
        label["violation_distance"],
        label["why_it_works"],
        register,
    ]
    return "|".join(f"{column}" for column in columns)
# ── API ─────────────────────────────────────────────────────────────────────────
def get_api_key() -> str:
    """Return the DeepSeek API key, exiting the process if none is found.

    Lookup order:

    1. Environment variables ``DEEPSEEK_API_KEY`` then ``DEEPSEEK_KEY``
       (empty values are treated as unset).
    2. The same two names after sourcing ``~/.zshrc`` in a ``bash`` subprocess.

    Returns:
        The first non-empty key found.

    Raises:
        SystemExit: With status 1 when no key is found anywhere.
    """
    var_names = ("DEEPSEEK_API_KEY", "DEEPSEEK_KEY")
    for var in var_names:
        key = os.environ.get(var)
        if key:
            return key
    # Fallback: extract from ~/.zshrc
    for var in var_names:
        try:
            r = subprocess.run(
                # Silence stdout as well as stderr while sourcing: anything
                # the rc file prints would otherwise be captured together
                # with the key. The variable is quoted so the value is echoed
                # without word-splitting or globbing.
                ["bash", "-c", f'source ~/.zshrc >/dev/null 2>&1; echo "${var}"'],
                capture_output=True, text=True, timeout=5,
            )
        except (OSError, subprocess.SubprocessError) as exc:
            # Best-effort fallback: bash missing or the rc file hanging is not
            # fatal here, but leave a trace for debugging.
            logger.debug(f"Could not read {var} from ~/.zshrc: {exc}")
            continue
        val = r.stdout.strip()
        if val and not val.startswith("$"):
            logger.info(f"Loaded {var} from ~/.zshrc")
            return val
    logger.error("No DEEPSEEK_API_KEY or DEEPSEEK_KEY found in env or ~/.zshrc")
    sys.exit(1)
def parse_label(raw: str) -> dict | None:
    """Extract and validate the JSON label from a raw model reply.

    Markdown code fences are removed, the outermost ``{...}`` span is parsed
    as JSON, and the result is checked against the allowed vocabularies.
    Free-text fields are sanitised so they are safe to store in the
    line-oriented, pipe-delimited output file: ``|`` and line breaks are each
    replaced by a space and surrounding whitespace is stripped.

    Args:
        raw: The model's reply text.

    Returns:
        The validated label dict, or ``None`` when the reply contains no
        parsable JSON object, the model asked to skip the scene, an
        enumerated field is not a string in its allowed set, or a required
        text field is missing, not a string, or empty after sanitising.
    """
    cleaned = re.sub(r"```(?:json)?", "", raw).strip()
    m = re.search(r"\{.*\}", cleaned, re.DOTALL)
    if not m:
        return None
    try:
        d = json.loads(m.group())
    except json.JSONDecodeError:
        return None
    if not isinstance(d, dict) or d.get("skip"):
        return None

    # Enumerated fields must be strings from the closed vocabularies. The
    # isinstance check also keeps unhashable values (lists, dicts) from
    # raising TypeError in the set membership test.
    enumerated = (
        ("archetype", VALID_ARCHETYPES),
        ("tension_type", VALID_TENSIONS),
        ("violation_distance", VALID_DISTANCES),
    )
    for field, allowed in enumerated:
        value = d.get(field)
        if not isinstance(value, str) or value not in allowed:
            return None

    def _sanitise(text: str) -> str:
        # "|" is the column separator and a line break is the record
        # separator of the output file, so neither may survive in a field.
        return re.sub(r"[|\r\n]", " ", text).strip()

    for f in ["character", "setup", "response", "why_it_works"]:
        value = d.get(f)
        if not isinstance(value, str):
            return None
        value = _sanitise(value)
        if not value:
            return None
        d[f] = value

    register = d.get("emotional_register", "deadpan")
    # A null or non-string register falls back to the default instead of
    # raising; membership in the allowed set is enforced when the record is
    # written.
    d["emotional_register"] = _sanitise(register) if isinstance(register, str) else "deadpan"
    return d
async def label_one(
    client: AsyncOpenAI,
    scene: dict,
    sem: asyncio.Semaphore,
    n_done: list,  # mutable counter [int]
    n_total: int,
    out_f,
    lock: asyncio.Lock,
) -> None:
    """Label one scene through the chat API and append the result to ``out_f``.

    The request is made while holding ``sem``. Any exception raised by the
    request or by parsing is logged as a warning and the scene is dropped;
    a reply that does not yield a valid label is dropped silently. On
    success a ``# S..E..Scene..`` header line plus the pipe record are
    written and flushed under ``lock`` and ``n_done[0]`` is incremented.
    Progress is logged for the first five successes and every 50th after.

    Args:
        client: Async chat-completions client.
        scene: Dict with ``season``, ``episode``, ``scene`` and ``lines``.
        sem: Semaphore bounding concurrent requests.
        n_done: One-element list used as a shared success counter.
        n_total: Total number of scenes in this run (for progress logs).
        out_f: Open text file to append to.
        lock: Lock guarding writes to ``out_f`` and the counter.
    """
    request_text = LABEL_PROMPT.format(
        season=scene["season"],
        episode=scene["episode"],
        scene_text=format_scene(scene["lines"]),
    )
    chat_messages = [{"role": "user", "content": request_text}]

    try:
        async with sem:
            completion = await client.chat.completions.create(
                model=MODEL,
                messages=chat_messages,
                temperature=0.3,
                max_tokens=380,
            )
            parsed = parse_label(completion.choices[0].message.content or "")
    except Exception as err:
        logger.warning(f"S{scene['season']}E{scene['episode']}: {str(err)[:60]}")
        return

    if parsed is None:
        return

    season, episode, scene_no = scene["season"], scene["episode"], scene["scene"]
    record = f"# S{season}E{episode}Scene{scene_no}\n{to_pipe(scene, parsed)}\n"

    async with lock:
        out_f.write(record)
        out_f.flush()
        n_done[0] += 1
        completed = n_done[0]
        if completed <= 5 or completed % 50 == 0:
            logger.info(
                f"[{completed}/{n_total}] S{season}E{episode} "
                f"→ {parsed['archetype']} | {parsed['character']}: {parsed['response'][:55]}..."
            )
async def main_async():
    """Run the full labeling pipeline once.

    Steps: ensure the output directory exists, load the dataset, build the
    candidate scenes, drop scenes already present in the output file, shuffle
    the remainder with a fixed seed and cap it at ``TARGET_SCENES``, log a
    cost/time estimate, verify API connectivity with a tiny request, then
    label all scenes concurrently while appending to ``OUTPUT_PATH``.

    Returns early when there is nothing left to label.

    Raises:
        SystemExit: With status 1 when no API key is found or the
            connectivity check fails.
    """
    Path(OUTPUT_PATH).parent.mkdir(parents=True, exist_ok=True)
    ds = load_office_lines()
    candidates = group_into_scenes(ds)
    already_done = load_already_done(OUTPUT_PATH)
    logger.info(f"Already labeled: {len(already_done)}")
    todo = [
        c for c in candidates
        if (c["season"], c["episode"], c["scene"]) not in already_done
    ]
    # Fixed seed: the processing order is reproducible across runs.
    random.seed(42)
    random.shuffle(todo)
    todo = todo[:TARGET_SCENES]
    if not todo:
        logger.success("Nothing to do — all scenes already labeled.")
        return

    # Cost estimate: ~300 input / ~180 output tokens per scene, priced per
    # million tokens.
    n = len(todo)
    est_cost = n * (300 * 0.14 + 180 * 0.28) / 1_000_000
    est_min = round(n / CONCURRENCY * 2 / 60, 1)  # ~2s per wave
    logger.info(
        f"Labeling {n} scenes | est. cost ≈ ${est_cost:.3f} | "
        f"est. time ≈ {est_min} min @ concurrency={CONCURRENCY}"
    )

    api_key = get_api_key()
    client = AsyncOpenAI(api_key=api_key, base_url=DEEPSEEK_BASE)
    n_done = [0]
    try:
        # Quick connectivity check
        try:
            test = await client.chat.completions.create(
                model=MODEL,
                messages=[{"role": "user", "content": "Say OK"}],
                temperature=0.1, max_tokens=5,
            )
            logger.success(f"API OK — {test.choices[0].message.content!r}")
        except Exception as e:
            logger.error(f"API failed: {e}")
            sys.exit(1)

        sem = asyncio.Semaphore(CONCURRENCY)
        lock = asyncio.Lock()
        with open(OUTPUT_PATH, "a", encoding="utf-8") as out_f:
            tasks = [
                label_one(client, scene, sem, n_done, n, out_f, lock)
                for scene in todo
            ]
            await asyncio.gather(*tasks)
    finally:
        # Release the client's underlying HTTP connection pool on every exit
        # path (success, failed connectivity check, or an unexpected error).
        await client.close()

    total = len(already_done) + n_done[0]
    logger.success(f"Done. Labeled={n_done[0]}, total in file={total}")
# Script entry point: run the async pipeline on a fresh event loop.
if __name__ == "__main__":
    asyncio.run(main_async())