"""
PaDiM: Patch Distribution Modeling for Anomaly Detection
Implementation based on: https://arxiv.org/abs/2011.08785
"""
import pickle
from pathlib import Path
from typing import Tuple

import numpy as np
from scipy.ndimage import gaussian_filter
from sklearn.random_projection import SparseRandomProjection


class PaDiM:
"""
PaDiM anomaly detection model
Models the distribution of patch-level features using multivariate Gaussian
and detects anomalies via Mahalanobis distance.
"""

    def __init__(self, reduce_dim: int = 100, epsilon: float = 1e-5):
"""
Args:
reduce_dim: Target dimensionality after random projection
epsilon: Small constant for numerical stability
"""
self.reduce_dim = reduce_dim
self.epsilon = epsilon
# Model parameters (learned from training data)
self.mean = None # [H, W, D]
self.cov_inv = None # [H, W, D, D]
self.projection = None # Random projection matrix
self.feature_shape = None # Spatial dimensions of features

    def fit(self, embeddings: np.ndarray):
        """
        Fit the PaDiM model on embeddings of normal (defect-free) training images.

        Args:
            embeddings: Training embeddings [N, D, H, W]
        """
N, D, H, W = embeddings.shape
self.feature_shape = (H, W)
print(f"Fitting PaDiM on {N} training samples...")
print(f"Original embedding dimension: {D}")
# Apply random dimensionality reduction
if D > self.reduce_dim:
embeddings_reshaped = embeddings.transpose(0, 2, 3, 1).reshape(-1, D)
self.projection = SparseRandomProjection(
n_components=self.reduce_dim,
random_state=42
)
embeddings_reduced = self.projection.fit_transform(embeddings_reshaped)
embeddings = embeddings_reduced.reshape(N, H, W, self.reduce_dim)
D = self.reduce_dim
print(f"Reduced embedding dimension: {D}")
else:
embeddings = embeddings.transpose(0, 2, 3, 1) # [N, H, W, D]
# Compute mean and covariance for each spatial location
self.mean = np.mean(embeddings, axis=0) # [H, W, D]
# Compute covariance inverse for each pixel
self.cov_inv = np.zeros((H, W, D, D))
for h in range(H):
for w in range(W):
# Get features at this spatial location across all samples
features = embeddings[:, h, w, :] # [N, D]
                # Covariance across samples (rows are observations), regularized
                # with epsilon * I to guarantee invertibility
                cov = np.cov(features, rowvar=False) + self.epsilon * np.eye(D)
# Compute precision matrix (inverse covariance)
try:
self.cov_inv[h, w] = np.linalg.inv(cov)
except np.linalg.LinAlgError:
# Fallback to pseudo-inverse if singular
self.cov_inv[h, w] = np.linalg.pinv(cov)
print("PaDiM model fitted successfully!")

    def predict(self, embeddings: np.ndarray) -> Tuple[float, np.ndarray]:
        """
        Compute the anomaly score and heatmap for test embeddings.

        Args:
            embeddings: Test embeddings [1, D, H, W] (single image)

        Returns:
            anomaly_score: Image-level anomaly score (scalar)
            anomaly_map: Pixel-level anomaly heatmap [H, W]
        """
if self.mean is None:
raise ValueError("Model not fitted. Call fit() first.")
        # Apply the same dimensionality reduction as during training
        _, D, H, W = embeddings.shape
        if (H, W) != self.feature_shape:
            raise ValueError(
                f"Feature map size {(H, W)} does not match training size "
                f"{self.feature_shape}."
            )
if self.projection is not None:
embeddings_reshaped = embeddings.transpose(0, 2, 3, 1).reshape(-1, D)
embeddings_reduced = self.projection.transform(embeddings_reshaped)
embeddings = embeddings_reduced.reshape(1, H, W, self.reduce_dim)
D = self.reduce_dim
else:
embeddings = embeddings.transpose(0, 2, 3, 1) # [1, H, W, D]
embeddings = embeddings[0] # Remove batch dimension [H, W, D]
# Compute Mahalanobis distance for each pixel
anomaly_map = np.zeros((H, W))
for h in range(H):
for w in range(W):
                delta = embeddings[h, w] - self.mean[h, w]  # [D]
                # Mahalanobis distance: sqrt(delta^T * Sigma^-1 * delta);
                # clamp the quadratic form at zero in case the pseudo-inverse
                # fallback yields a slightly negative value from numerical error
                squared = float(delta @ self.cov_inv[h, w] @ delta)
                anomaly_map[h, w] = np.sqrt(max(squared, 0.0))
# Apply Gaussian smoothing to reduce noise
anomaly_map = gaussian_filter(anomaly_map, sigma=4)
# Image-level score is max of anomaly map
anomaly_score = np.max(anomaly_map)
return anomaly_score, anomaly_map

    def save(self, path: Path):
        """Save model parameters to disk."""
model_dict = {
'mean': self.mean,
'cov_inv': self.cov_inv,
'projection': self.projection,
'feature_shape': self.feature_shape,
'reduce_dim': self.reduce_dim,
'epsilon': self.epsilon
}
with open(path, 'wb') as f:
pickle.dump(model_dict, f)
print(f"Model saved to {path}")

    def load(self, path: Path):
        """Load model parameters from disk."""
with open(path, 'rb') as f:
model_dict = pickle.load(f)
self.mean = model_dict['mean']
self.cov_inv = model_dict['cov_inv']
self.projection = model_dict['projection']
self.feature_shape = model_dict['feature_shape']
self.reduce_dim = model_dict['reduce_dim']
self.epsilon = model_dict['epsilon']
print(f"Model loaded from {path}")