PULSE-code / experiments /tasks /train_exp1.py
velvet-pine-22's picture
Upload folder using huggingface_hub
b4b2877 verified
#!/usr/bin/env python3
"""
Experiment 1: Daily Activity Scene Recognition
Train and evaluate models with different modality combinations and fusion strategies.
"""
import os
import sys
import json
import time
import random
import argparse
import numpy as np
import torch
import torch.nn as nn
from sklearn.metrics import (
accuracy_score, f1_score, confusion_matrix, classification_report
)
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from data.dataset import get_dataloaders, NUM_CLASSES, SCENE_LABELS
from nets.models import build_model
# Scene identifiers in class-index order; used as the keys of the per-class
# accuracy report (entry i names class i).
SCENE_NAMES = [
    's1_office',
    's2_package',
    's3_kitchen',
    's4_cleaning',
    's5_table_set',
    's6_luggage',
    's7_coffee',
    's8_clothes',
]
def set_seed(seed):
    """Seed every random number generator this script relies on.

    Seeds Python's ``random`` module, NumPy's global generator and PyTorch
    (CPU plus all CUDA devices), then configures cuDNN for repeatable runs.

    Args:
        seed: Integer seed applied to all generators.
    """
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)
    torch.backends.cudnn.deterministic = True
    # The cuDNN auto-tuner may select a different convolution algorithm on
    # each run; it has to be off for the deterministic flag to be effective.
    torch.backends.cudnn.benchmark = False
def apply_augmentation(x, mask, noise_std=0.1, time_mask_ratio=0.1):
    """Augment a batch with Gaussian noise and a randomly zeroed time span.

    The input tensor is never modified; a new tensor is returned whenever an
    augmentation is actually applied (the input itself is returned when
    nothing changes).

    Args:
        x: Batch tensor of rank 3, unpacked as (B, T, C).
        mask: Per-timestep validity mask of shape (B, T); noise is multiplied
            by it and its per-sample sum is used as the valid length.
            NOTE(review): the time mask assumes the valid steps form a prefix
            of each sequence (left-aligned padding) -- confirm against the
            dataset's collate function.
        noise_std: Standard deviation of the additive Gaussian noise;
            values <= 0 disable the noise.
        time_mask_ratio: Fraction of the T axis zeroed per sample;
            values <= 0 disable time masking.

    Returns:
        The augmented tensor, same shape as ``x``.
    """
    out = x
    # Tracks whether ``out`` is already a tensor we own (safe to write into).
    is_private_copy = False

    if noise_std > 0:
        noise = torch.randn_like(x) * noise_std
        # Only valid (unmasked) timesteps receive noise.
        out = x + noise * mask.unsqueeze(-1).float()
        is_private_copy = True

    if time_mask_ratio > 0:
        B, T, _ = out.shape
        mask_len = int(T * time_mask_ratio)
        if mask_len > 0:
            for i in range(B):
                valid_len = mask[i].sum().int().item()
                # Sequences not longer than the span are left untouched.
                if valid_len > mask_len:
                    start = random.randint(0, valid_len - mask_len)
                    if not is_private_copy:
                        # Copy lazily so the caller's batch is not zeroed
                        # in place when noise is disabled.
                        out = out.clone()
                        is_private_copy = True
                    out[i, start:start + mask_len, :] = 0.0
    return out
