# Extracted from a hosted file view (Space status: Sleeping; file size: 8,429 bytes; ref 54c99ad).
import re
from typing import List, Tuple
from collections import Counter
def detect_speaker_patterns(text: str) -> dict:
    """Analyze text to detect speaker patterns and labeling conventions.

    Four line-prefix conventions are recognised:

    * ``colon_based``   -- ``Speaker 1: text``
    * ``bracket_based`` -- ``[Interviewer] text``
    * ``dash_based``    -- ``- Doctor: text``
    * ``q_a_based``     -- ``Q: text`` / ``A: text``

    The convention with the most matching lines wins; on a tie the one
    listed first above is chosen.

    Args:
        text: Raw transcript text.

    Returns:
        A dict with keys ``pattern_type`` (winning convention name, or
        ``None`` when nothing matched), ``speakers_found`` (unique labels
        in order of first appearance; ``["Q", "A"]`` for Q/A transcripts),
        ``speaker_count`` and ``has_structure``.
    """
    label_matches = {
        "colon_based": re.findall(r'^([A-Z][a-z\s]+\d*):\s', text, re.MULTILINE),
        "bracket_based": re.findall(r'^\[([^\]]+)\]\s', text, re.MULTILINE),
        "dash_based": re.findall(r'^-\s*([A-Z][a-z\s]+):\s', text, re.MULTILINE),
    }

    pattern_counts = {name: len(found) for name, found in label_matches.items()}
    # Count every Q:/A: line so this convention competes on the same scale
    # as the others; a mere presence flag would lose to (or tie with) a
    # single stray "Word: " line elsewhere in the transcript.
    pattern_counts["q_a_based"] = len(
        re.findall(r'^(?:Q|A):\s', text, re.MULTILINE)
    )

    if any(pattern_counts.values()):
        most_common = max(pattern_counts, key=pattern_counts.get)
    else:
        most_common = None

    if most_common == "q_a_based":
        speakers = ["Q", "A"]
    elif most_common is not None:
        # dict.fromkeys de-duplicates while keeping first-appearance order,
        # so the result is deterministic across runs.
        speakers = list(dict.fromkeys(label_matches[most_common]))
    else:
        speakers = []

    return {
        "pattern_type": most_common,
        "speakers_found": speakers,
        "speaker_count": len(speakers),
        "has_structure": most_common is not None,
    }
def classify_speaker_role(text: str, speaker_label: str, interviewee_type: str) -> str:
    """Classify the role of the speaker of ``text`` with keyword heuristics.

    The text is lower-cased and scored against four groups of regular
    expressions (question, clinical, patient-experience, neutral/closing).
    Each group's score is the number of its patterns that match at least
    once. All patterns are written in lower case because they are matched
    against the lower-cased text.

    Args:
        text: The utterance to classify.
        speaker_label: Original label of the segment. Accepted for interface
            compatibility; the current heuristics do not read it.
        interviewee_type: ``"HCP"`` or ``"Patient"`` select a dedicated
            decision table; any other value uses the general table.

    Returns:
        One of ``"Neutral"``, ``"Doctor"``, ``"Patient"``, ``"Interviewer"``,
        ``"Respondent"`` or ``"Unknown"``.
    """
    text_lower = text.lower()

    def score(patterns) -> int:
        # Number of patterns in the group that occur in the lower-cased text.
        return sum(1 for pattern in patterns if re.search(pattern, text_lower))

    # Question patterns (likely interviewer)
    question_score = score([
        r'\?$',
        r'^(what|how|why|when|where|who|can you|could you|would you|do you|have you)',
        r'(tell me|explain|describe|walk me through)',
        r'(your thoughts|your experience|your perspective)',
    ])

    # Medical/clinical patterns
    clinical_score = score([
        r'\b(prescribe|prescription|rx|medication|drug|dose|dosage|mg|ml)\b',
        r'\b(diagnos[ei]s|diagnosed|condition|disease|disorder)\b',
        r'\b(treatment|therapy|intervention|protocol)\b',
        r'\b(patient|case|clinical|medical|symptom)\b',
        r'\b(efficacy|effectiveness|outcome|response|adverse)\b',
        r'\b(guideline|recommendation|standard of care|first-line)\b',
    ])

    # Patient experience patterns. First-person "i" is lower case on purpose:
    # an upper-case "I" can never match the lower-cased text.
    patient_score = score([
        r'\b(i feel|i felt|i\'m experiencing|i have)\b',
        r'\b(my symptoms|my condition|my pain|my treatment)\b',
        r'\b(it hurts|it bothers|it helps|it doesn\'t work)\b',
        r'\b(i tried|i take|i stopped|i started)\b',
        r'\b(doctor told me|doctor said|doctor prescribed)\b',
    ])

    # Neutral/closing patterns
    neutral_score = score([
        r'\b(thank you|thanks|appreciate|goodbye|bye|closing)\b',
        r'\b(that concludes|that\'s all|we\'re done)\b',
    ])

    # Short courtesy/closing remarks are neutral whoever says them.
    if neutral_score > 0 and len(text.split()) < 15:
        return "Neutral"

    if interviewee_type == "HCP":
        # In HCP interviews, high clinical language = interviewee (doctor)
        if clinical_score >= 3:
            return "Doctor"
        if question_score >= 2:
            return "Interviewer"
        if clinical_score >= 1:
            return "Doctor"
        return "Unknown"

    if interviewee_type == "Patient":
        # In patient interviews, patient experience language = interviewee
        if patient_score >= 2:
            return "Patient"
        if question_score >= 2:
            return "Interviewer"
        if clinical_score >= 2:
            return "Interviewer"  # Likely interviewer explaining medical info
        if patient_score >= 1:
            return "Patient"
        return "Unknown"

    # General classification
    if question_score >= 2:
        return "Interviewer"
    if clinical_score >= 2:
        return "Respondent"
    return "Unknown"
