SONAR / utils.py
arnavmishra4's picture
Upload 9 files
0b37019 verified
import numpy as np
import torch
import torch.nn as nn
import matplotlib.pyplot as plt
from matplotlib.patches import Rectangle
from typing import Tuple, List, Dict, Optional
from tqdm import tqdm
from pathlib import Path
from sklearn.ensemble import IsolationForest
from sklearn.preprocessing import StandardScaler
import pickle
# ==============================================================================
# MODEL ARCHITECTURE
# ==============================================================================
class ResidualBlock(nn.Module):
def __init__(self, channels: int):
super().__init__()
self.conv1 = nn.Conv2d(channels, channels, kernel_size=3, padding=1, bias=False)
self.bn1 = nn.BatchNorm2d(channels)
self.conv2 = nn.Conv2d(channels, channels, kernel_size=3, padding=1, bias=False)
self.bn2 = nn.BatchNorm2d(channels)
self.relu = nn.ReLU(inplace=True)
def forward(self, x):
residual = x
out = self.relu(self.bn1(self.conv1(x)))
out = self.bn2(self.conv2(out))
out += residual
out = self.relu(out)
return out
class EncoderBlock(nn.Module):
def __init__(self, in_channels: int, out_channels: int, downsample: bool = True):
super().__init__()
self.downsample = downsample
self.conv = nn.Conv2d(in_channels, out_channels, kernel_size=3, padding=1, bias=False)
self.bn = nn.BatchNorm2d(out_channels)
self.relu = nn.ReLU(inplace=True)
self.res_block = ResidualBlock(out_channels)
if downsample:
self.pool = nn.Conv2d(out_channels, out_channels, kernel_size=3,
stride=2, padding=1, bias=False)
def forward(self, x):
x = self.relu(self.bn(self.conv(x)))
x = self.res_block(x)
if self.downsample:
skip = x
x = self.pool(x)
return x, skip
else:
return x
class DecoderBlock(nn.Module):
def __init__(self, in_channels: int, skip_channels: int, out_channels: int):
super().__init__()
self.upsample = nn.Upsample(scale_factor=2, mode='bilinear', align_corners=True)
self.conv = nn.Conv2d(in_channels + skip_channels, out_channels,
kernel_size=3, padding=1, bias=False)
self.bn = nn.BatchNorm2d(out_channels)
self.relu = nn.ReLU(inplace=True)
self.res_block = ResidualBlock(out_channels)
def forward(self, x, skip):
x = self.upsample(x)
x = torch.cat([x, skip], dim=1)
x = self.relu(self.bn(self.conv(x)))
x = self.res_block(x)
return x
class ResUNetAutoencoder(nn.Module):
def __init__(self, in_channels: int = 7, latent_dim: int = 256):
super().__init__()
# Encoder
self.enc1 = EncoderBlock(in_channels, 32, downsample=True)
self.enc2 = EncoderBlock(32, 64, downsample=True)
self.enc3 = EncoderBlock(64, 128, downsample=True)
self.bottleneck = EncoderBlock(128, 256, downsample=False)
# Latent
self.global_pool = nn.AdaptiveAvgPool2d(1)
self.latent_dim = latent_dim
# Decoder
self.dec1 = DecoderBlock(256, 128, 128)
self.dec2 = DecoderBlock(128, 64, 64)
self.dec3 = DecoderBlock(64, 32, 32)
self.final_conv = nn.Conv2d(32, in_channels, kernel_size=3, padding=1)
def forward(self, x):
# Encoder
x, skip1 = self.enc1(x)
x, skip2 = self.enc2(x)
x, skip3 = self.enc3(x)
x = self.bottleneck(x)
# Latent
latent = self.global_pool(x)
latent = latent.view(latent.size(0), -1)
# Decoder
x = self.dec1(x, skip3)
x = self.dec2(x, skip2)
x = self.dec3(x, skip1)
# Final Conv (Reduces 32 channels -> 7 channels)
x = self.final_conv(x)
reconstruction = torch.sigmoid(x)
return reconstruction, latent
def count_parameters(self):
return sum(p.numel() for p in self.parameters() if p.requires_grad)
# ==============================================================================
# RECONSTRUCTION FUNCTIONS
# ==============================================================================
def reconstruct_full_aoi_corrected(
patches: np.ndarray,
metadata: List[Dict],
patch_size: int = 64,
use_weighted: bool = True
) -> Tuple[np.ndarray, np.ndarray, Tuple[int, int]]:
"""Reconstruct full AOI from overlapping patches with proper blending
Args:
patches: Array of patches with shape (num_patches, channels, height, width)
metadata: List of dicts containing 'row' and 'col' positions for each patch
patch_size: Size of each patch (default 64)
use_weighted: Whether to use weighted blending (default True)
Returns:
reconstructed: Full AOI reconstruction (channels, height, width)
weight_map: Map showing patch overlap counts
shape: Tuple of (height, width) of reconstructed AOI
"""
if len(patches) == 0:
raise ValueError("No patches provided for reconstruction")
num_patches = patches.shape[0]
num_channels = patches.shape[1]
print(f" Reconstructing from {num_patches} patches with {num_channels} channels...")
# Determine full AOI dimensions
max_row = max(m['row'] + patch_size for m in metadata)
max_col = max(m['col'] + patch_size for m in metadata)
print(f" Target dimensions: {max_row}×{max_col} pixels")
# Initialize accumulation arrays
reconstructed = np.zeros((num_channels, max_row, max_col), dtype=np.float64)
weight_map = np.zeros((max_row, max_col), dtype=np.float64)
# Create blending weight template
if use_weighted:
y, x = np.ogrid[0:patch_size, 0:patch_size]
center = patch_size // 2
dist_from_center = np.sqrt((y - center)**2 + (x - center)**2)
max_dist = np.sqrt(2) * center
blend_weight = 1.0 - (dist_from_center / max_dist) ** 2
blend_weight = np.clip(blend_weight, 0.1, 1.0)
else:
blend_weight = np.ones((patch_size, patch_size), dtype=np.float64)
# Stitch all patches
for i, meta in enumerate(metadata):
row_start = meta['row']
col_start = meta['col']
row_end = min(row_start + patch_size, max_row)
col_end = min(col_start + patch_size, max_col)
patch_height = row_end - row_start
patch_width = col_end - col_start
if patch_height <= 0 or patch_width <= 0:
continue
patch_data = patches[i, :, :patch_height, :patch_width]
patch_weight = blend_weight[:patch_height, :patch_width]
for ch in range(num_channels):
reconstructed[ch, row_start:row_end, col_start:col_end] += (
patch_data[ch] * patch_weight
)
weight_map[row_start:row_end, col_start:col_end] += patch_weight
# Normalize by weights
weight_map_safe = np.where(weight_map > 0, weight_map, 1.0)
for ch in range(num_channels):
reconstructed[ch] = reconstructed[ch] / weight_map_safe
# Set uncovered areas to NaN
uncovered_mask = weight_map == 0
for ch in range(num_channels):
reconstructed[ch][uncovered_mask] = np.nan
reconstructed = reconstructed.astype(np.float32)
weight_map = weight_map.astype(np.float32)
# Report coverage statistics
coverage = np.sum(weight_map > 0) / weight_map.size * 100
avg_overlap = np.mean(weight_map[weight_map > 0]) if np.any(weight_map > 0) else 0
print(f" Coverage: {coverage:.1f}% | Avg overlap: {avg_overlap:.2f}×")
if coverage < 50:
print(f" ⚠️ WARNING: Only {coverage:.1f}% coverage!")
return reconstructed, weight_map, (max_row, max_col)
# ==============================================================================
# ANOMALY COMPUTATION
# ==============================================================================
def compute_autoencoder_probabilities(
model: nn.Module,
patches: np.ndarray,
metadata: List[Dict],
device: torch.device,
batch_size: int = 32,
use_sigmoid_scores: bool = True
) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
"""
Compute autoencoder anomaly probabilities
Returns:
prob_scores: (num_patches,) - Normalized probability scores [0, 1]
pixel_scores: (num_patches, H, W) - Per-pixel anomaly maps
reconstructed_patches: (num_patches, C, H, W) - Reconstructions
"""
model.eval()
patches_torch = torch.from_numpy(patches).float()
anomaly_scores = []
pixel_scores = []
reconstructed_patches = []
with torch.no_grad():
for i in tqdm(range(0, len(patches), batch_size),
desc="Model 1 (Autoencoder)", leave=False):
batch = patches_torch[i:i+batch_size].to(device)
reconstruction, _ = model(batch)
if use_sigmoid_scores:
# Sigmoid uncertainty: higher = more anomalous
sigmoid_uncertainty = 1.0 - torch.abs(reconstruction - 0.5) * 2
pixel_score = sigmoid_uncertainty.mean(dim=1) # Per pixel
patch_score = sigmoid_uncertainty.mean(dim=(1, 2, 3)) # Per patch
else:
# MSE-based approach
squared_error = (reconstruction - batch) ** 2
pixel_score = squared_error.mean(dim=1)
patch_score = squared_error.mean(dim=(1, 2, 3))
anomaly_scores.append(patch_score.cpu().numpy())
pixel_scores.append(pixel_score.cpu().numpy())
reconstructed_patches.append(reconstruction.cpu().numpy())
anomaly_scores = np.concatenate(anomaly_scores)
pixel_scores = np.concatenate(pixel_scores)
reconstructed_patches = np.concatenate(reconstructed_patches)
# NORMALIZE to [0, 1] probability range
prob_scores = (anomaly_scores - anomaly_scores.min()) / (anomaly_scores.max() - anomaly_scores.min() + 1e-10)
return prob_scores, pixel_scores, reconstructed_patches
# ==============================================================================
# TRAIN/TEST SPLIT
# ==============================================================================
def discover_and_split_aois(
patches_dir: Path,
train_ratio: float = 0.8
) -> Tuple[List[str], List[str]]:
"""Discover all AOIs and split into train/test sets
Args:
patches_dir: Directory containing AOI_xxxx_important_patches.npz files
train_ratio: Fraction of AOIs for training (default 0.8)
Returns:
train_aoi_names: List of AOI names for training
test_aoi_names: List of AOI names for testing
"""
# Find all important_patches files
important_files = sorted(list(patches_dir.glob("AOI_*_important_patches.npz")))
if len(important_files) == 0:
raise FileNotFoundError(f"No *_important_patches.npz files found in {patches_dir}")
# Extract AOI names (remove suffix)
aoi_names = [f.stem.replace('_important_patches', '') for f in important_files]
print(f"\n📂 Discovered {len(aoi_names)} AOIs in {patches_dir}")
print(f" Examples: {', '.join(aoi_names[:5])}")
# Shuffle for random split
np.random.shuffle(aoi_names)
split_idx = int(len(aoi_names) * train_ratio)
train_aoi_names = aoi_names[:split_idx]
test_aoi_names = aoi_names[split_idx:]
print(f"\n📊 AOI-Level Split:")
print(f" Total AOIs: {len(aoi_names)}")
print(f" Training AOIs: {len(train_aoi_names)}")
print(f" Testing AOIs: {len(test_aoi_names)}")
return train_aoi_names, test_aoi_names
# ==============================================================================
# VISUALIZATION
# ==============================================================================
def visualize_full_aoi_anomalies(
aoi_name: str,
model: nn.Module,
patches_dir: Path,
device: torch.device,
top_k: int = 20,
channel_names: Optional[List[str]] = None,
save_path: Optional[str] = None,
use_sigmoid_scores: bool = True # ADD THIS LINE
):
"""Visualize anomalies on full AOI reconstruction using ALL patches
Args:
aoi_name: Name of the AOI to visualize
model: Trained autoencoder model
patches_dir: Directory containing patch files
device: torch device to use
top_k: Number of top anomalies to highlight
channel_names: List of channel names (optional)
save_path: Path to save visualization (optional)
use_sigmoid_scores: If True, use sigmoid-based scoring; if False, use MSE
Returns:
anomaly_scores: Anomaly score per patch
anomaly_heatmap: Full-resolution anomaly heatmap
original_full: Original reconstructed AOI
reconstructed_full: Model-reconstructed AOI
"""
if channel_names is None:
channel_names = ['DTM', 'Slope', 'Roughness', 'NDVI', 'NDWI', 'FlowAcc', 'FlowDir']
score_type = "Sigmoid Score" if use_sigmoid_scores else "MSE"
print(f"\n{'='*70}")
print(f"FULL AOI ANOMALY DETECTION: {aoi_name}")
print(f"Scoring Method: {score_type}")
print(f"{'='*70}")
# Load ALL patches
all_patches_file = patches_dir / f"{aoi_name}_all_patches.npz"
if not all_patches_file.exists():
raise FileNotFoundError(f"All patches file not found: {all_patches_file}")
print(f" Loading ALL patches from: {all_patches_file.name}")
with np.load(all_patches_file, allow_pickle=True) as data:
patches = data['patches']
metadata = list(data['metadata'])
patches = np.nan_to_num(patches, nan=0.0, posinf=0.0, neginf=0.0)
print(f" Patches loaded: {len(patches)}")
# Compute anomalies
print(f" 🔬 Computing anomaly scores ({score_type})...")
anomaly_scores, pixel_scores, recon_patches, original_patches = compute_full_aoi_anomalies(
model, patches, metadata, device, use_sigmoid_scores=use_sigmoid_scores
)
# Reconstruct full AOI
print(f" 🔨 Reconstructing original AOI...")
original_full, weight_map_orig, shape = reconstruct_full_aoi_corrected(
patches, metadata, use_weighted=True
)
print(f" 🔨 Reconstructing from model output...")
reconstructed_full, weight_map_recon, _ = reconstruct_full_aoi_corrected(
recon_patches, metadata, use_weighted=True
)
# Create anomaly heatmap
print(f" 🗺️ Creating anomaly heatmap...")
anomaly_heatmap = np.zeros(shape, dtype=np.float32)
anomaly_count = np.zeros(shape, dtype=np.float32)
for i, meta in enumerate(metadata):
row = meta['row']
col = meta['col']
row_end = min(row + 64, shape[0])
col_end = min(col + 64, shape[1])
patch_h = row_end - row
patch_w = col_end - col
if patch_h > 0 and patch_w > 0:
anomaly_heatmap[row:row_end, col:col_end] += pixel_scores[i, :patch_h, :patch_w]
anomaly_count[row:row_end, col:col_end] += 1
anomaly_count = np.maximum(anomaly_count, 1)
anomaly_heatmap = anomaly_heatmap / anomaly_count
# Find top anomalies
top_indices = np.argsort(anomaly_scores)[-top_k:][::-1]
# Create visualization
print(f" 🎨 Creating visualization...")
fig = plt.figure(figsize=(24, 18))
gs = fig.add_gridspec(3, 3, hspace=0.3, wspace=0.3)
# Row 1: Original channels
for i, (ch_idx, ch_name) in enumerate([(0, 'DTM'), (3, 'NDVI'), (4, 'NDWI')]):
ax = fig.add_subplot(gs[0, i])
data = original_full[ch_idx]
data_masked = np.ma.masked_invalid(data)
cmap = 'terrain' if ch_idx == 0 else 'RdYlGn'
im = ax.imshow(data_masked, cmap=cmap, interpolation='bilinear')
ax.set_title(f'{aoi_name}\n{ch_name} (Original)', fontsize=12, fontweight='bold')
ax.axis('off')
plt.colorbar(im, ax=ax, fraction=0.046, pad=0.04)
# Row 2: Reconstruction quality
ax = fig.add_subplot(gs[1, 0])
recon_masked = np.ma.masked_invalid(reconstructed_full[0])
im = ax.imshow(recon_masked, cmap='terrain', interpolation='bilinear')
ax.set_title('DTM Reconstruction', fontsize=12, fontweight='bold')
ax.axis('off')
plt.colorbar(im, ax=ax, fraction=0.046, pad=0.04)
ax = fig.add_subplot(gs[1, 1])
error_map = np.abs(original_full[0] - reconstructed_full[0])
error_masked = np.ma.masked_invalid(error_map)
im = ax.imshow(error_masked, cmap='Reds', interpolation='bilinear')
ax.set_title('Absolute Error (DTM)', fontsize=12, fontweight='bold')
ax.axis('off')
plt.colorbar(im, ax=ax, fraction=0.046, pad=0.04)
ax = fig.add_subplot(gs[1, 2])
im = ax.imshow(weight_map_orig, cmap='viridis', interpolation='nearest')
coverage_pct = np.sum(weight_map_orig > 0) / weight_map_orig.size * 100
ax.set_title(f'Patch Overlap\n(coverage: {coverage_pct:.1f}%)', fontsize=12, fontweight='bold')
ax.axis('off')
plt.colorbar(im, ax=ax, fraction=0.046, pad=0.04)
# Row 3: Anomaly detection
ax = fig.add_subplot(gs[2, 0])
im = ax.imshow(anomaly_heatmap, cmap='hot', interpolation='bilinear')
title = f'Anomaly Heatmap ({score_type})'
ax.set_title(title, fontsize=12, fontweight='bold')
ax.axis('off')
plt.colorbar(im, ax=ax, fraction=0.046, pad=0.04, label='Anomaly Score')
ax = fig.add_subplot(gs[2, 1])
ax.imshow(original_full[0], cmap='terrain', interpolation='bilinear', alpha=0.6)
im = ax.imshow(anomaly_heatmap, cmap='hot', interpolation='bilinear', alpha=0.5)
for idx in top_indices[:10]:
row = metadata[idx]['row']
col = metadata[idx]['col']
rect = Rectangle((col, row), 64, 64, linewidth=2,
edgecolor='cyan', facecolor='none', linestyle='--')
ax.add_patch(rect)
ax.set_title(f'Top-{min(10, top_k)} Anomalies\n(cyan boxes)', fontsize=12, fontweight='bold')
ax.axis('off')
ax = fig.add_subplot(gs[2, 2])
ax.axis('off')
for i in range(min(6, len(top_indices))):
idx = top_indices[i]
row = metadata[idx]['row']
col = metadata[idx]['col']
row_end = min(row + 64, shape[0])
col_end = min(col + 64, shape[1])
patch_dtm = original_full[0, row:row_end, col:col_end]
patch_error = pixel_scores[idx]
sub_row = i // 3
sub_col = i % 3
sub_ax = ax.inset_axes([sub_col*0.33, (1-sub_row*0.5)-0.45, 0.3, 0.42])
sub_ax.imshow(patch_dtm, cmap='terrain', interpolation='nearest')
sub_ax.imshow(patch_error, cmap='hot', interpolation='nearest', alpha=0.5)
sub_ax.set_title(f'#{i+1}: {anomaly_scores[idx]:.4f}', fontsize=8)
sub_ax.axis('off')
ax.set_title(f'Top-6 Anomalous Patches', fontsize=12, fontweight='bold', pad=10)
plt.suptitle(f'{aoi_name} - Full AOI Anomaly Detection ({score_type})\n{shape[0]}×{shape[1]} pixels from {len(patches)} patches',
fontsize=14, fontweight='bold', y=0.995)
if save_path:
plt.savefig(save_path, dpi=150, bbox_inches='tight')
print(f" 💾 Saved: {save_path}")
plt.show()
# Print statistics
print(f"\n 📊 Anomaly Statistics ({score_type}):")
print(f" Mean score: {np.mean(anomaly_scores):.6f}")
print(f" Std score: {np.std(anomaly_scores):.6f}")
print(f" 95th percentile: {np.percentile(anomaly_scores, 95):.6f}")
print(f" 99th percentile: {np.percentile(anomaly_scores, 99):.6f}")
print(f"\n 🔍 Top-{min(10, top_k)} Anomalies:")
for i, idx in enumerate(top_indices[:10]):
row = metadata[idx]['row']
col = metadata[idx]['col']
print(f" {i+1:2d}. Patch at ({row:4d}, {col:4d}) - Score: {anomaly_scores[idx]:.6f}")
return anomaly_scores, anomaly_heatmap, original_full, reconstructed_full
"""
utils.py
========
Reusable utilities for Isolation Forest anomaly detection pipeline.
Contains model architectures, data processing, and core operations.
"""
# ==============================================================================
# DATA PROCESSING
# ==============================================================================
def load_patches(patches_file: Path) -> Tuple[np.ndarray, List[Dict]]:
"""Load patches from .npz file and sanitize data"""
with np.load(patches_file, allow_pickle=True) as data:
patches = data['patches']
metadata = list(data['metadata'])
patches = np.nan_to_num(patches, nan=0.0, posinf=0.0, neginf=0.0)
return patches, metadata
def reconstruct_full_aoi_corrected(
patches: np.ndarray,
metadata: List[Dict],
patch_size: int = 64,
use_weighted: bool = True
) -> Tuple[np.ndarray, np.ndarray, Tuple[int, int]]:
"""Reconstruct full AOI from overlapping patches with proper blending"""
if len(patches) == 0:
raise ValueError("No patches provided for reconstruction")
num_patches = patches.shape[0]
num_channels = patches.shape[1]
# Determine full AOI dimensions
max_row = max(m['row'] + patch_size for m in metadata)
max_col = max(m['col'] + patch_size for m in metadata)
# Initialize accumulation arrays
reconstructed = np.zeros((num_channels, max_row, max_col), dtype=np.float64)
weight_map = np.zeros((max_row, max_col), dtype=np.float64)
# Create blending weight template
if use_weighted:
y, x = np.ogrid[0:patch_size, 0:patch_size]
center = patch_size // 2
dist_from_center = np.sqrt((y - center)**2 + (x - center)**2)
max_dist = np.sqrt(2) * center
blend_weight = 1.0 - (dist_from_center / max_dist) ** 2
blend_weight = np.clip(blend_weight, 0.1, 1.0)
else:
blend_weight = np.ones((patch_size, patch_size), dtype=np.float64)
# Stitch all patches
for i, meta in enumerate(metadata):
row_start = meta['row']
col_start = meta['col']
row_end = min(row_start + patch_size, max_row)
col_end = min(col_start + patch_size, max_col)
patch_height = row_end - row_start
patch_width = col_end - col_start
if patch_height <= 0 or patch_width <= 0:
continue
patch_data = patches[i, :, :patch_height, :patch_width]
patch_weight = blend_weight[:patch_height, :patch_width]
for ch in range(num_channels):
reconstructed[ch, row_start:row_end, col_start:col_end] += (
patch_data[ch] * patch_weight
)
weight_map[row_start:row_end, col_start:col_end] += patch_weight
# Normalize by weights
weight_map_safe = np.where(weight_map > 0, weight_map, 1.0)
for ch in range(num_channels):
reconstructed[ch] = reconstructed[ch] / weight_map_safe
# Set uncovered areas to NaN
uncovered_mask = weight_map == 0
for ch in range(num_channels):
reconstructed[ch][uncovered_mask] = np.nan
reconstructed = reconstructed.astype(np.float32)
weight_map = weight_map.astype(np.float32)
return reconstructed, weight_map, (max_row, max_col)
# ==============================================================================
# EMBEDDING EXTRACTION
# ==============================================================================
def extract_embeddings(
encoder: nn.Module,
patches: np.ndarray,
device: torch.device,
batch_size: int = 32
) -> np.ndarray:
"""Extract embeddings from patches using encoder"""
encoder.eval()
patches_torch = torch.from_numpy(patches).float()
embeddings = []
with torch.no_grad():
for i in tqdm(range(0, len(patches), batch_size),
desc="Extracting embeddings", leave=False):
batch = patches_torch[i:i+batch_size].to(device)
embedding = encoder(batch)
embeddings.append(embedding.cpu().numpy())
embeddings = np.concatenate(embeddings, axis=0)
return embeddings
# ==============================================================================
# ISOLATION FOREST OPERATIONS
# ==============================================================================
def train_isolation_forest(
embeddings: np.ndarray,
contamination: float = 0.05,
n_estimators: int = 100,
max_samples: str = 'auto',
random_state: int = 42
) -> Tuple[IsolationForest, StandardScaler]:
"""
Train Isolation Forest on embeddings
Args:
embeddings: (N, embedding_dim) array
contamination: Expected proportion of outliers
n_estimators: Number of trees
max_samples: Samples to draw for each tree
random_state: Random seed
Returns:
Trained IsolationForest model and fitted scaler
"""
print(f"\n🌲 Training Isolation Forest")
print(f" Embeddings shape: {embeddings.shape}")
print(f" Contamination: {contamination:.1%}")
print(f" Trees: {n_estimators}")
# Standardize embeddings
scaler = StandardScaler()
embeddings_scaled = scaler.fit_transform(embeddings)
# Train Isolation Forest
iforest = IsolationForest(
contamination=contamination,
n_estimators=n_estimators,
max_samples=max_samples,
random_state=random_state,
n_jobs=-1,
verbose=1
)
iforest.fit(embeddings_scaled)
# Get anomaly scores
scores = iforest.decision_function(embeddings_scaled)
predictions = iforest.predict(embeddings_scaled)
n_anomalies = np.sum(predictions == -1)
print(f" ✓ Training complete")
print(f" Detected anomalies: {n_anomalies} ({n_anomalies/len(predictions)*100:.2f}%)")
print(f" Score range: [{scores.min():.4f}, {scores.max():.4f}]")
return iforest, scaler
def compute_isolation_forest_anomalies(
encoder: nn.Module,
iforest: IsolationForest,
scaler: StandardScaler,
patches: np.ndarray,
device: torch.device,
batch_size: int = 32
) -> Tuple[np.ndarray, np.ndarray]:
"""
Compute anomaly scores and predictions for patches
Returns:
probability_scores: Normalized anomaly probabilities [0, 1]
predictions: Binary predictions (-1 = anomaly, 1 = normal)
"""
# Extract embeddings
embeddings = extract_embeddings(encoder, patches, device, batch_size)
embeddings_scaled = scaler.transform(embeddings)
# Get raw scores (Negative = Anomaly, Positive = Normal)
raw_scores = iforest.decision_function(embeddings_scaled)
predictions = iforest.predict(embeddings_scaled)
# Invert so Higher = More Anomalous
inverted_scores = -raw_scores
# Convert to probability (0.0 to 1.0)
min_s = inverted_scores.min()
max_s = inverted_scores.max()
# Min-Max Normalization
probability_scores = (inverted_scores - min_s) / (max_s - min_s + 1e-10)
return probability_scores, predictions
def save_model(iforest: IsolationForest, scaler: StandardScaler, save_path: str):
"""Save trained Isolation Forest model and scaler"""
with open(save_path, 'wb') as f:
pickle.dump({'iforest': iforest, 'scaler': scaler}, f)
print(f"💾 Saved model: {save_path}")
def load_model(model_path: str) -> Tuple[IsolationForest, StandardScaler]:
"""Load trained Isolation Forest model and scaler"""
with open(model_path, 'rb') as f:
data = pickle.load(f)
print(f"✓ Loaded model from: {model_path}")
return data['iforest'], data['scaler']
# ==============================================================================
# MISSING: ResUNetEncoder Class (for Isolation Forest)
# ==============================================================================
"""
Fixed ResUNetEncoder with proper weight loading
"""
class ResUNetEncoder(nn.Module):
"""Encoder-only version for embedding extraction"""
def __init__(self, in_channels: int = 7, embedding_dim: int = 128):
super().__init__()
# Encoder (must match autoencoder architecture)
self.enc1 = EncoderBlock(in_channels, 32, downsample=True)
self.enc2 = EncoderBlock(32, 64, downsample=True)
self.enc3 = EncoderBlock(64, 128, downsample=True)
self.bottleneck = EncoderBlock(128, 256, downsample=False)
# Embedding projection
self.global_pool = nn.AdaptiveAvgPool2d(1)
self.embedding_dim = embedding_dim
# Bottleneck outputs 256 channels, project to embedding_dim
self.projection = nn.Linear(256, embedding_dim)
def forward(self, x):
# Encoder path
x, _ = self.enc1(x)
x, _ = self.enc2(x)
x, _ = self.enc3(x)
x = self.bottleneck(x)
# Global pooling: (batch, 256, H, W) -> (batch, 256, 1, 1)
x = self.global_pool(x)
# Flatten: (batch, 256, 1, 1) -> (batch, 256)
x = x.view(x.size(0), -1)
# Project to target embedding dimension: (batch, 256) -> (batch, embedding_dim)
embedding = self.projection(x)
return embedding
def load_from_autoencoder(self, autoencoder_path: str):
"""Load encoder weights from trained autoencoder"""
print(f"Loading encoder weights from: {autoencoder_path}")
# Load full autoencoder state dict
autoencoder_state = torch.load(autoencoder_path, map_location='cpu')
# Filter to get only encoder + bottleneck weights
encoder_dict = {}
for k, v in autoencoder_state.items():
if k.startswith(('enc1.', 'enc2.', 'enc3.', 'bottleneck.')):
encoder_dict[k] = v
# Load encoder weights (projection layer will remain randomly initialized)
missing, unexpected = self.load_state_dict(encoder_dict, strict=False)
# Verify what was loaded
print(f"✓ Loaded {len(encoder_dict)} encoder layers")
print(f" Missing keys (expected): {len(missing)} - {missing[:3]}..." if len(missing) > 3 else f" Missing keys: {missing}")
# The projection layer should be in missing keys (it's not in autoencoder)
if 'projection.weight' not in missing or 'projection.bias' not in missing:
print("⚠️ Warning: Projection layer might have been incorrectly loaded")
else:
print(f"✓ Projection layer (256 → {self.embedding_dim}) will be trained from scratch")
# ==============================================================================
# MISSING: Isolation Forest Probability Computation
# ==============================================================================
def compute_iforest_probabilities(
encoder: nn.Module,
iforest: IsolationForest,
scaler: StandardScaler,
patches: np.ndarray,
metadata: List[Dict],
device: torch.device,
batch_size: int = 32
) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
"""
Compute Isolation Forest anomaly probabilities
Returns:
prob_scores: (num_patches,) - Normalized probability scores [0, 1]
embeddings: (num_patches, embedding_dim) - Latent embeddings
predictions: (num_patches,) - Binary predictions (-1=anomaly, 1=normal)
"""
# Extract embeddings
embeddings = extract_embeddings(encoder, patches, device, batch_size)
embeddings_scaled = scaler.transform(embeddings)
# Get raw scores
raw_scores = iforest.decision_function(embeddings_scaled)
predictions = iforest.predict(embeddings_scaled)
# Invert so Higher = More Anomalous
inverted_scores = -raw_scores
# Normalize to [0, 1]
min_s = inverted_scores.min()
max_s = inverted_scores.max()
prob_scores = (inverted_scores - min_s) / (max_s - min_s + 1e-10)
return prob_scores, embeddings, predictions
# ==============================================================================
# MISSING: Template Matching Probabilities (Model 3 placeholder)
# ==============================================================================
def compute_template_matching_probabilities(
patches: np.ndarray,
metadata: List[Dict],
template_library: Dict[str, np.ndarray],
method: str = 'correlation'
) -> Tuple[np.ndarray, np.ndarray]:
"""
Compute template matching probabilities (PLACEHOLDER - implement your method)
Returns:
prob_scores: (num_patches,) - Match probability scores [0, 1]
similarity_scores: (num_patches, num_templates) - Raw similarity to each template
"""
from scipy.signal import correlate2d
num_patches = len(patches)
num_templates = len(template_library)
similarity_scores = np.zeros((num_patches, num_templates))
# Extract DTM channel (channel 0)
patch_dtm = patches[:, 0, :, :]
for i, patch in enumerate(tqdm(patch_dtm, desc="Model 3 (Template Matching)", leave=False)):
for j, (template_name, template) in enumerate(template_library.items()):
if method == 'correlation':
corr = correlate2d(patch, template, mode='valid')
if corr.size > 0:
similarity = np.max(corr) / (np.linalg.norm(patch) * np.linalg.norm(template) + 1e-10)
else:
similarity = 0.0
else:
# Add other methods as needed
similarity = 0.0
similarity_scores[i, j] = similarity
# Use max similarity across templates
max_similarity = similarity_scores.max(axis=1)
# Normalize to [0, 1]
prob_scores = (max_similarity - max_similarity.min()) / (max_similarity.max() - max_similarity.min() + 1e-10)
return prob_scores, similarity_scores
# ==============================================================================
# MISSING: Probability Matrix Storage
# ==============================================================================
def save_probability_matrix(
aoi_name: str,
probability_matrix: np.ndarray,
metadata: List[Dict],
save_dir: Path = Path('./probability_matrices')
):
"""Save probability matrix for later use by the gate model"""
save_dir.mkdir(exist_ok=True, parents=True)
save_path = save_dir / f"{aoi_name}_probability_matrix.npz"
np.savez_compressed(
save_path,
probability_matrix=probability_matrix,
metadata=metadata,
column_names=['prob_autoencoder', 'prob_iforest', 'prob_template', 'sim_score']
)
print(f"💾 Saved probability matrix: {save_path}")
def load_probability_matrix(
aoi_name: str,
load_dir: Path = Path('./probability_matrices')
) -> Tuple[np.ndarray, List[Dict]]:
"""Load saved probability matrix"""
load_path = load_dir / f"{aoi_name}_probability_matrix.npz"
with np.load(load_path, allow_pickle=True) as data:
probability_matrix = data['probability_matrix']
metadata = data['metadata']
print(f"✓ Loaded probability matrix from: {load_path}")
print(f" Shape: {probability_matrix.shape}")
return probability_matrix, metadata
# ==============================================================================
# MODEL 3: SIGNATURE MATCHING UTILITIES
# ==============================================================================
def compute_cosine_similarity(vec1: np.ndarray, vec2: np.ndarray) -> float:
"""Compute cosine similarity between two vectors"""
dot_product = np.dot(vec1, vec2)
norm1 = np.linalg.norm(vec1)
norm2 = np.linalg.norm(vec2)
if norm1 == 0 or norm2 == 0:
return 0.0
return dot_product / (norm1 * norm2)
def compute_signature_scores(
latent: np.ndarray,
centroids: np.ndarray,
scaler: Optional[StandardScaler] = None
) -> Dict:
"""Compute signature matching scores for a single latent vector"""
if scaler is not None:
latent_scaled = scaler.transform(latent.reshape(1, -1))[0]
else:
latent_scaled = latent
n_clusters = centroids.shape[0]
similarities = np.zeros(n_clusters)
for i in range(n_clusters):
similarities[i] = compute_cosine_similarity(latent_scaled, centroids[i])
similarities_normalized = (similarities + 1.0) / 2.0
best_cluster = int(np.argmax(similarities_normalized))
best_score = float(similarities_normalized[best_cluster])
return {
"cluster_scores": similarities_normalized.tolist(),
"best_cluster": best_cluster,
"best_score": best_score,
"max_similarity": best_score
}
def compute_kmeans_probabilities(
encoder: nn.Module,
kmeans_model,
scaler: Optional[StandardScaler],
patches: np.ndarray,
metadata: List[Dict],
device: torch.device,
batch_size: int = 32
) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
"""
Compute K-Means signature matching probabilities
Returns:
prob_scores: (num_patches,) - Match probability scores [0, 1]
similarity_scores: (num_patches,) - Max similarity to any cluster
cluster_assignments: (num_patches,) - Best matching cluster ID
"""
embeddings = extract_embeddings(encoder, patches, device, batch_size)
centroids = kmeans_model.cluster_centers_
n_patches = len(embeddings)
prob_scores = np.zeros(n_patches)
similarity_scores = np.zeros(n_patches)
cluster_assignments = np.zeros(n_patches, dtype=int)
for i in tqdm(range(n_patches), desc="Model 3 (K-Means Matching)", leave=False):
scores = compute_signature_scores(embeddings[i], centroids, scaler)
prob_scores[i] = scores['best_score']
similarity_scores[i] = scores['max_similarity']
cluster_assignments[i] = scores['best_cluster']
return prob_scores, similarity_scores, cluster_assignments
def load_kmeans_model(model_path: str):
"""Load trained K-Means model and scaler"""
with open(model_path, 'rb') as f:
data = pickle.load(f)
print(f"✓ Loaded K-Means model from: {model_path}")
# Handle different storage formats
if isinstance(data, dict):
kmeans = data.get('kmeans', data.get('model', None))
scaler = data.get('scaler', None)
else:
# Assume it's the KMeans object directly
kmeans = data
scaler = None
print("⚠️ Warning: No scaler found, loaded KMeans object only")
if kmeans is None:
raise ValueError(f"Could not find KMeans model in {model_path}")
return kmeans, scaler
# ==============================================================================
# ADD THESE FUNCTIONS TO YOUR EXISTING utils.py
# ==============================================================================
def load_unified_probability_matrix(
aoi_name: str,
load_dir: Path = Path('./unified_probability_matrices')
) -> Tuple[np.ndarray, List[Dict], List[str]]:
"""
Load saved unified probability matrix
Args:
aoi_name: Name of AOI (e.g., "AOI_0001")
load_dir: Directory containing saved matrices
Returns:
unified_matrix: Shape (num_patches, 64, 64, 4)
metadata: List of patch metadata dicts
channel_names: List of channel names
"""
load_path = load_dir / f"{aoi_name}_unified_prob_matrix.npz"
if not load_path.exists():
raise FileNotFoundError(f"Unified matrix not found: {load_path}")
with np.load(load_path, allow_pickle=True) as data:
unified_matrix = data['unified_matrix']
metadata = data['metadata']
channel_names = data['channel_names']
print(f"✓ Loaded unified probability matrix from: {load_path}")
print(f" Shape: {unified_matrix.shape}")
print(f" Channels: {list(channel_names)}")
return unified_matrix, metadata, channel_names
def broadcast_patch_scores_to_pixels(
patch_scores: np.ndarray,
patch_size: int = 64
) -> np.ndarray:
"""
Broadcast patch-level scores to pixel-level
Args:
patch_scores: Shape (num_patches,) or (num_patches, 1)
patch_size: Size of patches (default 64)
Returns:
pixel_scores: Shape (num_patches, patch_size, patch_size)
"""
if patch_scores.ndim == 1:
patch_scores = patch_scores[:, np.newaxis, np.newaxis]
elif patch_scores.ndim == 2 and patch_scores.shape[1] == 1:
patch_scores = patch_scores[:, :, np.newaxis]
num_patches = patch_scores.shape[0]
pixel_scores = np.broadcast_to(
patch_scores,
(num_patches, patch_size, patch_size)
).copy()
return pixel_scores
def normalize_scores(scores: np.ndarray) -> np.ndarray:
"""
Normalize scores to [0, 1] range using min-max scaling
Args:
scores: Input scores of any shape
Returns:
normalized_scores: Same shape as input, values in [0, 1]
"""
min_val = scores.min()
max_val = scores.max()
if max_val - min_val < 1e-10:
# All values are the same, return zeros or ones
return np.zeros_like(scores)
normalized = (scores - min_val) / (max_val - min_val)
return normalized.astype(np.float32)
def save_unified_probability_matrix(
aoi_name: str,
unified_matrix: np.ndarray,
metadata: List[Dict],
save_dir: Path = Path('./unified_probability_matrices'),
channel_names: List[str] = None
):
"""
Save unified probability matrix to disk
Args:
aoi_name: Name of AOI
unified_matrix: Shape (num_patches, 64, 64, 4)
metadata: Patch metadata
save_dir: Directory to save to
channel_names: Names of the 4 channels
"""
if channel_names is None:
channel_names = ['prob_autoencoder', 'prob_iforest', 'prob_kmeans', 'sim_score']
save_dir.mkdir(exist_ok=True, parents=True)
save_path = save_dir / f"{aoi_name}_unified_prob_matrix.npz"
np.savez_compressed(
save_path,
unified_matrix=unified_matrix,
metadata=metadata,
channel_names=channel_names,
aoi_name=aoi_name
)
print(f"💾 Saved unified probability matrix: {save_path}")
print(f" Shape: {unified_matrix.shape}")
print(f" Channels: {channel_names}")
# Add this function to utils.py to fix the missing function issue
def compute_full_aoi_anomalies(
model: nn.Module,
patches: np.ndarray,
metadata: List[Dict],
device: torch.device,
batch_size: int = 32,
use_sigmoid_scores: bool = True
) -> Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]:
"""
Compute autoencoder anomaly scores for full AOI visualization
Returns:
anomaly_scores: (num_patches,) - Per-patch anomaly scores
pixel_scores: (num_patches, H, W) - Per-pixel anomaly maps
reconstructed_patches: (num_patches, C, H, W) - Model reconstructions
original_patches: (num_patches, C, H, W) - Original input patches
"""
model.eval()
patches_torch = torch.from_numpy(patches).float()
anomaly_scores = []
pixel_scores = []
reconstructed_patches = []
with torch.no_grad():
for i in tqdm(range(0, len(patches), batch_size),
desc="Computing anomalies", leave=False):
batch = patches_torch[i:i+batch_size].to(device)
reconstruction, _ = model(batch)
if use_sigmoid_scores:
# Sigmoid uncertainty: higher = more anomalous
sigmoid_uncertainty = 1.0 - torch.abs(reconstruction - 0.5) * 2
pixel_score = sigmoid_uncertainty.mean(dim=1) # Per pixel
patch_score = sigmoid_uncertainty.mean(dim=(1, 2, 3)) # Per patch
else:
# MSE-based approach
squared_error = (reconstruction - batch) ** 2
pixel_score = squared_error.mean(dim=1)
patch_score = squared_error.mean(dim=(1, 2, 3))
anomaly_scores.append(patch_score.cpu().numpy())
pixel_scores.append(pixel_score.cpu().numpy())
reconstructed_patches.append(reconstruction.cpu().numpy())
anomaly_scores = np.concatenate(anomaly_scores)
pixel_scores = np.concatenate(pixel_scores)
reconstructed_patches = np.concatenate(reconstructed_patches)
# Return original patches as well (convert to numpy)
original_patches = patches
return anomaly_scores, pixel_scores, reconstructed_patches, original_patches