def _load_and_freeze_backbone(model, pretrained_path, freeze_idx, fusion_type):
    """Load pretrained SingleModel weights into a fusion model branch and freeze it.

    Keys of the checkpoint that start with ``backbone.`` are copied to
    ``backbones.<freeze_idx>.``; keys that start with ``classifier.`` are
    copied to ``classifiers.<freeze_idx>.`` unless ``fusion_type`` is
    ``'attention'``. The copied branch is then excluded from training by
    clearing ``requires_grad``.

    Args:
        model: Fusion model whose state dict uses ``backbones.<i>.`` /
            ``classifiers.<i>.`` key prefixes.
        pretrained_path: Path to a state dict saved with ``torch.save``.
        freeze_idx: Index of the modality branch to load and freeze.
        fusion_type: Fusion strategy name; ``'early'`` is skipped with a
            warning because it has no per-modality branch.

    Raises:
        RuntimeError: From ``load_state_dict`` when a remapped key does not
            exist in ``model`` or a tensor shape does not match.
    """
    if fusion_type == 'early':
        print("WARNING: Early fusion has a shared backbone — cannot freeze single modality. Skipping.")
        return

    # map_location='cpu' keeps a checkpoint written on a GPU loadable on any
    # host; load_state_dict below copies values onto the model's own device.
    pretrained_sd = torch.load(pretrained_path, map_location='cpu', weights_only=True)

    src_backbone = 'backbone.'
    src_classifier = 'classifier.'
    dst_backbone = f'backbones.{freeze_idx}.'
    dst_classifier = f'classifiers.{freeze_idx}.'
    with_classifier = fusion_type != 'attention'

    # Map SingleModel keys -> fusion model keys. Only the leading prefix is
    # rewritten, so nested names that happen to contain "backbone." again
    # (e.g. "backbone.backbone.weight") are preserved.
    new_sd = {}
    for k, v in pretrained_sd.items():
        if k.startswith(src_backbone):
            new_sd[dst_backbone + k[len(src_backbone):]] = v
        elif with_classifier and k.startswith(src_classifier):
            new_sd[dst_classifier + k[len(src_classifier):]] = v

    model_sd = model.state_dict()
    model_sd.update(new_sd)
    model.load_state_dict(model_sd)
    print(f" Loaded {len(new_sd)} tensors from {pretrained_path} into branch {freeze_idx}")

    # Freeze backbone (and classifier for non-attention models)
    for name, param in model.named_parameters():
        if name.startswith(dst_backbone):
            param.requires_grad = False
        if with_classifier and name.startswith(dst_classifier):
            param.requires_grad = False

    frozen_count = sum(not p.requires_grad for p in model.parameters())
    total_count = sum(1 for _ in model.parameters())
    print(f" Frozen: {frozen_count}/{total_count} parameter tensors")
def train_one_epoch(model, loader, criterion, optimizer, device,
                    augment=False, noise_std=0.1, time_mask_ratio=0.1):
    """Run one optimisation pass over ``loader``.

    Args:
        model: Network called as ``model(x, mask)`` and returning logits.
        loader: Iterable yielding ``(x, y, mask, lengths)`` batches;
            ``lengths`` is not used here.
        criterion: Loss called as ``criterion(logits, y)``.
        optimizer: Optimizer stepped once per batch.
        device: Device the batch tensors are moved to.
        augment: When true, ``apply_augmentation`` is applied to each batch.
        noise_std: Noise level forwarded to ``apply_augmentation``.
        time_mask_ratio: Mask ratio forwarded to ``apply_augmentation``.

    Returns:
        Tuple ``(mean_loss, accuracy)`` where the loss is averaged over
        samples (each batch loss is weighted by its batch size).

    Raises:
        ZeroDivisionError: If ``loader`` yields no samples.
    """
    model.train()
    # Which parameters are trainable does not change within an epoch, so the
    # list used for gradient clipping is built once instead of per batch.
    trainable_params = [p for p in model.parameters() if p.requires_grad]

    total_loss = 0
    all_preds, all_labels = [], []
    for x, y, mask, _lengths in loader:
        x, y, mask = x.to(device), y.to(device), mask.to(device)
        if augment:
            x = apply_augmentation(x, mask, noise_std, time_mask_ratio)

        optimizer.zero_grad()
        logits = model(x, mask)
        loss = criterion(logits, y)
        loss.backward()
        # Clip the global gradient norm of the trainable parameters to 1.0.
        torch.nn.utils.clip_grad_norm_(trainable_params, 1.0)
        optimizer.step()

        total_loss += loss.item() * y.size(0)
        all_preds.extend(logits.argmax(dim=1).cpu().numpy())
        all_labels.extend(y.cpu().numpy())

    n = len(all_labels)
    return total_loss / n, accuracy_score(all_labels, all_preds)
