""" Multimodal scene dataset for Experiment 1: Activity Recognition. Loads aligned 100Hz multi-modal data, supports modality selection, subject-independent splits, and variable-length sequence handling. """ import os import json import numpy as np import pandas as pd import torch from torch.utils.data import Dataset, DataLoader from torch.nn.utils.rnn import pad_sequence DATASET_DIR = "${PULSE_ROOT}/dataset" MODALITY_FILES = { 'mocap': None, # Special: uses aligned_{vol}{scene}_s_Q.tsv (skeleton data) 'emg': 'aligned_emg_100hz.csv', 'eyetrack': 'aligned_eyetrack_100hz.csv', 'imu': 'aligned_imu_100hz.csv', 'pressure': 'aligned_pressure_100hz.csv', 'video': 'video_features_100hz.npy', # ViT-B/16 (ImageNet) 'videomae': 'video_features_videomae_100hz.npy', # VideoMAE (Kinetics-400) } def get_modality_filepath(scenario_dir, modality, vol=None, scenario=None): """Return the file path for a given modality. Mocap uses a special naming pattern: aligned_{vol}{scene}_s_Q.tsv All other modalities use MODALITY_FILES directly. """ if modality == 'mocap': if vol is None or scenario is None: raise ValueError("vol and scenario required for mocap modality") return os.path.join(scenario_dir, f"aligned_{vol}{scenario}_s_Q.tsv") return os.path.join(scenario_dir, MODALITY_FILES[modality]) SKIP_COLS = {'Frame', 'Time', 'time', 'UTC'} SKIP_COL_SUFFIXES = (' Type',) # Eyetrack exports sometimes include volunteer-specific marker/ICA columns. # Benchmark inputs use the fixed 24 core gaze columns below; recordings missing # any core column are skipped instead of truncating the full dataset. EYETRACK_SKIP_PATTERNS = ('Index Of Cognitive Activity', 'Marker Coordinates', 'Markers_') EYETRACK_CORE_COLS = [ 'Dikablis Glasses 3_Eye Data_Original_Pupil X', 'Dikablis Glasses 3_Eye Data_Original_Pupil Y', 'Dikablis Glasses 3_Eye Data_Original_Left Eye_Pupil X', 'Dikablis Glasses 3_Eye Data_Original_Left Eye_Pupil Y', 'Dikablis Glasses 3_Eye Data_Original_Left Eye_Pupil Area', 'Dikablis Glasses 3_Eye Data_Original_Left Eye_Pupil Height', 'Dikablis Glasses 3_Eye Data_Original_Left Eye_Pupil Width', 'Dikablis Glasses 3_Eye Data_Original_Left Eye_Fixations_Fixations', 'Dikablis Glasses 3_Eye Data_Original_Left Eye_Fixations_Fixations Duration', 'Dikablis Glasses 3_Eye Data_Original_Left Eye_Saccades_Saccades', 'Dikablis Glasses 3_Eye Data_Original_Left Eye_Saccades_Saccades Duration', 'Dikablis Glasses 3_Eye Data_Original_Left Eye_Saccades_Saccades Angle', 'Dikablis Glasses 3_Eye Data_Original_Right Eye_Pupil X', 'Dikablis Glasses 3_Eye Data_Original_Right Eye_Pupil Y', 'Dikablis Glasses 3_Eye Data_Original_Right Eye_Pupil Area', 'Dikablis Glasses 3_Eye Data_Original_Right Eye_Pupil Height', 'Dikablis Glasses 3_Eye Data_Original_Right Eye_Pupil Width', 'Dikablis Glasses 3_Eye Data_Original_Right Eye_Fixations_Fixations', 'Dikablis Glasses 3_Eye Data_Original_Right Eye_Fixations_Fixations Duration', 'Dikablis Glasses 3_Eye Data_Original_Right Eye_Saccades_Saccades', 'Dikablis Glasses 3_Eye Data_Original_Right Eye_Saccades_Saccades Duration', 'Dikablis Glasses 3_Eye Data_Original_Right Eye_Saccades_Saccades Angle', 'Dikablis Glasses 3_Field Data_Scene Cam_Original_Gaze_Gaze X', 'Dikablis Glasses 3_Field Data_Scene Cam_Original_Gaze_Gaze Y', ] EYETRACK_EXCLUDED_RECORDINGS = {('v1', 's1'), ('v14', 's8')} SCENE_LABELS = {f's{i}': i - 1 for i in range(1, 9)} NUM_CLASSES = 8 TRAIN_VOLS = ['v1', 'v2', 'v11', 'v12', 'v13', 'v15', 'v16', 'v17', 'v19', 'v20', 'v21', 'v22', 'v23', 'v24'] VAL_VOLS = [] # No separate val set; use train for 


def _preprocess_mocap_skeleton(arr, feat_cols):
    """Convert absolute skeleton coords to hip-relative positions + velocity.

    Input:  (T, F) with absolute XYZ + quaternions
    Output: (T, F + N_pos) where N_pos = number of XYZ position features
            [hip-relative features, XYZ velocity]
    """
    col_to_idx = {c: i for i, c in enumerate(feat_cols)}

    # Find hip position for subtraction
    hip_x_idx = col_to_idx.get('Hips_X')
    hip_y_idx = col_to_idx.get('Hips_Y')
    hip_z_idx = col_to_idx.get('Hips_Z')
    if hip_x_idx is None:
        return arr  # No hip joint found, skip preprocessing

    # Identify all position columns (_X, _Y, _Z)
    x_indices = [i for i, c in enumerate(feat_cols) if c.endswith('_X')]
    y_indices = [i for i, c in enumerate(feat_cols) if c.endswith('_Y')]
    z_indices = [i for i, c in enumerate(feat_cols) if c.endswith('_Z')]
    all_pos_indices = sorted(x_indices + y_indices + z_indices)

    # 1. Make XYZ positions hip-relative
    arr_rel = arr.copy()
    hip_xyz = arr[:, [hip_x_idx, hip_y_idx, hip_z_idx]]  # (T, 3)
    for idx in x_indices:
        arr_rel[:, idx] -= hip_xyz[:, 0]
    for idx in y_indices:
        arr_rel[:, idx] -= hip_xyz[:, 1]
    for idx in z_indices:
        arr_rel[:, idx] -= hip_xyz[:, 2]

    # 2. Compute velocity of position features only
    pos_data = arr_rel[:, all_pos_indices]  # (T, N_pos)
    velocity = np.zeros_like(pos_data)
    velocity[1:] = pos_data[1:] - pos_data[:-1]

    # 3. Concatenate: [hip-relative features (pos+quat), position velocity]
    return np.concatenate([arr_rel, velocity], axis=1)


def load_modality_array(filepath, modality):
    """Load a modality CSV/TSV/NPY and return a numpy array.

    Returns None if data is corrupted (extreme values or mostly zeros).
    """
    # Video features stored as .npy
    if filepath.endswith('.npy'):
        if not os.path.exists(filepath):
            return None
        arr = np.load(filepath).astype(np.float32)
        arr = np.nan_to_num(arr, nan=0.0, posinf=0.0, neginf=0.0)
        return arr

    # Mocap uses TSV with tab separator
    sep = '\t' if filepath.endswith('.tsv') else ','
    df = pd.read_csv(filepath, sep=sep, low_memory=False)
    df.columns = [str(c).strip() for c in df.columns]

    if modality == 'eyetrack':
        parts = os.path.normpath(filepath).split(os.sep)
        if len(parts) >= 3 and (parts[-3], parts[-2]) in EYETRACK_EXCLUDED_RECORDINGS:
            return None

    feat_cols = [c for c in df.columns
                 if c not in SKIP_COLS
                 and not any(c.endswith(s) for s in SKIP_COL_SUFFIXES)]
    if modality == 'eyetrack':
        feat_cols = [c for c in EYETRACK_CORE_COLS if c in feat_cols]
        if len(feat_cols) != len(EYETRACK_CORE_COLS):
            return None

    sub = df[feat_cols]
    # Coerce non-numeric columns
    obj_cols = sub.select_dtypes(include=['object']).columns
    if len(obj_cols) > 0:
        sub = sub.copy()
        sub[obj_cols] = sub[obj_cols].apply(pd.to_numeric, errors='coerce')
    arr = sub.values.astype(np.float64)
    arr = np.nan_to_num(arr, nan=0.0, posinf=0.0, neginf=0.0)

    # Quality check: reject samples with extreme values (corrupted data)
    max_abs = np.max(np.abs(arr))
    if max_abs > 1e6:
        return None  # Corrupted

    # Quality check: reject samples that are mostly zeros (sensor dropout).
    # Pressure and EMG are legitimately zero for long periods (rest, no grip),
    # so the strict near-total-loss check applies only to the modalities
    # where a flat-zero stream is a clear dropout signal.
    if modality not in ("pressure", "emg"):
        zero_ratio = np.mean(arr == 0.0)
        if zero_ratio > 0.9:
            return None  # Near-total data loss

    # Mocap skeleton: convert to hip-relative + velocity
    if modality == 'mocap' and filepath.endswith('.tsv'):
        arr = _preprocess_mocap_skeleton(arr, feat_cols)

    arr = arr.astype(np.float32)
    return arr
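
# Shape sketch for _preprocess_mocap_skeleton on a toy 2-joint skeleton
# (column names are illustrative; real exports carry many more joints plus
# quaternion columns). Six position columns yield six velocity columns:
#   cols = ['Hips_X', 'Hips_Y', 'Hips_Z', 'Head_X', 'Head_Y', 'Head_Z']
#   arr = np.arange(18, dtype=np.float64).reshape(3, 6)   # (T=3, F=6)
#   _preprocess_mocap_skeleton(arr, cols).shape           # -> (3, 12)
# Hip columns become all-zero, Head becomes a constant hip offset, and the
# velocities of those constants are zero.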


class MultimodalSceneDataset(Dataset):
    """Dataset for scene-level classification from multimodal time series."""

    def __init__(self, volunteers, modalities, downsample=5, stats=None):
        self.modalities = modalities
        self.downsample = downsample
        self.data = []
        self.labels = []
        self.sample_info = []
        self._modality_dims = {}

        for vol in volunteers:
            vol_dir = os.path.join(DATASET_DIR, vol)
            if not os.path.isdir(vol_dir):
                continue
            for scenario in sorted(os.listdir(vol_dir)):
                scenario_dir = os.path.join(vol_dir, scenario)
                if not os.path.isdir(scenario_dir) or scenario not in SCENE_LABELS:
                    continue
                meta_path = os.path.join(scenario_dir, 'alignment_metadata.json')
                if not os.path.exists(meta_path):
                    continue
                with open(meta_path) as f:
                    meta = json.load(f)
                available = set(meta['modalities'])
                if not set(modalities).issubset(available):
                    continue

                parts = []
                skip = False
                for mod in modalities:
                    # Mocap resolves to aligned_{vol}{scene}_s_Q.tsv
                    filepath = get_modality_filepath(scenario_dir, mod,
                                                     vol=vol, scenario=scenario)
                    if not os.path.exists(filepath):
                        skip = True
                        break
                    arr = load_modality_array(filepath, mod)
                    if arr is None:
                        print(f" SKIP {vol}/{scenario} {mod}: corrupted data", flush=True)
                        skip = True
                        break
                    # Validate dimension consistency
                    if mod in self._modality_dims and arr.shape[1] != self._modality_dims[mod]:
                        print(f" WARNING: {vol}/{scenario} {mod} dim {arr.shape[1]} "
                              f"!= expected {self._modality_dims[mod]}, padding/truncating",
                              flush=True)
                        expected = self._modality_dims[mod]
                        if arr.shape[1] < expected:
                            pad = np.zeros((arr.shape[0], expected - arr.shape[1]),
                                           dtype=np.float32)
                            arr = np.concatenate([arr, pad], axis=1)
                        else:
                            arr = arr[:, :expected]
                    if mod not in self._modality_dims:
                        self._modality_dims[mod] = arr.shape[1]
                    parts.append(arr)
                if skip:
                    continue

                min_len = min(p.shape[0] for p in parts)
                parts = [p[:min_len] for p in parts]
                combined = np.concatenate(parts, axis=1)
                combined = combined[::downsample]
                self.data.append(combined)
                self.labels.append(SCENE_LABELS[scenario])
                self.sample_info.append(f"{vol}/{scenario}")

        print(f" Loaded {len(self.data)} samples, modality dims: {self._modality_dims}, "
              f"total feat dim: {sum(self._modality_dims.values())}", flush=True)

        # Normalization (compute in float64 to avoid overflow)
        if stats is not None:
            self.mean, self.std = stats
        else:
            self._compute_stats()
        for i in range(len(self.data)):
            self.data[i] = ((self.data[i].astype(np.float64) - self.mean)
                            / self.std).astype(np.float32)
            self.data[i] = np.nan_to_num(self.data[i], nan=0.0, posinf=0.0, neginf=0.0)

    def _compute_stats(self):
        if not self.data:
            raise RuntimeError("No samples loaded; cannot compute normalization stats")
        # Use float64 for accumulation to prevent overflow
        all_frames = np.concatenate(self.data, axis=0).astype(np.float64)
        self.mean = np.mean(all_frames, axis=0, keepdims=True)
        self.std = np.std(all_frames, axis=0, keepdims=True)
        self.std[self.std < 1e-8] = 1.0

    def get_stats(self):
        return (self.mean, self.std)

    @property
    def feat_dim(self):
        return sum(self._modality_dims.values())

    @property
    def modality_dims(self):
        return dict(self._modality_dims)

    def get_class_weights(self):
        counts = np.bincount(self.labels, minlength=NUM_CLASSES).astype(np.float32)
        counts[counts == 0] = 1.0
        weights = 1.0 / counts
        weights = weights / weights.sum() * NUM_CLASSES
        return torch.FloatTensor(weights)

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        return torch.from_numpy(self.data[idx]), self.labels[idx]
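
# Worked example for get_class_weights with hypothetical counts
# [10, 5, 5, 10, 10, 10, 10, 10]: the inverse counts sum to 1.0, so after
# scaling by NUM_CLASSES the weights are [0.8, 1.6, 1.6, 0.8, ...]; the two
# rarer classes weigh twice the common ones and the weights sum to 8.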


def collate_fn(batch):
    """Pad variable-length sequences and create masks."""
    sequences, labels = zip(*batch)
    lengths = torch.LongTensor([s.shape[0] for s in sequences])
    padded = pad_sequence(sequences, batch_first=True, padding_value=0.0)
    max_len = padded.shape[1]
    mask = torch.arange(max_len).unsqueeze(0) < lengths.unsqueeze(1)
    labels = torch.LongTensor(labels)
    return padded, labels, mask, lengths


def get_dataloaders(modalities, batch_size=16, downsample=5, num_workers=0):
    """Create train/val/test DataLoaders normalized with training-set statistics."""
    print("Loading training data...", flush=True)
    train_ds = MultimodalSceneDataset(TRAIN_VOLS, modalities, downsample)
    stats = train_ds.get_stats()

    print("Loading validation data...", flush=True)
    val_ds = MultimodalSceneDataset(VAL_VOLS, modalities, downsample, stats=stats)

    print("Loading test data...", flush=True)
    test_ds = MultimodalSceneDataset(TEST_VOLS, modalities, downsample, stats=stats)

    train_loader = DataLoader(train_ds, batch_size=batch_size, shuffle=True,
                              collate_fn=collate_fn, num_workers=num_workers,
                              drop_last=False)
    val_loader = DataLoader(val_ds, batch_size=batch_size, shuffle=False,
                            collate_fn=collate_fn, num_workers=num_workers)
    test_loader = DataLoader(test_ds, batch_size=batch_size, shuffle=False,
                             collate_fn=collate_fn, num_workers=num_workers)

    info = {
        'feat_dim': train_ds.feat_dim,
        'modality_dims': train_ds.modality_dims,
        'num_classes': NUM_CLASSES,
        'train_size': len(train_ds),
        'val_size': len(val_ds),
        'test_size': len(test_ds),
        'class_weights': train_ds.get_class_weights(),
    }
    return train_loader, val_loader, test_loader, info
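

if __name__ == "__main__":
    # Minimal smoke sketch, assuming ${PULSE_ROOT} is set and the dataset is
    # laid out as described above; the modality choice here is illustrative.
    train_loader, val_loader, test_loader, info = get_dataloaders(
        modalities=['imu', 'emg'], batch_size=4)
    print(f"feat_dim={info['feat_dim']}, per-modality={info['modality_dims']}")
    for padded, labels, mask, lengths in train_loader:
        # padded: (B, T_max, feat_dim); mask: (B, T_max), True on real frames.
        print(padded.shape, labels.shape, mask.shape, lengths.tolist())
        break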