| import re
|
| from typing import List, Tuple
|
| from collections import Counter
|
|
|
def detect_speaker_patterns(text: str) -> dict:
    """Detect which speaker-labelling convention a transcript uses.

    Four conventions are recognised, each anchored at the start of a line:

    * ``colon_based``   -- ``Name: ...`` (capitalised label, optional digits)
    * ``bracket_based`` -- ``[Name] ...``
    * ``dash_based``    -- ``- Name: ...``
    * ``q_a_based``     -- ``Q: ...`` / ``A: ...``

    The convention with the most matching lines wins; on a tie the one
    listed first above is chosen.

    Args:
        text: Raw transcript text.

    Returns:
        A dict with the keys ``pattern_type`` (name of the winning
        convention, or ``None`` when nothing matched), ``speakers_found``
        (unique stripped labels in order of first appearance, or
        ``["Q", "A"]`` for the Q/A convention), ``speaker_count`` and
        ``has_structure``.
    """
    # Labels may contain spaces/tabs but must stay on a single line. Using
    # ``\s`` inside the label class would also match newlines and let a
    # "label" swallow the preceding line of text.
    matches = {
        "colon_based": re.findall(r'^([A-Z][a-z \t]+\d*):\s', text, re.MULTILINE),
        "bracket_based": re.findall(r'^\[([^\]\n]+)\]\s', text, re.MULTILINE),
        "dash_based": re.findall(r'^-[ \t]*([A-Z][a-z \t]+):\s', text, re.MULTILINE),
        # Every Q:/A: marker is counted (not just "present or absent") so a
        # long Q/A transcript is not outvoted by one stray "Note: " line.
        "q_a_based": re.findall(r'^(Q|A):\s', text, re.MULTILINE),
    }

    counts = {name: len(found) for name, found in matches.items()}
    # max() returns the first key on ties, i.e. dict insertion order above.
    most_common = max(counts, key=counts.get) if any(counts.values()) else None

    if most_common is None:
        speakers = []
    elif most_common == "q_a_based":
        speakers = ["Q", "A"]
    else:
        # dict.fromkeys de-duplicates while keeping first-appearance order,
        # which makes the result deterministic (a set would not be).
        speakers = list(dict.fromkeys(label.strip() for label in matches[most_common]))

    return {
        "pattern_type": most_common,
        "speakers_found": speakers,
        "speaker_count": len(speakers),
        "has_structure": most_common is not None,
    }
|
|
|
|
|
# All patterns below are matched against lower-cased text, so they must be
# written in lower case themselves.
_QUESTION_PATTERNS = tuple(re.compile(p) for p in (
    r'\?$',
    # The trailing \b keeps "however", "whenever", "whole" etc. from being
    # counted as question openers.
    r'^(what|how|why|when|where|who|can you|could you|would you|do you|have you)\b',
    r'(tell me|explain|describe|walk me through)',
    r'(your thoughts|your experience|your perspective)',
))

_CLINICAL_PATTERNS = tuple(re.compile(p) for p in (
    r'\b(prescribe|prescription|rx|medication|drug|dose|dosage|mg|ml)\b',
    r'\b(diagnos[ei]s|diagnosed|condition|disease|disorder)\b',
    r'\b(treatment|therapy|intervention|protocol)\b',
    r'\b(patient|case|clinical|medical|symptom)\b',
    r'\b(efficacy|effectiveness|outcome|response|adverse)\b',
    r'\b(guideline|recommendation|standard of care|first-line)\b',
))

_PATIENT_PATTERNS = tuple(re.compile(p) for p in (
    r"\b(i feel|i felt|i'm experiencing|i have)\b",
    r'\b(my symptoms|my condition|my pain|my treatment)\b',
    r"\b(it hurts|it bothers|it helps|it doesn't work)\b",
    r'\b(i tried|i take|i stopped|i started)\b',
    r'\b(doctor told me|doctor said|doctor prescribed)\b',
))

