Ratnesh-dev committed on
Commit
993d3cf
·
1 Parent(s): 79b12d3

Use Chunking For OpenAI API Calls

Browse files
Files changed (2) hide show
  1. app.py +3 -2
  2. src/openai_cleanup_service.py +212 -192
app.py CHANGED
@@ -164,9 +164,10 @@ def run_complete_pipeline(
164
  merged_transcript=merged_transcript,
165
  openai_api_key=openai_api_key,
166
  executive_names_csv=executive_names_csv,
167
- speaker_map_model="gpt-5-mini",
168
  cleanup_model="gpt-5",
169
- timeout_seconds=1800.0,
 
 
170
  )
171
  cleaned_transcript = openai_result["cleaned_transcript"]
172
 
 
164
  merged_transcript=merged_transcript,
165
  openai_api_key=openai_api_key,
166
  executive_names_csv=executive_names_csv,
 
167
  cleanup_model="gpt-5",
168
+ timeout_seconds=600.0,
169
+ max_turns_per_chunk=80,
170
+ max_chars_per_chunk=22000,
171
  )
172
  cleaned_transcript = openai_result["cleaned_transcript"]
173
 
src/openai_cleanup_service.py CHANGED
@@ -1,5 +1,4 @@
1
  import json
2
- import re
3
  from typing import Any
4
 
5
 
@@ -132,99 +131,105 @@ def _parse_executive_names(names_csv: str | None) -> list[str]:
132
  return deduped
133
 
134
 
135
- def _trim_text(value: str, max_chars: int) -> str:
136
- if max_chars <= 0 or len(value) <= max_chars:
137
- return value
138
- return value[:max_chars]
139
-
140
-
141
def _build_intro_payload(turns: list[dict[str, Any]], intro_turn_limit: int, intro_text_char_limit: int) -> list[dict[str, Any]]:
    """Build a compact payload from the opening turns of the transcript.

    Samples at most `intro_turn_limit` leading turns (always at least one) and
    trims each turn's text to `intro_text_char_limit` characters via
    `_trim_text`. Each payload entry carries the turn index, speaker, and
    start/end timestamps alongside the trimmed text.
    """
    leading = turns[: max(1, intro_turn_limit)]
    return [
        {
            "turn_index": position,
            "speaker": turn.get("speaker"),
            "start": turn.get("start"),
            "end": turn.get("end"),
            "text": _trim_text(str(turn.get("text", "")), intro_text_char_limit),
        }
        for position, turn in enumerate(leading)
    ]
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
156
 
157
 
158
def _extract_qna_announcements(
    turns: list[dict[str, Any]],
    max_items: int = 80,
    text_char_limit: int = 280,
) -> list[dict[str, Any]]:
    """Collect operator-style Q&A hand-off announcements from the transcript.

    A turn qualifies when its lowercased text contains "line of" together with
    either "please go ahead" or "question". Collection stops once `max_items`
    announcements are gathered; each text is trimmed to `text_char_limit`.
    """
    found: list[dict[str, Any]] = []
    for position, turn in enumerate(turns):
        text = str(turn.get("text", "")).strip()
        if not text:
            continue
        lowered = text.lower()
        if "line of" not in lowered:
            continue
        if "please go ahead" not in lowered and "question" not in lowered:
            continue
        found.append(
            {
                "turn_index": position,
                "speaker": turn.get("speaker"),
                "text": _trim_text(text, text_char_limit),
            }
        )
        if len(found) >= max_items:
            break
    return found
180
-
181
-
182
- def _extract_qna_name_candidates(qna_announcements: list[dict[str, Any]]) -> list[dict[str, Any]]:
183
- patterns = [
184
- r"line of\s+(.+?)\s+from\s+(.+?)(?:\.|,|please go ahead|$)",
185
- r"question (?:comes|is)\s+from\s+the line of\s+(.+?)\s+from\s+(.+?)(?:\.|,|please go ahead|$)",
186
- r"question (?:comes|is)\s+(.+?)\s+from\s+(.+?)(?:\.|,|please go ahead|$)",
187
- ]
188
  out: list[dict[str, Any]] = []
