File size: 4,203 Bytes
832948a
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
"""Phase 3: Feature extraction from preprocessed EEG windows."""
import numpy as np
from pathlib import Path
from scipy.signal import welch
from scipy import stats as scipy_stats


# Directory two levels above this file's resolved location; the functions below
# read and write their .npz files relative to it.
PROJECT_ROOT = Path(__file__).resolve().parent.parent
# Presumably the EEG sampling rate in Hz (it matches the default `fs` of
# extract_psd_features) — NOTE(review): not referenced elsewhere in this file.
FS = 500.0


def extract_psd_features(window, fs=500.0):
    """
    Compute Welch band-power features for every channel of one window.

    For each channel (column) the PSD is estimated with ``scipy.signal.welch``
    using segments of at most 256 samples, then averaged inside three bands:
    theta [4, 8), alpha [8, 13) and beta [13, 30) (same frequency units as
    ``fs``). A fourth value, the alpha/beta ratio, is appended per channel.

    window: (n_samples, n_channels) array; the pipeline uses 6 channels.
    fs: sampling frequency handed to ``welch``.
    Returns: 1-D float64 vector of 4 values per channel, ordered
             [theta, alpha, beta, alpha/beta] -> (24,) for 6 channels.
    """
    # Segment length never exceeds the window, so short windows still work.
    segment_len = min(256, window.shape[0])
    # Half-open band limits, in the order the features are emitted.
    band_limits = ((4, 8), (8, 13), (13, 30))

    collected = []
    for channel in window.T:
        freqs, psd = welch(channel, fs=fs, nperseg=segment_len)
        theta, alpha, beta = (
            np.mean(psd[(freqs >= low) & (freqs < high)])
            for low, high in band_limits
        )
        # The small epsilon keeps the ratio finite when beta power is zero.
        ratio = alpha / (beta + 1e-10)
        collected += [theta, alpha, beta, ratio]

    return np.array(collected, dtype=np.float64)


def extract_stat_features(window):
    """
    Compute seven time-domain statistics for every channel of one window.

    Per channel, in this order: variance, mean absolute value, RMS,
    peak absolute value, kurtosis (``scipy.stats.kurtosis`` defaults),
    skewness (``scipy.stats.skew`` defaults), and the number of positions
    where the sign of consecutive samples differs.

    window: (n_samples, n_channels) array; the pipeline uses 6 channels.
    Returns: 1-D float64 vector of 7 values per channel -> (42,) for 6 channels.
    """
    collected = []
    n_channels = window.shape[1]
    for idx in range(n_channels):
        x = window[:, idx]
        magnitude = np.abs(x)
        # True wherever two neighbouring samples have a different sign value.
        sign_flips = np.diff(np.sign(x)) != 0

        variance = np.var(x)
        mean_abs = np.mean(magnitude)
        rms = np.sqrt(np.mean(x ** 2))
        peak = float(np.max(magnitude))
        kurt = float(scipy_stats.kurtosis(x))
        skewness = float(scipy_stats.skew(x))
        crossings = np.sum(sign_flips)

        collected += [variance, mean_abs, rms, peak, kurt, skewness, crossings]
    return np.array(collected, dtype=np.float64)


def extract_cross_channel_features(window):
    """
    Compute variance differences between three fixed channel pairs.

    window: (n_samples, 6)
    Channels: 0=AFF6(R), 1=AFp2(R), 2=AFp1(L), 3=AFF5(L), 4=FCz, 5=CPz
    Returns: (3,) float64 vector, each entry being
             var(first channel) - var(second channel) for the pairs
             AFF5(L)/AFF6(R), AFp1(L)/AFp2(R) and FCz/CPz.
    """
    # (minuend column, subtrahend column) for each asymmetry feature.
    channel_pairs = (
        (3, 0),  # AFF5(L) - AFF6(R)
        (2, 1),  # AFp1(L) - AFp2(R)
        (4, 5),  # FCz - CPz
    )
    differences = [
        np.var(window[:, first]) - np.var(window[:, second])
        for first, second in channel_pairs
    ]
    return np.array(differences, dtype=np.float64)


