Spaces:
Configuration error
Configuration error
| # This is the code for Free Fall pipeline based on EMMA method | |
| # EMMA Free Fall Pipeline | |
| import os | |
| import csv | |
| import gc | |
| import cv2 | |
| import numpy as np | |
| import matplotlib.pyplot as plt | |
| import torch | |
| import torch.nn as nn | |
| import torch.optim as optim | |
| from torch.utils.data import Dataset, DataLoader | |
| from ncps.torch import LTC | |
| from matplotlib.animation import FuncAnimation | |
| import pdb | |
| try: | |
| import psutil | |
| _HAS_PSUTIL = True | |
| except Exception: | |
| _HAS_PSUTIL = False | |
| try: | |
| from moviepy.editor import VideoFileClip | |
| except Exception: | |
| from moviepy import VideoFileClip | |
| from torchvision import transforms | |
| # Set device for computation (GPU if available, otherwise CPU) | |
| device = torch.device('cuda' if torch.cuda.is_available() else 'cpu') | |
| print(f"Using device: {device}") | |
| # Global variable to store the number of features per timestep | |
| Nloop = 0 | |
| def check_memory_usage(): | |
| """ | |
| Monitor system memory usage during processing. | |
| Why: Prevent memory overflow during large video processing | |
| What: Display current memory usage statistics | |
| """ | |
| if not _HAS_PSUTIL: | |
| return | |
| mem = psutil.virtual_memory() | |
| used_gb = mem.used / (1024**3) | |
| total_gb = mem.total / (1024**3) | |
| print(f"[INFO] Memory usage: {used_gb:.1f}GB / {total_gb:.1f}GB ({mem.percent:.1f}%)") | |
| class FreeFallDetector: | |
| """ | |
| Free fall object tracker using YOLO for position measurement. | |
| This class implements object tracking for the free fall pipeline, similar to | |
| Pendulum and Sliding Block experiments. It uses YOLO to detect and track an object | |
| in free fall, then measures vertical position (y-coordinate) over time. | |
| Why: Accurate position measurement is critical for gravitational acceleration estimation | |
| What: Tracks object position to extract vertical trajectory | |
| """ | |
| def __init__(self, weights_path, conf=0.15, imgsz=640): | |
| """ | |
| Initialize the free fall object detector with YOLO model. | |
| Args: | |
| weights_path: Path to YOLO model weights file (.pt) | |
| conf: Detection confidence threshold (0.0-1.0) | |
| imgsz: Input image size for YOLO processing | |
| """ | |
| from ultralytics import YOLO | |
| self.model = YOLO(weights_path) | |
| self.conf = conf | |
| self.imgsz = imgsz | |
| self.last_detection = None | |
| self.reference_top = None # Store top reference position (y=0) | |
| print(f"[INFO] Loaded YOLO weights: {weights_path}") | |
| def detect_object(self, frame): | |
| """ | |
| Detect falling object in a single video frame using YOLO with intelligent filtering. | |
| Args: | |
| frame: Input video frame (numpy array) | |
| Returns: | |
| tuple: (x1, y1, x2, y2, confidence) or None if no detection | |
| Why: Need to track object position to measure free fall trajectory | |
| What: Returns object bounding box with confidence score | |
| """ | |
| h, w = frame.shape[:2] | |
| img_area = w * h | |
| edge_thresh = max(10, int(0.01 * min(w, h))) | |
| min_area_px = max(100, int(0.00001 * img_area)) # Object is typically small | |
| max_area_px = int(0.1 * img_area) # Object should be reasonably sized | |
| # Run YOLO inference | |
| results = self.model.predict( | |
| source=frame, imgsz=self.imgsz, conf=self.conf, | |
| iou=0.5, agnostic_nms=True, verbose=False | |
| ) | |
| # Filter and validate detections | |
| candidates = [] | |
| for r in results: | |
| if r.boxes is None: | |
| continue | |
| for b in r.boxes: | |
| x1, y1, x2, y2 = [float(v) for v in b.xyxy[0].tolist()] | |
| bw, bh = x2 - x1, y2 - y1 | |
| area = bw * bh | |
| # Size-based filtering (too small or too large) | |
| if area < min_area_px or area > max_area_px: | |
| continue | |
| # Edge proximity filtering (avoid edge artifacts) | |
| if (x1 < edge_thresh or y1 < edge_thresh or | |
| x2 > w - edge_thresh or y2 > h - edge_thresh): | |
| continue | |
| conf = float(b.conf[0].item()) if hasattr(b, "conf") else 0.0 | |
| candidates.append((x1, y1, x2, y2, conf)) | |
| if not candidates: | |
| return None | |
| if len(candidates) == 1: | |
| return candidates[0] | |
| # Multi-candidate selection with tracking consistency (like Pendulum) | |
| candidates.sort(key=lambda t: t[4], reverse=True) | |
| top_candidates = candidates[:3] | |
| # Use tracking consistency if previous detection exists | |
| if self.last_detection is not None: | |
| lx1, ly1, lx2, ly2, _ = self.last_detection | |
| lcx, lcy = (lx1 + lx2) / 2.0, (ly1 + ly2) / 2.0 | |
| def dist(c): | |
| cx, cy = (c[0] + c[2]) / 2.0, (c[1] + c[3]) / 2.0 | |
| return ((cx - lcx) ** 2 + (cy - lcy) ** 2) ** 0.5 | |
| return min(top_candidates, key=dist) | |
| return top_candidates[0] | |
| def detect(self, frame): | |
| """ | |
| Detect object and calculate vertical position measurement. | |
| Args: | |
| frame: Input video frame (numpy array) | |
| Returns: | |
| tuple: (y_position_pixels, confidence) or None if detection fails | |
| Why: Complete detection pipeline for position extraction using object tracking | |
| What: Returns vertical position in pixels from reference top | |
| """ | |
| # Detect falling object | |
| obj_box = self.detect_object(frame) | |
| if obj_box is None: | |
| if self.last_detection is not None: | |
| # Use last known object position (for Kalman filter prediction) | |
| obj_box = self.last_detection | |
| else: | |
| return None | |
| else: | |
| self.last_detection = obj_box | |
| # Initialize reference top on first detection (use object's initial position as y=0) | |
| h, w = frame.shape[:2] | |
| if self.reference_top is None: | |
| # Use top of first detection as reference point (y=0) | |
| _, y1, _, _, _ = obj_box | |
| self.reference_top = float(y1) | |
| # Get object center Y position (this represents vertical position) | |
| _, y1, _, y2, _ = obj_box | |
| obj_center_y = (y1 + y2) / 2.0 | |
| # Calculate vertical position (distance from reference top) | |
| # Position increases as we go down (larger y values) | |
| # Free fall: object moves downward, so y increases over time | |
| y_position_pixels = obj_center_y - self.reference_top | |
| # Ensure position is non-negative | |
| if y_position_pixels < 0: | |
| y_position_pixels = 0 | |
| confidence = obj_box[4] if obj_box is not None else 0.5 | |
| return (y_position_pixels, confidence) | |
| class Kalman1D: | |
| """ | |
| 1D Kalman Filter for position trajectory smoothing and prediction. | |
| This class implements a 1D Kalman filter to smooth position measurements | |
| and predict position when detection fails. The filter tracks position and | |
| velocity (rate of change). | |
| State Vector: [y, vy] (position + velocity) | |
| Measurement: [y] (position only from detection) | |
| Why: Raw detections are noisy and may have gaps | |
| What: Provides smooth, continuous position estimates | |
| """ | |
| def __init__(self, dt=0.01): | |
| """ | |
| Initialize 1D Kalman filter with system dynamics. | |
| Args: | |
| dt: Time step between measurements (seconds) | |
| """ | |
| self.dt = dt | |
| # State vector: [y, vy] (position, velocity) | |
| self.state = np.zeros(2).reshape(-1, 1) # Column vector for matrix operations | |
| # State transition matrix F (constant acceleration model for free fall) | |
| self.F = np.eye(2) | |
| self.F[0, 1] = dt # y += vy * dt | |
| # Measurement matrix H (we measure position only) | |
| self.H = np.array([[1.0, 0.0]]) # [1, 0] to extract position from state | |
| # Process noise covariance Q | |
| self.Q = np.eye(2) * 0.1 | |
| # Measurement noise covariance R | |
| self.R = np.array([[1.0]]) # 1x1 matrix for 1D measurement | |
| # Error covariance matrix P | |
| self.P = np.eye(2) * 100.0 | |
| def predict(self): | |
| """ | |
| Predict next state using motion model (no measurement). | |
| Returns: | |
| float: Predicted position | |
| Why: Estimate position when detection fails | |
| What: Advances state using constant velocity model | |
| """ | |
| self.state = self.F @ self.state | |
| self.P = self.F @ self.P @ self.F.T + self.Q | |
| # Extract position (first element of state vector) as float | |
| return float(self.state[0, 0] if self.state.ndim > 1 else self.state[0]) | |
| def update(self, measurement): | |
| """ | |
| Update state estimate with new measurement. | |
| Args: | |
| measurement: Position measurement (float) | |
| Returns: | |
| float: Updated position estimate | |
| Why: Incorporate new measurements to improve accuracy | |
| What: Combines prediction with measurement using Kalman equations | |
| """ | |
| measurement = np.array([[float(measurement)]]) # Convert to 2D array | |
| y = measurement - self.H @ self.state # Innovation | |
| S = self.H @ self.P @ self.H.T + self.R # Innovation covariance | |
| K = self.P @ self.H.T @ np.linalg.inv(S) # Kalman gain | |
| self.state = self.state + K @ y # Update state | |
| self.P = (np.eye(2) - K @ self.H) @ self.P # Update covariance | |
| # Extract position (first element of state vector) as float | |
| return float(self.state[0, 0] if self.state.ndim > 1 else self.state[0]) | |
| def process_free_fall_video(video_path, weights_path, output_video, output_csv, conf=0.15, pixel_to_meter=0.001): | |
| """ | |
| Process free fall video to extract position trajectory using object tracking. | |
| This function processes free fall videos to: | |
| 1. Load video and YOLO model | |
| 2. Detect and track falling object in each frame (similar to Pendulum) | |
| 3. Track vertical position using Kalman filtering | |
| 4. Convert pixel position to physical units (meters) | |
| 5. Create annotated video with position overlay | |
| 6. Save position trajectory data and generate plots | |
| Args: | |
| video_path: Path to input free fall video file | |
| weights_path: Path to YOLO model weights | |
| output_video: Path for annotated video output | |
| output_csv: Path for trajectory CSV output | |
| conf: YOLO detection confidence threshold | |
| pixel_to_meter: Conversion factor from pixels to meters (default: 0.001 m/pixel) | |
| Why: Video processing is the foundation of free fall trajectory analysis | |
| What: Extracts smooth position trajectory from object tracking in video frames | |
| """ | |
| print(f"[STEP 1] Processing free fall video: {video_path}") | |
| print(f"[STEP 1] Output video: {output_video}") | |
| print(f"[STEP 1] Output CSV: {output_csv}") | |
| os.makedirs(os.path.dirname(output_csv), exist_ok=True) | |
| detector = FreeFallDetector(weights_path, conf=conf) | |
| kf = Kalman1D() | |
| cap = cv2.VideoCapture(video_path) | |
| if not cap.isOpened(): | |
| raise RuntimeError(f"Cannot open video: {video_path}") | |
| fps = cap.get(cv2.CAP_PROP_FPS) | |
| width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) | |
| height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) | |
| fourcc = cv2.VideoWriter_fourcc(*'mp4v') | |
| out = cv2.VideoWriter(output_video, fourcc, fps, (width, height)) | |
| csv_f = open(output_csv, "w", newline="") | |
| csvw = csv.writer(csv_f) | |
| csvw.writerow(["frame", "time_s", "y_position_pixels", "y_position_meters", "conf"]) | |
| y_series_pixels, y_series_meters = [], [] | |
| frame_idx = 0 | |
| # Calibration: Use a reasonable estimate for pixel_to_meter | |
| # NOTE: We should NOT use ground truth from parameters | |
| # Instead, use a generic reasonable value or estimate from video | |
| # For free fall, typical drop heights are 0.5-2 m, so we use a reasonable default | |
| # User can adjust pixel_to_meter manually if they have a reference object in the video | |
| calibration_complete = False | |
| while True: | |
| ok, frame = cap.read() | |
| if not ok: | |
| break | |
| frame_time = frame_idx / fps | |
| det = detector.detect(frame) | |
| if det is not None: | |
| y_position_pixels, conf_val = det | |
| # NOTE: We do NOT calibrate using ground truth | |
| # The pixel_to_meter should be set manually based on a reference object | |
| # or use the default value provided in main() | |
| # This avoids "cheating" by using ground truth information | |
| if not calibration_complete and conf_val > 0.5: | |
| calibration_complete = True | |
| print(f"[STEP 1] Using pixel_to_meter = {pixel_to_meter:.6f} m/px (user-provided or default)") | |
| print(f"[STEP 1] NOTE: For accurate results, calibrate using a known reference object in the video") | |
| y_position_meters = float(y_position_pixels) * pixel_to_meter | |
| # Update Kalman filter | |
| kf.predict() | |
| y_smooth = kf.update(y_position_pixels) # Kalman filter returns float | |
| y_smooth_meters = y_smooth * pixel_to_meter | |
| y_series_pixels.append(y_smooth) | |
| y_series_meters.append(y_smooth_meters) | |
| # Draw detection and position on frame | |
| if detector.last_detection is not None: | |
| x1, y1, x2, y2 = [int(v) for v in detector.last_detection[:4]] | |
| # Draw object bounding box | |
| cv2.rectangle(frame, (int(x1), int(y1)), (int(x2), int(y2)), (0, 255, 0), 2) | |
| # Draw object center point | |
| obj_center = (int((x1 + x2) / 2), int((y1 + y2) / 2)) | |
| cv2.circle(frame, obj_center, 5, (0, 255, 0), -1) | |
| # Draw line from reference top to object (position indicator) | |
| if detector.reference_top is not None: | |
| top_point = (obj_center[0], int(detector.reference_top)) | |
| cv2.line(frame, top_point, obj_center, (255, 0, 0), 2) | |
| # Draw position text | |
| position_text = f"y={y_smooth_meters:.4f}m ({y_smooth:.1f}px), conf={conf_val:.2f}" | |
| cv2.putText(frame, position_text, (10, 30), | |
| cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 255), 2) | |
| csvw.writerow([frame_idx, f"{frame_time:.3f}", f"{y_smooth:.2f}", | |
| f"{y_smooth_meters:.6f}", f"{conf_val:.3f}"]) | |
| else: | |
| # No detection - use Kalman prediction | |
| y_pred = kf.predict() # Kalman filter returns float | |
| y_pred_meters = y_pred * pixel_to_meter | |
| y_series_pixels.append(y_pred) | |
| y_series_meters.append(y_pred_meters) | |
| # Draw predicted position | |
| position_text = f"y={y_pred_meters:.4f}m ({y_pred:.1f}px) [predicted]" | |
| cv2.putText(frame, position_text, (10, 30), | |
| cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 255), 2) | |
| csvw.writerow([frame_idx, f"{frame_time:.3f}", f"{y_pred:.2f}", | |
| f"{y_pred_meters:.6f}", "0.000"]) | |
| out.write(frame) | |
| frame_idx += 1 | |
| if frame_idx % 30 == 0: | |
| print(f"[PROGRESS] Processed {frame_idx} frames") | |
| check_memory_usage() | |
| cap.release() | |
| out.release() | |
| csv_f.close() | |
| if y_series_pixels and y_series_meters: | |
| # Save position trajectory data in EMMA format | |
| y_arr = np.array(y_series_meters) # Use meters for physical units | |
| # Report extracted position range (for information only, not validation against ground truth) | |
| y_0_actual = y_arr[0] if len(y_arr) > 0 else 0.0 | |
| y_n_actual = y_arr[-1] if len(y_arr) > 0 else 0.0 | |
| print(f"\n[STEP 1] Extracted Position Range:") | |
| print(f" Initial position: y_0 = {y_0_actual:.3f} m") | |
| print(f" Final position: y_n = {y_n_actual:.3f} m") | |
| print(f" Position change: {y_n_actual - y_0_actual:.3f} m") | |
| # Check if position increases (physical constraint for free fall) | |
| if y_n_actual <= y_0_actual: | |
| print(f" ⚠️ Warning: Position does not increase (may indicate tracking issue)") | |
| else: | |
| print(f" ✅ Position increases as expected for free fall") | |
| # Match EMMA format (N x 100 matrices for memory optimization) | |
| y_matrix = np.tile(y_arr.reshape(-1, 1), (1, 100)) | |
| # Determine data directory from output_csv path | |
| data_dir = os.path.dirname(output_csv) | |
| os.makedirs(data_dir, exist_ok=True) | |
| np.savetxt(os.path.join(data_dir, "yData.txt"), y_matrix, fmt='%.6f') | |
| del y_matrix, y_arr | |
| gc.collect() | |
| print(f"\n[STEP 1] ✅ Saved free fall position trajectory data: {len(y_series_meters)} frames") | |
| print(f"[STEP 1] ✅ Saved position data: yData.txt") | |
| # Create trajectory plots | |
| print("[STEP 1] Creating free fall position trajectory plots...") | |
| # Plot position vs time | |
| fig, ax = plt.subplots(1, 1, figsize=(12, 6)) | |
| time_array = np.arange(len(y_series_meters)) / fps | |
| ax.plot(time_array, y_series_meters, 'b-', linewidth=2, label='Position (m)') | |
| ax.set_xlabel('Time (s)') | |
| ax.set_ylabel('Position (m)') | |
| ax.set_title('Free Fall Position vs Time') | |
| ax.grid(True, alpha=0.3) | |
| ax.legend() | |
| plot_path = os.path.join(data_dir, "free_fall_position_trajectory.png") | |
| plt.savefig(plot_path, dpi=300, bbox_inches='tight') | |
| plt.close() | |
| print(f"[STEP 1] ✅ Saved trajectory plot: {plot_path}") | |
| return len(y_series_meters) | |
| else: | |
| print("[STEP 1] ⚠️ No position data extracted from video") | |
| return 0 | |
| def cut_in_sequences(x, y, seq_len, inc=1): | |
| """ | |
| Slice a long 1D/2D series into overlapping windows for sequence-based learning. | |
| This function creates sequences from the input data for the LTC model. | |
| For free fall data: input shape (N, 100) -> output shape (seq_len, num_sequences, 100) | |
| Args: | |
| x: Input data array (e.g., position trajectory) | |
| y: Target data array (e.g., position trajectory) | |
| seq_len: Length of each sequence (e.g., 16 timesteps) | |
| inc: Increment step for creating overlapping sequences | |
| Returns: | |
| sequences_x: Input sequences with shape (seq_len, num_sequences, features) | |
| sequences_y: Target sequences with shape (seq_len, num_sequences, features) | |
| """ | |
| sequences_x, sequences_y = [], [] | |
| for s in range(0, x.shape[0] - seq_len, inc): | |
| start, end = s, s + seq_len | |
| sequences_x.append(x[start:end]) | |
| sequences_y.append(y[start:end]) | |
| return np.stack(sequences_x, axis=1), np.stack(sequences_y, axis=1) | |
| class Custom_FreeFall_Loss(nn.Module): | |
| """ | |
| Custom loss function that integrates free fall physics simulation. | |
| This is the core of the parameter estimation system. Instead of using a simple | |
| MSE loss, this function: | |
| 1. Takes predicted g (gravitational acceleration) from the neural network | |
| 2. Runs a complete free fall physics simulation using this parameter | |
| 3. Compares the simulated position trajectory with the actual position trajectory | |
| 4. Returns the physics-based loss for gradient descent | |
| The physics simulation includes: | |
| - Free fall with drag: dr/dt = -g*t*r²/r0f | |
| - Where r is position, g is gravity, t is time, r0f is reference parameter | |
| - Position increases over time as object falls | |
| - Parameter estimation for g (gravitational acceleration) | |
| This approach ensures that the learned parameter is physically meaningful | |
| and can be used for actual free fall prediction. | |
| """ | |
| def __init__(self, labels, logits): | |
| """ | |
| Initialize the physics-based loss function. | |
| Args: | |
| labels: Actual position trajectory data [T, B, 1] (position y) | |
| logits: Predicted g constant from neural network [T, B, 1] | |
| """ | |
| super().__init__() | |
| # Store actual trajectory data for comparison | |
| self.y_true = labels # [T, B, 1] - actual position data | |
| # Store predicted parameters from neural network | |
| self.y_pred = logits # [T, B, 1] - g constant | |
| def forward(self): | |
| """ | |
| Complete free fall dynamics simulation with physics-based loss. | |
| This method performs the following steps: | |
| 1. Extract predicted g constant from neural network output | |
| 2. Convert normalized parameter to physical value | |
| 3. Initialize position and velocity states from actual data | |
| 4. Run physics simulation for T timesteps | |
| 5. Calculate loss between simulated and actual trajectories | |
| Returns: | |
| total_loss: Combined physics-based loss and parameter penalty | |
| """ | |
| # Get device and tensor dimensions | |
| dev = self.y_pred.device | |
| T, B, _ = self.y_pred.shape # T=timesteps, B=batch_size, 1=parameter | |
| # ======================================== | |
| # STEP 1: Extract and Convert Parameter | |
| # ======================================== | |
| # The neural network outputs normalized values [0,1] for g | |
| # We convert these to physical values with ±95% variation around nominal value | |
| maxChange = 95.0 # Maximum percentage change from nominal value | |
| getp = lambda k: self.y_pred[:,:,k] # Extract parameter k for all timesteps [T,B] | |
| # Convert normalized predictions to physical parameter | |
| # g is scaled from [0,1] to [nominal*(1-0.95), nominal*(1+0.95)] | |
| # Nominal g value: 9.81 m/s^2 (Earth's gravitational acceleration) | |
| g_nominal = 9.81 # Nominal g value (m/s^2) - Earth's gravity | |
| g = (1 + (0.5 - getp(0)) * maxChange / 100.0) * g_nominal | |
| # ======================================== | |
| # STEP 2: Physical Constants | |
| # ======================================== | |
| # These are fixed physical constants that don't change during training | |
| eps = torch.tensor(1e-6, device=dev) # Small epsilon for numerical stability | |
| # ======================================== | |
| # STEP 3: Get Actual Position Data | |
| # ======================================== | |
| # Extract actual position data for comparison | |
| if self.y_true.dim() == 3: | |
| actual_y = self.y_true[:, :, 0] # [T,B] - actual position from [T,B,1] | |
| else: | |
| actual_y = self.y_true # [T,B] - actual position | |
| # ======================================== | |
| # STEP 4: Initialize Position State | |
| # ======================================== | |
| # Initialize position (r) from actual trajectory (like pendulum approach) | |
| # Match pendulum pattern: theta = thetaVal.clone() where thetaVal = self.y_true[:,:,0] | |
| rVal = actual_y # [T,B] - actual position trajectory | |
| r = rVal.clone() # [T,B] - initialize from actual data (like pendulum) | |
| # Get initial position for r0f reference | |
| r0f = r[0, :] # [B] - initial position for each batch | |
| # ======================================== | |
| # STEP 5: Simulation Setup | |
| # ======================================== | |
| # Set up simulation parameters and storage arrays | |
| # Dynamic limitLoop based on actual data length to avoid tensor size mismatch | |
| limitLoop = min(500, T) # Use actual data length or 500, whichever is smaller | |
| tau_dt = 0.01 # Time step (s) - match baseline paper's dt | |
| # Reshape for tensor concatenation approach (like pendulum/sliding block) | |
| # Match pendulum: theta = theta.unsqueeze(2) to get [T,B,1] | |
| r = r.unsqueeze(2) # [T,B] -> [T,B,1] | |
| # ======================================== | |
| # STEP 6: Main Physics Simulation Loop | |
| # ======================================== | |
| # This is the core of the physics simulation | |
| # For each timestep, we: | |
| # 1. Get g parameter for current timestep | |
| # 2. Calculate dr/dt = -g*t*r²/r0f (free fall with drag) | |
| # 3. Update position using Euler integration | |
| # 4. Store predicted state using tensor concatenation | |
| for i in range(1, limitLoop): | |
| # Current timestep index | |
| t_idx = i | |
| # ======================================== | |
| # STEP 6.1: Get Current Parameter | |
| # ======================================== | |
| # Get g value for current timestep (match pendulum pattern) | |
| g_curr = g[t_idx] # [B] - g constant for current timestep | |
| # ======================================== | |
| # STEP 6.2: Free Fall with Drag Dynamics | |
| # ======================================== | |
| # Free fall equation: dr/dt = -g*t*r²/r0f | |
| # Match pendulum pattern: use r[:,:,i-1] to get previous timestep | |
| # Get previous position (like pendulum: theta[:,:,i-1]) | |
| r_prev = r[:,:,i-1] # [T,B] - previous position from actual trajectory | |
| # Calculate current time | |
| t_current = t_idx * tau_dt # Current time in seconds | |
| # Calculate dr/dt = -g*t*r²/r0f | |
| # g_curr is [B], r_prev is [T,B], r0f is [B] | |
| # Need to expand g_curr and r0f to match [T,B] shape | |
| g_expanded = g_curr.unsqueeze(0).expand(T, -1) # [T,B] - expand g to match shape | |
| r0f_expanded = r0f.unsqueeze(0).expand(T, -1) # [T,B] - expand r0f to match shape | |
| # Ensure r0f is not zero (avoid division by zero) | |
| r0f_safe = torch.clamp(r0f_expanded, min=eps) | |
| # Calculate rate of change: dr/dt = -g*t*r²/r0f | |
| dr_dt = -g_expanded * t_current * (r_prev ** 2) / r0f_safe # [T,B] - rate of position change | |
| # ======================================== | |
| # STEP 6.3: Update Position | |
| # ======================================== | |
| # Euler integration: r_new = r_old + dr/dt * dt | |
| # Match pendulum pattern: y1 = theta[:,:,i-1] + omega[:,:,i-1]*tau_dt | |
| r_new = r_prev + dr_dt * tau_dt # [T,B] - position update | |
| # Ensure position remains non-negative (physical constraint) | |
| r_new = torch.clamp(r_new, min=0.0) | |
| # Concatenate to build trajectory (like pendulum: theta = torch.cat([theta, y1.unsqueeze(2)],dim=2)) | |
| r = torch.cat([r, r_new.unsqueeze(2)], dim=2) | |
| # ======================================== | |
| # STEP 7: Calculate Physics-Based Loss | |
| # ======================================== | |
| # The loss function compares the simulated trajectory with the actual trajectory | |
| # This is what drives the parameter estimation - the neural network learns | |
| # g that makes the simulation match the real position behavior | |
| # Loss calibration constant (calibrated with maxChange=0 using nominal g) | |
| # Match pendulum approach: loss_Cal_theta = 344.08, loss_Cal_omega = 47.72 | |
| # Calibrated value from running with maxChange=0: 0.000001 (placeholder) | |
| loss_Cal_y = 0.000001 # Calibrated loss value (from calibration run with maxChange=0) | |
| # Calculate MSE loss (match pendulum pattern exactly) | |
| # Pendulum: torch.abs(torch.sum(torch.square(self.y_true[:,:,0:limitLoop]-theta)/limitLoop, dim=2)-loss_Cal_theta) | |
| # For free fall: y is [T,B,limitLoop] after simulation, actual_y is [T,B] | |
| # Need to reshape actual_y to [T,B,limitLoop] for comparison | |
| # Extract actual position for comparison | |
| if self.y_true.dim() == 3: | |
| actual_y_compare = self.y_true[:,:,0] # [T,B] | |
| else: | |
| actual_y_compare = self.y_true # [T,B] | |
| # Match pendulum loss calculation pattern | |
| # In pendulum: theta is [T,B,limitLoop] where theta[:,:,0] is initial (from actual), theta[:,:,1:] are predictions | |
| # The loss compares theta with self.y_true[:,:,0:limitLoop] | |
| # For free fall: r[:,:,0] is initial (from actual), r[:,:,1:] are predictions | |
| # We compare r with actual_y_compare properly reshaped | |
| # Reshape actual_y: r[:,:,i] should compare with actual_y_compare[i,:] for each i | |
| # r is [T,B,limitLoop], actual_y_compare is [T,B] | |
| # We need: actual_y_compare[i,:] compares with r[:,:,i] for each i in [0, limitLoop) | |
| # Create [T,B,limitLoop] where actual_y_broadcast[:,:,i] = actual_y_compare[i,:] for each i | |
| actual_y_broadcast = actual_y_compare[:limitLoop, :].unsqueeze(0).expand(T, -1, -1).permute(0, 2, 1) # [T,B,limitLoop] | |
| # Calculate MSE loss matching pendulum pattern exactly | |
| # Pendulum: torch.sum(torch.square(self.y_true[:,:,0:limitLoop]-theta)/limitLoop, dim=2) | |
| raw_mse = torch.sum(torch.square(actual_y_broadcast - r[:,:,:limitLoop]) / limitLoop, dim=2) | |
| # Calibration already completed - loss_Cal_y is set to calibrated value | |
| # Calculate MSE loss with calibration (match pendulum: torch.abs(raw_mse - loss_Cal)) | |
| # Pendulum: torch.abs(torch.sum(...)-loss_Cal_theta) | |
| mse_loss = torch.abs(raw_mse - loss_Cal_y) | |
| # ======================================== | |
| # STEP 8: Parameter Constraint Penalty | |
| # ======================================== | |
| # Add penalties to ensure learned parameter is physically reasonable | |
| # This prevents the network from learning unrealistic values | |
| param_penalty = 0.0 | |
| # g must be positive (gravitational acceleration cannot be negative) | |
| param_penalty += 10.0 * torch.mean(torch.relu(-g)) # g > 0 | |
| # g should be reasonable (typically 8-12 m/s^2 for Earth) | |
| param_penalty += 2.0 * torch.mean(torch.relu(g - 15.0)) # g < 15 m/s^2 | |
| param_penalty += 2.0 * torch.mean(torch.relu(5.0 - g)) # g > 5 m/s^2 | |
| # Calculate RMSE for reporting | |
| rmse_loss = torch.sqrt(mse_loss) | |
| # Total loss combines physics simulation error with parameter constraints | |
| total_loss = mse_loss + 0.001 * param_penalty | |
| # Store predicted trajectory and parameter for debugging | |
| self.predicted_r = r | |
| self.g = g | |
| self.rmse = rmse_loss | |
| return total_loss | |
| class FreeFallData: | |
| """ | |
| Data handler for free fall position trajectory data. | |
| This class loads and processes the position data from the video processing step, | |
| creating sequences suitable for the LTC neural network. | |
| Matches PendulumData/SlidingBlockData structure for consistency. | |
| """ | |
| def __init__(self, seq_len=16, data_dir="data"): | |
| print(f"Loading free fall position trajectory data...") | |
| # Load trajectory data from data directory | |
| # Load position data (y coordinates) - match pendulum/sliding block format | |
| y_data = np.loadtxt(os.path.join(data_dir, "yData.txt")) | |
| # Transpose to match pendulum/sliding block format: [N, 100] -> [100, N] | |
| y_traj = y_data.T # [100, N] | |
| # Get Nloop from data | |
| global Nloop | |
| Nloop = y_traj.shape[1] # Use actual data size (100) | |
| print(f"Nloop {Nloop}") | |
| # Create sequences for training (like pendulum/sliding block approach) | |
| train_x, train_y = cut_in_sequences(y_traj, y_traj, seq_len) | |
| # Create sequences for testing | |
| test_x, test_y = cut_in_sequences(y_traj, y_traj, seq_len, inc=8) | |
| # Convert to PyTorch tensors | |
| self.train_x = torch.tensor(train_x, dtype=torch.float32) | |
| self.train_y = torch.tensor(train_y, dtype=torch.float32) | |
| self.test_x = torch.tensor(test_x, dtype=torch.float32) | |
| self.test_y = torch.tensor(test_y, dtype=torch.float32) | |
| print(f"Training sequences: {self.train_x.shape[1]}") | |
| print(f"Test sequences: {self.test_x.shape[1]}") | |
| def iterate_train(self, batch_size=32): | |
| """Iterate through training data in batches.""" | |
| total_seqs = self.train_x.shape[1] | |
| permutation = torch.randperm(total_seqs) | |
| total_batches = total_seqs // batch_size | |
| for i in range(total_batches): | |
| start = i * batch_size | |
| end = start + batch_size | |
| batch_x = self.train_x[:, start:end] | |
| batch_y = self.train_y[:, start:end] | |
| yield (batch_x, batch_y) | |
| class FreeFallModel(nn.Module): | |
| """ | |
| Neural network model for free fall g constant estimation. | |
| This class implements the LTC (Liquid Time-Constant) neural network that learns | |
| to predict the g constant from position trajectory data. The model takes | |
| sequences of position data as input and outputs the g parameter. | |
| Architecture: | |
| - Input: [T, B, Nloop] where T=timesteps, B=batch_size, Nloop=features (100) | |
| - Output: [T, B, 1] where 1 is the g constant parameter | |
| - Uses LTC for sequence-to-sequence learning | |
| """ | |
| def __init__(self, model_type="ltc", model_size=64, learning_rate=0.001): | |
| """ | |
| Initialize the neural network model. | |
| Args: | |
| model_type: Type of model ("ltc", "lstm", etc.) | |
| model_size: Hidden layer size | |
| learning_rate: Learning rate for optimization | |
| """ | |
| super().__init__() | |
| self.model_type = model_type | |
| self.model_size = model_size | |
| # Input size is the number of features per timestep (Nloop like pendulum/sliding block) | |
| input_size = Nloop if Nloop > 0 else 100 # Default to 100 if Nloop not set | |
| print("Beginning free fall parameter estimation model...") | |
| if model_type == "lstm": | |
| self.rnn = nn.LSTM(input_size, model_size, batch_first=False) | |
| elif model_type.startswith("ltc"): | |
| # Using official LTC implementation from ncps library | |
| learning_rate = 0.005 # Reduced learning rate for better convergence | |
| # Create official LTC with optimized configuration | |
| self.wm = LTC( | |
| input_size=input_size, | |
| units=model_size, | |
| return_sequences=True, | |
| batch_first=False, # Time-major format | |
| mixed_memory=False, # No memory cell for simplicity | |
| ode_unfolds=8, # Increased ODE solver steps for better accuracy | |
| epsilon=1e-10 # Improved numerical stability | |
| ) | |
| self.rnn = self.wm | |
| elif model_type == "ctgru": | |
| self.rnn = nn.GRU(input_size, model_size, batch_first=False) | |
| else: | |
| self.rnn = nn.RNN(input_size, model_size, batch_first=False) | |
| # Output layer: 1 parameter (g constant) | |
| self.dense = nn.Linear(model_size, 1) | |
| self.sigmoid = nn.Sigmoid() | |
| # Improved AdamW optimizer with better settings for parameter estimation | |
| self.optimizer = optim.AdamW(self.parameters(), lr=learning_rate, | |
| weight_decay=1e-4, betas=(0.9, 0.999), eps=1e-8) | |
| self.to(device) | |
| # Improved learning rate scheduler for better convergence | |
| self.scheduler = optim.lr_scheduler.CosineAnnealingWarmRestarts( | |
| self.optimizer, T_0=10, T_mult=2, eta_min=1e-6 | |
| ) | |
| def forward(self, x): | |
| """ | |
| Forward pass through the neural network. | |
| Args: | |
| x: Input position trajectory data [T, B, Nloop] | |
| Returns: | |
| y: Predicted g constant [T, B, 1] | |
| """ | |
| if self.model_type.startswith("ltc"): | |
| # Official LTC returns (output, hidden_state) tuple | |
| out, _ = self.rnn(x) # [T,B,H] | |
| else: | |
| # Other RNNs return (output, hidden_state) tuple | |
| out, _ = self.rnn(x) # [T,B,H] | |
| T, B, H = out.shape | |
| y = self.sigmoid(self.dense(out.reshape(T*B, H))).reshape(T, B, 1) | |
| return y | |
| def compute_loss(self, y_pred, target_y): | |
| """Build the loss object and call .forward().""" | |
| loss_fn = Custom_FreeFall_Loss(target_y, y_pred) | |
| return loss_fn.forward() | |
| def run_free_fall_emma_optimization(output_folder=""): | |
| """ | |
| Main function to run EMMA free fall parameter estimation. | |
| This function: | |
| 1. Loads position trajectory data | |
| 2. Creates and trains the LTC neural network | |
| 3. Estimates g constant | |
| 4. Saves results and creates simulation visualization | |
| Args: | |
| output_folder: Folder to save results (default: current directory) | |
| """ | |
| # Set random seeds for reproducibility | |
| import random | |
| random.seed(42) | |
| np.random.seed(42) | |
| torch.manual_seed(42) | |
| if torch.cuda.is_available(): | |
| torch.cuda.manual_seed_all(42) | |
| print("[STEP 2] Starting EMMA free fall optimization...") | |
| print("Starting EMMA Free Fall Training...") | |
| # Training parameters | |
| seq_len = 16 | |
| batch_size = 2 | |
| num_epochs = 40 | |
| learning_rate = 0.0003 | |
| # Load position trajectory data | |
| data_dir = os.path.join(output_folder, "data") if output_folder else "data" | |
| dataset = FreeFallData(seq_len=seq_len, data_dir=data_dir) | |
| # Create neural network model | |
| model = FreeFallModel(model_type="ltc", model_size=64, learning_rate=learning_rate).to(device) | |
| optimizer = model.optimizer | |
| scheduler = model.scheduler | |
| print(f"Model parameters: {sum(p.numel() for p in model.parameters())}") | |
| print("Starting training...") | |
| train_losses = [] | |
| best_loss = float('inf') | |
| patience = 50 | |
| patience_counter = 0 | |
| for epoch in range(num_epochs): | |
| model.train() | |
| epoch_loss = 0.0 | |
| batch_count = 0 | |
| for batch_x, batch_y in dataset.iterate_train(batch_size=batch_size): | |
| batch_x = batch_x.to(device) | |
| batch_y = batch_y.to(device) | |
| optimizer.zero_grad() | |
| # Forward pass | |
| predicted_params = model(batch_x) | |
| # Compute physics-based loss | |
| loss_mat = model.compute_loss(predicted_params, batch_y) | |
| loss = loss_mat.mean() | |
| if torch.isnan(loss): | |
| print(f"Warning: NaN loss detected at epoch {epoch}, batch {batch_count}") | |
| continue | |
| # Backward pass with gradient clipping | |
| loss.backward() | |
| torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0) | |
| optimizer.step() | |
| epoch_loss += loss.item() | |
| batch_count += 1 | |
| if batch_count % 5 == 0: | |
| print(f'Epoch {epoch}, Batch {batch_count}, Loss: {loss.item():.6f}') | |
| if batch_count > 0: | |
| avg_loss = epoch_loss / batch_count | |
| train_losses.append(avg_loss) | |
| scheduler.step() | |
| print(f'Epoch {epoch}, Average Loss: {avg_loss:.6f}') | |
| # Save best model and check for early stopping | |
| if avg_loss < best_loss: | |
| best_loss = avg_loss | |
| patience_counter = 0 | |
| model_path = os.path.join(output_folder, 'free_fall_emma_final_model.pth') if output_folder else 'free_fall_emma_final_model.pth' | |
| torch.save({ | |
| 'model_state_dict': model.state_dict(), | |
| 'optimizer_state_dict': optimizer.state_dict(), | |
| 'train_losses': train_losses, | |
| 'epoch': epoch, | |
| 'loss': avg_loss | |
| }, model_path) | |
| print(f"New best model saved with loss: {best_loss:.6f}") | |
| else: | |
| patience_counter += 1 | |
| # Early stopping | |
| if patience_counter >= patience: | |
| print(f"Early stopping triggered after {epoch+1} epochs") | |
| break | |
| else: | |
| print(f"Warning: No batches processed in epoch {epoch}") | |
| print("Training completed!") | |
| # Load best model | |
| model_path = os.path.join(output_folder, 'free_fall_emma_final_model.pth') if output_folder else 'free_fall_emma_final_model.pth' | |
| checkpoint = torch.load(model_path, map_location=device) | |
| model.load_state_dict(checkpoint['model_state_dict']) | |
| # Evaluate and save results | |
| model.eval() | |
| with torch.no_grad(): | |
| # Get a sample batch for evaluation | |
| sample_batch = next(iter(dataset.iterate_train(batch_size=1))) | |
| sample_x, sample_y = sample_batch | |
| sample_x = sample_x.to(device) | |
| sample_y = sample_y.to(device) | |
| # Get predicted parameter | |
| predicted_params = model(sample_x) | |
| # Convert to physical parameter (baseline paper notation) | |
| maxChange = 95.0 | |
| getp = lambda k: predicted_params[:,:,k].mean() | |
| g_nominal = 9.81 # Nominal g value (m/s^2) - Earth's gravity | |
| g = (1 + (0.5 - getp(0)) * maxChange / 100.0) * g_nominal | |
| # Save parameter to CSV (baseline paper notation) | |
| vals = [g.item()] | |
| csv_path = os.path.join(output_folder, 'free_fall_coefficients.csv') if output_folder else 'free_fall_coefficients.csv' | |
| with open(csv_path, 'w', newline='') as csvfile: | |
| w = csv.writer(csvfile) | |
| w.writerow(['Parameter', 'Value', 'Units', 'Description']) | |
| w.writerow(['g', float(g.item()), 'm/s^2', 'Gravitational acceleration (dv/dt = g)']) | |
| print("\n=== ESTIMATED FREE FALL PARAMETER ===") | |
| print(f"g: {float(g.item()):.6f} m/s^2") | |
| print("Model saved as 'free_fall_emma_final_model.pth'") | |
| print("Parameters saved as 'free_fall_coefficients.csv'") | |
| def main(): | |
| """ | |
| Main function to run the complete free fall analysis pipeline. | |
| This is the main automation function that orchestrates the entire free fall analysis | |
| pipeline. It coordinates data loading and EMMA parameter estimation | |
| to provide a complete analysis of free fall behavior from position data. | |
| Pipeline Execution Flow: | |
| ------------------------ | |
| 1. Initialize directories and configuration | |
| 2. Run EMMA parameter estimation (physics-informed neural network) | |
| 3. Generate comprehensive output summary | |
| """ | |
| import sys | |
| # Check for command line arguments | |
| simulation_only = "--simulation-only" in sys.argv or "-s" in sys.argv | |
| if simulation_only: | |
| print("=" * 60) | |
| print("EMMA SIMULATION MODE") | |
| print("=" * 60) | |
| print(" Running simulation with existing learned parameters...") | |
| try: | |
| # Check if required files exist | |
| if not os.path.exists('free_fall_coefficients.csv'): | |
| raise FileNotFoundError("free_fall_coefficients.csv not found. Please run full pipeline first.") | |
| if not os.path.exists('free_fall_emma_final_model.pth'): | |
| raise FileNotFoundError("free_fall_emma_final_model.pth not found. Please run full pipeline first.") | |
| # Load existing parameters | |
| import pandas as pd | |
| params_df = pd.read_csv('free_fall_coefficients.csv') | |
| print("Loaded existing free fall parameters:") | |
| for _, row in params_df.iterrows(): | |
| print(f" {row['Parameter']}: {row['Value']:.6f} {row['Units']}") | |
| print("\n SIMULATION COMPLETED SUCCESSFULLY!") | |
| print(" OUTPUT SUMMARY:") | |
| print(" EMMA parameters: free_fall_coefficients.csv") | |
| print(" EMMA model: free_fall_emma_final_model.pth") | |
| except Exception as e: | |
| print(f"\n SIMULATION FAILED: {e}") | |
| print(" Ensure that EMMA parameters have been learned first") | |
| print(" Run 'python free_fall.py' to learn parameters before simulation") | |
| return | |
| # ======================================== | |
| # COMPLETE PIPELINE EXECUTION | |
| # ======================================== | |
| print("=" * 60) | |
| print("FREE FALL ANALYSIS PIPELINE") | |
| print("=" * 60) | |
| # ======================================== | |
| # CONFIGURATION SECTION | |
| # ======================================== | |
| # Modify these paths according to your setup | |
| video_path = "../../output_selected/free_fall/large/03/video.mp4" # Set to video path if processing from video | |
| weights_path = "yolo11m.pt" # YOLO model weights | |
| pixel_to_meter = 0.001 # Conversion factor: adjust based on your setup (m/pixel) | |
| # Save results in lar_v3 folder (like pendulum 45_v1, 90_v1, etc.) | |
| output_folder = "lar_v3" | |
| os.makedirs(output_folder, exist_ok=True) | |
| os.makedirs(f"{output_folder}/output", exist_ok=True) # Visual outputs directory | |
| os.makedirs(f"{output_folder}/data", exist_ok=True) # Data files directory | |
| try: | |
| # ======================================== | |
| # STEP 1: VIDEO PROCESSING (if video provided) | |
| # ======================================== | |
| # Check if video processing is needed | |
| ydata_path = os.path.join(output_folder, "data", "yData.txt") | |
| if video_path and os.path.exists(video_path): | |
| print("\n" + "=" * 40) | |
| print("STEP 1: VIDEO PROCESSING") | |
| print("=" * 40) | |
| print("Detecting and tracking falling object in video frames...") | |
| output_video = os.path.join(output_folder, "output", "free_fall_annotated.mp4") | |
| output_csv = os.path.join(output_folder, "data", "free_fall_trajectory.csv") | |
| num_frames = process_free_fall_video( | |
| video_path=video_path, | |
| weights_path=weights_path, | |
| output_video=output_video, | |
| output_csv=output_csv, | |
| conf=0.15, | |
| pixel_to_meter=pixel_to_meter | |
| ) | |
| if num_frames == 0: | |
| print("⚠️ Warning: No position data extracted from video") | |
| print(" Falling back to existing yData.txt if available") | |
| else: | |
| print(f"✅ Successfully extracted {num_frames} position measurements") | |
| elif os.path.exists(ydata_path): | |
| print("\n" + "=" * 40) | |
| print("STEP 1: SKIPPED (Using existing position data)") | |
| print("=" * 40) | |
| print(f"Found existing yData.txt at: {ydata_path}") | |
| print("Skipping video processing...") | |
| else: | |
| print("\n" + "=" * 40) | |
| print("STEP 1: SKIPPED (No video or data found)") | |
| print("=" * 40) | |
| print("⚠️ No video path provided and yData.txt not found") | |
| print(" Please either:") | |
| print(" 1. Set video_path in main() function, or") | |
| print(" 2. Place yData.txt in data/ directory") | |
| print(" Continuing with existing data if available...") | |
| # ======================================== | |
| # STEP 2: EMMA PARAMETER ESTIMATION | |
| # ======================================== | |
| print("\n" + "=" * 40) | |
| print("STEP 2: EMMA PARAMETER ESTIMATION") | |
| print("=" * 40) | |
| print("Loading position trajectory data...") | |
| print("Training LTC neural network...") | |
| print("Estimating g constant...") | |
| run_free_fall_emma_optimization(output_folder=output_folder) | |
| # ======================================== | |
| # PIPELINE COMPLETION SUMMARY | |
| # ======================================== | |
| print("\n" + "=" * 60) | |
| print(" PIPELINE COMPLETED SUCCESSFULLY!") | |
| print("=" * 60) | |
| print(" OUTPUT SUMMARY:") | |
| if video_path and os.path.exists(video_path): | |
| print(" Annotated video: output/free_fall_annotated.mp4") | |
| print(" Trajectory CSV: data/free_fall_trajectory.csv") | |
| print(" Position data: data/yData.txt") | |
| print(" EMMA parameters: free_fall_coefficients.csv") | |
| print(" EMMA model: free_fall_emma_final_model.pth") | |
| print("\n All outputs organized in output/, data/, and root directories") | |
| except Exception as e: | |
| print(f"\n PIPELINE FAILED: {e}") | |
| print(" Check that yData.txt exists in data/ directory") | |
| print(" Ensure all required dependencies are installed") | |
| raise | |
| # Main execution block | |
| if __name__ == "__main__": | |
| """ | |
| Main execution entry point for the free fall analysis pipeline. | |
| """ | |
| main() | |