| import re |
| from typing import Tuple, Dict, List |
|
|
def validate_extraction(text: str, filename: str) -> Tuple[bool, str]:
    """
    Validate that text extraction was successful.

    Runs a series of cheap sanity checks (non-empty, minimum length,
    garbled-content patterns, word count, alphabetic-word ratio) and
    returns on the first failure.

    Args:
        filename: Name of the source file (currently unused; kept for
            interface compatibility).

    Returns:
        Tuple of (is_valid, human-readable reason/summary).
    """
    if not text or not text.strip():
        return False, "No text extracted"

    char_count = len(text)
    if char_count < 50:
        return False, f"Extracted text too short ({char_count} chars)"

    # Heuristics for garbled extraction output; reason first, pattern second.
    garbled_checks = (
        ("Contains large blocks of non-ASCII characters", r'[^\x00-\x7F]{50,}'),
        ("Contains suspicious character repetition", r'(.)\1{20,}'),
        ("Contains only symbols/numbers/whitespace", r'^[\W\d\s]+$'),
    )
    for reason, pattern in garbled_checks:
        if re.search(pattern, text) is not None:
            return False, reason

    tokens = text.split()
    token_count = len(tokens)
    if token_count < 20:
        return False, f"Too few words ({token_count})"

    # Fraction of tokens that look like real words (2+ ASCII letters).
    alpha_count = sum(1 for t in tokens if re.match(r'^[a-zA-Z]{2,}$', t))
    word_ratio = alpha_count / token_count if tokens else 0

    if word_ratio < 0.3:
        return False, f"Low word ratio ({word_ratio:.2f}) - possible extraction issue"

    return True, f"Valid ({token_count} words, {char_count} chars)"
|
|
|
|
def validate_transcript_quality(
    analyzed_text: str,
    structured_data: Dict,
    interviewee_type: str
) -> Tuple[float, str]:
    """
    Assess quality of analyzed transcript.

    Penalizes brief analyses, missing/empty structured data, lack of
    domain-appropriate terminology (for HCP/Patient interviews), leaked
    error messages, and repetitive content.

    Args:
        analyzed_text: Free-text analysis produced for one transcript.
        structured_data: Mapping of extracted field -> values; may be
            empty or None.
        interviewee_type: "HCP", "Patient", or anything else (other
            values skip the domain-specific checks).

    Returns:
        Tuple of (quality_score [0-1], issues_description)
    """
    score = 1.0
    issues = []

    # Bug fix: tolerate structured_data=None. The old code only checked
    # falsiness for the "no structured data" penalty but still called
    # structured_data.get(...) in the HCP/Patient branches, which raised
    # AttributeError when None was passed.
    data = structured_data or {}

    # --- Narrative length ---
    if len(analyzed_text) < 100:
        score -= 0.3
        issues.append("Analysis too brief")
    elif len(analyzed_text) < 300:
        score -= 0.1
        issues.append("Analysis somewhat brief")

    # --- Structured-data coverage ---
    if not data:
        score -= 0.2
        issues.append("No structured data extracted")
    else:
        empty_fields = sum(1 for v in data.values() if not v)
        total_fields = len(data)

        if empty_fields == total_fields:
            score -= 0.3
            issues.append("All structured fields empty")
        elif empty_fields > total_fields * 0.7:
            score -= 0.2
            issues.append("Most structured fields empty")

    # --- Domain-specific checks (table-driven so both branches stay in sync) ---
    # Each entry: (terminology regex, terminology issue, key fields, data issue).
    domain_checks = {
        "HCP": (
            r'\b(diagnos\w+|prescri\w+|treatment|medication|patient|clinical|therapy)\b',
            "Limited medical terminology for HCP interview",
            ["diagnoses", "prescriptions", "treatment_rationale"],
            "No key HCP data extracted",
        ),
        "Patient": (
            r'\b(symptom|feel|concern|experience|treatment|side effect|quality of life)\b',
            "Limited patient-centric content",
            ["symptoms", "concerns", "treatment_response"],
            "No key patient data extracted",
        ),
    }
    if interviewee_type in domain_checks:
        term_pattern, term_issue, key_fields, data_issue = domain_checks[interviewee_type]

        terms = re.findall(term_pattern, analyzed_text, re.IGNORECASE)
        if len(terms) < 3:
            score -= 0.2
            issues.append(term_issue)

        missing_fields = [f for f in key_fields if not data.get(f)]
        if len(missing_fields) == len(key_fields):
            score -= 0.2
            issues.append(data_issue)

    # --- Error-message leakage from upstream tooling ---
    error_patterns = [
        r'\[Error\]',
        r'failed to',
        r'could not',
        r'unable to',
        r'timeout'
    ]
    for pattern in error_patterns:
        if re.search(pattern, analyzed_text, re.IGNORECASE):
            score -= 0.3
            issues.append("Contains error messages")
            break  # penalize once, regardless of how many patterns match

    # --- Content repetition: raw sentence count vs. unique sentences ---
    sentences = analyzed_text.split('.')
    if len(sentences) > 3:
        unique_sentences = set(s.strip().lower() for s in sentences if s.strip())
        repetition_ratio = len(sentences) / len(unique_sentences) if unique_sentences else 1

        if repetition_ratio > 1.5:
            score -= 0.2
            issues.append("High content repetition")

    score = max(0.0, min(1.0, score))

    issues_text = "; ".join(issues) if issues else "No issues detected"

    return score, issues_text
