# PodXplainClone / speaker_detection.py
# Bilal140202
# Upgrade podcast generator UX and export reliability
# 4193bcd
"""
Speaker Detection Module for PodXplainClone.
Parses input text into (speaker_id, text) segments using auto, paragraph,
and dialogue strategies.
"""
import re
from typing import Dict, List, Tuple
from text_processing import normalize_text
# "Name: text" turn. Tolerates leading list bullets ("-", "*", whitespace) and
# Markdown bold around the name ("**Host**: hi"). Group 1 is the speaker name
# (a letter followed by up to 32 name characters), group 2 is the spoken text.
NAME_PREFIX_PATTERN = re.compile(
    r"^(?:[-*\s]*)(?:\*\*)?([A-Za-z][A-Za-z0-9 ._'-]{0,32}?)(?:\*\*)?\s*:\s*(.*)$"
)

# Turn introduced by a leading hyphen-minus, en dash (U+2013) or em dash
# (U+2014); group 1 is the text after the dash.
# The hyphen is escaped and the dashes are written as code points on purpose:
# inside a character class "--X" is read as the RANGE from "-" to X, which
# would make this pattern match almost any line (letters, digits, ...) and
# swallow its first character.
EMDASH_PATTERN = re.compile(r"^[\-\u2013\u2014]\s*(.*)")

# Line that opens with a straight or curly quote; group 1 is everything after
# the opening quote character.
QUOTE_PATTERN = re.compile(r"^[\"'“‘](.*)")

# Line consisting solely of one bracketed note such as "[laughs]".
STAGE_DIRECTION_PATTERN = re.compile(r"^\s*\[[^\]]+\]\s*$")
def _compact_segments(segments: List[Tuple[int, str]]) -> List[Tuple[int, str]]:
    """Collapse consecutive same-speaker segments and discard unusable ones.

    Each segment's text is passed through ``normalize_text``. Segments whose
    text is then empty, or consists only of a bracketed stage direction, are
    skipped. A kept segment whose speaker equals the previously kept one is
    appended to it with a single space; otherwise it starts a new entry.
    """
    merged: List[Tuple[int, str]] = []
    for speaker, raw in segments:
        cleaned = normalize_text(raw)
        if cleaned and not STAGE_DIRECTION_PATTERN.match(cleaned):
            if not merged or merged[-1][0] != speaker:
                merged.append((speaker, cleaned))
            else:
                # Same speaker as the last kept segment: extend it in place.
                earlier = merged.pop()[1]
                merged.append((speaker, f"{earlier} {cleaned}".strip()))
    return merged
def detect_speakers_auto(text: str) -> List[Tuple[int, str]]:
    """Pick a segmentation strategy from the shape of the text.

    Dialogue parsing is used when at least two non-blank lines match one of
    the name-prefix, dash, or quote patterns, or when such lines are more
    than a quarter of all non-blank lines. Otherwise, text containing more
    than one blank-line-separated paragraph is handled in paragraph mode.
    Anything else comes back as a single speaker-0 segment, or an empty list
    when the normalized text is empty.
    """
    text = normalize_text(text)

    def looks_like_turn(candidate: str) -> bool:
        """Tell whether a stripped line matches any dialogue marker pattern."""
        return bool(
            NAME_PREFIX_PATTERN.match(candidate)
            or EMDASH_PATTERN.match(candidate)
            or QUOTE_PATTERN.match(candidate)
        )

    stripped_lines = [piece.strip() for piece in text.split("\n")]
    content_lines = [piece for piece in stripped_lines if piece]
    turn_lines = sum(1 for piece in content_lines if looks_like_turn(piece))
    # max(1, ...) keeps the ratio defined when there are no non-blank lines.
    turn_share = turn_lines / max(1, len(content_lines))
    if turn_lines >= 2 or turn_share > 0.25:
        return detect_speakers_dialogue(text)

    blocks = [chunk.strip() for chunk in text.split("\n\n") if chunk.strip()]
    if len(blocks) > 1:
        return detect_speakers_paragraph(text)
    if text:
        return [(0, text)]
    return []