189
- seen = set()
190
- for item in qna_announcements:
191
- text = str(item.get("text", ""))
192
- lowered = text.lower()
193
- for p in patterns:
194
- match = re.search(p, lowered, flags=re.IGNORECASE)
195
- if not match:
196
- continue
197
- raw_name = text[match.start(1) : match.end(1)].strip(" .,:;")
198
- raw_firm = text[match.start(2) : match.end(2)].strip(" .,:;")
199
- key = (raw_name.lower(), raw_firm.lower())
200
- if key in seen:
201
- break
202
- seen.add(key)
203
- out.append({"name": raw_name, "firm": raw_firm})
204
- break
205
- return out
206
 
 
 
 
207
 
208
- def _build_speaker_label_map(turns: list[dict[str, Any]], speaker_mapping: list[dict[str, Any]] | Any) -> dict[str, str]:
209
- label_map: dict[str, str] = {}
210
- for turn in turns:
211
- speaker = str(turn.get("speaker", "")).strip()
212
- if speaker:
213
- label_map.setdefault(speaker, speaker)
214
-
215
- if isinstance(speaker_mapping, list):
216
- for item in speaker_mapping:
217
- if not isinstance(item, dict):
218
- continue
219
- source = str(item.get("speaker_label", "")).strip()
220
- inferred = str(item.get("inferred_name", "")).strip()
221
- if not source:
222
- continue
223
- if inferred:
224
- label_map[source] = inferred
225
- else:
226
- label_map.setdefault(source, source)
227
- return label_map
228
 
229
 