def _split_on_labels(label_regex: str, text: str) -> List[Tuple[str, str]]:
    """Split ``text`` on a one-group label regex into (label, content) pairs."""
    # With exactly one capturing group, re.split yields
    # [preamble, label, content, label, content, ...]; the preamble is dropped.
    parts = re.split(label_regex, text, flags=re.MULTILINE)
    pairs = []
    for label, body in zip(parts[1::2], parts[2::2]):
        body = body.strip()
        if body:  # skip labels that carry no content
            pairs.append((label.strip(), body))
    return pairs


def parse_existing_tags(text: str, pattern_info: dict) -> List[Tuple[str, str]]:
    """Parse text with existing speaker tags.

    Args:
        text: Raw transcript text.
        pattern_info: Dict containing a ``"pattern_type"`` key whose value is
            one of ``"colon_based"``, ``"bracket_based"``, ``"dash_based"``,
            ``"q_a_based"``, or anything else for "no known structure".

    Returns:
        A list of ``(speaker, content)`` tuples in document order. Text before
        the first label and labels with empty content are skipped. For Q/A
        transcripts, ``Q`` becomes ``"Interviewer"`` and ``A`` becomes
        ``"Respondent"``. With no recognised pattern the whole text is
        returned as a single ``("Unknown", text)`` segment.

    Raises:
        KeyError: If ``pattern_info`` has no ``"pattern_type"`` key.
    """
    pattern_type = pattern_info["pattern_type"]

    if pattern_type == "colon_based":
        # "Speaker 1: text"
        return _split_on_labels(r'^([A-Z][a-z\s]+\d*):\s', text)

    if pattern_type == "bracket_based":
        # "[Speaker] text"
        return _split_on_labels(r'^\[([^\]]+)\]\s', text)

    if pattern_type == "dash_based":
        # "- Doctor: text"
        return _split_on_labels(r'^-\s*([A-Z][a-z\s]+):\s', text)

    if pattern_type == "q_a_based":
        # "Q: / A:"
        return [
            ("Interviewer" if label == "Q" else "Respondent", content)
            for label, content in _split_on_labels(r'^([QA]):\s', text)
        ]

    # No clear pattern - treat as single block
    return [("Unknown", text)]
def tag_speakers_advanced(text: str, role_hint: str = "", interviewee_type: str = "Other") -> str:
    """Tag every segment of a transcript with a speaker role.

    The transcript is first inspected with ``detect_speaker_patterns``. If a
    labeling convention is found the text is segmented with
    ``parse_existing_tags``; otherwise every non-blank line becomes its own
    segment labelled ``"Unknown"``.

    Args:
        text: Raw transcript text.
        role_hint: Optional comma-separated ``label = role`` pairs, e.g.
            ``"Speaker 1 = Interviewer, Speaker 2 = Doctor"``. Labels are
            compared case-insensitively against the parsed segment labels.
        interviewee_type: Passed through to ``classify_speaker_role`` for
            segments that no hint covers.

    Returns:
        The segments rendered as ``"[Role] content"`` and joined by blank
        lines.
    """
    structure = detect_speaker_patterns(text)

    # Explicit label -> role overrides supplied by the caller.
    hinted_roles = {}
    if role_hint:
        hinted_roles = {
            label.strip().lower(): role.strip()
            for label, role in re.findall(r'([^,=]+)\s*=\s*([^,=]+)', role_hint)
        }

    if structure["has_structure"]:
        segments = parse_existing_tags(text, structure)
    else:
        # No labeling convention: one segment per non-blank line.
        segments = [
            ("Unknown", stripped)
            for stripped in (raw.strip() for raw in text.split('\n'))
            if stripped
        ]

    rendered = []
    for label, content in segments:
        role = hinted_roles.get(label.lower())
        if role is None:
            # No hint for this label: fall back to content-based heuristics.
            role = classify_speaker_role(content, label, interviewee_type)
        rendered.append(f"[{role}] {content}")

    return "\n\n".join(rendered)
def analyze_speaker_distribution(tagged_text: str) -> dict:
    """Analyze the distribution of speakers in tagged text.

    Useful for quality control. A speaker tag is a ``[Label]`` at the very
    start of a line.

    Args:
        tagged_text: Text whose segments start with ``[Role]`` tags.

    Returns:
        A dict with ``total_segments`` (number of tags found),
        ``unique_speakers`` (number of distinct labels), ``distribution``
        (label -> count) and ``percentages`` (label -> share of all tags,
        0-100; empty when no tag was found).
    """
    speakers = re.findall(r'^\[([^\]]+)\]', tagged_text, re.MULTILINE)
    distribution = Counter(speakers)
    total = len(speakers)

    # Guard the division: with no tags there is nothing to take a share of.
    if total > 0:
        percentages = {
            label: count / total * 100 for label, count in distribution.items()
        }
    else:
        percentages = {}

    return {
        "total_segments": total,
        "unique_speakers": len(distribution),
        "distribution": dict(distribution),
        "percentages": percentages,
    }