Spaces:
Sleeping
Sleeping
| import re | |
| from typing import List, Tuple | |
| from collections import Counter | |
def detect_speaker_patterns(text: str) -> dict:
    """Analyze text to detect speaker patterns and labeling conventions.

    Four line-start conventions are searched for:

    * ``colon_based``   -- ``Speaker 1: text``
    * ``bracket_based`` -- ``[Interviewer] text``
    * ``dash_based``    -- ``- Doctor: text``
    * ``q_a_based``     -- ``Q: text`` / ``A: text``

    The convention with the most matching lines wins; ties are broken in
    the order listed above.

    Args:
        text: The raw transcript text.

    Returns:
        A dict with the keys ``pattern_type`` (name of the winning
        convention, or ``None`` when nothing matched), ``speakers_found``
        (unique labels in order of first appearance; always ``["Q", "A"]``
        for ``q_a_based``), ``speaker_count`` and ``has_structure``.
    """
    # Labels may only contain horizontal whitespace: with a plain ``\s`` in
    # the character class a "label" could swallow a line break and glue two
    # unrelated lines together.
    label_matches = {
        "colon_based": re.findall(r'^([A-Z][a-z \t]+\d*):\s', text, re.MULTILINE),
        "bracket_based": re.findall(r'^\[([^\]]+)\]\s', text, re.MULTILINE),
        "dash_based": re.findall(r'^-[ \t]*([A-Z][a-z \t]+):\s', text, re.MULTILINE),
        "q_a_based": re.findall(r'^([QA]):\s', text, re.MULTILINE),
    }

    # Count every Q:/A: line (not just "present or absent") so that a real
    # Q/A transcript is not outvoted by a single stray "Note: ..." line.
    pattern_counts = {name: len(found) for name, found in label_matches.items()}
    most_common = (
        max(pattern_counts, key=pattern_counts.get)
        if any(pattern_counts.values())
        else None
    )

    if most_common is None:
        speakers = []
    elif most_common == "q_a_based":
        speakers = ["Q", "A"]
    else:
        # dict.fromkeys de-duplicates while keeping first-appearance order,
        # which makes the result deterministic (a set would not be).
        speakers = list(
            dict.fromkeys(label.strip() for label in label_matches[most_common])
        )

    return {
        "pattern_type": most_common,
        "speakers_found": speakers,
        "speaker_count": len(speakers),
        "has_structure": most_common is not None,
    }
def _count_pattern_hits(patterns, text: str) -> int:
    """Return how many of the regexes in *patterns* match somewhere in *text*."""
    return sum(1 for pattern in patterns if re.search(pattern, text))


def classify_speaker_role(text: str, speaker_label: str, interviewee_type: str) -> str:
    """Classify the role of the speaker of one segment using keyword heuristics.

    The segment is lower-cased and scored against four groups of regexes
    (question, clinical, patient-experience, neutral/closing). Each group's
    score is the number of its patterns that match at least once.

    Args:
        text: The content of a single segment.
        speaker_label: The original label of the segment. Accepted for
            interface compatibility; the heuristics do not use it.
        interviewee_type: ``"HCP"`` or ``"Patient"`` select a dedicated
            decision ladder; any other value uses the general ladder.

    Returns:
        One of ``"Neutral"``, ``"Doctor"``, ``"Patient"``, ``"Interviewer"``,
        ``"Respondent"`` or ``"Unknown"``.
    """
    text_lower = text.lower()

    # NOTE: every pattern below is matched against the lower-cased text, so
    # the patterns themselves must be written in lower case.

    # Question patterns (likely interviewer)
    question_patterns = [
        r'\?\s*$',  # tolerate trailing whitespace after the question mark
        # \b keeps "however" / "whole" from counting as "how" / "who"
        r'^\s*(what|how|why|when|where|who|can you|could you|would you|do you|have you)\b',
        r'(tell me|explain|describe|walk me through)',
        r'(your thoughts|your experience|your perspective)',
    ]
    question_score = _count_pattern_hits(question_patterns, text_lower)

    # Medical/clinical patterns
    clinical_patterns = [
        r'\b(prescribe|prescription|rx|medication|drug|dose|dosage|mg|ml)\b',
        r'\b(diagnos[ei]s|diagnosed|condition|disease|disorder)\b',
        r'\b(treatment|therapy|intervention|protocol)\b',
        r'\b(patient|case|clinical|medical|symptom)\b',
        r'\b(efficacy|effectiveness|outcome|response|adverse)\b',
        r'\b(guideline|recommendation|standard of care|first-line)\b',
    ]
    clinical_score = _count_pattern_hits(clinical_patterns, text_lower)

    # Patient experience patterns (lower-case "i": an upper-case "I" could
    # never match the lower-cased text)
    patient_patterns = [
        r'\b(i feel|i felt|i\'m experiencing|i have)\b',
        r'\b(my symptoms|my condition|my pain|my treatment)\b',
        r'\b(it hurts|it bothers|it helps|it doesn\'t work)\b',
        r'\b(i tried|i take|i stopped|i started)\b',
        r'\b(doctor told me|doctor said|doctor prescribed)\b',
    ]
    patient_score = _count_pattern_hits(patient_patterns, text_lower)

    # Neutral/closing patterns
    neutral_patterns = [
        r'\b(thank you|thanks|appreciate|goodbye|bye|closing)\b',
        r'\b(that concludes|that\'s all|we\'re done)\b',
    ]
    neutral_score = _count_pattern_hits(neutral_patterns, text_lower)

    # Short courtesy/closing remarks are neutral regardless of interview type.
    if neutral_score > 0 and len(text.split()) < 15:
        return "Neutral"

    if interviewee_type == "HCP":
        # In HCP interviews, high clinical language = interviewee (doctor)
        if clinical_score >= 3:
            return "Doctor"
        if question_score >= 2:
            return "Interviewer"
        if clinical_score >= 1:
            return "Doctor"
        return "Unknown"

    if interviewee_type == "Patient":
        # In patient interviews, patient experience language = interviewee
        if patient_score >= 2:
            return "Patient"
        if question_score >= 2:
            return "Interviewer"
        if clinical_score >= 2:
            return "Interviewer"  # Likely interviewer explaining medical info
        if patient_score >= 1:
            return "Patient"
        return "Unknown"

    # General classification
    if question_score >= 2:
        return "Interviewer"
    if clinical_score >= 2:
        return "Respondent"
    return "Unknown"
