| |
| """ |
| Experiment 2: Temporal Action Segmentation |
| Per-frame action classification using multi-modal time series. |
| Uses annotations from annotations_by_scene/ to create frame-level labels. |
| """ |
|
|
| import os |
| import sys |
| import json |
| import time |
| import re |
| import random |
| import argparse |
| import numpy as np |
| import pandas as pd |
| import torch |
| import torch.nn as nn |
| from sklearn.metrics import f1_score, accuracy_score |
| from torch.utils.data import Dataset, DataLoader |
|
|
| sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) |
| from data.dataset import ( |
| DATASET_DIR, MODALITY_FILES, SKIP_COLS, SKIP_COL_SUFFIXES, |
| TRAIN_VOLS, VAL_VOLS, TEST_VOLS, load_modality_array, get_modality_filepath |
| ) |
|
|
# Annotation roots ("${PULSE_ROOT}" is substituted at deploy time):
# v2 fine-grained annotations, with the original per-scene annotations as a
# fallback, plus a separate coarse-grained annotation tree.
ANNOTATION_DIR = "${PULSE_ROOT}/annotations_v2"
ANNOTATION_DIR_FALLBACK = "${PULSE_ROOT}/annotations_by_scene"
ANNOTATION_DIR_COARSE = "${PULSE_ROOT}/annotations_coarse"


# Fine-grained action vocabulary (11 classes); 0 doubles as background/Idle.
FINE_ACTION_LABELS = {
    'Idle': 0,
    'Grasp': 1,
    'Place': 2,
    'Pour': 3,
    'Wipe': 4,
    'Fold': 5,
    'OpenClose': 6,
    'Stir': 7,
    'TearCut': 8,
    'Arrange': 9,
    'Transport': 10,
}


# Coarse-grained action vocabulary (6 classes), selected via --coarse_labels.
COARSE_ACTION_LABELS = {
    'Idle': 0,
    'Manipulate': 1,
    'CleanOrganize': 2,
    'Transfer': 3,
    'Assemble': 4,
    'FoodPrep': 5,
}


# Active label set. NOTE(review): these three globals are rebound by
# run_experiment() when --coarse_labels is set; module-level code that reads
# them (dataset, metrics printing) picks up the swap.
ACTION_LABELS = FINE_ACTION_LABELS
NUM_ACTIONS = len(ACTION_LABELS)
ACTION_NAMES = {v: k for k, v in ACTION_LABELS.items()}


# Sliding-window length and stride, in frames *after* downsampling.
WINDOW_SIZE = 512
WINDOW_STRIDE = 256
|
|
|
|
def set_seed(seed):
    """Seed the python, numpy and torch RNGs for reproducible runs."""
    for seeder in (random.seed, np.random.seed,
                   torch.manual_seed, torch.cuda.manual_seed_all):
        seeder(seed)
|
|
|
|
def classify_action(task_text):
    """Map a Chinese task description to a fine-grained action label.

    Rules are checked in priority order; the first category whose keyword
    appears in the text wins, and anything unmatched falls back to 'Idle'.
    """
    rules = [
        ('Grasp', ('抓取', '拿起', '拿取', '取出', '掀开', '取下', '搬起')),
        ('Place', ('放置', '放回', '放入', '放下', '放到', '释放', '移开', '松开')),
        ('Pour', ('倾倒', '倒入', '倒出', '注水', '倒水', '倒置', '倾斜', '转移')),
        ('Wipe', ('擦拭', '抹布', '清洁', '擦干', '擦除')),
        ('Fold', ('折叠', '对折', '折好', '卷', '缠绕')),
        ('OpenClose', ('打开', '关闭', '开启', '合上', '旋开', '旋紧', '拉链',
                       '拧开', '拧紧', '盖上', '拔开')),
        ('Stir', ('搅拌', '搅动')),
        ('TearCut', ('撕', '剪', '切', '粘贴', '胶带', '封箱')),
        ('Arrange', ('整理', '调整', '摆放', '对齐', '铺', '展开', '抚平',
                     '理顺', '排列', '码放', '微调', '压实')),
        ('Transport', ('搬运', '移动', '移至', '运送', '搬到', '提起', '抬起',
                       '携带', '移回', '将菜锅移')),
    ]
    for label, keywords in rules:
        if any(kw in task_text for kw in keywords):
            return label
    return 'Idle'
