File size: 6,210 Bytes
56ec9ba
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
"""

PaDiM: Patch Distribution Modeling for Anomaly Detection

Implementation based on: https://arxiv.org/abs/2011.08785

"""

import torch
import numpy as np
from scipy.ndimage import gaussian_filter
from sklearn.random_projection import SparseRandomProjection
from typing import Tuple, Optional
import pickle
from pathlib import Path
import config


class PaDiM:
    """PaDiM anomaly detection model.

    Models the distribution of patch-level features at each spatial location
    of a feature map with a multivariate Gaussian, then scores test images by
    the Mahalanobis distance between each patch feature and its fitted
    Gaussian.

    Reference: https://arxiv.org/abs/2011.08785
    """

    def __init__(self, reduce_dim: int = 100, epsilon: float = 1e-5):
        """
        Args:
            reduce_dim: Target dimensionality after random projection
                (applied only when the input dimension exceeds it).
            epsilon: Small constant added to covariance diagonals for
                numerical stability.
        """
        self.reduce_dim = reduce_dim
        self.epsilon = epsilon

        # Model parameters (learned from training data)
        self.mean = None           # [H, W, D] per-location feature means
        self.cov_inv = None        # [H, W, D, D] per-location precision matrices
        self.projection = None     # Fitted SparseRandomProjection, or None

        self.feature_shape = None  # (H, W) spatial size of the feature map

    def fit(self, embeddings: np.ndarray):
        """Fit the PaDiM model on normal (defect-free) training embeddings.

        Args:
            embeddings: Training embeddings of shape [N, D, H, W].

        Raises:
            ValueError: If fewer than two samples are provided — a sample
                covariance (ddof=1) cannot be estimated from one sample and
                would silently become NaN.
        """
        N, D, H, W = embeddings.shape
        if N < 2:
            raise ValueError(
                "fit() requires at least 2 training samples to estimate covariance"
            )
        self.feature_shape = (H, W)

        print(f"Fitting PaDiM on {N} training samples...")
        print(f"Original embedding dimension: {D}")

        # Optional random dimensionality reduction (only when it shrinks D)
        if D > self.reduce_dim:
            flat = embeddings.transpose(0, 2, 3, 1).reshape(-1, D)

            self.projection = SparseRandomProjection(
                n_components=self.reduce_dim,
                random_state=42
            )
            embeddings = self.projection.fit_transform(flat).reshape(
                N, H, W, self.reduce_dim
            )
            D = self.reduce_dim
            print(f"Reduced embedding dimension: {D}")
        else:
            embeddings = embeddings.transpose(0, 2, 3, 1)  # [N, H, W, D]

        # Per-location mean over the sample axis
        self.mean = np.mean(embeddings, axis=0)  # [H, W, D]

        # Per-location sample covariance for ALL pixels at once. This is the
        # vectorized equivalent of looping `np.cov(features.T)` per pixel
        # (both use the ddof=1 divisor N-1), replacing an O(H*W) Python loop.
        centered = embeddings - self.mean  # [N, H, W, D]
        cov = np.einsum('nhwi,nhwj->hwij', centered, centered) / (N - 1)
        cov += self.epsilon * np.eye(D)  # regularize diagonals

        # Invert all covariance matrices in one batched call; if any matrix
        # is still singular, fall back to a per-pixel (pseudo-)inverse.
        try:
            self.cov_inv = np.linalg.inv(cov)  # [H, W, D, D]
        except np.linalg.LinAlgError:
            self.cov_inv = np.zeros((H, W, D, D))
            for h in range(H):
                for w in range(W):
                    try:
                        self.cov_inv[h, w] = np.linalg.inv(cov[h, w])
                    except np.linalg.LinAlgError:
                        self.cov_inv[h, w] = np.linalg.pinv(cov[h, w])

        print("PaDiM model fitted successfully!")

    def predict(self, embeddings: np.ndarray) -> Tuple[float, np.ndarray]:
        """Compute an anomaly score and heatmap for one test image.

        Args:
            embeddings: Test embeddings of shape [1, D, H, W] (single image).

        Returns:
            anomaly_score: Image-level anomaly score (max of the smoothed map).
            anomaly_map: Pixel-level anomaly heatmap of shape [H, W].

        Raises:
            ValueError: If the model has not been fitted.
        """
        if self.mean is None:
            raise ValueError("Model not fitted. Call fit() first.")

        _, D, H, W = embeddings.shape

        # Apply the same dimensionality reduction as training
        if self.projection is not None:
            flat = embeddings.transpose(0, 2, 3, 1).reshape(-1, D)
            embeddings = self.projection.transform(flat).reshape(
                1, H, W, self.reduce_dim
            )
        else:
            embeddings = embeddings.transpose(0, 2, 3, 1)  # [1, H, W, D]

        embeddings = embeddings[0]  # Remove batch dimension -> [H, W, D]

        # Mahalanobis distance per pixel: sqrt(delta^T Sigma^-1 delta),
        # computed for every location in a single einsum instead of an
        # O(H*W) Python loop.
        delta = embeddings - self.mean  # [H, W, D]
        mahal_sq = np.einsum('hwi,hwij,hwj->hw', delta, self.cov_inv, delta)

        # Clamp tiny negative values caused by floating-point round-off so
        # sqrt never produces NaN.
        anomaly_map = np.sqrt(np.maximum(mahal_sq, 0.0))

        # Apply Gaussian smoothing to reduce noise
        anomaly_map = gaussian_filter(anomaly_map, sigma=4)

        # Image-level score is the max of the anomaly map
        anomaly_score = np.max(anomaly_map)

        return anomaly_score, anomaly_map

    def save(self, path: Path):
        """Serialize all fitted parameters to *path* with pickle."""
        model_dict = {
            'mean': self.mean,
            'cov_inv': self.cov_inv,
            'projection': self.projection,
            'feature_shape': self.feature_shape,
            'reduce_dim': self.reduce_dim,
            'epsilon': self.epsilon
        }

        with open(path, 'wb') as f:
            pickle.dump(model_dict, f)

        print(f"Model saved to {path}")

    def load(self, path: Path):
        """Load fitted parameters from *path*.

        SECURITY NOTE: pickle.load executes arbitrary code embedded in the
        file — only load model files from trusted sources.
        """
        with open(path, 'rb') as f:
            model_dict = pickle.load(f)

        self.mean = model_dict['mean']
        self.cov_inv = model_dict['cov_inv']
        self.projection = model_dict['projection']
        self.feature_shape = model_dict['feature_shape']
        self.reduce_dim = model_dict['reduce_dim']
        self.epsilon = model_dict['epsilon']

        print(f"Model loaded from {path}")