"""Validation utilities for transcript extraction, analysis quality, and summaries."""
| import re | |
| from typing import Tuple, Dict, List | |
def validate_extraction(text: str, filename: str) -> Tuple[bool, str]:
    """
    Check whether text pulled from *filename* looks like a usable extraction.

    Returns a (passed, detail) pair: ``detail`` names the failure, or
    summarizes word/character counts on success.
    """
    # Reject missing or whitespace-only extractions outright.
    if not text or not text.strip():
        return False, "No text extracted"

    # Check for minimum content
    char_count = len(text)
    if char_count < 50:
        return False, f"Extracted text too short ({char_count} chars)"

    # Heuristics that usually indicate a garbled or binary extraction.
    garbled_checks = (
        (r'[^\x00-\x7F]{50,}', "Contains large blocks of non-ASCII characters"),
        (r'(.)\1{20,}', "Contains suspicious character repetition"),
        (r'^[\W\d\s]+$', "Contains only symbols/numbers/whitespace"),
    )
    failure = next(
        (msg for pattern, msg in garbled_checks if re.search(pattern, text)),
        None,
    )
    if failure is not None:
        return False, failure

    # Require a minimum number of whitespace-delimited tokens.
    tokens = text.split()
    if len(tokens) < 20:
        return False, f"Too few words ({len(tokens)})"

    # Estimate how many tokens are plain alphabetic words (heuristic).
    alpha_words = [t for t in tokens if re.match(r'^[a-zA-Z]{2,}$', t)]
    word_ratio = len(alpha_words) / len(tokens) if tokens else 0
    if word_ratio < 0.3:
        return False, f"Low word ratio ({word_ratio:.2f}) - possible extraction issue"

    return True, f"Valid ({len(tokens)} words, {char_count} chars)"
def validate_transcript_quality(
    analyzed_text: str,
    structured_data: Dict,
    interviewee_type: str
) -> Tuple[float, str]:
    """
    Assess quality of an analyzed transcript.

    Starts from a perfect score of 1.0 and subtracts penalties for brevity,
    missing/empty structured data, weak type-specific content, embedded error
    messages, and repetitive output (a common LLM failure mode).

    Args:
        analyzed_text: Free-text analysis produced for the transcript.
        structured_data: Mapping of extracted fields; values are treated as
            populated when truthy.
        interviewee_type: "HCP", "Patient", or anything else (other values
            skip the type-specific checks).

    Returns:
        Tuple of (quality_score [0-1], issues_description)
    """
    score = 1.0
    issues = []

    # Check 1: Length of analysis
    if len(analyzed_text) < 100:
        score -= 0.3
        issues.append("Analysis too brief")
    elif len(analyzed_text) < 300:
        score -= 0.1
        issues.append("Analysis somewhat brief")

    # Check 2: Presence of structured data
    if not structured_data:
        score -= 0.2
        issues.append("No structured data extracted")
    else:
        # Penalize by how many fields came back empty.
        empty_fields = sum(1 for v in structured_data.values() if not v)
        total_fields = len(structured_data)
        if empty_fields == total_fields:
            score -= 0.3
            issues.append("All structured fields empty")
        elif empty_fields > total_fields * 0.7:
            score -= 0.2
            issues.append("Most structured fields empty")

    # Check 3: Type-specific validation
    if interviewee_type == "HCP":
        # Expect medical terminology
        medical_terms = re.findall(
            r'\b(diagnos\w+|prescri\w+|treatment|medication|patient|clinical|therapy)\b',
            analyzed_text,
            re.IGNORECASE
        )
        if len(medical_terms) < 3:
            score -= 0.2
            issues.append("Limited medical terminology for HCP interview")
        # Check for key structured fields
        key_fields = ["diagnoses", "prescriptions", "treatment_rationale"]
        missing_fields = [f for f in key_fields if not structured_data.get(f)]
        if len(missing_fields) == len(key_fields):
            score -= 0.2
            issues.append("No key HCP data extracted")
    elif interviewee_type == "Patient":
        # Expect patient-centric language
        patient_terms = re.findall(
            r'\b(symptom|feel|concern|experience|treatment|side effect|quality of life)\b',
            analyzed_text,
            re.IGNORECASE
        )
        if len(patient_terms) < 3:
            score -= 0.2
            issues.append("Limited patient-centric content")
        # Check for key structured fields
        key_fields = ["symptoms", "concerns", "treatment_response"]
        missing_fields = [f for f in key_fields if not structured_data.get(f)]
        if len(missing_fields) == len(key_fields):
            score -= 0.2
            issues.append("No key patient data extracted")

    # Check 4: Error indicators
    error_patterns = [
        r'\[Error\]',
        r'failed to',
        r'could not',
        r'unable to',
        r'timeout'
    ]
    for pattern in error_patterns:
        if re.search(pattern, analyzed_text, re.IGNORECASE):
            score -= 0.3
            issues.append("Contains error messages")
            break  # one deduction regardless of how many patterns match

    # Check 5: Repetitive content (potential LLM failure)
    # BUG FIX: the ratio previously divided the raw split count (which
    # includes the empty string produced after a trailing period) by the
    # number of unique NON-empty sentences, inflating the ratio for any
    # text that ends with '.'. Compare non-empty counts on both sides.
    sentences = [s.strip() for s in analyzed_text.split('.') if s.strip()]
    if len(sentences) > 3:
        unique_sentences = set(s.lower() for s in sentences)
        repetition_ratio = len(sentences) / len(unique_sentences) if unique_sentences else 1
        if repetition_ratio > 1.5:
            score -= 0.2
            issues.append("High content repetition")

    # Ensure score is in valid range
    score = max(0.0, min(1.0, score))
    issues_text = "; ".join(issues) if issues else "No issues detected"
    return score, issues_text
