"""
PaDiM: Patch Distribution Modeling for Anomaly Detection
Implementation based on: https://arxiv.org/abs/2011.08785
"""
import pickle
from pathlib import Path
from typing import Optional, Tuple

import numpy as np
from scipy.ndimage import gaussian_filter
from sklearn.random_projection import SparseRandomProjection

import config
class PaDiM:
    """
    PaDiM anomaly detection model.

    Fits one multivariate Gaussian per spatial location over patch-level
    features extracted from normal (defect-free) images, then scores test
    images via the per-pixel Mahalanobis distance.
    Reference: https://arxiv.org/abs/2011.08785
    """

    def __init__(self, reduce_dim: int = 100, epsilon: float = 1e-5):
        """
        Args:
            reduce_dim: Target dimensionality after random projection.
            epsilon: Diagonal regularization added to each covariance
                matrix for numerical stability.
        """
        self.reduce_dim = reduce_dim
        self.epsilon = epsilon
        # Learned parameters (populated by fit()).
        self.mean: Optional[np.ndarray] = None      # [H, W, D]
        self.cov_inv: Optional[np.ndarray] = None   # [H, W, D, D]
        self.projection = None                      # fitted SparseRandomProjection, or None
        self.feature_shape: Optional[Tuple[int, int]] = None  # (H, W) of fitted features

    def _project(self, embeddings: np.ndarray, fit: bool = False) -> np.ndarray:
        """
        Move channels last and (optionally) apply random projection.

        Args:
            embeddings: Feature maps of shape [B, D, H, W].
            fit: If True, fit a new SparseRandomProjection when D exceeds
                reduce_dim; otherwise reuse the projection fitted in fit().

        Returns:
            Array of shape [B, H, W, D'], where D' is reduce_dim when a
            projection is in effect, else the original D.
        """
        B, D, H, W = embeddings.shape
        channels_last = embeddings.transpose(0, 2, 3, 1)  # [B, H, W, D]
        if fit:
            if D <= self.reduce_dim:
                return channels_last
            self.projection = SparseRandomProjection(
                n_components=self.reduce_dim,
                random_state=42,  # fixed seed so train and test share one projection
            )
            reduced = self.projection.fit_transform(channels_last.reshape(-1, D))
            print(f"Reduced embedding dimension: {self.reduce_dim}")
            return reduced.reshape(B, H, W, self.reduce_dim)
        if self.projection is None:
            return channels_last
        reduced = self.projection.transform(channels_last.reshape(-1, D))
        return reduced.reshape(B, H, W, self.reduce_dim)

    def fit(self, embeddings: np.ndarray) -> None:
        """
        Fit the per-pixel Gaussian parameters on normal training embeddings.

        Args:
            embeddings: Training embeddings of shape [N, D, H, W].

        Raises:
            ValueError: If fewer than two samples are given (the sample
                covariance would be undefined).
        """
        N, D, H, W = embeddings.shape
        if N < 2:
            raise ValueError("fit() needs at least 2 samples to estimate covariance")
        self.feature_shape = (H, W)
        print(f"Fitting PaDiM on {N} training samples...")
        print(f"Original embedding dimension: {D}")

        embeddings = self._project(embeddings, fit=True)  # [N, H, W, D']
        D = embeddings.shape[-1]

        self.mean = embeddings.mean(axis=0)  # [H, W, D]

        # Sample covariance at every location, vectorized over (H, W)
        # instead of a Python double loop; epsilon * I keeps each matrix
        # invertible even when N < D or features are degenerate.
        centered = embeddings - self.mean  # [N, H, W, D]
        cov = np.einsum("nhwi,nhwj->hwij", centered, centered) / (N - 1)
        cov += self.epsilon * np.eye(D)
        try:
            self.cov_inv = np.linalg.inv(cov)  # batched inverse -> [H, W, D, D]
        except np.linalg.LinAlgError:
            # Fallback to pseudo-inverse for (near-)singular covariance.
            self.cov_inv = np.linalg.pinv(cov)
        print("PaDiM model fitted successfully!")

    def predict(self, embeddings: np.ndarray) -> Tuple[float, np.ndarray]:
        """
        Compute the anomaly score and heatmap for one test image.

        Args:
            embeddings: Test embeddings of shape [1, D, H, W], with the
                same D, H, W as the training embeddings.

        Returns:
            anomaly_score: Image-level score (max of the smoothed map).
            anomaly_map: Pixel-level anomaly heatmap of shape [H, W].

        Raises:
            ValueError: If the model is unfitted or the spatial size does
                not match the fitted features.
        """
        if self.mean is None:
            raise ValueError("Model not fitted. Call fit() first.")
        _, D, H, W = embeddings.shape
        if (H, W) != self.feature_shape:
            raise ValueError(
                f"Feature size {(H, W)} does not match fitted size {self.feature_shape}"
            )

        embeddings = self._project(embeddings)[0]  # [H, W, D]

        # Mahalanobis distance per pixel: sqrt(delta^T * Sigma^-1 * delta),
        # computed in a single einsum instead of an H*W Python loop.
        delta = embeddings - self.mean  # [H, W, D]
        sq_dist = np.einsum("hwi,hwij,hwj->hw", delta, self.cov_inv, delta)
        # Clamp tiny negative values from floating-point error (possible
        # with the pinv fallback) so sqrt never produces NaN.
        anomaly_map = np.sqrt(np.maximum(sq_dist, 0.0))

        # Gaussian smoothing suppresses isolated noisy responses.
        anomaly_map = gaussian_filter(anomaly_map, sigma=4)
        anomaly_score = float(np.max(anomaly_map))
        return anomaly_score, anomaly_map

    def save(self, path: Path) -> None:
        """Serialize the fitted parameters to ``path`` with pickle."""
        model_dict = {
            'mean': self.mean,
            'cov_inv': self.cov_inv,
            'projection': self.projection,
            'feature_shape': self.feature_shape,
            'reduce_dim': self.reduce_dim,
            'epsilon': self.epsilon,
        }
        with open(path, 'wb') as f:
            pickle.dump(model_dict, f)
        print(f"Model saved to {path}")

    def load(self, path: Path) -> None:
        """
        Restore parameters previously written by save().

        SECURITY NOTE: pickle can execute arbitrary code during load; only
        load model files from trusted sources.
        """
        with open(path, 'rb') as f:
            model_dict = pickle.load(f)
        self.mean = model_dict['mean']
        self.cov_inv = model_dict['cov_inv']
        self.projection = model_dict['projection']
        self.feature_shape = model_dict['feature_shape']
        self.reduce_dim = model_dict['reduce_dim']
        self.epsilon = model_dict['epsilon']
        print(f"Model loaded from {path}")