Spaces:
Sleeping
Sleeping
| """Qualitative analysis tools for Anton's pipeline.""" | |
| import asyncio | |
| import logging | |
| from pathlib import Path | |
| from typing import Dict, List, Optional, Any | |
| logger = logging.getLogger(__name__) | |
| class QualitativeAnalyzer: | |
| def __init__(self, vlm_interface, cmpo_mapper): | |
| self.vlm = vlm_interface | |
| self.cmpo_mapper = cmpo_mapper | |
| self.cache = {} | |
| async def extract_qualitative_features(self, image_path, regions, config): | |
| """Main qualitative analysis pipeline with multi-stage CMPO integration.""" | |
| # Stage 1: Global scene understanding + CMPO mapping | |
| global_context = await self.vlm.analyze_global_scene(image_path, config.get('channels')) | |
| global_cmpo = await self._map_global_context_to_cmpo(global_context) | |
| # Stage 2: Object-level guidance (if needed) | |
| segmentation_guidance = await self._get_segmentation_guidance(image_path, global_context) | |
| # Stage 3: Feature extraction from regions + CMPO mapping | |
| region_features = await self._analyze_region_features(regions, config) | |
| region_cmpo = await self._map_region_features_to_cmpo(region_features) | |
| # Stage 4: Population-level insights + CMPO mapping | |
| population_insights = await self._generate_population_insights(region_features, global_context) | |
| population_cmpo = await self._map_population_insights_to_cmpo(population_insights) | |
| return { | |
| 'global_context': global_context, | |
| 'global_cmpo': global_cmpo, | |
| 'segmentation_guidance': segmentation_guidance, | |
| 'region_features': region_features, | |
| 'region_cmpo': region_cmpo, | |
| 'population_insights': population_insights, | |
| 'population_cmpo': population_cmpo, | |
| 'cmpo_summary': self._create_cmpo_summary(global_cmpo, region_cmpo, population_cmpo) | |
| } | |
| async def _get_segmentation_guidance(self, image_path, global_context): | |
| """Get guidance for segmentation based on global context.""" | |
| try: | |
| # Use VLM to provide segmentation guidance based on global context | |
| guidance = await self.vlm.detect_objects_and_guide(image_path, global_context) | |
| return { | |
| 'recommended_method': guidance.get('segmentation_guidance', 'threshold'), | |
| 'object_types': [obj.get('type', 'unknown') for obj in guidance.get('detected_objects', [])], | |
| 'confidence': guidance.get('object_count_estimate', 0), | |
| 'guidance_details': guidance | |
| } | |
| except Exception as e: | |
| logger.error(f"Segmentation guidance failed: {e}") | |
| return { | |
| 'recommended_method': 'threshold', | |
| 'object_types': ['cell'], | |
| 'confidence': 0.5, | |
| 'guidance_details': {} | |
| } | |
| async def _analyze_region_features(self, regions, config): | |
| """Analyze individual regions for texture-based features.""" | |
| batch_size = config.get('batch_size', 10) | |
| features = [] | |
| # Process regions in batches for efficiency | |
| for i in range(0, len(regions), batch_size): | |
| batch = regions[i:i+batch_size] | |
| batch_patches = [self._extract_patch(region) for region in batch] | |
| # Convert patches to VLM-analyzable format and analyze | |
| batch_features = [] | |
| for patch in batch_patches: | |
| # For now, create mock feature analysis since we don't have actual image patches | |
| feature = { | |
| 'patch_id': patch.get('patch_id', 0), | |
| 'features': self._extract_texture_features_from_patch(patch), | |
| 'confidence': 0.7, | |
| 'type': 'region_analysis', | |
| 'properties': patch.get('properties', {}) | |
| } | |
| batch_features.append(feature) | |
| features.extend(batch_features) | |
| # Cache results to avoid re-analysis | |
| self._cache_features(batch, batch_features) | |
| return features | |
| def _extract_patch(self, region, padding=10): | |
| """Extract a patch from a region.""" | |
| try: | |
| if not hasattr(region, 'bbox') or not hasattr(region, 'image'): | |
| # If region doesn't have proper properties, return a mock patch | |
| return { | |
| 'patch_id': getattr(region, 'label', 0), | |
| 'bbox': getattr(region, 'bbox', (0, 0, 50, 50)), | |
| 'area': getattr(region, 'area', 100), | |
| 'centroid': getattr(region, 'centroid', (25, 25)), | |
| 'patch_data': None # Would normally contain image data | |
| } | |
| # Extract bounding box with padding | |
| minr, minc, maxr, maxc = region.bbox | |
| minr = max(0, minr - padding) | |
| minc = max(0, minc - padding) | |
| # Create patch info | |
| patch_info = { | |
| 'patch_id': region.label, | |
| 'bbox': (minr, minc, maxr + padding, maxc + padding), | |
| 'area': region.area, | |
| 'centroid': region.centroid, | |
| 'patch_data': None, # Could store actual image patch here | |
| 'properties': { | |
| 'eccentricity': getattr(region, 'eccentricity', 0), | |
| 'solidity': getattr(region, 'solidity', 0), | |
| 'extent': getattr(region, 'extent', 0) | |
| } | |
| } | |
| return patch_info | |
| except Exception as e: | |
| logger.error(f"Patch extraction failed: {e}") | |
| return { | |
| 'patch_id': 0, | |
| 'bbox': (0, 0, 50, 50), | |
| 'area': 100, | |
| 'centroid': (25, 25), | |
| 'patch_data': None | |
| } | |
| def _cache_features(self, regions, features): | |
| """Cache features for regions to avoid re-analysis.""" | |
| for region, feature in zip(regions, features): | |
| self.cache[region.label] = feature | |
| async def _generate_population_insights(self, region_features, global_context): | |
| """Generate insights at the population level.""" | |
| try: | |
| # Aggregate feature data for population analysis | |
| population_data = { | |
| 'total_regions': len(region_features), | |
| 'feature_distribution': self._analyze_feature_distribution(region_features), | |
| 'global_context': global_context | |
| } | |
| # Use VLM to generate population-level insights | |
| insights = await self.vlm.generate_population_insights(region_features) | |
| # Combine with quantitative summary | |
| population_summary = { | |
| 'total_objects': population_data['total_regions'], | |
| 'feature_summary': population_data['feature_distribution'], | |
| 'vlm_insights': insights, | |
| 'quality_metrics': { | |
| 'confidence_mean': self._calculate_mean_confidence(region_features), | |
| 'feature_diversity': len(set([f.get('type', 'unknown') for f in region_features])) | |
| } | |
| } | |
| return population_summary | |
| except Exception as e: | |
| logger.error(f"Population insights generation failed: {e}") | |
| return { | |
| 'total_objects': len(region_features), | |
| 'summary': f'Detected {len(region_features)} regions', | |
| 'error': str(e) | |
| } | |
| async def _map_global_context_to_cmpo(self, global_context): | |
| """Map global scene context to population-level and general CMPO terms.""" | |
| try: | |
| from ..cmpo.mapping import map_to_cmpo, validate_mappings_with_vlm | |
| if not global_context or not isinstance(global_context, dict): | |
| return [] | |
| # Extract description for mapping | |
| description = global_context.get('description', '') | |
| if not description: | |
| return [] | |
| # Stage 1: Ontology-aware mapping | |
| mappings = map_to_cmpo(description, self.cmpo_mapper, context='cell_population') | |
| # Stage 2: VLM biological reasoning validation (always apply) | |
| if mappings: | |
| try: | |
| validated_mappings = await validate_mappings_with_vlm( | |
| description, mappings, self.vlm, max_candidates=5 | |
| ) | |
| mappings = validated_mappings if validated_mappings else mappings | |
| logger.info(f"VLM biological reasoning applied to global context mappings") | |
| except Exception as vlm_error: | |
| logger.warning(f"VLM validation failed, using ontology mappings: {vlm_error}") | |
| # Add stage information | |
| for mapping in mappings: | |
| mapping['stage'] = 'global_context' | |
| mapping['source'] = 'global_scene_analysis' | |
| mapping['validated'] = True # Mark as VLM-validated | |
| logger.info(f"Global context mapped to {len(mappings)} CMPO terms") | |
| return mappings | |
| except Exception as e: | |
| logger.error(f"Global context CMPO mapping failed: {e}") | |
| return [] | |
| async def _map_region_features_to_cmpo(self, region_features): | |
| """Map individual region features to cellular phenotype CMPO terms.""" | |
| try: | |
| from ..cmpo.mapping import map_to_cmpo | |
| cmpo_mappings = [] | |
| for i, feature in enumerate(region_features): | |
| if isinstance(feature, dict): | |
| # Extract meaningful descriptions from region features | |
| descriptions = self._extract_region_descriptions(feature) | |
| for desc_type, description in descriptions.items(): | |
| if description: | |
| # Stage 1: Map with cellular phenotype context | |
| mappings = map_to_cmpo(description, self.cmpo_mapper, context='cellular_phenotype') | |
| # Stage 2: VLM biological reasoning validation (always apply) | |
| if mappings: | |
| try: | |
| validated_mappings = await validate_mappings_with_vlm( | |
| description, mappings, self.vlm, max_candidates=3 | |
| ) | |
| mappings = validated_mappings if validated_mappings else mappings | |
| except Exception as vlm_error: | |
| logger.warning(f"VLM validation failed for region {i}: {vlm_error}") | |
| # Add region and stage information | |
| for mapping in mappings: | |
| mapping['stage'] = 'region_features' | |
| mapping['source'] = f'region_{i}_{desc_type}' | |
| mapping['region_id'] = i | |
| mapping['validated'] = True | |
| cmpo_mappings.extend(mappings) | |
| logger.info(f"Region features mapped to {len(cmpo_mappings)} CMPO terms") | |
| return cmpo_mappings | |
| except Exception as e: | |
| logger.error(f"Region features CMPO mapping failed: {e}") | |
| return [] | |
| async def _map_population_insights_to_cmpo(self, population_insights): | |
| """Map population-level insights to cell population phenotype CMPO terms.""" | |
| try: | |
| from ..cmpo.mapping import map_to_cmpo | |
| if not population_insights or not isinstance(population_insights, dict): | |
| return [] | |
| cmpo_mappings = [] | |
| # Map different aspects of population insights | |
| insight_aspects = { | |
| 'summary': population_insights.get('summary', ''), | |
| 'phenotypes': ', '.join(population_insights.get('phenotypes', [])), | |
| 'characteristics': population_insights.get('characteristics', ''), | |
| 'technical_notes': population_insights.get('technical_notes', '') | |
| } | |
| for aspect_type, description in insight_aspects.items(): | |
| if description: | |
| # Stage 1: Map with appropriate context | |
| context = 'cell_population' if aspect_type in ['summary', 'characteristics'] else 'cellular_phenotype' | |
| mappings = map_to_cmpo(description, self.cmpo_mapper, context=context) | |
| # Stage 2: VLM biological reasoning validation (always apply) | |
| if mappings: | |
| try: | |
| validated_mappings = await validate_mappings_with_vlm( | |
| description, mappings, self.vlm, max_candidates=3 | |
| ) | |
| mappings = validated_mappings if validated_mappings else mappings | |
| except Exception as vlm_error: | |
| logger.warning(f"VLM validation failed for population {aspect_type}: {vlm_error}") | |
| # Add population and stage information | |
| for mapping in mappings: | |
| mapping['stage'] = 'population_insights' | |
| mapping['source'] = f'population_{aspect_type}' | |
| mapping['validated'] = True | |
| cmpo_mappings.extend(mappings) | |
| logger.info(f"Population insights mapped to {len(cmpo_mappings)} CMPO terms") | |
| return cmpo_mappings | |
| except Exception as e: | |
| logger.error(f"Population insights CMPO mapping failed: {e}") | |
| return [] | |
| def _extract_region_descriptions(self, feature): | |
| """Extract meaningful descriptions from region features for CMPO mapping.""" | |
| descriptions = {} | |
| # Extract different types of descriptive information | |
| if 'properties' in feature: | |
| props = feature['properties'] | |
| # Morphological descriptions | |
| if 'morphology' in props: | |
| descriptions['morphology'] = props['morphology'] | |
| # Phenotypic characteristics | |
| if 'phenotype' in props: | |
| descriptions['phenotype'] = props['phenotype'] | |
| # General characteristics | |
| if 'characteristics' in props: | |
| descriptions['characteristics'] = props['characteristics'] | |
| # Extract from feature type/classification | |
| if 'type' in feature: | |
| descriptions['cell_type'] = f"{feature['type']} cell" | |
| # Extract from confidence-based features | |
| if 'features' in feature: | |
| feat_list = feature['features'] | |
| if isinstance(feat_list, list) and feat_list: | |
| descriptions['features'] = ', '.join(str(f) for f in feat_list[:3]) # Top 3 features | |
| return descriptions | |
| def _create_cmpo_summary(self, global_cmpo, region_cmpo, population_cmpo): | |
| """Create a comprehensive CMPO summary across all stages.""" | |
| try: | |
| all_mappings = [] | |
| # Collect all mappings | |
| if global_cmpo: | |
| all_mappings.extend(global_cmpo) | |
| if region_cmpo: | |
| all_mappings.extend(region_cmpo) | |
| if population_cmpo: | |
| all_mappings.extend(population_cmpo) | |
| if not all_mappings: | |
| return {'summary': 'No CMPO mappings found', 'mappings': []} | |
| # Group by CMPO ID to avoid duplicates | |
| unique_mappings = {} | |
| for mapping in all_mappings: | |
| cmpo_id = mapping.get('CMPO_ID') | |
| if cmpo_id: | |
| if cmpo_id not in unique_mappings: | |
| unique_mappings[cmpo_id] = mapping.copy() | |
| unique_mappings[cmpo_id]['sources'] = [] | |
| # Track which stages contributed to this mapping | |
| source_info = { | |
| 'stage': mapping.get('stage'), | |
| 'source': mapping.get('source'), | |
| 'confidence': mapping.get('confidence', 0) | |
| } | |
| unique_mappings[cmpo_id]['sources'].append(source_info) | |
| # Update confidence to highest across stages | |
| current_conf = unique_mappings[cmpo_id].get('confidence', 0) | |
| new_conf = mapping.get('confidence', 0) | |
| if new_conf > current_conf: | |
| unique_mappings[cmpo_id]['confidence'] = new_conf | |
| # Sort by confidence | |
| sorted_mappings = sorted(unique_mappings.values(), | |
| key=lambda x: x.get('confidence', 0), reverse=True) | |
| # Create summary statistics | |
| stage_counts = {} | |
| for mapping in all_mappings: | |
| stage = mapping.get('stage', 'unknown') | |
| stage_counts[stage] = stage_counts.get(stage, 0) + 1 | |
| summary = { | |
| 'total_unique_terms': len(unique_mappings), | |
| 'total_mappings': len(all_mappings), | |
| 'stage_breakdown': stage_counts, | |
| 'top_terms': [ | |
| { | |
| 'term': mapping.get('term_name'), | |
| 'cmpo_id': mapping.get('CMPO_ID'), | |
| 'confidence': mapping.get('confidence', 0), | |
| 'stages': [s['stage'] for s in mapping.get('sources', [])] | |
| } | |
| for mapping in sorted_mappings[:5] | |
| ], | |
| 'mappings': sorted_mappings | |
| } | |
| return summary | |
| except Exception as e: | |
| logger.error(f"CMPO summary creation failed: {e}") | |
| return {'summary': f'Error creating CMPO summary: {str(e)}', 'mappings': []} | |
| def _extract_mappable_features(self, feature): | |
| """Extract features that can be mapped to CMPO terms (legacy function).""" | |
| mappable = {} | |
| # Extract common feature types | |
| if 'features' in feature: | |
| for feat in feature['features']: | |
| mappable[feat] = feature.get('confidence', 0.5) | |
| if 'type' in feature: | |
| mappable[feature['type']] = feature.get('confidence', 0.5) | |
| # Extract morphological features if present | |
| for key in ['shape', 'texture', 'intensity', 'size']: | |
| if key in feature: | |
| mappable[key] = feature[key] | |
| return mappable | |
| def _deduplicate_mappings(self, mappings): | |
| """Remove duplicate CMPO mappings and sort by confidence.""" | |
| seen = set() | |
| unique = [] | |
| for mapping in mappings: | |
| if isinstance(mapping, dict): | |
| cmpo_id = mapping.get('cmpo_id', '') | |
| if cmpo_id and cmpo_id not in seen: | |
| seen.add(cmpo_id) | |
| unique.append(mapping) | |
| # Sort by confidence score | |
| return sorted(unique, key=lambda x: x.get('confidence', 0), reverse=True) | |
| def _analyze_feature_distribution(self, features): | |
| """Analyze the distribution of features across regions.""" | |
| distribution = {} | |
| for feature in features: | |
| if isinstance(feature, dict): | |
| feat_type = feature.get('type', 'unknown') | |
| if feat_type in distribution: | |
| distribution[feat_type] += 1 | |
| else: | |
| distribution[feat_type] = 1 | |
| return distribution | |
| def _calculate_mean_confidence(self, features): | |
| """Calculate mean confidence across all features.""" | |
| confidences = [] | |
| for feature in features: | |
| if isinstance(feature, dict) and 'confidence' in feature: | |
| confidences.append(feature['confidence']) | |
| return sum(confidences) / len(confidences) if confidences else 0.0 | |
| def _extract_texture_features_from_patch(self, patch): | |
| """Extract basic texture features from a patch.""" | |
| features = [] | |
| # Extract features based on patch properties | |
| properties = patch.get('properties', {}) | |
| area = patch.get('area', 0) | |
| # Classify based on morphological properties | |
| if properties.get('eccentricity', 0) > 0.8: | |
| features.append('elongated') | |
| elif properties.get('eccentricity', 0) < 0.3: | |
| features.append('round') | |
| else: | |
| features.append('oval') | |
| if properties.get('solidity', 0) > 0.9: | |
| features.append('smooth_boundary') | |
| elif properties.get('solidity', 0) < 0.7: | |
| features.append('irregular_boundary') | |
| if area > 2000: | |
| features.append('large') | |
| elif area < 500: | |
| features.append('small') | |
| else: | |
| features.append('medium') | |
| # Add texture descriptors (would normally come from image analysis) | |
| features.extend(['textured', 'cellular']) | |
| return features |