import re
from collections import Counter
from typing import List, Tuple

# Speaker-label patterns shared by detection and parsing so the two can never
# disagree. Each pattern has exactly one capture group (the label), which is
# what makes ``pattern.split`` return [preamble, label, body, label, body, ...].
# Labels are limited to horizontal whitespace so a "label" cannot swallow a
# line break and span several lines of ordinary prose.
_LABEL_PATTERNS = {
    # "Speaker 1: text"
    "colon_based": re.compile(r'^([A-Z][a-z \t]+\d*):\s', re.MULTILINE),
    # "[Interviewer] text"
    "bracket_based": re.compile(r'^\[([^\]\n]+)\]\s', re.MULTILINE),
    # "- Doctor: text"
    "dash_based": re.compile(r'^-[ \t]*([A-Z][a-z \t]+):\s', re.MULTILINE),
    # "Q: / A:"
    "q_a_based": re.compile(r'^([QA]):\s', re.MULTILINE),
}

# All heuristic patterns below are matched against *lowercased* text, so they
# must themselves be written in lowercase.
_QUESTION_PATTERNS = (
    r'\?$',
    r'^(what|how|why|when|where|who|can you|could you|would you|do you|have you)',
    r'(tell me|explain|describe|walk me through)',
    r'(your thoughts|your experience|your perspective)',
)

_CLINICAL_PATTERNS = (
    r'\b(prescribe|prescription|rx|medication|drug|dose|dosage|mg|ml)\b',
    r'\b(diagnos[ei]s|diagnosed|condition|disease|disorder)\b',
    r'\b(treatment|therapy|intervention|protocol)\b',
    r'\b(patient|case|clinical|medical|symptom)\b',
    r'\b(efficacy|effectiveness|outcome|response|adverse)\b',
    r'\b(guideline|recommendation|standard of care|first-line)\b',
)

_PATIENT_PATTERNS = (
    r'\b(i feel|i felt|i\'m experiencing|i have)\b',
    r'\b(my symptoms|my condition|my pain|my treatment)\b',
    r'\b(it hurts|it bothers|it helps|it doesn\'t work)\b',
    r'\b(i tried|i take|i stopped|i started)\b',
    r'\b(doctor told me|doctor said|doctor prescribed)\b',
)

_NEUTRAL_PATTERNS = (
    r'\b(thank you|thanks|appreciate|goodbye|bye|closing)\b',
    r'\b(that concludes|that\'s all|we\'re done)\b',
)


def _count_matching(patterns, text_lower: str) -> int:
    """Return how many of *patterns* match somewhere in *text_lower*."""
    return sum(1 for pattern in patterns if re.search(pattern, text_lower))


def detect_speaker_patterns(text: str) -> dict:
    """Detect which speaker-labelling convention a transcript uses.

    Four conventions are recognised: ``"Speaker 1: ..."`` (colon_based),
    ``"[Interviewer] ..."`` (bracket_based), ``"- Doctor: ..."`` (dash_based)
    and ``"Q: ... / A: ..."`` (q_a_based). The convention with the most
    label matches wins; ties go to the first one in that order.

    Args:
        text: Raw transcript text.

    Returns:
        A dict with keys ``pattern_type`` (winning convention name or
        ``None``), ``speakers_found`` (unique labels in first-seen order, or
        ``["Q", "A"]`` for Q/A transcripts), ``speaker_count`` and
        ``has_structure``.
    """
    matches = {
        name: [label.strip() for label in pattern.findall(text)]
        for name, pattern in _LABEL_PATTERNS.items()
    }
    # Q/A lines are counted per occurrence (not as a single yes/no vote) so a
    # stray "Note: ..." line cannot outvote an otherwise Q/A transcript.
    pattern_counts = {name: len(found) for name, found in matches.items()}

    if any(pattern_counts.values()):
        most_common = max(pattern_counts, key=pattern_counts.get)
    else:
        most_common = None

    if most_common is None:
        speakers = []
    elif most_common == "q_a_based":
        speakers = ["Q", "A"]
    else:
        # dict.fromkeys de-duplicates while keeping first-seen order, which
        # makes the result deterministic (a set would not be).
        speakers = list(dict.fromkeys(matches[most_common]))

    return {
        "pattern_type": most_common,
        "speakers_found": speakers,
        "speaker_count": len(speakers),
        "has_structure": most_common is not None,
    }


