"""Validation utilities for transcript extraction, analysis quality, and data completeness."""
import re
from typing import Tuple, Dict, List
def validate_extraction(text: str, filename: str) -> Tuple[bool, str]:
    """
    Validate that text extraction was successful
    """
    if not text or not text.strip():
        return False, "No text extracted"

    # Check for minimum content
    if len(text) < 50:
        return False, f"Extracted text too short ({len(text)} chars)"

    # Check for garbled text indicators
    garbled_patterns = [
        (r'[^\x00-\x7F]{50,}', "Contains large blocks of non-ASCII characters"),
        (r'(.)\1{20,}', "Contains suspicious character repetition"),
        (r'^[\W\d\s]+$', "Contains only symbols/numbers/whitespace")
    ]
    for pattern, msg in garbled_patterns:
        if re.search(pattern, text):
            return False, msg

    # Check word count
    words = text.split()
    if len(words) < 20:
        return False, f"Too few words ({len(words)})"

    # Calculate ratio of real words (heuristic)
    potential_words = [w for w in words if re.match(r'^[a-zA-Z]{2,}$', w)]
    word_ratio = len(potential_words) / len(words) if words else 0
    if word_ratio < 0.3:
        return False, f"Low word ratio ({word_ratio:.2f}) - possible extraction issue"

    return True, f"Valid ({len(words)} words, {len(text)} chars)"
def validate_transcript_quality(
    analyzed_text: str,
    structured_data: Dict,
    interviewee_type: str
) -> Tuple[float, str]:
    """
    Assess quality of analyzed transcript

    Returns:
        Tuple of (quality_score [0-1], issues_description)
    """
    score = 1.0
    issues = []

    # Check 1: Length of analysis
    if len(analyzed_text) < 100:
        score -= 0.3
        issues.append("Analysis too brief")
    elif len(analyzed_text) < 300:
        score -= 0.1
        issues.append("Analysis somewhat brief")

    # Check 2: Presence of structured data
    if not structured_data:
        score -= 0.2
        issues.append("No structured data extracted")
    else:
        # Check if structured data has content
        empty_fields = sum(1 for v in structured_data.values() if not v)
        total_fields = len(structured_data)
        if empty_fields == total_fields:
            score -= 0.3
            issues.append("All structured fields empty")
        elif empty_fields > total_fields * 0.7:
            score -= 0.2
            issues.append("Most structured fields empty")

    # Check 3: Type-specific validation
    if interviewee_type == "HCP":
        # Expect medical terminology
        medical_terms = re.findall(
            r'\b(diagnos\w+|prescri\w+|treatment|medication|patient|clinical|therapy)\b',
            analyzed_text,
            re.IGNORECASE
        )
        if len(medical_terms) < 3:
            score -= 0.2
            issues.append("Limited medical terminology for HCP interview")

        # Check for key structured fields
        key_fields = ["diagnoses", "prescriptions", "treatment_rationale"]
        missing_fields = [f for f in key_fields if not structured_data.get(f)]
        if len(missing_fields) == len(key_fields):
            score -= 0.2
            issues.append("No key HCP data extracted")

    elif interviewee_type == "Patient":
        # Expect patient-centric language
        patient_terms = re.findall(
            r'\b(symptom|feel|concern|experience|treatment|side effect|quality of life)\b',
            analyzed_text,
            re.IGNORECASE
        )
        if len(patient_terms) < 3:
            score -= 0.2
            issues.append("Limited patient-centric content")

        # Check for key structured fields
        key_fields = ["symptoms", "concerns", "treatment_response"]
        missing_fields = [f for f in key_fields if not structured_data.get(f)]
        if len(missing_fields) == len(key_fields):
            score -= 0.2
            issues.append("No key patient data extracted")

    # Check 4: Error indicators
    error_patterns = [
        r'\[Error\]',
        r'failed to',
        r'could not',
        r'unable to',
        r'timeout'
    ]
    for pattern in error_patterns:
        if re.search(pattern, analyzed_text, re.IGNORECASE):
            score -= 0.3
            issues.append("Contains error messages")
            break
    # Check 5: Repetitive content (potential LLM failure)
    # Compare non-empty sentences against unique ones so trailing empties
    # from the split do not inflate the ratio
    sentences = [s.strip().lower() for s in analyzed_text.split('.') if s.strip()]
    if len(sentences) > 3:
        unique_sentences = set(sentences)
        repetition_ratio = len(sentences) / len(unique_sentences) if unique_sentences else 1
        if repetition_ratio > 1.5:
            score -= 0.2
            issues.append("High content repetition")

    # Ensure score is in valid range
    score = max(0.0, min(1.0, score))
    issues_text = "; ".join(issues) if issues else "No issues detected"

    return score, issues_text
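# Illustrative usage (hedged sketch; the analysis text and structured fields are
# hypothetical examples of what the upstream analysis step might return):
#
#     quality, problems = validate_transcript_quality(
#         analyzed_text=analysis_text,
#         structured_data={"diagnoses": ["hypertension"], "prescriptions": [], "treatment_rationale": []},
#         interviewee_type="HCP",
#     )
#     if quality < 0.5:
#         print(f"Low-quality analysis: {problems}")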
def check_data_completeness(csv_rows: List[Dict], interviewee_type: str) -> Dict:
    """
    Analyze completeness of extracted data across all transcripts
    """
    if not csv_rows:
        return {"error": "No data to check"}

    # Determine key fields based on type
    if interviewee_type == "HCP":
        key_fields = ["Diagnoses", "Prescriptions", "Treatment Strategies"]
    elif interviewee_type == "Patient":
        key_fields = ["Primary Symptoms", "Main Concerns", "Treatment Response"]
    else:
        key_fields = ["Key Insights"]

    completeness = {}
    for field in key_fields:
        if field in csv_rows[0]:  # Check if field exists
            filled_count = sum(1 for row in csv_rows if row.get(field) and row[field].strip())
            completeness[field] = {
                "filled": filled_count,
                "total": len(csv_rows),
                "percentage": (filled_count / len(csv_rows) * 100) if csv_rows else 0
            }

    # Overall completeness
    total_fields = sum(c["total"] for c in completeness.values())
    filled_fields = sum(c["filled"] for c in completeness.values())
    overall_percentage = (filled_fields / total_fields * 100) if total_fields > 0 else 0

    return {
        "by_field": completeness,
        "overall": {
            "filled": filled_fields,
            "total": total_fields,
            "percentage": overall_percentage
        },
        "quality_grade": (
            "Excellent" if overall_percentage >= 80 else
            "Good" if overall_percentage >= 60 else
            "Fair" if overall_percentage >= 40 else
            "Poor"
        )
    }
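# Illustrative usage (hedged sketch; the sample rows are hypothetical, but the
# column names match the key fields checked above for HCP interviews):
#
#     rows = [
#         {"Diagnoses": "T2DM", "Prescriptions": "metformin", "Treatment Strategies": ""},
#         {"Diagnoses": "", "Prescriptions": "", "Treatment Strategies": ""},
#     ]
#     report = check_data_completeness(rows, "HCP")
#     print(report["quality_grade"], report["overall"]["percentage"])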
def validate_structured_data_format(data: Dict, interviewee_type: str) -> Tuple[bool, List[str]]:
    """
    Validate that structured data has expected format
    """
    issues = []

    if not isinstance(data, dict):
        return False, ["Data is not a dictionary"]

    # Define expected fields by type
    expected_fields = {
        "HCP": ["diagnoses", "prescriptions", "treatment_rationale"],
        "Patient": ["symptoms", "concerns", "treatment_response"],
        "Other": ["key_insights"]
    }
    required = expected_fields.get(interviewee_type, [])

    # Check for expected fields
    missing = [f for f in required if f not in data]
    if missing:
        issues.append(f"Missing expected fields: {', '.join(missing)}")

    # Check field types (should be lists)
    for key, value in data.items():
        if not isinstance(value, list):
            issues.append(f"Field '{key}' should be a list, got {type(value).__name__}")

    # Check for empty lists (only meaningful when the dict has fields at all)
    empty_fields = [k for k, v in data.items() if isinstance(v, list) and not v]
    if data and len(empty_fields) == len(data):
        issues.append("All fields are empty lists")

    is_valid = len(issues) == 0
    return is_valid, issues
def validate_summary_quality(summary: str, num_transcripts: int) -> Tuple[float, List[str]]:
    """Check summary for rigor and accuracy"""
    issues = []
    score = 1.0

    # Check for quantification
    if not re.search(r'\d+\s*(?:out of|of|participants|%)', summary):
        issues.append("No quantified findings (must include counts/percentages)")
        score -= 0.3

    # Check for vague claims (whole words only, so e.g. "sometimes" does not trigger "some")
    vague_terms = ['many', 'most', 'some', 'several', 'often', 'frequently']
    if any(re.search(rf'\b{term}\b', summary.lower()) for term in vague_terms):
        issues.append("Contains vague terms - should use specific numbers")
        score -= 0.2

    # Check for absolute claims
    absolute_terms = ['all', 'everyone', 'nobody', 'never', 'always']
    for term in absolute_terms:
        if re.search(rf'\b{term}\b', summary.lower()):
            issues.append(f"Absolute claim '{term}' found - likely overgeneralization")
            score -= 0.2

    # Check for evidence markers
    if 'consensus' not in summary.lower() and 'majority' not in summary.lower():
        issues.append("Missing consensus indicators")
        score -= 0.1

    # Check length is substantial
    if len(summary) < 500:
        issues.append("Summary too brief for thorough analysis")
        score -= 0.2

    return max(0.0, score), issues
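if __name__ == "__main__":
    # Minimal smoke test with hypothetical inputs; an illustrative sketch for
    # running the validators directly, not part of the processing pipeline.
    sample_data = {
        "symptoms": ["fatigue", "joint pain"],
        "concerns": [],
        "treatment_response": ["improved on current regimen"],
    }
    ok, format_issues = validate_structured_data_format(sample_data, "Patient")
    print(f"Structured data valid: {ok}; issues: {format_issues}")

    sample_summary = (
        "4 out of 6 participants reported fatigue as the dominant symptom, "
        "and 3 of 6 described partial relief after a treatment adjustment."
    )
    summary_score, summary_issues = validate_summary_quality(sample_summary, num_transcripts=6)
    print(f"Summary score: {summary_score:.2f}; issues: {summary_issues}")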