File size: 15,905 Bytes
b69e9e7
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
"""Free-form to CMPO mapping for Anton's pipeline."""

import logging
import re
from difflib import SequenceMatcher
from typing import Dict, List, Set, Tuple

def map_to_cmpo(description: str, cmpo_ontology, context: str = None) -> List[Dict]:
    """Convert a free-form description to CMPO terms using semantic mapping.

    Runs three matching strategies (direct name/synonym, semantic component,
    and — when a context string is given — hierarchical context), merges the
    weighted scores per term, and returns the top five candidates sorted by
    combined confidence. Returns [] for an empty description or ontology.
    """
    if not description or not cmpo_ontology:
        return []

    text = description.lower()

    # (matches, weight, evidence label) per strategy. Direct matches keep
    # their full score to preserve the enhanced scoring differences; semantic
    # and context evidence contribute at reduced weight.
    strategies = [
        (_find_direct_matches(text, cmpo_ontology), 1.0, "Direct match"),
        (_find_semantic_matches(text, cmpo_ontology), 0.3, "Semantic"),
    ]
    if context:
        strategies.append(
            (_find_context_matches(text, cmpo_ontology, context), 0.2, "Context")
        )

    combined = {}
    for matches, weight, label in strategies:
        for term_id, confidence, evidence in matches:
            entry = combined.setdefault(term_id, {'confidence': 0, 'evidence': []})
            entry['confidence'] += confidence * weight
            entry['evidence'].append(f"{label}: {evidence}")

    mappings = []
    for term_id, match_data in combined.items():
        term_info = cmpo_ontology.get_term(term_id)
        if not term_info:
            continue
        mappings.append({
            "CMPO_ID": term_id,
            "term_name": term_info['name'],
            "confidence": match_data['confidence'],  # full score kept for ranking
            "supporting_evidence": "; ".join(match_data['evidence'][:3]),
            "description": term_info.get('description', ''),
            "hierarchy_path": _get_hierarchy_path(term_id, cmpo_ontology)
        })

    mappings.sort(key=lambda m: m['confidence'], reverse=True)
    return mappings[:5]

def _find_direct_matches(description: str, cmpo_ontology) -> List[Tuple[str, float, str]]:
    """Find direct matches with ontology-aware scoring.

    Each ontology term is scored against the description via exact token
    overlap with the term name (weight 2.0), whole-name substring hits
    (1.5), synonym token overlap (1.8) or synonym substring hits (1.2).
    Terms with a positive score also receive a hierarchy-specificity bonus
    plus a multi-token bonus; the final score is capped at 5.0.
    """
    results = []
    desc_tokens = set(_extract_biological_tokens(description))

    for term_id, term_data in cmpo_ontology.ontology.items():
        score = 0.0
        evidence = []

        # Exact token overlap with the term name (highest priority).
        name_tokens = set(_extract_biological_tokens(term_data.get('name', '')))
        name_hits = desc_tokens & name_tokens
        if name_hits:
            score += len(name_hits) / max(len(name_tokens), 1) * 2.0
            evidence.extend(name_hits)

        # Full term name appearing verbatim inside the description.
        name_lower = term_data.get('name', '').lower()
        if name_lower and name_lower in description:
            score += len(name_lower) / len(description) * 1.5
            evidence.append(f"name:{name_lower}")

        # Synonyms: exact-token overlap takes priority over substring hits.
        for synonym in term_data.get('synonyms', []):
            syn_tokens = set(_extract_biological_tokens(synonym))
            syn_hits = desc_tokens & syn_tokens
            if syn_hits:
                score += len(syn_hits) / max(len(syn_tokens), 1) * 1.8
                evidence.extend(syn_hits)
            elif synonym.lower() in description:
                score += len(synonym) / len(description) * 1.2
                evidence.append(f"synonym:{synonym}")

        if score > 0:
            # Deeper (more specific) terms earn a bonus; so do descriptions
            # hitting several of the term-name tokens exactly.
            score += _calculate_specificity_bonus(term_id, cmpo_ontology)
            if len(name_hits) > 1:
                score += len(name_hits) * 0.5
            results.append((term_id, min(score, 5.0), f"exact:{','.join(evidence[:3])}"))

    return results

def _find_semantic_matches(description: str, cmpo_ontology) -> List[Tuple[str, float, str]]:
    """Find matches based on semantic component analysis.

    Scores each term via token overlap with its logical definition
    (`equivalent_to` axioms, threshold 0.3) and fuzzy similarity against
    its free-text description (threshold 0.4).
    """
    desc_tokens = _extract_biological_tokens(description)
    results = []

    for term_id, term_data in cmpo_ontology.ontology.items():
        # Logical definitions carry the term's semantic components.
        for axiom in term_data.get('equivalent_to', []):
            overlap = _score_semantic_overlap(desc_tokens, axiom)
            if overlap > 0.3:
                results.append((term_id, overlap, f"Semantic components in {axiom}"))

        # Fuzzy match against the term's own textual description.
        term_desc = term_data.get('description', '').lower()
        if term_desc:
            similarity = _calculate_text_similarity(description, term_desc)
            if similarity > 0.4:
                results.append((term_id, similarity, "Description similarity"))

    return results

