| """ |
| Segment-to-Next-Segment Triplet Prediction dataset (T10). |
| |
| For every annotated action segment k in every recording: |
| anchor_t = start_time(segment_k) - T_fut (seconds) |
| observation = sensor frames in [anchor_t - T_obs, anchor_t] |
| target = triplet labels of segment_k: (verb_fine, verb_composite, |
| noun, hand) |
| |
| Segments whose observation window would spill before t=0 of the recording |
| are skipped (no left-padding), so we never mix noise with real sensor data. |
| |
| Strategy A is enforced in taxonomy.classify_segment(): segments whose noun is |
| not in the kept set (<50 occurrences) are dropped entirely. |
| |
| Per-modality tensors are returned as a dict so downstream models can either |
| concat them (single-flow baselines) or keep them separate (our cross-modal |
| fusion model). A float mask is returned alongside the sensor tensor so |
| variable-length obs windows can be padded within a batch. |
| """ |

from __future__ import annotations

import json
import os
import sys
from pathlib import Path
from typing import Dict, List, Optional, Sequence, Tuple

import numpy as np
import torch
from torch.utils.data import Dataset

# Make sibling modules importable whether this file is imported as part of
# the package or run directly as a script.
_THIS = Path(__file__).resolve()
sys.path.insert(0, str(_THIS.parent))
sys.path.insert(0, str(_THIS.parent.parent))

try:
    from data.dataset import (
        MODALITY_FILES, load_modality_array,
    )
    from experiments.taxonomy import (
        classify_segment, NOUN, NUM_VERB_FINE, NUM_VERB_COMPOSITE, NUM_NOUN,
        NUM_HAND,
    )
except ModuleNotFoundError:
    from dataset import (
        MODALITY_FILES, load_modality_array,
    )
    from taxonomy import (
        classify_segment, NOUN, NUM_VERB_FINE, NUM_VERB_COMPOSITE, NUM_NOUN,
        NUM_HAND,
    )

REPO = Path(os.environ.get(
    "DAILYACT_REPO", "${PULSE_ROOT}"
))
DEFAULT_DATASET_DIR = REPO / "aligned_gy"
DEFAULT_ANNOT_DIR = REPO / "annotations_v3"

# All modalities are pre-aligned at 100 Hz; downsampling by 5 gives an
# effective 20 Hz observation rate.
SAMPLING_RATE_HZ = 100
DEFAULT_DOWNSAMPLE = 5

VALID_MODALITIES = ("mocap", "emg", "eyetrack", "imu", "pressure")

# Volunteer-level train/test split (v3 annotations); held-out test
# volunteers never appear in training.
TEST_VOLS_V3 = ["v14", "v30", "v34", "v38", "v41"]
TRAIN_VOLS_V3 = [
    "v1", "v2", "v3", "v4", "v5", "v6", "v7", "v8", "v9", "v10",
    "v11", "v12", "v13", "v15", "v16", "v17", "v18", "v19", "v20",
    "v21", "v22", "v23", "v24", "v25", "v26", "v27", "v28", "v29",
    "v31", "v32", "v33", "v35", "v36", "v37", "v39", "v40",
]
assert set(TRAIN_VOLS_V3).isdisjoint(TEST_VOLS_V3), "Split must be disjoint"

def _parse_ts(ts: str) -> float:
    """Parse 'HH:MM:SS' or 'MM:SS' (or 'M:S') into seconds.

    Returns 0.0 for anything unparseable."""
    parts = ts.strip().split(":")
    try:
        if len(parts) == 2:
            return float(parts[0]) * 60 + float(parts[1])
        if len(parts) == 3:
            return float(parts[0]) * 3600 + float(parts[1]) * 60 + float(parts[2])
    except ValueError:
        return 0.0
    return 0.0


def parse_ts_range(ts_range: str) -> Tuple[float, float]:
    """Parse 'MM:SS-MM:SS' or 'HH:MM:SS-HH:MM:SS' into (start_sec, end_sec)."""
    if "-" not in ts_range:
        return 0.0, 0.0
    a, b = ts_range.split("-", 1)
    return _parse_ts(a), _parse_ts(b)
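
# Illustrative values (not executed):
#   _parse_ts("01:23")            -> 83.0
#   _parse_ts("1:02:03")          -> 3723.0
#   parse_ts_range("01:23-01:45") -> (83.0, 105.0)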

def _load_recording_sensors(
    scenario_dir: Path, vol: str, scenario: str,
    modalities: Sequence[str],
) -> Optional[Dict[str, np.ndarray]]:
    """Load each requested modality as a (T, F_mod) float32 array at 100 Hz.

    Returns None if any requested modality is missing or corrupted.
    """
    out: Dict[str, np.ndarray] = {}
    for mod in modalities:
        if mod == "mocap":
            fp = scenario_dir / f"aligned_{vol}{scenario}_s_Q.tsv"
        else:
            fp = scenario_dir / MODALITY_FILES[mod]
        if not fp.exists():
            return None
        arr = load_modality_array(str(fp), mod)
        if arr is None:
            return None
        out[mod] = arr.astype(np.float32)

    # Truncate every modality to the shortest stream so indices line up.
    T = min(a.shape[0] for a in out.values())
    for m in out:
        out[m] = out[m][:T]
    return out

def _load_annotations(annot_path: Path) -> List[dict]:
    with open(annot_path) as f:
        d = json.load(f)
    return d.get("segments", [])

class TripletSeqPredDataset(Dataset):
    """One sample per (annotated segment, recording) pair.

    Sample schema returned by __getitem__:
        x:    dict {mod_name: FloatTensor(T_frames, F_mod)}
        y:    dict {'verb_fine': int, 'verb_composite': int,
                    'noun': int, 'hand': int}
        meta: dict {'vol', 'scene', 'seg_idx'} plus 'anchor_sec'
              (anticipation mode) or 'start_sec'/'end_sec' (recognition mode)
        prev: dict {'verb_composite': int, 'noun': int} -- labels of the
              previous kept segment, or BOS sentinels for the first one
    """

    def __init__(
        self,
        volunteers: Sequence[str],
        modalities: Sequence[str] = ("imu", "mocap", "emg", "eyetrack", "pressure"),
        t_obs_sec: float = 8.0,
        t_fut_sec: float = 2.0,
        downsample: int = DEFAULT_DOWNSAMPLE,
        dataset_dir: Path = DEFAULT_DATASET_DIR,
        annot_dir: Path = DEFAULT_ANNOT_DIR,
        stats: Optional[Dict[str, Tuple[np.ndarray, np.ndarray]]] = None,
        min_seg_duration_sec: float = 0.4,
        log: bool = True,
        mode: str = "recognition",
    ):
        for m in modalities:
            if m not in VALID_MODALITIES:
                raise ValueError(f"Unknown modality: {m}")
        if mode not in ("recognition", "anticipation"):
            raise ValueError(f"mode must be 'recognition' or 'anticipation', got {mode!r}")

        self.modalities = tuple(modalities)
        self.t_obs_sec = float(t_obs_sec)
        self.t_fut_sec = float(t_fut_sec)
        self.downsample = int(downsample)
        self.dataset_dir = Path(dataset_dir)
        self.annot_dir = Path(annot_dir)
        self.mode = mode

        # e.g. 100 Hz // 5 = 20 Hz, so t_obs_sec=8 -> T_frames=160
        sr = SAMPLING_RATE_HZ // self.downsample
        self.T_frames = int(round(self.t_obs_sec * sr))
        self._sr_down = sr

        self._items: List[dict] = []
        self._modality_dims: Dict[str, int] = {}

        # When stats come from another split (e.g. train), pin the feature
        # dims now so the arrays loaded below are padded/truncated to match.
        if stats is not None:
            for m, (mu, _) in stats.items():
                self._modality_dims[m] = mu.shape[1]

        stats_counts = {
            "recordings_scanned": 0,
            "recordings_used": 0,
            "segments_seen": 0,
            "seg_dropped_label": 0,
            "seg_dropped_too_early": 0,
            "seg_dropped_short": 0,
            "seg_kept": 0,
        }

        for vol in volunteers:
            vol_dir = self.dataset_dir / vol
            if not vol_dir.is_dir():
                continue
            for scenario_dir in sorted(vol_dir.glob("s*")):
                if not scenario_dir.is_dir():
                    continue
                scene = scenario_dir.name
                if scene not in {f"s{i}" for i in range(1, 9)}:
                    continue

                annot_path = self.annot_dir / vol / f"{scene}.json"
                if not annot_path.exists():
                    continue

                stats_counts["recordings_scanned"] += 1

                sensors = _load_recording_sensors(scenario_dir, vol, scene,
                                                  self.modalities)
                if sensors is None:
                    continue

                # Reconcile per-modality feature dims across recordings:
                # zero-pad or truncate to the established width.
                for m, arr in sensors.items():
                    if m in self._modality_dims:
                        if arr.shape[1] != self._modality_dims[m]:
                            target = self._modality_dims[m]
                            if arr.shape[1] < target:
                                pad = np.zeros((arr.shape[0], target - arr.shape[1]),
                                               dtype=np.float32)
                                sensors[m] = np.concatenate([arr, pad], axis=1)
                            else:
                                sensors[m] = arr[:, :target]
                    else:
                        self._modality_dims[m] = arr.shape[1]

                segs = _load_annotations(annot_path)
                rec_used = False
                # BOS sentinels mark "no previous segment" at the start of
                # each recording.
                BOS_VC = NUM_VERB_COMPOSITE
                BOS_N = NUM_NOUN
                prev_vc, prev_n = BOS_VC, BOS_N
                for seg_idx, seg in enumerate(segs):
                    stats_counts["segments_seen"] += 1
                    a = seg.get("action_annotation", {})
                    labels = classify_segment(a)
                    if labels is None:
                        stats_counts["seg_dropped_label"] += 1
                        continue

                    start_sec, end_sec = parse_ts_range(seg.get("timestamp", ""))
                    if end_sec - start_sec < min_seg_duration_sec:
                        stats_counts["seg_dropped_short"] += 1
                        continue

                    if self.mode == "anticipation":
                        anchor_sec = start_sec - self.t_fut_sec
                        obs_start_sec = anchor_sec - self.t_obs_sec
                        if obs_start_sec < 0:
                            stats_counts["seg_dropped_too_early"] += 1
                            continue
                        i0 = int(round(obs_start_sec * SAMPLING_RATE_HZ))
                        i1 = int(round(anchor_sec * SAMPLING_RATE_HZ))
                        meta_extra = {"anchor_sec": anchor_sec}
                    else:
                        # recognition: observe the segment itself
                        i0 = int(round(start_sec * SAMPLING_RATE_HZ))
                        i1 = int(round(end_sec * SAMPLING_RATE_HZ))
                        meta_extra = {"start_sec": start_sec, "end_sec": end_sec}

                    # Windows running past the end of the recording share the
                    # "too_early" counter (both are out-of-range windows).
                    T_avail = min(s.shape[0] for s in sensors.values())
                    if i1 > T_avail:
                        stats_counts["seg_dropped_too_early"] += 1
                        continue
                    if i0 < 0:
                        i0 = 0

                    window: Dict[str, np.ndarray] = {}
                    for m, arr in sensors.items():
                        w = arr[i0:i1]
                        w = w[::self.downsample]
                        window[m] = w

                    min_T = min(w.shape[0] for w in window.values())
                    if min_T < 4:
                        stats_counts["seg_dropped_short"] += 1
                        continue

                    self._items.append({
                        "x": window,
                        "y": labels,
                        "prev": {"verb_composite": prev_vc, "noun": prev_n},
                        "meta": {
                            "vol": vol, "scene": scene,
                            "seg_idx": seg_idx, **meta_extra,
                        },
                    })
                    stats_counts["seg_kept"] += 1
                    prev_vc = labels["verb_composite"]
                    prev_n = labels["noun"]
                    rec_used = True

                if rec_used:
                    stats_counts["recordings_used"] += 1

        if len(self._items) == 0:
            raise RuntimeError(
                "No samples collected. Check annot_dir, modalities, t_obs, t_fut."
            )

        # Compute normalization stats from our own samples unless a stats
        # dict was passed in (the test split reuses train stats).
        if stats is None:
            stats = self._compute_stats()
        self._stats = stats
        self._apply_stats(stats)

        if log:
            print(f"[TripletSeqPredDataset:{self.mode}] "
                  f"vols={len(volunteers)} "
                  f"recs_scan={stats_counts['recordings_scanned']} "
                  f"recs_used={stats_counts['recordings_used']} "
                  f"segs_seen={stats_counts['segments_seen']} "
                  f"kept={stats_counts['seg_kept']} "
                  f"drop_label={stats_counts['seg_dropped_label']} "
                  f"drop_early={stats_counts['seg_dropped_too_early']} "
                  f"drop_short={stats_counts['seg_dropped_short']}",
                  flush=True)
            print(f" modality_dims={self._modality_dims} "
                  f"T_frames={self.T_frames} sr_down={sr}Hz",
                  flush=True)
        self.stats_counts = stats_counts

    def _compute_stats(self) -> Dict[str, Tuple[np.ndarray, np.ndarray]]:
        """Per-modality (mean, std) over all kept windows, each (1, F_mod)."""
        acc: Dict[str, List[np.ndarray]] = {m: [] for m in self.modalities}
        for it in self._items:
            for m, w in it["x"].items():
                acc[m].append(w.astype(np.float64))
        out: Dict[str, Tuple[np.ndarray, np.ndarray]] = {}
        for m, arrs in acc.items():
            cat = np.concatenate(arrs, axis=0)
            mu = cat.mean(axis=0, keepdims=True)
            sd = cat.std(axis=0, keepdims=True)
            sd[sd < 1e-8] = 1.0  # guard against constant channels
            out[m] = (mu.astype(np.float32), sd.astype(np.float32))
        return out

    def _apply_stats(self, stats: Dict[str, Tuple[np.ndarray, np.ndarray]]) -> None:
        """Z-score every stored window in place, zeroing NaN/inf values."""
        for it in self._items:
            for m, w in it["x"].items():
                mu, sd = stats[m]
                z = (w.astype(np.float32) - mu) / sd
                z = np.nan_to_num(z, nan=0.0, posinf=0.0, neginf=0.0)
                it["x"][m] = z.astype(np.float32)

    def get_stats(self) -> Dict[str, Tuple[np.ndarray, np.ndarray]]:
        return self._stats

    def __len__(self) -> int:
        return len(self._items)

    def __getitem__(self, idx: int):
        it = self._items[idx]
        x = {m: torch.from_numpy(w) for m, w in it["x"].items()}
        y = it["y"]
        meta = it["meta"]
        prev = it.get("prev", {"verb_composite": NUM_VERB_COMPOSITE, "noun": NUM_NOUN})
        return x, y, meta, prev

    @property
    def modality_dims(self) -> Dict[str, int]:
        return dict(self._modality_dims)

    @property
    def total_feat_dim(self) -> int:
        return sum(self._modality_dims.values())

    def class_counts(self) -> Dict[str, np.ndarray]:
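        """Per-head label histograms over the kept samples.

        A hedged sketch of one intended use -- inverse-frequency loss
        weights (`train` is any instance of this dataset):

            counts = train.class_counts()["noun"]
            w = 1.0 / np.maximum(counts, 1)
            weight = torch.tensor(w / w.mean(), dtype=torch.float32)
            criterion = torch.nn.CrossEntropyLoss(weight=weight)
        """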
        vf = np.zeros(NUM_VERB_FINE, dtype=np.int64)
        vc = np.zeros(NUM_VERB_COMPOSITE, dtype=np.int64)
        n = np.zeros(NUM_NOUN, dtype=np.int64)
        h = np.zeros(NUM_HAND, dtype=np.int64)
        for it in self._items:
            y = it["y"]
            vf[y["verb_fine"]] += 1
            vc[y["verb_composite"]] += 1
            n[y["noun"]] += 1
            h[y["hand"]] += 1
        return {"verb_fine": vf, "verb_composite": vc, "noun": n, "hand": h}

def collate_triplet(batch):
    """Stack samples into batched tensors. Backward compatible: accepts
    samples in either (x, y, meta) or (x, y, meta, prev) form.

    Returns:
        x:    dict[mod] -> FloatTensor (B, T_max, F_mod)
        mask: BoolTensor (B, T_max), True where frames are real (not padding)
        lens: LongTensor (B,)
        y:    dict (each -> LongTensor (B,))
        meta: list of dicts
        prev: dict {'verb_composite': LongTensor (B,), 'noun': LongTensor (B,)}
              values are class indices, with NUM_VERB_COMPOSITE / NUM_NOUN
              used as a BOS sentinel for the first segment in a recording.
    """
    has_prev = len(batch[0]) >= 4
    if has_prev:
        xs, ys, metas, prevs = zip(*batch)
    else:
        xs, ys, metas = zip(*batch)
        prevs = [{"verb_composite": NUM_VERB_COMPOSITE, "noun": NUM_NOUN} for _ in batch]
    B = len(batch)
    mods = list(xs[0].keys())
    lens = torch.tensor([x[mods[0]].shape[0] for x in xs], dtype=torch.long)
    T_max = int(lens.max().item())

    # Zero-pad every modality to the longest window in the batch.
    x_out: Dict[str, torch.Tensor] = {}
    for m in mods:
        F = xs[0][m].shape[1]
        padded = torch.zeros(B, T_max, F, dtype=torch.float32)
        for i, x in enumerate(xs):
            w = x[m]
            padded[i, :w.shape[0]] = w
        x_out[m] = padded

    ar = torch.arange(T_max).unsqueeze(0)
    mask = ar < lens.unsqueeze(1)

    y_out = {
        k: torch.tensor([y[k] for y in ys], dtype=torch.long)
        for k in ("verb_fine", "verb_composite", "noun", "hand")
    }
    prev_out = {
        "verb_composite": torch.tensor([p["verb_composite"] for p in prevs], dtype=torch.long),
        "noun": torch.tensor([p["noun"] for p in prevs], dtype=torch.long),
    }
    return x_out, mask, lens, y_out, list(metas), prev_out

def build_train_test(
    modalities: Sequence[str] = ("imu", "mocap", "emg", "eyetrack", "pressure"),
    t_obs_sec: float = 8.0,
    t_fut_sec: float = 2.0,
    downsample: int = DEFAULT_DOWNSAMPLE,
    dataset_dir: Path = DEFAULT_DATASET_DIR,
    annot_dir: Path = DEFAULT_ANNOT_DIR,
    mode: str = "recognition",
) -> Tuple["TripletSeqPredDataset", "TripletSeqPredDataset"]:
    train = TripletSeqPredDataset(
        TRAIN_VOLS_V3, modalities=modalities,
        t_obs_sec=t_obs_sec, t_fut_sec=t_fut_sec, downsample=downsample,
        dataset_dir=dataset_dir, annot_dir=annot_dir, mode=mode,
    )
    # The test split reuses the train split's normalization stats so no
    # test-set statistics leak into preprocessing.
    test = TripletSeqPredDataset(
        TEST_VOLS_V3, modalities=modalities,
        t_obs_sec=t_obs_sec, t_fut_sec=t_fut_sec, downsample=downsample,
        dataset_dir=dataset_dir, annot_dir=annot_dir,
        stats=train.get_stats(), mode=mode,
    )
    return train, test

if __name__ == "__main__":
    import argparse

    ap = argparse.ArgumentParser()
    ap.add_argument("--modalities", type=str, default="imu,emg,eyetrack")
    ap.add_argument("--t_obs", type=float, default=8.0)
    ap.add_argument("--t_fut", type=float, default=2.0)
    ap.add_argument("--smoke_n", type=int, default=3,
                    help="Inspect first N samples per split")
    args = ap.parse_args()

    mods = args.modalities.split(",")
    print(f"Building train/test with modalities={mods} "
          f"t_obs={args.t_obs}s t_fut={args.t_fut}s ...")
    train, test = build_train_test(
        modalities=mods,
        t_obs_sec=args.t_obs,
        t_fut_sec=args.t_fut,
    )
    print(f"train: {len(train)} samples | test: {len(test)} samples")

    for name, ds in [("train", train), ("test", test)]:
        counts = ds.class_counts()
        print(f"\n[{name}] class counts:")
        print(" verb_fine:", counts["verb_fine"].tolist())
        print(" verb_composite:", counts["verb_composite"].tolist())
        print(" noun (sum):", int(counts["noun"].sum()),
              "nonzero:", int((counts["noun"] > 0).sum()))
        print(" hand:", counts["hand"].tolist())

        print(f"\n[{name}] first {args.smoke_n} samples:")
        for i in range(min(args.smoke_n, len(ds))):
            # __getitem__ returns four values; meta carries 'anchor_sec' in
            # anticipation mode but 'start_sec'/'end_sec' in recognition.
            x, y, meta, prev = ds[i]
            shape_str = " ".join(f"{m}:{tuple(x[m].shape)}" for m in x)
            when = (f"anchor={meta['anchor_sec']:.2f}s" if "anchor_sec" in meta
                    else f"t=[{meta['start_sec']:.2f},{meta['end_sec']:.2f}]s")
            print(f" {i:3d} {meta['vol']}/{meta['scene']}#{meta['seg_idx']:3d} "
                  f"{when} y={y} {shape_str}")