File size: 9,250 Bytes
54c99ad
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
import re
from typing import Tuple, Dict, List

def validate_extraction(text: str, filename: str) -> Tuple[bool, str]:
    """
    Check that text extracted from a document looks usable.

    Runs a series of cheap heuristics (length, garbled-text patterns,
    word count, alphabetic-word ratio) and stops at the first failure.

    Args:
        text: The extracted text to validate.
        filename: Source file name (currently unused; kept for the
            caller-facing signature).

    Returns:
        (True, summary) when the text passes all checks, otherwise
        (False, reason) describing the first failed check.
    """
    if not text or not text.strip():
        return False, "No text extracted"

    if len(text) < 50:
        return False, f"Extracted text too short ({len(text)} chars)"

    # Heuristic signatures of a failed/garbled extraction.
    garbled_checks = (
        (r'[^\x00-\x7F]{50,}', "Contains large blocks of non-ASCII characters"),
        (r'(.)\1{20,}', "Contains suspicious character repetition"),
        (r'^[\W\d\s]+$', "Contains only symbols/numbers/whitespace"),
    )
    for regex, failure_reason in garbled_checks:
        if re.search(regex, text):
            return False, failure_reason

    tokens = text.split()
    if len(tokens) < 20:
        return False, f"Too few words ({len(tokens)})"

    # Fraction of tokens that are plain alphabetic words (>= 2 letters).
    alpha_tokens = sum(1 for t in tokens if re.match(r'^[a-zA-Z]{2,}$', t))
    word_ratio = alpha_tokens / len(tokens) if tokens else 0

    if word_ratio < 0.3:
        return False, f"Low word ratio ({word_ratio:.2f}) - possible extraction issue"

    return True, f"Valid ({len(tokens)} words, {len(text)} chars)"


def validate_transcript_quality(
    analyzed_text: str,
    structured_data: Dict,
    interviewee_type: str
) -> Tuple[float, str]:
    """
    Assess quality of analyzed transcript.

    Starts from a perfect score of 1.0 and subtracts a fixed penalty for
    each detected problem: brevity, missing/empty structured data,
    missing type-specific content, embedded error messages, and highly
    repetitive output (a typical LLM failure mode).

    Args:
        analyzed_text: Free-text analysis produced for the transcript.
        structured_data: Mapping of extracted field names to values.
        interviewee_type: "HCP", "Patient", or anything else (any other
            value skips the type-specific checks).

    Returns:
        Tuple of (quality_score [0-1], issues_description)
    """

    score = 1.0
    issues = []

    # Check 1: Length of analysis
    if len(analyzed_text) < 100:
        score -= 0.3
        issues.append("Analysis too brief")
    elif len(analyzed_text) < 300:
        score -= 0.1
        issues.append("Analysis somewhat brief")

    # Check 2: Presence of structured data
    if not structured_data:
        score -= 0.2
        issues.append("No structured data extracted")
    else:
        # Check how many structured fields actually hold content.
        empty_fields = sum(1 for v in structured_data.values() if not v)
        total_fields = len(structured_data)

        if empty_fields == total_fields:
            score -= 0.3
            issues.append("All structured fields empty")
        elif empty_fields > total_fields * 0.7:
            score -= 0.2
            issues.append("Most structured fields empty")

    # Check 3: Type-specific validation
    if interviewee_type == "HCP":
        # Expect medical terminology in an HCP interview analysis.
        medical_terms = re.findall(
            r'\b(diagnos\w+|prescri\w+|treatment|medication|patient|clinical|therapy)\b',
            analyzed_text,
            re.IGNORECASE
        )

        if len(medical_terms) < 3:
            score -= 0.2
            issues.append("Limited medical terminology for HCP interview")

        # Check for key structured fields
        key_fields = ["diagnoses", "prescriptions", "treatment_rationale"]
        missing_fields = [f for f in key_fields if not structured_data.get(f)]

        if len(missing_fields) == len(key_fields):
            score -= 0.2
            issues.append("No key HCP data extracted")

    elif interviewee_type == "Patient":
        # Expect patient-centric language.
        patient_terms = re.findall(
            r'\b(symptom|feel|concern|experience|treatment|side effect|quality of life)\b',
            analyzed_text,
            re.IGNORECASE
        )

        if len(patient_terms) < 3:
            score -= 0.2
            issues.append("Limited patient-centric content")

        # Check for key structured fields
        key_fields = ["symptoms", "concerns", "treatment_response"]
        missing_fields = [f for f in key_fields if not structured_data.get(f)]

        if len(missing_fields) == len(key_fields):
            score -= 0.2
            issues.append("No key patient data extracted")

    # Check 4: Error indicators
    error_patterns = [
        r'\[Error\]',
        r'failed to',
        r'could not',
        r'unable to',
        r'timeout'
    ]

    for pattern in error_patterns:
        if re.search(pattern, analyzed_text, re.IGNORECASE):
            score -= 0.3
            issues.append("Contains error messages")
            break

    # Check 5: Repetitive content (potential LLM failure)
    # Fix: drop the empty/blank fragments split('.') produces (e.g. the
    # trailing '' after a final period). Previously they inflated the
    # numerator but not the unique-sentence denominator, so normal text
    # could be falsely flagged as repetitive.
    sentences = [s.strip().lower() for s in analyzed_text.split('.') if s.strip()]
    if len(sentences) > 3:
        unique_sentences = set(sentences)
        repetition_ratio = len(sentences) / len(unique_sentences) if unique_sentences else 1

        if repetition_ratio > 1.5:
            score -= 0.2
            issues.append("High content repetition")

    # Ensure score is in valid range
    score = max(0.0, min(1.0, score))

    issues_text = "; ".join(issues) if issues else "No issues detected"

    return score, issues_text


