| """ |
| Multimodal scene dataset for Experiment 1: Activity Recognition. |
| Loads aligned 100Hz multi-modal data, supports modality selection, |
| subject-independent splits, and variable-length sequence handling. |
| """ |
|
|
| import os |
| import json |
| import numpy as np |
| import pandas as pd |
| import torch |
| from torch.utils.data import Dataset, DataLoader |
| from torch.nn.utils.rnn import pad_sequence |
|
|
| DATASET_DIR = "${PULSE_ROOT}/dataset" |
|
|
| MODALITY_FILES = { |
| 'mocap': None, |
| 'emg': 'aligned_emg_100hz.csv', |
| 'eyetrack': 'aligned_eyetrack_100hz.csv', |
| 'imu': 'aligned_imu_100hz.csv', |
| 'pressure': 'aligned_pressure_100hz.csv', |
| 'video': 'video_features_100hz.npy', |
| 'videomae': 'video_features_videomae_100hz.npy', |
| } |
|
|
|
|
| def get_modality_filepath(scenario_dir, modality, vol=None, scenario=None): |
| """Return the file path for a given modality. |
| |
| Mocap uses a special naming pattern: aligned_{vol}{scene}_s_Q.tsv |
| All other modalities use MODALITY_FILES directly. |
| """ |
| if modality == 'mocap': |
| if vol is None or scenario is None: |
| raise ValueError("vol and scenario required for mocap modality") |
| return os.path.join(scenario_dir, f"aligned_{vol}{scenario}_s_Q.tsv") |
| return os.path.join(scenario_dir, MODALITY_FILES[modality]) |
|
|
| SKIP_COLS = {'Frame', 'Time', 'time', 'UTC'} |
| SKIP_COL_SUFFIXES = (' Type',) |
|
|
| |
| |
| |
| EYETRACK_SKIP_PATTERNS = ('Index Of Cognitive Activity', 'Marker Coordinates', 'Markers_') |
| EYETRACK_CORE_COLS = [ |
| 'Dikablis Glasses 3_Eye Data_Original_Pupil X', |
| 'Dikablis Glasses 3_Eye Data_Original_Pupil Y', |
| 'Dikablis Glasses 3_Eye Data_Original_Left Eye_Pupil X', |
| 'Dikablis Glasses 3_Eye Data_Original_Left Eye_Pupil Y', |
| 'Dikablis Glasses 3_Eye Data_Original_Left Eye_Pupil Area', |
| 'Dikablis Glasses 3_Eye Data_Original_Left Eye_Pupil Height', |
| 'Dikablis Glasses 3_Eye Data_Original_Left Eye_Pupil Width', |
| 'Dikablis Glasses 3_Eye Data_Original_Left Eye_Fixations_Fixations', |
| 'Dikablis Glasses 3_Eye Data_Original_Left Eye_Fixations_Fixations Duration', |
| 'Dikablis Glasses 3_Eye Data_Original_Left Eye_Saccades_Saccades', |
| 'Dikablis Glasses 3_Eye Data_Original_Left Eye_Saccades_Saccades Duration', |
| 'Dikablis Glasses 3_Eye Data_Original_Left Eye_Saccades_Saccades Angle', |
| 'Dikablis Glasses 3_Eye Data_Original_Right Eye_Pupil X', |
| 'Dikablis Glasses 3_Eye Data_Original_Right Eye_Pupil Y', |
| 'Dikablis Glasses 3_Eye Data_Original_Right Eye_Pupil Area', |
| 'Dikablis Glasses 3_Eye Data_Original_Right Eye_Pupil Height', |
| 'Dikablis Glasses 3_Eye Data_Original_Right Eye_Pupil Width', |
| 'Dikablis Glasses 3_Eye Data_Original_Right Eye_Fixations_Fixations', |
| 'Dikablis Glasses 3_Eye Data_Original_Right Eye_Fixations_Fixations Duration', |
| 'Dikablis Glasses 3_Eye Data_Original_Right Eye_Saccades_Saccades', |
| 'Dikablis Glasses 3_Eye Data_Original_Right Eye_Saccades_Saccades Duration', |
| 'Dikablis Glasses 3_Eye Data_Original_Right Eye_Saccades_Saccades Angle', |
| 'Dikablis Glasses 3_Field Data_Scene Cam_Original_Gaze_Gaze X', |
| 'Dikablis Glasses 3_Field Data_Scene Cam_Original_Gaze_Gaze Y', |
| ] |
| EYETRACK_EXCLUDED_RECORDINGS = {('v1', 's1'), ('v14', 's8')} |
|
|
| SCENE_LABELS = {f's{i}': i - 1 for i in range(1, 9)} |
| NUM_CLASSES = 8 |
|
|
| TRAIN_VOLS = ['v1', 'v2', 'v11', 'v12', 'v13', 'v15', 'v16', 'v17', 'v19', 'v20', 'v21', 'v22', 'v23', 'v24'] |
| VAL_VOLS = [] |
| TEST_VOLS = ['v25', 'v26', 'v27', 'v3'] |
|
|
|
|
| def _preprocess_mocap_skeleton(arr, feat_cols): |
| """Convert absolute skeleton coords to hip-relative positions + velocity. |
| |
| Input: (T, F) with absolute XYZ + quaternions |
| Output: (T, F + N_pos) where N_pos = number of XYZ position features |
| [hip-relative features, XYZ velocity] |
| """ |
| col_to_idx = {c: i for i, c in enumerate(feat_cols)} |
|
|
| |
| hip_x_idx = col_to_idx.get('Hips_X') |
| hip_y_idx = col_to_idx.get('Hips_Y') |
| hip_z_idx = col_to_idx.get('Hips_Z') |
| if hip_x_idx is None: |
| return arr |
|
|
| |
| x_indices = [i for i, c in enumerate(feat_cols) if c.endswith('_X')] |
| y_indices = [i for i, c in enumerate(feat_cols) if c.endswith('_Y')] |
| z_indices = [i for i, c in enumerate(feat_cols) if c.endswith('_Z')] |
| all_pos_indices = sorted(x_indices + y_indices + z_indices) |
|
|
| |
| arr_rel = arr.copy() |
| hip_xyz = arr[:, [hip_x_idx, hip_y_idx, hip_z_idx]] |
| for idx in x_indices: |
| arr_rel[:, idx] -= hip_xyz[:, 0] |
| for idx in y_indices: |
| arr_rel[:, idx] -= hip_xyz[:, 1] |
| for idx in z_indices: |
| arr_rel[:, idx] -= hip_xyz[:, 2] |
|
|
| |
| pos_data = arr_rel[:, all_pos_indices] |
| velocity = np.zeros_like(pos_data) |
| velocity[1:] = pos_data[1:] - pos_data[:-1] |
|
|
| |
| return np.concatenate([arr_rel, velocity], axis=1) |
|
|
|
|
| def load_modality_array(filepath, modality): |
| """Load a modality CSV/TSV/NPY and return numpy_array. |
| Returns None if data is corrupted (extreme values or mostly zeros).""" |
| |
| if filepath.endswith('.npy'): |
| if not os.path.exists(filepath): |
| return None |
| arr = np.load(filepath).astype(np.float32) |
| arr = np.nan_to_num(arr, nan=0.0, posinf=0.0, neginf=0.0) |
| return arr |
| |
| sep = '\t' if filepath.endswith('.tsv') else ',' |
| df = pd.read_csv(filepath, sep=sep, low_memory=False) |
| df.columns = [str(c).strip() for c in df.columns] |
| if modality == 'eyetrack': |
| parts = os.path.normpath(filepath).split(os.sep) |
| if len(parts) >= 3 and (parts[-3], parts[-2]) in EYETRACK_EXCLUDED_RECORDINGS: |
| return None |
| feat_cols = [c for c in df.columns |
| if c not in SKIP_COLS |
| and not any(c.endswith(s) for s in SKIP_COL_SUFFIXES)] |
| if modality == 'eyetrack': |
| feat_cols = [c for c in EYETRACK_CORE_COLS if c in feat_cols] |
| if len(feat_cols) != len(EYETRACK_CORE_COLS): |
| return None |
| sub = df[feat_cols] |
| |
| obj_cols = sub.select_dtypes(include=['object']).columns |
| if len(obj_cols) > 0: |
| sub = sub.copy() |
| sub[obj_cols] = sub[obj_cols].apply(pd.to_numeric, errors='coerce') |
| arr = sub.values.astype(np.float64) |
| arr = np.nan_to_num(arr, nan=0.0, posinf=0.0, neginf=0.0) |
| |
| max_abs = np.max(np.abs(arr)) |
| if max_abs > 1e6: |
| return None |
| |
| |
| |
| |
| if modality not in ("pressure", "emg"): |
| zero_ratio = np.mean(arr == 0.0) |
| if zero_ratio > 0.9: |
| return None |
| |
| if modality == 'mocap' and filepath.endswith('.tsv'): |
| arr = _preprocess_mocap_skeleton(arr, feat_cols) |
| arr = arr.astype(np.float32) |
| return arr |
|
|
|
|
| class MultimodalSceneDataset(Dataset): |
| """Dataset for scene-level classification from multimodal time series.""" |
|
|
| def __init__(self, volunteers, modalities, downsample=5, stats=None): |
| self.modalities = modalities |
| self.downsample = downsample |
| self.data = [] |
| self.labels = [] |
| self.sample_info = [] |
| self._modality_dims = {} |
|
|
| for vol in volunteers: |
| vol_dir = os.path.join(DATASET_DIR, vol) |
| if not os.path.isdir(vol_dir): |
| continue |
| for scenario in sorted(os.listdir(vol_dir)): |
| scenario_dir = os.path.join(vol_dir, scenario) |
| if not os.path.isdir(scenario_dir) or scenario not in SCENE_LABELS: |
| continue |
| meta_path = os.path.join(scenario_dir, 'alignment_metadata.json') |
| if not os.path.exists(meta_path): |
| continue |
| with open(meta_path) as f: |
| meta = json.load(f) |
| available = set(meta['modalities']) |
| if not set(modalities).issubset(available): |
| continue |
|
|
| parts = [] |
| skip = False |
| for mod in modalities: |
| if mod == 'mocap': |
| |
| tsv_name = f"aligned_{vol}{scenario}_s_Q.tsv" |
| filepath = os.path.join(scenario_dir, tsv_name) |
| else: |
| filepath = os.path.join(scenario_dir, MODALITY_FILES[mod]) |
| if not os.path.exists(filepath): |
| skip = True |
| break |
| arr = load_modality_array(filepath, mod) |
| if arr is None: |
| print(f" SKIP {vol}/{scenario} {mod}: corrupted data", flush=True) |
| skip = True |
| break |
| |
| if mod in self._modality_dims and arr.shape[1] != self._modality_dims[mod]: |
| print(f" WARNING: {vol}/{scenario} {mod} dim {arr.shape[1]} " |
| f"!= expected {self._modality_dims[mod]}, padding/truncating", |
| flush=True) |
| expected = self._modality_dims[mod] |
| if arr.shape[1] < expected: |
| pad = np.zeros((arr.shape[0], expected - arr.shape[1]), dtype=np.float32) |
| arr = np.concatenate([arr, pad], axis=1) |
| else: |
| arr = arr[:, :expected] |
| if mod not in self._modality_dims: |
| self._modality_dims[mod] = arr.shape[1] |
| parts.append(arr) |
|
|
| if skip: |
| continue |
|
|
| min_len = min(p.shape[0] for p in parts) |
| parts = [p[:min_len] for p in parts] |
| combined = np.concatenate(parts, axis=1) |
| combined = combined[::downsample] |
|
|
| self.data.append(combined) |
| self.labels.append(SCENE_LABELS[scenario]) |
| self.sample_info.append(f"{vol}/{scenario}") |
|
|
| print(f" Loaded {len(self.data)} samples, modality dims: {self._modality_dims}, " |
| f"total feat dim: {sum(self._modality_dims.values())}", flush=True) |
|
|
| |
| if stats is not None: |
| self.mean, self.std = stats |
| else: |
| self._compute_stats() |
| for i in range(len(self.data)): |
| self.data[i] = ((self.data[i].astype(np.float64) - self.mean) / self.std).astype(np.float32) |
| self.data[i] = np.nan_to_num(self.data[i], nan=0.0, posinf=0.0, neginf=0.0) |
|
|
| def _compute_stats(self): |
| |
| all_frames = np.concatenate(self.data, axis=0).astype(np.float64) |
| self.mean = np.mean(all_frames, axis=0, keepdims=True) |
| self.std = np.std(all_frames, axis=0, keepdims=True) |
| self.std[self.std < 1e-8] = 1.0 |
|
|
| def get_stats(self): |
| return (self.mean, self.std) |
|
|
| @property |
| def feat_dim(self): |
| return sum(self._modality_dims.values()) |
|
|
| @property |
| def modality_dims(self): |
| return dict(self._modality_dims) |
|
|
| def get_class_weights(self): |
| counts = np.bincount(self.labels, minlength=NUM_CLASSES).astype(np.float32) |
| counts[counts == 0] = 1.0 |
| weights = 1.0 / counts |
| weights = weights / weights.sum() * NUM_CLASSES |
| return torch.FloatTensor(weights) |
|
|
| def __len__(self): |
| return len(self.data) |
|
|
| def __getitem__(self, idx): |
| return torch.from_numpy(self.data[idx]), self.labels[idx] |
|
|
|
|
| def collate_fn(batch): |
| """Pad variable-length sequences and create masks.""" |
| sequences, labels = zip(*batch) |
| lengths = torch.LongTensor([s.shape[0] for s in sequences]) |
| padded = pad_sequence(sequences, batch_first=True, padding_value=0.0) |
| max_len = padded.shape[1] |
| mask = torch.arange(max_len).unsqueeze(0) < lengths.unsqueeze(1) |
| labels = torch.LongTensor(labels) |
| return padded, labels, mask, lengths |
|
|
|
|
| def get_dataloaders(modalities, batch_size=16, downsample=5, num_workers=0): |
| """Create train/val/test DataLoaders with proper normalization.""" |
| print("Loading training data...", flush=True) |
| train_ds = MultimodalSceneDataset(TRAIN_VOLS, modalities, downsample) |
| stats = train_ds.get_stats() |
|
|
| print("Loading validation data...", flush=True) |
| val_ds = MultimodalSceneDataset(VAL_VOLS, modalities, downsample, stats=stats) |
|
|
| print("Loading test data...", flush=True) |
| test_ds = MultimodalSceneDataset(TEST_VOLS, modalities, downsample, stats=stats) |
|
|
| train_loader = DataLoader(train_ds, batch_size=batch_size, shuffle=True, |
| collate_fn=collate_fn, num_workers=num_workers, |
| drop_last=False) |
| val_loader = DataLoader(val_ds, batch_size=batch_size, shuffle=False, |
| collate_fn=collate_fn, num_workers=num_workers) |
| test_loader = DataLoader(test_ds, batch_size=batch_size, shuffle=False, |
| collate_fn=collate_fn, num_workers=num_workers) |
|
|
| info = { |
| 'feat_dim': train_ds.feat_dim, |
| 'modality_dims': train_ds.modality_dims, |
| 'num_classes': NUM_CLASSES, |
| 'train_size': len(train_ds), |
| 'val_size': len(val_ds), |
| 'test_size': len(test_ds), |
| 'class_weights': train_ds.get_class_weights(), |
| } |
| return train_loader, val_loader, test_loader, info |
|
|