def check_data_completeness(csv_rows: List[Dict], interviewee_type: str) -> Dict:
    """
    Analyze completeness of extracted data across all transcripts.

    Args:
        csv_rows: One dict per transcript (CSV-style rows keyed by column
            name). Only fields present in the first row are inspected.
        interviewee_type: "HCP", "Patient", or anything else (falls back to
            the generic "Key Insights" column).

    Returns:
        Dict with per-field fill counts ("by_field"), aggregate totals
        ("overall"), and a coarse "quality_grade"; or {"error": ...} when
        there are no rows to inspect.
    """
    if not csv_rows:
        return {"error": "No data to check"}

    # Determine key fields based on type
    if interviewee_type == "HCP":
        key_fields = ["Diagnoses", "Prescriptions", "Treatment Strategies"]
    elif interviewee_type == "Patient":
        key_fields = ["Primary Symptoms", "Main Concerns", "Treatment Response"]
    else:
        key_fields = ["Key Insights"]

    def _is_filled(value) -> bool:
        # ROBUSTNESS FIX: the original called row[field].strip()
        # unconditionally, raising AttributeError for non-string cell
        # values (e.g. numbers). Strings must be non-blank; any other
        # truthy value counts as filled.
        if isinstance(value, str):
            return bool(value.strip())
        return bool(value)

    completeness = {}
    row_total = len(csv_rows)
    for field in key_fields:
        if field in csv_rows[0]:  # Check if field exists
            filled_count = sum(1 for row in csv_rows if _is_filled(row.get(field)))
            completeness[field] = {
                "filled": filled_count,
                "total": row_total,
                # csv_rows is guaranteed non-empty here, so no zero guard needed
                "percentage": filled_count / row_total * 100
            }

    # Overall completeness
    total_fields = sum(c["total"] for c in completeness.values())
    filled_fields = sum(c["filled"] for c in completeness.values())
    overall_percentage = (filled_fields / total_fields * 100) if total_fields > 0 else 0

    return {
        "by_field": completeness,
        "overall": {
            "filled": filled_fields,
            "total": total_fields,
            "percentage": overall_percentage
        },
        "quality_grade": (
            "Excellent" if overall_percentage >= 80 else
            "Good" if overall_percentage >= 60 else
            "Fair" if overall_percentage >= 40 else
            "Poor"
        )
    }
def validate_structured_data_format(data: Dict, interviewee_type: str) -> Tuple[bool, List[str]]:
    """
    Validate that structured data has the expected format.

    Expects a dict whose values are all lists, containing the fields
    required for the given interviewee type.

    Args:
        data: Candidate structured-data mapping.
        interviewee_type: "HCP", "Patient", or "Other"; unrecognized types
            require no specific fields.

    Returns:
        Tuple of (is_valid, list of issue descriptions).
    """
    issues = []
    if not isinstance(data, dict):
        return False, ["Data is not a dictionary"]

    # Define expected fields by type
    expected_fields = {
        "HCP": ["diagnoses", "prescriptions", "treatment_rationale"],
        "Patient": ["symptoms", "concerns", "treatment_response"],
        "Other": ["key_insights"]
    }
    required = expected_fields.get(interviewee_type, [])

    # Check for expected fields
    missing = [f for f in required if f not in data]
    if missing:
        issues.append(f"Missing expected fields: {', '.join(missing)}")

    # Check field types (should be lists)
    for key, value in data.items():
        if not isinstance(value, list):
            issues.append(f"Field '{key}' should be a list, got {type(value)}")

    # Check for empty lists
    # BUG FIX: guard against an empty dict — previously len([]) == len({})
    # was True, so {} was (also) reported as "All fields are empty lists".
    empty_fields = [k for k, v in data.items() if isinstance(v, list) and not v]
    if data and len(empty_fields) == len(data):
        issues.append("All fields are empty lists")

    is_valid = len(issues) == 0
    return is_valid, issues
def validate_summary_quality(summary: str, num_transcripts: int) -> Tuple[float, List[str]]:
    """Check summary for rigor and accuracy.

    Penalizes unquantified findings, vague or absolute language, missing
    consensus indicators, and overly short summaries.

    Args:
        summary: Cross-transcript summary text to assess.
        num_transcripts: Number of transcripts summarized (currently unused;
            kept for interface compatibility).

    Returns:
        Tuple of (score clamped to >= 0.0, list of issue descriptions).
    """
    issues = []
    score = 1.0
    lowered = summary.lower()

    # Check for quantification
    if not re.search(r'\d+\s*(?:out of|of|participants|%)', summary):
        issues.append("No quantified findings (must include counts/percentages)")
        score -= 0.3

    # Check for vague claims
    # BUG FIX: plain substring matching flagged words embedded in longer
    # words ("most" inside "almost", "some" inside "something"); match
    # whole words only, mirroring the absolute-terms check below.
    vague_terms = ['many', 'most', 'some', 'several', 'often', 'frequently']
    if any(re.search(rf'\b{term}\b', lowered) for term in vague_terms):
        issues.append("Contains vague terms - should use specific numbers")
        score -= 0.2

    # Check for absolute claims (one deduction per matching term)
    absolute_terms = ['all', 'everyone', 'nobody', 'never', 'always']
    for term in absolute_terms:
        if re.search(rf'\b{term}\b', lowered):
            issues.append(f"Absolute claim '{term}' found - likely overgeneralization")
            score -= 0.2

    # Check for evidence markers
    if 'consensus' not in lowered and 'majority' not in lowered:
        issues.append("Missing consensus indicators")
        score -= 0.1

    # Check length is substantial
    if len(summary) < 500:
        issues.append("Summary too brief for thorough analysis")
        score -= 0.2

    return max(0.0, score), issues