# TranscriptWriting / tagging.py
# Uploaded by jmisak ("Upload 23 files", commit 54c99ad, verified); file size 8.43 kB.
import re
from typing import List, Tuple
from collections import Counter
def detect_speaker_patterns(text: str) -> dict:
    """Analyze text to detect speaker patterns and labeling conventions.

    Four line-leading conventions are recognised:

    * ``colon_based``   - ``Speaker 1: text``
    * ``bracket_based`` - ``[Interviewer] text``
    * ``dash_based``    - ``- Doctor: text``
    * ``q_a_based``     - ``Q: text`` / ``A: text``

    The convention with the most matching lines wins; on a tie the one listed
    first above is chosen.

    Returns:
        A dict with the keys ``pattern_type`` (winning convention name, or
        ``None`` when nothing matched), ``speakers_found`` (unique labels in
        order of first appearance; always ``["Q", "A"]`` for ``q_a_based``),
        ``speaker_count`` and ``has_structure``.
    """
    # Label character classes use "[ \t]" rather than "\s" so that a label can
    # never swallow a line break and span several lines.
    label_matches = {
        "colon_based": re.findall(r'^([A-Z][a-z \t]+\d*):\s', text, re.MULTILINE),
        "bracket_based": re.findall(r'^\[([^\]\n]+)\]\s', text, re.MULTILINE),
        "dash_based": re.findall(r'^-[ \t]*([A-Z][a-z \t]+):\s', text, re.MULTILINE),
        "q_a_based": re.findall(r'^(Q|A):\s', text, re.MULTILINE),
    }

    # Every convention is scored by its real number of matching lines, so a
    # Q/A transcript is not outvoted by a single stray "Note: ..." line.
    pattern_counts = {name: len(found) for name, found in label_matches.items()}
    most_common = (
        max(pattern_counts, key=pattern_counts.get)
        if any(pattern_counts.values())
        else None
    )

    if most_common is None:
        speakers = []
    elif most_common == "q_a_based":
        speakers = ["Q", "A"]
    else:
        # dict.fromkeys de-duplicates while keeping first-appearance order,
        # which makes the result deterministic (a set would not be).
        speakers = list(
            dict.fromkeys(label.strip() for label in label_matches[most_common])
        )

    return {
        "pattern_type": most_common,
        "speakers_found": speakers,
        "speaker_count": len(speakers),
        "has_structure": most_common is not None,
    }
def classify_speaker_role(text: str, speaker_label: str, interviewee_type: str) -> str:
    """
    Use keyword heuristics to classify the role of the speaker of one utterance.

    Four pattern families (question, clinical, patient-experience and
    neutral/closing) are each scored by how many of their patterns occur in
    ``text``; the scores are then combined according to ``interviewee_type``.

    Args:
        text: The utterance to classify.
        speaker_label: Original label of the speaker. Accepted for interface
            compatibility; the current heuristics do not use it.
        interviewee_type: ``"HCP"``, ``"Patient"``, or anything else for the
            general rules.

    Returns:
        One of ``"Neutral"``, ``"Doctor"``, ``"Patient"``, ``"Interviewer"``,
        ``"Respondent"`` or ``"Unknown"``.
    """
    # Strip first so the anchored patterns (^... and \?$) are not defeated by
    # leading or trailing whitespace.
    utterance = text.strip()

    def count_hits(patterns):
        # Case-insensitive matching: several patterns contain an uppercase
        # "I", which could never match text that had been lower-cased.
        return sum(1 for p in patterns if re.search(p, utterance, re.IGNORECASE))

    # Question patterns (likely interviewer)
    question_score = count_hits([
        r'\?$',
        # Trailing \b so "However ..." / "Whatever ..." are not read as
        # "how" / "what" question starters.
        r'^(what|how|why|when|where|who|can you|could you|would you|do you|have you)\b',
        r'(tell me|explain|describe|walk me through)',
        r'(your thoughts|your experience|your perspective)',
    ])

    # Medical/clinical patterns
    clinical_score = count_hits([
        r'\b(prescribe|prescription|rx|medication|drug|dose|dosage|mg|ml)\b',
        r'\b(diagnos[ei]s|diagnosed|condition|disease|disorder)\b',
        r'\b(treatment|therapy|intervention|protocol)\b',
        r'\b(patient|case|clinical|medical|symptom)\b',
        r'\b(efficacy|effectiveness|outcome|response|adverse)\b',
        r'\b(guideline|recommendation|standard of care|first-line)\b',
    ])

    # Patient experience patterns
    patient_score = count_hits([
        r'\b(I feel|I felt|I\'m experiencing|I have)\b',
        r'\b(my symptoms|my condition|my pain|my treatment)\b',
        r'\b(it hurts|it bothers|it helps|it doesn\'t work)\b',
        r'\b(I tried|I take|I stopped|I started)\b',
        r'\b(doctor told me|doctor said|doctor prescribed)\b',
    ])

    # Neutral/closing patterns
    neutral_score = count_hits([
        r'\b(thank you|thanks|appreciate|goodbye|bye|closing)\b',
        r'\b(that concludes|that\'s all|we\'re done)\b',
    ])

    # Short courtesy/closing remarks are neutral regardless of interview type.
    if neutral_score > 0 and len(text.split()) < 15:
        return "Neutral"

    if interviewee_type == "HCP":
        # In HCP interviews, high clinical language = interviewee (doctor)
        if clinical_score >= 3:
            return "Doctor"
        if question_score >= 2:
            return "Interviewer"
        if clinical_score >= 1:
            return "Doctor"
        return "Unknown"

    if interviewee_type == "Patient":
        # In patient interviews, patient experience language = interviewee
        if patient_score >= 2:
            return "Patient"
        if question_score >= 2:
            return "Interviewer"
        if clinical_score >= 2:
            return "Interviewer"  # Likely interviewer explaining medical info
        if patient_score >= 1:
            return "Patient"
        return "Unknown"

    # General classification
    if question_score >= 2:
        return "Interviewer"
    if clinical_score >= 2:
        return "Respondent"
    return "Unknown"