|
|
|
|
def parse_timestamp(ts_str):
    """Parse an 'MM:SS' (or 'H:MM:SS') timestamp into total seconds.

    Annotation files use 'MM:SS'; hour-prefixed timestamps are also accepted
    for long recordings. Any other shape returns 0, keeping the original
    lenient behavior for malformed annotation timestamps.
    """
    parts = ts_str.strip().split(':')
    if len(parts) == 2:
        minutes, seconds = parts
        return int(minutes) * 60 + int(seconds)
    if len(parts) == 3:
        hours, minutes, seconds = parts
        return int(hours) * 3600 + int(minutes) * 60 + int(seconds)
    return 0
|
|
|
|
def _segments_to_labels(segments, n_frames, sampling_rate, action_of):
    """Rasterize annotation segments into a per-frame label array.

    Each segment's 'timestamp' field is expected to look like 'MM:SS - MM:SS';
    non-matching timestamps are skipped. Frame indices are clipped to
    n_frames, and only actions present in the active ACTION_LABELS are
    written (later segments overwrite earlier ones on overlap).
    """
    labels = np.zeros(n_frames, dtype=np.int64)
    for seg in segments:
        match = re.match(r'(\d+:\d+)\s*-\s*(\d+:\d+)', seg['timestamp'])
        if not match:
            continue
        start_sec = parse_timestamp(match.group(1))
        end_sec = parse_timestamp(match.group(2))
        start_frame = min(int(start_sec * sampling_rate), n_frames)
        end_frame = min(int(end_sec * sampling_rate), n_frames)
        action = action_of(seg)
        if action in ACTION_LABELS:
            labels[start_frame:end_frame] = ACTION_LABELS[action]
    return labels


def load_annotations(vol, scenario, n_frames, sampling_rate=100, use_coarse=False):
    """Load annotations and create per-frame labels.

    use_coarse=False: fine-grained (11 classes) from annotations_v2, falling
                      back to annotations_by_scene
    use_coarse=True:  coarse-grained (6 classes) from annotations_coarse

    Returns an int64 array of length n_frames, or None when no annotation
    file exists for this volunteer/scenario.
    """
    if use_coarse:
        ann_path = os.path.join(ANNOTATION_DIR_COARSE, vol, f"{scenario}.json")
        if not os.path.exists(ann_path):
            return None
        with open(ann_path) as f:
            data = json.load(f)
        segments = data.get('coarse_segments', [])

        def action_of(seg):
            return seg.get('coarse_action', 'Idle')
    else:
        # Prefer v2 annotations; fall back to the original per-scene files.
        ann_path = os.path.join(ANNOTATION_DIR, vol, f"{scenario}.json")
        if not os.path.exists(ann_path):
            ann_path = os.path.join(ANNOTATION_DIR_FALLBACK, vol, f"{scenario}.json")
        if not os.path.exists(ann_path):
            return None
        with open(ann_path) as f:
            data = json.load(f)
        segments = data['segments']

        def action_of(seg):
            # Explicit label wins; otherwise derive one from the task text.
            if 'action_label' in seg:
                return seg['action_label']
            return classify_action(seg['task'])

    return _segments_to_labels(segments, n_frames, sampling_rate, action_of)
