transcribe-diarize / cleanup_transcript_openai.py
Ratnesh-dev's picture
Add Infer Speaker And Transcript Cleanup using OpenAI GPT 5
cf57473
#!/usr/bin/env python3
import argparse
import json
from datetime import datetime
from pathlib import Path
from typing import Any
def _log(message: str) -> None:
    """Print a progress message to stdout, tagged with the ``[cleanup]`` prefix.

    The output is flushed immediately so progress stays visible even when
    stdout is piped or block-buffered.
    """
    tagged = "[cleanup] {}".format(message)
    print(tagged, flush=True)
def _load_json(path: Path) -> dict[str, Any]:
    """Read the UTF-8 encoded JSON document at ``path`` and return the parsed value.

    Errors from opening the file or from ``json.load`` propagate to the caller.
    """
    with path.open(mode="r", encoding="utf-8") as handle:
        document = json.load(handle)
    return document
def _save_json(path: Path, payload: Any) -> None:
    """Write ``payload`` to ``path`` as pretty-printed (indent=2) UTF-8 JSON.

    Missing parent directories are created. Non-ASCII characters are written
    as-is rather than as ``\\uXXXX`` escapes.

    The payload is serialized to a string *before* the target file is opened.
    Opening with mode ``"w"`` truncates the file, so serializing first means a
    payload that cannot be serialized raises without destroying an existing
    file or leaving a half-written one behind.

    Raises:
        TypeError, ValueError: propagated from ``json.dumps`` when ``payload``
            is not JSON-serializable.
        OSError: propagated from directory creation or file writing.
    """
    serialized = json.dumps(payload, indent=2, ensure_ascii=False)
    path.parent.mkdir(parents=True, exist_ok=True)
    with path.open("w", encoding="utf-8") as f:
        f.write(serialized)
def _extract_json_object(text: str) -> dict[str, Any]:
    """Return the first JSON object that can be decoded from ``text``.

    The whole (stripped) text is tried first. If that does not yield a JSON
    object, every ``{`` is tried in turn as the start of an embedded object,
    which copes with output wrapped in prose or markdown fences.

    Decoding of embedded candidates is delegated to
    ``json.JSONDecoder.raw_decode`` instead of counting braces by hand, so a
    ``{`` or ``}`` inside a JSON string value does not confuse the scan.

    Raises:
        ValueError: if ``text`` is empty/whitespace, or if no JSON object can
            be decoded from it.
    """
    text = text.strip()
    if not text:
        raise ValueError("Model returned empty text.")

    # Fast path: the entire output is a single JSON document.
    try:
        parsed = json.loads(text)
    except (ValueError, RecursionError):
        parsed = None
    if isinstance(parsed, dict):
        return parsed

    # Slow path: look for an object embedded somewhere in the text.
    decoder = json.JSONDecoder()
    start = text.find("{")
    while start >= 0:
        try:
            candidate, _end = decoder.raw_decode(text, start)
        except (ValueError, RecursionError):
            candidate = None
        if isinstance(candidate, dict):
            return candidate
        start = text.find("{", start + 1)

    raise ValueError("Could not parse a JSON object from model output.")
def _response_to_dict(response: Any) -> dict[str, Any]:
    """Convert an API response object into a plain dictionary.

    The first callable found among ``response.model_dump`` and
    ``response.to_dict`` (checked in that order) is invoked and its result is
    returned. If neither exists, the response is stringified and wrapped
    under the ``"raw_response"`` key.
    """
    for method_name in ("model_dump", "to_dict"):
        converter = getattr(response, method_name, None)
        if callable(converter):
            return converter()
    return {"raw_response": str(response)}
def _response_text(response: Any) -> str:
    """Return the text output of a response, or ``""`` when none is found.

    Lookup order:
      1. the ``output_text`` attribute of ``response``;
      2. the ``"output_text"`` key, then the ``"text"`` key, of the dictionary
         produced by ``_response_to_dict(response)``.

    A candidate counts only if it is a string containing non-whitespace; the
    first such candidate is returned unmodified.
    """

    def _non_blank(value: Any) -> bool:
        return isinstance(value, str) and bool(value.strip())

    direct = getattr(response, "output_text", None)
    if _non_blank(direct):
        return direct

    as_dict = _response_to_dict(response)
    if not isinstance(as_dict, dict):
        return ""
    candidates = (as_dict.get(field) for field in ("output_text", "text"))
    return next((value for value in candidates if _non_blank(value)), "")
