# NEUROFLUXULTIMATEREVOLUTION/models/pathology_detector.py
"""
NEUROFLUX ULTIMATE - Pathology Detector
Advanced pathology detection with ensemble methods
"""
import numpy as np
from typing import Dict, Any, List, Tuple
import logging
logger = logging.getLogger(__name__)
class PathologyDetector:
    """
    Advanced pathology detection system
    """

    def __init__(self, model_type: str = "ensemble_transformer"):
        """Configure the detector with a model identifier and defaults."""
        self.model_type = model_type
        # Default minimum confidence for a finding to count as detected.
        self.confidence_threshold = 0.85
        # Closed set of pathology categories this detector scores.
        self.pathology_types = [
            'tumor', 'lesion', 'hemorrhage',
            'atrophy', 'inflammation', 'calcification',
        ]
        logger.info(f"PathologyDetector initialized: {model_type}")
def detect_ensemble(
self,
processed_data: Dict[str, Any],
confidence_threshold: float = 0.85,
multi_scale_analysis: bool = True
) -> Dict[str, Any]:
"""
Ensemble pathology detection
Args:
processed_data: Preprocessed medical data
confidence_threshold: Minimum confidence for detection
multi_scale_analysis: Enable multi-scale detection
Returns:
Detection results dictionary
"""
image = processed_data['image']
# Run multiple detection methods
anomaly_map = self._generate_anomaly_map(image)
pathologies = self._classify_pathologies(image, anomaly_map)
localization = self._localize_anomalies(image, anomaly_map)
# Calculate overall confidence
overall_confidence = self._calculate_overall_confidence(pathologies)
# Multi-scale if enabled
if multi_scale_analysis:
multi_scale_detections = self._multi_scale_detection(image)
else:
multi_scale_detections = {}
# Count anomalies
anomalies_found = sum(1 for p in pathologies if p['confidence'] >= confidence_threshold)
return {
'anomaly_map': anomaly_map,
'detected_pathologies': pathologies,
'anomaly_locations': localization,
'overall_confidence': overall_confidence,
'anomalies_found': anomalies_found,
'multi_scale_detections': multi_scale_detections,
'model_type': self.model_type
}
def _generate_anomaly_map(self, image: np.ndarray) -> np.ndarray:
"""
Generate anomaly heatmap
Uses statistical and gradient-based methods
"""
# Calculate local statistics
from scipy import ndimage
# Local mean and std using sliding window
mean_filtered = ndimage.uniform_filter(image, size=15)
mean_sq_filtered = ndimage.uniform_filter(image**2, size=15)
local_std = np.sqrt(mean_sq_filtered - mean_filtered**2)
# Gradient magnitude
gradient_x = ndimage.sobel(image, axis=0)
gradient_y = ndimage.sobel(image, axis=1)
gradient_mag = np.sqrt(gradient_x**2 + gradient_y**2)
# Combine for anomaly map
# Normalize each component
local_std_norm = (local_std - local_std.min()) / (local_std.max() - local_std.min() + 1e-7)
gradient_norm = (gradient_mag - gradient_mag.min()) / (gradient_mag.max() - gradient_mag.min() + 1e-7)
# Weight combination
anomaly_map = 0.6 * local_std_norm + 0.4 * gradient_norm
# Apply threshold to highlight significant anomalies
threshold = np.percentile(anomaly_map, 75)
anomaly_map = np.where(anomaly_map > threshold, anomaly_map, 0)
return anomaly_map
def _classify_pathologies(
self,
image: np.ndarray,
anomaly_map: np.ndarray
) -> List[Dict[str, Any]]:
"""
Classify detected pathologies
"""
pathologies = []
# Analyze anomaly characteristics
if np.max(anomaly_map) > 0:
# Calculate features from anomaly regions
features = self._extract_anomaly_features(image, anomaly_map)
# Simulate pathology classification (in production, use trained models)
# For demonstration, we'll use statistical heuristics
# Check for various pathology types
for pathology_type in self.pathology_types:
confidence = self._estimate_pathology_confidence(
pathology_type,
features,
anomaly_map
)
if confidence > 0.5: # Minimum threshold to report
pathologies.append({
'type': pathology_type,
'confidence': confidence,
'severity': self._estimate_severity(confidence),
'characteristics': self._get_pathology_characteristics(pathology_type)
})
# Sort by confidence
pathologies.sort(key=lambda x: x['confidence'], reverse=True)
# If no pathologies detected, add "normal" finding
if not pathologies:
pathologies.append({
'type': 'normal',
'confidence': 0.95,
'severity': 'none',
'characteristics': 'No significant abnormalities detected'
})
return pathologies
def _localize_anomalies(
self,
image: np.ndarray,
anomaly_map: np.ndarray
) -> List[Dict[str, Any]]:
"""
Localize anomalies in the image
Returns bounding boxes and masks
"""
from scipy import ndimage
locations = []
# Threshold anomaly map
threshold = np.percentile(anomaly_map[anomaly_map > 0], 80) if np.any(anomaly_map > 0) else 0
binary_map = anomaly_map > threshold
# Label connected components
labeled_array, num_features = ndimage.label(binary_map)
# Extract each component
for label_idx in range(1, num_features + 1):
component_mask = labeled_array == label_idx
# Get bounding box
coords = np.argwhere(component_mask)
if len(coords) > 0:
y_min, x_min = coords.min(axis=0)
y_max, x_max = coords.max(axis=0)
# Calculate properties
area = np.sum(component_mask)
mean_intensity = np.mean(image[component_mask])
locations.append({
'bbox': {
'x_min': int(x_min),
'y_min': int(y_min),
'x_max': int(x_max),
'y_max': int(y_max)
},
'area': int(area),
'mean_intensity': float(mean_intensity),
'confidence': float(np.mean(anomaly_map[component_mask]))
})
return locations
def _extract_anomaly_features(
self,
image: np.ndarray,
anomaly_map: np.ndarray
) -> Dict[str, float]:
"""Extract features from anomalous regions"""
# Get anomalous regions
threshold = np.percentile(anomaly_map[anomaly_map > 0], 70) if np.any(anomaly_map > 0) else 0
anomaly_regions = image[anomaly_map > threshold]
if len(anomaly_regions) == 0:
anomaly_regions = image.flatten()
return {
'mean_intensity': float(np.mean(anomaly_regions)),
'std_intensity': float(np.std(anomaly_regions)),
'max_intensity': float(np.max(anomaly_regions)),
'area_ratio': float(np.sum(anomaly_map > threshold) / anomaly_map.size),
'compactness': self._calculate_compactness(anomaly_map > threshold)
}
def _estimate_pathology_confidence(
self,
pathology_type: str,
features: Dict[str, float],
anomaly_map: np.ndarray
) -> float:
"""
Estimate confidence for specific pathology type
(Simplified heuristic for demonstration)
"""
# Base confidence from anomaly intensity
base_confidence = float(np.mean(anomaly_map[anomaly_map > 0])) if np.any(anomaly_map > 0) else 0.1
# Adjust based on pathology type characteristics
adjustments = {
'tumor': features['area_ratio'] * 2.0,
'lesion': features['compactness'],
'hemorrhage': features['max_intensity'],
'atrophy': 1.0 - features['mean_intensity'],
'inflammation': features['std_intensity'],
'calcification': features['max_intensity'] * features['compactness']
}
adjustment = adjustments.get(pathology_type, 0.5)
confidence = base_confidence * adjustment
# Clip to [0, 1]
return float(np.clip(confidence, 0.0, 0.99))
def _estimate_severity(self, confidence: float) -> str:
"""Estimate severity based on confidence"""
if confidence >= 0.9:
return 'high'
elif confidence >= 0.75:
return 'moderate'
elif confidence >= 0.6:
return 'low'
else:
return 'minimal'
def _get_pathology_characteristics(self, pathology_type: str) -> str:
"""Get description of pathology characteristics"""
characteristics = {
'tumor': 'Localized mass with distinct boundaries',
'lesion': 'Area of abnormal tissue damage or change',
'hemorrhage': 'Evidence of bleeding or blood accumulation',
'atrophy': 'Tissue volume reduction or degradation',
'inflammation': 'Signs of tissue inflammation or edema',
'calcification': 'Calcium deposit accumulation',
'normal': 'No significant abnormalities detected'
}
return characteristics.get(pathology_type, 'Unknown pathology type')
def _calculate_compactness(self, binary_mask: np.ndarray) -> float:
"""Calculate compactness of binary region"""
area = np.sum(binary_mask)
if area == 0:
return 0.0
# Simple perimeter estimation
from scipy import ndimage
eroded = ndimage.binary_erosion(binary_mask)
perimeter = np.sum(binary_mask) - np.sum(eroded)
if perimeter == 0:
return 1.0
# Compactness = 4π * area / perimeter²
compactness = 4 * np.pi * area / (perimeter ** 2)
return float(np.clip(compactness, 0.0, 1.0))
def _calculate_overall_confidence(self, pathologies: List[Dict]) -> float:
"""Calculate overall detection confidence"""
if not pathologies:
return 0.5
# Weight by top detections
confidences = [p['confidence'] for p in pathologies[:3]]
return float(np.mean(confidences))
def _multi_scale_detection(self, image: np.ndarray) -> Dict[str, Any]:
"""Run detection at multiple scales"""
scales = [1.0, 0.75, 0.5]
results = {}
for scale in scales:
if scale != 1.0:
import cv2
h, w = image.shape[:2]
scaled = cv2.resize(image, (int(w * scale), int(h * scale)))
else:
scaled = image
anomaly_map = self._generate_anomaly_map(scaled)
results[f'scale_{scale}'] = {
'max_anomaly': float(np.max(anomaly_map)),
'mean_anomaly': float(np.mean(anomaly_map[anomaly_map > 0])) if np.any(anomaly_map > 0) else 0.0
}
return results