Spaces:
Running on Zero
Running on Zero
| #!/usr/bin/env python3 | |
| import argparse | |
| import json | |
| from datetime import datetime | |
| from pathlib import Path | |
| from typing import Any | |
| def _log(message: str) -> None: | |
| print(f"[cleanup] {message}", flush=True) | |
| def _load_json(path: Path) -> dict[str, Any]: | |
| with path.open("r", encoding="utf-8") as f: | |
| return json.load(f) | |
| def _save_json(path: Path, payload: Any) -> None: | |
| path.parent.mkdir(parents=True, exist_ok=True) | |
| with path.open("w", encoding="utf-8") as f: | |
| json.dump(payload, f, indent=2, ensure_ascii=False) | |
| def _extract_json_object(text: str) -> dict[str, Any]: | |
| text = text.strip() | |
| if not text: | |
| raise ValueError("Model returned empty text.") | |
| try: | |
| parsed = json.loads(text) | |
| if isinstance(parsed, dict): | |
| return parsed | |
| except Exception: | |
| pass | |
| start = text.find("{") | |
| while start >= 0: | |
| depth = 0 | |
| for idx in range(start, len(text)): | |
| ch = text[idx] | |
| if ch == "{": | |
| depth += 1 | |
| elif ch == "}": | |
| depth -= 1 | |
| if depth == 0: | |
| candidate = text[start : idx + 1] | |
| try: | |
| parsed = json.loads(candidate) | |
| if isinstance(parsed, dict): | |
| return parsed | |
| except Exception: | |
| break | |
| start = text.find("{", start + 1) | |
| raise ValueError("Could not parse a JSON object from model output.") | |
| def _response_to_dict(response: Any) -> dict[str, Any]: | |
| if hasattr(response, "model_dump") and callable(response.model_dump): | |
| return response.model_dump() | |
| if hasattr(response, "to_dict") and callable(response.to_dict): | |
| return response.to_dict() | |
| return {"raw_response": str(response)} | |
def _response_text(response: Any) -> str:
    """Extract the text content from an OpenAI Responses API result.

    Tries, in order: the SDK's aggregated ``output_text`` attribute, the
    top-level ``output_text``/``text`` keys of the serialized response, and
    finally the ``output`` item list (concatenating the ``text`` field of
    each content part) — the place message text actually lives when no
    aggregate field is present in the dump.

    Returns "" when no text can be found.
    """
    output_text = getattr(response, "output_text", None)
    if isinstance(output_text, str) and output_text.strip():
        return output_text
    data = _response_to_dict(response)
    if isinstance(data, dict):
        for key in ("output_text", "text"):
            val = data.get(key)
            if isinstance(val, str) and val.strip():
                return val
        # Fall back to walking the Responses API `output` item list:
        # output -> [{"content": [{"type": "output_text", "text": ...}]}].
        output = data.get("output")
        if isinstance(output, list):
            parts: list[str] = []
            for item in output:
                if not isinstance(item, dict):
                    continue
                content = item.get("content")
                if not isinstance(content, list):
                    continue
                for part in content:
                    if isinstance(part, dict):
                        text = part.get("text")
                        if isinstance(text, str) and text:
                            parts.append(text)
            if parts:
                return "".join(parts)
    return ""