@torch.no_grad()
def evaluate(model, loader, criterion, device):
    """Score ``model`` on every batch of ``loader`` without gradient tracking.

    Args:
        model: Network called as ``model(x, mask)`` and returning logits.
        loader: Iterable yielding ``(x, y, mask, lengths)`` batches.
        criterion: Loss called as ``criterion(logits, y)``.
        device: Device the batch tensors are moved to.

    Returns:
        Tuple ``(mean_loss, accuracy, macro_f1, confusion_matrix,
        predictions, labels)``; the confusion matrix covers class ids
        ``0 .. NUM_CLASSES - 1`` and the last two items are NumPy arrays.
    """
    model.eval()

    loss_sum = 0
    predictions, targets = [], []
    for batch_x, batch_y, batch_mask, _ in loader:
        batch_x = batch_x.to(device)
        batch_y = batch_y.to(device)
        batch_mask = batch_mask.to(device)

        scores = model(batch_x, batch_mask)
        batch_loss = criterion(scores, batch_y)

        # Weight by batch size so the final value is a per-sample mean.
        loss_sum += batch_loss.item() * batch_y.size(0)
        predictions.extend(scores.argmax(dim=1).cpu().numpy())
        targets.extend(batch_y.cpu().numpy())

    sample_count = len(targets)
    accuracy = accuracy_score(targets, predictions)
    macro_f1 = f1_score(targets, predictions, average='macro', zero_division=0)
    class_ids = list(range(NUM_CLASSES))
    conf_mat = confusion_matrix(targets, predictions, labels=class_ids)
    mean_loss = loss_sum / sample_count
    return (mean_loss, accuracy, macro_f1, conf_mat,
            np.array(predictions), np.array(targets))
