"""
Speaker Detection Module for PodXplainClone.

Parses input text into (speaker_id, text) segments using auto, paragraph,
and dialogue strategies.
"""

import re
from typing import Dict, Iterable, List, Tuple

from text_processing import normalize_text

# "Name: text" turns, optionally preceded by list bullets / markdown bold.
# The name must start with a letter and is at most 33 characters long.
NAME_PREFIX_PATTERN = re.compile(
    r"^(?:[-*\s]*)(?:\*\*)?([A-Za-z][A-Za-z0-9 ._'-]{0,32}?)(?:\*\*)?\s*:\s*(.*)$"
)
# A turn introduced by a leading hyphen, en dash (U+2013) or em dash (U+2014).
# The hyphen is escaped on purpose: an unescaped "-" sitting between two
# characters inside a class is a *range* operator, and a range from "-" up to
# the en dash would cover every ASCII letter and digit, making the pattern
# match (and strip the first character of) ordinary prose lines.
EMDASH_PATTERN = re.compile(r"^[\-\u2013\u2014]\s*(.*)")
# A line that opens with a straight or curly quotation mark.
QUOTE_PATTERN = re.compile(r"^[\"'“‘](.*)")
# A segment consisting solely of a bracketed stage direction, e.g. "[laughs]".
STAGE_DIRECTION_PATTERN = re.compile(r"^\s*\[[^\]]+\]\s*$")


def _compact_segments(segments: Iterable[Tuple[int, str]]) -> List[Tuple[int, str]]:
    """Merge adjacent segments for the same speaker and drop empty text.

    Each segment's text is passed through ``normalize_text`` first. Segments
    whose normalized text is empty, or is nothing but a bracketed stage
    direction, are discarded. Consecutive surviving segments with the same
    speaker id are joined with a single space.

    Args:
        segments: Any iterable of ``(speaker_id, text)`` pairs (a list or a
            generator both work).

    Returns:
        A new list of ``(speaker_id, text)`` pairs in the original order.
    """
    compacted: List[Tuple[int, str]] = []
    for speaker_id, text in segments:
        text = normalize_text(text)
        if not text or STAGE_DIRECTION_PATTERN.match(text):
            continue
        if compacted and compacted[-1][0] == speaker_id:
            # Same speaker as the previous kept segment: extend it in place.
            compacted[-1] = (speaker_id, f"{compacted[-1][1]} {text}".strip())
        else:
            compacted.append((speaker_id, text))
    return compacted


def detect_speakers_auto(text: str) -> List[Tuple[int, str]]:
    """Use dialogue parsing when markers are present, otherwise paragraph mode.

    A non-blank line counts as a "marker" when it matches the name-prefix,
    dash, or quote pattern. Dialogue parsing is chosen when at least two
    lines are markers or more than 25% of the non-blank lines are markers.
    Otherwise paragraph mode is used if the text contains more than one
    paragraph (separated by a blank line); failing that, the whole text is
    returned as a single segment for speaker 0.

    Args:
        text: Raw input text.

    Returns:
        A list of ``(speaker_id, text)`` segments; empty if the normalized
        text is empty.
    """
    # NOTE(review): normalize_text is defined in text_processing; this code
    # assumes it keeps "\n" line breaks intact -- confirm.
    text = normalize_text(text)
    lines = [line.strip() for line in text.split("\n") if line.strip()]
    paragraphs = [p.strip() for p in text.split("\n\n") if p.strip()]

    marker_count = 0
    for line in lines:
        if (
            NAME_PREFIX_PATTERN.match(line)
            or EMDASH_PATTERN.match(line)
            or QUOTE_PATTERN.match(line)
        ):
            marker_count += 1

    # max(1, ...) guards against division by zero on empty input.
    line_ratio = marker_count / max(1, len(lines))
    if marker_count >= 2 or line_ratio > 0.25:
        return detect_speakers_dialogue(text)
    if len(paragraphs) > 1:
        return detect_speakers_paragraph(text)
    return [(0, text)] if text else []


