thoughtlink / src /features.py
Srilekha23's picture
Upload folder using huggingface_hub
832948a verified
"""Phase 3: Feature extraction from preprocessed EEG windows."""
import numpy as np
from pathlib import Path
from scipy.signal import welch
from scipy import stats as scipy_stats
PROJECT_ROOT = Path(__file__).resolve().parent.parent
FS = 500.0
def extract_psd_features(window, fs=500.0):
"""
Extract PSD band-power features per channel.
window: (n_samples, 6)
Returns: (24,) feature vector
"""
features = []
for ch in range(window.shape[1]):
freqs, psd = welch(window[:, ch], fs=fs, nperseg=min(256, window.shape[0]))
theta = np.mean(psd[(freqs >= 4) & (freqs < 8)])
alpha = np.mean(psd[(freqs >= 8) & (freqs < 13)])
beta = np.mean(psd[(freqs >= 13) & (freqs < 30)])
alpha_beta = alpha / (beta + 1e-10)
features.extend([theta, alpha, beta, alpha_beta])
return np.array(features, dtype=np.float64)
def extract_stat_features(window):
"""
Extract statistical features per channel.
window: (n_samples, 6)
Returns: (42,) feature vector
"""
features = []
for ch in range(window.shape[1]):
signal = window[:, ch]
features.extend([
np.var(signal),
np.mean(np.abs(signal)),
np.sqrt(np.mean(signal ** 2)),
float(np.max(np.abs(signal))),
float(scipy_stats.kurtosis(signal)),
float(scipy_stats.skew(signal)),
np.sum(np.diff(np.sign(signal)) != 0),
])
return np.array(features, dtype=np.float64)
def extract_cross_channel_features(window):
"""
Extract inter-channel asymmetry features.
window: (n_samples, 6)
Channels: 0=AFF6(R), 1=AFp2(R), 2=AFp1(L), 3=AFF5(L), 4=FCz, 5=CPz
Returns: (3,) feature vector
"""
features = [
np.var(window[:, 3]) - np.var(window[:, 0]), # AFF5(L) - AFF6(R)
np.var(window[:, 2]) - np.var(window[:, 1]), # AFp1(L) - AFp2(R)
np.var(window[:, 4]) - np.var(window[:, 5]), # FCz - CPz
]
return np.array(features, dtype=np.float64)
def extract_all_features(window):
"""Extract all features from a single window. Returns (69,) vector."""
return np.concatenate([
extract_psd_features(window),
extract_stat_features(window),
extract_cross_channel_features(window),
])
def build_feature_matrix():
"""Load preprocessed data, extract features for all windows, save."""
data_path = PROJECT_ROOT / "preprocessed_data.npz"
print(f"Loading preprocessed data from {data_path}...")
data = np.load(str(data_path), allow_pickle=True)
X_windows = data["X"] # (n_windows, 500, 6)
y = data["y"] # (n_windows,) string labels
subjects = data["subjects"] # (n_windows,) subject IDs
n_windows = X_windows.shape[0]
print(f"Extracting features from {n_windows} windows...")
# Pre-allocate
sample_feat = extract_all_features(X_windows[0])
n_features = len(sample_feat)
print(f" Features per window: {n_features}")
X_features = np.zeros((n_windows, n_features), dtype=np.float64)
for i in range(n_windows):
if (i + 1) % 2000 == 0:
print(f" [{i+1}/{n_windows}]...")
X_features[i] = extract_all_features(X_windows[i])
# Check for NaN/Inf
nan_count = np.sum(np.isnan(X_features))
inf_count = np.sum(np.isinf(X_features))
if nan_count > 0 or inf_count > 0:
print(f" WARNING: {nan_count} NaN, {inf_count} Inf values found. Replacing with 0.")
X_features = np.nan_to_num(X_features, nan=0.0, posinf=0.0, neginf=0.0)
# Feature stats
print(f"\nFeature matrix shape: {X_features.shape}")
print(f" Mean per feature: min={X_features.mean(axis=0).min():.4f}, max={X_features.mean(axis=0).max():.4f}")
print(f" Std per feature: min={X_features.std(axis=0).min():.4f}, max={X_features.std(axis=0).max():.4f}")
# Save
out_path = PROJECT_ROOT / "features.npz"
np.savez_compressed(str(out_path), X=X_features, y=y, subjects=subjects)
print(f" Saved to {out_path} ({out_path.stat().st_size / 1e6:.1f} MB)")
return X_features, y, subjects
if __name__ == "__main__":
X_features, y, subjects = build_feature_matrix()