def run_experiment(args):
    """Train one configuration, evaluate its best checkpoint and save the results.

    The model is trained with Adam, ``ReduceLROnPlateau`` on the validation
    loss and early stopping; the checkpoint with the lowest validation loss
    is reloaded for the final test evaluation. ``results.json``,
    ``confusion_matrix.npy`` and ``model_best.pt`` are written to
    ``<output_dir>/<model>_<modalities>_<fusion>[_<tag>]``.

    Args:
        args: Namespace providing ``seed``, ``modalities`` (comma-separated),
            ``model``, ``fusion``, ``batch_size``, ``downsample``,
            ``hidden_dim``, ``proj_dim``, ``pretrained_backbone``,
            ``freeze_backbone_idx``, ``label_smoothing``, ``lr``,
            ``weight_decay``, ``tag``, ``output_dir``, ``epochs``,
            ``augment``, ``noise_std``, ``time_mask_ratio``, ``patience``
            and optionally ``late_agg`` (defaults to ``'mean'``).

    Returns:
        Dict with the experiment name, configuration, validation and test
        metrics, parameter counts, dataset sizes and a snapshot of ``args``.
    """
    set_seed(args.seed)
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    print(f"Device: {device}")

    modalities = args.modalities.split(',')
    print(f"\n{'='*60}")
    print(f"Model: {args.model} | Modalities: {modalities} | Fusion: {args.fusion}")
    print(f"{'='*60}")

    # Load data
    train_loader, val_loader, test_loader, info = get_dataloaders(
        modalities, batch_size=args.batch_size, downsample=args.downsample
    )

    # If no val set, use test set for early stopping / model selection
    # NOTE(review): in that case the checkpoint is selected on the test data,
    # so the reported test metrics are optimistic.
    if info['val_size'] == 0:
        val_loader = test_loader
        print(f"Train: {info['train_size']}, Val: (using test), Test: {info['test_size']}")
    else:
        print(f"Train: {info['train_size']}, Val: {info['val_size']}, Test: {info['test_size']}")
    print(f"Feature dim: {info['feat_dim']}, Modality dims: {info['modality_dims']}")

    # Build model
    late_agg = getattr(args, 'late_agg', 'mean')
    model = build_model(
        args.model, args.fusion, info['feat_dim'],
        info['modality_dims'], info['num_classes'],
        hidden_dim=args.hidden_dim, proj_dim=args.proj_dim,
        late_agg=late_agg,
    ).to(device)

    # Load pretrained backbone and freeze if specified
    if args.pretrained_backbone and args.freeze_backbone_idx is not None:
        _load_and_freeze_backbone(model, args.pretrained_backbone,
                                  args.freeze_backbone_idx, args.fusion)

    total_params = sum(p.numel() for p in model.parameters())
    trainable_params = sum(p.numel() for p in model.parameters() if p.requires_grad)
    print(f"Parameters: {trainable_params:,} trainable / {total_params:,} total")

    # Loss with class weights + label smoothing
    class_weights = info['class_weights'].to(device)
    criterion = nn.CrossEntropyLoss(weight=class_weights,
                                    label_smoothing=args.label_smoothing)
    # Frozen parameters are excluded from the optimizer.
    optimizer = torch.optim.Adam(
        filter(lambda p: p.requires_grad, model.parameters()),
        lr=args.lr, weight_decay=args.weight_decay,
    )
    scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(
        optimizer, mode='min', factor=0.5, patience=7, min_lr=1e-6
    )

    # Training loop with early stopping
    best_val_loss = float('inf')
    best_val_f1 = 0
    best_epoch = 0
    patience_counter = 0

    # Output directory
    mod_str = '-'.join(modalities)
    exp_name = f"{args.model}_{mod_str}_{args.fusion}"
    if args.tag:
        exp_name += f"_{args.tag}"
    out_dir = os.path.join(args.output_dir, exp_name)
    os.makedirs(out_dir, exist_ok=True)
    best_ckpt_path = os.path.join(out_dir, 'model_best.pt')

    for epoch in range(1, args.epochs + 1):
        t0 = time.time()
        train_loss, train_acc = train_one_epoch(
            model, train_loader, criterion, optimizer, device,
            augment=args.augment, noise_std=args.noise_std,
            time_mask_ratio=args.time_mask_ratio,
        )
        val_loss, val_acc, val_f1, _, _, _ = evaluate(model, val_loader, criterion, device)
        scheduler.step(val_loss)
        elapsed = time.time() - t0

        lr = optimizer.param_groups[0]['lr']
        print(f" Epoch {epoch:3d} | "
              f"Train Loss: {train_loss:.4f} Acc: {train_acc:.4f} | "
              f"Val Loss: {val_loss:.4f} Acc: {val_acc:.4f} F1: {val_f1:.4f} | "
              f"LR: {lr:.2e} | {elapsed:.1f}s")

        # Model selection is on validation loss; the F1 stored alongside is
        # the F1 at that epoch, not the best F1 seen.
        if val_loss < best_val_loss:
            best_val_loss = val_loss
            best_val_f1 = val_f1
            best_epoch = epoch
            patience_counter = 0
            torch.save(model.state_dict(), best_ckpt_path)
        else:
            patience_counter += 1
            if patience_counter >= args.patience:
                print(f" Early stopping at epoch {epoch} (best: {best_epoch})")
                break

    # Test evaluation
    # NOTE(review): if no epoch improved (epochs == 0, or a NaN validation
    # loss every epoch) nothing was saved by this run, so the load below
    # either fails or picks up a checkpoint left in out_dir by an earlier run.
    print(f"\nBest epoch: {best_epoch} (val_loss: {best_val_loss:.4f}, val_f1: {best_val_f1:.4f})")
    model.load_state_dict(torch.load(best_ckpt_path, weights_only=True))
    test_loss, test_acc, test_f1, test_cm, test_preds, test_labels = evaluate(
        model, test_loader, criterion, device
    )

    # Per-class accuracy
    per_class_acc = {}
    for i in range(NUM_CLASSES):
        mask = test_labels == i
        if mask.sum() > 0:
            per_class_acc[SCENE_NAMES[i]] = float((test_preds[mask] == i).mean())
        else:
            # Class absent from the test split.
            per_class_acc[SCENE_NAMES[i]] = None

    print(f"\n--- Test Results ---")
    print(f" Accuracy: {test_acc:.4f}")
    print(f" Macro F1: {test_f1:.4f}")
    print(f" Per-class: {per_class_acc}")
    print(f" Confusion Matrix:\n{test_cm}")

    # Save results
    results = {
        'experiment': exp_name,
        'model': args.model,
        'modalities': modalities,
        'fusion': args.fusion,
        'best_epoch': best_epoch,
        'best_val_loss': float(best_val_loss),
        'best_val_f1': float(best_val_f1),
        'test_accuracy': float(test_acc),
        'test_macro_f1': float(test_f1),
        'test_per_class_accuracy': per_class_acc,
        'confusion_matrix': test_cm.tolist(),
        'n_params': trainable_params,
        'n_params_total': total_params,
        'train_size': info['train_size'],
        'val_size': info['val_size'],
        'test_size': info['test_size'],
        'feat_dim': info['feat_dim'],
        # vars(args) is the namespace's live dict; store a snapshot so a
        # caller that later mutates ``args`` cannot rewrite this record.
        'args': dict(vars(args)),
    }
    with open(os.path.join(out_dir, 'results.json'), 'w') as f:
        json.dump(results, f, indent=2, ensure_ascii=False)
    np.save(os.path.join(out_dir, 'confusion_matrix.npy'), test_cm)
    print(f" Results saved to {out_dir}")
    return results