| def _usage_from_response_dict(payload: dict[str, Any]) -> dict[str, int | None]: | |
| usage = payload.get("usage") | |
| if not isinstance(usage, dict): | |
| return { | |
| "input_tokens": None, | |
| "output_tokens": None, | |
| "total_tokens": None, | |
| "cached_input_tokens": None, | |
| "reasoning_tokens": None, | |
| } | |
| input_details = usage.get("input_tokens_details", {}) | |
| output_details = usage.get("output_tokens_details", {}) | |
| return { | |
| "input_tokens": usage.get("input_tokens"), | |
| "output_tokens": usage.get("output_tokens"), | |
| "total_tokens": usage.get("total_tokens"), | |
| "cached_input_tokens": input_details.get("cached_tokens") if isinstance(input_details, dict) else None, | |
| "reasoning_tokens": output_details.get("reasoning_tokens") if isinstance(output_details, dict) else None, | |
| } | |
| def _sum_usage( | |
| first: dict[str, int | None], | |
| second: dict[str, int | None], | |
| ) -> dict[str, int | None]: | |
| def _sum_key(key: str) -> int | None: | |
| a = first.get(key) | |
| b = second.get(key) | |
| if isinstance(a, int) and isinstance(b, int): | |
| return a + b | |
| if isinstance(a, int): | |
| return a | |
| if isinstance(b, int): | |
| return b | |
| return None | |
| total = _sum_key("total_tokens") | |
| input_tokens = _sum_key("input_tokens") | |
| output_tokens = _sum_key("output_tokens") | |
| if total is None and isinstance(input_tokens, int) and isinstance(output_tokens, int): | |
| total = input_tokens + output_tokens | |
| return { | |
| "input_tokens": input_tokens, | |
| "output_tokens": output_tokens, | |
| "total_tokens": total, | |
| "cached_input_tokens": _sum_key("cached_input_tokens"), | |
| "reasoning_tokens": _sum_key("reasoning_tokens"), | |
| } | |
| def _parse_executive_names( | |
| *, | |
| names_csv: str | None, | |
| ) -> list[str]: | |
| out: list[str] = [] | |
| if names_csv: | |
| for item in names_csv.split(","): | |
| name = item.strip().strip('"').strip("'") | |
| if name: | |
| out.append(name) | |
| # Preserve order while removing duplicates. | |
| seen = set() | |
| deduped: list[str] = [] | |
| for name in out: | |
| key = name.lower() | |
| if key in seen: | |
| continue | |
| seen.add(key) | |
| deduped.append(name) | |
| return deduped | |
| def _build_intro_payload(turns: list[dict[str, Any]], intro_turn_limit: int) -> list[dict[str, Any]]: | |
| sampled = turns[: max(1, intro_turn_limit)] | |
| payload: list[dict[str, Any]] = [] | |
| for idx, turn in enumerate(sampled): | |
| payload.append( | |
| { | |
| "turn_index": idx, | |
| "speaker": turn.get("speaker"), | |
| "start": turn.get("start"), | |
| "end": turn.get("end"), | |
| "text": turn.get("text"), | |
| } | |
| ) | |
| return payload | |
| def _extract_qna_announcements(turns: list[dict[str, Any]], max_items: int = 200) -> list[dict[str, Any]]: | |
| announcements: list[dict[str, Any]] = [] | |
| for idx, turn in enumerate(turns): | |
| text = str(turn.get("text", "")).strip() | |
| if not text: | |
| continue | |
| lowered = text.lower() | |
| if "line of" in lowered and ("please go ahead" in lowered or "question" in lowered): | |
| announcements.append( | |
| { | |
| "turn_index": idx, | |
| "speaker": turn.get("speaker"), | |
| "text": text, | |
| } | |
| ) | |
| if len(announcements) >= max_items: | |
| break | |
| return announcements | |
| def _extract_response_id(response: Any, response_dict: dict[str, Any]) -> str | None: | |
| rid = getattr(response, "id", None) | |
| if isinstance(rid, str) and rid: | |
| return rid | |
| candidate = response_dict.get("id") | |
| if isinstance(candidate, str) and candidate: | |
| return candidate | |
| return None | |
def run_cleanup_pipeline(
    *,
    input_file: Path,
    api_key: str,
    model: str,
    output_dir: Path,
    intro_turn_limit: int,
    executive_names_csv: str | None,
) -> dict[str, Any]:
    """Run the two-call OpenAI transcript-cleanup pipeline and save artifacts.

    Call 1 asks the model to infer a SPEAKER_XX -> name mapping from the
    transcript's intro turns and operator Q&A announcements; call 2 cleans
    the transcript text and applies the refined speaker labels (preferring
    `previous_response_id` context chaining when the client supports it).
    Every intermediate and final artifact is written into a fresh
    timestamped subdirectory of ``output_dir``.

    Args:
        input_file: JSON file that must contain a non-empty ``turns`` list.
        api_key: OpenAI API key used to construct the client.
        model: OpenAI model ID used for both calls.
        output_dir: Base directory; each run gets a timestamped subdirectory.
        intro_turn_limit: How many leading turns to sample as intro evidence.
        executive_names_csv: Optional comma-separated canonical executive names.

    Returns:
        The run-summary dict (also written to ``run_summary.json``).

    Raises:
        RuntimeError: If the ``openai`` package is not installed.
        ValueError: If ``turns`` is missing/empty, or a model response cannot
            be parsed as a JSON object.
    """
    # Lazy import so the module can be imported without the OpenAI SDK present.
    try:
        from openai import OpenAI
    except ImportError as exc:
        raise RuntimeError(
            "Missing dependency: openai. Install with `pip install openai`."
        ) from exc
    _log("Loading transcript JSON...")
    transcript_json = _load_json(input_file)
    turns = transcript_json.get("turns")
    if not isinstance(turns, list) or not turns:
        raise ValueError("Input JSON must contain a non-empty `turns` list.")
    _log("Parsing executive names input...")
    executive_names = _parse_executive_names(
        names_csv=executive_names_csv,
    )
    # Evidence payloads for call 1: the opening turns plus the operator lines
    # that announce questioners ("line of <name> from <firm>").
    intro_turns_payload = _build_intro_payload(turns, intro_turn_limit=intro_turn_limit)
    qna_announcements = _extract_qna_announcements(turns)
    # One timestamped directory per run keeps artifacts from separate runs apart.
    run_dir = output_dir / datetime.now().strftime("%Y%m%d_%H%M%S")
    run_dir.mkdir(parents=True, exist_ok=True)
    executive_names_out_path = run_dir / "executive_names.json"
    _save_json(executive_names_out_path, {"names": executive_names})
    _log(f"Run directory: {run_dir}")
    _log(f"Saved executive names file: {executive_names_out_path}")
    client = OpenAI(api_key=api_key)
    # ---- Call 1: infer the SPEAKER_XX -> name mapping ----
    speaker_map_system = (
        "You are a transcript entity-resolution assistant. "
        "Return strict JSON only, no markdown. "
        "Infer speaker identities from transcript context."
    )
    # The user message is a single JSON document: task, rules, the expected
    # output schema, and all the evidence the model needs.
    speaker_map_user = json.dumps(
        {
            "task": "Infer speaker mapping from transcript context (intro + Q&A announcements).",
            "rules": [
                "Use explicit or near-explicit intro context ('I now hand over to ...', self-intros, operator intros).",
                "Label any conference host/queue-management voice as exactly 'Operator' when they do call control.",
                "Do not map Operator to an executive name.",
                "Do not guess beyond evidence.",
                "Prefer names from `executive_names` when they match context.",
                "In Q&A, infer non-executive participant names from operator announcements such as 'line of <name> from <firm>', even if absent in executive list.",
                "Keep unknown speakers as null names if evidence is weak.",
            ],
            "output_schema": {
                "speaker_mapping": [
                    {
                        "speaker_label": "SPEAKER_XX",
                        "inferred_name": "string or null",
                        "confidence": "number 0..1",
                        "evidence_turn_indexes": ["int"],
                        "reason": "short string",
                    }
                ],
                "notes": ["string"],
            },
            "executive_names": executive_names,
            "intro_turns": intro_turns_payload,
            "qna_announcements": qna_announcements,
            "transcript_turns": turns,
        },
        ensure_ascii=False,
    )
    _log("OpenAI call 1/2: inferring speaker mapping...")
    speaker_map_response = client.responses.create(
        model=model,
        input=[
            {"role": "system", "content": speaker_map_system},
            {"role": "user", "content": speaker_map_user},
        ],
    )
    speaker_map_raw = _response_to_dict(speaker_map_response)
    # The response id enables `previous_response_id` chaining in call 2.
    first_response_id = _extract_response_id(speaker_map_response, speaker_map_raw)
    speaker_map_usage = _usage_from_response_dict(speaker_map_raw)
    speaker_map_text = _response_text(speaker_map_response)
    speaker_map_json = _extract_json_object(speaker_map_text)
    # Persist both the parsed mapping and the raw response for auditing.
    speaker_map_path = run_dir / "speaker_mapping.json"
    speaker_map_raw_path = run_dir / "speaker_mapping_raw_response.json"
    _save_json(speaker_map_path, speaker_map_json)
    _save_json(speaker_map_raw_path, speaker_map_raw)
    # ---- Call 2: clean the transcript using the inferred mapping ----
    cleanup_system = (
        "You are a transcript cleanup and diarization refinement assistant. "
        "Return strict JSON only, no markdown."
    )
    # Shared payload for both variants of call 2 (with and without the full
    # transcript attached).
    cleanup_payload_base = {
        "task": "Clean transcript and produce final speaker-attributed turns.",
        "rules": [
            "Correct likely misspellings and improve punctuation/casing.",
            "Remove false starts and repeated filler where safe, but keep meaning.",
            "Standardize executive names to the canonical forms in `executive_names` where applicable.",
            "Use `speaker_mapping` from call 1, but keep unknown labels if unsupported.",
            "Label the conference host/control speaker as exactly 'Operator' when they are handling queue/instructions.",
            "In Q&A, infer names not present in `executive_names` from context and operator announcements.",
            "If a very short mid-sentence speaker switch is likely diarization noise, merge/reassign using sentence continuity.",
            "Preserve turn order and timing progression.",
            "Output speaker labels as inferred names when confidence is sufficient; otherwise keep SPEAKER_XX.",
            "Do not invent facts not present in transcript context.",
        ],
        "output_schema": {
            "speaker_mapping_final": [
                {
                    "source_label": "SPEAKER_XX",
                    "final_label": "Name or SPEAKER_XX",
                    "confidence": "number 0..1",
                    "reason": "short string",
                }
            ],
            "turns": [
                {
                    "speaker": "Name or SPEAKER_XX",
                    "start": "float",
                    "end": "float",
                    "text": "cleaned text",
                }
            ],
            "summary": {
                "turn_count": "int",
                "speaker_count": "int",
                "notes": ["string"],
            },
        },
        "executive_names": executive_names,
        "speaker_mapping": speaker_map_json.get("speaker_mapping", []),
    }
    # Variant A: explicit transcript payload (fallback path).
    cleanup_payload_with_turns = dict(cleanup_payload_base)
    cleanup_payload_with_turns["transcript_turns"] = turns
    # Variant B: rely on call-1 context via previous_response_id (smaller request).
    cleanup_payload_context_only = dict(cleanup_payload_base)
    cleanup_payload_context_only["context_hint"] = (
        "Use the transcript context from the previous response. "
        "Do not request retransmission."
    )
    _log("OpenAI call 2/2: cleaning transcript and refining speaker labels...")
    cleanup_response = None
    used_context_chaining = False
    if first_response_id:
        _log("Using previous_response_id context chaining for call 2.")
        try:
            cleanup_response = client.responses.create(
                model=model,
                previous_response_id=first_response_id,
                input=[
                    {"role": "system", "content": cleanup_system},
                    {"role": "user", "content": json.dumps(cleanup_payload_context_only, ensure_ascii=False)},
                ],
            )
            used_context_chaining = True
        except TypeError:
            # Older SDKs reject the previous_response_id kwarg entirely.
            _log("Client does not support previous_response_id; falling back to explicit transcript payload.")
        except Exception as exc:
            # Best-effort: any API failure on the chained call falls back to
            # the self-contained request rather than aborting the run.
            _log(f"Context-chained call failed ({exc}); falling back to explicit transcript payload.")
    if cleanup_response is None:
        cleanup_response = client.responses.create(
            model=model,
            input=[
                {"role": "system", "content": cleanup_system},
                {"role": "user", "content": json.dumps(cleanup_payload_with_turns, ensure_ascii=False)},
            ],
        )
    cleanup_raw = _response_to_dict(cleanup_response)
    cleanup_usage = _usage_from_response_dict(cleanup_raw)
    cleanup_text = _response_text(cleanup_response)
    cleaned_json = _extract_json_object(cleanup_text)
    token_usage = {
        "speaker_mapping_call": speaker_map_usage,
        "cleanup_call": cleanup_usage,
        "combined": _sum_usage(speaker_map_usage, cleanup_usage),
    }
    # Annotate the model output with provenance before persisting it.
    cleaned_json["inputs"] = {
        "source_file": str(input_file),
        "speaker_mapping_file": str(speaker_map_path),
        "context_chaining_used_for_cleanup": used_context_chaining,
    }
    cleaned_json["openai_token_usage"] = token_usage
    cleaned_path = run_dir / "cleaned_transcript.json"
    cleaned_raw_path = run_dir / "cleanup_raw_response.json"
    cleaned_text_path = run_dir / "cleaned_transcript.txt"
    _save_json(cleaned_path, cleaned_json)
    _save_json(cleaned_raw_path, cleanup_raw)
    # Also render a plain-text "Speaker: text" transcript; skip malformed or
    # empty turns rather than failing the run.
    output_turns = cleaned_json.get("turns", [])
    lines: list[str] = []
    if isinstance(output_turns, list):
        for turn in output_turns:
            if not isinstance(turn, dict):
                continue
            speaker = str(turn.get("speaker", "SPEAKER_XX"))
            text = str(turn.get("text", "")).strip()
            if text:
                lines.append(f"{speaker}: {text}")
    cleaned_text_path.write_text("\n".join(lines), encoding="utf-8")
    _log("Saved cleaned transcript outputs.")
    run_summary = {
        "run_dir": str(run_dir),
        "input_file": str(input_file),
        "model": model,
        "speaker_mapping_file": str(speaker_map_path),
        "speaker_mapping_raw_file": str(speaker_map_raw_path),
        "cleaned_transcript_file": str(cleaned_path),
        "cleaned_transcript_raw_file": str(cleaned_raw_path),
        "cleaned_text_file": str(cleaned_text_path),
        "intro_turn_limit": intro_turn_limit,
        "executive_names_file": str(executive_names_out_path),
        "context_chaining_used_for_cleanup": used_context_chaining,
        "openai_token_usage": token_usage,
    }
    _save_json(run_dir / "run_summary.json", run_summary)
    _log("Completed.")
    return run_summary