_NEUTRAL_PATTERNS = tuple(re.compile(p) for p in (
    r'\b(thank you|thanks|appreciate|goodbye|bye|closing)\b',
    r"\b(that concludes|that's all|we're done)\b",
))


def _count_pattern_hits(patterns, lowered_text: str) -> int:
    """Return how many of *patterns* match somewhere in *lowered_text*."""
    return sum(1 for pattern in patterns if pattern.search(lowered_text))


def classify_speaker_role(text: str, speaker_label: str, interviewee_type: str) -> str:
    """Classify the role of the speaker of one segment using keyword heuristics.

    The segment is scored against four pattern groups (question phrasing,
    clinical vocabulary, first-person patient language, closing/thanks
    phrases). Each group's score is the number of its patterns that match at
    least once. The scores are then combined according to ``interviewee_type``.

    Args:
        text: The segment content to classify.
        speaker_label: The label attached to the segment. Not used by the
            heuristics; kept so the call signature stays stable.
        interviewee_type: ``"HCP"``, ``"Patient"``, or anything else for the
            generic interviewer/respondent rules.

    Returns:
        One of ``"Neutral"``, ``"Doctor"``, ``"Patient"``, ``"Interviewer"``,
        ``"Respondent"`` or ``"Unknown"``.
    """
    # Strip so the ^ / $ anchored question patterns are not defeated by
    # surrounding whitespace; lower-case once for all pattern groups.
    normalized = text.strip().lower()

    question_score = _count_pattern_hits(_QUESTION_PATTERNS, normalized)
    clinical_score = _count_pattern_hits(_CLINICAL_PATTERNS, normalized)
    patient_score = _count_pattern_hits(_PATIENT_PATTERNS, normalized)
    neutral_score = _count_pattern_hits(_NEUTRAL_PATTERNS, normalized)

    # Short courtesy/closing remarks are neutral regardless of interview type.
    if neutral_score > 0 and len(text.split()) < 15:
        return "Neutral"

    if interviewee_type == "HCP":
        # Strong clinical vocabulary outranks question phrasing; weak
        # clinical vocabulary only counts when it is not clearly a question.
        if clinical_score >= 3:
            return "Doctor"
        if question_score >= 2:
            return "Interviewer"
        if clinical_score >= 1:
            return "Doctor"
        return "Unknown"

    if interviewee_type == "Patient":
        if patient_score >= 2:
            return "Patient"
        if question_score >= 2:
            return "Interviewer"
        # In a patient interview, clinical vocabulary is attributed to the
        # interviewer rather than the patient.
        if clinical_score >= 2:
            return "Interviewer"
        if patient_score >= 1:
            return "Patient"
        return "Unknown"

    if question_score >= 2:
        return "Interviewer"
    if clinical_score >= 2:
        return "Respondent"
    return "Unknown"
|
|
|
|
|
# Splitting regex for each labelling convention. Each has exactly one
# capturing group (the label), so re.split yields
# [preamble, label, content, label, content, ...].
_TAG_SPLIT_PATTERNS = {
    "colon_based": r'^([A-Z][a-z\s]+\d*):\s',
    "bracket_based": r'^\[([^\]]+)\]\s',
    "dash_based": r'^-\s*([A-Z][a-z\s]+):\s',
    "q_a_based": r'^([QA]):\s',
}