def check_data_completeness(csv_rows: List[Dict], interviewee_type: str) -> Dict:
    """
    Analyze completeness of extracted data across all transcripts.

    Args:
        csv_rows: One dict per transcript, mapping column names to cell
            values.
        interviewee_type: "HCP", "Patient", or anything else (falls back
            to a generic "Key Insights" field).

    Returns:
        Dict with per-field fill counts ("by_field"), aggregate counts
        ("overall"), and a coarse "quality_grade" label; or
        {"error": ...} when csv_rows is empty.
    """

    if not csv_rows:
        return {"error": "No data to check"}

    # Determine key fields based on type
    if interviewee_type == "HCP":
        key_fields = ["Diagnoses", "Prescriptions", "Treatment Strategies"]
    elif interviewee_type == "Patient":
        key_fields = ["Primary Symptoms", "Main Concerns", "Treatment Response"]
    else:
        key_fields = ["Key Insights"]

    completeness = {}

    for field in key_fields:
        # Fix: test presence against ANY row, not only csv_rows[0] —
        # rows may have heterogeneous keys and the first row is not
        # guaranteed to carry every column.
        if any(field in row for row in csv_rows):
            # str() guards against non-string cell values, which would
            # otherwise raise AttributeError on .strip().
            filled_count = sum(
                1 for row in csv_rows if str(row.get(field) or "").strip()
            )
            completeness[field] = {
                "filled": filled_count,
                "total": len(csv_rows),
                # csv_rows is non-empty here, so the division is safe.
                "percentage": filled_count / len(csv_rows) * 100,
            }

    # Overall completeness aggregated over all tracked fields.
    total_fields = sum(c["total"] for c in completeness.values())
    filled_fields = sum(c["filled"] for c in completeness.values())
    overall_percentage = (filled_fields / total_fields * 100) if total_fields > 0 else 0

    return {
        "by_field": completeness,
        "overall": {
            "filled": filled_fields,
            "total": total_fields,
            "percentage": overall_percentage
        },
        "quality_grade": (
            "Excellent" if overall_percentage >= 80 else
            "Good" if overall_percentage >= 60 else
            "Fair" if overall_percentage >= 40 else
            "Poor"
        )
    }


def validate_structured_data_format(data: Dict, interviewee_type: str) -> Tuple[bool, List[str]]:
    """
    Validate that structured data has the expected format.

    Expects a dict whose values are lists, with type-specific required
    keys present and at least one list non-empty.

    Args:
        data: Structured data extracted for one transcript.
        interviewee_type: "HCP", "Patient", or "Other" (unknown types
            have no required fields).

    Returns:
        (is_valid, issues) — is_valid is True only when no issues were
        found; issues lists every problem detected.
    """

    issues = []

    if not isinstance(data, dict):
        return False, ["Data is not a dictionary"]

    # Define expected fields by type
    expected_fields = {
        "HCP": ["diagnoses", "prescriptions", "treatment_rationale"],
        "Patient": ["symptoms", "concerns", "treatment_response"],
        "Other": ["key_insights"]
    }

    required = expected_fields.get(interviewee_type, [])

    # Check for expected fields
    missing = [f for f in required if f not in data]
    if missing:
        issues.append(f"Missing expected fields: {', '.join(missing)}")

    # Check field types (should be lists)
    for key, value in data.items():
        if not isinstance(value, list):
            issues.append(f"Field '{key}' should be a list, got {type(value)}")

    # Check for empty lists.
    # Fix: an empty dict has zero fields, so 0 == 0 previously triggered
    # a spurious "All fields are empty lists" — only flag when there is
    # at least one field and every one of them is an empty list.
    empty_fields = [k for k, v in data.items() if isinstance(v, list) and not v]
    if data and len(empty_fields) == len(data):
        issues.append("All fields are empty lists")

    is_valid = len(issues) == 0

    return is_valid, issues

def validate_summary_quality(summary: str, num_transcripts: int) -> Tuple[float, List[str]]:
    """
    Check a cross-transcript summary for rigor and accuracy.

    Penalizes missing quantification, vague or absolute language,
    missing consensus indicators, and insufficient length.

    Args:
        summary: The generated summary text.
        num_transcripts: Number of transcripts summarized (currently
            unused; kept for the caller-facing signature).

    Returns:
        (score, issues) — score clamped to [0, 1]; issues lists every
        problem detected.
    """
    issues = []
    score = 1.0
    lowered = summary.lower()

    # Check for quantification
    if not re.search(r'\d+\s*(?:out of|of|participants|%)', summary):
        issues.append("No quantified findings (must include counts/percentages)")
        score -= 0.3

    # Check for vague claims.
    # Fix: match whole words only — plain substring search falsely
    # flagged "something" ('some'), "almost"/"utmost" ('most'), etc.,
    # and is now consistent with the absolute-term check below.
    vague_terms = ['many', 'most', 'some', 'several', 'often', 'frequently']
    if any(re.search(rf'\b{term}\b', lowered) for term in vague_terms):
        issues.append("Contains vague terms - should use specific numbers")
        score -= 0.2

    # Check for absolute claims (one penalty per distinct term found)
    absolute_terms = ['all', 'everyone', 'nobody', 'never', 'always']
    for term in absolute_terms:
        if re.search(rf'\b{term}\b', lowered):
            issues.append(f"Absolute claim '{term}' found - likely overgeneralization")
            score -= 0.2

    # Check for evidence markers
    if 'consensus' not in lowered and 'majority' not in lowered:
        issues.append("Missing consensus indicators")
        score -= 0.1

    # Check length is substantial
    if len(summary) < 500:
        issues.append("Summary too brief for thorough analysis")
        score -= 0.2

    return max(0.0, score), issues