File size: 3,578 Bytes
58839b6
 
 
 
 
 
 
 
 
 
 
 
d9b5881
58839b6
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
d9b5881
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
58839b6
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
import torch
import numpy as np
import torchvision.transforms as transforms
from sklearn.metrics import accuracy_score
from sklearn.decomposition import TruncatedSVD
from sklearn.linear_model import LogisticRegression
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler

def fit_svd_baseline(X_train, y_train, n_components=20):
    """Fits a linear baseline (SVD + Logistic Regression) on the fly."""
    pipeline = Pipeline([
        ('scaler', StandardScaler(with_std=False)),
        ('svd', TruncatedSVD(n_components=n_components, random_state=42)),
        ('logistic', LogisticRegression(max_iter=1000))
    ])
    pipeline.fit(X_train, y_train)
    return pipeline

def add_gaussian_noise(X, sigma):
    """
    Uniform noise addition for both torch Tensors and numpy arrays.
    Returns the same type as input.
    """
    if sigma <= 0: return X
    if torch.is_tensor(X):
        noise = torch.randn_like(X) * sigma
        return torch.clamp(X + noise, 0, 1)
    else:
        noise = np.random.randn(*X.shape) * sigma
        return np.clip(X + noise, 0, 1)

def add_svd_aligned_noise(X, sigma, components):
    """
    Adds noise that is projected onto the SVD components, living entirely
    within the 'signal' subspace.
    """
    if sigma <= 0: return X
    is_tensor = torch.is_tensor(X)
    
    # Flatten if needed
    orig_shape = list(X.shape)
    if is_tensor:
        X_flat = X.cpu().numpy().reshape(orig_shape[0], -1)
        components_np = components.cpu().numpy() if torch.is_tensor(components) else components
    else:
        X_flat = X.reshape(orig_shape[0], -1)
        components_np = components
    
    # 1. Generate random Gaussian noise in full dimensionality
    noise = np.random.randn(*X_flat.shape) * sigma
    
    # 2. Project noise onto components (V_k)
    # V_k (components_np) is assumed to be (k, 784)
    # Projection P = V_k^T @ V_k
    projected_noise = (noise @ components_np.T) @ components_np
    
    # 3. Add back and clip
    X_noisy = X_flat + projected_noise
    X_noisy = np.clip(X_noisy, 0, 1)
    
    if is_tensor:
        return torch.from_numpy(X_noisy).float().view(orig_shape)
    else:
        return X_noisy.reshape(orig_shape)

def add_blur(X, kernel_size):
    """Unified blur for torch Tensors (4D: B, C, H, W)."""
    if kernel_size <= 1: 
        return X
    sigma = 0.1 + 0.3 * (kernel_size // 2)
    blur_fn = transforms.GaussianBlur(kernel_size=(kernel_size, kernel_size), sigma=(sigma, sigma))
    return blur_fn(X)

def evaluate_classifier(model, X, y, device="cpu", is_pytorch=True):
    """
    Unified evaluation function.
    Handles PyTorch models (CNN, Hybrid) and Sklearn pipelines (SVD+LR).
    """
    if is_pytorch:
        model.eval()
        model.to(device)
        # Ensure X is 4D for CNN (B, 1, 28, 28)
        if len(X.shape) == 2:
            X_t = torch.as_tensor(X.reshape(-1, 1, 28, 28), dtype=torch.float32).to(device)
        else:
            X_t = torch.as_tensor(X, dtype=torch.float32).to(device)
        
        y_t = torch.as_tensor(y, dtype=torch.long).to(device)
        
        with torch.no_grad():
            logits = model(X_t)
            preds = torch.argmax(logits, dim=1).cpu().numpy()
        return accuracy_score(y, preds)
    else:
        # Sklearn pipeline - Ensure X is flattened 2D numpy
        if torch.is_tensor(X):
            X_np = X.view(X.size(0), -1).cpu().numpy()
        else:
            X_np = X.reshape(X.shape[0], -1)
        preds = model.predict(X_np)
        return accuracy_score(y, preds)