|
|
|
|
class ActionSegmentationDataset(Dataset):
    """Sliding-window dataset for per-frame action segmentation.

    For every volunteer/scenario that has all requested modalities, the
    modality arrays are truncated to a common length, concatenated along the
    feature axis, paired with frame-level labels from the annotation files,
    temporally downsampled, and cut into fixed-size overlapping windows.
    Features are z-normalized with stats computed over this split, or with
    the `stats` passed in (e.g. train-split stats reused for val/test).
    """

    def __init__(self, volunteers, modalities, window_size=WINDOW_SIZE,
                 stride=WINDOW_STRIDE, downsample=2, stats=None, use_coarse=False):
        # Each entry: (features [window_size, feat_dim] float32,
        #              labels   [window_size] int64)
        self.windows = []
        self._feat_dim = None
        all_features = []  # per-scenario feature arrays, for normalization stats

        for vol in volunteers:
            vol_dir = os.path.join(DATASET_DIR, vol)
            if not os.path.isdir(vol_dir):
                continue
            for scenario in sorted(os.listdir(vol_dir)):
                scenario_dir = os.path.join(vol_dir, scenario)
                if not os.path.isdir(scenario_dir):
                    continue
                meta_path = os.path.join(scenario_dir, 'alignment_metadata.json')
                if not os.path.exists(meta_path):
                    continue
                with open(meta_path) as f:
                    meta = json.load(f)

                # Video-feature modalities live in extra .npy files and are
                # not listed in the alignment metadata, so add them manually.
                available = set(meta['modalities'])
                if os.path.exists(os.path.join(scenario_dir, 'video_features_100hz.npy')):
                    available.add('video')
                if os.path.exists(os.path.join(scenario_dir, 'video_features_videomae_100hz.npy')):
                    available.add('videomae')
                if not set(modalities).issubset(available):
                    continue

                # Load every requested modality; drop the scenario if any fails.
                parts = []
                skip = False
                for mod in modalities:
                    filepath = get_modality_filepath(scenario_dir, mod, vol, scenario)
                    arr = load_modality_array(filepath, mod)
                    if arr is None:
                        skip = True
                        break
                    parts.append(arr)
                if skip:
                    continue

                # Truncate all modalities to the shortest stream, then stack
                # their feature columns.
                min_len = min(p.shape[0] for p in parts)
                features = np.concatenate([p[:min_len] for p in parts], axis=1)

                # Per-frame labels at the native (pre-downsample) rate.
                labels = load_annotations(vol, scenario, min_len, use_coarse=use_coarse)
                if labels is None:
                    continue

                # Downsample features and labels in lockstep.
                features = features[::downsample]
                labels = labels[::downsample]

                if self._feat_dim is None:
                    self._feat_dim = features.shape[1]

                all_features.append(features)

                # Cut into overlapping windows; any tail shorter than
                # window_size is dropped.
                T = features.shape[0]
                for start in range(0, T - window_size + 1, stride):
                    end = start + window_size
                    self.windows.append((features[start:end], labels[start:end]))

        # Normalization stats: reuse the provided (train) stats, otherwise
        # compute them from this split's data.
        if stats is not None:
            self.mean, self.std = stats
        else:
            if all_features:
                all_data = np.concatenate(all_features, axis=0).astype(np.float64)
                self.mean = np.mean(all_data, axis=0, keepdims=True)
                self.std = np.std(all_data, axis=0, keepdims=True)
                self.std[self.std < 1e-8] = 1.0  # guard constant channels
            else:
                d = self._feat_dim or 1
                self.mean = np.zeros((1, d), dtype=np.float64)
                self.std = np.ones((1, d), dtype=np.float64)

        # Z-normalize every window in float64, then store compactly as float32.
        self.windows = [
            (((w[0].astype(np.float64) - self.mean) / self.std).astype(np.float32), w[1])
            for w in self.windows
        ]

        # Report window count and the per-class frame distribution.
        if self.windows:
            all_labels = np.concatenate([w[1] for w in self.windows])
            print(f" Windows: {len(self.windows)}, feat_dim: {self._feat_dim}", flush=True)
            for i in range(NUM_ACTIONS):
                count = (all_labels == i).sum()
                if count > 0:
                    print(f" {ACTION_NAMES[i]}: {count} frames ({100*count/len(all_labels):.1f}%)",
                          flush=True)

    def get_stats(self):
        """Return (mean, std) so val/test splits can reuse train-split stats."""
        return (self.mean, self.std)

    @property
    def feat_dim(self):
        # Feature dimensionality of one frame; None if nothing was loaded.
        return self._feat_dim

    def get_class_weights(self):
        """Inverse-frequency class weights, normalized to sum to NUM_ACTIONS."""
        all_labels = np.concatenate([w[1] for w in self.windows])
        counts = np.bincount(all_labels, minlength=NUM_ACTIONS).astype(np.float32)
        counts[counts == 0] = 1.0  # avoid division by zero for absent classes
        weights = 1.0 / counts
        weights = weights / weights.sum() * NUM_ACTIONS
        return torch.FloatTensor(weights)

    def __len__(self):
        return len(self.windows)

    def __getitem__(self, idx):
        features, labels = self.windows[idx]
        return torch.from_numpy(features), torch.from_numpy(labels)