def parse_existing_tags(text: str, pattern_info: dict) -> List[Tuple[str, str]]:
    """Parse text with existing speaker tags into (speaker, content) pairs.

    Args:
        text: The raw transcript text.
        pattern_info: A dict whose ``"pattern_type"`` entry names the
            labeling convention: ``"colon_based"``, ``"bracket_based"``,
            ``"dash_based"`` or ``"q_a_based"``. Any other value (including
            ``None``) means "no known convention".

    Returns:
        A list of ``(speaker, content)`` tuples in document order. Segments
        whose content is empty after stripping are omitted, and any text
        before the first label is not included. For ``q_a_based`` the
        speaker is ``"Interviewer"`` for ``Q`` and ``"Respondent"`` for
        ``A``. With no known convention the whole text is returned as a
        single ``("Unknown", text)`` segment.
    """
    # One splitter per convention. Each has exactly one capture group (the
    # label), so re.split yields [preamble, label, content, label, content, ...].
    # Labels may only contain horizontal whitespace so they cannot span lines.
    splitters = {
        "colon_based": r'^([A-Z][a-z \t]+\d*):\s',       # "Speaker 1: text"
        "bracket_based": r'^\[([^\]]+)\]\s',             # "[Speaker] text"
        "dash_based": r'^-[ \t]*([A-Z][a-z \t]+):\s',    # "- Doctor: text"
        "q_a_based": r'^([QA]):\s',                      # "Q: / A:"
    }

    pattern_type = pattern_info["pattern_type"]
    splitter = splitters.get(pattern_type)
    if splitter is None:
        # No clear pattern - treat as single block
        return [("Unknown", text)]

    parts = re.split(splitter, text, flags=re.MULTILINE)

    segments = []
    # parts[0] is the preamble; labels sit at odd indexes, contents at even.
    for label, body in zip(parts[1::2], parts[2::2]):
        content = body.strip()
        if not content:
            continue
        if pattern_type == "q_a_based":
            speaker = "Interviewer" if label == "Q" else "Respondent"
        else:
            speaker = label.strip()
        segments.append((speaker, content))
    return segments
def tag_speakers_advanced(text: str, role_hint: str = "", interviewee_type: str = "Other") -> str:
    """Tag every segment of a transcript with a speaker role.

    The text is first inspected with ``detect_speaker_patterns``. When a
    labeling convention is found the text is segmented by
    ``parse_existing_tags``; otherwise every non-blank line becomes its own
    segment labeled ``"Unknown"``. Each segment's role is taken from
    *role_hint* when its label (compared case-insensitively) appears there,
    and from ``classify_speaker_role`` otherwise.

    Args:
        text: The raw transcript text.
        role_hint: Optional comma-separated ``label = role`` pairs, e.g.
            ``"Speaker 1 = Interviewer, Speaker 2 = Doctor"``.
        interviewee_type: Passed through to ``classify_speaker_role``.

    Returns:
        The segments rendered as ``[role] content`` and joined by blank lines.
    """
    structure = detect_speaker_patterns(text)

    # Explicit label -> role overrides, keyed by lower-cased label.
    overrides = {}
    if role_hint:
        for source_label, target_role in re.findall(r'([^,=]+)\s*=\s*([^,=]+)', role_hint):
            overrides[source_label.strip().lower()] = target_role.strip()

    if structure["has_structure"]:
        segments = parse_existing_tags(text, structure)
    else:
        # No labeling convention: fall back to one segment per non-blank line.
        segments = []
        for raw_line in text.split('\n'):
            stripped = raw_line.strip()
            if stripped:
                segments.append(("Unknown", stripped))

    rendered = []
    for speaker_label, content in segments:
        lookup = speaker_label.lower()
        if lookup in overrides:
            role = overrides[lookup]
        else:
            # No explicit mapping for this label: classify from the content.
            role = classify_speaker_role(content, speaker_label, interviewee_type)
        rendered.append(f"[{role}] {content}")

    return "\n\n".join(rendered)
def analyze_speaker_distribution(tagged_text: str) -> dict:
    """Summarize how often each speaker tag occurs in tagged text.

    A tag is a ``[label]`` at the start of a line. Intended as a quick
    quality-control summary.

    Args:
        tagged_text: Text whose segments start with ``[label]``.

    Returns:
        A dict with ``total_segments`` (number of tags found),
        ``unique_speakers`` (number of distinct labels), ``distribution``
        (label -> count) and ``percentages`` (label -> share of all tags in
        percent; empty when no tag was found).
    """
    labels = [
        match.group(1)
        for match in re.finditer(r'^\[([^\]]+)\]', tagged_text, re.MULTILINE)
    ]
    counts = Counter(labels)
    segment_total = len(labels)

    shares = {}
    if segment_total > 0:
        for label, count in counts.items():
            shares[label] = count / segment_total * 100

    summary = {}
    summary["total_segments"] = segment_total
    summary["unique_speakers"] = len(counts)
    summary["distribution"] = dict(counts)
    summary["percentages"] = shares
    return summary