def _usage_from_response_dict(payload: dict[str, Any]) -> dict[str, int | None]:
usage = payload.get("usage")
if not isinstance(usage, dict):
return {
"input_tokens": None,
"output_tokens": None,
"total_tokens": None,
"cached_input_tokens": None,
"reasoning_tokens": None,
}
input_details = usage.get("input_tokens_details", {})
output_details = usage.get("output_tokens_details", {})
return {
"input_tokens": usage.get("input_tokens"),
"output_tokens": usage.get("output_tokens"),
"total_tokens": usage.get("total_tokens"),
"cached_input_tokens": input_details.get("cached_tokens") if isinstance(input_details, dict) else None,
"reasoning_tokens": output_details.get("reasoning_tokens") if isinstance(output_details, dict) else None,
}
def _sum_usage(
first: dict[str, int | None],
second: dict[str, int | None],
) -> dict[str, int | None]:
def _sum_key(key: str) -> int | None:
a = first.get(key)
b = second.get(key)
if isinstance(a, int) and isinstance(b, int):
return a + b
if isinstance(a, int):
return a
if isinstance(b, int):
return b
return None
total = _sum_key("total_tokens")
input_tokens = _sum_key("input_tokens")
output_tokens = _sum_key("output_tokens")
if total is None and isinstance(input_tokens, int) and isinstance(output_tokens, int):
total = input_tokens + output_tokens
return {
"input_tokens": input_tokens,
"output_tokens": output_tokens,
"total_tokens": total,
"cached_input_tokens": _sum_key("cached_input_tokens"),
"reasoning_tokens": _sum_key("reasoning_tokens"),
}
def _parse_executive_names(
*,
names_csv: str | None,
) -> list[str]:
out: list[str] = []
if names_csv:
for item in names_csv.split(","):
name = item.strip().strip('"').strip("'")
if name:
out.append(name)
# Preserve order while removing duplicates.
seen = set()
deduped: list[str] = []
for name in out:
key = name.lower()
if key in seen:
continue
seen.add(key)
deduped.append(name)
return deduped
def _build_intro_payload(turns: list[dict[str, Any]], intro_turn_limit: int) -> list[dict[str, Any]]:
    """Build a compact view of the opening turns of the transcript.

    Takes the first ``intro_turn_limit`` turns (at least one, even when the
    limit is zero or negative) and keeps only ``speaker``, ``start``, ``end``
    and ``text`` from each, adding the turn's position as ``turn_index``.
    Fields missing from a turn are reported as ``None``.
    """
    window = max(1, intro_turn_limit)
    copied_fields = ("speaker", "start", "end", "text")
    return [
        {"turn_index": position, **{field: turn.get(field) for field in copied_fields}}
        for position, turn in enumerate(turns[:window])
    ]
def _extract_qna_announcements(turns: list[dict[str, Any]], max_items: int = 200) -> list[dict[str, Any]]:
    """Collect turns that look like operator announcements of a Q&A participant.

    A turn qualifies when its text, compared case-insensitively, contains
    ``"line of"`` together with either ``"please go ahead"`` or ``"question"``.
    Each match is reported as ``{"turn_index", "speaker", "text"}`` where
    ``text`` is the whitespace-stripped turn text.

    Args:
        turns: Transcript turns; each is read via ``turn.get(...)``.
        max_items: Upper bound on the number of announcements returned.
            Zero or a negative value yields an empty list.

    Returns:
        Up to ``max_items`` matching announcements, in transcript order.
    """
    announcements: list[dict[str, Any]] = []
    # Guard up front: the in-loop cap is only checked after an append, so
    # without this a non-positive limit would still let one item through.
    if max_items <= 0:
        return announcements

    for idx, turn in enumerate(turns):
        raw_text = turn.get("text")
        # Treat a null text as empty rather than as the literal string "None".
        text = "" if raw_text is None else str(raw_text).strip()
        if not text:
            continue
        lowered = text.lower()
        if "line of" in lowered and ("please go ahead" in lowered or "question" in lowered):
            announcements.append(
                {
                    "turn_index": idx,
                    "speaker": turn.get("speaker"),
                    "text": text,
                }
            )
            if len(announcements) >= max_items:
                break
    return announcements