|
|
|
|
| |
| |
| |
|
|
| class DilatedResBlock(nn.Module): |
| def __init__(self, channels, dilation): |
| super().__init__() |
| self.conv1 = nn.Conv1d(channels, channels, 3, padding=dilation, dilation=dilation) |
| self.conv2 = nn.Conv1d(channels, channels, 1) |
| self.bn1 = nn.BatchNorm1d(channels) |
| self.bn2 = nn.BatchNorm1d(channels) |
| self.dropout = nn.Dropout(0.1) |
|
|
| def forward(self, x): |
| residual = x |
| x = self.dropout(torch.relu(self.bn1(self.conv1(x)))) |
| x = self.dropout(torch.relu(self.bn2(self.conv2(x)))) |
| return x + residual |
|
|
|
|
class TCNStage(nn.Module):
    """Single MS-TCN stage: 1x1 input projection, a stack of dilated residual
    blocks with exponentially growing dilation, and a 1x1 classifier head."""

    def __init__(self, in_channels, hidden_channels, num_classes, num_layers=8):
        super().__init__()
        self.input_conv = nn.Conv1d(in_channels, hidden_channels, 1)
        blocks = (DilatedResBlock(hidden_channels, 2 ** i) for i in range(num_layers))
        self.layers = nn.ModuleList(blocks)
        self.output_conv = nn.Conv1d(hidden_channels, num_classes, 1)

    def forward(self, x):
        h = self.input_conv(x)
        for block in self.layers:
            h = block(h)
        return self.output_conv(h)
|
|
|
|
class MSTCN(nn.Module):
    """Multi-Stage TCN (MS-TCN++) for action segmentation.

    The first stage consumes raw features; every later stage refines the
    previous stage's class logits. forward() returns one (B, T, C) logits
    tensor per stage so the loss can supervise all of them.
    """

    def __init__(self, input_dim, num_classes, hidden_dim=64, num_stages=2, num_layers=8):
        super().__init__()
        first = TCNStage(input_dim, hidden_dim, num_classes, num_layers)
        refiners = [TCNStage(num_classes, hidden_dim, num_classes, num_layers)
                    for _ in range(num_stages - 1)]
        self.stages = nn.ModuleList([first] + refiners)

    def forward(self, x):
        # (B, T, C) -> (B, C, T) for Conv1d.
        h = x.permute(0, 2, 1)
        outputs = []
        for stage in self.stages:
            h = stage(h)
            outputs.append(h.permute(0, 2, 1))
        return outputs