def _run_and_record(base_args, all_results, modalities, model_name, fusion,
                    fail_label, fail_name):
    """Run one configuration on a private copy of the arguments and record the outcome."""
    # Each run gets its own Namespace so that whatever a result keeps from
    # its arguments is not overwritten by the next configuration.
    run_args = argparse.Namespace(**vars(base_args))
    run_args.modalities = modalities
    run_args.model = model_name
    run_args.fusion = fusion
    try:
        result = run_experiment(run_args)
        all_results.append(result)
    except Exception as e:
        # One failing configuration must not abort the whole sweep; it is
        # recorded as an error entry instead.
        print(f"FAILED: {fail_label}: {e}")
        all_results.append({
            'experiment': fail_name,
            'error': str(e),
        })


def run_all_experiments(args):
    """Run all modality ablation + fusion experiments.

    Part 1 (skipped with ``args.skip_ablation``) trains every backbone with
    early fusion on each modality combination. Part 2 runs every fusion
    method on the 3 core modalities and on all 5 modalities, using either
    ``args.best_backbone`` or the best backbone found in Part 1. A summary
    of all runs is written to ``<output_dir>/exp1_summary.json`` and printed
    as a table.

    Args:
        args: Parsed command-line namespace; it is used as a template and is
            not modified.
    """
    modality_combos = [
        'mocap',
        'emg',
        'eyetrack',
        'imu',
        'pressure',
        'mocap,emg,eyetrack',
        'mocap,emg,eyetrack,imu',
        'mocap,emg,eyetrack,pressure',
        'mocap,emg,eyetrack,imu,pressure',
    ]
    models = ['cnn', 'lstm', 'transformer']
    all_results = []

    # Part 1: Modality ablation with all backbone models
    if not args.skip_ablation:
        for mod_combo in modality_combos:
            for model_name in models:
                _run_and_record(
                    args, all_results, mod_combo, model_name, 'early',
                    fail_label=f"{model_name} / {mod_combo} / early",
                    fail_name=f"{model_name}_{mod_combo.replace(',', '-')}_early",
                )

    # Part 2: Fusion ablation with 3-core modalities and best backbone
    if args.skip_ablation:
        best_backbone = args.best_backbone
        print(f"\nSkipping ablation. Using specified backbone: {best_backbone}")
    else:
        # Find best backbone from 3-core early fusion results
        # NOTE(review): the ranking uses the test macro F1, i.e. the backbone
        # is selected on test data -- confirm this is intended.
        core_results = [r for r in all_results
                        if r.get('modalities') == ['mocap', 'emg', 'eyetrack']
                        and 'error' not in r]
        if core_results:
            best_backbone = max(core_results, key=lambda r: r['test_macro_f1'])['model']
        else:
            # Every 3-core run failed; fall back to a fixed backbone.
            best_backbone = 'cnn'
        print(f"\nBest backbone for fusion experiments: {best_backbone}")

    fusion_methods = ['late', 'attention', 'weighted_late', 'gated_late', 'stacking', 'product', 'moe']
    for fusion in fusion_methods:
        _run_and_record(
            args, all_results, 'mocap,emg,eyetrack', best_backbone, fusion,
            fail_label=f"{best_backbone} / 3-core / {fusion}",
            fail_name=f"{best_backbone}_mocap-emg-eyetrack_{fusion}",
        )

    # Also run fusion with all 5 modalities
    for fusion in fusion_methods:
        _run_and_record(
            args, all_results, 'mocap,emg,eyetrack,imu,pressure', best_backbone, fusion,
            fail_label=f"{best_backbone} / all / {fusion}",
            fail_name=f"{best_backbone}_all_{fusion}",
        )

    # Save summary
    summary_path = os.path.join(args.output_dir, 'exp1_summary.json')
    with open(summary_path, 'w') as f:
        json.dump(all_results, f, indent=2, ensure_ascii=False)
    print(f"\n{'='*60}")
    print(f"All experiments completed! Summary saved to {summary_path}")

    # Print results table
    print(f"\n{'Model':<15} {'Modalities':<40} {'Fusion':<10} {'Acc':<8} {'F1':<8}")
    print('-' * 85)
    for r in all_results:
        if 'error' in r:
            print(f"{r['experiment']:<65} FAILED: {r['error'][:20]}")
        else:
            mod_str = ','.join(r['modalities'])
            print(f"{r['model']:<15} {mod_str:<40} {r['fusion']:<10} "
                  f"{r['test_accuracy']:.4f} {r['test_macro_f1']:.4f}")