|
|
|
|
def check_data_completeness(csv_rows: List[Dict], interviewee_type: str) -> Dict:
    """
    Analyze completeness of extracted data across all transcripts.

    For the key fields relevant to the interviewee type (and present in
    the first row), counts how many rows have a non-blank value, then
    aggregates into an overall percentage and a letter grade.

    Returns:
        Dict with "by_field" per-field stats, "overall" totals, and
        "quality_grade" — or {"error": ...} when there are no rows.
    """
    if not csv_rows:
        return {"error": "No data to check"}

    # Key fields depend on who was interviewed.
    fields_by_type = {
        "HCP": ["Diagnoses", "Prescriptions", "Treatment Strategies"],
        "Patient": ["Primary Symptoms", "Main Concerns", "Treatment Response"],
    }
    key_fields = fields_by_type.get(interviewee_type, ["Key Insights"])

    row_total = len(csv_rows)
    completeness = {}

    for field in key_fields:
        # Only score fields that actually exist in the data (check row 0).
        if field not in csv_rows[0]:
            continue
        filled = sum(1 for row in csv_rows if row.get(field) and row[field].strip())
        completeness[field] = {
            "filled": filled,
            "total": row_total,
            "percentage": filled / row_total * 100,
        }

    filled_slots = sum(entry["filled"] for entry in completeness.values())
    total_slots = sum(entry["total"] for entry in completeness.values())
    overall_percentage = (filled_slots / total_slots * 100) if total_slots > 0 else 0

    if overall_percentage >= 80:
        grade = "Excellent"
    elif overall_percentage >= 60:
        grade = "Good"
    elif overall_percentage >= 40:
        grade = "Fair"
    else:
        grade = "Poor"

    return {
        "by_field": completeness,
        "overall": {
            "filled": filled_slots,
            "total": total_slots,
            "percentage": overall_percentage
        },
        "quality_grade": grade
    }
|
|
|
|
def validate_structured_data_format(data: Dict, interviewee_type: str) -> Tuple[bool, List[str]]:
    """
    Validate that structured data has expected format.

    Checks that `data` is a dict, that the fields expected for the given
    interviewee type are present, that every value is a list, and that
    the fields are not all empty lists.

    Args:
        data: Parsed structured-extraction output.
        interviewee_type: "HCP", "Patient", or "Other" (unknown types
            require no specific fields).

    Returns:
        Tuple of (is_valid, list of issue descriptions).
    """
    issues = []

    if not isinstance(data, dict):
        return False, ["Data is not a dictionary"]

    # Expected fields per interviewee type.
    expected_fields = {
        "HCP": ["diagnoses", "prescriptions", "treatment_rationale"],
        "Patient": ["symptoms", "concerns", "treatment_response"],
        "Other": ["key_insights"]
    }
    required = expected_fields.get(interviewee_type, [])

    missing = [f for f in required if f not in data]
    if missing:
        issues.append(f"Missing expected fields: {', '.join(missing)}")

    for key, value in data.items():
        if not isinstance(value, list):
            issues.append(f"Field '{key}' should be a list, got {type(value)}")

    # Bug fix: only flag "all empty" when at least one field exists.
    # Previously an empty dict matched (0 == 0) and was mislabeled as
    # "All fields are empty lists" on top of the missing-fields issue.
    empty_fields = [k for k, v in data.items() if isinstance(v, list) and not v]
    if data and len(empty_fields) == len(data):
        issues.append("All fields are empty lists")

    is_valid = len(issues) == 0

    return is_valid, issues
|
|
def validate_summary_quality(summary: str, num_transcripts: int) -> Tuple[float, List[str]]:
    """Check summary for rigor and accuracy.

    Penalizes unquantified findings, vague quantifiers, absolute claims,
    missing consensus language, and overly brief summaries.

    Args:
        summary: The generated cross-transcript summary.
        num_transcripts: Number of transcripts summarized (currently
            unused; kept for interface compatibility).

    Returns:
        Tuple of (score clamped at >= 0.0, list of issue descriptions).
    """
    issues = []
    score = 1.0
    lowered = summary.lower()  # hoisted: reused by several checks below

    # Require at least one count/percentage phrase.
    if not re.search(r'\d+\s*(?:out of|of|participants|%)', summary):
        issues.append("No quantified findings (must include counts/percentages)")
        score -= 0.3

    # Vague quantifiers. Bug fix: match whole words only — the old
    # substring test flagged e.g. "almost" (contains "most") and
    # "something" (contains "some"). Now consistent with the
    # word-boundary matching used for absolute terms below.
    vague_terms = ['many', 'most', 'some', 'several', 'often', 'frequently']
    if any(re.search(rf'\b{term}\b', lowered) for term in vague_terms):
        issues.append("Contains vague terms - should use specific numbers")
        score -= 0.2

    # Absolute claims: one penalty per distinct term found (as before).
    absolute_terms = ['all', 'everyone', 'nobody', 'never', 'always']
    for term in absolute_terms:
        if re.search(rf'\b{term}\b', lowered):
            issues.append(f"Absolute claim '{term}' found - likely overgeneralization")
            score -= 0.2

    # Expect some consensus framing in a multi-transcript summary.
    if 'consensus' not in lowered and 'majority' not in lowered:
        issues.append("Missing consensus indicators")
        score -= 0.1

    if len(summary) < 500:
        issues.append("Summary too brief for thorough analysis")
        score -= 0.2

    return max(0.0, score), issues