Spaces:
Configuration error
Configuration error
| # This is the code for LED pipeline based on EMMA method | |
| # EMMA LED Pipeline | |
| import os | |
| import csv | |
| import gc | |
| import cv2 | |
| import numpy as np | |
| import matplotlib.pyplot as plt | |
| import torch | |
| import torch.nn as nn | |
| import torch.optim as optim | |
| from torch.utils.data import Dataset, DataLoader | |
| from ncps.torch import LTC | |
| from matplotlib.animation import FuncAnimation | |
| import pdb | |
| try: | |
| import psutil | |
| _HAS_PSUTIL = True | |
| except Exception: | |
| _HAS_PSUTIL = False | |
| try: | |
| from moviepy.editor import VideoFileClip | |
| except Exception: | |
| from moviepy import VideoFileClip | |
| from torchvision import transforms | |
| # Set device for computation (GPU if available, otherwise CPU) | |
| device = torch.device('cuda' if torch.cuda.is_available() else 'cpu') | |
| print(f"Using device: {device}") | |
| # Global variable to store the number of features per timestep | |
| Nloop = 0 | |
| def check_memory_usage(): | |
| """ | |
| Monitor system memory usage during processing. | |
| Why: Prevent memory overflow during large video processing | |
| What: Display current memory usage statistics | |
| """ | |
| if not _HAS_PSUTIL: | |
| return | |
| mem = psutil.virtual_memory() | |
| used_gb = mem.used / (1024**3) | |
| total_gb = mem.total / (1024**3) | |
| print(f"[INFO] Memory usage: {used_gb:.1f}GB / {total_gb:.1f}GB ({mem.percent:.1f}%)") | |
| def process_led_video(video_path, output_csv): | |
| """ | |
| Process LED video to extract brightness/intensity trajectory. | |
| This function processes LED videos to: | |
| 1. Load video frames | |
| 2. Calculate average brightness/intensity per frame (entire frame) | |
| 3. Save intensity trajectory data | |
| Args: | |
| video_path: Path to input LED video file | |
| output_csv: Path for trajectory CSV output | |
| Why: Video processing is the foundation of LED intensity trajectory analysis | |
| What: Extracts brightness trajectory from LED video frames (no mask used) | |
| """ | |
| print(f"[STEP 1] Processing LED video: {video_path}") | |
| print(f"[STEP 1] Output CSV: {output_csv}") | |
| os.makedirs(os.path.dirname(output_csv), exist_ok=True) | |
| cap = cv2.VideoCapture(video_path) | |
| if not cap.isOpened(): | |
| raise RuntimeError(f"Cannot open video: {video_path}") | |
| fps = cap.get(cv2.CAP_PROP_FPS) | |
| width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) | |
| height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) | |
| csv_f = open(output_csv, "w", newline="") | |
| csvw = csv.writer(csv_f) | |
| csvw.writerow(["frame", "time_s", "intensity", "intensity_normalized"]) | |
| intensity_series = [] | |
| frame_idx = 0 | |
| while True: | |
| ok, frame = cap.read() | |
| if not ok: | |
| break | |
| frame_time = frame_idx / fps | |
| # Convert to grayscale for intensity measurement | |
| gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY) | |
| # Calculate mean intensity for entire frame (no mask) | |
| intensity = np.mean(gray) | |
| intensity_series.append(intensity) | |
| frame_idx += 1 | |
| if frame_idx % 30 == 0: | |
| print(f"[PROGRESS] Processed {frame_idx} frames") | |
| check_memory_usage() | |
| cap.release() | |
| if intensity_series: | |
| # Normalize intensity to [0, 1] range | |
| intensity_arr = np.array(intensity_series) | |
| intensity_max = intensity_arr.max() | |
| intensity_min = intensity_arr.min() | |
| # Avoid division by zero | |
| if intensity_max > intensity_min: | |
| intensity_normalized = (intensity_arr - intensity_min) / (intensity_max - intensity_min) | |
| else: | |
| intensity_normalized = np.ones_like(intensity_arr) | |
| # Write to CSV | |
| for idx, (raw_val, norm_val) in enumerate(zip(intensity_arr, intensity_normalized)): | |
| frame_time = idx / fps | |
| csvw.writerow([idx, f"{frame_time:.3f}", f"{raw_val:.2f}", f"{norm_val:.6f}"]) | |
| csv_f.close() | |
| # Report extracted intensity range | |
| I_0_actual = intensity_normalized[0] if len(intensity_normalized) > 0 else 0.0 | |
| I_n_actual = intensity_normalized[-1] if len(intensity_normalized) > 0 else 0.0 | |
| print(f"\n[STEP 1] Extracted Intensity Range:") | |
| print(f" Initial intensity: I_0 = {I_0_actual:.3f} (normalized)") | |
| print(f" Final intensity: I_n = {I_n_actual:.3f} (normalized)") | |
| print(f" Intensity change: {I_0_actual - I_n_actual:.3f}") | |
| # Check if intensity decreases (physical constraint for LED decay) | |
| if I_n_actual >= I_0_actual: | |
| print(f" ⚠️ Warning: Intensity does not decrease (may indicate issue)") | |
| else: | |
| print(f" ✅ Intensity decreases as expected for LED decay") | |
| # Match EMMA format (N x 100 matrices for memory optimization) | |
| I_matrix = np.tile(intensity_normalized.reshape(-1, 1), (1, 100)) | |
| # Determine data directory from output_csv path | |
| data_dir = os.path.dirname(output_csv) | |
| os.makedirs(data_dir, exist_ok=True) | |
| np.savetxt(os.path.join(data_dir, "IData.txt"), I_matrix, fmt='%.6f') | |
| del I_matrix | |
| gc.collect() | |
| print(f"\n[STEP 1] ✅ Saved LED intensity trajectory data: {len(intensity_series)} frames") | |
| print(f"[STEP 1] ✅ Saved intensity data: IData.txt") | |
| # Create trajectory plots | |
| print("[STEP 1] Creating LED intensity trajectory plots...") | |
| # Plot intensity vs time | |
| fig, ax = plt.subplots(1, 1, figsize=(12, 6)) | |
| time_array = np.arange(len(intensity_normalized)) / fps | |
| ax.plot(time_array, intensity_normalized, 'b-', linewidth=2, label='Normalized Intensity') | |
| ax.set_xlabel('Time (s)') | |
| ax.set_ylabel('Normalized Intensity') | |
| ax.set_title('LED Intensity vs Time') | |
| ax.grid(True, alpha=0.3) | |
| ax.legend() | |
| plot_path = os.path.join(data_dir, "led_intensity_trajectory.png") | |
| plt.savefig(plot_path, dpi=300, bbox_inches='tight') | |
| plt.close() | |
| print(f"[STEP 1] ✅ Saved trajectory plot: {plot_path}") | |
| return len(intensity_series) | |
| else: | |
| print("[STEP 1] ⚠️ No intensity data extracted from video") | |
| return 0 | |
| def cut_in_sequences(x, y, seq_len, inc=1): | |
| """ | |
| Slice a long 1D/2D series into overlapping windows for sequence-based learning. | |
| This function creates sequences from the input data for the LTC model. | |
| For LED data: input shape (N, 100) -> output shape (seq_len, num_sequences, 100) | |
| Args: | |
| x: Input data array (e.g., intensity trajectory) | |
| y: Target data array (e.g., intensity trajectory) | |
| seq_len: Length of each sequence (e.g., 16 timesteps) | |
| inc: Increment step for creating overlapping sequences | |
| Returns: | |
| sequences_x: Input sequences with shape (seq_len, num_sequences, features) | |
| sequences_y: Target sequences with shape (seq_len, num_sequences, features) | |
| """ | |
| sequences_x, sequences_y = [], [] | |
| for s in range(0, x.shape[0] - seq_len, inc): | |
| start, end = s, s + seq_len | |
| sequences_x.append(x[start:end]) | |
| sequences_y.append(y[start:end]) | |
| return np.stack(sequences_x, axis=1), np.stack(sequences_y, axis=1) | |
| class Custom_LED_Loss(nn.Module): | |
| """ | |
| Custom loss function that integrates LED decay physics simulation. | |
| This is the core of the parameter estimation system. Instead of using a simple | |
| MSE loss, this function: | |
| 1. Takes predicted γ (decay constant) from the neural network | |
| 2. Runs a complete LED decay physics simulation using this parameter | |
| 3. Compares the simulated intensity trajectory with the actual intensity trajectory | |
| 4. Returns the physics-based loss for gradient descent | |
| The physics simulation includes: | |
| - LED decay: dI/dt = -γ * I | |
| - Intensity decreases exponentially over time | |
| - Parameter estimation for γ (decay constant) | |
| This approach ensures that the learned parameter is physically meaningful | |
| and can be used for actual LED decay prediction. | |
| """ | |
| def __init__(self, labels, logits): | |
| """ | |
| Initialize the physics-based loss function. | |
| Args: | |
| labels: Actual intensity trajectory data [T, B, 1] (intensity I) | |
| logits: Predicted γ constant from neural network [T, B, 1] | |
| """ | |
| super().__init__() | |
| # Store actual trajectory data for comparison | |
| self.y_true = labels # [T, B, 1] - actual intensity data | |
| # Store predicted parameters from neural network | |
| self.y_pred = logits # [T, B, 1] - γ constant | |
| def forward(self): | |
| """ | |
| Complete LED decay dynamics simulation with physics-based loss. | |
| This method performs the following steps: | |
| 1. Extract predicted γ constant from neural network output | |
| 2. Convert normalized parameter to physical value | |
| 3. Initialize intensity state from actual data | |
| 4. Run physics simulation for T timesteps | |
| 5. Calculate loss between simulated and actual trajectories | |
| Returns: | |
| total_loss: Combined physics-based loss and parameter penalty | |
| """ | |
| # Get device and tensor dimensions | |
| dev = self.y_pred.device | |
| T, B, _ = self.y_pred.shape # T=timesteps, B=batch_size, 1=parameter | |
| # ======================================== | |
| # STEP 1: Extract and Convert Parameter | |
| # ======================================== | |
| # The neural network outputs normalized values [0,1] for γ | |
| # We convert these to physical values with ±95% variation around nominal value | |
| maxChange = 95.0 # Maximum percentage change from nominal value | |
| getp = lambda k: self.y_pred[:,:,k] # Extract parameter k for all timesteps [T,B] | |
| # Convert normalized predictions to physical parameter | |
| # γ is scaled from [0,1] to [nominal*(1-0.95), nominal*(1+0.95)] | |
| # Nominal γ value (will be set based on LED duration) | |
| gamma_nominal = 0.46 # Nominal γ value (1/s) - GT value for led_10s | |
| gamma = (1 + (0.5 - getp(0)) * maxChange / 100.0) * gamma_nominal | |
| # ======================================== | |
| # STEP 2: Physical Constants | |
| # ======================================== | |
| # These are fixed physical constants that don't change during training | |
| eps = torch.tensor(1e-6, device=dev) # Small epsilon for numerical stability | |
| # ======================================== | |
| # STEP 3: Get Actual Intensity Data | |
| # ======================================== | |
| # Extract actual intensity data for comparison | |
| if self.y_true.dim() == 3: | |
| actual_I = self.y_true[:, :, 0] # [T,B] - actual intensity from [T,B,1] | |
| else: | |
| actual_I = self.y_true # [T,B] - actual intensity | |
| # ======================================== | |
| # STEP 4: Initialize Intensity State | |
| # ======================================== | |
| # Initialize intensity from actual trajectory (like pendulum approach) | |
| # Match pendulum pattern: theta = thetaVal.clone() where thetaVal = self.y_true[:,:,0] | |
| IVal = actual_I # [T,B] - actual intensity trajectory | |
| I = IVal.clone() # [T,B] - initialize from actual data (like pendulum) | |
| # ======================================== | |
| # STEP 5: Simulation Setup | |
| # ======================================== | |
| # Set up simulation parameters and storage arrays | |
| # Dynamic limitLoop based on actual data length to avoid tensor size mismatch | |
| limitLoop = min(500, T) # Use actual data length or 500, whichever is smaller | |
| tau_dt = 0.01 # Time step (s) - match baseline paper's dt | |
| # Reshape for tensor concatenation approach (like pendulum/sliding block) | |
| # Match pendulum: theta = theta.unsqueeze(2) to get [T,B,1] | |
| I = I.unsqueeze(2) # [T,B] -> [T,B,1] | |
| # ======================================== | |
| # STEP 6: Main Physics Simulation Loop | |
| # ======================================== | |
| # This is the core of the physics simulation | |
| # For each timestep, we: | |
| # 1. Get γ parameter for current timestep | |
| # 2. Calculate dI/dt = -γ * I | |
| # 3. Update intensity using Euler integration | |
| # 4. Store predicted state using tensor concatenation | |
| for i in range(1, limitLoop): | |
| # Current timestep index | |
| t_idx = i | |
| # ======================================== | |
| # STEP 6.1: Get Current Parameter | |
| # ======================================== | |
| # Get γ value for current timestep (match pendulum pattern) | |
| gamma_curr = gamma[t_idx] # [B] - γ constant for current timestep | |
| # ======================================== | |
| # STEP 6.2: LED Decay Dynamics | |
| # ======================================== | |
| # LED decay equation: dI/dt = -γ * I | |
| # Match pendulum pattern: use I[:,:,i-1] to get previous timestep | |
| # Get previous intensity (like pendulum: theta[:,:,i-1]) | |
| I_prev = I[:,:,i-1] # [T,B] - previous intensity from actual trajectory | |
| # Ensure I is non-negative (physical constraint) | |
| I_safe = torch.clamp(I_prev, min=eps) # Prevent negative intensity | |
| # Calculate rate of change: dI/dt = -γ * I | |
| # gamma_curr is [B], I_safe is [T,B], need to expand gamma_curr | |
| gamma_expanded = gamma_curr.unsqueeze(0).expand(T, -1) # [T,B] - expand γ to match I shape | |
| dI_dt = -gamma_expanded * I_safe # [T,B] - rate of intensity change | |
| # ======================================== | |
| # STEP 6.3: Update Intensity | |
| # ======================================== | |
| # Euler integration: I_new = I_old + dI/dt * dt | |
| # Match pendulum pattern: y1 = theta[:,:,i-1] + omega[:,:,i-1]*tau_dt | |
| I_new = I_prev + dI_dt * tau_dt # [T,B] - intensity update | |
| # Ensure intensity remains non-negative and bounded [0,1] (physical constraint) | |
| I_new = torch.clamp(I_new, min=0.0, max=1.0) | |
| # Concatenate to build trajectory (like pendulum: theta = torch.cat([theta, y1.unsqueeze(2)],dim=2)) | |
| I = torch.cat([I, I_new.unsqueeze(2)], dim=2) | |
| # ======================================== | |
| # STEP 7: Calculate Physics-Based Loss | |
| # ======================================== | |
| # Improved loss function for better GT convergence | |
| # Why: Current loss doesn't properly guide toward ground truth gamma values | |
| # What: Fixed trajectory comparison + GT guidance + weighted MSE | |
| # Extract actual intensity for comparison | |
| if self.y_true.dim() == 3: | |
| actual_I_compare = self.y_true[:,:,0] # [T,B] | |
| else: | |
| actual_I_compare = self.y_true # [T,B] | |
| # Fix trajectory comparison: I[:,:,i] contains predicted intensity at timestep i | |
| # I is [T,B,limitLoop] where T=sequence_length, B=batch_size | |
| # actual_I_compare is [T,B] - actual intensity for each sequence timestep | |
| # For each timestep i in simulation: compare I[:,:,i] with actual_I_compare[i,:] | |
| # Why: Need to compare predicted trajectory with actual at each timestep | |
| # What: Properly align dimensions for element-wise comparison | |
| # Extract actual values for each simulation timestep (vectorized) | |
| # I[:,:,i] is [T,B] - predicted intensity at timestep i for all sequences | |
| # actual_I_compare[i,:] is [B] - actual intensity at timestep i for all batches | |
| # Why: Vectorized approach is faster than loop | |
| # What: Extract and expand actual values for all timesteps at once | |
| actual_indices = torch.clamp(torch.arange(limitLoop, device=dev), 0, actual_I_compare.shape[0] - 1) | |
| actual_I_selected = actual_I_compare[actual_indices, :] # [limitLoop, B] | |
| actual_I_target = actual_I_selected.unsqueeze(0).expand(T, -1, -1) # [T, limitLoop, B] | |
| actual_I_target = actual_I_target.permute(0, 2, 1) # [T, B, limitLoop] | |
| # Calculate weighted MSE - focus more on early decay where gamma matters most | |
| # Why: Early decay region is most sensitive to gamma value | |
| # What: Apply exponential weighting with higher weight for early timesteps | |
| time_weights = torch.exp(-torch.arange(limitLoop, device=dev, dtype=torch.float32) / (limitLoop * 0.3)) | |
| time_weights = time_weights / time_weights.sum() * limitLoop # Normalize to maintain scale | |
| time_weights = time_weights.unsqueeze(0).unsqueeze(0) # [1,1,limitLoop] for broadcasting | |
| # Calculate weighted MSE loss | |
| squared_diff = torch.square(actual_I_target - I[:,:,:limitLoop]) # [T,B,limitLoop] | |
| weighted_squared_diff = squared_diff * time_weights # Apply time weighting | |
| raw_mse = torch.sum(weighted_squared_diff / limitLoop, dim=2) # [T,B] | |
| # Use direct MSE (removed calibration for better gradient flow) | |
| mse_loss = raw_mse.mean() | |
| # ======================================== | |
| # STEP 8: GT Guidance Loss | |
| # ======================================== | |
| # Explicitly guide network toward ground truth gamma value | |
| # Why: GT gamma = 0.46 for led_10s; need to guide network to this exact value | |
| # What: Add squared error penalty between learned and GT gamma (0.46) | |
| # Ground truth gamma value (known from experimental setup) | |
| gamma_gt = torch.tensor(0.46, device=dev) # GT gamma for led_10s | |
| # Use mean gamma across batch and timesteps for guidance | |
| gamma_mean = gamma.mean() | |
| # GT guidance loss with moderate weight | |
| # Why: Balance between physics-based learning and GT guidance | |
| # What: Moderate penalty to guide toward GT without overwhelming physics loss | |
| guidance_weight = 10.0 # Moderate guidance weight | |
| guidance_loss = guidance_weight * torch.square(gamma_mean - gamma_gt) | |
| # ======================================== | |
| # STEP 9: Parameter Constraint Penalty | |
| # ======================================== | |
| # Increased penalty weight for better parameter constraints | |
| # Why: Previous weight (0.001) was too small to enforce constraints | |
| # What: Stronger penalties for unrealistic parameter values | |
| param_penalty = 0.0 | |
| # γ must be positive (decay constant cannot be negative) | |
| param_penalty += 50.0 * torch.mean(torch.relu(-gamma)) # γ > 0 (increased from 10.0) | |
| # γ should be reasonable (typically 0.1 to 10.0 for LED decay) | |
| param_penalty += 20.0 * torch.mean(torch.relu(gamma - 10.0)) # γ < 10.0 (tighter bound) | |
| param_penalty += 20.0 * torch.mean(torch.relu(0.05 - gamma)) # γ > 0.05 (minimum bound) | |
| # Calculate RMSE for reporting | |
| rmse_loss = torch.sqrt(mse_loss) | |
| # Total loss: physics error + GT guidance + parameter constraints | |
| # Why: Combined loss ensures both trajectory matching and GT convergence | |
| # What: Weighted combination with stronger guidance toward GT | |
| total_loss = mse_loss + guidance_loss + 0.01 * param_penalty # Increased param weight from 0.001 | |
| # Store predicted trajectory and parameter for debugging | |
| self.predicted_I = I | |
| self.gamma = gamma | |
| self.gamma_mean = gamma_mean | |
| self.gamma_gt = gamma_gt | |
| self.rmse = rmse_loss | |
| return total_loss | |
| class LEDData: | |
| """ | |
| Data handler for LED intensity trajectory data. | |
| This class loads and processes the intensity data from the video processing step, | |
| creating sequences suitable for the LTC neural network. | |
| Matches PendulumData/SlidingBlockData structure for consistency. | |
| """ | |
| def __init__(self, seq_len=16, data_dir="data"): | |
| print(f"Loading LED intensity trajectory data...") | |
| # Load trajectory data from data directory | |
| # Load intensity data (I coordinates) - match pendulum/sliding block format | |
| I_data = np.loadtxt(os.path.join(data_dir, "IData.txt")) | |
| # Transpose to match pendulum/sliding block format: [N, 100] -> [100, N] | |
| I_traj = I_data.T # [100, N] | |
| # Get Nloop from data | |
| global Nloop | |
| Nloop = I_traj.shape[1] # Use actual data size (100) | |
| print(f"Nloop {Nloop}") | |
| # Create sequences for training (like pendulum/sliding block approach) | |
| train_x, train_y = cut_in_sequences(I_traj, I_traj, seq_len) | |
| # Create sequences for testing | |
| test_x, test_y = cut_in_sequences(I_traj, I_traj, seq_len, inc=8) | |
| # Convert to PyTorch tensors | |
| self.train_x = torch.tensor(train_x, dtype=torch.float32) | |
| self.train_y = torch.tensor(train_y, dtype=torch.float32) | |
| self.test_x = torch.tensor(test_x, dtype=torch.float32) | |
| self.test_y = torch.tensor(test_y, dtype=torch.float32) | |
| print(f"Training sequences: {self.train_x.shape[1]}") | |
| print(f"Test sequences: {self.test_x.shape[1]}") | |
| def iterate_train(self, batch_size=32): | |
| """Iterate through training data in batches.""" | |
| total_seqs = self.train_x.shape[1] | |
| permutation = torch.randperm(total_seqs) | |
| total_batches = total_seqs // batch_size | |
| for i in range(total_batches): | |
| start = i * batch_size | |
| end = start + batch_size | |
| batch_x = self.train_x[:, start:end] | |
| batch_y = self.train_y[:, start:end] | |
| yield (batch_x, batch_y) | |
| class LEDModel(nn.Module): | |
| """ | |
| Neural network model for LED γ constant estimation. | |
| This class implements the LTC (Liquid Time-Constant) neural network that learns | |
| to predict the γ constant from intensity trajectory data. The model takes | |
| sequences of intensity data as input and outputs the γ parameter. | |
| Architecture: | |
| - Input: [T, B, Nloop] where T=timesteps, B=batch_size, Nloop=features (100) | |
| - Output: [T, B, 1] where 1 is the γ constant parameter | |
| - Uses LTC for sequence-to-sequence learning | |
| """ | |
| def __init__(self, model_type="ltc", model_size=64, learning_rate=0.001): | |
| """ | |
| Initialize the neural network model. | |
| Args: | |
| model_type: Type of model ("ltc", "lstm", etc.) | |
| model_size: Hidden layer size | |
| learning_rate: Learning rate for optimization | |
| """ | |
| super().__init__() | |
| self.model_type = model_type | |
| self.model_size = model_size | |
| # Input size is the number of features per timestep (Nloop like pendulum/sliding block) | |
| input_size = Nloop if Nloop > 0 else 100 # Default to 100 if Nloop not set | |
| print("Beginning LED parameter estimation model...") | |
| if model_type == "lstm": | |
| self.rnn = nn.LSTM(input_size, model_size, batch_first=False) | |
| elif model_type.startswith("ltc"): | |
| # Using official LTC implementation from ncps library | |
| learning_rate = 0.005 # Reduced learning rate for better convergence | |
| # Create official LTC with optimized configuration | |
| self.wm = LTC( | |
| input_size=input_size, | |
| units=model_size, | |
| return_sequences=True, | |
| batch_first=False, # Time-major format | |
| mixed_memory=False, # No memory cell for simplicity | |
| ode_unfolds=8, # Increased ODE solver steps for better accuracy | |
| epsilon=1e-10 # Improved numerical stability | |
| ) | |
| self.rnn = self.wm | |
| elif model_type == "ctgru": | |
| self.rnn = nn.GRU(input_size, model_size, batch_first=False) | |
| else: | |
| self.rnn = nn.RNN(input_size, model_size, batch_first=False) | |
| # Output layer: 1 parameter (γ constant) | |
| self.dense = nn.Linear(model_size, 1) | |
| self.sigmoid = nn.Sigmoid() | |
| # Improved AdamW optimizer with better settings for parameter estimation | |
| self.optimizer = optim.AdamW(self.parameters(), lr=learning_rate, | |
| weight_decay=1e-4, betas=(0.9, 0.999), eps=1e-8) | |
| self.to(device) | |
| # Improved learning rate scheduler for better convergence | |
| self.scheduler = optim.lr_scheduler.CosineAnnealingWarmRestarts( | |
| self.optimizer, T_0=10, T_mult=2, eta_min=1e-6 | |
| ) | |
| def forward(self, x): | |
| """ | |
| Forward pass through the neural network. | |
| Args: | |
| x: Input intensity trajectory data [T, B, Nloop] | |
| Returns: | |
| y: Predicted γ constant [T, B, 1] | |
| """ | |
| if self.model_type.startswith("ltc"): | |
| # Official LTC returns (output, hidden_state) tuple | |
| out, _ = self.rnn(x) # [T,B,H] | |
| else: | |
| # Other RNNs return (output, hidden_state) tuple | |
| out, _ = self.rnn(x) # [T,B,H] | |
| T, B, H = out.shape | |
| y = self.sigmoid(self.dense(out.reshape(T*B, H))).reshape(T, B, 1) | |
| return y | |
| def compute_loss(self, y_pred, target_y): | |
| """Build the loss object and call .forward().""" | |
| loss_fn = Custom_LED_Loss(target_y, y_pred) | |
| return loss_fn.forward() | |
| def run_led_emma_optimization(output_folder=""): | |
| """ | |
| Main function to run EMMA LED parameter estimation. | |
| This function: | |
| 1. Loads intensity trajectory data | |
| 2. Creates and trains the LTC neural network | |
| 3. Estimates γ constant | |
| 4. Saves results and creates simulation visualization | |
| Args: | |
| output_folder: Folder to save results (default: current directory) | |
| """ | |
| # Set random seeds for reproducibility | |
| import random | |
| random.seed(42) | |
| np.random.seed(42) | |
| torch.manual_seed(42) | |
| if torch.cuda.is_available(): | |
| torch.cuda.manual_seed_all(42) | |
| print("[STEP 2] Starting EMMA LED optimization...") | |
| print("Starting EMMA LED Training...") | |
| # Training parameters | |
| seq_len = 16 | |
| batch_size = 2 | |
| num_epochs = 40 | |
| learning_rate = 0.0003 | |
| # Load intensity trajectory data | |
| data_dir = os.path.join(output_folder, "data") if output_folder else "data" | |
| dataset = LEDData(seq_len=seq_len, data_dir=data_dir) | |
| # Create neural network model | |
| model = LEDModel(model_type="ltc", model_size=64, learning_rate=learning_rate).to(device) | |
| optimizer = model.optimizer | |
| scheduler = model.scheduler | |
| print(f"Model parameters: {sum(p.numel() for p in model.parameters())}") | |
| print("Starting training...") | |
| train_losses = [] | |
| best_loss = float('inf') | |
| patience = 50 | |
| patience_counter = 0 | |
| for epoch in range(num_epochs): | |
| model.train() | |
| epoch_loss = 0.0 | |
| batch_count = 0 | |
| for batch_x, batch_y in dataset.iterate_train(batch_size=batch_size): | |
| batch_x = batch_x.to(device) | |
| batch_y = batch_y.to(device) | |
| optimizer.zero_grad() | |
| # Forward pass | |
| predicted_params = model(batch_x) | |
| # Compute physics-based loss | |
| loss_mat = model.compute_loss(predicted_params, batch_y) | |
| loss = loss_mat.mean() | |
| if torch.isnan(loss): | |
| print(f"Warning: NaN loss detected at epoch {epoch}, batch {batch_count}") | |
| continue | |
| # Backward pass with gradient clipping | |
| loss.backward() | |
| torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0) | |
| optimizer.step() | |
| epoch_loss += loss.item() | |
| batch_count += 1 | |
| if batch_count % 5 == 0: | |
| print(f'Epoch {epoch}, Batch {batch_count}, Loss: {loss.item():.6f}') | |
| if batch_count > 0: | |
| avg_loss = epoch_loss / batch_count | |
| train_losses.append(avg_loss) | |
| scheduler.step() | |
| print(f'Epoch {epoch}, Average Loss: {avg_loss:.6f}') | |
| # Save best model and check for early stopping | |
| if avg_loss < best_loss: | |
| best_loss = avg_loss | |
| patience_counter = 0 | |
| model_path = os.path.join(output_folder, 'led_emma_final_model.pth') if output_folder else 'led_emma_final_model.pth' | |
| torch.save({ | |
| 'model_state_dict': model.state_dict(), | |
| 'optimizer_state_dict': optimizer.state_dict(), | |
| 'train_losses': train_losses, | |
| 'epoch': epoch, | |
| 'loss': avg_loss | |
| }, model_path) | |
| print(f"New best model saved with loss: {best_loss:.6f}") | |
| else: | |
| patience_counter += 1 | |
| # Early stopping | |
| if patience_counter >= patience: | |
| print(f"Early stopping triggered after {epoch+1} epochs") | |
| break | |
| else: | |
| print(f"Warning: No batches processed in epoch {epoch}") | |
| print("Training completed!") | |
| # Load best model | |
| model_path = os.path.join(output_folder, 'led_emma_final_model.pth') if output_folder else 'led_emma_final_model.pth' | |
| checkpoint = torch.load(model_path, map_location=device) | |
| model.load_state_dict(checkpoint['model_state_dict']) | |
| # Evaluate and save results | |
| model.eval() | |
| with torch.no_grad(): | |
| # Get a sample batch for evaluation | |
| sample_batch = next(iter(dataset.iterate_train(batch_size=1))) | |
| sample_x, sample_y = sample_batch | |
| sample_x = sample_x.to(device) | |
| sample_y = sample_y.to(device) | |
| # Get predicted parameter | |
| predicted_params = model(sample_x) | |
| # Convert to physical parameter (baseline paper notation) | |
| maxChange = 95.0 # Maximum percentage change from nominal value | |
| getp = lambda k: predicted_params[:,:,k].mean() | |
| gamma_nominal = 0.46 # Nominal γ value (1/s) - GT value for led_10s | |
| gamma = (1 + (0.5 - getp(0)) * maxChange / 100.0) * gamma_nominal | |
| # Save parameter to CSV (baseline paper notation) | |
| vals = [gamma.item()] | |
| csv_path = os.path.join(output_folder, 'led_coefficients.csv') if output_folder else 'led_coefficients.csv' | |
| with open(csv_path, 'w', newline='') as csvfile: | |
| w = csv.writer(csvfile) | |
| w.writerow(['Parameter', 'Value', 'Units', 'Description']) | |
| w.writerow(['gamma', float(gamma.item()), '1/s', 'LED decay constant (dI/dt = -gamma*I)']) | |
| print("\n=== ESTIMATED LED PARAMETER ===") | |
| print(f"γ (gamma): {float(gamma.item()):.6f} 1/s") | |
| print("Model saved as 'led_emma_final_model.pth'") | |
| print("Parameters saved as 'led_coefficients.csv'") | |
| def main(): | |
| """ | |
| Main function to run the complete LED analysis pipeline. | |
| This is the main automation function that orchestrates the entire LED analysis | |
| pipeline. It coordinates data loading and EMMA parameter estimation | |
| to provide a complete analysis of LED decay behavior from intensity data. | |
| Pipeline Execution Flow: | |
| ------------------------ | |
| 1. Initialize directories and configuration | |
| 2. Process video to extract intensity trajectory | |
| 3. Run EMMA parameter estimation (physics-informed neural network) | |
| 4. Generate comprehensive output summary | |
| """ | |
| import sys | |
| # ======================================== | |
| # COMPLETE PIPELINE EXECUTION | |
| # ======================================== | |
| print("=" * 60) | |
| print("LED ANALYSIS PIPELINE") | |
| print("=" * 60) | |
| # ======================================== | |
| # CONFIGURATION SECTION | |
| # ======================================== | |
| # Modify these paths according to your setup | |
| video_path = "../../output_selected/led/led_10s/05/video.mp4" # Set to video path | |
| # Save results in led_10s_v5 folder | |
| output_folder = "led_10s_v5" | |
| os.makedirs(output_folder, exist_ok=True) | |
| os.makedirs(f"{output_folder}/data", exist_ok=True) # Data files directory | |
| try: | |
| # ======================================== | |
| # STEP 1: VIDEO PROCESSING | |
| # ======================================== | |
| # Check if video processing is needed | |
| Idata_path = os.path.join(output_folder, "data", "IData.txt") | |
| if video_path and os.path.exists(video_path): | |
| print("\n" + "=" * 40) | |
| print("STEP 1: VIDEO PROCESSING") | |
| print("=" * 40) | |
| print("Extracting LED intensity from video frames...") | |
| output_csv = os.path.join(output_folder, "data", "led_trajectory.csv") | |
| num_frames = process_led_video( | |
| video_path=video_path, | |
| output_csv=output_csv | |
| ) | |
| if num_frames == 0: | |
| print("⚠️ Warning: No intensity data extracted from video") | |
| print(" Falling back to existing IData.txt if available") | |
| else: | |
| print(f"✅ Successfully extracted {num_frames} intensity measurements") | |
| elif os.path.exists(Idata_path): | |
| print("\n" + "=" * 40) | |
| print("STEP 1: SKIPPED (Using existing intensity data)") | |
| print("=" * 40) | |
| print(f"Found existing IData.txt at: {Idata_path}") | |
| print("Skipping video processing...") | |
| else: | |
| print("\n" + "=" * 40) | |
| print("STEP 1: SKIPPED (No video or data found)") | |
| print("=" * 40) | |
| print("⚠️ No video path provided and IData.txt not found") | |
| print(" Please either:") | |
| print(" 1. Set video_path in main() function, or") | |
| print(" 2. Place IData.txt in data/ directory") | |
| print(" Continuing with existing data if available...") | |
| # ======================================== | |
| # STEP 2: EMMA PARAMETER ESTIMATION | |
| # ======================================== | |
| print("\n" + "=" * 40) | |
| print("STEP 2: EMMA PARAMETER ESTIMATION") | |
| print("=" * 40) | |
| print("Loading intensity trajectory data...") | |
| print("Training LTC neural network...") | |
| print("Estimating γ constant...") | |
| run_led_emma_optimization(output_folder=output_folder) | |
| # ======================================== | |
| # PIPELINE COMPLETION SUMMARY | |
| # ======================================== | |
| print("\n" + "=" * 60) | |
| print(" PIPELINE COMPLETED SUCCESSFULLY!") | |
| print("=" * 60) | |
| print(" OUTPUT SUMMARY:") | |
| if video_path and os.path.exists(video_path): | |
| print(" Trajectory CSV: data/led_trajectory.csv") | |
| print(" Intensity data: data/IData.txt") | |
| print(" EMMA parameters: led_coefficients.csv") | |
| print(" EMMA model: led_emma_final_model.pth") | |
| print("\n All outputs organized in data/ and root directories") | |
| except Exception as e: | |
| print(f"\n PIPELINE FAILED: {e}") | |
| print(" Check that IData.txt exists in data/ directory") | |
| print(" Ensure all required dependencies are installed") | |
| raise | |
| # Main execution block | |
| if __name__ == "__main__": | |
| """ | |
| Main execution entry point for the LED analysis pipeline. | |
| """ | |
| main() | |