EMMA-CVPR2026 / Baseline /run_additional_ablations.py
abanerj3's picture
Upload folder using huggingface_hub
b46439f verified
#!/usr/bin/env python3
"""
Run architecture ablation for Pendulum 90° and 150°
Appends results to architecture_ablation_results.csv
"""
import os
import csv
import time
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from ncps.torch import LTC
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f"Using device: {device}")
Nloop = 0
# Import classes from architecture_ablation.py
import sys
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from architecture_ablation import (
cut_in_sequences,
PendulumData,
ArchitectureModel
)
class Custom_Pendulum_Loss_Generic(nn.Module):
"""Physics-informed loss for pendulum with configurable ground truth."""
def __init__(self, labels, logits, omega, alpha_nominal, beta_nominal):
super().__init__()
self.y_true = labels
self.y_pred = logits
self.y_omega = omega
self.alpha_nominal = alpha_nominal
self.beta_nominal = beta_nominal
def forward(self):
dev = self.y_pred.device
T, B, _ = self.y_pred.shape
maxChange = 95.0
getp = lambda k: self.y_pred[:,:,k]
gamma_nominal = 100.0
alpha = (1 + (0.5 - getp(0)) * maxChange / 100.0) * self.alpha_nominal
beta = (1 + (0.5 - getp(1)) * maxChange / 100.0) * self.beta_nominal
gamma = (1 + (0.5 - getp(2)) * maxChange / 100.0) * gamma_nominal
L = alpha
tau = beta
g = torch.tensor(9.81, device=dev)
thetaVal = self.y_true[:,:,0]
omegaVal = self.y_omega[:,:,0]
theta = thetaVal.clone().unsqueeze(2)
omega = omegaVal.clone().unsqueeze(2)
limitLoop = min(500, T)
tau_dt = 0.03
for i in range(1, limitLoop):
y1 = theta[:,:,i-1] + omega[:,:,i-1]*tau_dt
y0 = omega[:,:,i-1] + (-torch.mul(tau,omega[:,:,i-1]) - torch.mul(torch.div(g,L.clamp(min=0.0001)),torch.sin(theta[:,:,i-1])))*tau_dt
theta = torch.cat([theta, y1.unsqueeze(2)],dim=2)
omega = torch.cat([omega, y0.unsqueeze(2)],dim=2)
loss_Cal_theta = gamma * 0.01
loss_Cal_omega = gamma * 0.005
mse_loss = torch.abs(torch.sum(torch.square(self.y_true[:,:,0:limitLoop]-theta)/limitLoop, dim=2)-loss_Cal_theta) + \
torch.abs(torch.sum(torch.square(self.y_omega[:,:,0:limitLoop]-omega)/limitLoop, dim=2)-loss_Cal_omega)
param_penalty = 0.0
param_penalty += 10.0 * torch.mean(torch.relu(-alpha))
param_penalty += 10.0 * torch.mean(torch.relu(-beta))
param_penalty += 2.0 * torch.mean(torch.relu(alpha - 2.0))
param_penalty += 2.0 * torch.mean(torch.relu(beta - 1.0))
param_penalty += 1.0 * torch.mean(torch.relu(gamma - 500.0))
total_loss = mse_loss + 0.001 * param_penalty
self.L = L
self.tau = tau
return mse_loss
def run_ablation_for_dataset(dataset_name, data_dirs, alpha_nominal, beta_nominal):
"""
Run architecture ablation for a specific pendulum dataset.
Args:
dataset_name: Name of dataset (e.g., "90", "150")
data_dirs: List of data directories for each video
alpha_nominal: Ground truth length (m)
beta_nominal: Ground truth damping (1/s)
"""
import random
print(f"\n{'='*70}")
print(f"Running ablation for Pendulum {dataset_name}°")
print(f"Ground Truth: L={alpha_nominal}m, τ={beta_nominal} 1/s")
print(f"{'='*70}")
architectures = ["LTC", "LSTM", "GRU", "TRANSFORMER"]
results = []
for video_idx, data_dir in enumerate(data_dirs, start=1):
print(f"\n{'='*70}")
print(f"Processing Video {video_idx}/5: {data_dir}")
print(f"{'='*70}")
for arch_name in architectures:
print(f"\n Testing {arch_name}...")
# Set seeds
random.seed(42)
np.random.seed(42)
torch.manual_seed(42)
if torch.cuda.is_available():
torch.cuda.manual_seed_all(42)
# Load data
dataset = PendulumData(seq_len=16, data_dir=data_dir)
# Create model
model = ArchitectureModel(
model_type=arch_name.lower(),
model_size=64,
learning_rate=0.0003
).to(device)
optimizer = model.optimizer
scheduler = model.scheduler
# Training
num_epochs = 40
batch_size = 2
best_loss = float('inf')
start_time = time.time()
for epoch in range(num_epochs):
model.train()
epoch_loss = 0.0
batch_count = 0
for batch_x, batch_y, batch_omega, _ in dataset.iterate_train(batch_size=batch_size):
batch_x = batch_x.to(device)
batch_y = batch_y.to(device)
batch_omega = batch_omega.to(device)
optimizer.zero_grad()
predicted_params = model(batch_x)
# Use generic loss with configurable ground truth
loss_fn = Custom_Pendulum_Loss_Generic(
batch_y, predicted_params, batch_omega,
alpha_nominal, beta_nominal
)
loss = loss_fn.forward()
# Loss might be a tensor, take mean
if loss.dim() > 0:
loss = loss.mean()
if torch.isnan(loss):
continue
loss.backward()
torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
optimizer.step()
epoch_loss += loss.item()
batch_count += 1
if batch_count > 0:
avg_loss = epoch_loss / batch_count
scheduler.step()
if avg_loss < best_loss:
best_loss = avg_loss
training_time = time.time() - start_time
# Evaluate
model.eval()
with torch.no_grad():
sample_batch = next(iter(dataset.iterate_train(batch_size=1)))
sample_x = sample_batch[0].to(device)
predicted_params = model(sample_x)
maxChange = 95.0
getp = lambda k: predicted_params[:,:,k].mean()
L_estimated = ((1 + (0.5 - getp(0)) * maxChange / 100.0) * alpha_nominal).item()
tau_estimated = ((1 + (0.5 - getp(1)) * maxChange / 100.0) * beta_nominal).item()
print(f" L={L_estimated:.6f}m, τ={tau_estimated:.6f} 1/s, Loss={best_loss:.2f}, Time={training_time:.2f}s")
results.append({
'dataset': f'{dataset_name}_v{video_idx}',
'architecture': arch_name,
'hidden_units': 64,
'video_number': video_idx,
'best_loss': best_loss,
'training_time_s': training_time,
'L_estimated': L_estimated,
'tau_estimated': tau_estimated
})
return results
def main():
"""Run ablations for 90° and 150° pendulums and append to CSV."""
# Check if data directories exist
base_dir = "Pendulum-EMMA"
# Pendulum 90° configuration
data_dirs_90 = [
os.path.join(base_dir, "90_v1", "data"),
os.path.join(base_dir, "90_v2", "data"),
os.path.join(base_dir, "90_v3", "data"),
os.path.join(base_dir, "90_v4", "data"),
os.path.join(base_dir, "90_v5", "data"),
]
# Pendulum 150° configuration
data_dirs_150 = [
os.path.join(base_dir, "150_v1", "data"),
os.path.join(base_dir, "150_v2", "data"),
os.path.join(base_dir, "150_v3", "data"),
os.path.join(base_dir, "150_v4", "data"),
os.path.join(base_dir, "150_v5", "data"),
]
# Check if directories exist
print("Checking data directories...")
for dirs, name in [(data_dirs_90, "90°"), (data_dirs_150, "150°")]:
for d in dirs:
if os.path.exists(d):
print(f" ✓ {d}")
else:
print(f" ✗ {d} NOT FOUND")
all_results = []
# Run ablation for 90°
if all(os.path.exists(d) for d in data_dirs_90):
results_90 = run_ablation_for_dataset("90", data_dirs_90, 0.90, 0.05)
all_results.extend(results_90)
else:
print("\n⚠️ Skipping 90° - data directories not found")
# Run ablation for 150°
if all(os.path.exists(d) for d in data_dirs_150):
results_150 = run_ablation_for_dataset("150", data_dirs_150, 1.50, 0.05)
all_results.extend(results_150)
else:
print("\n⚠️ Skipping 150° - data directories not found")
# Append to existing CSV
if all_results:
csv_file = 'architecture_ablation_results.csv'
print(f"\n{'='*70}")
print(f"Appending {len(all_results)} results to {csv_file}")
print(f"{'='*70}")
with open(csv_file, 'a', newline='') as f:
if all_results:
writer = csv.DictWriter(f, fieldnames=all_results[0].keys())
# Don't write header since we're appending
writer.writerows(all_results)
print(f"✅ Results appended successfully!")
else:
print("\n❌ No results to append - check data directories")
if __name__ == "__main__":
main()