def extract_all_features(window):
    """Build the full feature vector for one window.

    Concatenates, in order, the outputs of ``extract_psd_features``,
    ``extract_stat_features`` and ``extract_cross_channel_features``.
    Returns (69,) vector for a 6-channel window (24 + 42 + 3).
    """
    extractors = (
        extract_psd_features,
        extract_stat_features,
        extract_cross_channel_features,
    )
    parts = [extract(window) for extract in extractors]
    return np.concatenate(parts)


def build_feature_matrix():
    """Load preprocessed windows, extract features for each, and save them.

    Reads ``preprocessed_data.npz`` under ``PROJECT_ROOT`` (arrays ``X``,
    ``y`` and ``subjects``), runs ``extract_all_features`` on every window,
    replaces NaN/Inf feature values with 0, prints summary statistics and
    writes ``features.npz`` (arrays ``X``, ``y``, ``subjects``) under
    ``PROJECT_ROOT``.

    Returns:
        Tuple ``(X_features, y, subjects)`` where ``X_features`` is a float64
        array of shape (n_windows, n_features) and ``y`` / ``subjects`` are
        passed through unchanged from the input archive.

    Raises:
        IndexError: if the archive contains zero windows, because the first
            window is used to determine the feature count.
    """
    data_path = PROJECT_ROOT / "preprocessed_data.npz"
    print(f"Loading preprocessed data from {data_path}...")
    # NOTE(security): allow_pickle=True lets NumPy unpickle object arrays,
    # which can execute arbitrary code. Only load archives from trusted sources.
    # The context manager closes the underlying file handle once the arrays
    # have been read into memory.
    with np.load(str(data_path), allow_pickle=True) as data:
        X_windows = data["X"]  # expected (n_windows, 500, 6)
        y = data["y"]           # expected (n_windows,) string labels
        subjects = data["subjects"]  # expected (n_windows,) subject IDs

    n_windows = X_windows.shape[0]
    print(f"Extracting features from {n_windows} windows...")

    # Pre-allocate: probe the first window to learn the feature count.
    sample_feat = extract_all_features(X_windows[0])
    n_features = len(sample_feat)
    print(f"  Features per window: {n_features}")

    X_features = np.zeros((n_windows, n_features), dtype=np.float64)

    for i in range(n_windows):
        if (i + 1) % 2000 == 0:
            print(f"  [{i+1}/{n_windows}]...")
        X_features[i] = extract_all_features(X_windows[i])

    # Check for NaN/Inf (e.g. from degenerate windows) and zero them out so
    # the saved matrix is entirely finite.
    nan_count = np.sum(np.isnan(X_features))
    inf_count = np.sum(np.isinf(X_features))
    if nan_count > 0 or inf_count > 0:
        print(f"  WARNING: {nan_count} NaN, {inf_count} Inf values found. Replacing with 0.")
        X_features = np.nan_to_num(X_features, nan=0.0, posinf=0.0, neginf=0.0)

    # Feature stats: compute each per-feature reduction once and reuse it.
    feature_means = X_features.mean(axis=0)
    feature_stds = X_features.std(axis=0)
    print(f"\nFeature matrix shape: {X_features.shape}")
    print(f"  Mean per feature: min={feature_means.min():.4f}, max={feature_means.max():.4f}")
    print(f"  Std per feature: min={feature_stds.min():.4f}, max={feature_stds.max():.4f}")

    # Save
    out_path = PROJECT_ROOT / "features.npz"
    np.savez_compressed(str(out_path), X=X_features, y=y, subjects=subjects)
    print(f"  Saved to {out_path} ({out_path.stat().st_size / 1e6:.1f} MB)")

    return X_features, y, subjects


# Script entry point: when run directly, build the feature matrix and save it.
if __name__ == "__main__":
    X_features, y, subjects = build_feature_matrix()