def _extract_response_id(response: Any, response_dict: dict[str, Any]) -> str | None:
rid = getattr(response, "id", None)
if isinstance(rid, str) and rid:
return rid
candidate = response_dict.get("id")
if isinstance(candidate, str) and candidate:
return candidate
return None
def run_cleanup_pipeline(
    *,
    input_file: Path,
    api_key: str,
    model: str,
    output_dir: Path,
    intro_turn_limit: int,
    executive_names_csv: str | None,
) -> dict[str, Any]:
    """Run the two-call OpenAI pipeline over a merged transcript JSON.

    Call 1 asks the model to infer a speaker mapping from the transcript.
    Call 2 asks for a cleaned, speaker-attributed transcript. Call 2 is first
    attempted with ``previous_response_id`` chaining (sending no transcript
    turns); if that attempt raises, it is retried with the full transcript in
    the payload.

    All artifacts are written to a sub-directory of ``output_dir`` named after
    the current local time (``YYYYMMDD_HHMMSS``). Each raw API response is
    saved *before* its text is parsed, so a reply that cannot be parsed as
    JSON still leaves the raw response on disk for inspection.

    Args:
        input_file: JSON file containing a non-empty ``turns`` list.
        api_key: Key passed to the ``OpenAI`` client.
        model: Model ID used for both calls.
        output_dir: Parent directory for the timestamped run directory.
        intro_turn_limit: Number of leading turns included as intro context.
        executive_names_csv: Optional comma-separated executive names.

    Returns:
        The run summary (also written to ``run_summary.json``): output file
        paths, the model, whether context chaining was used, and token usage.

    Raises:
        RuntimeError: if the ``openai`` package is not installed.
        ValueError: if the input lacks a non-empty ``turns`` list, or a model
            reply contains no parsable JSON object.
    """
    # Imported lazily so the module can be imported without the dependency.
    try:
        from openai import OpenAI
    except ImportError as exc:
        raise RuntimeError(
            "Missing dependency: openai. Install with `pip install openai`."
        ) from exc

    _log("Loading transcript JSON...")
    transcript_json = _load_json(input_file)
    turns = transcript_json.get("turns")
    if not isinstance(turns, list) or not turns:
        raise ValueError("Input JSON must contain a non-empty `turns` list.")

    _log("Parsing executive names input...")
    executive_names = _parse_executive_names(
        names_csv=executive_names_csv,
    )
    intro_turns_payload = _build_intro_payload(turns, intro_turn_limit=intro_turn_limit)
    qna_announcements = _extract_qna_announcements(turns)

    run_dir = output_dir / datetime.now().strftime("%Y%m%d_%H%M%S")
    run_dir.mkdir(parents=True, exist_ok=True)
    executive_names_out_path = run_dir / "executive_names.json"
    _save_json(executive_names_out_path, {"names": executive_names})
    _log(f"Run directory: {run_dir}")
    _log(f"Saved executive names file: {executive_names_out_path}")

    client = OpenAI(api_key=api_key)

    # ---- Call 1: speaker mapping inference -------------------------------
    speaker_map_system = (
        "You are a transcript entity-resolution assistant. "
        "Return strict JSON only, no markdown. "
        "Infer speaker identities from transcript context."
    )
    speaker_map_user = json.dumps(
        {
            "task": "Infer speaker mapping from transcript context (intro + Q&A announcements).",
            "rules": [
                "Use explicit or near-explicit intro context ('I now hand over to ...', self-intros, operator intros).",
                "Label any conference host/queue-management voice as exactly 'Operator' when they do call control.",
                "Do not map Operator to an executive name.",
                "Do not guess beyond evidence.",
                "Prefer names from `executive_names` when they match context.",
                "In Q&A, infer non-executive participant names from operator announcements such as 'line of <name> from <firm>', even if absent in executive list.",
                "Keep unknown speakers as null names if evidence is weak.",
            ],
            "output_schema": {
                "speaker_mapping": [
                    {
                        "speaker_label": "SPEAKER_XX",
                        "inferred_name": "string or null",
                        "confidence": "number 0..1",
                        "evidence_turn_indexes": ["int"],
                        "reason": "short string",
                    }
                ],
                "notes": ["string"],
            },
            "executive_names": executive_names,
            "intro_turns": intro_turns_payload,
            "qna_announcements": qna_announcements,
            "transcript_turns": turns,
        },
        ensure_ascii=False,
    )

    _log("OpenAI call 1/2: inferring speaker mapping...")
    speaker_map_response = client.responses.create(
        model=model,
        input=[
            {"role": "system", "content": speaker_map_system},
            {"role": "user", "content": speaker_map_user},
        ],
    )
    speaker_map_raw = _response_to_dict(speaker_map_response)
    speaker_map_path = run_dir / "speaker_mapping.json"
    speaker_map_raw_path = run_dir / "speaker_mapping_raw_response.json"
    # Persist the raw response first: parsing below may raise, and the raw
    # payload is what is needed to diagnose an unparsable reply.
    _save_json(speaker_map_raw_path, speaker_map_raw)

    first_response_id = _extract_response_id(speaker_map_response, speaker_map_raw)
    speaker_map_usage = _usage_from_response_dict(speaker_map_raw)
    speaker_map_text = _response_text(speaker_map_response)
    speaker_map_json = _extract_json_object(speaker_map_text)
    _save_json(speaker_map_path, speaker_map_json)

    # ---- Call 2: transcript cleanup --------------------------------------
    cleanup_system = (
        "You are a transcript cleanup and diarization refinement assistant. "
        "Return strict JSON only, no markdown."
    )
    cleanup_payload_base = {
        "task": "Clean transcript and produce final speaker-attributed turns.",
        "rules": [
            "Correct likely misspellings and improve punctuation/casing.",
            "Remove false starts and repeated filler where safe, but keep meaning.",
            "Standardize executive names to the canonical forms in `executive_names` where applicable.",
            "Use `speaker_mapping` from call 1, but keep unknown labels if unsupported.",
            "Label the conference host/control speaker as exactly 'Operator' when they are handling queue/instructions.",
            "In Q&A, infer names not present in `executive_names` from context and operator announcements.",
            "If a very short mid-sentence speaker switch is likely diarization noise, merge/reassign using sentence continuity.",
            "Preserve turn order and timing progression.",
            "Output speaker labels as inferred names when confidence is sufficient; otherwise keep SPEAKER_XX.",
            "Do not invent facts not present in transcript context.",
        ],
        "output_schema": {
            "speaker_mapping_final": [
                {
                    "source_label": "SPEAKER_XX",
                    "final_label": "Name or SPEAKER_XX",
                    "confidence": "number 0..1",
                    "reason": "short string",
                }
            ],
            "turns": [
                {
                    "speaker": "Name or SPEAKER_XX",
                    "start": "float",
                    "end": "float",
                    "text": "cleaned text",
                }
            ],
            "summary": {
                "turn_count": "int",
                "speaker_count": "int",
                "notes": ["string"],
            },
        },
        "executive_names": executive_names,
        "speaker_mapping": speaker_map_json.get("speaker_mapping", []),
    }
    # Two variants of the payload: one carrying the full transcript, one that
    # relies on the previous response's context and omits the turns.
    cleanup_payload_with_turns = dict(cleanup_payload_base)
    cleanup_payload_with_turns["transcript_turns"] = turns
    cleanup_payload_context_only = dict(cleanup_payload_base)
    cleanup_payload_context_only["context_hint"] = (
        "Use the transcript context from the previous response. "
        "Do not request retransmission."
    )

    _log("OpenAI call 2/2: cleaning transcript and refining speaker labels...")
    cleanup_response = None
    used_context_chaining = False
    if first_response_id:
        _log("Using previous_response_id context chaining for call 2.")
        try:
            cleanup_response = client.responses.create(
                model=model,
                previous_response_id=first_response_id,
                input=[
                    {"role": "system", "content": cleanup_system},
                    {"role": "user", "content": json.dumps(cleanup_payload_context_only, ensure_ascii=False)},
                ],
            )
            used_context_chaining = True
        except TypeError:
            _log("Client does not support previous_response_id; falling back to explicit transcript payload.")
        except Exception as exc:
            # Deliberate best-effort: any failure of the chained call falls
            # through to the explicit-payload request below.
            _log(f"Context-chained call failed ({exc}); falling back to explicit transcript payload.")

    if cleanup_response is None:
        cleanup_response = client.responses.create(
            model=model,
            input=[
                {"role": "system", "content": cleanup_system},
                {"role": "user", "content": json.dumps(cleanup_payload_with_turns, ensure_ascii=False)},
            ],
        )

    cleanup_raw = _response_to_dict(cleanup_response)
    cleaned_path = run_dir / "cleaned_transcript.json"
    cleaned_raw_path = run_dir / "cleanup_raw_response.json"
    cleaned_text_path = run_dir / "cleaned_transcript.txt"
    # As for call 1: keep the raw response even if parsing fails afterwards.
    _save_json(cleaned_raw_path, cleanup_raw)

    cleanup_usage = _usage_from_response_dict(cleanup_raw)
    cleanup_text = _response_text(cleanup_response)
    cleaned_json = _extract_json_object(cleanup_text)

    token_usage = {
        "speaker_mapping_call": speaker_map_usage,
        "cleanup_call": cleanup_usage,
        "combined": _sum_usage(speaker_map_usage, cleanup_usage),
    }
    cleaned_json["inputs"] = {
        "source_file": str(input_file),
        "speaker_mapping_file": str(speaker_map_path),
        "context_chaining_used_for_cleanup": used_context_chaining,
    }
    cleaned_json["openai_token_usage"] = token_usage
    _save_json(cleaned_path, cleaned_json)

    # Plain-text rendering: one "speaker: text" line per non-empty turn.
    output_turns = cleaned_json.get("turns", [])
    lines: list[str] = []
    if isinstance(output_turns, list):
        for turn in output_turns:
            if not isinstance(turn, dict):
                continue
            speaker = str(turn.get("speaker", "SPEAKER_XX"))
            text = str(turn.get("text", "")).strip()
            if text:
                lines.append(f"{speaker}: {text}")
    cleaned_text_path.write_text("\n".join(lines), encoding="utf-8")
    _log("Saved cleaned transcript outputs.")

    run_summary = {
        "run_dir": str(run_dir),
        "input_file": str(input_file),
        "model": model,
        "speaker_mapping_file": str(speaker_map_path),
        "speaker_mapping_raw_file": str(speaker_map_raw_path),
        "cleaned_transcript_file": str(cleaned_path),
        "cleaned_transcript_raw_file": str(cleaned_raw_path),
        "cleaned_text_file": str(cleaned_text_path),
        "intro_turn_limit": intro_turn_limit,
        "executive_names_file": str(executive_names_out_path),
        "context_chaining_used_for_cleanup": used_context_chaining,
        "openai_token_usage": token_usage,
    }
    _save_json(run_dir / "run_summary.json", run_summary)
    _log("Completed.")
    return run_summary