|
|
|
|
class SimpleTCN(nn.Module):
    """Single-stage TCN baseline; returns a one-element list of logits to
    match the multi-stage output convention."""

    def __init__(self, input_dim, num_classes, hidden_dim=64, num_layers=8):
        super().__init__()
        self.stage = TCNStage(input_dim, hidden_dim, num_classes, num_layers)

    def forward(self, x):
        logits = self.stage(x.permute(0, 2, 1))
        return [logits.permute(0, 2, 1)]
|
|
|
|
class BiLSTMSeg(nn.Module):
    """Two-layer bidirectional LSTM with a linear per-frame classifier.

    forward() returns a one-element list of (B, T, num_classes) logits to
    match the multi-stage output convention of the TCN models.
    """

    def __init__(self, input_dim, num_classes, hidden_dim=64):
        super().__init__()
        self.lstm = nn.LSTM(
            input_dim,
            hidden_dim,
            num_layers=2,
            batch_first=True,
            bidirectional=True,
            dropout=0.2,
        )
        # Both directions are concatenated, hence 2 * hidden_dim inputs.
        self.head = nn.Linear(2 * hidden_dim, num_classes)

    def forward(self, x):
        sequence_out, _state = self.lstm(x)
        logits = self.head(sequence_out)
        return [logits]
|
|
|
|
def build_seg_model(name, input_dim, num_classes, hidden_dim=64):
    """Instantiate a segmentation model by name.

    Published baselines ('asformer', 'mstcnpp', 'diffact') are imported
    lazily so their modules are only required when actually requested.
    Raises ValueError for an unrecognized name.
    """
    if name == 'mstcn':
        return MSTCN(input_dim, num_classes, hidden_dim, num_stages=2)
    if name == 'tcn':
        return SimpleTCN(input_dim, num_classes, hidden_dim)
    if name == 'lstm':
        return BiLSTMSeg(input_dim, num_classes, hidden_dim)
    if name == 'asformer':
        from experiments.published_baselines import ASFormer
        return ASFormer(input_dim, num_classes, hidden_dim,
                        num_layers=5, num_decoders=3)
    if name == 'mstcnpp':
        from experiments.published_models import MSTCNPP
        return MSTCNPP(input_dim, num_classes, hidden_dim, num_stages=4, num_layers=10)
    if name == 'diffact':
        from experiments.published_models import DiffAct
        return DiffAct(input_dim, num_classes, hidden_dim,
                       num_encoder_layers=6, num_denoise_layers=6,
                       num_diffusion_steps=10)
    raise ValueError(f"Unknown model: {name}")
|
|
|
|
| |
| |
| |
|
|
def compute_segmental_f1(pred, gt, iou_threshold=0.5):
    """Segmental F1 at the given IoU threshold.

    Both sequences are run-length encoded into (label, start, end) segments;
    each non-background predicted segment is greedily matched to its best-IoU
    unmatched ground-truth segment of the same label, and counts as a true
    positive when that IoU reaches `iou_threshold`. Background (label 0)
    segments are ignored on both sides.
    """
    def to_segments(seq):
        # Run-length encode into (label, start, end) with exclusive end.
        runs = []
        total = len(seq)
        i = 0
        while i < total:
            j = i + 1
            while j < total and seq[j] == seq[i]:
                j += 1
            runs.append((seq[i], i, j))
            i = j
        return runs

    pred_segments = to_segments(pred)
    gt_segments = to_segments(gt)

    true_pos = 0
    used = set()
    for label, p_start, p_end in pred_segments:
        if label == 0:
            continue  # background is not scored
        best_iou, best = 0, -1
        for gi, (g_label, g_start, g_end) in enumerate(gt_segments):
            if g_label != label or gi in used:
                continue
            overlap = max(0, min(p_end, g_end) - max(p_start, g_start))
            union = (p_end - p_start) + (g_end - g_start) - overlap
            iou = overlap / union if union > 0 else 0
            if iou > best_iou:
                best_iou, best = iou, gi
        if best_iou >= iou_threshold:
            true_pos += 1
            used.add(best)

    n_pred = sum(1 for label, _, _ in pred_segments if label != 0)
    n_gt = sum(1 for label, _, _ in gt_segments if label != 0)
    precision = true_pos / n_pred if n_pred > 0 else 0
    recall = true_pos / n_gt if n_gt > 0 else 0
    if precision + recall == 0:
        return 0
    return 2 * precision * recall / (precision + recall)