def main() -> None:
    """CLI entry point: parse arguments, run the pipeline, print the summary."""
    arg_parser = argparse.ArgumentParser(
        description=(
            "Run two OpenAI calls over a merged transcript JSON: "
            "(1) speaker mapping inference, (2) cleaned/re-labeled transcript."
        )
    )
    arg_parser.add_argument("--input-file", required=True, help="Path to merged transcript JSON.")
    arg_parser.add_argument("--api-key", required=True, help="OpenAI API key.")
    arg_parser.add_argument("--model", default="gpt-5", help="OpenAI model ID (default: gpt-5).")
    arg_parser.add_argument(
        "--intro-turn-limit",
        type=int,
        default=80,
        help="Number of initial turns to use for speaker-introduction inference.",
    )
    arg_parser.add_argument(
        "--executive-names-csv",
        default=None,
        help='Comma-separated executive names, e.g. "Name A,Name B,Name C".',
    )
    arg_parser.add_argument(
        "--output-dir",
        default="benchmark_outputs/cleanup_openai",
        help="Directory to store outputs.",
    )
    cli_args = arg_parser.parse_args()
    run_summary = run_cleanup_pipeline(
        input_file=Path(cli_args.input_file),
        api_key=cli_args.api_key,
        model=cli_args.model,
        output_dir=Path(cli_args.output_dir),
        intro_turn_limit=cli_args.intro_turn_limit,
        executive_names_csv=cli_args.executive_names_csv,
    )
    print(json.dumps(run_summary, indent=2))
# Standard script guard: run the CLI only when executed directly.
if __name__ == "__main__":
    main()