import numpy as np import torch import torch.nn as nn import matplotlib.pyplot as plt from matplotlib.patches import Rectangle from typing import Tuple, List, Dict, Optional from tqdm import tqdm from pathlib import Path from sklearn.ensemble import IsolationForest from sklearn.preprocessing import StandardScaler import pickle # ============================================================================== # MODEL ARCHITECTURE # ============================================================================== class ResidualBlock(nn.Module): def __init__(self, channels: int): super().__init__() self.conv1 = nn.Conv2d(channels, channels, kernel_size=3, padding=1, bias=False) self.bn1 = nn.BatchNorm2d(channels) self.conv2 = nn.Conv2d(channels, channels, kernel_size=3, padding=1, bias=False) self.bn2 = nn.BatchNorm2d(channels) self.relu = nn.ReLU(inplace=True) def forward(self, x): residual = x out = self.relu(self.bn1(self.conv1(x))) out = self.bn2(self.conv2(out)) out += residual out = self.relu(out) return out class EncoderBlock(nn.Module): def __init__(self, in_channels: int, out_channels: int, downsample: bool = True): super().__init__() self.downsample = downsample self.conv = nn.Conv2d(in_channels, out_channels, kernel_size=3, padding=1, bias=False) self.bn = nn.BatchNorm2d(out_channels) self.relu = nn.ReLU(inplace=True) self.res_block = ResidualBlock(out_channels) if downsample: self.pool = nn.Conv2d(out_channels, out_channels, kernel_size=3, stride=2, padding=1, bias=False) def forward(self, x): x = self.relu(self.bn(self.conv(x))) x = self.res_block(x) if self.downsample: skip = x x = self.pool(x) return x, skip else: return x class DecoderBlock(nn.Module): def __init__(self, in_channels: int, skip_channels: int, out_channels: int): super().__init__() self.upsample = nn.Upsample(scale_factor=2, mode='bilinear', align_corners=True) self.conv = nn.Conv2d(in_channels + skip_channels, out_channels, kernel_size=3, padding=1, bias=False) self.bn = nn.BatchNorm2d(out_channels) self.relu = nn.ReLU(inplace=True) self.res_block = ResidualBlock(out_channels) def forward(self, x, skip): x = self.upsample(x) x = torch.cat([x, skip], dim=1) x = self.relu(self.bn(self.conv(x))) x = self.res_block(x) return x class ResUNetAutoencoder(nn.Module): def __init__(self, in_channels: int = 7, latent_dim: int = 256): super().__init__() # Encoder self.enc1 = EncoderBlock(in_channels, 32, downsample=True) self.enc2 = EncoderBlock(32, 64, downsample=True) self.enc3 = EncoderBlock(64, 128, downsample=True) self.bottleneck = EncoderBlock(128, 256, downsample=False) # Latent self.global_pool = nn.AdaptiveAvgPool2d(1) self.latent_dim = latent_dim # Decoder self.dec1 = DecoderBlock(256, 128, 128) self.dec2 = DecoderBlock(128, 64, 64) self.dec3 = DecoderBlock(64, 32, 32) self.final_conv = nn.Conv2d(32, in_channels, kernel_size=3, padding=1) def forward(self, x): # Encoder x, skip1 = self.enc1(x) x, skip2 = self.enc2(x) x, skip3 = self.enc3(x) x = self.bottleneck(x) # Latent latent = self.global_pool(x) latent = latent.view(latent.size(0), -1) # Decoder x = self.dec1(x, skip3) x = self.dec2(x, skip2) x = self.dec3(x, skip1) # Final Conv (Reduces 32 channels -> 7 channels) x = self.final_conv(x) reconstruction = torch.sigmoid(x) return reconstruction, latent def count_parameters(self): return sum(p.numel() for p in self.parameters() if p.requires_grad) # ============================================================================== # RECONSTRUCTION FUNCTIONS # ============================================================================== def reconstruct_full_aoi_corrected( patches: np.ndarray, metadata: List[Dict], patch_size: int = 64, use_weighted: bool = True ) -> Tuple[np.ndarray, np.ndarray, Tuple[int, int]]: """Reconstruct full AOI from overlapping patches with proper blending Args: patches: Array of patches with shape (num_patches, channels, height, width) metadata: List of dicts containing 'row' and 'col' positions for each patch patch_size: Size of each patch (default 64) use_weighted: Whether to use weighted blending (default True) Returns: reconstructed: Full AOI reconstruction (channels, height, width) weight_map: Map showing patch overlap counts shape: Tuple of (height, width) of reconstructed AOI """ if len(patches) == 0: raise ValueError("No patches provided for reconstruction") num_patches = patches.shape[0] num_channels = patches.shape[1] print(f" Reconstructing from {num_patches} patches with {num_channels} channels...") # Determine full AOI dimensions max_row = max(m['row'] + patch_size for m in metadata) max_col = max(m['col'] + patch_size for m in metadata) print(f" Target dimensions: {max_row}Γ—{max_col} pixels") # Initialize accumulation arrays reconstructed = np.zeros((num_channels, max_row, max_col), dtype=np.float64) weight_map = np.zeros((max_row, max_col), dtype=np.float64) # Create blending weight template if use_weighted: y, x = np.ogrid[0:patch_size, 0:patch_size] center = patch_size // 2 dist_from_center = np.sqrt((y - center)**2 + (x - center)**2) max_dist = np.sqrt(2) * center blend_weight = 1.0 - (dist_from_center / max_dist) ** 2 blend_weight = np.clip(blend_weight, 0.1, 1.0) else: blend_weight = np.ones((patch_size, patch_size), dtype=np.float64) # Stitch all patches for i, meta in enumerate(metadata): row_start = meta['row'] col_start = meta['col'] row_end = min(row_start + patch_size, max_row) col_end = min(col_start + patch_size, max_col) patch_height = row_end - row_start patch_width = col_end - col_start if patch_height <= 0 or patch_width <= 0: continue patch_data = patches[i, :, :patch_height, :patch_width] patch_weight = blend_weight[:patch_height, :patch_width] for ch in range(num_channels): reconstructed[ch, row_start:row_end, col_start:col_end] += ( patch_data[ch] * patch_weight ) weight_map[row_start:row_end, col_start:col_end] += patch_weight # Normalize by weights weight_map_safe = np.where(weight_map > 0, weight_map, 1.0) for ch in range(num_channels): reconstructed[ch] = reconstructed[ch] / weight_map_safe # Set uncovered areas to NaN uncovered_mask = weight_map == 0 for ch in range(num_channels): reconstructed[ch][uncovered_mask] = np.nan reconstructed = reconstructed.astype(np.float32) weight_map = weight_map.astype(np.float32) # Report coverage statistics coverage = np.sum(weight_map > 0) / weight_map.size * 100 avg_overlap = np.mean(weight_map[weight_map > 0]) if np.any(weight_map > 0) else 0 print(f" Coverage: {coverage:.1f}% | Avg overlap: {avg_overlap:.2f}Γ—") if coverage < 50: print(f" ⚠️ WARNING: Only {coverage:.1f}% coverage!") return reconstructed, weight_map, (max_row, max_col) # ============================================================================== # ANOMALY COMPUTATION # ============================================================================== def compute_autoencoder_probabilities( model: nn.Module, patches: np.ndarray, metadata: List[Dict], device: torch.device, batch_size: int = 32, use_sigmoid_scores: bool = True ) -> Tuple[np.ndarray, np.ndarray, np.ndarray]: """ Compute autoencoder anomaly probabilities Returns: prob_scores: (num_patches,) - Normalized probability scores [0, 1] pixel_scores: (num_patches, H, W) - Per-pixel anomaly maps reconstructed_patches: (num_patches, C, H, W) - Reconstructions """ model.eval() patches_torch = torch.from_numpy(patches).float() anomaly_scores = [] pixel_scores = [] reconstructed_patches = [] with torch.no_grad(): for i in tqdm(range(0, len(patches), batch_size), desc="Model 1 (Autoencoder)", leave=False): batch = patches_torch[i:i+batch_size].to(device) reconstruction, _ = model(batch) if use_sigmoid_scores: # Sigmoid uncertainty: higher = more anomalous sigmoid_uncertainty = 1.0 - torch.abs(reconstruction - 0.5) * 2 pixel_score = sigmoid_uncertainty.mean(dim=1) # Per pixel patch_score = sigmoid_uncertainty.mean(dim=(1, 2, 3)) # Per patch else: # MSE-based approach squared_error = (reconstruction - batch) ** 2 pixel_score = squared_error.mean(dim=1) patch_score = squared_error.mean(dim=(1, 2, 3)) anomaly_scores.append(patch_score.cpu().numpy()) pixel_scores.append(pixel_score.cpu().numpy()) reconstructed_patches.append(reconstruction.cpu().numpy()) anomaly_scores = np.concatenate(anomaly_scores) pixel_scores = np.concatenate(pixel_scores) reconstructed_patches = np.concatenate(reconstructed_patches) # NORMALIZE to [0, 1] probability range prob_scores = (anomaly_scores - anomaly_scores.min()) / (anomaly_scores.max() - anomaly_scores.min() + 1e-10) return prob_scores, pixel_scores, reconstructed_patches # ============================================================================== # TRAIN/TEST SPLIT # ============================================================================== def discover_and_split_aois( patches_dir: Path, train_ratio: float = 0.8 ) -> Tuple[List[str], List[str]]: """Discover all AOIs and split into train/test sets Args: patches_dir: Directory containing AOI_xxxx_important_patches.npz files train_ratio: Fraction of AOIs for training (default 0.8) Returns: train_aoi_names: List of AOI names for training test_aoi_names: List of AOI names for testing """ # Find all important_patches files important_files = sorted(list(patches_dir.glob("AOI_*_important_patches.npz"))) if len(important_files) == 0: raise FileNotFoundError(f"No *_important_patches.npz files found in {patches_dir}") # Extract AOI names (remove suffix) aoi_names = [f.stem.replace('_important_patches', '') for f in important_files] print(f"\nπŸ“‚ Discovered {len(aoi_names)} AOIs in {patches_dir}") print(f" Examples: {', '.join(aoi_names[:5])}") # Shuffle for random split np.random.shuffle(aoi_names) split_idx = int(len(aoi_names) * train_ratio) train_aoi_names = aoi_names[:split_idx] test_aoi_names = aoi_names[split_idx:] print(f"\nπŸ“Š AOI-Level Split:") print(f" Total AOIs: {len(aoi_names)}") print(f" Training AOIs: {len(train_aoi_names)}") print(f" Testing AOIs: {len(test_aoi_names)}") return train_aoi_names, test_aoi_names # ============================================================================== # VISUALIZATION # ============================================================================== def visualize_full_aoi_anomalies( aoi_name: str, model: nn.Module, patches_dir: Path, device: torch.device, top_k: int = 20, channel_names: Optional[List[str]] = None, save_path: Optional[str] = None, use_sigmoid_scores: bool = True # ADD THIS LINE ): """Visualize anomalies on full AOI reconstruction using ALL patches Args: aoi_name: Name of the AOI to visualize model: Trained autoencoder model patches_dir: Directory containing patch files device: torch device to use top_k: Number of top anomalies to highlight channel_names: List of channel names (optional) save_path: Path to save visualization (optional) use_sigmoid_scores: If True, use sigmoid-based scoring; if False, use MSE Returns: anomaly_scores: Anomaly score per patch anomaly_heatmap: Full-resolution anomaly heatmap original_full: Original reconstructed AOI reconstructed_full: Model-reconstructed AOI """ if channel_names is None: channel_names = ['DTM', 'Slope', 'Roughness', 'NDVI', 'NDWI', 'FlowAcc', 'FlowDir'] score_type = "Sigmoid Score" if use_sigmoid_scores else "MSE" print(f"\n{'='*70}") print(f"FULL AOI ANOMALY DETECTION: {aoi_name}") print(f"Scoring Method: {score_type}") print(f"{'='*70}") # Load ALL patches all_patches_file = patches_dir / f"{aoi_name}_all_patches.npz" if not all_patches_file.exists(): raise FileNotFoundError(f"All patches file not found: {all_patches_file}") print(f" Loading ALL patches from: {all_patches_file.name}") with np.load(all_patches_file, allow_pickle=True) as data: patches = data['patches'] metadata = list(data['metadata']) patches = np.nan_to_num(patches, nan=0.0, posinf=0.0, neginf=0.0) print(f" Patches loaded: {len(patches)}") # Compute anomalies print(f" πŸ”¬ Computing anomaly scores ({score_type})...") anomaly_scores, pixel_scores, recon_patches, original_patches = compute_full_aoi_anomalies( model, patches, metadata, device, use_sigmoid_scores=use_sigmoid_scores ) # Reconstruct full AOI print(f" πŸ”¨ Reconstructing original AOI...") original_full, weight_map_orig, shape = reconstruct_full_aoi_corrected( patches, metadata, use_weighted=True ) print(f" πŸ”¨ Reconstructing from model output...") reconstructed_full, weight_map_recon, _ = reconstruct_full_aoi_corrected( recon_patches, metadata, use_weighted=True ) # Create anomaly heatmap print(f" πŸ—ΊοΈ Creating anomaly heatmap...") anomaly_heatmap = np.zeros(shape, dtype=np.float32) anomaly_count = np.zeros(shape, dtype=np.float32) for i, meta in enumerate(metadata): row = meta['row'] col = meta['col'] row_end = min(row + 64, shape[0]) col_end = min(col + 64, shape[1]) patch_h = row_end - row patch_w = col_end - col if patch_h > 0 and patch_w > 0: anomaly_heatmap[row:row_end, col:col_end] += pixel_scores[i, :patch_h, :patch_w] anomaly_count[row:row_end, col:col_end] += 1 anomaly_count = np.maximum(anomaly_count, 1) anomaly_heatmap = anomaly_heatmap / anomaly_count # Find top anomalies top_indices = np.argsort(anomaly_scores)[-top_k:][::-1] # Create visualization print(f" 🎨 Creating visualization...") fig = plt.figure(figsize=(24, 18)) gs = fig.add_gridspec(3, 3, hspace=0.3, wspace=0.3) # Row 1: Original channels for i, (ch_idx, ch_name) in enumerate([(0, 'DTM'), (3, 'NDVI'), (4, 'NDWI')]): ax = fig.add_subplot(gs[0, i]) data = original_full[ch_idx] data_masked = np.ma.masked_invalid(data) cmap = 'terrain' if ch_idx == 0 else 'RdYlGn' im = ax.imshow(data_masked, cmap=cmap, interpolation='bilinear') ax.set_title(f'{aoi_name}\n{ch_name} (Original)', fontsize=12, fontweight='bold') ax.axis('off') plt.colorbar(im, ax=ax, fraction=0.046, pad=0.04) # Row 2: Reconstruction quality ax = fig.add_subplot(gs[1, 0]) recon_masked = np.ma.masked_invalid(reconstructed_full[0]) im = ax.imshow(recon_masked, cmap='terrain', interpolation='bilinear') ax.set_title('DTM Reconstruction', fontsize=12, fontweight='bold') ax.axis('off') plt.colorbar(im, ax=ax, fraction=0.046, pad=0.04) ax = fig.add_subplot(gs[1, 1]) error_map = np.abs(original_full[0] - reconstructed_full[0]) error_masked = np.ma.masked_invalid(error_map) im = ax.imshow(error_masked, cmap='Reds', interpolation='bilinear') ax.set_title('Absolute Error (DTM)', fontsize=12, fontweight='bold') ax.axis('off') plt.colorbar(im, ax=ax, fraction=0.046, pad=0.04) ax = fig.add_subplot(gs[1, 2]) im = ax.imshow(weight_map_orig, cmap='viridis', interpolation='nearest') coverage_pct = np.sum(weight_map_orig > 0) / weight_map_orig.size * 100 ax.set_title(f'Patch Overlap\n(coverage: {coverage_pct:.1f}%)', fontsize=12, fontweight='bold') ax.axis('off') plt.colorbar(im, ax=ax, fraction=0.046, pad=0.04) # Row 3: Anomaly detection ax = fig.add_subplot(gs[2, 0]) im = ax.imshow(anomaly_heatmap, cmap='hot', interpolation='bilinear') title = f'Anomaly Heatmap ({score_type})' ax.set_title(title, fontsize=12, fontweight='bold') ax.axis('off') plt.colorbar(im, ax=ax, fraction=0.046, pad=0.04, label='Anomaly Score') ax = fig.add_subplot(gs[2, 1]) ax.imshow(original_full[0], cmap='terrain', interpolation='bilinear', alpha=0.6) im = ax.imshow(anomaly_heatmap, cmap='hot', interpolation='bilinear', alpha=0.5) for idx in top_indices[:10]: row = metadata[idx]['row'] col = metadata[idx]['col'] rect = Rectangle((col, row), 64, 64, linewidth=2, edgecolor='cyan', facecolor='none', linestyle='--') ax.add_patch(rect) ax.set_title(f'Top-{min(10, top_k)} Anomalies\n(cyan boxes)', fontsize=12, fontweight='bold') ax.axis('off') ax = fig.add_subplot(gs[2, 2]) ax.axis('off') for i in range(min(6, len(top_indices))): idx = top_indices[i] row = metadata[idx]['row'] col = metadata[idx]['col'] row_end = min(row + 64, shape[0]) col_end = min(col + 64, shape[1]) patch_dtm = original_full[0, row:row_end, col:col_end] patch_error = pixel_scores[idx] sub_row = i // 3 sub_col = i % 3 sub_ax = ax.inset_axes([sub_col*0.33, (1-sub_row*0.5)-0.45, 0.3, 0.42]) sub_ax.imshow(patch_dtm, cmap='terrain', interpolation='nearest') sub_ax.imshow(patch_error, cmap='hot', interpolation='nearest', alpha=0.5) sub_ax.set_title(f'#{i+1}: {anomaly_scores[idx]:.4f}', fontsize=8) sub_ax.axis('off') ax.set_title(f'Top-6 Anomalous Patches', fontsize=12, fontweight='bold', pad=10) plt.suptitle(f'{aoi_name} - Full AOI Anomaly Detection ({score_type})\n{shape[0]}Γ—{shape[1]} pixels from {len(patches)} patches', fontsize=14, fontweight='bold', y=0.995) if save_path: plt.savefig(save_path, dpi=150, bbox_inches='tight') print(f" πŸ’Ύ Saved: {save_path}") plt.show() # Print statistics print(f"\n πŸ“Š Anomaly Statistics ({score_type}):") print(f" Mean score: {np.mean(anomaly_scores):.6f}") print(f" Std score: {np.std(anomaly_scores):.6f}") print(f" 95th percentile: {np.percentile(anomaly_scores, 95):.6f}") print(f" 99th percentile: {np.percentile(anomaly_scores, 99):.6f}") print(f"\n πŸ” Top-{min(10, top_k)} Anomalies:") for i, idx in enumerate(top_indices[:10]): row = metadata[idx]['row'] col = metadata[idx]['col'] print(f" {i+1:2d}. Patch at ({row:4d}, {col:4d}) - Score: {anomaly_scores[idx]:.6f}") return anomaly_scores, anomaly_heatmap, original_full, reconstructed_full """ utils.py ======== Reusable utilities for Isolation Forest anomaly detection pipeline. Contains model architectures, data processing, and core operations. """ # ============================================================================== # DATA PROCESSING # ============================================================================== def load_patches(patches_file: Path) -> Tuple[np.ndarray, List[Dict]]: """Load patches from .npz file and sanitize data""" with np.load(patches_file, allow_pickle=True) as data: patches = data['patches'] metadata = list(data['metadata']) patches = np.nan_to_num(patches, nan=0.0, posinf=0.0, neginf=0.0) return patches, metadata def reconstruct_full_aoi_corrected( patches: np.ndarray, metadata: List[Dict], patch_size: int = 64, use_weighted: bool = True ) -> Tuple[np.ndarray, np.ndarray, Tuple[int, int]]: """Reconstruct full AOI from overlapping patches with proper blending""" if len(patches) == 0: raise ValueError("No patches provided for reconstruction") num_patches = patches.shape[0] num_channels = patches.shape[1] # Determine full AOI dimensions max_row = max(m['row'] + patch_size for m in metadata) max_col = max(m['col'] + patch_size for m in metadata) # Initialize accumulation arrays reconstructed = np.zeros((num_channels, max_row, max_col), dtype=np.float64) weight_map = np.zeros((max_row, max_col), dtype=np.float64) # Create blending weight template if use_weighted: y, x = np.ogrid[0:patch_size, 0:patch_size] center = patch_size // 2 dist_from_center = np.sqrt((y - center)**2 + (x - center)**2) max_dist = np.sqrt(2) * center blend_weight = 1.0 - (dist_from_center / max_dist) ** 2 blend_weight = np.clip(blend_weight, 0.1, 1.0) else: blend_weight = np.ones((patch_size, patch_size), dtype=np.float64) # Stitch all patches for i, meta in enumerate(metadata): row_start = meta['row'] col_start = meta['col'] row_end = min(row_start + patch_size, max_row) col_end = min(col_start + patch_size, max_col) patch_height = row_end - row_start patch_width = col_end - col_start if patch_height <= 0 or patch_width <= 0: continue patch_data = patches[i, :, :patch_height, :patch_width] patch_weight = blend_weight[:patch_height, :patch_width] for ch in range(num_channels): reconstructed[ch, row_start:row_end, col_start:col_end] += ( patch_data[ch] * patch_weight ) weight_map[row_start:row_end, col_start:col_end] += patch_weight # Normalize by weights weight_map_safe = np.where(weight_map > 0, weight_map, 1.0) for ch in range(num_channels): reconstructed[ch] = reconstructed[ch] / weight_map_safe # Set uncovered areas to NaN uncovered_mask = weight_map == 0 for ch in range(num_channels): reconstructed[ch][uncovered_mask] = np.nan reconstructed = reconstructed.astype(np.float32) weight_map = weight_map.astype(np.float32) return reconstructed, weight_map, (max_row, max_col) # ============================================================================== # EMBEDDING EXTRACTION # ============================================================================== def extract_embeddings( encoder: nn.Module, patches: np.ndarray, device: torch.device, batch_size: int = 32 ) -> np.ndarray: """Extract embeddings from patches using encoder""" encoder.eval() patches_torch = torch.from_numpy(patches).float() embeddings = [] with torch.no_grad(): for i in tqdm(range(0, len(patches), batch_size), desc="Extracting embeddings", leave=False): batch = patches_torch[i:i+batch_size].to(device) embedding = encoder(batch) embeddings.append(embedding.cpu().numpy()) embeddings = np.concatenate(embeddings, axis=0) return embeddings # ============================================================================== # ISOLATION FOREST OPERATIONS # ============================================================================== def train_isolation_forest( embeddings: np.ndarray, contamination: float = 0.05, n_estimators: int = 100, max_samples: str = 'auto', random_state: int = 42 ) -> Tuple[IsolationForest, StandardScaler]: """ Train Isolation Forest on embeddings Args: embeddings: (N, embedding_dim) array contamination: Expected proportion of outliers n_estimators: Number of trees max_samples: Samples to draw for each tree random_state: Random seed Returns: Trained IsolationForest model and fitted scaler """ print(f"\n🌲 Training Isolation Forest") print(f" Embeddings shape: {embeddings.shape}") print(f" Contamination: {contamination:.1%}") print(f" Trees: {n_estimators}") # Standardize embeddings scaler = StandardScaler() embeddings_scaled = scaler.fit_transform(embeddings) # Train Isolation Forest iforest = IsolationForest( contamination=contamination, n_estimators=n_estimators, max_samples=max_samples, random_state=random_state, n_jobs=-1, verbose=1 ) iforest.fit(embeddings_scaled) # Get anomaly scores scores = iforest.decision_function(embeddings_scaled) predictions = iforest.predict(embeddings_scaled) n_anomalies = np.sum(predictions == -1) print(f" βœ“ Training complete") print(f" Detected anomalies: {n_anomalies} ({n_anomalies/len(predictions)*100:.2f}%)") print(f" Score range: [{scores.min():.4f}, {scores.max():.4f}]") return iforest, scaler def compute_isolation_forest_anomalies( encoder: nn.Module, iforest: IsolationForest, scaler: StandardScaler, patches: np.ndarray, device: torch.device, batch_size: int = 32 ) -> Tuple[np.ndarray, np.ndarray]: """ Compute anomaly scores and predictions for patches Returns: probability_scores: Normalized anomaly probabilities [0, 1] predictions: Binary predictions (-1 = anomaly, 1 = normal) """ # Extract embeddings embeddings = extract_embeddings(encoder, patches, device, batch_size) embeddings_scaled = scaler.transform(embeddings) # Get raw scores (Negative = Anomaly, Positive = Normal) raw_scores = iforest.decision_function(embeddings_scaled) predictions = iforest.predict(embeddings_scaled) # Invert so Higher = More Anomalous inverted_scores = -raw_scores # Convert to probability (0.0 to 1.0) min_s = inverted_scores.min() max_s = inverted_scores.max() # Min-Max Normalization probability_scores = (inverted_scores - min_s) / (max_s - min_s + 1e-10) return probability_scores, predictions def save_model(iforest: IsolationForest, scaler: StandardScaler, save_path: str): """Save trained Isolation Forest model and scaler""" with open(save_path, 'wb') as f: pickle.dump({'iforest': iforest, 'scaler': scaler}, f) print(f"πŸ’Ύ Saved model: {save_path}") def load_model(model_path: str) -> Tuple[IsolationForest, StandardScaler]: """Load trained Isolation Forest model and scaler""" with open(model_path, 'rb') as f: data = pickle.load(f) print(f"βœ“ Loaded model from: {model_path}") return data['iforest'], data['scaler'] # ============================================================================== # MISSING: ResUNetEncoder Class (for Isolation Forest) # ============================================================================== """ Fixed ResUNetEncoder with proper weight loading """ class ResUNetEncoder(nn.Module): """Encoder-only version for embedding extraction""" def __init__(self, in_channels: int = 7, embedding_dim: int = 128): super().__init__() # Encoder (must match autoencoder architecture) self.enc1 = EncoderBlock(in_channels, 32, downsample=True) self.enc2 = EncoderBlock(32, 64, downsample=True) self.enc3 = EncoderBlock(64, 128, downsample=True) self.bottleneck = EncoderBlock(128, 256, downsample=False) # Embedding projection self.global_pool = nn.AdaptiveAvgPool2d(1) self.embedding_dim = embedding_dim # Bottleneck outputs 256 channels, project to embedding_dim self.projection = nn.Linear(256, embedding_dim) def forward(self, x): # Encoder path x, _ = self.enc1(x) x, _ = self.enc2(x) x, _ = self.enc3(x) x = self.bottleneck(x) # Global pooling: (batch, 256, H, W) -> (batch, 256, 1, 1) x = self.global_pool(x) # Flatten: (batch, 256, 1, 1) -> (batch, 256) x = x.view(x.size(0), -1) # Project to target embedding dimension: (batch, 256) -> (batch, embedding_dim) embedding = self.projection(x) return embedding def load_from_autoencoder(self, autoencoder_path: str): """Load encoder weights from trained autoencoder""" print(f"Loading encoder weights from: {autoencoder_path}") # Load full autoencoder state dict autoencoder_state = torch.load(autoencoder_path, map_location='cpu') # Filter to get only encoder + bottleneck weights encoder_dict = {} for k, v in autoencoder_state.items(): if k.startswith(('enc1.', 'enc2.', 'enc3.', 'bottleneck.')): encoder_dict[k] = v # Load encoder weights (projection layer will remain randomly initialized) missing, unexpected = self.load_state_dict(encoder_dict, strict=False) # Verify what was loaded print(f"βœ“ Loaded {len(encoder_dict)} encoder layers") print(f" Missing keys (expected): {len(missing)} - {missing[:3]}..." if len(missing) > 3 else f" Missing keys: {missing}") # The projection layer should be in missing keys (it's not in autoencoder) if 'projection.weight' not in missing or 'projection.bias' not in missing: print("⚠️ Warning: Projection layer might have been incorrectly loaded") else: print(f"βœ“ Projection layer (256 β†’ {self.embedding_dim}) will be trained from scratch") # ============================================================================== # MISSING: Isolation Forest Probability Computation # ============================================================================== def compute_iforest_probabilities( encoder: nn.Module, iforest: IsolationForest, scaler: StandardScaler, patches: np.ndarray, metadata: List[Dict], device: torch.device, batch_size: int = 32 ) -> Tuple[np.ndarray, np.ndarray, np.ndarray]: """ Compute Isolation Forest anomaly probabilities Returns: prob_scores: (num_patches,) - Normalized probability scores [0, 1] embeddings: (num_patches, embedding_dim) - Latent embeddings predictions: (num_patches,) - Binary predictions (-1=anomaly, 1=normal) """ # Extract embeddings embeddings = extract_embeddings(encoder, patches, device, batch_size) embeddings_scaled = scaler.transform(embeddings) # Get raw scores raw_scores = iforest.decision_function(embeddings_scaled) predictions = iforest.predict(embeddings_scaled) # Invert so Higher = More Anomalous inverted_scores = -raw_scores # Normalize to [0, 1] min_s = inverted_scores.min() max_s = inverted_scores.max() prob_scores = (inverted_scores - min_s) / (max_s - min_s + 1e-10) return prob_scores, embeddings, predictions # ============================================================================== # MISSING: Template Matching Probabilities (Model 3 placeholder) # ============================================================================== def compute_template_matching_probabilities( patches: np.ndarray, metadata: List[Dict], template_library: Dict[str, np.ndarray], method: str = 'correlation' ) -> Tuple[np.ndarray, np.ndarray]: """ Compute template matching probabilities (PLACEHOLDER - implement your method) Returns: prob_scores: (num_patches,) - Match probability scores [0, 1] similarity_scores: (num_patches, num_templates) - Raw similarity to each template """ from scipy.signal import correlate2d num_patches = len(patches) num_templates = len(template_library) similarity_scores = np.zeros((num_patches, num_templates)) # Extract DTM channel (channel 0) patch_dtm = patches[:, 0, :, :] for i, patch in enumerate(tqdm(patch_dtm, desc="Model 3 (Template Matching)", leave=False)): for j, (template_name, template) in enumerate(template_library.items()): if method == 'correlation': corr = correlate2d(patch, template, mode='valid') if corr.size > 0: similarity = np.max(corr) / (np.linalg.norm(patch) * np.linalg.norm(template) + 1e-10) else: similarity = 0.0 else: # Add other methods as needed similarity = 0.0 similarity_scores[i, j] = similarity # Use max similarity across templates max_similarity = similarity_scores.max(axis=1) # Normalize to [0, 1] prob_scores = (max_similarity - max_similarity.min()) / (max_similarity.max() - max_similarity.min() + 1e-10) return prob_scores, similarity_scores # ============================================================================== # MISSING: Probability Matrix Storage # ============================================================================== def save_probability_matrix( aoi_name: str, probability_matrix: np.ndarray, metadata: List[Dict], save_dir: Path = Path('./probability_matrices') ): """Save probability matrix for later use by the gate model""" save_dir.mkdir(exist_ok=True, parents=True) save_path = save_dir / f"{aoi_name}_probability_matrix.npz" np.savez_compressed( save_path, probability_matrix=probability_matrix, metadata=metadata, column_names=['prob_autoencoder', 'prob_iforest', 'prob_template', 'sim_score'] ) print(f"πŸ’Ύ Saved probability matrix: {save_path}") def load_probability_matrix( aoi_name: str, load_dir: Path = Path('./probability_matrices') ) -> Tuple[np.ndarray, List[Dict]]: """Load saved probability matrix""" load_path = load_dir / f"{aoi_name}_probability_matrix.npz" with np.load(load_path, allow_pickle=True) as data: probability_matrix = data['probability_matrix'] metadata = data['metadata'] print(f"βœ“ Loaded probability matrix from: {load_path}") print(f" Shape: {probability_matrix.shape}") return probability_matrix, metadata # ============================================================================== # MODEL 3: SIGNATURE MATCHING UTILITIES # ============================================================================== def compute_cosine_similarity(vec1: np.ndarray, vec2: np.ndarray) -> float: """Compute cosine similarity between two vectors""" dot_product = np.dot(vec1, vec2) norm1 = np.linalg.norm(vec1) norm2 = np.linalg.norm(vec2) if norm1 == 0 or norm2 == 0: return 0.0 return dot_product / (norm1 * norm2) def compute_signature_scores( latent: np.ndarray, centroids: np.ndarray, scaler: Optional[StandardScaler] = None ) -> Dict: """Compute signature matching scores for a single latent vector""" if scaler is not None: latent_scaled = scaler.transform(latent.reshape(1, -1))[0] else: latent_scaled = latent n_clusters = centroids.shape[0] similarities = np.zeros(n_clusters) for i in range(n_clusters): similarities[i] = compute_cosine_similarity(latent_scaled, centroids[i]) similarities_normalized = (similarities + 1.0) / 2.0 best_cluster = int(np.argmax(similarities_normalized)) best_score = float(similarities_normalized[best_cluster]) return { "cluster_scores": similarities_normalized.tolist(), "best_cluster": best_cluster, "best_score": best_score, "max_similarity": best_score } def compute_kmeans_probabilities( encoder: nn.Module, kmeans_model, scaler: Optional[StandardScaler], patches: np.ndarray, metadata: List[Dict], device: torch.device, batch_size: int = 32 ) -> Tuple[np.ndarray, np.ndarray, np.ndarray]: """ Compute K-Means signature matching probabilities Returns: prob_scores: (num_patches,) - Match probability scores [0, 1] similarity_scores: (num_patches,) - Max similarity to any cluster cluster_assignments: (num_patches,) - Best matching cluster ID """ embeddings = extract_embeddings(encoder, patches, device, batch_size) centroids = kmeans_model.cluster_centers_ n_patches = len(embeddings) prob_scores = np.zeros(n_patches) similarity_scores = np.zeros(n_patches) cluster_assignments = np.zeros(n_patches, dtype=int) for i in tqdm(range(n_patches), desc="Model 3 (K-Means Matching)", leave=False): scores = compute_signature_scores(embeddings[i], centroids, scaler) prob_scores[i] = scores['best_score'] similarity_scores[i] = scores['max_similarity'] cluster_assignments[i] = scores['best_cluster'] return prob_scores, similarity_scores, cluster_assignments def load_kmeans_model(model_path: str): """Load trained K-Means model and scaler""" with open(model_path, 'rb') as f: data = pickle.load(f) print(f"βœ“ Loaded K-Means model from: {model_path}") # Handle different storage formats if isinstance(data, dict): kmeans = data.get('kmeans', data.get('model', None)) scaler = data.get('scaler', None) else: # Assume it's the KMeans object directly kmeans = data scaler = None print("⚠️ Warning: No scaler found, loaded KMeans object only") if kmeans is None: raise ValueError(f"Could not find KMeans model in {model_path}") return kmeans, scaler # ============================================================================== # ADD THESE FUNCTIONS TO YOUR EXISTING utils.py # ============================================================================== def load_unified_probability_matrix( aoi_name: str, load_dir: Path = Path('./unified_probability_matrices') ) -> Tuple[np.ndarray, List[Dict], List[str]]: """ Load saved unified probability matrix Args: aoi_name: Name of AOI (e.g., "AOI_0001") load_dir: Directory containing saved matrices Returns: unified_matrix: Shape (num_patches, 64, 64, 4) metadata: List of patch metadata dicts channel_names: List of channel names """ load_path = load_dir / f"{aoi_name}_unified_prob_matrix.npz" if not load_path.exists(): raise FileNotFoundError(f"Unified matrix not found: {load_path}") with np.load(load_path, allow_pickle=True) as data: unified_matrix = data['unified_matrix'] metadata = data['metadata'] channel_names = data['channel_names'] print(f"βœ“ Loaded unified probability matrix from: {load_path}") print(f" Shape: {unified_matrix.shape}") print(f" Channels: {list(channel_names)}") return unified_matrix, metadata, channel_names def broadcast_patch_scores_to_pixels( patch_scores: np.ndarray, patch_size: int = 64 ) -> np.ndarray: """ Broadcast patch-level scores to pixel-level Args: patch_scores: Shape (num_patches,) or (num_patches, 1) patch_size: Size of patches (default 64) Returns: pixel_scores: Shape (num_patches, patch_size, patch_size) """ if patch_scores.ndim == 1: patch_scores = patch_scores[:, np.newaxis, np.newaxis] elif patch_scores.ndim == 2 and patch_scores.shape[1] == 1: patch_scores = patch_scores[:, :, np.newaxis] num_patches = patch_scores.shape[0] pixel_scores = np.broadcast_to( patch_scores, (num_patches, patch_size, patch_size) ).copy() return pixel_scores def normalize_scores(scores: np.ndarray) -> np.ndarray: """ Normalize scores to [0, 1] range using min-max scaling Args: scores: Input scores of any shape Returns: normalized_scores: Same shape as input, values in [0, 1] """ min_val = scores.min() max_val = scores.max() if max_val - min_val < 1e-10: # All values are the same, return zeros or ones return np.zeros_like(scores) normalized = (scores - min_val) / (max_val - min_val) return normalized.astype(np.float32) def save_unified_probability_matrix( aoi_name: str, unified_matrix: np.ndarray, metadata: List[Dict], save_dir: Path = Path('./unified_probability_matrices'), channel_names: List[str] = None ): """ Save unified probability matrix to disk Args: aoi_name: Name of AOI unified_matrix: Shape (num_patches, 64, 64, 4) metadata: Patch metadata save_dir: Directory to save to channel_names: Names of the 4 channels """ if channel_names is None: channel_names = ['prob_autoencoder', 'prob_iforest', 'prob_kmeans', 'sim_score'] save_dir.mkdir(exist_ok=True, parents=True) save_path = save_dir / f"{aoi_name}_unified_prob_matrix.npz" np.savez_compressed( save_path, unified_matrix=unified_matrix, metadata=metadata, channel_names=channel_names, aoi_name=aoi_name ) print(f"πŸ’Ύ Saved unified probability matrix: {save_path}") print(f" Shape: {unified_matrix.shape}") print(f" Channels: {channel_names}") # Add this function to utils.py to fix the missing function issue def compute_full_aoi_anomalies( model: nn.Module, patches: np.ndarray, metadata: List[Dict], device: torch.device, batch_size: int = 32, use_sigmoid_scores: bool = True ) -> Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]: """ Compute autoencoder anomaly scores for full AOI visualization Returns: anomaly_scores: (num_patches,) - Per-patch anomaly scores pixel_scores: (num_patches, H, W) - Per-pixel anomaly maps reconstructed_patches: (num_patches, C, H, W) - Model reconstructions original_patches: (num_patches, C, H, W) - Original input patches """ model.eval() patches_torch = torch.from_numpy(patches).float() anomaly_scores = [] pixel_scores = [] reconstructed_patches = [] with torch.no_grad(): for i in tqdm(range(0, len(patches), batch_size), desc="Computing anomalies", leave=False): batch = patches_torch[i:i+batch_size].to(device) reconstruction, _ = model(batch) if use_sigmoid_scores: # Sigmoid uncertainty: higher = more anomalous sigmoid_uncertainty = 1.0 - torch.abs(reconstruction - 0.5) * 2 pixel_score = sigmoid_uncertainty.mean(dim=1) # Per pixel patch_score = sigmoid_uncertainty.mean(dim=(1, 2, 3)) # Per patch else: # MSE-based approach squared_error = (reconstruction - batch) ** 2 pixel_score = squared_error.mean(dim=1) patch_score = squared_error.mean(dim=(1, 2, 3)) anomaly_scores.append(patch_score.cpu().numpy()) pixel_scores.append(pixel_score.cpu().numpy()) reconstructed_patches.append(reconstruction.cpu().numpy()) anomaly_scores = np.concatenate(anomaly_scores) pixel_scores = np.concatenate(pixel_scores) reconstructed_patches = np.concatenate(reconstructed_patches) # Return original patches as well (convert to numpy) original_patches = patches return anomaly_scores, pixel_scores, reconstructed_patches, original_patches