|
|
|
|
| |
| |
| |
|
|
def train_one_epoch(model, loader, criterion, optimizer, device):
    """One training pass over `loader`.

    The model returns one logits tensor per stage; the criterion is summed
    over all stages so every stage is supervised. Gradients are clipped to
    unit norm. Returns the sample-weighted mean loss.
    """
    model.train()
    running_loss, seen = 0.0, 0
    for x, y in loader:
        x, y = x.to(device), y.to(device)
        optimizer.zero_grad()
        stage_outputs = model(x)
        flat_targets = y.reshape(-1)
        loss = sum(criterion(logits.reshape(-1, logits.shape[-1]), flat_targets)
                   for logits in stage_outputs)
        loss.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
        optimizer.step()
        batch = x.size(0)
        running_loss += loss.item() * batch
        seen += batch
    return running_loss / seen
|
|
|
|
@torch.no_grad()
def evaluate(model, loader, criterion, device):
    """Evaluate on `loader` using only the final stage's logits.

    Returns a dict with the mean loss, frame-level accuracy and macro F1,
    and segmental F1 at IoU thresholds 0.1 / 0.25 / 0.5 (computed over all
    windows concatenated into one long sequence).
    """
    model.eval()
    loss_sum, sample_count = 0.0, 0
    pred_chunks, label_chunks = [], []

    for x, y in loader:
        x, y = x.to(device), y.to(device)
        final = model(x)[-1]
        batch_loss = criterion(final.reshape(-1, final.shape[-1]), y.reshape(-1))
        loss_sum += batch_loss.item() * x.size(0)
        sample_count += x.size(0)
        pred_chunks.append(final.argmax(dim=-1).cpu().numpy().flatten())
        label_chunks.append(y.cpu().numpy().flatten())

    preds = np.concatenate(pred_chunks)
    labels = np.concatenate(label_chunks)

    return {
        'loss': loss_sum / sample_count,
        'frame_acc': accuracy_score(labels, preds),
        'frame_f1': f1_score(labels, preds, average='macro', zero_division=0),
        'seg_f1@10': compute_segmental_f1(preds, labels, 0.1),
        'seg_f1@25': compute_segmental_f1(preds, labels, 0.25),
        'seg_f1@50': compute_segmental_f1(preds, labels, 0.5),
    }