def parse_existing_tags(text: str, pattern_info: dict) -> List[Tuple[str, str]]:
    """Split already-labelled text into ``(speaker, content)`` segments.

    Args:
        text: Transcript text containing speaker labels.
        pattern_info: Dict with a ``"pattern_type"`` key naming the labelling
            convention (``"colon_based"``, ``"bracket_based"``,
            ``"dash_based"`` or ``"q_a_based"``).

    Returns:
        A list of ``(speaker, content)`` tuples in document order. For the
        Q/A convention the speaker is ``"Interviewer"`` for ``Q`` and
        ``"Respondent"`` for ``A``; otherwise it is the stripped label.
        Segments whose content is empty after stripping are omitted, and any
        text before the first label is not emitted. For an unrecognised
        pattern type the whole text is returned as one ``"Unknown"`` segment.

    Raises:
        KeyError: If ``pattern_info`` has no ``"pattern_type"`` key.
    """
    pattern_type = pattern_info["pattern_type"]
    splitter = _TAG_SPLIT_PATTERNS.get(pattern_type)
    if splitter is None:
        return [("Unknown", text)]

    parts = re.split(splitter, text, flags=re.MULTILINE)

    segments = []
    # parts[0] is whatever precedes the first label; labels sit at the odd
    # indices and their content at the following even indices.
    for label, raw_content in zip(parts[1::2], parts[2::2]):
        if pattern_type == "q_a_based":
            speaker = "Interviewer" if label == "Q" else "Respondent"
        else:
            speaker = label.strip()
        content = raw_content.strip()
        if content:
            segments.append((speaker, content))

    return segments
|
|
|
|
|
def tag_speakers_advanced(text: str, role_hint: str = "", interviewee_type: str = "Other") -> str:
    """Tag every segment of a transcript with a speaker role.

    The text is first checked for an existing labelling convention. If one
    is found, the text is split on those labels; otherwise each non-blank
    line becomes its own segment labelled ``"Unknown"``. Each segment's role
    is taken from ``role_hint`` when its label is listed there, and from the
    heuristic classifier otherwise.

    Args:
        text: Raw transcript text.
        role_hint: Optional comma-separated ``label=role`` pairs. Labels are
            compared case-insensitively; a later pair for the same label
            overrides an earlier one.
        interviewee_type: Passed through to the role classifier.

    Returns:
        The segments rendered as ``[role] content`` and joined by blank lines.
    """
    detected = detect_speaker_patterns(text)

    # Explicit label -> role overrides supplied by the caller.
    overrides = {}
    if role_hint:
        for pair in re.finditer(r'([^,=]+)\s*=\s*([^,=]+)', role_hint):
            label, role = pair.groups()
            overrides[label.strip().lower()] = role.strip()

    if not detected["has_structure"]:
        # No labels present: treat every non-blank line as one segment.
        stripped_lines = (raw.strip() for raw in text.split('\n'))
        segments = [("Unknown", line) for line in stripped_lines if line]
    else:
        segments = parse_existing_tags(text, detected)

    rendered = []
    for label, body in segments:
        lookup = label.lower()
        if lookup not in overrides:
            role = classify_speaker_role(body, label, interviewee_type)
        else:
            role = overrides[lookup]
        rendered.append(f"[{role}] {body}")

    return "\n\n".join(rendered)
|
|
|
|
|
def analyze_speaker_distribution(tagged_text: str) -> dict:
    """Summarise how often each speaker tag occurs in tagged text.

    A tag is a ``[...]`` group at the very start of a line. Intended as a
    quality-control aid.

    Args:
        tagged_text: Text whose segments start with ``[role]`` tags.

    Returns:
        A dict with ``total_segments`` (number of tags found),
        ``unique_speakers`` (number of distinct tags), ``distribution``
        (tag -> count) and ``percentages`` (tag -> share of all tags, 0-100;
        empty when no tags were found).
    """
    tag_at_line_start = re.compile(r'^\[([^\]]+)\]', re.MULTILINE)
    tally = Counter(found.group(1) for found in tag_at_line_start.finditer(tagged_text))
    segment_total = sum(tally.values())

    # With no tags the tally is empty, so this loop never divides by zero.
    shares = {}
    for role, hits in tally.items():
        shares[role] = hits / segment_total * 100

    summary = {}
    summary["total_segments"] = segment_total
    summary["unique_speakers"] = len(tally)
    summary["distribution"] = dict(tally)
    summary["percentages"] = shares
    return summary