def detect_speakers_paragraph(text: str) -> List[Tuple[int, str]]:
    """Alternate between speaker 0 and speaker 1 on each paragraph.

    Paragraphs are the non-blank pieces obtained by splitting the normalized
    text on double newlines. Even-indexed paragraphs go to speaker 0 and
    odd-indexed ones to speaker 1; the result is then compacted. When no
    paragraph is found, the whole text is returned as one speaker-0 segment,
    or an empty list if the text is empty.
    """
    text = normalize_text(text)
    blocks = []
    for chunk in text.split("\n\n"):
        chunk = chunk.strip()
        if chunk:
            blocks.append(chunk)

    if blocks:
        alternating = [(index % 2, block) for index, block in enumerate(blocks)]
        return _compact_segments(alternating)
    return [(0, text)] if text else []
def detect_speakers_dialogue(text: str) -> List[Tuple[int, str]]:
    """Split text into speaker turns using line-level dialogue markers.

    Lines are examined one at a time after normalization:

    * a blank line closes the turn being collected (the speaker is kept);
    * a line matching the name-prefix pattern starts a turn for that name;
      names are compared case-insensitively with whitespace collapsed and
      receive ids 0, 1, 2, ... in order of first appearance;
    * a line matching the dash pattern starts a turn for speaker 0 or 1,
      alternating on every such line;
    * a line matching the quote pattern starts a turn for the "next" speaker:
      the other of 0/1 while at most two names are known, otherwise the next
      id cyclically among the known names;
    * any other line continues the current turn.

    The collected turns are compacted. If nothing survives, the whole text is
    returned as a single speaker-0 segment, or an empty list for empty text.
    """
    text = normalize_text(text)

    known_speakers: Dict[str, int] = {}  # normalized name -> speaker id
    turns: List[Tuple[int, str]] = []
    pending: List[str] = []  # lines of the turn currently being collected
    active = 0  # speaker that owns `pending`
    next_dash = 0  # speaker the next dash-prefixed line will be given

    def close_turn() -> None:
        """Emit the buffered lines as one segment for the active speaker."""
        body = " ".join(pending).strip()
        if body:
            turns.append((active, body))
        pending.clear()

    for raw_line in text.split("\n"):
        line = raw_line.strip()
        if not line:
            close_turn()
            continue

        # The name prefix wins over the dash pattern, which wins over quotes.
        named = NAME_PREFIX_PATTERN.match(line)
        dashed = None if named else EMDASH_PATTERN.match(line)

        if named:
            close_turn()
            label = re.sub(r"\s+", " ", named.group(1).strip().lower())
            # Ids are handed out sequentially, so the next free id is always
            # the number of names seen so far.
            active = known_speakers.setdefault(label, len(known_speakers))
            rest = named.group(2).strip()
            if rest:
                pending.append(rest)
        elif dashed:
            close_turn()
            active, next_dash = next_dash, 1 - next_dash
            pending.append(dashed.group(1).strip())
        else:
            if QUOTE_PATTERN.match(line):
                close_turn()
                name_count = len(known_speakers)
                if name_count <= 2:
                    active = 1 - active
                else:
                    active = (active + 1) % name_count
            # Quoted lines keep their quote character; plain lines are
            # appended unchanged to the current turn.
            pending.append(line)

    close_turn()

    result = _compact_segments(turns)
    if result:
        return result
    return [(0, text)] if text else []
def detect_speakers(text: str, mode: str = "auto") -> List[Tuple[int, str]]:
    """Dispatch to the detection strategy named by ``mode``.

    ``mode`` is compared case-insensitively with surrounding whitespace
    ignored. "paragraph" and "dialogue" select the matching detector; every
    other value, including the default "auto", uses automatic detection.
    """
    strategies = {
        "paragraph": detect_speakers_paragraph,
        "dialogue": detect_speakers_dialogue,
    }
    chosen = strategies.get(mode.lower().strip(), detect_speakers_auto)
    return chosen(text)