def detect_speakers_paragraph(text: str) -> List[Tuple[int, str]]:
    """Assign alternating speakers at paragraph breaks.

    Paragraphs are the non-blank chunks obtained by splitting the normalized
    text on a blank line (``"\\n\\n"``). Even-indexed paragraphs go to
    speaker 0 and odd-indexed ones to speaker 1; the result is then
    compacted with ``_compact_segments``.

    Args:
        text: Raw input text.

    Returns:
        A list of ``(speaker_id, text)`` segments. If no paragraph is found,
        the normalized text is returned as one speaker-0 segment, or an
        empty list when it is empty.
    """
    text = normalize_text(text)
    paragraphs = [p.strip() for p in text.split("\n\n") if p.strip()]
    if not paragraphs:
        return [(0, text)] if text else []
    return _compact_segments((i % 2, para) for i, para in enumerate(paragraphs))


def detect_speakers_dialogue(text: str) -> List[Tuple[int, str]]:
    """Parse common dialogue forms such as 'Speaker: text' and dash-prefixed turns.

    The normalized text is processed line by line:

    * A blank line ends the segment being accumulated.
    * ``Name: text`` starts a segment for that name. Names are compared
      case-insensitively with whitespace collapsed, and receive ids 0, 1,
      2, ... in order of first appearance.
    * A line starting with a hyphen / en dash / em dash starts a segment
      whose speaker alternates 0, 1, 0, ... on each such line; the dash is
      removed from the text.
    * A line starting with a quotation mark starts a segment for the "next"
      speaker: it toggles between 0 and 1 while at most two named speakers
      are known, otherwise it cycles through the known speaker ids. The
      quote character is kept in the text.
    * Any other line is appended to the segment in progress.

    Args:
        text: Raw input text.

    Returns:
        The compacted list of ``(speaker_id, text)`` segments. If nothing
        survives compaction, the normalized text is returned as a single
        speaker-0 segment, or an empty list when it is empty.
    """
    text = normalize_text(text)
    lines = text.split("\n")

    segments: List[Tuple[int, str]] = []
    speaker_map: Dict[str, int] = {}
    next_speaker_id = 0
    current_text: List[str] = []
    current_speaker = 0
    dash_speaker = 0

    def speaker_id_for(name: str) -> int:
        """Return the id for ``name``, allocating the next free id if new."""
        nonlocal next_speaker_id
        key = re.sub(r"\s+", " ", name.strip().lower())
        if key not in speaker_map:
            speaker_map[key] = next_speaker_id
            next_speaker_id += 1
        return speaker_map[key]

    def flush() -> None:
        """Emit the accumulated lines as one segment for the current speaker."""
        nonlocal current_text
        joined = " ".join(current_text).strip()
        if joined:
            segments.append((current_speaker, joined))
        current_text = []

    for raw_line in lines:
        line = raw_line.strip()
        if not line:
            flush()
            continue

        # Name prefixes are checked first so "- Alice: hi" is a named turn,
        # not an anonymous dash turn.
        match = NAME_PREFIX_PATTERN.match(line)
        if match:
            flush()
            current_speaker = speaker_id_for(match.group(1))
            remaining = match.group(2).strip()
            if remaining:
                current_text.append(remaining)
            continue

        match = EMDASH_PATTERN.match(line)
        if match:
            flush()
            current_speaker = dash_speaker
            dash_speaker = 1 - dash_speaker
            current_text.append(match.group(1).strip())
            continue

        if QUOTE_PATTERN.match(line):
            flush()
            if next_speaker_id <= 2:
                current_speaker = 1 - current_speaker
            else:
                current_speaker = (current_speaker + 1) % next_speaker_id
            current_text.append(line)
            continue

        # Unmarked line: continuation of whoever is currently speaking.
        current_text.append(line)

    flush()
    compacted = _compact_segments(segments)
    return compacted if compacted else ([(0, text)] if text else [])


def detect_speakers(text: str, mode: str = "auto") -> List[Tuple[int, str]]:
    """Main entry point for speaker detection.

    Args:
        text: Raw input text.
        mode: ``"paragraph"``, ``"dialogue"`` or ``"auto"``; matched
            case-insensitively with surrounding whitespace ignored. Any
            other value is treated as ``"auto"``.

    Returns:
        A list of ``(speaker_id, text)`` segments.
    """
    mode = mode.lower().strip()
    if mode == "paragraph":
        return detect_speakers_paragraph(text)
    if mode == "dialogue":
        return detect_speakers_dialogue(text)
    return detect_speakers_auto(text)