def classify_speaker_role(text: str, speaker_label: str, interviewee_type: str) -> str:
    """Guess the role of whoever said *text* using keyword heuristics.

    Args:
        text: The utterance to classify.
        speaker_label: Original label of the speaker. Currently not used by
            the heuristics; kept for interface compatibility.
        interviewee_type: ``"HCP"``, ``"Patient"`` or anything else for the
            generic rules.

    Returns:
        One of ``"Neutral"``, ``"Doctor"``, ``"Patient"``, ``"Interviewer"``,
        ``"Respondent"`` or ``"Unknown"``.
    """
    text_lower = text.lower()

    question_score = _count_matching(_QUESTION_PATTERNS, text_lower)
    clinical_score = _count_matching(_CLINICAL_PATTERNS, text_lower)
    patient_score = _count_matching(_PATIENT_PATTERNS, text_lower)
    neutral_score = _count_matching(_NEUTRAL_PATTERNS, text_lower)

    # Short pleasantries / sign-offs are not attributed to a role.
    if neutral_score > 0 and len(text.split()) < 15:
        return "Neutral"

    if interviewee_type == "HCP":
        # In HCP interviews, heavy clinical language points at the doctor.
        if clinical_score >= 3:
            return "Doctor"
        if question_score >= 2:
            return "Interviewer"
        if clinical_score >= 1:
            return "Doctor"
        return "Unknown"

    if interviewee_type == "Patient":
        # In patient interviews, first-person experience language points at
        # the patient.
        if patient_score >= 2:
            return "Patient"
        if question_score >= 2:
            return "Interviewer"
        if clinical_score >= 2:
            # Likely the interviewer explaining medical information.
            return "Interviewer"
        if patient_score >= 1:
            return "Patient"
        return "Unknown"

    # General classification.
    if question_score >= 2:
        return "Interviewer"
    if clinical_score >= 2:
        return "Respondent"
    return "Unknown"


def parse_existing_tags(text: str, pattern_info: dict) -> List[Tuple[str, str]]:
    """Split a labelled transcript into ``(speaker, content)`` segments.

    Args:
        text: Raw transcript text.
        pattern_info: Result of :func:`detect_speaker_patterns`; only its
            ``"pattern_type"`` entry is read.

    Returns:
        A list of ``(speaker, content)`` tuples. Segments whose content is
        empty are dropped, as is any text before the first label. For Q/A
        transcripts ``Q`` becomes ``"Interviewer"`` and ``A`` becomes
        ``"Respondent"``. If the pattern type is not recognised the whole
        text is returned as a single ``("Unknown", text)`` segment.
    """
    pattern_type = pattern_info["pattern_type"]
    splitter = _LABEL_PATTERNS.get(pattern_type)
    if splitter is None:
        # No clear pattern - treat as a single block.
        return [("Unknown", text)]

    # With one capture group, split() yields
    # [preamble, label1, body1, label2, body2, ...].
    parts = splitter.split(text)
    segments = []
    for label, body in zip(parts[1::2], parts[2::2]):
        content = body.strip()
        if not content:
            continue
        if pattern_type == "q_a_based":
            speaker = "Interviewer" if label == "Q" else "Respondent"
        else:
            speaker = label.strip()
        segments.append((speaker, content))
    return segments


def tag_speakers_advanced(text: str, role_hint: str = "", interviewee_type: str = "Other") -> str:
    """Tag every segment of a transcript with a ``[Role]`` prefix.

    Args:
        text: Raw transcript text.
        role_hint: Optional explicit mapping such as
            ``"Speaker 1 = Interviewer, Speaker 2 = Doctor"``. Labels are
            matched case-insensitively.
        interviewee_type: Passed through to :func:`classify_speaker_role`
            for segments not covered by *role_hint*.

    Returns:
        The tagged segments, each formatted as ``"[Role] content"`` and
        joined by blank lines.
    """
    # Step 1: detect existing structure.
    pattern_info = detect_speaker_patterns(text)

    # Step 2: parse role hints, keyed by lowercased original label.
    role_mapping = {}
    if role_hint:
        for original, mapped in re.findall(r'([^,=]+)\s*=\s*([^,=]+)', role_hint):
            role_mapping[original.strip().lower()] = mapped.strip()

    # Step 3: split into (label, content) segments.
    if pattern_info["has_structure"]:
        segments = parse_existing_tags(text, pattern_info)
    else:
        # No clear structure - every non-blank line is its own segment.
        segments = [
            ("Unknown", line.strip())
            for line in text.split('\n')
            if line.strip()
        ]

    # Step 4: an explicit hint wins; otherwise classify from the content.
    tagged_segments = []
    for speaker_label, content in segments:
        speaker_key = speaker_label.lower()
        if speaker_key in role_mapping:
            final_role = role_mapping[speaker_key]
        else:
            final_role = classify_speaker_role(content, speaker_label, interviewee_type)
        tagged_segments.append(f"[{final_role}] {content}")

    return "\n\n".join(tagged_segments)


def analyze_speaker_distribution(tagged_text: str) -> dict:
    """Summarise how often each ``[Role]`` tag starts a line in tagged text.

    Useful for quality control of :func:`tag_speakers_advanced` output.

    Args:
        tagged_text: Text whose segments start with ``[Role]`` at the
            beginning of a line.

    Returns:
        A dict with ``total_segments``, ``unique_speakers``,
        ``distribution`` (role -> count) and ``percentages`` (role -> share
        of segments in percent; empty when there are no segments).
    """
    speakers = re.findall(r'^\[([^\]]+)\]', tagged_text, re.MULTILINE)
    distribution = Counter(speakers)
    total = len(speakers)

    if total > 0:
        percentages = {role: (count / total * 100) for role, count in distribution.items()}
    else:
        percentages = {}

    return {
        "total_segments": total,
        "unique_speakers": len(distribution),
        "distribution": dict(distribution),
        "percentages": percentages,
    }