|
|
|
|
def run_experiment(args):
    """Train and evaluate one (model, modalities, seed) configuration.

    NOTE: rebinds the module-level ACTION_LABELS / NUM_ACTIONS / ACTION_NAMES
    globals so the datasets, class weights and metric printing below all use
    the selected (fine or coarse) label vocabulary.

    Returns a results dict (also written to <output_dir>/<exp_name>/results.json),
    or None when no training windows could be built.
    """
    global ACTION_LABELS, NUM_ACTIONS, ACTION_NAMES

    set_seed(args.seed)
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    modalities = args.modalities.split(',')
    # getattr: tolerate args namespaces built without the --coarse_labels flag.
    use_coarse = getattr(args, 'coarse_labels', False)

    # Select the active label vocabulary (coarse 6-class vs fine 11-class).
    if use_coarse:
        ACTION_LABELS = COARSE_ACTION_LABELS
        NUM_ACTIONS = len(ACTION_LABELS)
        ACTION_NAMES = {v: k for k, v in ACTION_LABELS.items()}
        print(f"\n{'='*60}", flush=True)
        print(f"Exp2 Action Seg (COARSE 6-class) | Model: {args.model} | Mods: {modalities}", flush=True)
    else:
        ACTION_LABELS = FINE_ACTION_LABELS
        NUM_ACTIONS = len(ACTION_LABELS)
        ACTION_NAMES = {v: k for k, v in ACTION_LABELS.items()}
        print(f"\n{'='*60}", flush=True)
        print(f"Exp2 Action Seg | Model: {args.model} | Mods: {modalities}", flush=True)
    print(f"{'='*60}", flush=True)

    # Normalization stats are computed on the train split only and reused
    # for val/test.
    train_ds = ActionSegmentationDataset(TRAIN_VOLS, modalities, downsample=args.downsample, use_coarse=use_coarse)
    stats = train_ds.get_stats()
    val_ds = ActionSegmentationDataset(VAL_VOLS, modalities, downsample=args.downsample, stats=stats, use_coarse=use_coarse)
    test_ds = ActionSegmentationDataset(TEST_VOLS, modalities, downsample=args.downsample, stats=stats, use_coarse=use_coarse)

    if len(train_ds) == 0:
        print("No training data!")
        return None

    train_loader = DataLoader(train_ds, batch_size=args.batch_size, shuffle=True)
    test_loader = DataLoader(test_ds, batch_size=args.batch_size, shuffle=False)
    # Fall back to the test set for model selection when no val data exists.
    if len(val_ds) > 0:
        val_loader = DataLoader(val_ds, batch_size=args.batch_size, shuffle=False)
    else:
        val_loader = test_loader
        print(" No val data, using test set for early stopping.", flush=True)

    model = build_seg_model(args.model, train_ds.feat_dim, NUM_ACTIONS, args.hidden_dim).to(device)
    n_params = sum(p.numel() for p in model.parameters() if p.requires_grad)
    print(f"Params: {n_params:,}", flush=True)

    # Inverse-frequency class weights counter the Idle-dominated distribution.
    class_weights = train_ds.get_class_weights().to(device)
    criterion = nn.CrossEntropyLoss(weight=class_weights)
    optimizer = torch.optim.Adam(model.parameters(), lr=args.lr, weight_decay=args.weight_decay)
    scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, patience=7, factor=0.5)

    mod_str = '-'.join(modalities)
    exp_name = f"exp2_{args.model}_{mod_str}_s{args.seed}"
    out_dir = os.path.join(args.output_dir, exp_name)
    os.makedirs(out_dir, exist_ok=True)

    best_val_f1 = 0
    best_epoch = 0
    patience_counter = 0

    for epoch in range(1, args.epochs + 1):
        t0 = time.time()
        train_loss = train_one_epoch(model, train_loader, criterion, optimizer, device)
        val_metrics = evaluate(model, val_loader, criterion, device)
        scheduler.step(val_metrics['loss'])  # LR decays when val loss plateaus
        elapsed = time.time() - t0

        print(f" Epoch {epoch:3d} | Train: {train_loss:.4f} | "
              f"Val: acc={val_metrics['frame_acc']:.4f} f1={val_metrics['frame_f1']:.4f} "
              f"seg@50={val_metrics['seg_f1@50']:.4f} | {elapsed:.1f}s", flush=True)

        # Model selection / early stopping on macro frame-level F1.
        if val_metrics['frame_f1'] > best_val_f1:
            best_val_f1 = val_metrics['frame_f1']
            best_epoch = epoch
            patience_counter = 0
            torch.save(model.state_dict(), os.path.join(out_dir, 'model_best.pt'))
        else:
            patience_counter += 1

        if patience_counter >= args.patience:
            print(f" Early stopping at epoch {epoch}", flush=True)
            break

    # Restore the best checkpoint before the final test evaluation.
    model.load_state_dict(torch.load(os.path.join(out_dir, 'model_best.pt'), weights_only=True))
    test_metrics = evaluate(model, test_loader, criterion, device)

    print(f"\n--- Test Results (epoch {best_epoch}) ---", flush=True)
    for k, v in test_metrics.items():
        print(f" {k}: {v:.4f}", flush=True)

    results = {
        'experiment': exp_name,
        'model': args.model,
        'modalities': modalities,
        'best_epoch': best_epoch,
        'test_metrics': {k: float(v) for k, v in test_metrics.items()},
        'n_params': n_params,
        'train_windows': len(train_ds),
        'args': vars(args),
    }
    with open(os.path.join(out_dir, 'results.json'), 'w') as f:
        json.dump(results, f, indent=2)
    return results
