# Spaces:
# Runtime error
# Runtime error
"""
NEUROFLUX ULTIMATE - Pathology Detector
Advanced pathology detection with ensemble methods
"""
# Standard library first, then third-party (PEP 8 grouping).
import logging
from typing import Dict, Any, List, Tuple

import numpy as np

# Module-level logger, named after the importing module.
logger = logging.getLogger(__name__)
class PathologyDetector:
    """
    Heuristic pathology detection system.

    Builds an anomaly heatmap from local standard deviation and Sobel
    gradient magnitude, scores a fixed list of pathology types with simple
    statistical heuristics (no trained model is invoked in this class), and
    localizes anomalous regions as connected components.

    Attributes:
        model_type: Label given at construction; stored and echoed back in
            the detection results.
        confidence_threshold: Set to 0.85 at construction. It is not read by
            the methods of this class; ``detect_ensemble`` takes its own
            ``confidence_threshold`` argument.
        pathology_types: Names of the pathology types that are scored.
    """

    def __init__(self, model_type: str = "ensemble_transformer"):
        """
        Store the configuration and log the chosen model type.

        Args:
            model_type: Free-form model label stored on the instance.
        """
        self.model_type = model_type
        self.confidence_threshold = 0.85
        # Pathology types
        self.pathology_types = [
            'tumor',
            'lesion',
            'hemorrhage',
            'atrophy',
            'inflammation',
            'calcification'
        ]
        # Lazy %-formatting: the message is only built if INFO is enabled.
        logger.info("PathologyDetector initialized: %s", model_type)

    @staticmethod
    def _as_float_image(image: np.ndarray) -> np.ndarray:
        """
        Return ``image`` as a floating-point array.

        Floating-point input is returned unchanged (no copy, no precision
        change). Any other dtype is upcast to float64 so that squaring and
        filtering cannot overflow or truncate.
        """
        arr = np.asarray(image)
        if np.issubdtype(arr.dtype, np.floating):
            return arr
        return arr.astype(np.float64)

    def detect_ensemble(
        self,
        processed_data: Dict[str, Any],
        confidence_threshold: float = 0.85,
        multi_scale_analysis: bool = True
    ) -> Dict[str, Any]:
        """
        Ensemble pathology detection

        Args:
            processed_data: Preprocessed medical data; the image is read
                from its ``'image'`` key.
            confidence_threshold: Minimum confidence for a finding to be
                counted in ``anomalies_found``.
            multi_scale_analysis: Enable multi-scale detection

        Returns:
            Detection results dictionary with the keys ``anomaly_map``,
            ``detected_pathologies``, ``anomaly_locations``,
            ``overall_confidence``, ``anomalies_found``,
            ``multi_scale_detections`` and ``model_type``.

        Raises:
            KeyError: If ``processed_data`` has no ``'image'`` entry.
        """
        image = self._as_float_image(processed_data['image'])

        # Run multiple detection methods
        anomaly_map = self._generate_anomaly_map(image)
        pathologies = self._classify_pathologies(image, anomaly_map)
        localization = self._localize_anomalies(image, anomaly_map)

        # Calculate overall confidence
        overall_confidence = self._calculate_overall_confidence(pathologies)

        # Multi-scale if enabled
        if multi_scale_analysis:
            multi_scale_detections = self._multi_scale_detection(image)
        else:
            multi_scale_detections = {}

        # Count anomalies. The placeholder 'normal' finding (added when
        # nothing was detected) is not an anomaly and must not be counted.
        anomalies_found = sum(
            1
            for p in pathologies
            if p['type'] != 'normal' and p['confidence'] >= confidence_threshold
        )

        return {
            'anomaly_map': anomaly_map,
            'detected_pathologies': pathologies,
            'anomaly_locations': localization,
            'overall_confidence': overall_confidence,
            'anomalies_found': anomalies_found,
            'multi_scale_detections': multi_scale_detections,
            'model_type': self.model_type
        }

    def _generate_anomaly_map(self, image: np.ndarray) -> np.ndarray:
        """
        Generate anomaly heatmap

        Uses statistical and gradient-based methods: a 15-wide sliding-window
        standard deviation and the Sobel gradient magnitude along axes 0 and
        1 are each min-max normalized, blended 60/40, and every value at or
        below the map's 75th percentile is zeroed.

        Args:
            image: Input image; non-float dtypes are upcast to float64.

        Returns:
            Array of the same shape as ``image``; zero where nothing
            significant was found.
        """
        from scipy import ndimage

        # Integer input would overflow in image**2 and be truncated by the
        # filters, which write their output in the input dtype.
        image = self._as_float_image(image)

        # Local mean and std using sliding window
        mean_filtered = ndimage.uniform_filter(image, size=15)
        mean_sq_filtered = ndimage.uniform_filter(image ** 2, size=15)
        # E[x^2] - E[x]^2 can dip just below zero through rounding; clamp it
        # so the square root cannot yield NaN and poison the normalization.
        variance = np.maximum(mean_sq_filtered - mean_filtered ** 2, 0.0)
        local_std = np.sqrt(variance)

        # Gradient magnitude
        gradient_x = ndimage.sobel(image, axis=0)
        gradient_y = ndimage.sobel(image, axis=1)
        gradient_mag = np.sqrt(gradient_x ** 2 + gradient_y ** 2)

        # Normalize each component; the epsilon avoids division by zero on
        # flat inputs.
        local_std_norm = (local_std - local_std.min()) / (local_std.max() - local_std.min() + 1e-7)
        gradient_norm = (gradient_mag - gradient_mag.min()) / (gradient_mag.max() - gradient_mag.min() + 1e-7)

        # Weight combination
        anomaly_map = 0.6 * local_std_norm + 0.4 * gradient_norm

        # Apply threshold to highlight significant anomalies
        threshold = np.percentile(anomaly_map, 75)
        anomaly_map = np.where(anomaly_map > threshold, anomaly_map, 0)
        return anomaly_map

    def _classify_pathologies(
        self,
        image: np.ndarray,
        anomaly_map: np.ndarray
    ) -> List[Dict[str, Any]]:
        """
        Classify detected pathologies

        Scores every entry of ``self.pathology_types`` with statistical
        heuristics and keeps those whose confidence exceeds 0.5.

        Args:
            image: Image the anomaly map was derived from.
            anomaly_map: Heatmap as produced by ``_generate_anomaly_map``.

        Returns:
            Findings sorted by descending confidence; each has the keys
            ``type``, ``confidence``, ``severity`` and ``characteristics``.
            When nothing qualifies, a single ``'normal'`` finding with
            confidence 0.95 is returned instead.
        """
        pathologies = []

        # Analyze anomaly characteristics
        if np.max(anomaly_map) > 0:
            # Calculate features from anomaly regions
            features = self._extract_anomaly_features(image, anomaly_map)

            # Simulated classification using statistical heuristics
            # (in production, use trained models).
            for pathology_type in self.pathology_types:
                confidence = self._estimate_pathology_confidence(
                    pathology_type,
                    features,
                    anomaly_map
                )
                if confidence > 0.5:  # Minimum threshold to report
                    pathologies.append({
                        'type': pathology_type,
                        'confidence': confidence,
                        'severity': self._estimate_severity(confidence),
                        'characteristics': self._get_pathology_characteristics(pathology_type)
                    })

        # Sort by confidence
        pathologies.sort(key=lambda x: x['confidence'], reverse=True)

        # If no pathologies detected, add "normal" finding
        if not pathologies:
            pathologies.append({
                'type': 'normal',
                'confidence': 0.95,
                'severity': 'none',
                'characteristics': 'No significant abnormalities detected'
            })
        return pathologies

    def _localize_anomalies(
        self,
        image: np.ndarray,
        anomaly_map: np.ndarray
    ) -> List[Dict[str, Any]]:
        """
        Localize anomalies in the image

        Keeps the pixels strictly above the 80th percentile of the positive
        anomaly values, labels connected components, and describes each one.

        Args:
            image: Image the anomaly map was derived from (same shape).
            anomaly_map: Heatmap as produced by ``_generate_anomaly_map``.

        Returns:
            One dict per component with ``bbox`` (inclusive ``x_min``,
            ``y_min``, ``x_max``, ``y_max`` taken from axes 1 and 0),
            ``area`` (pixel count), ``mean_intensity`` (mean of ``image``
            over the component) and ``confidence`` (mean of ``anomaly_map``
            over the component).
        """
        from scipy import ndimage

        locations = []

        # Threshold anomaly map
        positive = anomaly_map > 0
        threshold = np.percentile(anomaly_map[positive], 80) if np.any(positive) else 0
        binary_map = anomaly_map > threshold

        # Label connected components
        labeled_array, _ = ndimage.label(binary_map)

        # find_objects yields each label's bounding slices, so the per-label
        # work touches only that box instead of scanning the whole image.
        for label_idx, region in enumerate(ndimage.find_objects(labeled_array), start=1):
            if region is None:
                continue
            # The box may contain pixels of other components; re-mask by label.
            component_mask = labeled_array[region] == label_idx
            y_slice, x_slice = region[0], region[1]

            # Calculate properties
            area = np.count_nonzero(component_mask)
            mean_intensity = np.mean(image[region][component_mask])
            locations.append({
                'bbox': {
                    'x_min': int(x_slice.start),
                    'y_min': int(y_slice.start),
                    # Slice stops are exclusive; the bbox is inclusive.
                    'x_max': int(x_slice.stop - 1),
                    'y_max': int(y_slice.stop - 1)
                },
                'area': int(area),
                'mean_intensity': float(mean_intensity),
                'confidence': float(np.mean(anomaly_map[region][component_mask]))
            })
        return locations

    def _extract_anomaly_features(
        self,
        image: np.ndarray,
        anomaly_map: np.ndarray
    ) -> Dict[str, float]:
        """
        Extract features from anomalous regions

        The anomalous region is the set of pixels strictly above the 70th
        percentile of the positive anomaly values. If that set is empty the
        intensity statistics fall back to the whole image.

        Returns:
            Dict with ``mean_intensity``, ``std_intensity``,
            ``max_intensity``, ``area_ratio`` (region pixels / total pixels)
            and ``compactness`` of the region mask.
        """
        # Get anomalous regions
        positive = anomaly_map > 0
        threshold = np.percentile(anomaly_map[positive], 70) if np.any(positive) else 0
        region_mask = anomaly_map > threshold
        anomaly_regions = image[region_mask]
        if anomaly_regions.size == 0:
            anomaly_regions = image.flatten()

        return {
            'mean_intensity': float(np.mean(anomaly_regions)),
            'std_intensity': float(np.std(anomaly_regions)),
            'max_intensity': float(np.max(anomaly_regions)),
            'area_ratio': float(np.count_nonzero(region_mask) / anomaly_map.size),
            'compactness': self._calculate_compactness(region_mask)
        }

    def _estimate_pathology_confidence(
        self,
        pathology_type: str,
        features: Dict[str, float],
        anomaly_map: np.ndarray
    ) -> float:
        """
        Estimate confidence for specific pathology type
        (Simplified heuristic for demonstration)

        The mean positive anomaly value (0.1 if there is none) is multiplied
        by a per-type factor derived from ``features``; unknown types use a
        factor of 0.5.

        NOTE(review): the factors use raw intensities (e.g.
        ``1.0 - mean_intensity``), which presumably expects image values
        normalized to [0, 1] - confirm against the preprocessing step.

        Returns:
            Confidence clipped to [0.0, 0.99].
        """
        # Base confidence from anomaly intensity
        positive_values = anomaly_map[anomaly_map > 0]
        base_confidence = float(np.mean(positive_values)) if positive_values.size else 0.1

        # Adjust based on pathology type characteristics
        adjustments = {
            'tumor': features['area_ratio'] * 2.0,
            'lesion': features['compactness'],
            'hemorrhage': features['max_intensity'],
            'atrophy': 1.0 - features['mean_intensity'],
            'inflammation': features['std_intensity'],
            'calcification': features['max_intensity'] * features['compactness']
        }
        adjustment = adjustments.get(pathology_type, 0.5)
        confidence = base_confidence * adjustment

        # Clip to [0, 0.99]: the heuristic never reports full certainty.
        return float(np.clip(confidence, 0.0, 0.99))

    def _estimate_severity(self, confidence: float) -> str:
        """
        Estimate severity based on confidence

        Returns:
            ``'high'`` (>= 0.9), ``'moderate'`` (>= 0.75), ``'low'``
            (>= 0.6), otherwise ``'minimal'``.
        """
        if confidence >= 0.9:
            return 'high'
        elif confidence >= 0.75:
            return 'moderate'
        elif confidence >= 0.6:
            return 'low'
        else:
            return 'minimal'

    def _get_pathology_characteristics(self, pathology_type: str) -> str:
        """
        Get description of pathology characteristics

        Returns:
            The canned description for ``pathology_type``, or
            ``'Unknown pathology type'`` when it is not in the table.
        """
        characteristics = {
            'tumor': 'Localized mass with distinct boundaries',
            'lesion': 'Area of abnormal tissue damage or change',
            'hemorrhage': 'Evidence of bleeding or blood accumulation',
            'atrophy': 'Tissue volume reduction or degradation',
            'inflammation': 'Signs of tissue inflammation or edema',
            'calcification': 'Calcium deposit accumulation',
            'normal': 'No significant abnormalities detected'
        }
        return characteristics.get(pathology_type, 'Unknown pathology type')

    def _calculate_compactness(self, binary_mask: np.ndarray) -> float:
        """
        Calculate compactness of binary region

        The perimeter is estimated as the number of pixels removed by one
        binary erosion; the measure is taken over the whole mask, not per
        connected component.

        Returns:
            ``4 * pi * area / perimeter**2`` clipped to [0.0, 1.0]; 0.0 for
            an empty mask and 1.0 when erosion removes nothing.
        """
        area = np.sum(binary_mask)
        if area == 0:
            return 0.0

        # Simple perimeter estimation
        from scipy import ndimage
        eroded = ndimage.binary_erosion(binary_mask)
        perimeter = area - np.sum(eroded)
        if perimeter == 0:
            return 1.0

        # Compactness = 4π * area / perimeter²
        compactness = 4 * np.pi * area / (perimeter ** 2)
        return float(np.clip(compactness, 0.0, 1.0))

    def _calculate_overall_confidence(self, pathologies: List[Dict]) -> float:
        """
        Calculate overall detection confidence

        Returns:
            Mean confidence of the first three findings (the list is
            expected to be sorted by descending confidence), or 0.5 when
            the list is empty.
        """
        if not pathologies:
            return 0.5
        # Only the top detections contribute.
        confidences = [p['confidence'] for p in pathologies[:3]]
        return float(np.mean(confidences))

    def _multi_scale_detection(self, image: np.ndarray) -> Dict[str, Any]:
        """
        Run detection at multiple scales

        The anomaly map is recomputed on the image at scales 1.0, 0.75 and
        0.5 (resized with ``cv2.resize``).

        Returns:
            Dict keyed ``'scale_<scale>'`` whose values hold ``max_anomaly``
            and ``mean_anomaly`` (mean of the positive anomaly values, 0.0
            if there are none).
        """
        scales = [1.0, 0.75, 0.5]
        results = {}
        h, w = image.shape[:2]

        for scale in scales:
            if scale != 1.0:
                # Imported lazily so OpenCV is only required for this path.
                import cv2
                # cv2.resize takes (width, height); clamp to at least one
                # pixel so tiny images do not produce an empty target size.
                target_size = (max(1, int(w * scale)), max(1, int(h * scale)))
                scaled = cv2.resize(image, target_size)
            else:
                scaled = image

            anomaly_map = self._generate_anomaly_map(scaled)
            positive_values = anomaly_map[anomaly_map > 0]
            results[f'scale_{scale}'] = {
                'max_anomaly': float(np.max(anomaly_map)),
                'mean_anomaly': float(np.mean(positive_values)) if positive_values.size else 0.0
            }
        return results