def parse_existing_tags(text: str, pattern_info: dict) -> List[Tuple[str, str]]:
    """Parse text with existing speaker tags into ``(speaker, content)`` pairs.

    Args:
        text: The transcript text.
        pattern_info: Dict whose ``"pattern_type"`` entry names the labeling
            convention: ``"colon_based"``, ``"bracket_based"``,
            ``"dash_based"`` or ``"q_a_based"``. Any other value (or a missing
            key) means "no recognised structure".

    Returns:
        One tuple per labeled, non-empty segment, in document order. For
        ``q_a_based`` text the speaker is ``"Interviewer"`` for ``Q`` and
        ``"Respondent"`` for ``A``; otherwise it is the stripped label. Text
        that precedes the first label is not returned. When no structure is
        recognised the whole text comes back as one ``("Unknown", text)``
        segment.
    """
    # One splitting regex per convention; each has exactly one capture group
    # (the label), so re.split yields [preamble, label, body, label, body, ...].
    # "[ \t]" instead of "\s" keeps a label from spanning several lines.
    splitters = {
        "colon_based": r'^([A-Z][a-z \t]+\d*):\s',        # "Speaker 1: text"
        "bracket_based": r'^\[([^\]\n]+)\]\s',            # "[Speaker] text"
        "dash_based": r'^-[ \t]*([A-Z][a-z \t]+):\s',     # "- Doctor: text"
        "q_a_based": r'^([QA]):\s',                       # "Q: / A:"
    }

    pattern_type = pattern_info.get("pattern_type")
    splitter = splitters.get(pattern_type)
    if splitter is None:
        # No clear pattern - treat as single block
        return [("Unknown", text)]

    parts = re.split(splitter, text, flags=re.MULTILINE)

    segments = []
    # Odd indices hold labels, the following even indices hold their bodies.
    for label, body in zip(parts[1::2], parts[2::2]):
        content = body.strip()
        if not content:
            continue
        if pattern_type == "q_a_based":
            speaker = "Interviewer" if label == "Q" else "Respondent"
        else:
            speaker = label.strip()
        segments.append((speaker, content))
    return segments
def tag_speakers_advanced(text: str, role_hint: str = "", interviewee_type: str = "Other") -> str:
    """
    Tag every segment of a transcript with a speaker role.

    The text is first checked for an existing labeling convention. Labeled
    text is split into segments along its labels; unlabeled text is split
    into its non-blank lines. Each segment's role comes from ``role_hint``
    when the segment's label is listed there (case-insensitively), and from
    content-based classification otherwise.

    Args:
        text: Transcript text.
        role_hint: Optional comma-separated ``label = role`` pairs, e.g.
            ``"Speaker 1 = Interviewer, Speaker 2 = Doctor"``.
        interviewee_type: Passed through to the content classifier.

    Returns:
        The segments rendered as ``[role] content`` and joined by blank lines.
    """
    structure = detect_speaker_patterns(text)

    # Explicit label -> role overrides; keys are lower-cased for lookup.
    hint_lookup = {}
    if role_hint:
        hint_lookup = {
            label.strip().lower(): role.strip()
            for label, role in re.findall(r'([^,=]+)\s*=\s*([^,=]+)', role_hint)
        }

    if structure["has_structure"]:
        segments = parse_existing_tags(text, structure)
    else:
        # No labels detected: every non-blank line is its own segment.
        segments = [
            ("Unknown", stripped)
            for stripped in map(str.strip, text.split('\n'))
            if stripped
        ]

    rendered = []
    for label, body in segments:
        role = hint_lookup.get(label.lower())
        if role is None:
            # No hint for this label: fall back to content heuristics.
            role = classify_speaker_role(body, label, interviewee_type)
        rendered.append(f"[{role}] {body}")

    return "\n\n".join(rendered)
def analyze_speaker_distribution(tagged_text: str) -> dict:
    """
    Summarise how often each speaker tag occurs in tagged text.

    A tag is a ``[...]`` group at the start of a line. Useful for quality
    control of tagging output.

    Returns:
        A dict with ``total_segments`` (number of tags found),
        ``unique_speakers`` (number of distinct tags), ``distribution``
        (tag -> count) and ``percentages`` (tag -> share of all tags in
        percent; empty when no tag was found).
    """
    tag_at_line_start = re.compile(r'^\[([^\]]+)\]', re.MULTILINE)
    counts = Counter(m.group(1) for m in tag_at_line_start.finditer(tagged_text))
    segment_total = sum(counts.values())

    shares = {}
    if segment_total > 0:
        for tag, occurrences in counts.items():
            shares[tag] = occurrences / segment_total * 100

    return {
        "total_segments": segment_total,
        "unique_speakers": len(counts),
        "distribution": dict(counts),
        "percentages": shares,
    }