230
  def run_openai_cleanup_pipeline(
@@ -232,12 +237,15 @@ def run_openai_cleanup_pipeline(
232
  openai_api_key: str,
233
  executive_names_csv: str | None,
234
  *,
235
- speaker_map_model: str = "gpt-5-mini",
236
  cleanup_model: str = "gpt-5",
237
- timeout_seconds: float = 1800.0,
238
- intro_turn_limit: int = 24,
239
- intro_text_char_limit: int = 600,
240
  ) -> dict[str, Any]:
 
 
 
 
241
  try:
242
  from openai import OpenAI
243
  except ImportError as exc:
@@ -248,129 +256,141 @@ def run_openai_cleanup_pipeline(
248
  raise ValueError("Merged transcript must contain a non-empty `turns` list.")
249
 
250
  executive_names = _parse_executive_names(executive_names_csv)
251
- intro_turns_payload = _build_intro_payload(turns, intro_turn_limit=intro_turn_limit, intro_text_char_limit=intro_text_char_limit)
252
- qna_announcements = _extract_qna_announcements(turns)
253
- qna_name_candidates = _extract_qna_name_candidates(qna_announcements)
 
 
254
 
255
  client = OpenAI(api_key=openai_api_key, timeout=timeout_seconds, max_retries=0)
256
 
257
- speaker_map_system = (
258
- "You map speaker labels to names. Return strict JSON only. "
259
- "Infer identities from transcript context."
260
- )
261
- speaker_map_user = _dumps_compact(
262
- {
263
- "task": "Infer speaker names from intro and Q&A context snippets.",
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
264
  "rules": [
265
- "Use provided intro turns and Q&A announcements only.",
266
- "If a speaker is clearly queue-management/call-control voice, label as Operator.",
267
- "Do not guess beyond evidence.",
268
- "Prefer names from `executive_names` when they match context.",
269
- "If first name matches in `executive_names` but last name is missing/uncertain, use just the first name as the speaker label.",
270
- "Q&A participant names may be absent from executive list; infer only if explicit in announcement/context.",
271
- "If uncertain, return null inferred_name.",
272
- "If inferred_name is present, it must be only a person/role name and must not include SPEAKER_XX or separator forms like Name|SPEAKER_XX.",
 
 
 
 
 
273
  ],
274
  "output_schema": {
275
- "speaker_mapping": [
 
 
 
276
  {
277
- "speaker_label": "SPEAKER_XX",
278
- "inferred_name": "string|null",
279
- "confidence": "0..1",
280
- "reason": "short",
 
281
  }
282
  ],
283
  "notes": ["string"],
284
  },
285
  "executive_names": executive_names,
286
- "intro_turns": intro_turns_payload,
287
- "qna_announcements": qna_announcements,
288
- "qna_name_candidates": qna_name_candidates,
 
289
  }
290
- )
291
 
292
- speaker_map_response = client.responses.create(
293
- model=speaker_map_model,
294
- input=[
295
- {"role": "system", "content": speaker_map_system},
296
- {"role": "user", "content": speaker_map_user},
297
- ],
298
- )
299
- speaker_map_raw = _response_to_dict(speaker_map_response)
300
- speaker_map_usage = _usage_from_response_dict(speaker_map_raw)
301
- speaker_map_json = _extract_json_object(_response_text(speaker_map_response))
302
- speaker_label_map = _build_speaker_label_map(turns, speaker_map_json.get("speaker_mapping", []))
303
-
304
- cleanup_system = "You are a transcript cleanup and diarization refinement assistant. Return strict JSON only, no markdown."
305
- cleanup_payload = {
306
- "task": "Clean transcript and produce final speaker-attributed turns.",
307
- "rules": [
308
- "Correct likely misspellings and improve punctuation/casing.",
309
- "Only remove filler words (for example: uh, um, you know, like) and clear false-start words/phrases.",
310
- "Do not aggressively summarize, compress, or paraphrase full sentences.",
311
- "Preserve substantive wording and as much original content as possible while cleaning.",
312
- "When uncertain whether text is filler, keep the text.",
313
- "Standardize executive names to the canonical forms in `executive_names` where applicable.",
314
- "Use `speaker_label_map` from call 1 as the single source of truth for speaker labels.",
315
- "Do not infer any new speaker identities in this call.",
316
- "If a source label is not mapped to a name, keep the original generic label (for example SPEAKER_02).",
317
- "If a very short mid-sentence speaker switch is likely diarization noise, merge/reassign using sentence continuity.",
318
- "Preserve turn order and timing progression.",
319
- "Never output combined labels like 'Name|SPEAKER_XX' or 'Name (SPEAKER_XX)'.",
320
- "Do not invent facts not present in transcript context.",
321
- ],
322
- "output_schema": {
323
- "speaker_mapping_final": [
324
- {
325
- "source_label": "SPEAKER_XX",
326
- "final_label": "string (either inferred name only OR SPEAKER_XX only)",
327
- "confidence": "0..1",
328
- "reason": "short",
329
- }
330
- ],
331
- "turns": [
332
  {
333
- "speaker": "string (either inferred name only OR SPEAKER_XX only)",
334
- "start": "float",
335
- "end": "float",
336
- "text": "cleaned text",
337
- }
338
  ],
339
- "summary": {"turn_count": "int", "speaker_count": "int", "notes": ["string"]},
340
- },
341
- "executive_names": executive_names,
342
- "speaker_label_map": speaker_label_map,
343
- "transcript_turns": turns,
344
- }
345
 
346
- cleanup_response = client.responses.create(
347
- model=cleanup_model,
348
- input=[
349
- {"role": "system", "content": cleanup_system},
350
- {"role": "user", "content": _dumps_compact(cleanup_payload)},
351
- ],
352
- )
353
- cleanup_raw = _response_to_dict(cleanup_response)
354
- cleanup_usage = _usage_from_response_dict(cleanup_raw)
355
- cleaned_json = _extract_json_object(_response_text(cleanup_response))
356
-
357
- token_usage = {
358
- "speaker_mapping_call": speaker_map_usage,
359
- "cleanup_call": cleanup_usage,
360
- "combined": _sum_usage(speaker_map_usage, cleanup_usage),
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
361
  }
362
- cleaned_json["openai_token_usage"] = token_usage
363
 
364
  return {
365
  "cleaned_transcript": cleaned_json,
366
  "debug": {
367
- "speaker_map_model": speaker_map_model,
368
  "cleanup_model": cleanup_model,
369
  "executive_names": executive_names,
370
- "speaker_mapping": speaker_map_json,
371
- "speaker_label_map": speaker_label_map,
372
- "speaker_mapping_raw_response": speaker_map_raw,
373
- "cleanup_raw_response": cleanup_raw,
374
- "openai_token_usage": token_usage,
375
  },
376
  }
 
1
  import json
 
2
  from typing import Any
3
 
4
 
 
131
  return deduped
132
 
133
 
134
+ def _build_chunk_plan(
135
+ turns: list[dict[str, Any]],
136
+ max_turns_per_chunk: int,
137
+ max_chars_per_chunk: int,
138
+ ) -> list[dict[str, int]]:
139
+ if max_turns_per_chunk <= 0:
140
+ max_turns_per_chunk = 1
141
+ if max_chars_per_chunk <= 0:
142
+ max_chars_per_chunk = 12000
143
+
144
+ plan: list[dict[str, int]] = []
145
+ n = len(turns)
146
+ start = 0
147
+ while start < n:
148
+ end = start
149
+ turns_count = 0
150
+ chars_count = 0
151
+ while end < n:
152
+ t = turns[end]
153
+ text_len = len(str(t.get("text", "")))
154
+ est = text_len + 60
155
+ if turns_count > 0 and (turns_count >= max_turns_per_chunk or chars_count + est > max_chars_per_chunk):
156
+ break
157
+ turns_count += 1
158
+ chars_count += est
159
+ end += 1
160
+ if end == start:
161
+ end = min(n, start + 1)
162
+ plan.append({"start": start, "end": end})
163
+ start = end
164
+ return plan
165
+
166
+
167
+ def _normalize_final_label(final_label: str, source_label: str) -> str:
168
+ label = str(final_label or "").strip()
169
+ if not label:
170
+ return source_label
171
+ if "|" in label:
172
+ left = label.split("|", 1)[0].strip()
173
+ if left:
174
+ label = left
175
+ suffix = f"({source_label})"
176
+ if label.endswith(suffix):
177
+ label = label[: -len(suffix)].strip()
178
+ if not label:
179
+ return source_label
180
+ return label
181
+
182
+
183
+ def _extract_map_updates(parsed: dict[str, Any]) -> list[dict[str, str]]:
184
+ candidates = parsed.get("speaker_label_map_updates")
185
+ if not isinstance(candidates, list):
186
+ candidates = parsed.get("speaker_mapping_final")
187
+ if not isinstance(candidates, list):
188
+ return []
189
+
190
+ updates: list[dict[str, str]] = []
191
+ for item in candidates:
192
+ if not isinstance(item, dict):
193
+ continue
194
+ source = str(item.get("source_label") or item.get("speaker_label") or "").strip()
195
+ final = str(item.get("final_label") or item.get("inferred_name") or "").strip()
196
+ if not source:
197
+ continue
198
+ updates.append({"source_label": source, "final_label": final})
199
+ return updates
200
 
201
 
202
+ def _coerce_turns(
203
+ source_turns: list[dict[str, Any]],
204
+ parsed_turns: Any,
205
+ speaker_label_map: dict[str, str],
206
  ) -> list[dict[str, Any]]:
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
207
  out: list[dict[str, Any]] = []
208
+ parsed_list = parsed_turns if isinstance(parsed_turns, list) else []
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
209
 
210
+ for idx, source in enumerate(source_turns):
211
+ source_speaker = str(source.get("speaker", "SPEAKER_XX"))
212
+ mapped_default = speaker_label_map.get(source_speaker, source_speaker)
213
 
214
+ parsed_item = parsed_list[idx] if idx < len(parsed_list) and isinstance(parsed_list[idx], dict) else {}
215
+ candidate_speaker = _normalize_final_label(str(parsed_item.get("speaker", "")), source_speaker)
216
+ final_speaker = candidate_speaker or mapped_default
217
+ if final_speaker == source_speaker:
218
+ final_speaker = mapped_default
219
+
220
+ text = str(parsed_item.get("text", "")).strip() or str(source.get("text", "")).strip()
221
+ start = parsed_item.get("start", source.get("start"))
222
+ end = parsed_item.get("end", source.get("end"))
223
+
224
+ out.append(
225
+ {
226
+ "speaker": final_speaker,
227
+ "start": start,
228
+ "end": end,
229
+ "text": text,
230
+ }
231
+ )
232
+ return out
 
233
 
234
 
235
  def run_openai_cleanup_pipeline(
 
237
  openai_api_key: str,
238
  executive_names_csv: str | None,
239
  *,
 
240
  cleanup_model: str = "gpt-5",
241
+ timeout_seconds: float = 600.0,
242
+ max_turns_per_chunk: int = 80,
243
+ max_chars_per_chunk: int = 22000,
244
  ) -> dict[str, Any]:
245
+ """
246
+ Single-pass per chunk: each OpenAI call does both speaker naming and transcript cleanup.
247
+ Avoids a separate full-document speaker inference pass for long audio reliability.
248
+ """
249
  try:
250
  from openai import OpenAI
251
  except ImportError as exc:
 
256
  raise ValueError("Merged transcript must contain a non-empty `turns` list.")
257
 
258
  executive_names = _parse_executive_names(executive_names_csv)
259
+ chunk_plan = _build_chunk_plan(
260
+ turns=turns,
261
+ max_turns_per_chunk=max_turns_per_chunk,
262
+ max_chars_per_chunk=max_chars_per_chunk,
263
+ )
264
 
265
  client = OpenAI(api_key=openai_api_key, timeout=timeout_seconds, max_retries=0)
266
 
267
+ # Global mapping across chunks.
268
+ speaker_label_map: dict[str, str] = {}
269
+ for turn in turns:
270
+ source = str(turn.get("speaker", "")).strip()
271
+ if source:
272
+ speaker_label_map.setdefault(source, source)
273
+
274
+ combined_usage = {
275
+ "input_tokens": 0,
276
+ "output_tokens": 0,
277
+ "total_tokens": 0,
278
+ "cached_input_tokens": 0,
279
+ "reasoning_tokens": 0,
280
+ }
281
+ per_chunk_usage: list[dict[str, Any]] = []
282
+ cleaned_turns: list[dict[str, Any]] = []
283
+ chunk_notes: list[str] = []
284
+ chunk_raw_responses: list[dict[str, Any]] = []
285
+
286
+ for i, chunk in enumerate(chunk_plan):
287
+ start = chunk["start"]
288
+ end = chunk["end"]
289
+ source_chunk_turns = turns[start:end]
290
+
291
+ payload = {
292
+ "task": "For this chunk only: infer speaker names and clean transcript text in one pass.",
293
  "rules": [
294
+ "Keep turn order and count exactly the same as input chunk.",
295
+ "Keep start/end timestamps aligned to input turns.",
296
+ "Correct misspellings and punctuation/casing.",
297
+ "Only remove filler words (uh, um, you know, like) and clear false-start words/phrases.",
298
+ "Do not aggressively summarize, compress, or paraphrase full sentences.",
299
+ "Preserve substantive wording and as much original content as possible.",
300
+ "If uncertain whether text is filler, keep it.",
301
+ "Infer speaker names from this chunk context only; do not guess beyond evidence.",
302
+ "If first name matches in `executive_names` but last name is uncertain, first name alone is allowed.",
303
+ "If speaker is call-control voice, label as Operator.",
304
+ "If speaker name is unknown, keep generic label SPEAKER_XX.",
305
+ "Never output combined labels like Name|SPEAKER_XX.",
306
+ "Use `existing_speaker_label_map` as source of truth for labels already resolved in prior chunks.",
307
  ],
308
  "output_schema": {
309
+ "speaker_label_map_updates": [
310
+ {"source_label": "SPEAKER_XX", "final_label": "Name or SPEAKER_XX", "reason": "short"}
311
+ ],
312
+ "turns": [
313
  {
314
+ "source_speaker": "SPEAKER_XX",
315
+ "speaker": "Name or SPEAKER_XX",
316
+ "start": "float",
317
+ "end": "float",
318
+ "text": "cleaned text",
319
  }
320
  ],
321
  "notes": ["string"],
322
  },
323
  "executive_names": executive_names,
324
+ "existing_speaker_label_map": speaker_label_map,
325
+ "chunk_index": i,
326
+ "chunk_start_turn_index": start,
327
+ "chunk_turns": source_chunk_turns,
328
  }
 
329
 
330
+ response = client.responses.create(
331
+ model=cleanup_model,
332
+ input=[
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
333
  {
334
+ "role": "system",
335
+ "content": "You are a transcript cleanup and speaker-label assistant. Return strict JSON only.",
336
+ },
337
+ {"role": "user", "content": _dumps_compact(payload)},
 
338
  ],
339
+ )
 
 
 
 
 
340
 
341
+ raw = _response_to_dict(response)
342
+ parsed = _extract_json_object(_response_text(response))
343
+ usage = _usage_from_response_dict(raw)
344
+ for k in combined_usage:
345
+ combined_usage[k] += int(usage.get(k) or 0)
346
+ per_chunk_usage.append({"chunk_index": i, "usage": usage, "turn_range": [start, end]})
347
+ chunk_raw_responses.append({"chunk_index": i, "raw_response": raw})
348
+
349
+ for upd in _extract_map_updates(parsed):
350
+ source_label = upd["source_label"]
351
+ final_label = _normalize_final_label(upd["final_label"], source_label)
352
+ speaker_label_map[source_label] = final_label
353
+
354
+ notes = parsed.get("notes", [])
355
+ if isinstance(notes, list):
356
+ chunk_notes.extend([str(n) for n in notes if str(n).strip()])
357
+
358
+ cleaned_chunk_turns = _coerce_turns(
359
+ source_turns=source_chunk_turns,
360
+ parsed_turns=parsed.get("turns"),
361
+ speaker_label_map=speaker_label_map,
362
+ )
363
+ cleaned_turns.extend(cleaned_chunk_turns)
364
+
365
+ final_mapping = [
366
+ {"source_label": source, "final_label": final}
367
+ for source, final in sorted(speaker_label_map.items(), key=lambda x: x[0])
368
+ ]
369
+
370
+ summary = {
371
+ "turn_count": len(cleaned_turns),
372
+ "speaker_count": len({str(t.get("speaker", "")) for t in cleaned_turns}),
373
+ "chunk_count": len(chunk_plan),
374
+ "notes": chunk_notes[:200],
375
+ }
376
+ cleaned_json = {
377
+ "speaker_mapping_final": final_mapping,
378
+ "turns": cleaned_turns,
379
+ "summary": summary,
380
+ "openai_token_usage": {
381
+ "combined": combined_usage,
382
+ "per_chunk": per_chunk_usage,
383
+ },
384
  }
 
385
 
386
  return {
387
  "cleaned_transcript": cleaned_json,
388
  "debug": {
 
389
  "cleanup_model": cleanup_model,
390
  "executive_names": executive_names,
391
+ "chunk_plan": chunk_plan,
392
+ "speaker_label_map_final": speaker_label_map,
393
+ "openai_token_usage": cleaned_json["openai_token_usage"],
394
+ "openai_raw_responses": chunk_raw_responses,
 
395
  },
396
  }