Spaces:
Configuration error
Configuration error
| #!/usr/bin/env python3 | |
| """ | |
| Run architecture ablation for Pendulum 90° and 150° | |
| Appends results to architecture_ablation_results.csv | |
| """ | |
| import os | |
| import csv | |
| import time | |
| import numpy as np | |
| import torch | |
| import torch.nn as nn | |
| import torch.optim as optim | |
| from ncps.torch import LTC | |
| device = torch.device('cuda' if torch.cuda.is_available() else 'cpu') | |
| print(f"Using device: {device}") | |
| Nloop = 0 | |
| # Import classes from architecture_ablation.py | |
| import sys | |
| sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) | |
| from architecture_ablation import ( | |
| cut_in_sequences, | |
| PendulumData, | |
| ArchitectureModel | |
| ) | |
| class Custom_Pendulum_Loss_Generic(nn.Module): | |
| """Physics-informed loss for pendulum with configurable ground truth.""" | |
| def __init__(self, labels, logits, omega, alpha_nominal, beta_nominal): | |
| super().__init__() | |
| self.y_true = labels | |
| self.y_pred = logits | |
| self.y_omega = omega | |
| self.alpha_nominal = alpha_nominal | |
| self.beta_nominal = beta_nominal | |
| def forward(self): | |
| dev = self.y_pred.device | |
| T, B, _ = self.y_pred.shape | |
| maxChange = 95.0 | |
| getp = lambda k: self.y_pred[:,:,k] | |
| gamma_nominal = 100.0 | |
| alpha = (1 + (0.5 - getp(0)) * maxChange / 100.0) * self.alpha_nominal | |
| beta = (1 + (0.5 - getp(1)) * maxChange / 100.0) * self.beta_nominal | |
| gamma = (1 + (0.5 - getp(2)) * maxChange / 100.0) * gamma_nominal | |
| L = alpha | |
| tau = beta | |
| g = torch.tensor(9.81, device=dev) | |
| thetaVal = self.y_true[:,:,0] | |
| omegaVal = self.y_omega[:,:,0] | |
| theta = thetaVal.clone().unsqueeze(2) | |
| omega = omegaVal.clone().unsqueeze(2) | |
| limitLoop = min(500, T) | |
| tau_dt = 0.03 | |
| for i in range(1, limitLoop): | |
| y1 = theta[:,:,i-1] + omega[:,:,i-1]*tau_dt | |
| y0 = omega[:,:,i-1] + (-torch.mul(tau,omega[:,:,i-1]) - torch.mul(torch.div(g,L.clamp(min=0.0001)),torch.sin(theta[:,:,i-1])))*tau_dt | |
| theta = torch.cat([theta, y1.unsqueeze(2)],dim=2) | |
| omega = torch.cat([omega, y0.unsqueeze(2)],dim=2) | |
| loss_Cal_theta = gamma * 0.01 | |
| loss_Cal_omega = gamma * 0.005 | |
| mse_loss = torch.abs(torch.sum(torch.square(self.y_true[:,:,0:limitLoop]-theta)/limitLoop, dim=2)-loss_Cal_theta) + \ | |
| torch.abs(torch.sum(torch.square(self.y_omega[:,:,0:limitLoop]-omega)/limitLoop, dim=2)-loss_Cal_omega) | |
| param_penalty = 0.0 | |
| param_penalty += 10.0 * torch.mean(torch.relu(-alpha)) | |
| param_penalty += 10.0 * torch.mean(torch.relu(-beta)) | |
| param_penalty += 2.0 * torch.mean(torch.relu(alpha - 2.0)) | |
| param_penalty += 2.0 * torch.mean(torch.relu(beta - 1.0)) | |
| param_penalty += 1.0 * torch.mean(torch.relu(gamma - 500.0)) | |
| total_loss = mse_loss + 0.001 * param_penalty | |
| self.L = L | |
| self.tau = tau | |
| return mse_loss | |
| def run_ablation_for_dataset(dataset_name, data_dirs, alpha_nominal, beta_nominal): | |
| """ | |
| Run architecture ablation for a specific pendulum dataset. | |
| Args: | |
| dataset_name: Name of dataset (e.g., "90", "150") | |
| data_dirs: List of data directories for each video | |
| alpha_nominal: Ground truth length (m) | |
| beta_nominal: Ground truth damping (1/s) | |
| """ | |
| import random | |
| print(f"\n{'='*70}") | |
| print(f"Running ablation for Pendulum {dataset_name}°") | |
| print(f"Ground Truth: L={alpha_nominal}m, τ={beta_nominal} 1/s") | |
| print(f"{'='*70}") | |
| architectures = ["LTC", "LSTM", "GRU", "TRANSFORMER"] | |
| results = [] | |
| for video_idx, data_dir in enumerate(data_dirs, start=1): | |
| print(f"\n{'='*70}") | |
| print(f"Processing Video {video_idx}/5: {data_dir}") | |
| print(f"{'='*70}") | |
| for arch_name in architectures: | |
| print(f"\n Testing {arch_name}...") | |
| # Set seeds | |
| random.seed(42) | |
| np.random.seed(42) | |
| torch.manual_seed(42) | |
| if torch.cuda.is_available(): | |
| torch.cuda.manual_seed_all(42) | |
| # Load data | |
| dataset = PendulumData(seq_len=16, data_dir=data_dir) | |
| # Create model | |
| model = ArchitectureModel( | |
| model_type=arch_name.lower(), | |
| model_size=64, | |
| learning_rate=0.0003 | |
| ).to(device) | |
| optimizer = model.optimizer | |
| scheduler = model.scheduler | |
| # Training | |
| num_epochs = 40 | |
| batch_size = 2 | |
| best_loss = float('inf') | |
| start_time = time.time() | |
| for epoch in range(num_epochs): | |
| model.train() | |
| epoch_loss = 0.0 | |
| batch_count = 0 | |
| for batch_x, batch_y, batch_omega, _ in dataset.iterate_train(batch_size=batch_size): | |
| batch_x = batch_x.to(device) | |
| batch_y = batch_y.to(device) | |
| batch_omega = batch_omega.to(device) | |
| optimizer.zero_grad() | |
| predicted_params = model(batch_x) | |
| # Use generic loss with configurable ground truth | |
| loss_fn = Custom_Pendulum_Loss_Generic( | |
| batch_y, predicted_params, batch_omega, | |
| alpha_nominal, beta_nominal | |
| ) | |
| loss = loss_fn.forward() | |
| # Loss might be a tensor, take mean | |
| if loss.dim() > 0: | |
| loss = loss.mean() | |
| if torch.isnan(loss): | |
| continue | |
| loss.backward() | |
| torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0) | |
| optimizer.step() | |
| epoch_loss += loss.item() | |
| batch_count += 1 | |
| if batch_count > 0: | |
| avg_loss = epoch_loss / batch_count | |
| scheduler.step() | |
| if avg_loss < best_loss: | |
| best_loss = avg_loss | |
| training_time = time.time() - start_time | |
| # Evaluate | |
| model.eval() | |
| with torch.no_grad(): | |
| sample_batch = next(iter(dataset.iterate_train(batch_size=1))) | |
| sample_x = sample_batch[0].to(device) | |
| predicted_params = model(sample_x) | |
| maxChange = 95.0 | |
| getp = lambda k: predicted_params[:,:,k].mean() | |
| L_estimated = ((1 + (0.5 - getp(0)) * maxChange / 100.0) * alpha_nominal).item() | |
| tau_estimated = ((1 + (0.5 - getp(1)) * maxChange / 100.0) * beta_nominal).item() | |
| print(f" L={L_estimated:.6f}m, τ={tau_estimated:.6f} 1/s, Loss={best_loss:.2f}, Time={training_time:.2f}s") | |
| results.append({ | |
| 'dataset': f'{dataset_name}_v{video_idx}', | |
| 'architecture': arch_name, | |
| 'hidden_units': 64, | |
| 'video_number': video_idx, | |
| 'best_loss': best_loss, | |
| 'training_time_s': training_time, | |
| 'L_estimated': L_estimated, | |
| 'tau_estimated': tau_estimated | |
| }) | |
| return results | |
| def main(): | |
| """Run ablations for 90° and 150° pendulums and append to CSV.""" | |
| # Check if data directories exist | |
| base_dir = "Pendulum-EMMA" | |
| # Pendulum 90° configuration | |
| data_dirs_90 = [ | |
| os.path.join(base_dir, "90_v1", "data"), | |
| os.path.join(base_dir, "90_v2", "data"), | |
| os.path.join(base_dir, "90_v3", "data"), | |
| os.path.join(base_dir, "90_v4", "data"), | |
| os.path.join(base_dir, "90_v5", "data"), | |
| ] | |
| # Pendulum 150° configuration | |
| data_dirs_150 = [ | |
| os.path.join(base_dir, "150_v1", "data"), | |
| os.path.join(base_dir, "150_v2", "data"), | |
| os.path.join(base_dir, "150_v3", "data"), | |
| os.path.join(base_dir, "150_v4", "data"), | |
| os.path.join(base_dir, "150_v5", "data"), | |
| ] | |
| # Check if directories exist | |
| print("Checking data directories...") | |
| for dirs, name in [(data_dirs_90, "90°"), (data_dirs_150, "150°")]: | |
| for d in dirs: | |
| if os.path.exists(d): | |
| print(f" ✓ {d}") | |
| else: | |
| print(f" ✗ {d} NOT FOUND") | |
| all_results = [] | |
| # Run ablation for 90° | |
| if all(os.path.exists(d) for d in data_dirs_90): | |
| results_90 = run_ablation_for_dataset("90", data_dirs_90, 0.90, 0.05) | |
| all_results.extend(results_90) | |
| else: | |
| print("\n⚠️ Skipping 90° - data directories not found") | |
| # Run ablation for 150° | |
| if all(os.path.exists(d) for d in data_dirs_150): | |
| results_150 = run_ablation_for_dataset("150", data_dirs_150, 1.50, 0.05) | |
| all_results.extend(results_150) | |
| else: | |
| print("\n⚠️ Skipping 150° - data directories not found") | |
| # Append to existing CSV | |
| if all_results: | |
| csv_file = 'architecture_ablation_results.csv' | |
| print(f"\n{'='*70}") | |
| print(f"Appending {len(all_results)} results to {csv_file}") | |
| print(f"{'='*70}") | |
| with open(csv_file, 'a', newline='') as f: | |
| if all_results: | |
| writer = csv.DictWriter(f, fieldnames=all_results[0].keys()) | |
| # Don't write header since we're appending | |
| writer.writerows(all_results) | |
| print(f"✅ Results appended successfully!") | |
| else: | |
| print("\n❌ No results to append - check data directories") | |
| if __name__ == "__main__": | |
| main() | |