"""Phase 3: Feature extraction from preprocessed EEG windows.""" import numpy as np from pathlib import Path from scipy.signal import welch from scipy import stats as scipy_stats PROJECT_ROOT = Path(__file__).resolve().parent.parent FS = 500.0 def extract_psd_features(window, fs=500.0): """ Extract PSD band-power features per channel. window: (n_samples, 6) Returns: (24,) feature vector """ features = [] for ch in range(window.shape[1]): freqs, psd = welch(window[:, ch], fs=fs, nperseg=min(256, window.shape[0])) theta = np.mean(psd[(freqs >= 4) & (freqs < 8)]) alpha = np.mean(psd[(freqs >= 8) & (freqs < 13)]) beta = np.mean(psd[(freqs >= 13) & (freqs < 30)]) alpha_beta = alpha / (beta + 1e-10) features.extend([theta, alpha, beta, alpha_beta]) return np.array(features, dtype=np.float64) def extract_stat_features(window): """ Extract statistical features per channel. window: (n_samples, 6) Returns: (42,) feature vector """ features = [] for ch in range(window.shape[1]): signal = window[:, ch] features.extend([ np.var(signal), np.mean(np.abs(signal)), np.sqrt(np.mean(signal ** 2)), float(np.max(np.abs(signal))), float(scipy_stats.kurtosis(signal)), float(scipy_stats.skew(signal)), np.sum(np.diff(np.sign(signal)) != 0), ]) return np.array(features, dtype=np.float64) def extract_cross_channel_features(window): """ Extract inter-channel asymmetry features. window: (n_samples, 6) Channels: 0=AFF6(R), 1=AFp2(R), 2=AFp1(L), 3=AFF5(L), 4=FCz, 5=CPz Returns: (3,) feature vector """ features = [ np.var(window[:, 3]) - np.var(window[:, 0]), # AFF5(L) - AFF6(R) np.var(window[:, 2]) - np.var(window[:, 1]), # AFp1(L) - AFp2(R) np.var(window[:, 4]) - np.var(window[:, 5]), # FCz - CPz ] return np.array(features, dtype=np.float64) def extract_all_features(window): """Extract all features from a single window. Returns (69,) vector.""" return np.concatenate([ extract_psd_features(window), extract_stat_features(window), extract_cross_channel_features(window), ]) def build_feature_matrix(): """Load preprocessed data, extract features for all windows, save.""" data_path = PROJECT_ROOT / "preprocessed_data.npz" print(f"Loading preprocessed data from {data_path}...") data = np.load(str(data_path), allow_pickle=True) X_windows = data["X"] # (n_windows, 500, 6) y = data["y"] # (n_windows,) string labels subjects = data["subjects"] # (n_windows,) subject IDs n_windows = X_windows.shape[0] print(f"Extracting features from {n_windows} windows...") # Pre-allocate sample_feat = extract_all_features(X_windows[0]) n_features = len(sample_feat) print(f" Features per window: {n_features}") X_features = np.zeros((n_windows, n_features), dtype=np.float64) for i in range(n_windows): if (i + 1) % 2000 == 0: print(f" [{i+1}/{n_windows}]...") X_features[i] = extract_all_features(X_windows[i]) # Check for NaN/Inf nan_count = np.sum(np.isnan(X_features)) inf_count = np.sum(np.isinf(X_features)) if nan_count > 0 or inf_count > 0: print(f" WARNING: {nan_count} NaN, {inf_count} Inf values found. Replacing with 0.") X_features = np.nan_to_num(X_features, nan=0.0, posinf=0.0, neginf=0.0) # Feature stats print(f"\nFeature matrix shape: {X_features.shape}") print(f" Mean per feature: min={X_features.mean(axis=0).min():.4f}, max={X_features.mean(axis=0).max():.4f}") print(f" Std per feature: min={X_features.std(axis=0).min():.4f}, max={X_features.std(axis=0).max():.4f}") # Save out_path = PROJECT_ROOT / "features.npz" np.savez_compressed(str(out_path), X=X_features, y=y, subjects=subjects) print(f" Saved to {out_path} ({out_path.stat().st_size / 1e6:.1f} MB)") return X_features, y, subjects if __name__ == "__main__": X_features, y, subjects = build_feature_matrix()