def _find_context_matches(description: str, cmpo_ontology, context: str) -> List[Tuple[str, float, str]]:
    """Find matches considering hierarchical context.

    Keywords found in the caller-supplied context select priority ontology
    subgraphs; terms inside a selected subgraph score 0.5, plus 0.3 when the
    term name also echoes a word from the description.
    """
    # Context keyword -> subgraphs worth prioritizing.
    context_subgraphs = {
        'cell_cycle': ['cell_cycle_phenotype', 'mitotic_process_phenotype'],
        'apoptosis': ['cell_death_phenotype', 'apoptotic'],
        'morphology': ['cellular_component_phenotype', 'abnormal_cell_morphology'],
        'process': ['cell_process_phenotype', 'biological_process']
    }

    context_lower = context.lower() if context else ""
    relevant = [
        sg
        for key, subgraphs in context_subgraphs.items()
        if key in context_lower
        for sg in subgraphs
    ]

    desc_words = description.split()
    results = []
    for term_id, term_data in cmpo_ontology.ontology.items():
        term_name = term_data.get('name', '').lower()
        for subgraph in relevant:
            if not _term_in_subgraph(term_id, subgraph, cmpo_ontology):
                continue
            score = 0.5
            # Small boost when the term name also matches the description.
            if any(word in term_name for word in desc_words):
                score += 0.3
            results.append((term_id, score, f"Context subgraph: {subgraph}"))

    return results

def _extract_biological_tokens(text: str) -> Set[str]:
    """Extract biologically relevant tokens from text."""
    # Common biological stop words to exclude
    bio_stop_words = {'cell', 'cells', 'cellular', 'the', 'and', 'or', 'with', 'in', 'of'}
    
    # Extract tokens
    tokens = set(re.findall(r'\b\w+\b', text.lower()))
    
    # Filter for biological relevance (length > 3, not stop words)
    bio_tokens = {token for token in tokens 
                 if len(token) > 3 and token not in bio_stop_words}
    
    return bio_tokens

def _score_semantic_overlap(desc_tokens: Set[str], equivalent_to: str) -> float:
    """Score overlap between description tokens and a semantic definition.

    Returns the fraction of the definition's biological tokens that also
    appear in the description, or 0.0 when the definition yields no tokens.
    """
    definition_tokens = _extract_biological_tokens(equivalent_to)
    if not definition_tokens:
        return 0.0
    shared = desc_tokens & definition_tokens
    return len(shared) / max(len(definition_tokens), 1)

def _calculate_text_similarity(text1: str, text2: str) -> float:
    """Calculate text similarity using sequence matching."""
    return SequenceMatcher(None, text1, text2).ratio()

def _term_in_subgraph(term_id: str, subgraph_name: str, cmpo_ontology) -> bool:
    """Check if a term belongs to a specific subgraph via hierarchy."""
    term_data = cmpo_ontology.get_term(term_id)
    if not term_data:
        return False
    
    # Check if term name contains subgraph keyword
    term_name = term_data.get('name', '').lower()
    if subgraph_name.lower() in term_name:
        return True
    
    # Check parent terms recursively (simple implementation)
    for parent in term_data.get('parent_terms', []):
        parent_data = cmpo_ontology.get_term(parent)
        if parent_data and subgraph_name.lower() in parent_data.get('name', '').lower():
            return True
    
    return False

def _get_hierarchy_path(term_id: str, cmpo_ontology) -> List[str]:
    """Get the hierarchical path for a term."""
    path = []
    current_term = cmpo_ontology.get_term(term_id)
    
    if current_term:
        path.append(current_term.get('name', term_id))
        
        # Add immediate parents (simplified - could be recursive)
        for parent_id in current_term.get('parent_terms', [])[:2]:  # Limit to 2 parents
            parent_term = cmpo_ontology.get_term(parent_id)
            if parent_term:
                path.append(parent_term.get('name', parent_id))
    
    return path

def _calculate_specificity_bonus(term_id: str, cmpo_ontology) -> float:
    """Calculate specificity bonus based on hierarchy depth.

    Deeper terms are more specific and earn a larger bonus of 0.1 per
    level, capped at 0.5 (reached at depth 5+). Returns 0.0 when the
    depth computation fails on malformed ontology data.
    """
    try:
        depth = _calculate_hierarchy_depth(term_id, cmpo_ontology)
    except Exception:
        # Bug fix: was a bare `except:`, which also swallowed
        # KeyboardInterrupt/SystemExit. Narrowed to Exception so malformed
        # data still degrades gracefully to "no bonus" without masking
        # interpreter-level signals.
        return 0.0
    return min(depth * 0.1, 0.5)