def main():
    """Parse command-line options and run one experiment or the full sweep.

    With ``--run_all`` the complete ablation/fusion sweep is executed via
    ``run_all_experiments``; otherwise a single configuration is trained via
    ``run_experiment``. The output directory is created before either runs.
    """
    parser = argparse.ArgumentParser(description='Exp1: Scene Recognition')
    parser.add_argument('--model', type=str, default='cnn',
                        choices=['cnn', 'lstm', 'transformer', 'tinyhar',
                                 'deepconvlstm', 'inceptiontime'])
    parser.add_argument('--modalities', type=str, default='mocap,emg,eyetrack',
                        help='Comma-separated modality names')
    parser.add_argument('--fusion', type=str, default='early',
                        choices=['early', 'late', 'attention',
                                 'weighted_late', 'gated_late', 'stacking',
                                 'product', 'moe', 'feat_concat'])
    parser.add_argument('--epochs', type=int, default=100)
    parser.add_argument('--batch_size', type=int, default=16)
    parser.add_argument('--lr', type=float, default=1e-3)
    parser.add_argument('--weight_decay', type=float, default=1e-3)
    parser.add_argument('--hidden_dim', type=int, default=32)
    parser.add_argument('--proj_dim', type=int, default=0,
                        help='Per-modality projection dim (0 = no projection)')
    parser.add_argument('--downsample', type=int, default=5,
                        help='Downsample factor from 100Hz (5 = 20Hz)')
    parser.add_argument('--patience', type=int, default=15)
    parser.add_argument('--augment', action='store_true',
                        help='Enable data augmentation (noise + time mask)')
    parser.add_argument('--noise_std', type=float, default=0.1,
                        help='Gaussian noise std for augmentation')
    parser.add_argument('--time_mask_ratio', type=float, default=0.1,
                        help='Fraction of timesteps to mask')
    parser.add_argument('--label_smoothing', type=float, default=0.0,
                        help='Label smoothing for CrossEntropyLoss')
    parser.add_argument('--pretrained_backbone', type=str, default=None,
                        help='Path to pretrained SingleModel weights')
    parser.add_argument('--freeze_backbone_idx', type=int, default=None,
                        help='Index of modality branch to freeze')
    parser.add_argument('--late_agg', type=str, default='mean',
                        choices=['mean', 'confidence', 'learned'],
                        help='Late fusion aggregation: mean/confidence/learned')
    parser.add_argument('--tag', type=str, default='',
                        help='Experiment name suffix for output dir')
    parser.add_argument('--seed', type=int, default=42)
    parser.add_argument('--output_dir', type=str,
                        default='${PULSE_ROOT}/results/exp1')
    parser.add_argument('--run_all', action='store_true',
                        help='Run all modality ablation + fusion experiments')
    parser.add_argument('--skip_ablation', action='store_true',
                        help='Skip Part 1 (modality ablation), run fusion experiments only with --best_backbone')
    parser.add_argument('--best_backbone', type=str, default='transformer',
                        choices=['cnn', 'lstm', 'transformer', 'tinyhar',
                                 'deepconvlstm', 'inceptiontime'],
                        help='Backbone to use when --skip_ablation (default: transformer)')
    args = parser.parse_args()

    # The default path contains ${PULSE_ROOT}; argparse does not expand
    # environment variables, so without this a directory literally named
    # "${PULSE_ROOT}" would be created. Unset variables are left as written.
    args.output_dir = os.path.expandvars(args.output_dir)
    os.makedirs(args.output_dir, exist_ok=True)

    if args.run_all:
        run_all_experiments(args)
    else:
        run_experiment(args)
# Script entry point: run the command-line interface only when this file is
# executed directly, not when it is imported.
if __name__ == '__main__':
    main()