"""
PaDiM: Patch Distribution Modeling for Anomaly Detection
Implementation based on: https://arxiv.org/abs/2011.08785
"""

import pickle
from pathlib import Path
from typing import Tuple

import numpy as np
from scipy.ndimage import gaussian_filter
from sklearn.random_projection import SparseRandomProjection


class PaDiM:
"""
PaDiM anomaly detection model
Models the distribution of patch-level features using multivariate Gaussian
and detects anomalies via Mahalanobis distance.
"""
def __init__(self, reduce_dim: int = 100, epsilon: float = 1e-5):
"""
Args:
reduce_dim: Target dimensionality after random projection
epsilon: Small constant for numerical stability
"""
self.reduce_dim = reduce_dim
self.epsilon = epsilon
# Model parameters (learned from training data)
self.mean = None # [H, W, D]
self.cov_inv = None # [H, W, D, D]
self.projection = None # Random projection matrix
self.feature_shape = None # Spatial dimensions of features

    def fit(self, embeddings: np.ndarray):
"""
Fit PaDiM model on normal training embeddings
Args:
embeddings: Training embeddings [N, D, H, W]
"""
N, D, H, W = embeddings.shape
self.feature_shape = (H, W)
print(f"Fitting PaDiM on {N} training samples...")
print(f"Original embedding dimension: {D}")
# Apply random dimensionality reduction
if D > self.reduce_dim:
embeddings_reshaped = embeddings.transpose(0, 2, 3, 1).reshape(-1, D)
self.projection = SparseRandomProjection(
n_components=self.reduce_dim,
random_state=42
)
embeddings_reduced = self.projection.fit_transform(embeddings_reshaped)
embeddings = embeddings_reduced.reshape(N, H, W, self.reduce_dim)
D = self.reduce_dim
print(f"Reduced embedding dimension: {D}")
else:
embeddings = embeddings.transpose(0, 2, 3, 1) # [N, H, W, D]
# Compute mean and covariance for each spatial location
self.mean = np.mean(embeddings, axis=0) # [H, W, D]
# Compute covariance inverse for each pixel
self.cov_inv = np.zeros((H, W, D, D))
for h in range(H):
for w in range(W):
# Get features at this spatial location across all samples
features = embeddings[:, h, w, :] # [N, D]
# Compute covariance matrix
cov = np.cov(features.T) + self.epsilon * np.eye(D)
# Compute precision matrix (inverse covariance)
try:
self.cov_inv[h, w] = np.linalg.inv(cov)
except np.linalg.LinAlgError:
# Fallback to pseudo-inverse if singular
self.cov_inv[h, w] = np.linalg.pinv(cov)
print("PaDiM model fitted successfully!")

    def predict(self, embeddings: np.ndarray) -> Tuple[float, np.ndarray]:
"""
Compute anomaly score and heatmap for test embeddings
Args:
embeddings: Test embeddings [1, D, H, W] (single image)
Returns:
anomaly_score: Image-level anomaly score (scalar)
anomaly_map: Pixel-level anomaly heatmap [H, W]
"""
if self.mean is None:
raise ValueError("Model not fitted. Call fit() first.")
# Apply same dimensionality reduction as training
_, D, H, W = embeddings.shape
if self.projection is not None:
embeddings_reshaped = embeddings.transpose(0, 2, 3, 1).reshape(-1, D)
embeddings_reduced = self.projection.transform(embeddings_reshaped)
embeddings = embeddings_reduced.reshape(1, H, W, self.reduce_dim)
D = self.reduce_dim
else:
embeddings = embeddings.transpose(0, 2, 3, 1) # [1, H, W, D]
embeddings = embeddings[0] # Remove batch dimension [H, W, D]
# Compute Mahalanobis distance for each pixel
anomaly_map = np.zeros((H, W))
for h in range(H):
for w in range(W):
                delta = embeddings[h, w] - self.mean[h, w]  # [D]
                # Mahalanobis distance: sqrt(delta^T * Sigma^-1 * delta);
                # clamp at zero to guard against small negative values
                # caused by numerical error before taking the square root
                squared_distance = delta @ self.cov_inv[h, w] @ delta
                anomaly_map[h, w] = np.sqrt(max(squared_distance, 0.0))
# Apply Gaussian smoothing to reduce noise
anomaly_map = gaussian_filter(anomaly_map, sigma=4)
# Image-level score is max of anomaly map
anomaly_score = np.max(anomaly_map)
return anomaly_score, anomaly_map

    def save(self, path: Path):
"""Save model parameters to disk"""
model_dict = {
'mean': self.mean,
'cov_inv': self.cov_inv,
'projection': self.projection,
'feature_shape': self.feature_shape,
'reduce_dim': self.reduce_dim,
'epsilon': self.epsilon
}
with open(path, 'wb') as f:
pickle.dump(model_dict, f)
print(f"Model saved to {path}")

    def load(self, path: Path):
"""Load model parameters from disk"""
with open(path, 'rb') as f:
model_dict = pickle.load(f)
self.mean = model_dict['mean']
self.cov_inv = model_dict['cov_inv']
self.projection = model_dict['projection']
self.feature_shape = model_dict['feature_shape']
self.reduce_dim = model_dict['reduce_dim']
self.epsilon = model_dict['epsilon']
print(f"Model loaded from {path}")