def _calculate_hierarchy_depth(term_id: str, cmpo_ontology, visited=None) -> int:
    """Calculate depth of term in CMPO hierarchy."""
    if visited is None:
        visited = set()
    
    if term_id in visited:  # Avoid cycles
        return 0
    
    visited.add(term_id)
    term_data = cmpo_ontology.get_term(term_id)
    
    if not term_data or not term_data.get('parent_terms'):
        return 1  # Root level
    
    # Find maximum depth among parents
    max_parent_depth = 0
    for parent_id in term_data.get('parent_terms', []):
        parent_depth = _calculate_hierarchy_depth(parent_id, cmpo_ontology, visited.copy())
        max_parent_depth = max(max_parent_depth, parent_depth)
    
    return max_parent_depth + 1

def _detect_mutual_exclusion(term1_id: str, term2_id: str, cmpo_ontology) -> bool:
    """Detect if two terms are mutually exclusive based on ontology structure.

    Heuristic: sibling terms (sharing at least one immediate parent) that
    both sit at hierarchy depth 3 or deeper are treated as mutually
    exclusive. Unknown terms are never mutually exclusive.
    """
    first = cmpo_ontology.get_term(term1_id)
    second = cmpo_ontology.get_term(term2_id)
    if not first or not second:
        return False

    shared = set(first.get('parent_terms', [])) & set(second.get('parent_terms', []))
    if not shared:
        return False

    # Both terms must be reasonably specific for the sibling heuristic.
    return (
        _calculate_hierarchy_depth(term1_id, cmpo_ontology) > 2
        and _calculate_hierarchy_depth(term2_id, cmpo_ontology) > 2
    )

# Add VLM validation function for the two-stage pipeline
async def validate_mappings_with_vlm(description: str, candidate_mappings: List[Dict], vlm_interface, max_candidates: int = 5) -> List[Dict]:
    """Stage 2: VLM biological reasoning and pruning.

    Sends the top `max_candidates` candidate mappings to the VLM for a
    biological-plausibility review and returns the re-ranked, pruned list.
    With zero or one candidate there is nothing to prune, so the input is
    returned unchanged. If the VLM call or parsing fails, falls back to the
    original candidate list and logs a warning.
    """
    if len(candidate_mappings) <= 1:
        return candidate_mappings
    
    # Format candidates for VLM review
    candidates_text = "\n".join([
        f"{i+1}. {mapping['term_name']} (CMPO:{mapping['CMPO_ID']}) - Confidence: {mapping['confidence']:.3f}"
        for i, mapping in enumerate(candidate_mappings[:max_candidates])
    ])
    
    validation_prompt = f"""Original biological description: "{description}"

Candidate CMPO term mappings:
{candidates_text}

Task: Evaluate biological plausibility and ranking of these mappings.

Consider:
- Biological consistency and logical compatibility
- Temporal/spatial relationships in biological processes  
- Phenotypic co-occurrence patterns
- Mechanistic plausibility
- Specificity vs generality trade-offs

Provide:
1. Biologically valid mappings with updated confidence (0-1)
2. Brief scientific reasoning for each acceptance/rejection
3. Final ranked list

Focus on biological accuracy over textual similarity.

Format your response as:
VALID: [term_name] - confidence: [0-1] - reasoning: [brief explanation]
INVALID: [term_name] - reasoning: [brief explanation]
"""
    
    try:
        # Delegated to the VLM interface; expected to return free-form text
        # in the VALID/INVALID line format requested by the prompt.
        reasoning_result = await vlm_interface.analyze_biological_reasoning(validation_prompt)
        
        # Parse VLM response and update mapping confidences/ranking.
        return _parse_vlm_validation_response(reasoning_result, candidate_mappings)
        
    except Exception as e:
        # Bug fix: this handler previously called `logging` without the
        # module ever being imported, so the fallback path itself raised
        # NameError. `logging` is now imported at module level; lazy %-args
        # avoid formatting cost when the warning is filtered out.
        logging.warning("VLM validation failed: %s, using original mappings", e)
        return candidate_mappings

def _parse_vlm_validation_response(vlm_response: str, original_mappings: List[Dict]) -> List[Dict]:
    """Parse VLM validation response and update mapping confidences."""
    validated = []
    
    # Simple parsing - in production would be more robust
    for line in vlm_response.split('\n'):
        if line.startswith('VALID:'):
            # Extract confidence and reasoning
            parts = line.split(' - ')
            if len(parts) >= 3:
                term_name = parts[0].replace('VALID: ', '').strip()
                confidence_str = parts[1].replace('confidence: ', '').strip()
                reasoning = parts[2].replace('reasoning: ', '').strip()
                
                # Find corresponding original mapping
                for mapping in original_mappings:
                    if mapping['term_name'].lower() == term_name.lower():
                        updated_mapping = mapping.copy()
                        try:
                            updated_mapping['confidence'] = float(confidence_str)
                            updated_mapping['vlm_reasoning'] = reasoning
                            validated.append(updated_mapping)
                        except ValueError:
                            validated.append(mapping)  # Keep original if parsing fails
                        break
    
    # Sort by updated confidence
    validated.sort(key=lambda x: x['confidence'], reverse=True)
    return validated