#!/usr/bin/env python3 import argparse import json from datetime import datetime from pathlib import Path from typing import Any def _log(message: str) -> None: print(f"[cleanup] {message}", flush=True) def _load_json(path: Path) -> dict[str, Any]: with path.open("r", encoding="utf-8") as f: return json.load(f) def _save_json(path: Path, payload: Any) -> None: path.parent.mkdir(parents=True, exist_ok=True) with path.open("w", encoding="utf-8") as f: json.dump(payload, f, indent=2, ensure_ascii=False) def _extract_json_object(text: str) -> dict[str, Any]: text = text.strip() if not text: raise ValueError("Model returned empty text.") try: parsed = json.loads(text) if isinstance(parsed, dict): return parsed except Exception: pass start = text.find("{") while start >= 0: depth = 0 for idx in range(start, len(text)): ch = text[idx] if ch == "{": depth += 1 elif ch == "}": depth -= 1 if depth == 0: candidate = text[start : idx + 1] try: parsed = json.loads(candidate) if isinstance(parsed, dict): return parsed except Exception: break start = text.find("{", start + 1) raise ValueError("Could not parse a JSON object from model output.") def _response_to_dict(response: Any) -> dict[str, Any]: if hasattr(response, "model_dump") and callable(response.model_dump): return response.model_dump() if hasattr(response, "to_dict") and callable(response.to_dict): return response.to_dict() return {"raw_response": str(response)} def _response_text(response: Any) -> str: output_text = getattr(response, "output_text", None) if isinstance(output_text, str) and output_text.strip(): return output_text data = _response_to_dict(response) if isinstance(data, dict): for key in ("output_text", "text"): val = data.get(key) if isinstance(val, str) and val.strip(): return val return "" def _usage_from_response_dict(payload: dict[str, Any]) -> dict[str, int | None]: usage = payload.get("usage") if not isinstance(usage, dict): return { "input_tokens": None, "output_tokens": None, "total_tokens": None, "cached_input_tokens": None, "reasoning_tokens": None, } input_details = usage.get("input_tokens_details", {}) output_details = usage.get("output_tokens_details", {}) return { "input_tokens": usage.get("input_tokens"), "output_tokens": usage.get("output_tokens"), "total_tokens": usage.get("total_tokens"), "cached_input_tokens": input_details.get("cached_tokens") if isinstance(input_details, dict) else None, "reasoning_tokens": output_details.get("reasoning_tokens") if isinstance(output_details, dict) else None, } def _sum_usage( first: dict[str, int | None], second: dict[str, int | None], ) -> dict[str, int | None]: def _sum_key(key: str) -> int | None: a = first.get(key) b = second.get(key) if isinstance(a, int) and isinstance(b, int): return a + b if isinstance(a, int): return a if isinstance(b, int): return b return None total = _sum_key("total_tokens") input_tokens = _sum_key("input_tokens") output_tokens = _sum_key("output_tokens") if total is None and isinstance(input_tokens, int) and isinstance(output_tokens, int): total = input_tokens + output_tokens return { "input_tokens": input_tokens, "output_tokens": output_tokens, "total_tokens": total, "cached_input_tokens": _sum_key("cached_input_tokens"), "reasoning_tokens": _sum_key("reasoning_tokens"), } def _parse_executive_names( *, names_csv: str | None, ) -> list[str]: out: list[str] = [] if names_csv: for item in names_csv.split(","): name = item.strip().strip('"').strip("'") if name: out.append(name) # Preserve order while removing duplicates. seen = set() deduped: list[str] = [] for name in out: key = name.lower() if key in seen: continue seen.add(key) deduped.append(name) return deduped def _build_intro_payload(turns: list[dict[str, Any]], intro_turn_limit: int) -> list[dict[str, Any]]: sampled = turns[: max(1, intro_turn_limit)] payload: list[dict[str, Any]] = [] for idx, turn in enumerate(sampled): payload.append( { "turn_index": idx, "speaker": turn.get("speaker"), "start": turn.get("start"), "end": turn.get("end"), "text": turn.get("text"), } ) return payload def _extract_qna_announcements(turns: list[dict[str, Any]], max_items: int = 200) -> list[dict[str, Any]]: announcements: list[dict[str, Any]] = [] for idx, turn in enumerate(turns): text = str(turn.get("text", "")).strip() if not text: continue lowered = text.lower() if "line of" in lowered and ("please go ahead" in lowered or "question" in lowered): announcements.append( { "turn_index": idx, "speaker": turn.get("speaker"), "text": text, } ) if len(announcements) >= max_items: break return announcements def _extract_response_id(response: Any, response_dict: dict[str, Any]) -> str | None: rid = getattr(response, "id", None) if isinstance(rid, str) and rid: return rid candidate = response_dict.get("id") if isinstance(candidate, str) and candidate: return candidate return None def run_cleanup_pipeline( *, input_file: Path, api_key: str, model: str, output_dir: Path, intro_turn_limit: int, executive_names_csv: str | None, ) -> dict[str, Any]: try: from openai import OpenAI except ImportError as exc: raise RuntimeError( "Missing dependency: openai. Install with `pip install openai`." ) from exc _log("Loading transcript JSON...") transcript_json = _load_json(input_file) turns = transcript_json.get("turns") if not isinstance(turns, list) or not turns: raise ValueError("Input JSON must contain a non-empty `turns` list.") _log("Parsing executive names input...") executive_names = _parse_executive_names( names_csv=executive_names_csv, ) intro_turns_payload = _build_intro_payload(turns, intro_turn_limit=intro_turn_limit) qna_announcements = _extract_qna_announcements(turns) run_dir = output_dir / datetime.now().strftime("%Y%m%d_%H%M%S") run_dir.mkdir(parents=True, exist_ok=True) executive_names_out_path = run_dir / "executive_names.json" _save_json(executive_names_out_path, {"names": executive_names}) _log(f"Run directory: {run_dir}") _log(f"Saved executive names file: {executive_names_out_path}") client = OpenAI(api_key=api_key) speaker_map_system = ( "You are a transcript entity-resolution assistant. " "Return strict JSON only, no markdown. " "Infer speaker identities from transcript context." ) speaker_map_user = json.dumps( { "task": "Infer speaker mapping from transcript context (intro + Q&A announcements).", "rules": [ "Use explicit or near-explicit intro context ('I now hand over to ...', self-intros, operator intros).", "Label any conference host/queue-management voice as exactly 'Operator' when they do call control.", "Do not map Operator to an executive name.", "Do not guess beyond evidence.", "Prefer names from `executive_names` when they match context.", "In Q&A, infer non-executive participant names from operator announcements such as 'line of from ', even if absent in executive list.", "Keep unknown speakers as null names if evidence is weak.", ], "output_schema": { "speaker_mapping": [ { "speaker_label": "SPEAKER_XX", "inferred_name": "string or null", "confidence": "number 0..1", "evidence_turn_indexes": ["int"], "reason": "short string", } ], "notes": ["string"], }, "executive_names": executive_names, "intro_turns": intro_turns_payload, "qna_announcements": qna_announcements, "transcript_turns": turns, }, ensure_ascii=False, ) _log("OpenAI call 1/2: inferring speaker mapping...") speaker_map_response = client.responses.create( model=model, input=[ {"role": "system", "content": speaker_map_system}, {"role": "user", "content": speaker_map_user}, ], ) speaker_map_raw = _response_to_dict(speaker_map_response) first_response_id = _extract_response_id(speaker_map_response, speaker_map_raw) speaker_map_usage = _usage_from_response_dict(speaker_map_raw) speaker_map_text = _response_text(speaker_map_response) speaker_map_json = _extract_json_object(speaker_map_text) speaker_map_path = run_dir / "speaker_mapping.json" speaker_map_raw_path = run_dir / "speaker_mapping_raw_response.json" _save_json(speaker_map_path, speaker_map_json) _save_json(speaker_map_raw_path, speaker_map_raw) cleanup_system = ( "You are a transcript cleanup and diarization refinement assistant. " "Return strict JSON only, no markdown." ) cleanup_payload_base = { "task": "Clean transcript and produce final speaker-attributed turns.", "rules": [ "Correct likely misspellings and improve punctuation/casing.", "Remove false starts and repeated filler where safe, but keep meaning.", "Standardize executive names to the canonical forms in `executive_names` where applicable.", "Use `speaker_mapping` from call 1, but keep unknown labels if unsupported.", "Label the conference host/control speaker as exactly 'Operator' when they are handling queue/instructions.", "In Q&A, infer names not present in `executive_names` from context and operator announcements.", "If a very short mid-sentence speaker switch is likely diarization noise, merge/reassign using sentence continuity.", "Preserve turn order and timing progression.", "Output speaker labels as inferred names when confidence is sufficient; otherwise keep SPEAKER_XX.", "Do not invent facts not present in transcript context.", ], "output_schema": { "speaker_mapping_final": [ { "source_label": "SPEAKER_XX", "final_label": "Name or SPEAKER_XX", "confidence": "number 0..1", "reason": "short string", } ], "turns": [ { "speaker": "Name or SPEAKER_XX", "start": "float", "end": "float", "text": "cleaned text", } ], "summary": { "turn_count": "int", "speaker_count": "int", "notes": ["string"], }, }, "executive_names": executive_names, "speaker_mapping": speaker_map_json.get("speaker_mapping", []), } cleanup_payload_with_turns = dict(cleanup_payload_base) cleanup_payload_with_turns["transcript_turns"] = turns cleanup_payload_context_only = dict(cleanup_payload_base) cleanup_payload_context_only["context_hint"] = ( "Use the transcript context from the previous response. " "Do not request retransmission." ) _log("OpenAI call 2/2: cleaning transcript and refining speaker labels...") cleanup_response = None used_context_chaining = False if first_response_id: _log("Using previous_response_id context chaining for call 2.") try: cleanup_response = client.responses.create( model=model, previous_response_id=first_response_id, input=[ {"role": "system", "content": cleanup_system}, {"role": "user", "content": json.dumps(cleanup_payload_context_only, ensure_ascii=False)}, ], ) used_context_chaining = True except TypeError: _log("Client does not support previous_response_id; falling back to explicit transcript payload.") except Exception as exc: _log(f"Context-chained call failed ({exc}); falling back to explicit transcript payload.") if cleanup_response is None: cleanup_response = client.responses.create( model=model, input=[ {"role": "system", "content": cleanup_system}, {"role": "user", "content": json.dumps(cleanup_payload_with_turns, ensure_ascii=False)}, ], ) cleanup_raw = _response_to_dict(cleanup_response) cleanup_usage = _usage_from_response_dict(cleanup_raw) cleanup_text = _response_text(cleanup_response) cleaned_json = _extract_json_object(cleanup_text) token_usage = { "speaker_mapping_call": speaker_map_usage, "cleanup_call": cleanup_usage, "combined": _sum_usage(speaker_map_usage, cleanup_usage), } cleaned_json["inputs"] = { "source_file": str(input_file), "speaker_mapping_file": str(speaker_map_path), "context_chaining_used_for_cleanup": used_context_chaining, } cleaned_json["openai_token_usage"] = token_usage cleaned_path = run_dir / "cleaned_transcript.json" cleaned_raw_path = run_dir / "cleanup_raw_response.json" cleaned_text_path = run_dir / "cleaned_transcript.txt" _save_json(cleaned_path, cleaned_json) _save_json(cleaned_raw_path, cleanup_raw) output_turns = cleaned_json.get("turns", []) lines: list[str] = [] if isinstance(output_turns, list): for turn in output_turns: if not isinstance(turn, dict): continue speaker = str(turn.get("speaker", "SPEAKER_XX")) text = str(turn.get("text", "")).strip() if text: lines.append(f"{speaker}: {text}") cleaned_text_path.write_text("\n".join(lines), encoding="utf-8") _log("Saved cleaned transcript outputs.") run_summary = { "run_dir": str(run_dir), "input_file": str(input_file), "model": model, "speaker_mapping_file": str(speaker_map_path), "speaker_mapping_raw_file": str(speaker_map_raw_path), "cleaned_transcript_file": str(cleaned_path), "cleaned_transcript_raw_file": str(cleaned_raw_path), "cleaned_text_file": str(cleaned_text_path), "intro_turn_limit": intro_turn_limit, "executive_names_file": str(executive_names_out_path), "context_chaining_used_for_cleanup": used_context_chaining, "openai_token_usage": token_usage, } _save_json(run_dir / "run_summary.json", run_summary) _log("Completed.") return run_summary def main() -> None: parser = argparse.ArgumentParser( description=( "Run two OpenAI calls over a merged transcript JSON: " "(1) speaker mapping inference, (2) cleaned/re-labeled transcript." ) ) parser.add_argument("--input-file", required=True, help="Path to merged transcript JSON.") parser.add_argument("--api-key", required=True, help="OpenAI API key.") parser.add_argument("--model", default="gpt-5", help="OpenAI model ID (default: gpt-5).") parser.add_argument( "--intro-turn-limit", type=int, default=80, help="Number of initial turns to use for speaker-introduction inference.", ) parser.add_argument( "--executive-names-csv", default=None, help='Comma-separated executive names, e.g. "Name A,Name B,Name C".', ) parser.add_argument( "--output-dir", default="benchmark_outputs/cleanup_openai", help="Directory to store outputs.", ) args = parser.parse_args() summary = run_cleanup_pipeline( input_file=Path(args.input_file), api_key=args.api_key, model=args.model, output_dir=Path(args.output_dir), intro_turn_limit=args.intro_turn_limit, executive_names_csv=args.executive_names_csv, ) print(json.dumps(summary, indent=2)) if __name__ == "__main__": main()