def main() -> None:
    """Command-line entry point.

    Parses the arguments, runs ``run_cleanup_pipeline`` and prints the
    resulting run summary as indented JSON on stdout.

    The API key may be given with ``--api-key`` or, when that option is
    omitted, through the ``OPENAI_API_KEY`` environment variable. The
    environment fallback avoids putting the secret on the command line, where
    it would end up in shell history and process listings. If neither source
    provides a key, the parser reports a usage error and exits.
    """
    import os  # local import: only needed for the API-key environment fallback

    parser = argparse.ArgumentParser(
        description=(
            "Run two OpenAI calls over a merged transcript JSON: "
            "(1) speaker mapping inference, (2) cleaned/re-labeled transcript."
        )
    )
    parser.add_argument("--input-file", required=True, help="Path to merged transcript JSON.")
    parser.add_argument(
        "--api-key",
        default=None,
        help="OpenAI API key. Defaults to the OPENAI_API_KEY environment variable.",
    )
    parser.add_argument("--model", default="gpt-5", help="OpenAI model ID (default: gpt-5).")
    parser.add_argument(
        "--intro-turn-limit",
        type=int,
        default=80,
        help="Number of initial turns to use for speaker-introduction inference.",
    )
    parser.add_argument(
        "--executive-names-csv",
        default=None,
        help='Comma-separated executive names, e.g. "Name A,Name B,Name C".',
    )
    parser.add_argument(
        "--output-dir",
        default="benchmark_outputs/cleanup_openai",
        help="Directory to store outputs.",
    )
    args = parser.parse_args()

    api_key = args.api_key or os.environ.get("OPENAI_API_KEY")
    if not api_key:
        # parser.error prints the usage message and exits with status 2.
        parser.error("an OpenAI API key is required: pass --api-key or set OPENAI_API_KEY")

    summary = run_cleanup_pipeline(
        input_file=Path(args.input_file),
        api_key=api_key,
        model=args.model,
        output_dir=Path(args.output_dir),
        intro_turn_limit=args.intro_turn_limit,
        executive_names_csv=args.executive_names_csv,
    )
    print(json.dumps(summary, indent=2))
# Run the CLI only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()