|
|
|
|
def run_all(args):
    """Sweep the standard modality combinations x baseline models.

    Reuses `args` for shared hyperparameters, overwriting `args.modalities`
    and `args.model` per run. A failing run is logged and recorded with an
    'error' entry instead of aborting the sweep. Writes a summary JSON and
    prints a comparison table at the end.
    """
    modality_combos = [
        'mocap',
        'emg',
        'mocap,emg,eyetrack',
        'mocap,emg,eyetrack,imu',
        'mocap,emg,eyetrack,imu,pressure',
    ]
    models = ['tcn', 'mstcn', 'lstm']
    all_results = []

    for mod_combo in modality_combos:
        for model_name in models:
            args.modalities = mod_combo
            args.model = model_name
            try:
                result = run_experiment(args)
                if result:
                    all_results.append(result)
            except Exception as e:
                import traceback; traceback.print_exc()
                print(f"FAILED: {model_name}/{mod_combo}: {e}", flush=True)
                all_results.append({'experiment': f"exp2_{model_name}_{mod_combo}", 'error': str(e)})

    summary_path = os.path.join(args.output_dir, 'exp2_summary.json')
    with open(summary_path, 'w') as f:
        json.dump(all_results, f, indent=2)

    # Compact comparison table; failed runs are skipped.
    print(f"\n{'='*60}", flush=True)
    print(f"{'Model':<10} {'Modalities':<35} {'Acc':<8} {'F1':<8} {'Seg@50':<8}", flush=True)
    print('-' * 70, flush=True)
    for r in all_results:
        if 'error' in r:
            continue
        m = r['test_metrics']
        mods = ','.join(r['modalities'])
        print(f"{r['model']:<10} {mods:<35} {m['frame_acc']:.4f} {m['frame_f1']:.4f} {m['seg_f1@50']:.4f}",
              flush=True)
|
|
|
|
def main():
    """CLI entry point: parse arguments, then run one experiment or the sweep."""
    ap = argparse.ArgumentParser(description='Exp2: Action Segmentation')
    ap.add_argument('--model', type=str, default='mstcn',
                    choices=['tcn', 'mstcn', 'lstm', 'asformer', 'mstcnpp', 'diffact'])
    ap.add_argument('--modalities', type=str, default='mocap,emg,eyetrack')
    ap.add_argument('--epochs', type=int, default=80)
    ap.add_argument('--batch_size', type=int, default=16)
    ap.add_argument('--lr', type=float, default=5e-4)
    ap.add_argument('--weight_decay', type=float, default=1e-4)
    ap.add_argument('--hidden_dim', type=int, default=64)
    ap.add_argument('--downsample', type=int, default=2)
    ap.add_argument('--patience', type=int, default=15)
    ap.add_argument('--seed', type=int, default=42)
    ap.add_argument('--output_dir', type=str,
                    default='${PULSE_ROOT}/results/exp2')
    ap.add_argument('--run_all', action='store_true')
    ap.add_argument('--coarse_labels', action='store_true',
                    help='Use coarse 6-class labels instead of fine 11-class')
    args = ap.parse_args()
    os.makedirs(args.output_dir, exist_ok=True)

    runner = run_all if args.run_all else run_experiment
    runner(args)
|
|
|
|
| if __name__ == '__main__': |
| main() |
|
|